You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2018/09/18 18:06:29 UTC
atlas git commit: ATLAS-2634: Avoid duplicate message processing.
Repository: atlas
Updated Branches:
refs/heads/branch-0.8 37f59dc95 -> 2f7348988
ATLAS-2634: Avoid duplicate message processing.
Signed-off-by: Ashutosh Mestry <am...@hortonworks.com>
(cherry picked from commit f29a2b7bb2b555e68d7f5e2b43221f85877aa39c)
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/2f734898
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/2f734898
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/2f734898
Branch: refs/heads/branch-0.8
Commit: 2f7348988b992e8a9e5a71cf1a483803fa7d6db8
Parents: 37f59dc
Author: Ashutosh Mestry <am...@hortonworks.com>
Authored: Thu May 3 16:22:10 2018 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Tue Sep 18 10:58:21 2018 -0700
----------------------------------------------------------------------
.../apache/atlas/kafka/KafkaNotification.java | 3 +-
.../notification/NotificationHookConsumer.java | 42 +++++++++++-
.../NotificationHookConsumerKafkaTest.java | 67 ++++++++++++++++++++
3 files changed, 108 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/2f734898/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 4c63027..4c753d2 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -245,8 +245,9 @@ public class KafkaNotification extends AbstractNotification implements Service {
}
+ @VisibleForTesting
// Get properties for consumer request
- private Properties getConsumerProperties(NotificationType type) {
+ public Properties getConsumerProperties(NotificationType type) {
// find the configured group id for the given notification type
String groupId = properties.getProperty(type.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY);
http://git-wip-us.apache.org/repos/asf/atlas/blob/2f734898/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 88a8cce..1a567af 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -286,10 +286,13 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final List<String> failedMessages = new ArrayList<>();
private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
+ @VisibleForTesting
+ final FailedCommitOffsetRecorder failedCommitOffsetRecorder;
public HookConsumer(NotificationConsumer<HookNotificationMessage> consumer) {
super("atlas-hook-consumer-thread", false);
this.consumer = consumer;
+ failedCommitOffsetRecorder = new FailedCommitOffsetRecorder();
}
@Override
@@ -342,6 +345,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
try {
+ if(failedCommitOffsetRecorder.isMessageReplayed(kafkaMsg.getOffset())) {
+ commit(kafkaMsg);
+ return;
+ }
+
// Used for intermediate conversions during create and update
for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
if (LOG.isDebugEnabled()) {
@@ -538,9 +546,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
private void commit(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage) {
- recordFailedMessages();
- TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition());
- consumer.commit(partition, kafkaMessage.getOffset() + 1);
+ boolean commitSucceessStatus = false;
+ try {
+ recordFailedMessages();
+
+ TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition());
+
+ consumer.commit(partition, kafkaMessage.getOffset() + 1);
+ commitSucceessStatus = true;
+ } finally {
+ failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset());
+ }
}
boolean serverAvailable(Timer timer) {
@@ -595,4 +611,24 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST,
DateTimeHelper.formatDateUTC(new Date()));
}
+
+ static class FailedCommitOffsetRecorder {
+ private Long currentOffset;
+
+ public void recordIfFailed(boolean commitStatus, long offset) {
+ if(commitStatus) {
+ currentOffset = null;
+ } else {
+ currentOffset = offset;
+ }
+ }
+
+ public boolean isMessageReplayed(long offset) {
+ return currentOffset != null && currentOffset == offset;
+ }
+
+ public Long getCurrentOffset() {
+ return currentOffset;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/2f734898/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index e7a400e..14ecc2d 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -35,6 +35,9 @@ import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.web.service.ServiceState;
import org.apache.commons.lang.RandomStringUtils;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
@@ -43,6 +46,7 @@ import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
import java.util.List;
+import java.util.Properties;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
@@ -112,6 +116,38 @@ public class NotificationHookConsumerKafkaTest {
reset(atlasEntityStore);
}
+ @Test
+ public void consumerConsumesNewMessageButCommitThrowsAnException_MessageOffsetIsRecorded() throws AtlasException, InterruptedException, AtlasBaseException {
+
+ ExceptionThrowingCommitConsumer consumer = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
+ NotificationHookConsumer.FailedCommitOffsetRecorder failedCommitOffsetRecorder = hookConsumer.failedCommitOffsetRecorder;
+
+ produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity()));
+
+ try {
+ produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
+ consumeOneMessage(consumer, hookConsumer);
+ consumeOneMessage(consumer, hookConsumer);
+ }
+ catch(KafkaException ex) {
+ assertTrue(true, "ExceptionThrowing consumer throws an excepion.");
+ }
+
+ assertTrue(failedCommitOffsetRecorder.getCurrentOffset() > -1);
+
+ consumer.disableCommitExpcetion();
+
+ produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
+ consumeOneMessage(consumer, hookConsumer);
+ consumeOneMessage(consumer, hookConsumer);
+
+ assertNull(failedCommitOffsetRecorder.getCurrentOffset());
+
+ reset(atlasEntityStore);
+ }
+
@Test(dependsOnMethods = "testConsumerConsumesNewMessageWithAutoCommitDisabled")
public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception {
produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity()));
@@ -138,6 +174,12 @@ public class NotificationHookConsumerKafkaTest {
return (AtlasKafkaConsumer) kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1, autoCommitEnabled).get(0);
}
+ ExceptionThrowingCommitConsumer createNewConsumerThatThrowsExceptionInCommit(KafkaNotification kafkaNotification, boolean autoCommitEnabled) {
+ Properties prop = kafkaNotification.getConsumerProperties(NotificationInterface.NotificationType.HOOK);
+ KafkaConsumer consumer = kafkaNotification.getKafkaConsumer(prop, NotificationInterface.NotificationType.HOOK, true);
+ return new ExceptionThrowingCommitConsumer(NotificationInterface.NotificationType.HOOK, consumer, autoCommitEnabled, 1000);
+ }
+
void consumeOneMessage(NotificationConsumer<HookNotificationMessage> consumer,
NotificationHookConsumer.HookConsumer hookConsumer) throws InterruptedException {
try {
@@ -197,4 +239,29 @@ public class NotificationHookConsumerKafkaTest {
private void produceMessage(HookNotificationMessage message) throws NotificationException {
kafkaNotification.send(NotificationInterface.NotificationType.HOOK, message);
}
+
+ private static class ExceptionThrowingCommitConsumer extends AtlasKafkaConsumer {
+
+ private boolean exceptionThrowingEnabled;
+
+ public ExceptionThrowingCommitConsumer(NotificationInterface.NotificationType notificationType,
+ KafkaConsumer kafkaConsumer, boolean autoCommitEnabled, long pollTimeoutMilliSeconds) {
+ super(notificationType.getDeserializer(), kafkaConsumer, autoCommitEnabled, pollTimeoutMilliSeconds);
+ exceptionThrowingEnabled = true;
+ }
+
+ @Override
+ public void commit(TopicPartition partition, long offset) {
+ if(exceptionThrowingEnabled) {
+ throw new KafkaException("test case verifying exception");
+ }
+ else {
+ super.commit(partition, offset);
+ }
+ }
+
+ public void disableCommitExpcetion() {
+ exceptionThrowingEnabled = false;
+ }
+ }
}