You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2018/05/04 22:54:52 UTC
atlas git commit: ATLAS-2634: Avoid duplicate message processing.
Repository: atlas
Updated Branches:
refs/heads/master 015b8bf38 -> f29a2b7bb
ATLAS-2634: Avoid duplicate message processing.
Signed-off-by: Ashutosh Mestry <am...@hortonworks.com>
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/f29a2b7b
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/f29a2b7b
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/f29a2b7b
Branch: refs/heads/master
Commit: f29a2b7bb2b555e68d7f5e2b43221f85877aa39c
Parents: 015b8bf
Author: Ashutosh Mestry <am...@hortonworks.com>
Authored: Thu May 3 16:22:10 2018 -0700
Committer: Ashutosh Mestry <am...@hortonworks.com>
Committed: Fri May 4 15:54:17 2018 -0700
----------------------------------------------------------------------
.../apache/atlas/kafka/KafkaNotification.java | 6 +-
.../notification/NotificationHookConsumer.java | 42 ++++++++++++-
.../NotificationHookConsumerKafkaTest.java | 66 ++++++++++++++++++++
3 files changed, 109 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/f29a2b7b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 80dc514..00e56e3 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -241,8 +241,10 @@ public class KafkaNotification extends AbstractNotification implements Service {
}
- // Get properties for consumer request
- private Properties getConsumerProperties(NotificationType type) {
+ @VisibleForTesting
+ public
+ // Get properties for consumer request
+ Properties getConsumerProperties(NotificationType type) {
// find the configured group id for the given notification type
String groupId = properties.getProperty(type.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY);
http://git-wip-us.apache.org/repos/asf/atlas/blob/f29a2b7b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 7a4596a..f5e555d 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -55,6 +55,7 @@ import org.apache.atlas.web.filters.AuditFilter;
import org.apache.atlas.web.filters.AuditFilter.AuditLog;
import org.apache.atlas.web.service.ServiceState;
import org.apache.commons.configuration.Configuration;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -297,10 +298,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final List<HookNotification> failedMessages = new ArrayList<>();
private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
+ @VisibleForTesting
+ final FailedCommitOffsetRecorder failedCommitOffsetRecorder;
+
public HookConsumer(NotificationConsumer<HookNotification> consumer) {
super("atlas-hook-consumer-thread", false);
this.consumer = consumer;
+ failedCommitOffsetRecorder = new FailedCommitOffsetRecorder();
}
@Override
@@ -358,6 +363,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
try {
+ if(failedCommitOffsetRecorder.isMessageReplayed(kafkaMsg.getOffset())) {
+ commit(kafkaMsg);
+ return;
+ }
+
// Used for intermediate conversions during create and update
for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
if (LOG.isDebugEnabled()) {
@@ -558,11 +568,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
private void commit(AtlasKafkaMessage<HookNotification> kafkaMessage) {
- recordFailedMessages();
+ boolean commitSucceessStatus = false;
+ try {
+ recordFailedMessages();
- TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition());
+ TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition());
- consumer.commit(partition, kafkaMessage.getOffset() + 1);
+ consumer.commit(partition, kafkaMessage.getOffset() + 1);
+ commitSucceessStatus = true;
+ } finally {
+ failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset());
+ }
}
boolean serverAvailable(Timer timer) {
@@ -612,4 +628,24 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
LOG.info("<== HookConsumer shutdown()");
}
}
+
+ static class FailedCommitOffsetRecorder {
+ private Long currentOffset;
+
+ public void recordIfFailed(boolean commitStatus, long offset) {
+ if(commitStatus) {
+ currentOffset = null;
+ } else {
+ currentOffset = offset;
+ }
+ }
+
+ public boolean isMessageReplayed(long offset) {
+ return currentOffset != null && currentOffset == offset;
+ }
+
+ public Long getCurrentOffset() {
+ return currentOffset;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/f29a2b7b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index d2b3dfd..dbe1a07 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -34,6 +34,9 @@ import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.web.service.ServiceState;
import org.apache.commons.lang.RandomStringUtils;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
@@ -42,6 +45,7 @@ import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import java.util.List;
+import java.util.Properties;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
@@ -115,6 +119,38 @@ public class NotificationHookConsumerKafkaTest {
reset(atlasEntityStore);
}
+ @Test
+ public void consumerConsumesNewMessageButCommitThrowsAnException_MessageOffsetIsRecorded() throws AtlasException, InterruptedException, AtlasBaseException {
+
+ ExceptionThrowingCommitConsumer consumer = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
+ NotificationHookConsumer.FailedCommitOffsetRecorder failedCommitOffsetRecorder = hookConsumer.failedCommitOffsetRecorder;
+
+ produceMessage(new HookNotificationV1.EntityCreateRequest("test_user2", createEntity()));
+
+ try {
+ produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity()));
+ consumeOneMessage(consumer, hookConsumer);
+ consumeOneMessage(consumer, hookConsumer);
+ }
+ catch(KafkaException ex) {
+ assertTrue(true, "ExceptionThrowing consumer throws an excepion.");
+ }
+
+ assertTrue(failedCommitOffsetRecorder.getCurrentOffset() > -1);
+
+ consumer.disableCommitExpcetion();
+
+ produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity()));
+ consumeOneMessage(consumer, hookConsumer);
+ consumeOneMessage(consumer, hookConsumer);
+
+ assertNull(failedCommitOffsetRecorder.getCurrentOffset());
+
+ reset(atlasEntityStore);
+ }
+
@Test(dependsOnMethods = "testConsumerConsumesNewMessageWithAutoCommitDisabled")
public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception {
produceMessage(new HookNotificationV1.EntityCreateRequest("test_user3", createEntity()));
@@ -140,6 +176,12 @@ public class NotificationHookConsumerKafkaTest {
return (AtlasKafkaConsumer) kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1, autoCommitEnabled).get(0);
}
+ ExceptionThrowingCommitConsumer createNewConsumerThatThrowsExceptionInCommit(KafkaNotification kafkaNotification, boolean autoCommitEnabled) {
+ Properties prop = kafkaNotification.getConsumerProperties(NotificationInterface.NotificationType.HOOK);
+ KafkaConsumer consumer = kafkaNotification.getKafkaConsumer(prop, NotificationInterface.NotificationType.HOOK, true);
+ return new ExceptionThrowingCommitConsumer(NotificationInterface.NotificationType.HOOK, consumer, autoCommitEnabled, 1000);
+ }
+
void consumeOneMessage(NotificationConsumer<HookNotification> consumer,
NotificationHookConsumer.HookConsumer hookConsumer) throws InterruptedException {
try {
@@ -205,4 +247,28 @@ public class NotificationHookConsumerKafkaTest {
}
}
+ private static class ExceptionThrowingCommitConsumer extends AtlasKafkaConsumer {
+
+ private boolean exceptionThrowingEnabled;
+
+ public ExceptionThrowingCommitConsumer(NotificationInterface.NotificationType notificationType,
+ KafkaConsumer kafkaConsumer, boolean autoCommitEnabled, long pollTimeoutMilliSeconds) {
+ super(notificationType, kafkaConsumer, autoCommitEnabled, pollTimeoutMilliSeconds);
+ exceptionThrowingEnabled = true;
+ }
+
+ @Override
+ public void commit(TopicPartition partition, long offset) {
+ if(exceptionThrowingEnabled) {
+ throw new KafkaException("test case verifying exception");
+ }
+ else {
+ super.commit(partition, offset);
+ }
+ }
+
+ public void disableCommitExpcetion() {
+ exceptionThrowingEnabled = false;
+ }
+ }
}