You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2018/09/18 18:06:29 UTC

atlas git commit: ATLAS-2634: Avoid duplicate message processing.

Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 37f59dc95 -> 2f7348988


ATLAS-2634: Avoid duplicate message processing.

Signed-off-by: Ashutosh Mestry <am...@hortonworks.com>
(cherry picked from commit f29a2b7bb2b555e68d7f5e2b43221f85877aa39c)


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/2f734898
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/2f734898
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/2f734898

Branch: refs/heads/branch-0.8
Commit: 2f7348988b992e8a9e5a71cf1a483803fa7d6db8
Parents: 37f59dc
Author: Ashutosh Mestry <am...@hortonworks.com>
Authored: Thu May 3 16:22:10 2018 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Tue Sep 18 10:58:21 2018 -0700

----------------------------------------------------------------------
 .../apache/atlas/kafka/KafkaNotification.java   |  3 +-
 .../notification/NotificationHookConsumer.java  | 42 +++++++++++-
 .../NotificationHookConsumerKafkaTest.java      | 67 ++++++++++++++++++++
 3 files changed, 108 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/2f734898/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 4c63027..4c753d2 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -245,8 +245,9 @@ public class KafkaNotification extends AbstractNotification implements Service {
     }
 
 
+    @VisibleForTesting
     // Get properties for consumer request
-    private Properties getConsumerProperties(NotificationType type) {
+    public Properties getConsumerProperties(NotificationType type) {
         // find the configured group id for the given notification type
         String groupId = properties.getProperty(type.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY);
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/2f734898/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 88a8cce..1a567af 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -286,10 +286,13 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         private final List<String>                                  failedMessages = new ArrayList<>();
         private final AdaptiveWaiter                                adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
 
+        @VisibleForTesting
+        final FailedCommitOffsetRecorder failedCommitOffsetRecorder;
 
+        /**
+         * Creates the hook-consumer thread around the given notification consumer.
+         * Also creates the FailedCommitOffsetRecorder that commit() updates and
+         * that the message-handling path consults before processing a message.
+         *
+         * @param consumer consumer that messages are read from and committed to
+         */
         public HookConsumer(NotificationConsumer<HookNotificationMessage> consumer) {
             super("atlas-hook-consumer-thread", false);
             this.consumer = consumer;
+            failedCommitOffsetRecorder = new FailedCommitOffsetRecorder();
         }
 
         @Override
@@ -342,6 +345,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             }
 
             try {
+                if(failedCommitOffsetRecorder.isMessageReplayed(kafkaMsg.getOffset())) {
+                    commit(kafkaMsg);
+                    return;
+                }
+
                 // Used for intermediate conversions during create and update
                 for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
                     if (LOG.isDebugEnabled()) {
@@ -538,9 +546,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         }
 
+        /**
+         * Commits the offset following the given message (offset + 1) on the
+         * "ATLAS_HOOK" topic partition the message came from, and tells
+         * failedCommitOffsetRecorder whether that commit went through.
+         *
+         * The success flag is only set after consumer.commit() returns, so an
+         * exception thrown by recordFailedMessages() or by the commit itself
+         * leaves it false and the message offset is recorded as failed. There is
+         * no catch clause: any such exception still propagates to the caller.
+         *
+         * @param kafkaMessage message whose partition and offset are committed
+         */
         private void commit(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage) {
-            recordFailedMessages();
-            TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition());
-            consumer.commit(partition, kafkaMessage.getOffset() + 1);
+            boolean commitSucceessStatus = false;
+            try {
+                recordFailedMessages();
+
+                TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition());
+
+                consumer.commit(partition, kafkaMessage.getOffset() + 1);
+                commitSucceessStatus = true;
+            } finally {
+                // Runs on both paths: a successful commit clears the recorded offset, a failed one stores this offset.
+                failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset());
+            }
         }
 
         boolean serverAvailable(Timer timer) {
@@ -595,4 +611,24 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST,
                 DateTimeHelper.formatDateUTC(new Date()));
     }
+
+    /**
+     * Remembers the offset of the most recent message whose commit did not
+     * succeed, so a later delivery of the same offset can be recognised.
+     *
+     * Only a single offset is held; a successful commit clears it.
+     *
+     * NOTE(review): only the offset is tracked, not the partition, so the same
+     * offset number on a different partition would also match - confirm this
+     * is acceptable for the topic's partitioning.
+     * NOTE(review): the field is accessed without synchronization in this
+     * class; presumably each instance is used by a single consumer thread -
+     * verify against callers.
+     */
+    static class FailedCommitOffsetRecorder {
+        // Offset of the last failed commit, or null when the last commit succeeded (or none was recorded yet).
+        private Long currentOffset;
+
+        /**
+         * Records the outcome of a commit attempt.
+         *
+         * @param commitStatus true if the commit succeeded, false otherwise
+         * @param offset       offset of the message whose commit was attempted
+         */
+        public void recordIfFailed(boolean commitStatus, long offset) {
+            if(commitStatus) {
+                currentOffset = null;
+            } else {
+                currentOffset = offset;
+            }
+        }
+
+        /**
+         * @param offset offset of the message about to be processed
+         * @return true only when a failed offset is recorded and it equals the given offset
+         */
+        public boolean isMessageReplayed(long offset) {
+            // The null check comes first; the comparison then unboxes currentOffset and compares numerically.
+            return currentOffset != null && currentOffset == offset;
+        }
+
+        /**
+         * @return the recorded failed offset, or null if none is recorded
+         */
+        public Long getCurrentOffset() {
+            return currentOffset;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/2f734898/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index e7a400e..14ecc2d 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -35,6 +35,9 @@ import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.web.service.ServiceState;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
@@ -43,6 +46,7 @@ import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
 import java.util.List;
+import java.util.Properties;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
@@ -112,6 +116,38 @@ public class NotificationHookConsumerKafkaTest {
         reset(atlasEntityStore);
     }
 
+    /**
+     * Checks that a commit failure is remembered by the hook consumer's
+     * FailedCommitOffsetRecorder and that the recorded offset is cleared once
+     * commits succeed again.
+     *
+     * Phase 1: the consumer's commit() throws, messages are consumed, and a
+     * failed offset is expected to be recorded.
+     * Phase 2: commit exceptions are disabled, messages are consumed again,
+     * and the recorded offset is expected to be null.
+     */
+    @Test
+    public void consumerConsumesNewMessageButCommitThrowsAnException_MessageOffsetIsRecorded() throws AtlasException, InterruptedException, AtlasBaseException {
+
+        ExceptionThrowingCommitConsumer        consumer                 = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true);
+        NotificationHookConsumer               notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+        NotificationHookConsumer.HookConsumer  hookConsumer             = notificationHookConsumer.new HookConsumer(consumer);
+        NotificationHookConsumer.FailedCommitOffsetRecorder failedCommitOffsetRecorder = hookConsumer.failedCommitOffsetRecorder;
+
+        produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity()));
+
+        try {
+            produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
+            consumeOneMessage(consumer, hookConsumer);
+            consumeOneMessage(consumer, hookConsumer);
+        }
+        catch(KafkaException ex) {
+            // NOTE(review): assertTrue(true, ...) always passes, so this catch only swallows the exception;
+            // the test does not fail here if no KafkaException is thrown.
+            assertTrue(true, "ExceptionThrowing consumer throws an excepion.");
+        }
+
+        // getCurrentOffset() returns a Long; the comparison unboxes it, so a null (nothing recorded)
+        // would surface as a NullPointerException rather than an assertion failure.
+        assertTrue(failedCommitOffsetRecorder.getCurrentOffset() > -1);
+
+        consumer.disableCommitExpcetion();
+
+        produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
+        consumeOneMessage(consumer, hookConsumer);
+        consumeOneMessage(consumer, hookConsumer);
+
+        assertNull(failedCommitOffsetRecorder.getCurrentOffset());
+
+        reset(atlasEntityStore);
+    }
+
     @Test(dependsOnMethods = "testConsumerConsumesNewMessageWithAutoCommitDisabled")
     public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception {
         produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity()));
@@ -138,6 +174,12 @@ public class NotificationHookConsumerKafkaTest {
         return (AtlasKafkaConsumer) kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1, autoCommitEnabled).get(0);
     }
 
+    /**
+     * Builds an ExceptionThrowingCommitConsumer for the HOOK notification type,
+     * backed by a KafkaConsumer obtained from the given KafkaNotification using
+     * its HOOK consumer properties. The poll timeout passed on is 1000 ms.
+     *
+     * NOTE(review): the third argument to getKafkaConsumer is hard-coded to
+     * true instead of using autoCommitEnabled - confirm this is intended.
+     *
+     * @param kafkaNotification source of the consumer properties and the KafkaConsumer
+     * @param autoCommitEnabled forwarded to the ExceptionThrowingCommitConsumer constructor
+     * @return a consumer whose commit() throws until disableCommitExpcetion() is called
+     */
+    ExceptionThrowingCommitConsumer createNewConsumerThatThrowsExceptionInCommit(KafkaNotification kafkaNotification, boolean autoCommitEnabled) {
+        Properties prop = kafkaNotification.getConsumerProperties(NotificationInterface.NotificationType.HOOK);
+        KafkaConsumer consumer = kafkaNotification.getKafkaConsumer(prop, NotificationInterface.NotificationType.HOOK, true);
+        return new ExceptionThrowingCommitConsumer(NotificationInterface.NotificationType.HOOK, consumer, autoCommitEnabled, 1000);
+    }
+
     void consumeOneMessage(NotificationConsumer<HookNotificationMessage> consumer,
                            NotificationHookConsumer.HookConsumer hookConsumer) throws InterruptedException {
         try {
@@ -197,4 +239,29 @@ public class NotificationHookConsumerKafkaTest {
     private void produceMessage(HookNotificationMessage message) throws NotificationException {
         kafkaNotification.send(NotificationInterface.NotificationType.HOOK, message);
     }
+
+    /**
+     * Test double for AtlasKafkaConsumer whose commit() throws a KafkaException
+     * until disableCommitExpcetion() is called; after that, commits are
+     * delegated to the superclass.
+     */
+    private static class ExceptionThrowingCommitConsumer extends AtlasKafkaConsumer {
+
+        // While true, commit() throws instead of committing. Starts as true.
+        private boolean exceptionThrowingEnabled;
+
+        /**
+         * @param notificationType        notification type whose deserializer is handed to the superclass
+         * @param kafkaConsumer           underlying Kafka consumer
+         * @param autoCommitEnabled       forwarded to the superclass constructor
+         * @param pollTimeoutMilliSeconds forwarded to the superclass constructor
+         */
+        public ExceptionThrowingCommitConsumer(NotificationInterface.NotificationType notificationType,
+                                               KafkaConsumer kafkaConsumer, boolean autoCommitEnabled, long pollTimeoutMilliSeconds) {
+            super(notificationType.getDeserializer(), kafkaConsumer, autoCommitEnabled, pollTimeoutMilliSeconds);
+            exceptionThrowingEnabled = true;
+        }
+
+        /**
+         * Throws a KafkaException while exception throwing is enabled;
+         * otherwise performs the superclass commit.
+         */
+        @Override
+        public void commit(TopicPartition partition, long offset) {
+            if(exceptionThrowingEnabled) {
+                throw new KafkaException("test case verifying exception");
+            }
+            else {
+                super.commit(partition, offset);
+            }
+        }
+
+        /** Turns off the simulated failure so subsequent commit() calls reach the superclass. */
+        public void disableCommitExpcetion() {
+            exceptionThrowingEnabled = false;
+        }
+    }
 }