You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/07/15 21:50:24 UTC

incubator-atlas git commit: ATLAS-1908: updated NotificationConsumer with a method removed earlier to avoid breaking existing usage (like in Ranger)

Repository: incubator-atlas
Updated Branches:
  refs/heads/master eddab3b12 -> bcabde9bb


ATLAS-1908: updated NotificationConsumer with a method removed earlier to avoid breaking existing usage (like in Ranger)

Change-Id: Ib8a7f338da7fd0f710fc683da87871e3d9c32035

Signed-off-by: Madhan Neethiraj <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/bcabde9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/bcabde9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/bcabde9b

Branch: refs/heads/master
Commit: bcabde9bbe8361bc7b4461b395dd2ffcb0906962
Parents: eddab3b
Author: nixonrodrigues <ni...@apache.org>
Authored: Sun Jul 16 00:14:29 2017 +0530
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Sat Jul 15 14:50:04 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/atlas/kafka/AtlasKafkaConsumer.java     | 9 ++++++++-
 .../org/apache/atlas/notification/NotificationConsumer.java | 9 +++++++++
 .../notification/AbstractNotificationConsumerTest.java      | 5 +++++
 3 files changed, 22 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bcabde9b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
index 52d0916..d431176 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
@@ -52,9 +52,15 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
     }
 
     public List<AtlasKafkaMessage<T>> receive() {
+        return this.receive(this.pollTimeoutMilliSeconds);
+    }
+
+    @Override
+    public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
+
         List<AtlasKafkaMessage<T>> messages = new ArrayList();
 
-        ConsumerRecords<?, ?> records = kafkaConsumer.poll(pollTimeoutMilliSeconds);
+        ConsumerRecords<?, ?> records = kafkaConsumer.poll(timeoutMilliSeconds);
 
         if (records != null) {
             for (ConsumerRecord<?, ?> record : records) {
@@ -70,6 +76,7 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
         }
 
         return messages;
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bcabde9b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
index 6d1c08a..0bd75e1 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
@@ -44,4 +44,13 @@ public interface NotificationConsumer<T> {
      * @return List containing kafka message and partionId and offset.
      */
     List<AtlasKafkaMessage<T>> receive();
+
+    /**
+     * Fetch data for the topics from Kafka
+     * @param timeoutMilliSeconds poll timeout
+     * @return List containing kafka message and partitionId and offset.
+     */
+    List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds);
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bcabde9b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index 68fe3d7..bcee00c 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -204,6 +204,11 @@ public class AbstractNotificationConsumerTest {
 
         @Override
         public List<AtlasKafkaMessage<T>> receive() {
+            return receive(1000L);
+        }
+
+        @Override
+        public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
             List<AtlasKafkaMessage<T>> tempMessageList = new ArrayList();
             for(Object json :  messageList) {
                 tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String)json), -1, -1));