You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2021/02/18 05:30:18 UTC

[atlas] branch master updated: ATLAS-4155: NotificationHookConsumer: Fix for Large Message Processing Problem

This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new e17ef54  ATLAS-4155: NotificationHookConsumer: Fix for Large Message Processing Problem
e17ef54 is described below

commit e17ef54d0f701bd5b8521d0ecd2908db32f984f3
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Wed Feb 17 21:17:30 2021 -0800

    ATLAS-4155: NotificationHookConsumer: Fix for Large Message Processing Problem
---
 .../org/apache/atlas/kafka/AtlasKafkaConsumer.java | 73 ++++++++++++++--------
 .../atlas/notification/NotificationConsumer.java   |  9 ++-
 .../AbstractNotificationConsumerTest.java          |  6 ++
 .../notification/NotificationHookConsumer.java     | 50 +++------------
 .../NotificationHookConsumerKafkaTest.java         |  5 --
 5 files changed, 71 insertions(+), 72 deletions(-)

diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
index c38a504..f7d9668 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
@@ -20,12 +20,14 @@ package org.apache.atlas.kafka;
 import org.apache.atlas.notification.AbstractNotificationConsumer;
 import org.apache.atlas.notification.AtlasNotificationMessageDeserializer;
 import org.apache.atlas.notification.NotificationInterface;
+import org.apache.commons.collections.MapUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -63,7 +65,40 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
 
     @Override
     public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
+        return receive(this.pollTimeoutMilliSeconds, null);
+    }
+
+    @Override
+    public List<AtlasKafkaMessage<T>> receiveWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset) {
+        return receive(this.pollTimeoutMilliSeconds, lastCommittedPartitionOffset);
+    }
+
+
+    @Override
+    public void commit(TopicPartition partition, long offset) {
+        if (!autoCommitEnabled) {
+            if (LOG.isDebugEnabled()) {
+                LOG.info(" commiting the offset ==>> " + offset);
+            }
+            kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset)));
+        }
+    }
+
+    @Override
+    public void close() {
+        if (kafkaConsumer != null) {
+            kafkaConsumer.close();
+        }
+    }
+
+    @Override
+    public void wakeup() {
+        if (kafkaConsumer != null) {
+            kafkaConsumer.wakeup();
+        }
+    }
 
+    private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds, Map<TopicPartition, Long> lastCommittedPartitionOffset) {
         List<AtlasKafkaMessage<T>> messages = new ArrayList();
 
         ConsumerRecords<?, ?> records = kafkaConsumer != null ? kafkaConsumer.poll(timeoutMilliSeconds) : null;
@@ -75,13 +110,24 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
                             record.topic(), record.partition(), record.offset(), record.key(), record.value());
                 }
 
+                TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
+                if (MapUtils.isNotEmpty(lastCommittedPartitionOffset)
+                        && lastCommittedPartitionOffset.containsKey(topicPartition)
+                        && record.offset() < lastCommittedPartitionOffset.get(topicPartition)) {
+
+                    commit(topicPartition, record.offset());
+                    LOG.info("Skipping already processed message: topic={}, partition={} offset={}. Last processed offset={}",
+                                record.topic(), record.partition(), record.offset(), lastCommittedPartitionOffset.get(topicPartition));
+                    continue;
+                }
+
                 T message = null;
 
                 try {
                     message = deserializer.deserialize(record.value().toString());
                 } catch (OutOfMemoryError excp) {
                     LOG.error("Ignoring message that failed to deserialize: topic={}, partition={}, offset={}, key={}, value={}",
-                              record.topic(), record.partition(), record.offset(), record.key(), record.value(), excp);
+                            record.topic(), record.partition(), record.offset(), record.key(), record.value(), excp);
                 }
 
                 if (message == null) {
@@ -95,29 +141,4 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
         return messages;
 
     }
-
-
-    @Override
-    public void commit(TopicPartition partition, long offset) {
-        if (!autoCommitEnabled) {
-            if (LOG.isDebugEnabled()) {
-                LOG.info(" commiting the offset ==>> " + offset);
-            }
-            kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset)));
-        }
-    }
-
-    @Override
-    public void close() {
-        if (kafkaConsumer != null) {
-            kafkaConsumer.close();
-        }
-    }
-
-    @Override
-    public void wakeup() {
-        if (kafkaConsumer != null) {
-            kafkaConsumer.wakeup();
-        }
-    }
 }
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
index f3e81ec..1fb9f99 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
@@ -18,6 +18,8 @@
 package org.apache.atlas.notification;
 
 import java.util.List;
+import java.util.Map;
+
 import org.apache.kafka.common.TopicPartition;
 import org.apache.atlas.kafka.AtlasKafkaMessage;
 
@@ -55,5 +57,10 @@ public interface NotificationConsumer<T> {
     List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds);
 
 
-
+    /**
+     * Fetch data for the topics from Kafka. A message whose offset is lower than the last committed
+     * offset recorded for its partition is committed and skipped instead of being returned.
+     * @return List containing kafka message and partitionId and offset.
+     */
+    List<AtlasKafkaMessage<T>> receiveWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset);
 }
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index 05d0d81..1b486e5 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -30,6 +30,7 @@ import org.testng.annotations.Test;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 import static org.mockito.Mockito.mock;
@@ -223,6 +224,11 @@ public class AbstractNotificationConsumerTest {
             }
             return tempMessageList;
         }
+
+        @Override
+        public List<AtlasKafkaMessage<TestMessage>> receiveWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset) {
+            return receive();
+        }
     }
 
     public static class TestMessageDeserializer extends AbstractMessageDeserializer<TestMessage> {
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 0e58dac..84cc8d8 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -67,6 +67,7 @@ import org.apache.commons.collections.MapUtils;
 import org.apache.commons.collections4.map.PassiveExpiringMap;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.annotation.DependsOn;
@@ -84,6 +85,7 @@ import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -188,6 +190,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     private final Configuration                 applicationProperties;
     private       ExecutorService               executors;
     private       Instant                       nextStatsLogTime = AtlasMetricsCounter.getNextHourStartTime(Instant.now());
+    private final Map<TopicPartition, Long>     lastCommittedPartitionOffset;
 
     @VisibleForTesting
     final int consumerRetryInterval;
@@ -205,7 +208,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         this.instanceConverter     = instanceConverter;
         this.typeRegistry          = typeRegistry;
         this.applicationProperties = ApplicationProperties.get();
-        this.metricsUtil           = metricsUtil;
+        this.metricsUtil                    = metricsUtil;
+        this.lastCommittedPartitionOffset   = new HashMap<>();
 
         maxRetries            = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
         failedMsgCacheSize    = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 1);
@@ -517,14 +521,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         private final List<String>                           failedMessages = new ArrayList<>();
         private final AdaptiveWaiter                         adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
 
-        @VisibleForTesting
-        final FailedCommitOffsetRecorder failedCommitOffsetRecorder;
-
         public HookConsumer(NotificationConsumer<HookNotification> consumer) {
             super("atlas-hook-consumer-thread", false);
 
             this.consumer = consumer;
-            failedCommitOffsetRecorder = new FailedCommitOffsetRecorder();
         }
 
         @Override
@@ -540,7 +540,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             try {
                 while (shouldRun.get()) {
                     try {
-                        List<AtlasKafkaMessage<HookNotification>> messages = consumer.receive();
+                        List<AtlasKafkaMessage<HookNotification>> messages = consumer.receiveWithCheckedCommit(lastCommittedPartitionOffset);
 
                         for (AtlasKafkaMessage<HookNotification> msg : messages) {
                             handleMessage(msg);
@@ -586,11 +586,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             }
 
             try {
-                if(failedCommitOffsetRecorder.isMessageReplayed(kafkaMsg.getOffset())) {
-                    commit(kafkaMsg);
-                    return;
-                }
-
                 // covert V1 messages to V2 to enable preProcess
                 try {
                     switch (message.getType()) {
@@ -919,16 +914,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         }
 
         private void commit(AtlasKafkaMessage<HookNotification> kafkaMessage) {
-            boolean commitSucceessStatus = false;
-            try {
-                recordFailedMessages();
-
-                consumer.commit(kafkaMessage.getTopicPartition(), kafkaMessage.getOffset() + 1);
+            recordFailedMessages();
 
-                commitSucceessStatus = true;
-            } finally {
-                failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset());
-            }
+            long commitOffset = kafkaMessage.getOffset() + 1;
+            lastCommittedPartitionOffset.put(kafkaMessage.getTopicPartition(), commitOffset);
+            consumer.commit(kafkaMessage.getTopicPartition(), commitOffset);
         }
 
         boolean serverAvailable(Timer timer) {
@@ -1330,24 +1320,4 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
         return ret;
     }
-
-    static class FailedCommitOffsetRecorder {
-        private Long currentOffset;
-
-        public void recordIfFailed(boolean commitStatus, long offset) {
-            if(commitStatus) {
-                currentOffset = null;
-            } else {
-                currentOffset = offset;
-            }
-        }
-
-        public boolean isMessageReplayed(long offset) {
-            return currentOffset != null && currentOffset == offset;
-        }
-
-        public Long getCurrentOffset() {
-            return currentOffset;
-        }
-    }
 }
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index 33191a7..65e8b50 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -130,7 +130,6 @@ public class NotificationHookConsumerKafkaTest {
         ExceptionThrowingCommitConsumer        consumer                 = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true);
         NotificationHookConsumer               notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
         NotificationHookConsumer.HookConsumer  hookConsumer             = notificationHookConsumer.new HookConsumer(consumer);
-        NotificationHookConsumer.FailedCommitOffsetRecorder failedCommitOffsetRecorder = hookConsumer.failedCommitOffsetRecorder;
 
         produceMessage(new HookNotificationV1.EntityCreateRequest("test_user2", createEntity()));
 
@@ -143,16 +142,12 @@ public class NotificationHookConsumerKafkaTest {
             assertTrue(true, "ExceptionThrowing consumer throws an excepion.");
         }
 
-        assertTrue(failedCommitOffsetRecorder.getCurrentOffset() > -1);
-
         consumer.disableCommitExpcetion();
 
         produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity()));
         consumeOneMessage(consumer, hookConsumer);
         consumeOneMessage(consumer, hookConsumer);
 
-        assertNull(failedCommitOffsetRecorder.getCurrentOffset());
-
         reset(atlasEntityStore);
     }