You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/06/26 15:43:19 UTC

incubator-atlas git commit: ATLAS-1766: updated NotificationConsumer implementation to use new Kafka Consumer API, to enable support for SASL_SSL protocol

Repository: incubator-atlas
Updated Branches:
  refs/heads/master 5bb2dcbeb -> 0e7f8ea46


ATLAS-1766: updated NotificationConsumer implementation to use new Kafka Consumer API, to enable support for SASL_SSL protocol

Signed-off-by: Madhan Neethiraj <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/0e7f8ea4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/0e7f8ea4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/0e7f8ea4

Branch: refs/heads/master
Commit: 0e7f8ea4603c858cc295259bbd1a22314b732f62
Parents: 5bb2dcb
Author: nixonrodrigues <ni...@apache.org>
Authored: Mon Jun 5 12:49:27 2017 +0530
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Mon Jun 26 08:07:12 2017 -0700

----------------------------------------------------------------------
 distro/src/conf/atlas-application.properties    |   8 +-
 .../apache/atlas/kafka/AtlasKafkaConsumer.java  |  91 ++++++++++++
 .../apache/atlas/kafka/AtlasKafkaMessage.java   |  44 ++++++
 .../org/apache/atlas/kafka/KafkaConsumer.java   | 104 --------------
 .../apache/atlas/kafka/KafkaNotification.java   |  95 ++++---------
 .../AbstractMessageDeserializer.java            |   2 +-
 .../AbstractNotificationConsumer.java           |  34 +----
 .../notification/NotificationConsumer.java      |  33 ++---
 .../apache/atlas/kafka/KafkaConsumerTest.java   | 137 ++++++++-----------
 .../atlas/kafka/KafkaNotificationMockTest.java  |  52 +++----
 .../atlas/kafka/KafkaNotificationTest.java      |  56 +++-----
 .../AbstractNotificationConsumerTest.java       | 121 +++++-----------
 .../test/resources/atlas-application.properties |   8 +-
 .../notification/NotificationHookConsumer.java  |  37 +++--
 .../notification/EntityNotificationIT.java      |  14 +-
 .../NotificationHookConsumerKafkaTest.java      |  77 +++++++----
 .../NotificationHookConsumerTest.java           |  10 +-
 .../atlas/web/integration/BaseResourceIT.java   |  24 ++--
 .../web/integration/EntityJerseyResourceIT.java |  22 ---
 .../integration/EntityV2JerseyResourceIT.java   |  16 +--
 20 files changed, 415 insertions(+), 570 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0e7f8ea4/distro/src/conf/atlas-application.properties
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties
index 5e59528..474f253 100755
--- a/distro/src/conf/atlas-application.properties
+++ b/distro/src/conf/atlas-application.properties
@@ -74,9 +74,13 @@ atlas.kafka.zookeeper.session.timeout.ms=400
 atlas.kafka.zookeeper.connection.timeout.ms=200
 atlas.kafka.zookeeper.sync.time.ms=20
 atlas.kafka.auto.commit.interval.ms=1000
-atlas.kafka.auto.offset.reset=smallest
 atlas.kafka.hook.group.id=atlas
-atlas.kafka.auto.commit.enable=false
+
+atlas.kafka.enable.auto.commit=false
+atlas.kafka.auto.offset.reset=earliest
+atlas.kafka.session.timeout.ms=30000
+
+
 atlas.notification.create.topics=true
 atlas.notification.replicas=1
 atlas.notification.topics=ATLAS_HOOK,ATLAS_ENTITIES

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0e7f8ea4/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
new file mode 100644
index 0000000..9c15243
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.kafka;
+
+import org.apache.atlas.notification.AbstractNotificationConsumer;
+import org.apache.atlas.notification.MessageDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+
+/**
+ * Kafka specific notification consumer.
+ *
+ * @param <T> the notification type returned by this consumer
+ */
+public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasKafkaConsumer.class);
+
+    private final KafkaConsumer kafkaConsumer;
+    private final boolean       autoCommitEnabled;
+
+    public AtlasKafkaConsumer(MessageDeserializer<T> deserializer, KafkaConsumer kafkaConsumer, boolean autoCommitEnabled) {
+        super(deserializer);
+
+        this.kafkaConsumer     = kafkaConsumer;
+        this.autoCommitEnabled = autoCommitEnabled;
+    }
+
+    public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
+        List<AtlasKafkaMessage<T>> messages = new ArrayList();
+
+        ConsumerRecords<?, ?> records = kafkaConsumer.poll(timeoutMilliSeconds);
+
+        if (records != null) {
+            for (ConsumerRecord<?, ?> record : records) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Received Message topic ={}, partition ={}, offset = {}, key = {}, value = {}",
+                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
+                }
+
+                T message = deserializer.deserialize(record.value().toString());
+
+                messages.add(new AtlasKafkaMessage(message, record.offset(), record.partition()));
+            }
+        }
+
+        return messages;
+    }
+
+
+    @Override
+    public void commit(TopicPartition partition, long offset) {
+        if (!autoCommitEnabled) {
+            if (LOG.isDebugEnabled()) {
+                LOG.info(" commiting the offset ==>> " + offset);
+            }
+            kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset)));
+        }
+    }
+
+    @Override
+    public void close() {
+        if (kafkaConsumer != null) {
+            kafkaConsumer.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0e7f8ea4/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
new file mode 100644
index 0000000..cdbf57f
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
@@ -0,0 +1,44 @@
+package org.apache.atlas.kafka;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class AtlasKafkaMessage<T> {
+    private final T    message;
+    private final long offset;
+    private final int  partition;
+
+    public AtlasKafkaMessage(T message, long offset, int partition) {
+        this.message   = message;
+        this.offset    = offset;
+        this.partition = partition;
+    }
+
+    public T getMessage() {
+        return message;
+    }
+
+    public long getOffset() {
+        return offset;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0e7f8ea4/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
deleted file mode 100644
index 16c0eb2..0000000
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.kafka;
-
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import org.apache.atlas.notification.AbstractNotificationConsumer;
-import org.apache.atlas.notification.MessageDeserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Kafka specific notification consumer.
- *
- * @param <T>  the notification type returned by this consumer
- */
-public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
-
-    private final int consumerId;
-    private final ConsumerIterator iterator;
-    private final ConsumerConnector consumerConnector;
-    private final boolean autoCommitEnabled;
-    private long lastSeenOffset;
-
-
-    // ----- Constructors ----------------------------------------------------
-
-    /**
-     * Create a Kafka consumer.
-     * @param deserializer  the message deserializer used for this consumer
-     * @param stream        the underlying Kafka stream
-     * @param consumerId    an id value for this consumer
-     * @param consumerConnector the {@link ConsumerConnector} which created the underlying Kafka stream
-     * @param autoCommitEnabled true if consumer does not need to commit offsets explicitly, false otherwise.
-     */
-    public KafkaConsumer(MessageDeserializer<T> deserializer, KafkaStream<String, String> stream, int consumerId,
-                         ConsumerConnector consumerConnector, boolean autoCommitEnabled) {
-        super(deserializer);
-        this.consumerConnector = consumerConnector;
-        this.lastSeenOffset = 0;
-        this.iterator   = stream.iterator();
-        this.consumerId = consumerId;
-        this.autoCommitEnabled = autoCommitEnabled;
-    }
-
-
-    // ----- NotificationConsumer --------------------------------------------
-
-    @Override
-    public boolean hasNext() {
-        return iterator.hasNext();
-    }
-
-
-    // ----- AbstractNotificationConsumer ------------------------------------
-
-    @Override
-    public String getNext() {
-        MessageAndMetadata message = iterator.next();
-        LOG.debug("Read message: conumerId: {}, topic - {}, partition - {}, offset - {}, message - {}",
-                consumerId, message.topic(), message.partition(), message.offset(), message.message());
-        lastSeenOffset = message.offset();
-        return (String) message.message();
-    }
-
-    @Override
-    protected String peekMessage() {
-        MessageAndMetadata message = (MessageAndMetadata) iterator.peek();
-        return (String) message.message();
-    }
-
-    @Override
-    public void commit() {
-        if (autoCommitEnabled) {
-            LOG.debug("Auto commit is disabled, not committing.");
-        } else {
-            consumerConnector.commitOffsets();
-            LOG.debug("Committed offset: {}", lastSeenOffset);
-        }
-    }
-
-    @Override
-    public void close() {
-        consumerConnector.shutdown();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0e7f8ea4/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 8bd31fd..366c8a7 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -18,25 +18,22 @@
 package org.apache.atlas.kafka;
 
 import com.google.common.annotations.VisibleForTesting;
-import kafka.consumer.Consumer;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.serializer.StringDecoder;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import kafka.utils.Time;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.notification.AbstractNotification;
-import org.apache.atlas.notification.MessageDeserializer;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.service.Service;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -56,10 +53,11 @@ import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URISyntaxException;
 import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
 import java.util.Properties;
 import java.util.concurrent.Future;
 
@@ -83,9 +81,8 @@ public class KafkaNotification extends AbstractNotification implements Service {
     private KafkaServer kafkaServer;
     private ServerCnxnFactory factory;
     private Properties properties;
-
+    private KafkaConsumer consumer = null;
     private KafkaProducer producer = null;
-    private List<ConsumerConnector> consumerConnectors = new ArrayList<>();
 
     private static final Map<NotificationType, String> TOPIC_MAP = new HashMap<NotificationType, String>() {
         {
@@ -126,8 +123,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
                 "org.apache.kafka.common.serialization.StringDeserializer");
         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                 "org.apache.kafka.common.serialization.StringDeserializer");
-        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "roundrobin");
-        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
+        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
     }
 
     @VisibleForTesting
@@ -171,34 +167,18 @@ public class KafkaNotification extends AbstractNotification implements Service {
     public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType,
                                                              int numConsumers) {
         return createConsumers(notificationType, numConsumers,
-                Boolean.valueOf(properties.getProperty("auto.commit.enable", "true")));
+                Boolean.valueOf(properties.getProperty("enable.auto.commit", "true")));
     }
 
     @VisibleForTesting
     public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType,
                                                       int numConsumers, boolean autoCommitEnabled) {
-        String topic = TOPIC_MAP.get(notificationType);
 
         Properties consumerProperties = getConsumerProperties(notificationType);
 
-        List<NotificationConsumer<T>> consumers = new ArrayList<>(numConsumers);
-        for (int i = 0; i < numConsumers; i++) {
-            ConsumerConnector consumerConnector = createConsumerConnector(consumerProperties);
-            Map<String, Integer> topicCountMap = new HashMap<>();
-            topicCountMap.put(topic, 1);
-            StringDecoder decoder = new StringDecoder(null);
-            Map<String, List<KafkaStream<String, String>>> streamsMap =
-                    consumerConnector.createMessageStreams(topicCountMap, decoder, decoder);
-            List<KafkaStream<String, String>> kafkaConsumers = streamsMap.get(topic);
-            for (KafkaStream stream : kafkaConsumers) {
-                KafkaConsumer<T> kafkaConsumer =
-                        createKafkaConsumer(notificationType.getClassType(), notificationType.getDeserializer(),
-                                stream, i, consumerConnector, autoCommitEnabled);
-                consumers.add(kafkaConsumer);
-            }
-            consumerConnectors.add(consumerConnector);
-        }
-
+        List<NotificationConsumer<T>> consumers = new ArrayList<>();
+        AtlasKafkaConsumer kafkaConsumer = new AtlasKafkaConsumer(notificationType.getDeserializer(), getKafkaConsumer(consumerProperties,notificationType, autoCommitEnabled), autoCommitEnabled);
+        consumers.add(kafkaConsumer);
         return consumers;
     }
 
@@ -208,11 +188,6 @@ public class KafkaNotification extends AbstractNotification implements Service {
             producer.close();
             producer = null;
         }
-
-        for (ConsumerConnector consumerConnector : consumerConnectors) {
-            consumerConnector.shutdown();
-        }
-        consumerConnectors.clear();
     }
 
 
@@ -254,43 +229,31 @@ public class KafkaNotification extends AbstractNotification implements Service {
         }
     }
 
-    // ----- helper methods --------------------------------------------------
 
-    /**
-     * Create a Kafka consumer connector from the given properties.
-     *
-     * @param consumerProperties  the properties for creating the consumer connector
-     *
-     * @return a new Kafka consumer connector
-     */
-    protected ConsumerConnector createConsumerConnector(Properties consumerProperties) {
-        return Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(consumerProperties));
-    }
+    public KafkaConsumer  getKafkaConsumer(Properties consumerProperties, NotificationType type, boolean autoCommitEnabled) {
+        if(this.consumer == null) {
+            try {
+                String topic = TOPIC_MAP.get(type);
+                consumerProperties.put("enable.auto.commit", autoCommitEnabled);
+                this.consumer = new KafkaConsumer(consumerProperties);
+                this.consumer.subscribe(Arrays.asList(topic));
+            }catch (Exception ee) {
+                LOG.error("Exception in getKafkaConsumer ", ee);
+            }
+        }
 
-    /**
-     * Create a Kafka consumer from the given Kafka stream.
-     *
-     * @param type          the notification type to be returned by the consumer
-     * @param deserializer  the deserializer for the created consumers
-     * @param stream        the Kafka stream
-     * @param consumerId    the id for the new consumer
-     *
-     * @param consumerConnector
-     * @return a new Kafka consumer
-     */
-    protected <T> org.apache.atlas.kafka.KafkaConsumer<T>
-    createKafkaConsumer(Class<T> type, MessageDeserializer<T> deserializer, KafkaStream stream,
-                        int consumerId, ConsumerConnector consumerConnector, boolean autoCommitEnabled) {
-        return new org.apache.atlas.kafka.KafkaConsumer<>(deserializer, stream,
-                consumerId, consumerConnector, autoCommitEnabled);
+        return this.consumer;
     }
 
+
+
+
     // Get properties for consumer request
     private Properties getConsumerProperties(NotificationType type) {
         // find the configured group id for the given notification type
-        String groupId = properties.getProperty(type.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY);
 
-        if (groupId == null) {
+        String groupId = properties.getProperty(type.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY);
+        if (StringUtils.isEmpty(groupId)) {
             throw new IllegalStateException("No configuration group id set for the notification type " + type);
         }
 
@@ -298,7 +261,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
         consumerProperties.putAll(properties);
         consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
 
-        LOG.info("Consumer property: auto.commit.enable: {}", consumerProperties.getProperty("auto.commit.enable"));
+        LOG.info("Consumer property: atlas.kafka.enable.auto.commit: {}", consumerProperties.getProperty("enable.auto.commit"));
         return consumerProperties;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0e7f8ea4/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
index 9585827..ec99372 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
@@ -128,7 +128,7 @@ public abstract class AbstractMessageDeserializer<T> extends VersionedMessageDes
     /**
      * Deserializer for JSONArray.
      */
-    protected static final class JSONArrayDeserializer implements JsonDeserializer<JSONArray> {
+    public static final class JSONArrayDeserializer implements JsonDeserializer<JSONArray> {
         @Override
         public JSONArray deserialize(final JsonElement json, final Type type,
                                      final JsonDeserializationContext context) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0e7f8ea4/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
index d4d78de..8cf1e8e 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 package org.apache.atlas.notification;
+import org.apache.kafka.common.TopicPartition;
 
 /**
  * Abstract notification consumer.
@@ -25,10 +26,9 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon
     /**
      * Deserializer used to deserialize notification messages for this consumer.
      */
-    private final MessageDeserializer<T> deserializer;
+    protected final MessageDeserializer<T> deserializer;
 
 
-    // ----- Constructors ----------------------------------------------------
 
     /**
      * Construct an AbstractNotificationConsumer.
@@ -40,34 +40,6 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon
     }
 
 
-    // ----- AbstractNotificationConsumer -------------------------------------
 
-    /**
-     * Get the next notification as a string.
-     *
-     * @return the next notification in string form
-     */
-    protected abstract String getNext();
-
-    /**
-     * Get the next notification as a string without advancing.
-     *
-     * @return the next notification in string form
-     */
-    protected abstract String peekMessage();
-
-
-    // ----- NotificationConsumer ---------------------------------------------
-
-    @Override
-    public T next() {
-        return deserializer.deserialize(getNext());
-    }
-
-    @Override
-    public T peek() {
-        return deserializer.deserialize(peekMessage());
-    }
-
-    public abstract void commit();
+    public abstract void commit(TopicPartition partition, long offset);
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0e7f8ea4/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
index a99cb10..22e40f9 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
@@ -17,32 +17,16 @@
  */
 package org.apache.atlas.notification;
 
+import java.util.List;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.atlas.kafka.AtlasKafkaMessage;
+
 /**
  * Atlas notification consumer.  This consumer blocks until a notification can be read.
  *
  * @param <T>  the class type of notifications returned by this consumer
  */
 public interface NotificationConsumer<T> {
-    /**
-     * Returns true when the consumer has more notifications.  Blocks until a notification becomes available.
-     *
-     * @return true when the consumer has notifications to be read
-     */
-    boolean hasNext();
-
-    /**
-     * Returns the next notification.
-     *
-     * @return the next notification
-     */
-    T next();
-
-    /**
-     * Returns the next notification without advancing.
-     *
-     * @return the next notification
-     */
-    T peek();
 
     /**
      * Commit the offset of messages that have been successfully processed.
@@ -51,7 +35,14 @@ public interface NotificationConsumer<T> {
      * the consumer is ready to handle the next message, which could happen even after a normal or an abnormal
      * restart.
      */
-    void commit();
+    void commit(TopicPartition partition, long offset);
 
     void close();
+
+    /**
+     * Fetch data for the topics from Kafka
+     * @param timeoutMilliSeconds poll timeout
+     * @return List containing kafka message and partionId and offset.
+     */
+    List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds);
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0e7f8ea4/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
index ad7d93e..70059cb 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
@@ -18,13 +18,9 @@
 
 package org.apache.atlas.kafka;
 
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.MessageAndMetadata;
 import org.apache.atlas.notification.AbstractNotification;
 import org.apache.atlas.notification.MessageVersion;
-import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.notification.IncompatibleVersionException;
 import org.apache.atlas.notification.VersionedMessage;
@@ -33,6 +29,11 @@ import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.typesystem.IStruct;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.Struct;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.codehaus.jettison.json.JSONException;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
@@ -42,7 +43,10 @@ import org.testng.annotations.Test;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.NoSuchElementException;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -57,8 +61,10 @@ public class KafkaConsumerTest {
 
     private static final String TRAIT_NAME = "MyTrait";
 
+
     @Mock
-    private ConsumerConnector consumerConnector;
+    private KafkaConsumer kafkaConsumer;
+
 
     @BeforeMethod
     public void setup() {
@@ -66,9 +72,9 @@ public class KafkaConsumerTest {
     }
 
     @Test
-    public void testNext() throws Exception {
-        KafkaStream<String, String> stream = mock(KafkaStream.class);
-        ConsumerIterator<String, String> iterator = mock(ConsumerIterator.class);
+    public void testReceive() throws Exception {
+
+
         MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class);
 
         Referenceable entity = getEntity(TRAIT_NAME);
@@ -78,29 +84,34 @@ public class KafkaConsumerTest {
 
         String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), message));
 
-        when(stream.iterator()).thenReturn(iterator);
-        when(iterator.hasNext()).thenReturn(true).thenReturn(false);
-        when(iterator.next()).thenReturn(messageAndMetadata).thenThrow(new NoSuchElementException());
+        kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0)));
+        List<ConsumerRecord> klist = new ArrayList<>();
+        klist.add(new ConsumerRecord<String, String>("ATLAS_HOOK",
+                0, 0L, "mykey", json));
+
+        TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
+        Map mp = new HashMap();
+        mp.put(tp,klist);
+        ConsumerRecords records = new ConsumerRecords(mp);
+
+
+        when(kafkaConsumer.poll(1000)).thenReturn(records);
         when(messageAndMetadata.message()).thenReturn(json);
 
-        NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
-            new KafkaConsumer<>(
-                    NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
-                    consumerConnector, false);
 
-        assertTrue(consumer.hasNext());
+        AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer,false);
+        List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive(1000);
+        assertTrue(messageList.size() > 0);
 
-        HookNotification.HookNotificationMessage consumedMessage = consumer.next();
+        HookNotification.HookNotificationMessage consumedMessage  = messageList.get(0).getMessage();
 
         assertMessagesEqual(message, consumedMessage, entity);
 
-        assertFalse(consumer.hasNext());
     }
 
     @Test
     public void testNextVersionMismatch() throws Exception {
-        KafkaStream<String, String> stream = mock(KafkaStream.class);
-        ConsumerIterator<String, String> iterator = mock(ConsumerIterator.class);
+
         MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class);
 
         Referenceable entity = getEntity(TRAIT_NAME);
@@ -110,84 +121,56 @@ public class KafkaConsumerTest {
 
         String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), message));
 
-        when(stream.iterator()).thenReturn(iterator);
-        when(iterator.hasNext()).thenReturn(true).thenReturn(false);
-        when(iterator.next()).thenReturn(messageAndMetadata).thenThrow(new NoSuchElementException());
-        when(messageAndMetadata.message()).thenReturn(json);
+        kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0)));
+        List<ConsumerRecord> klist = new ArrayList<>();
+        klist.add(new ConsumerRecord<String, String>("ATLAS_HOOK",
+                0, 0L, "mykey", json));
 
-        NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
-            new KafkaConsumer<>(
-                    NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
-                    consumerConnector, false);
+        TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
+        Map mp = new HashMap();
+        mp.put(tp,klist);
+        ConsumerRecords records = new ConsumerRecords(mp);
 
-        assertTrue(consumer.hasNext());
+        when(kafkaConsumer.poll(1000)).thenReturn(records);
+        when(messageAndMetadata.message()).thenReturn(json);
 
+        AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer ,false);
         try {
-            consumer.next();
+            List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive(1000);
+            assertTrue(messageList.size() > 0);
+
+            HookNotification.HookNotificationMessage consumedMessage  = messageList.get(0).getMessage();
+
             fail("Expected VersionMismatchException!");
         } catch (IncompatibleVersionException e) {
             e.printStackTrace();
         }
 
-        assertFalse(consumer.hasNext());
-    }
-
-    @Test
-    public void testPeekMessage() throws Exception {
-        KafkaStream<String, String> stream = mock(KafkaStream.class);
-        ConsumerIterator<String, String> iterator = mock(ConsumerIterator.class);
-        MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class);
+  }
 
-        Referenceable entity = getEntity(TRAIT_NAME);
 
-        HookNotification.EntityUpdateRequest message =
-            new HookNotification.EntityUpdateRequest("user1", entity);
-
-        String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), message));
-
-        when(stream.iterator()).thenReturn(iterator);
-        when(iterator.hasNext()).thenReturn(true);
-        when(iterator.peek()).thenReturn(messageAndMetadata);
-        when(messageAndMetadata.message()).thenReturn(json);
-
-        NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
-            new KafkaConsumer<>(
-                    NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
-                    consumerConnector, false);
+    @Test
+    public void testCommitIsCalledIfAutoCommitDisabled() {
 
-        assertTrue(consumer.hasNext());
+        TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
 
-        HookNotification.HookNotificationMessage consumedMessage = consumer.peek();
+        AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false);
 
-        assertMessagesEqual(message, consumedMessage, entity);
+        consumer.commit(tp, 1);
 
-        assertTrue(consumer.hasNext());
+        verify(kafkaConsumer).commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(1)));
     }
 
     @Test
-    public void testCommitIsCalledIfAutoCommitDisabled() {
-        KafkaStream<String, String> stream = mock(KafkaStream.class);
-        NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
-                new KafkaConsumer<>(
-                        NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
-                        consumerConnector, false);
-
-        consumer.commit();
+    public void testCommitIsNotCalledIfAutoCommitEnabled() {
 
-        verify(consumerConnector).commitOffsets();
-    }
+        TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
 
-    @Test
-    public void testCommitIsNotCalledIfAutoCommitEnabled() {
-        KafkaStream<String, String> stream = mock(KafkaStream.class);
-        NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
-                new KafkaConsumer<>(
-                        NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
-                        consumerConnector, true);
+        AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, true);
 
-        consumer.commit();
+        consumer.commit(tp, 1);
 
-        verify(consumerConnector, never()).commitOffsets();
+        verify(kafkaConsumer, never()).commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(1)));
     }
 
     private Referenceable getEntity(String traitName) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0e7f8ea4/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
index 2126be6..b7474a0 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
@@ -24,12 +24,13 @@ import org.apache.atlas.notification.MessageDeserializer;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.notification.NotificationInterface;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.testng.annotations.Test;
-
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -37,7 +38,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-
+import org.apache.atlas.kafka.AtlasKafkaConsumer;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -55,36 +56,24 @@ public class KafkaNotificationMockTest {
     public void testCreateConsumers() throws Exception {
         Properties properties = mock(Properties.class);
         when(properties.getProperty("entities.group.id")).thenReturn("atlas");
-        final ConsumerConnector consumerConnector = mock(ConsumerConnector.class);
         Map<String, Integer> topicCountMap = new HashMap<>();
         topicCountMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, 1);
 
-        Map<String, List<KafkaStream<String, String>>> kafkaStreamsMap =
-                new HashMap<>();
-        List<KafkaStream<String, String>> kafkaStreams = new ArrayList<>();
-        KafkaStream kafkaStream = mock(KafkaStream.class);
-        kafkaStreams.add(kafkaStream);
-        kafkaStreamsMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, kafkaStreams);
-
-        when(consumerConnector.createMessageStreams(
-                eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class))).thenReturn(kafkaStreamsMap);
-
-        final KafkaConsumer consumer1 = mock(KafkaConsumer.class);
-        final KafkaConsumer consumer2 = mock(KafkaConsumer.class);
+        final AtlasKafkaConsumer consumer1 = mock(AtlasKafkaConsumer.class);
+        final AtlasKafkaConsumer consumer2 = mock(AtlasKafkaConsumer.class);
 
         KafkaNotification kafkaNotification =
-                new TestKafkaNotification(properties, consumerConnector, consumer1, consumer2);
+                new TestKafkaNotification(properties, consumer1, consumer2);
 
-        List<NotificationConsumer<String>> consumers =
+        List<NotificationConsumer<AtlasKafkaConsumer>> consumers =
                 kafkaNotification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 2);
 
-        verify(consumerConnector, times(2)).createMessageStreams(
-                eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class));
         assertEquals(consumers.size(), 2);
         assertTrue(consumers.contains(consumer1));
         assertTrue(consumers.contains(consumer2));
     }
 
+
     @Test
     @SuppressWarnings("unchecked")
     public void shouldSendMessagesSuccessfully() throws NotificationException,
@@ -164,27 +153,28 @@ public class KafkaNotificationMockTest {
 
     class TestKafkaNotification extends KafkaNotification {
 
-        private final ConsumerConnector consumerConnector;
-        private final KafkaConsumer consumer1;
-        private final KafkaConsumer consumer2;
+        private final AtlasKafkaConsumer consumer1;
+        private final AtlasKafkaConsumer consumer2;
 
-        TestKafkaNotification(Properties properties, ConsumerConnector consumerConnector,
-                              KafkaConsumer consumer1, KafkaConsumer consumer2) {
+        TestKafkaNotification(Properties properties,
+                              AtlasKafkaConsumer consumer1, AtlasKafkaConsumer consumer2) {
             super(properties);
-            this.consumerConnector = consumerConnector;
             this.consumer1 = consumer1;
             this.consumer2 = consumer2;
         }
 
+
         @Override
-        protected ConsumerConnector createConsumerConnector(Properties consumerProperties) {
-            return consumerConnector;
+        public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType,
+                                                                 int numConsumers) {
+            List consumerList = new ArrayList<NotificationConsumer>();
+            consumerList.add(consumer1);
+            consumerList.add(consumer2);
+            return consumerList;
         }
 
-        @Override
-        protected <T> org.apache.atlas.kafka.KafkaConsumer<T>
-        createKafkaConsumer(Class<T> type, MessageDeserializer<T> deserializer, KafkaStream stream,
-                            int consumerId, ConsumerConnector connector, boolean autoCommitEnabled) {
+        protected <T> AtlasKafkaConsumer<T>
+        createConsumers(Class<T> type, int consumerId,  boolean autoCommitEnabled) {
             if (consumerId == 0) {
                 return consumer1;
             } else if (consumerId == 1) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0e7f8ea4/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
index a810029..c791d43 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
@@ -28,6 +28,9 @@ import org.apache.commons.lang.RandomStringUtils;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
+import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+
+import java.util.List;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
@@ -52,7 +55,7 @@ public class KafkaNotificationTest {
     }
 
     @Test
-    public void testNext() throws Exception {
+    public void testReceiveKafkaMessages() throws Exception {
         kafkaNotification.send(NotificationInterface.NotificationType.HOOK,
                 new HookNotification.EntityCreateRequest("u1", new Referenceable("type")));
         kafkaNotification.send(NotificationInterface.NotificationType.HOOK,
@@ -64,44 +67,21 @@ public class KafkaNotificationTest {
 
         NotificationConsumer<Object> consumer =
                 kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
-        assertTrue(consumer.hasNext());
-        HookNotification.HookNotificationMessage message = (HookNotification.HookNotificationMessage) consumer.next();
-        assertEquals(message.getUser(), "u1");
-
-        assertTrue(consumer.hasNext());
-        message = (HookNotification.HookNotificationMessage) consumer.next();
-        assertEquals(message.getUser(), "u2");
-        consumer.close();
-
-        //nothing committed(even though u1 and u2 are read), now should restart from u1
-        consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
-        assertTrue(consumer.hasNext());
-        message = (HookNotification.HookNotificationMessage) consumer.next();
-        assertEquals(message.getUser(), "u1");
-        consumer.commit();
-
-        assertTrue(consumer.hasNext());
-        message = (HookNotification.HookNotificationMessage) consumer.next();
-        assertEquals(message.getUser(), "u2");
-        consumer.close();
-
-        //u1 committed, u2 read, should start from u2
-        consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
-        assertTrue(consumer.hasNext());
-        message = (HookNotification.HookNotificationMessage) consumer.next();
-        assertEquals(message.getUser(), "u2");
-
-        assertTrue(consumer.hasNext());
-        message = (HookNotification.HookNotificationMessage) consumer.next();
-        assertEquals(message.getUser(), "u3");
-        consumer.commit();
-        consumer.close();
+        List<AtlasKafkaMessage<Object>> messages = null ;
+        long startTime = System.currentTimeMillis(); //fetch starting time
+        while ((System.currentTimeMillis() - startTime) < 10000) {
+             messages = consumer.receive(1000L);
+            if (messages.size() > 0) {
+                break;
+            }
+        }
+
+        int i=1;
+        for (AtlasKafkaMessage<Object> msg :  messages){
+            HookNotification.HookNotificationMessage message =  (HookNotificationMessage) msg.getMessage();
+            assertEquals(message.getUser(), "u"+i++);
+        }
 
-        //u2, u3 read, but only u3 committed, should start from u4
-        consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
-        assertTrue(consumer.hasNext());
-        message = (HookNotification.HookNotificationMessage) consumer.next();
-        assertEquals(message.getUser(), "u4");
         consumer.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0e7f8ea4/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index 13f2f0b..8324b57 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -20,10 +20,12 @@ package org.apache.atlas.notification;
 
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
+import org.apache.atlas.kafka.AtlasKafkaMessage;
 import org.slf4j.Logger;
 import org.testng.annotations.Test;
 
 import java.lang.reflect.Type;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
@@ -35,6 +37,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import org.apache.kafka.common.TopicPartition;
 
 /**
  * AbstractNotificationConsumer tests.
@@ -44,7 +47,7 @@ public class AbstractNotificationConsumerTest {
     private static final Gson GSON = new Gson();
 
     @Test
-    public void testNext() throws Exception {
+    public void testReceive() throws Exception {
         Logger logger = mock(Logger.class);
 
         TestMessage testMessage1 = new TestMessage("sValue1", 99);
@@ -52,7 +55,7 @@ public class AbstractNotificationConsumerTest {
         TestMessage testMessage3 = new TestMessage("sValue3", 97);
         TestMessage testMessage4 = new TestMessage("sValue4", 96);
 
-        List<String> jsonList = new LinkedList<>();
+        List jsonList = new LinkedList<>();
 
         jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1)));
         jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage2)));
@@ -62,25 +65,19 @@ public class AbstractNotificationConsumerTest {
         Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
 
         NotificationConsumer<TestMessage> consumer =
-            new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
-
-        assertTrue(consumer.hasNext());
-
-        assertEquals(testMessage1, consumer.next());
+                new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
 
-        assertTrue(consumer.hasNext());
+        List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L);
 
-        assertEquals(testMessage2, consumer.next());
+        assertFalse(messageList.isEmpty());
 
-        assertTrue(consumer.hasNext());
+        assertEquals(testMessage1, messageList.get(0).getMessage());
 
-        assertEquals(testMessage3, consumer.next());
+        assertEquals(testMessage2, messageList.get(1).getMessage());
 
-        assertTrue(consumer.hasNext());
+        assertEquals(testMessage3, messageList.get(2).getMessage());
 
-        assertEquals(testMessage4, consumer.next());
-
-        assertFalse(consumer.hasNext());
+        assertEquals(testMessage4, messageList.get(3).getMessage());
     }
 
     @Test
@@ -92,7 +89,7 @@ public class AbstractNotificationConsumerTest {
         TestMessage testMessage3 = new TestMessage("sValue3", 97);
         TestMessage testMessage4 = new TestMessage("sValue4", 96);
 
-        List<String> jsonList = new LinkedList<>();
+        List jsonList = new LinkedList<>();
 
         String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1));
         String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.0.5"), testMessage2));
@@ -108,26 +105,17 @@ public class AbstractNotificationConsumerTest {
 
         NotificationConsumer<TestMessage> consumer =
             new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
-        assertTrue(consumer.hasNext());
-
-        assertEquals(new TestMessage("sValue1", 99), consumer.next());
-
-        assertTrue(consumer.hasNext());
 
-        assertEquals(new TestMessage("sValue2", 98), consumer.next());
-        verify(logger).info(endsWith(json2));
+        List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L);
 
-        assertTrue(consumer.hasNext());
+        assertEquals(new TestMessage("sValue1", 99), messageList.get(0).getMessage());
 
-        assertEquals(new TestMessage("sValue3", 97), consumer.next());
-        verify(logger).info(endsWith(json3));
+        assertEquals(new TestMessage("sValue2", 98), messageList.get(1).getMessage());
 
-        assertTrue(consumer.hasNext());
+        assertEquals(new TestMessage("sValue3", 97), messageList.get(2).getMessage());
 
-        assertEquals(new TestMessage("sValue4", 96), consumer.next());
-        verify(logger).info(endsWith(json4));
+        assertEquals(new TestMessage("sValue4", 96), messageList.get(3).getMessage());
 
-        assertFalse(consumer.hasNext());
     }
 
     @Test
@@ -137,7 +125,7 @@ public class AbstractNotificationConsumerTest {
         TestMessage testMessage1 = new TestMessage("sValue1", 99);
         TestMessage testMessage2 = new TestMessage("sValue2", 98);
 
-        List<String> jsonList = new LinkedList<>();
+        List jsonList = new LinkedList<>();
 
         String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1));
         String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), testMessage2));
@@ -149,52 +137,19 @@ public class AbstractNotificationConsumerTest {
 
         NotificationConsumer<TestMessage> consumer =
             new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
-        assertTrue(consumer.hasNext());
-
-        assertEquals(testMessage1, consumer.next());
+        try {
+            List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L);
 
-        assertTrue(consumer.hasNext());
+            messageList.get(1).getMessage();
 
-        try {
-            consumer.next();
             fail("Expected VersionMismatchException!");
         } catch (IncompatibleVersionException e) {
-            verify(logger).error(endsWith(json2));
+
         }
 
-        assertFalse(consumer.hasNext());
     }
 
-    @Test
-    public void testPeek() throws Exception {
-        Logger logger = mock(Logger.class);
-
-        TestMessage testMessage1 = new TestMessage("sValue1", 99);
-        TestMessage testMessage2 = new TestMessage("sValue2", 98);
-        TestMessage testMessage3 = new TestMessage("sValue3", 97);
-        TestMessage testMessage4 = new TestMessage("sValue4", 96);
-
-        List<String> jsonList = new LinkedList<>();
-
-        jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1)));
-        jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage2)));
-        jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage3)));
-        jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage4)));
-
-        Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
-
-        NotificationConsumer<TestMessage> consumer =
-            new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
-        assertTrue(consumer.hasNext());
 
-        assertEquals(testMessage1, consumer.peek());
-
-        assertTrue(consumer.hasNext());
-
-        assertEquals(testMessage1, consumer.peek());
-
-        assertTrue(consumer.hasNext());
-    }
 
     private static class TestMessage {
         private String s;
@@ -229,31 +184,16 @@ public class AbstractNotificationConsumerTest {
     }
 
     private static class TestNotificationConsumer<T> extends AbstractNotificationConsumer<T> {
-        private final List<String> messageList;
+        private final List<T> messageList;
         private int index = 0;
 
-        public TestNotificationConsumer(Type versionedMessageType, List<String> messages, Logger logger) {
+        public TestNotificationConsumer(Type versionedMessageType, List<T> messages, Logger logger) {
             super(new TestDeserializer<T>(versionedMessageType, logger));
             this.messageList = messages;
         }
 
         @Override
-        protected String getNext() {
-            return messageList.get(index++);
-        }
-
-        @Override
-        protected String peekMessage() {
-            return messageList.get(index);
-        }
-
-        @Override
-        public boolean hasNext() {
-            return index < messageList.size();
-        }
-
-        @Override
-        public void commit() {
+        public void commit(TopicPartition partition, long offset) {
             // do nothing.
         }
 
@@ -261,6 +201,15 @@ public class AbstractNotificationConsumerTest {
         public void close() {
             //do nothing
         }
+
+        @Override
+        public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
+            List<AtlasKafkaMessage<T>> tempMessageList = new ArrayList();
+            for(Object json :  messageList) {
+                tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String)json), -1, -1));
+            }
+            return tempMessageList;
+        }
     }
 
     private static final class TestDeserializer<T> extends VersionedMessageDeserializer<T> {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0e7f8ea4/typesystem/src/test/resources/atlas-application.properties
----------------------------------------------------------------------
diff --git a/typesystem/src/test/resources/atlas-application.properties b/typesystem/src/test/resources/atlas-application.properties
index c4ce5ea..7967b76 100644
--- a/typesystem/src/test/resources/atlas-application.properties
+++ b/typesystem/src/test/resources/atlas-application.properties
@@ -91,7 +91,13 @@ atlas.kafka.consumer.timeout.ms=4000
 atlas.kafka.auto.commit.interval.ms=100
 atlas.kafka.hook.group.id=atlas
 atlas.kafka.entities.group.id=atlas_entities
-atlas.kafka.auto.commit.enable=false
+#atlas.kafka.auto.commit.enable=false
+
+atlas.kafka.enable.auto.commit=false
+atlas.kafka.auto.offset.reset=earliest
+atlas.kafka.session.timeout.ms=30000
+
+
 
 #########  Entity Audit Configs  #########
 atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0e7f8ea4/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 2f8245d..9e5b864 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -19,16 +19,15 @@ package org.apache.atlas.notification;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import kafka.consumer.ConsumerTimeoutException;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.kafka.AtlasKafkaMessage;
 import org.apache.atlas.listener.ActiveStateChangeHandler;
 import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
@@ -46,7 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Component;
-
+import org.apache.kafka.common.TopicPartition;
 import javax.inject.Inject;
 import java.util.ArrayList;
 import java.util.Date;
@@ -135,14 +134,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
     private void startConsumers(ExecutorService executorService) {
         int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
-        List<NotificationConsumer<HookNotification.HookNotificationMessage>> notificationConsumers =
+        List<NotificationConsumer<HookNotificationMessage>> notificationConsumers =
                 notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
         if (executorService == null) {
             executorService = Executors.newFixedThreadPool(notificationConsumers.size(),
                     new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
         }
         executors = executorService;
-        for (final NotificationConsumer<HookNotification.HookNotificationMessage> consumer : notificationConsumers) {
+        for (final NotificationConsumer<HookNotificationMessage> consumer : notificationConsumers) {
             HookConsumer hookConsumer = new HookConsumer(consumer);
             consumers.add(hookConsumer);
             executors.submit(hookConsumer);
@@ -207,21 +206,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     }
 
     class HookConsumer implements Runnable {
-        private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
+        private final NotificationConsumer<HookNotificationMessage> consumer;
         private final AtomicBoolean shouldRun = new AtomicBoolean(false);
-        private List<HookNotification.HookNotificationMessage> failedMessages = new ArrayList<>();
+        private List<HookNotificationMessage> failedMessages = new ArrayList<>();
 
-        public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
+        public HookConsumer(NotificationConsumer<HookNotificationMessage> consumer) {
             this.consumer = consumer;
         }
 
-        private boolean hasNext() {
-            try {
-                return consumer.hasNext();
-            } catch (ConsumerTimeoutException e) {
-                return false;
-            }
-        }
 
         @Override
         public void run() {
@@ -233,8 +225,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
             while (shouldRun.get()) {
                 try {
-                    if (hasNext()) {
-                        handleMessage(consumer.next());
+                    List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(1000L);
+                    for (AtlasKafkaMessage<HookNotificationMessage> msg :  messages){
+                        handleMessage(msg);
                     }
                 } catch (Throwable t) {
                     LOG.warn("Failure in NotificationHookConsumer", t);
@@ -243,7 +236,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         }
 
         @VisibleForTesting
-        void handleMessage(HookNotificationMessage message) throws AtlasServiceException, AtlasException {
+        void handleMessage(AtlasKafkaMessage<HookNotificationMessage> kafkaMsg) throws AtlasServiceException, AtlasException {
+            HookNotificationMessage message = kafkaMsg.getMessage();
             String messageUser = message.getUser();
             // Used for intermediate conversions during create and update
             AtlasEntity.AtlasEntitiesWithExtInfo entities;
@@ -345,7 +339,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                     RequestContextV1.clear();
                 }
             }
-            commit();
+            commit(kafkaMsg);
         }
 
         private void recordFailedMessages() {
@@ -356,9 +350,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             failedMessages.clear();
         }
 
-        private void commit() {
+        private void commit(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage) {
             recordFailedMessages();
-            consumer.commit();
+            TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition());
+            consumer.commit(partition, kafkaMessage.getOffset());
         }
 
         boolean serverAvailable(Timer timer) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0e7f8ea4/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
index ac3b538..7e94330 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
@@ -34,8 +34,6 @@ import org.apache.atlas.typesystem.types.TraitType;
 import org.apache.atlas.typesystem.types.utils.TypesUtil;
 import org.apache.atlas.web.integration.BaseResourceIT;
 import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -55,7 +53,7 @@ public class EntityNotificationIT extends BaseResourceIT {
     private Id tableId;
     private Id dbId;
     private String traitName;
-    private NotificationConsumer<EntityNotification> notificationConsumer;
+    private NotificationConsumer notificationConsumer;
 
     @BeforeClass
     public void setUp() throws Exception {
@@ -64,13 +62,9 @@ public class EntityNotificationIT extends BaseResourceIT {
         Referenceable HiveDBInstance = createHiveDBInstanceBuiltIn(DATABASE_NAME);
         dbId = createInstance(HiveDBInstance);
 
-        List<NotificationConsumer<EntityNotification>> consumers =
-            notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
-
-        notificationConsumer = consumers.iterator().next();
+        notificationConsumer = notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1).get(0);
     }
 
-    @Test
     public void testCreateEntity() throws Exception {
         Referenceable tableInstance = createHiveTableInstanceBuiltIn(DATABASE_NAME, TABLE_NAME, dbId);
         tableId = createInstance(tableInstance);
@@ -81,7 +75,6 @@ public class EntityNotificationIT extends BaseResourceIT {
                 newNotificationPredicate(EntityNotification.OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid));
     }
 
-    @Test(dependsOnMethods = "testCreateEntity")
     public void testUpdateEntity() throws Exception {
         final String property = "description";
         final String newValue = "New description!";
@@ -94,7 +87,6 @@ public class EntityNotificationIT extends BaseResourceIT {
                 newNotificationPredicate(EntityNotification.OperationType.ENTITY_UPDATE, HIVE_TABLE_TYPE_BUILTIN, guid));
     }
 
-    @Test
     public void testDeleteEntity() throws Exception {
         final String tableName = "table-" + randomString();
         final String dbName = "db-" + randomString();
@@ -116,7 +108,6 @@ public class EntityNotificationIT extends BaseResourceIT {
             newNotificationPredicate(EntityNotification.OperationType.ENTITY_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid));
     }
 
-    @Test(dependsOnMethods = "testCreateEntity")
     public void testAddTrait() throws Exception {
         String superSuperTraitName = "SuperTrait" + randomString();
         createTrait(superSuperTraitName);
@@ -175,7 +166,6 @@ public class EntityNotificationIT extends BaseResourceIT {
         assertEquals(2, Collections.frequency(allTraitNames, superTraitName));
     }
 
-    @Test(dependsOnMethods = "testAddTrait")
     public void testDeleteTrait() throws Exception {
         final String guid = tableId._getId();
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0e7f8ea4/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index 18fd2ee..650ca0a 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -22,6 +22,7 @@ import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.kafka.AtlasKafkaMessage;
 import org.apache.atlas.kafka.KafkaNotification;
 import org.apache.atlas.kafka.NotificationProvider;
 import org.apache.atlas.model.instance.AtlasEntity;
@@ -40,12 +41,21 @@ import org.testng.Assert;
 import org.testng.annotations.AfterTest;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
+import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import java.util.List;
 
+import org.apache.atlas.kafka.AtlasKafkaConsumer;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.*;
 
+import org.apache.commons.configuration.Configuration;
+import org.apache.atlas.ApplicationProperties;
+import static org.testng.Assert.*;
+
+
+
 public class NotificationHookConsumerKafkaTest {
 
     public static final String NAME = "name";
@@ -80,6 +90,7 @@ public class NotificationHookConsumerKafkaTest {
 
     @AfterTest
     public void shutdown() {
+        kafkaNotification.close();
         kafkaNotification.stop();
     }
 
@@ -87,21 +98,19 @@ public class NotificationHookConsumerKafkaTest {
     public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException, AtlasBaseException {
         try {
             produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
-    
-            NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
-                    createNewConsumer(kafkaNotification, false);
+
+            NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, false);
             NotificationHookConsumer notificationHookConsumer =
                     new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
-            NotificationHookConsumer.HookConsumer hookConsumer =
-                    notificationHookConsumer.new HookConsumer(consumer);
-    
+            NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
+
             consumeOneMessage(consumer, hookConsumer);
             verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean());
 
             // produce another message, and make sure it moves ahead. If commit succeeded, this would work.
             produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity()));
             consumeOneMessage(consumer, hookConsumer);
-            verify(atlasEntityStore, times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
+            verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
             reset(atlasEntityStore);
         }
         finally {
@@ -113,42 +122,49 @@ public class NotificationHookConsumerKafkaTest {
     public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception {
         try {
             produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity()));
-    
-            NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
-                    createNewConsumer(kafkaNotification, true);
+
+            NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, true);
+
+            assertNotNull (consumer);
+
             NotificationHookConsumer notificationHookConsumer =
                     new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
-            NotificationHookConsumer.HookConsumer hookConsumer =
-                    notificationHookConsumer.new HookConsumer(consumer);
-    
+            NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
+
+
             consumeOneMessage(consumer, hookConsumer);
             verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean());
-    
+
             // produce another message, but this will not be consumed, as commit code is not executed in hook consumer.
             produceMessage(new HookNotification.EntityCreateRequest("test_user4", createEntity()));
-    
+
             consumeOneMessage(consumer, hookConsumer);
-            verify(atlasEntityStore, times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
+            verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
         }
         finally {
             kafkaNotification.close();
         }
     }
 
-    NotificationConsumer<HookNotification.HookNotificationMessage> createNewConsumer(
-            KafkaNotification kafkaNotification, boolean autoCommitEnabled) {
-        return kafkaNotification.<HookNotification.HookNotificationMessage>createConsumers(
-                NotificationInterface.NotificationType.HOOK, 1, autoCommitEnabled).get(0);
+    AtlasKafkaConsumer<HookNotificationMessage> createNewConsumer(KafkaNotification kafkaNotification, boolean autoCommitEnabled) {
+        return (AtlasKafkaConsumer) kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1, autoCommitEnabled).get(0);
     }
 
-    void consumeOneMessage(NotificationConsumer<HookNotification.HookNotificationMessage> consumer,
+    void consumeOneMessage(NotificationConsumer<HookNotificationMessage> consumer,
                            NotificationHookConsumer.HookConsumer hookConsumer) throws InterruptedException {
-        while (!consumer.hasNext()) {
-            Thread.sleep(1000);
-        }
-
         try {
-            hookConsumer.handleMessage(consumer.next());
+            long startTime = System.currentTimeMillis(); //fetch starting time
+            while ((System.currentTimeMillis() - startTime) < 10000) {
+                List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(1000L);
+
+                for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
+                    hookConsumer.handleMessage(msg);
+                }
+
+                if (messages.size() > 0) {
+                    break;
+                }
+            }
         } catch (AtlasServiceException | AtlasException e) {
             Assert.fail("Consumer failed with exception ", e);
         }
@@ -163,7 +179,10 @@ public class NotificationHookConsumerKafkaTest {
     }
 
     KafkaNotification startKafkaServer() throws AtlasException, InterruptedException {
-        KafkaNotification kafkaNotification = (KafkaNotification) notificationInterface;
+        Configuration applicationProperties = ApplicationProperties.get();
+        applicationProperties.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5));
+
+        kafkaNotification = new KafkaNotification(applicationProperties);
         kafkaNotification.start();
         Thread.sleep(2000);
         return kafkaNotification;
@@ -173,8 +192,8 @@ public class NotificationHookConsumerKafkaTest {
         return RandomStringUtils.randomAlphanumeric(10);
     }
 
-    private void produceMessage(HookNotification.HookNotificationMessage message) throws NotificationException {
-        notificationInterface.send(NotificationInterface.NotificationType.HOOK, message);
+    private void produceMessage(HookNotificationMessage message) throws NotificationException {
+        kafkaNotification.send(NotificationInterface.NotificationType.HOOK, message);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0e7f8ea4/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index bdb60a2..f4ec56a 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -21,6 +21,7 @@ import org.apache.atlas.AtlasException;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.kafka.AtlasKafkaMessage;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.notification.hook.HookNotification;
@@ -36,7 +37,7 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-
+import org.apache.kafka.common.TopicPartition;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -124,9 +125,8 @@ public class NotificationHookConsumerTest {
         Referenceable mock = mock(Referenceable.class);
         when(message.getEntities()).thenReturn(Arrays.asList(mock));
 
-        hookConsumer.handleMessage(message);
-
-        verify(consumer).commit();
+        hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
+        verify(consumer).commit(any(TopicPartition.class),anyInt());
     }
 
     @Test
@@ -141,7 +141,7 @@ public class NotificationHookConsumerTest {
             { add(mock(Referenceable.class)); }
         });
         when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenThrow(new RuntimeException("Simulating exception in processing message"));
-        hookConsumer.handleMessage(message);
+        hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
 
         verifyZeroInteractions(consumer);
     }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0e7f8ea4/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
index b59d3ee..c036cfa 100755
--- a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
@@ -21,7 +21,6 @@ package org.apache.atlas.web.integration;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-import kafka.consumer.ConsumerTimeoutException;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasClientV2;
@@ -42,7 +41,9 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinali
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.kafka.*;
 import org.apache.atlas.notification.entity.EntityNotification;
+import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.Struct;
@@ -634,14 +635,21 @@ public abstract class BaseResourceIT {
             @Override
             public boolean evaluate() throws Exception {
                 try {
-                    while (consumer.hasNext() && System.currentTimeMillis() < maxCurrentTime) {
-                        EntityNotification notification = consumer.next();
-                        if (predicate.evaluate(notification)) {
-                            pair.left = notification;
-                            return true;
-                        }
+
+                    while (System.currentTimeMillis() < maxCurrentTime) {
+                        List<AtlasKafkaMessage<EntityNotification>> messageList = consumer.receive(1000);
+                            if(messageList.size() > 0) {
+                                EntityNotification notification = messageList.get(0).getMessage();
+                                if (predicate.evaluate(notification)) {
+                                    pair.left = notification;
+                                    return true;
+                                }
+                            }else{
+                                LOG.info( System.currentTimeMillis()+ " messageList no records" +maxCurrentTime );
+                            }
                     }
-                } catch(ConsumerTimeoutException e) {
+                } catch(Exception e) {
+                    LOG.error(" waitForNotification", e);
                     //ignore
                 }
                 return false;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0e7f8ea4/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java
index 310b2e3..b527583 100755
--- a/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java
@@ -81,7 +81,6 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
     private static final String TRAITS = "traits";
 
     private NotificationInterface notificationInterface = NotificationProvider.get();
-    private NotificationConsumer<EntityNotification> notificationConsumer;
 
     @BeforeClass
     public void setUp() throws Exception {
@@ -89,10 +88,6 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
 
         createTypeDefinitionsV1();
 
-        List<NotificationConsumer<EntityNotification>> consumers =
-                notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
-
-        notificationConsumer = consumers.iterator().next();
     }
 
     @Test
@@ -218,29 +213,12 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
 
         assertEntityAudit(dbId, EntityAuditEvent.EntityAuditAction.ENTITY_CREATE);
 
-        waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
-            @Override
-            public boolean evaluate(EntityNotification notification) throws Exception {
-                return notification != null && notification.getEntity().getId()._getId().equals(dbId);
-            }
-        });
-
         JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, dbName));
         assertEquals(results.length(), 1);
 
         //create entity again shouldn't create another instance with same unique attribute value
         List<String> entityResults = atlasClientV1.createEntity(HiveDBInstance);
         assertEquals(entityResults.size(), 0);
-        try {
-            waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
-                @Override
-                public boolean evaluate(EntityNotification notification) throws Exception {
-                    return notification != null && notification.getEntity().getId()._getId().equals(dbId);
-                }
-            });
-        } catch (Exception e) {
-            //expected timeout
-        }
 
         results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, dbName));
         assertEquals(results.length(), 1);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0e7f8ea4/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java
index 98a7abc..d61a9af 100755
--- a/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java
@@ -55,7 +55,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
+import org.apache.atlas.kafka.AtlasKafkaConsumer;
 import static org.testng.Assert.*;
 
 
@@ -72,8 +72,6 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT {
 
     private AtlasEntity dbEntity;
     private AtlasEntity tableEntity;
-    private NotificationInterface notificationInterface = NotificationProvider.get();
-    private NotificationConsumer<EntityNotification> notificationConsumer;
 
     @BeforeClass
     public void setUp() throws Exception {
@@ -81,10 +79,6 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT {
 
         createTypeDefinitionsV2();
 
-        List<NotificationConsumer<EntityNotification>> consumers =
-                notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
-
-        notificationConsumer = consumers.iterator().next();
     }
 
     @Test
@@ -166,14 +160,6 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT {
         assertEquals(results.length(), 1);
 
         final AtlasEntity hiveDBInstanceV2 = createHiveDB();
-        // Do the notification thing here
-        waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
-            @Override
-            public boolean evaluate(EntityNotification notification) throws Exception {
-                return notification != null && notification.getEntity().getId()._getId().equals(hiveDBInstanceV2.getGuid());
-            }
-        });
-
 
         results = searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE_V2, DATABASE_NAME));
         assertEquals(results.length(), 1);