You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/08/02 20:20:17 UTC
atlas git commit: ATLAS-1944: updated handling of shutdown in KafkaConsumer
Repository: atlas
Updated Branches:
refs/heads/master b59460ffa -> 0267eecd8
ATLAS-1944: updated handling of shutdown in KafkaConsumer
Change-Id: I07cbe1955cd08005660f5189f30f0690809ce1b1
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/0267eecd
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/0267eecd
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/0267eecd
Branch: refs/heads/master
Commit: 0267eecd831c4927cac7aa460a548f8628fb31f8
Parents: b59460f
Author: nixonrodrigues <ni...@apache.org>
Authored: Tue Aug 1 11:51:21 2017 +0530
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Wed Aug 2 13:20:11 2017 -0700
----------------------------------------------------------------------
.../apache/atlas/kafka/AtlasKafkaConsumer.java | 7 +++
.../notification/NotificationConsumer.java | 3 ++
.../AbstractNotificationConsumerTest.java | 5 +++
.../notification/NotificationHookConsumer.java | 45 +++++++++++++++-----
4 files changed, 50 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/0267eecd/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
index d431176..d3b4e49 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
@@ -96,4 +96,11 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
kafkaConsumer.close();
}
}
+
+ @Override
+ public void wakeup() {
+ if (kafkaConsumer != null) {
+ kafkaConsumer.wakeup();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/0267eecd/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
index 0bd75e1..f3e81ec 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
@@ -39,6 +39,8 @@ public interface NotificationConsumer<T> {
void close();
+ void wakeup();
+
/**
* Fetch data for the topics from Kafka
* @return List containing kafka message and partionId and offset.
@@ -53,4 +55,5 @@ public interface NotificationConsumer<T> {
List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds);
+
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/0267eecd/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index bcee00c..3b2a093 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -203,6 +203,11 @@ public class AbstractNotificationConsumerTest {
}
@Override
+ public void wakeup() {
+
+ }
+
+ @Override
public List<AtlasKafkaMessage<T>> receive() {
return receive(1000L);
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/0267eecd/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index b8255b3..a74b841 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -168,12 +168,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
private void stopConsumerThreads() {
+ LOG.info("==> stopConsumerThreads()");
+
if (consumers != null) {
for (HookConsumer consumer : consumers) {
- consumer.stop();
+ consumer.shutdown();
}
consumers.clear();
}
+
+ LOG.info("<== stopConsumerThreads()");
}
/**
@@ -218,21 +222,35 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
@Override
public void doWork() {
+ LOG.info("==> HookConsumer doWork()");
+
shouldRun.set(true);
if (!serverAvailable(new NotificationHookConsumer.Timer())) {
return;
}
- while (shouldRun.get()) {
- try {
- List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive();
- for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
- handleMessage(msg);
+ try {
+ while (shouldRun.get()) {
+ try {
+ List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive();
+ for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
+ handleMessage(msg);
+ }
+ } catch (Exception e) {
+ if (shouldRun.get()) {
+ LOG.warn("Exception in NotificationHookConsumer", e);
+ }
}
- } catch (Throwable t) {
- LOG.warn("Failure in NotificationHookConsumer", t);
}
+ } finally {
+ if (consumer != null) {
+ LOG.info("closing NotificationConsumer");
+
+ consumer.close();
+ }
+
+ LOG.info("<== HookConsumer doWork()");
}
}
@@ -369,7 +387,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private void commit(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage) {
recordFailedMessages();
TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition());
- consumer.commit(partition, kafkaMessage.getOffset());
+ consumer.commit(partition, kafkaMessage.getOffset() + 1);
}
boolean serverAvailable(Timer timer) {
@@ -397,11 +415,18 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
@Override
public void shutdown() {
+ LOG.info("==> HookConsumer shutdown()");
+
super.initiateShutdown();
shouldRun.set(false);
- consumer.close();
+ if (consumer != null) {
+ consumer.wakeup();
+ }
super.awaitShutdown();
+
+ LOG.info("<== HookConsumer shutdown()");
}
+
}
private void audit(String messageUser, String method, String path) {