You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2018/05/04 22:54:52 UTC

atlas git commit: ATLAS-2634: Avoid duplicate message processing.

Repository: atlas
Updated Branches:
  refs/heads/master 015b8bf38 -> f29a2b7bb


ATLAS-2634: Avoid duplicate message processing.

Signed-off-by: Ashutosh Mestry <am...@hortonworks.com>


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/f29a2b7b
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/f29a2b7b
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/f29a2b7b

Branch: refs/heads/master
Commit: f29a2b7bb2b555e68d7f5e2b43221f85877aa39c
Parents: 015b8bf
Author: Ashutosh Mestry <am...@hortonworks.com>
Authored: Thu May 3 16:22:10 2018 -0700
Committer: Ashutosh Mestry <am...@hortonworks.com>
Committed: Fri May 4 15:54:17 2018 -0700

----------------------------------------------------------------------
 .../apache/atlas/kafka/KafkaNotification.java   |  6 +-
 .../notification/NotificationHookConsumer.java  | 42 ++++++++++++-
 .../NotificationHookConsumerKafkaTest.java      | 66 ++++++++++++++++++++
 3 files changed, 109 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/f29a2b7b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 80dc514..00e56e3 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -241,8 +241,10 @@ public class KafkaNotification extends AbstractNotification implements Service {
     }
 
 
-    // Get properties for consumer request
-    private Properties getConsumerProperties(NotificationType type) {
+    @VisibleForTesting
+    public
+        // Get properties for consumer request
+    Properties getConsumerProperties(NotificationType type) {
         // find the configured group id for the given notification type
         String groupId = properties.getProperty(type.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY);
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/f29a2b7b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 7a4596a..f5e555d 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -55,6 +55,7 @@ import org.apache.atlas.web.filters.AuditFilter;
 import org.apache.atlas.web.filters.AuditFilter.AuditLog;
 import org.apache.atlas.web.service.ServiceState;
 import org.apache.commons.configuration.Configuration;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -297,10 +298,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         private final List<HookNotification>                 failedMessages = new ArrayList<>();
         private final AdaptiveWaiter                         adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
 
+        @VisibleForTesting
+        final FailedCommitOffsetRecorder failedCommitOffsetRecorder;
+
         public HookConsumer(NotificationConsumer<HookNotification> consumer) {
             super("atlas-hook-consumer-thread", false);
 
             this.consumer = consumer;
+            failedCommitOffsetRecorder = new FailedCommitOffsetRecorder();
         }
 
         @Override
@@ -358,6 +363,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             }
 
             try {
+                if(failedCommitOffsetRecorder.isMessageReplayed(kafkaMsg.getOffset())) {
+                    commit(kafkaMsg);
+                    return;
+                }
+
                 // Used for intermediate conversions during create and update
                 for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
                     if (LOG.isDebugEnabled()) {
@@ -558,11 +568,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         }
 
         private void commit(AtlasKafkaMessage<HookNotification> kafkaMessage) {
-            recordFailedMessages();
+            boolean commitSucceessStatus = false;
+            try {
+                recordFailedMessages();
 
-            TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition());
+                TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition());
 
-            consumer.commit(partition, kafkaMessage.getOffset() + 1);
+                consumer.commit(partition, kafkaMessage.getOffset() + 1);
+                commitSucceessStatus = true;
+            } finally {
+                failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset());
+            }
         }
 
         boolean serverAvailable(Timer timer) {
@@ -612,4 +628,24 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             LOG.info("<== HookConsumer shutdown()");
         }
     }
+
+    static class FailedCommitOffsetRecorder {
+        private Long currentOffset;
+
+        public void recordIfFailed(boolean commitStatus, long offset) {
+            if(commitStatus) {
+                currentOffset = null;
+            } else {
+                currentOffset = offset;
+            }
+        }
+
+        public boolean isMessageReplayed(long offset) {
+            return currentOffset != null && currentOffset == offset;
+        }
+
+        public Long getCurrentOffset() {
+            return currentOffset;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/f29a2b7b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index d2b3dfd..dbe1a07 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -34,6 +34,9 @@ import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.web.service.ServiceState;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
@@ -42,6 +45,7 @@ import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
 import java.util.List;
+import java.util.Properties;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
@@ -115,6 +119,38 @@ public class NotificationHookConsumerKafkaTest {
         reset(atlasEntityStore);
     }
 
+    @Test
+    public void consumerConsumesNewMessageButCommitThrowsAnException_MessageOffsetIsRecorded() throws AtlasException, InterruptedException, AtlasBaseException {
+
+        ExceptionThrowingCommitConsumer        consumer                 = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true);
+        NotificationHookConsumer               notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+        NotificationHookConsumer.HookConsumer  hookConsumer             = notificationHookConsumer.new HookConsumer(consumer);
+        NotificationHookConsumer.FailedCommitOffsetRecorder failedCommitOffsetRecorder = hookConsumer.failedCommitOffsetRecorder;
+
+        produceMessage(new HookNotificationV1.EntityCreateRequest("test_user2", createEntity()));
+
+        try {
+            produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity()));
+            consumeOneMessage(consumer, hookConsumer);
+            consumeOneMessage(consumer, hookConsumer);
+        }
+        catch(KafkaException ex) {
+            assertTrue(true, "ExceptionThrowing consumer throws an excepion.");
+        }
+
+        assertTrue(failedCommitOffsetRecorder.getCurrentOffset() > -1);
+
+        consumer.disableCommitExpcetion();
+
+        produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity()));
+        consumeOneMessage(consumer, hookConsumer);
+        consumeOneMessage(consumer, hookConsumer);
+
+        assertNull(failedCommitOffsetRecorder.getCurrentOffset());
+
+        reset(atlasEntityStore);
+    }
+
     @Test(dependsOnMethods = "testConsumerConsumesNewMessageWithAutoCommitDisabled")
     public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception {
         produceMessage(new HookNotificationV1.EntityCreateRequest("test_user3", createEntity()));
@@ -140,6 +176,12 @@ public class NotificationHookConsumerKafkaTest {
         return (AtlasKafkaConsumer) kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1, autoCommitEnabled).get(0);
     }
 
+    ExceptionThrowingCommitConsumer createNewConsumerThatThrowsExceptionInCommit(KafkaNotification kafkaNotification, boolean autoCommitEnabled) {
+        Properties prop = kafkaNotification.getConsumerProperties(NotificationInterface.NotificationType.HOOK);
+        KafkaConsumer consumer = kafkaNotification.getKafkaConsumer(prop, NotificationInterface.NotificationType.HOOK, true);
+        return new ExceptionThrowingCommitConsumer(NotificationInterface.NotificationType.HOOK, consumer, autoCommitEnabled, 1000);
+    }
+
     void consumeOneMessage(NotificationConsumer<HookNotification> consumer,
                            NotificationHookConsumer.HookConsumer hookConsumer) throws InterruptedException {
         try {
@@ -205,4 +247,28 @@ public class NotificationHookConsumerKafkaTest {
         }
     }
 
+    private static class ExceptionThrowingCommitConsumer extends AtlasKafkaConsumer {
+
+        private boolean exceptionThrowingEnabled;
+
+        public ExceptionThrowingCommitConsumer(NotificationInterface.NotificationType notificationType,
+                                               KafkaConsumer kafkaConsumer, boolean autoCommitEnabled, long pollTimeoutMilliSeconds) {
+            super(notificationType, kafkaConsumer, autoCommitEnabled, pollTimeoutMilliSeconds);
+            exceptionThrowingEnabled = true;
+        }
+
+        @Override
+        public void commit(TopicPartition partition, long offset) {
+            if(exceptionThrowingEnabled) {
+                throw new KafkaException("test case verifying exception");
+            }
+            else {
+                super.commit(partition, offset);
+            }
+        }
+
+        public void disableCommitExpcetion() {
+            exceptionThrowingEnabled = false;
+        }
+    }
 }