You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/07/15 21:50:24 UTC
incubator-atlas git commit: ATLAS-1908: updated NotificationConsumer
with a method removed earlier to avoid breaking existing usage (like in
Ranger)
Repository: incubator-atlas
Updated Branches:
refs/heads/master eddab3b12 -> bcabde9bb
ATLAS-1908: updated NotificationConsumer with a method removed earlier to avoid breaking existing usage (like in Ranger)
Change-Id: Ib8a7f338da7fd0f710fc683da87871e3d9c32035
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/bcabde9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/bcabde9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/bcabde9b
Branch: refs/heads/master
Commit: bcabde9bbe8361bc7b4461b395dd2ffcb0906962
Parents: eddab3b
Author: nixonrodrigues <ni...@apache.org>
Authored: Sun Jul 16 00:14:29 2017 +0530
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Sat Jul 15 14:50:04 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/atlas/kafka/AtlasKafkaConsumer.java | 9 ++++++++-
.../org/apache/atlas/notification/NotificationConsumer.java | 9 +++++++++
.../notification/AbstractNotificationConsumerTest.java | 5 +++++
3 files changed, 22 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bcabde9b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
index 52d0916..d431176 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
@@ -52,9 +52,15 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
}
public List<AtlasKafkaMessage<T>> receive() {
+ return this.receive(this.pollTimeoutMilliSeconds);
+ }
+
+ @Override
+ public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
+
List<AtlasKafkaMessage<T>> messages = new ArrayList();
- ConsumerRecords<?, ?> records = kafkaConsumer.poll(pollTimeoutMilliSeconds);
+ ConsumerRecords<?, ?> records = kafkaConsumer.poll(timeoutMilliSeconds);
if (records != null) {
for (ConsumerRecord<?, ?> record : records) {
@@ -70,6 +76,7 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
}
return messages;
+
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bcabde9b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
index 6d1c08a..0bd75e1 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
@@ -44,4 +44,13 @@ public interface NotificationConsumer<T> {
* @return List containing kafka message and partionId and offset.
*/
List<AtlasKafkaMessage<T>> receive();
+
+ /**
+ * Fetch data for the topics from Kafka
+ * @param timeoutMilliSeconds poll timeout
+ * @return List containing kafka message and partionId and offset.
+ */
+ List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds);
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bcabde9b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index 68fe3d7..bcee00c 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -204,6 +204,11 @@ public class AbstractNotificationConsumerTest {
@Override
public List<AtlasKafkaMessage<T>> receive() {
+ return receive(1000L);
+ }
+
+ @Override
+ public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
List<AtlasKafkaMessage<T>> tempMessageList = new ArrayList();
for(Object json : messageList) {
tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String)json), -1, -1));