You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/08/02 20:20:17 UTC

atlas git commit: ATLAS-1944: updated handling of shutdown in KafkaConsumer

Repository: atlas
Updated Branches:
  refs/heads/master b59460ffa -> 0267eecd8


ATLAS-1944: updated handling of shutdown in KafkaConsumer

Change-Id: I07cbe1955cd08005660f5189f30f0690809ce1b1

Signed-off-by: Madhan Neethiraj <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/0267eecd
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/0267eecd
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/0267eecd

Branch: refs/heads/master
Commit: 0267eecd831c4927cac7aa460a548f8628fb31f8
Parents: b59460f
Author: nixonrodrigues <ni...@apache.org>
Authored: Tue Aug 1 11:51:21 2017 +0530
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Wed Aug 2 13:20:11 2017 -0700

----------------------------------------------------------------------
 .../apache/atlas/kafka/AtlasKafkaConsumer.java  |  7 +++
 .../notification/NotificationConsumer.java      |  3 ++
 .../AbstractNotificationConsumerTest.java       |  5 +++
 .../notification/NotificationHookConsumer.java  | 45 +++++++++++++++-----
 4 files changed, 50 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/0267eecd/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
index d431176..d3b4e49 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
@@ -96,4 +96,11 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
             kafkaConsumer.close();
         }
     }
+
+    @Override
+    public void wakeup() {
+        if (kafkaConsumer != null) {
+            kafkaConsumer.wakeup();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/0267eecd/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
index 0bd75e1..f3e81ec 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
@@ -39,6 +39,8 @@ public interface NotificationConsumer<T> {
 
     void close();
 
+    void wakeup();
+
     /**
      * Fetch data for the topics from Kafka
      * @return List containing kafka message and partionId and offset.
@@ -53,4 +55,5 @@ public interface NotificationConsumer<T> {
     List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds);
 
 
+
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/0267eecd/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index bcee00c..3b2a093 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -203,6 +203,11 @@ public class AbstractNotificationConsumerTest {
         }
 
         @Override
+        public void wakeup() {
+
+        }
+
+        @Override
         public List<AtlasKafkaMessage<T>> receive() {
             return receive(1000L);
         }

http://git-wip-us.apache.org/repos/asf/atlas/blob/0267eecd/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index b8255b3..a74b841 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -168,12 +168,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     }
 
     private void stopConsumerThreads() {
+        LOG.info("==> stopConsumerThreads()");
+
         if (consumers != null) {
             for (HookConsumer consumer : consumers) {
-                consumer.stop();
+                consumer.shutdown();
             }
             consumers.clear();
         }
+
+        LOG.info("<== stopConsumerThreads()");
     }
 
     /**
@@ -218,21 +222,35 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
         @Override
         public void doWork() {
+            LOG.info("==> HookConsumer doWork()");
+
             shouldRun.set(true);
 
             if (!serverAvailable(new NotificationHookConsumer.Timer())) {
                 return;
             }
 
-            while (shouldRun.get()) {
-                try {
-                    List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive();
-                    for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
-                        handleMessage(msg);
+            try {
+                while (shouldRun.get()) {
+                    try {
+                        List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive();
+                        for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
+                            handleMessage(msg);
+                        }
+                    } catch (Exception e) {
+                        if (shouldRun.get()) {
+                            LOG.warn("Exception in NotificationHookConsumer", e);
+                        }
                     }
-                } catch (Throwable t) {
-                    LOG.warn("Failure in NotificationHookConsumer", t);
                 }
+            } finally {
+                if (consumer != null) {
+                    LOG.info("closing NotificationConsumer");
+
+                    consumer.close();
+                }
+
+                LOG.info("<== HookConsumer doWork()");
             }
         }
 
@@ -369,7 +387,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         private void commit(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage) {
             recordFailedMessages();
             TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition());
-            consumer.commit(partition, kafkaMessage.getOffset());
+            consumer.commit(partition, kafkaMessage.getOffset() + 1);
         }
 
         boolean serverAvailable(Timer timer) {
@@ -397,11 +415,18 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
         @Override
         public void shutdown() {
+            LOG.info("==> HookConsumer shutdown()");
+
             super.initiateShutdown();
             shouldRun.set(false);
-            consumer.close();
+            if (consumer != null) {
+                consumer.wakeup();
+            }
             super.awaitShutdown();
+
+            LOG.info("<== HookConsumer shutdown()");
         }
+
     }
 
     private void audit(String messageUser, String method, String path) {