You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2021/02/18 05:40:47 UTC
[atlas] branch branch-2.0 updated: ATLAS-4155:
NotificationHookConsumer: Fix for Large Message Processing Problem
This is an automated email from the ASF dual-hosted git repository.
amestry pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new dbdfcd2 ATLAS-4155: NotificationHookConsumer: Fix for Large Message Processing Problem
dbdfcd2 is described below
commit dbdfcd27f264c6aa0bc6333ece5b992f9f985885
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Wed Feb 17 21:17:30 2021 -0800
ATLAS-4155: NotificationHookConsumer: Fix for Large Message Processing Problem
---
.../org/apache/atlas/kafka/AtlasKafkaConsumer.java | 73 ++++++++++++++--------
.../atlas/notification/NotificationConsumer.java | 9 ++-
.../AbstractNotificationConsumerTest.java | 6 ++
.../notification/NotificationHookConsumer.java | 50 +++------------
.../NotificationHookConsumerKafkaTest.java | 5 --
5 files changed, 71 insertions(+), 72 deletions(-)
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
index c38a504..f7d9668 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
@@ -20,12 +20,14 @@ package org.apache.atlas.kafka;
import org.apache.atlas.notification.AbstractNotificationConsumer;
import org.apache.atlas.notification.AtlasNotificationMessageDeserializer;
import org.apache.atlas.notification.NotificationInterface;
+import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -63,7 +65,40 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
@Override
public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
+ return receive(this.pollTimeoutMilliSeconds, null);
+ }
+
+ @Override
+ public List<AtlasKafkaMessage<T>> receiveWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset) {
+ return receive(this.pollTimeoutMilliSeconds, lastCommittedPartitionOffset);
+ }
+
+
+ @Override
+ public void commit(TopicPartition partition, long offset) {
+ if (!autoCommitEnabled) {
+ if (LOG.isDebugEnabled()) {
+ LOG.info(" commiting the offset ==>> " + offset);
+ }
+ kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset)));
+ }
+ }
+
+ @Override
+ public void close() {
+ if (kafkaConsumer != null) {
+ kafkaConsumer.close();
+ }
+ }
+
+ @Override
+ public void wakeup() {
+ if (kafkaConsumer != null) {
+ kafkaConsumer.wakeup();
+ }
+ }
+ private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds, Map<TopicPartition, Long> lastCommittedPartitionOffset) {
List<AtlasKafkaMessage<T>> messages = new ArrayList();
ConsumerRecords<?, ?> records = kafkaConsumer != null ? kafkaConsumer.poll(timeoutMilliSeconds) : null;
@@ -75,13 +110,24 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
+ TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
+ if (MapUtils.isNotEmpty(lastCommittedPartitionOffset)
+ && lastCommittedPartitionOffset.containsKey(topicPartition)
+ && record.offset() < lastCommittedPartitionOffset.get(topicPartition)) {
+
+ commit(topicPartition, record.offset());
+ LOG.info("Skipping already processed message: topic={}, partition={} offset={}. Last processed offset={}",
+ record.topic(), record.partition(), record.offset(), lastCommittedPartitionOffset.get(topicPartition));
+ continue;
+ }
+
T message = null;
try {
message = deserializer.deserialize(record.value().toString());
} catch (OutOfMemoryError excp) {
LOG.error("Ignoring message that failed to deserialize: topic={}, partition={}, offset={}, key={}, value={}",
- record.topic(), record.partition(), record.offset(), record.key(), record.value(), excp);
+ record.topic(), record.partition(), record.offset(), record.key(), record.value(), excp);
}
if (message == null) {
@@ -95,29 +141,4 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
return messages;
}
-
-
- @Override
- public void commit(TopicPartition partition, long offset) {
- if (!autoCommitEnabled) {
- if (LOG.isDebugEnabled()) {
- LOG.info(" commiting the offset ==>> " + offset);
- }
- kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset)));
- }
- }
-
- @Override
- public void close() {
- if (kafkaConsumer != null) {
- kafkaConsumer.close();
- }
- }
-
- @Override
- public void wakeup() {
- if (kafkaConsumer != null) {
- kafkaConsumer.wakeup();
- }
- }
}
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
index f3e81ec..1fb9f99 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
@@ -18,6 +18,8 @@
package org.apache.atlas.notification;
import java.util.List;
+import java.util.Map;
+
import org.apache.kafka.common.TopicPartition;
import org.apache.atlas.kafka.AtlasKafkaMessage;
@@ -55,5 +57,10 @@ public interface NotificationConsumer<T> {
List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds);
-
+ /**
+ * Fetch data for the topics from Kafka. In the Kafka implementation, a message whose offset is lower
+ * than the last committed offset supplied for its partition is committed and skipped, not returned.
+ * @return List containing each Kafka message together with its partition id and offset.
+ */
+ List<AtlasKafkaMessage<T>> receiveWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset);
}
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index 05d0d81..1b486e5 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -30,6 +30,7 @@ import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import static org.mockito.Mockito.mock;
@@ -223,6 +224,11 @@ public class AbstractNotificationConsumerTest {
}
return tempMessageList;
}
+
+ @Override
+ public List<AtlasKafkaMessage<TestMessage>> receiveWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset) {
+ return receive();
+ }
}
public static class TestMessageDeserializer extends AbstractMessageDeserializer<TestMessage> {
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 0e58dac..84cc8d8 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -67,6 +67,7 @@ import org.apache.commons.collections.MapUtils;
import org.apache.commons.collections4.map.PassiveExpiringMap;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
+import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.DependsOn;
@@ -84,6 +85,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -188,6 +190,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final Configuration applicationProperties;
private ExecutorService executors;
private Instant nextStatsLogTime = AtlasMetricsCounter.getNextHourStartTime(Instant.now());
+ private final Map<TopicPartition, Long> lastCommittedPartitionOffset;
@VisibleForTesting
final int consumerRetryInterval;
@@ -205,7 +208,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
this.instanceConverter = instanceConverter;
this.typeRegistry = typeRegistry;
this.applicationProperties = ApplicationProperties.get();
- this.metricsUtil = metricsUtil;
+ this.metricsUtil = metricsUtil;
+ this.lastCommittedPartitionOffset = new HashMap<>();
maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 1);
@@ -517,14 +521,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final List<String> failedMessages = new ArrayList<>();
private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
- @VisibleForTesting
- final FailedCommitOffsetRecorder failedCommitOffsetRecorder;
-
public HookConsumer(NotificationConsumer<HookNotification> consumer) {
super("atlas-hook-consumer-thread", false);
this.consumer = consumer;
- failedCommitOffsetRecorder = new FailedCommitOffsetRecorder();
}
@Override
@@ -540,7 +540,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
try {
while (shouldRun.get()) {
try {
- List<AtlasKafkaMessage<HookNotification>> messages = consumer.receive();
+ List<AtlasKafkaMessage<HookNotification>> messages = consumer.receiveWithCheckedCommit(lastCommittedPartitionOffset);
for (AtlasKafkaMessage<HookNotification> msg : messages) {
handleMessage(msg);
@@ -586,11 +586,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
try {
- if(failedCommitOffsetRecorder.isMessageReplayed(kafkaMsg.getOffset())) {
- commit(kafkaMsg);
- return;
- }
-
// covert V1 messages to V2 to enable preProcess
try {
switch (message.getType()) {
@@ -919,16 +914,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
private void commit(AtlasKafkaMessage<HookNotification> kafkaMessage) {
- boolean commitSucceessStatus = false;
- try {
- recordFailedMessages();
-
- consumer.commit(kafkaMessage.getTopicPartition(), kafkaMessage.getOffset() + 1);
+ recordFailedMessages();
- commitSucceessStatus = true;
- } finally {
- failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset());
- }
+ long commitOffset = kafkaMessage.getOffset() + 1;
+ lastCommittedPartitionOffset.put(kafkaMessage.getTopicPartition(), commitOffset);
+ consumer.commit(kafkaMessage.getTopicPartition(), commitOffset);
}
boolean serverAvailable(Timer timer) {
@@ -1330,24 +1320,4 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
return ret;
}
-
- static class FailedCommitOffsetRecorder {
- private Long currentOffset;
-
- public void recordIfFailed(boolean commitStatus, long offset) {
- if(commitStatus) {
- currentOffset = null;
- } else {
- currentOffset = offset;
- }
- }
-
- public boolean isMessageReplayed(long offset) {
- return currentOffset != null && currentOffset == offset;
- }
-
- public Long getCurrentOffset() {
- return currentOffset;
- }
- }
}
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index 33191a7..65e8b50 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -130,7 +130,6 @@ public class NotificationHookConsumerKafkaTest {
ExceptionThrowingCommitConsumer consumer = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
- NotificationHookConsumer.FailedCommitOffsetRecorder failedCommitOffsetRecorder = hookConsumer.failedCommitOffsetRecorder;
produceMessage(new HookNotificationV1.EntityCreateRequest("test_user2", createEntity()));
@@ -143,16 +142,12 @@ public class NotificationHookConsumerKafkaTest {
assertTrue(true, "ExceptionThrowing consumer throws an excepion.");
}
- assertTrue(failedCommitOffsetRecorder.getCurrentOffset() > -1);
-
consumer.disableCommitExpcetion();
produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity()));
consumeOneMessage(consumer, hookConsumer);
consumeOneMessage(consumer, hookConsumer);
- assertNull(failedCommitOffsetRecorder.getCurrentOffset());
-
reset(atlasEntityStore);
}