You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2019/07/12 01:33:14 UTC

[atlas] branch branch-2.0 updated: ATLAS-3133: enhanced Atlas server to process notifications from multiple Kafka topics

This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new f724df8  ATLAS-3133: enhanced Atlas server to process notifications from multiple Kafka topics
f724df8 is described below

commit f724df8201328b3b893c2eb8975bca57ecfce3dc
Author: Saqeeb Shaikh <sa...@freestoneinfotech.com>
AuthorDate: Thu Jul 11 12:28:30 2019 +0530

    ATLAS-3133: enhanced Atlas server to process notifications from multiple Kafka topics
    
    Signed-off-by: Madhan Neethiraj <ma...@apache.org>
    (cherry picked from commit 47d4d588f2ed70396bb64b80c6362d8115350339)
---
 .../java/org/apache/atlas/AtlasConfiguration.java  |  26 +++
 .../apache/atlas/model/metrics/AtlasMetrics.java   |   3 +-
 .../org/apache/atlas/kafka/AtlasKafkaConsumer.java |   2 +-
 .../org/apache/atlas/kafka/AtlasKafkaMessage.java  |  28 ++--
 .../org/apache/atlas/kafka/KafkaNotification.java  | 176 +++++++++++++++------
 .../org/apache/atlas/kafka/KafkaConsumerTest.java  |  32 ++--
 .../atlas/kafka/KafkaNotificationMockTest.java     |   6 +-
 .../AbstractNotificationConsumerTest.java          |   4 +-
 .../org/apache/atlas/util/AtlasMetricsCounter.java | 111 +++++++------
 .../org/apache/atlas/util/AtlasMetricsUtil.java    | 121 +++++++++++---
 .../apache/atlas/services/MetricsServiceTest.java  |   4 +-
 .../notification/NotificationHookConsumer.java     |   7 +-
 .../NotificationHookConsumerKafkaTest.java         |   5 +-
 .../notification/NotificationHookConsumerTest.java |   5 +-
 14 files changed, 372 insertions(+), 158 deletions(-)

diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 3ff1316..9da51f5 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.atlas;
 
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
 
 /**
  * Enum that encapsulated each property name and its default value.
@@ -39,6 +40,9 @@ public enum AtlasConfiguration {
     NOTIFICATION_HOOK_TOPIC_NAME("atlas.notification.hook.topic.name", "ATLAS_HOOK"),
     NOTIFICATION_ENTITIES_TOPIC_NAME("atlas.notification.entities.topic.name", "ATLAS_ENTITIES"),
 
+    NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES("atlas.notification.hook.consumer.topic.names", "ATLAS_HOOK"), //  a comma separated list of topic names
+    NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES("atlas.notification.entities.consumer.topic.names", "ATLAS_ENTITIES"), //  a comma separated list of topic names
+
     NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES("atlas.notification.message.max.length.bytes", (1000 * 1000)),
     NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled", true),
     NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds", 15 * 60),
@@ -84,6 +88,28 @@ public enum AtlasConfiguration {
         return APPLICATION_PROPERTIES.getString(propertyName, defaultValue.toString());
     }
 
+    public String[] getStringArray() {
+        String[] ret = APPLICATION_PROPERTIES.getStringArray(propertyName);
+
+        if (ret == null ||  ret.length == 0 || (ret.length == 1 && StringUtils.isEmpty(ret[0]))) {
+            if (defaultValue != null) {
+                ret = StringUtils.split(defaultValue.toString(), ',');
+            }
+        }
+
+        return ret;
+    }
+
+    public String[] getStringArray(String... defaultValue) {
+        String[] ret = APPLICATION_PROPERTIES.getStringArray(propertyName);
+
+        if (ret == null ||  ret.length == 0 || (ret.length == 1 && StringUtils.isEmpty(ret[0]))) {
+            ret = defaultValue;
+        }
+
+        return ret;
+    }
+
     public Object get() {
         Object value = APPLICATION_PROPERTIES.getProperty(propertyName);
         return value == null ? defaultValue : value;
diff --git a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
index c011ad9..a48d93b 100644
--- a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
+++ b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
@@ -55,8 +55,7 @@ public class AtlasMetrics {
     public static final String STAT_NOTIFY_FAILED_COUNT_CURR_HOUR      = PREFIX_NOTIFICATION + "currentHourFailed";
     public static final String STAT_NOTIFY_START_TIME_CURR_HOUR        = PREFIX_NOTIFICATION + "currentHourStartTime";
     public static final String STAT_NOTIFY_LAST_MESSAGE_PROCESSED_TIME = PREFIX_NOTIFICATION + "lastMessageProcessedTime";
-    public static final String STAT_NOTIFY_START_OFFSET                = PREFIX_NOTIFICATION + "offsetStart";
-    public static final String STAT_NOTIFY_CURRENT_OFFSET              = PREFIX_NOTIFICATION + "offsetCurrent";
+    public static final String STAT_NOTIFY_TOPIC_OFFSETS               = PREFIX_NOTIFICATION + "topicOffsets";
     public static final String STAT_NOTIFY_COUNT_PREV_DAY              = PREFIX_NOTIFICATION + "previousDay";
     public static final String STAT_NOTIFY_AVG_TIME_PREV_DAY           = PREFIX_NOTIFICATION + "previousDayAvgTime";
     public static final String STAT_NOTIFY_CREATES_COUNT_PREV_DAY      = PREFIX_NOTIFICATION + "previousDayEntityCreates";
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
index 5c840c3..49f9ba3 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
@@ -88,7 +88,7 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
                     continue;
                 }
 
-                messages.add(new AtlasKafkaMessage(message, record.offset(), record.partition()));
+                messages.add(new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition()));
             }
         }
 
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
index b04aba9..22bd79f 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
@@ -18,15 +18,17 @@
 
 package org.apache.atlas.kafka;
 
+import org.apache.kafka.common.TopicPartition;
+
 public class AtlasKafkaMessage<T> {
-    private final T    message;
-    private final long offset;
-    private final int  partition;
-
-    public AtlasKafkaMessage(T message, long offset, int partition) {
-        this.message   = message;
-        this.offset    = offset;
-        this.partition = partition;
+    private final T              message;
+    private final long           offset;
+    private final TopicPartition topicPartition;
+
+    public AtlasKafkaMessage(T message, long offset, String topic, int partition) {
+        this.message        = message;
+        this.offset         = offset;
+        this.topicPartition = new TopicPartition(topic, partition);
     }
 
     public T getMessage() {
@@ -37,8 +39,16 @@ public class AtlasKafkaMessage<T> {
         return offset;
     }
 
+    public TopicPartition getTopicPartition() {
+        return topicPartition;
+    }
+
+    public String getTopic() {
+        return topicPartition.topic();
+    }
+
     public int getPartition() {
-        return partition;
+        return topicPartition.partition();
     }
 
 }
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 449eb6f..46c68be 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -57,20 +57,30 @@ public class KafkaNotification extends AbstractNotification implements Service {
     public    static final String ATLAS_ENTITIES_TOPIC       = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();
     protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";
 
+    private   static final String[] ATLAS_HOOK_CONSUMER_TOPICS     = AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC);
+    private   static final String[] ATLAS_ENTITIES_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_ENTITIES_TOPIC);
+
     private static final String DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE = "This consumer has already been closed.";
 
-    private static final Map<NotificationType, String> TOPIC_MAP = new HashMap<NotificationType, String>() {
+    private static final Map<NotificationType, String> PRODUCER_TOPIC_MAP = new HashMap<NotificationType, String>() {
         {
             put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
             put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC);
         }
     };
 
-    private final Properties    properties;
-    private final Long          pollTimeOutMs;
-    private       KafkaConsumer consumer;
-    private       KafkaProducer producer;
-    private       String        consumerClosedErrorMsg;
+    private static final Map<NotificationType, String[]> CONSUMER_TOPICS_MAP = new HashMap<NotificationType, String[]>() {
+        {
+            put(NotificationType.HOOK, trimAndPurge(ATLAS_HOOK_CONSUMER_TOPICS));
+            put(NotificationType.ENTITIES, trimAndPurge(ATLAS_ENTITIES_CONSUMER_TOPICS));
+        }
+    };
+
+    private final Properties                                 properties;
+    private final Long                                       pollTimeOutMs;
+    private final Map<NotificationType, List<KafkaConsumer>> consumers = new HashMap<>();
+    private final Map<NotificationType, KafkaProducer>       producers = new HashMap<>();
+    private       String                                     consumerClosedErrorMsg;
 
     // ----- Constructors ----------------------------------------------------
 
@@ -125,8 +135,8 @@ public class KafkaNotification extends AbstractNotification implements Service {
     }
 
     @VisibleForTesting
-    String getTopicName(NotificationType notificationType) {
-        return TOPIC_MAP.get(notificationType);
+    String getProducerTopicName(NotificationType notificationType) {
+        return PRODUCER_TOPIC_MAP.get(notificationType);
     }
 
     // ----- Service ---------------------------------------------------------
@@ -156,10 +166,43 @@ public class KafkaNotification extends AbstractNotification implements Service {
     public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers, boolean autoCommitEnabled) {
         LOG.info("==> KafkaNotification.createConsumers(notificationType={}, numConsumers={}, autoCommitEnabled={})", notificationType, numConsumers, autoCommitEnabled);
 
-        Properties         consumerProperties = getConsumerProperties(notificationType);
-        AtlasKafkaConsumer kafkaConsumer      = new AtlasKafkaConsumer(notificationType, getKafkaConsumer(consumerProperties, notificationType, autoCommitEnabled), autoCommitEnabled, pollTimeOutMs);
+        String[] topics = CONSUMER_TOPICS_MAP.get(notificationType);
+
+        if (numConsumers < topics.length) {
+            LOG.warn("consumers count {} is fewer than number of topics {}. Creating {} consumers, so that consumer count is equal to number of topics.", numConsumers, topics.length, topics.length);
+
+            numConsumers = topics.length;
+        } else if (numConsumers > topics.length) {
+            LOG.warn("consumers count {} is higher than number of topics {}. Creating {} consumers, so that consumer count is equal to number of topics", numConsumers, topics.length, topics.length);
+
+            numConsumers = topics.length;
+        }
+
+        List<KafkaConsumer> notificationConsumers = this.consumers.get(notificationType);
+
+        if (notificationConsumers == null) {
+            notificationConsumers = new ArrayList<>(numConsumers);
+
+            this.consumers.put(notificationType, notificationConsumers);
+        }
+
+        List<NotificationConsumer<T>> consumers          = new ArrayList<>();
+        Properties                    consumerProperties = getConsumerProperties(notificationType);
+
+        consumerProperties.put("enable.auto.commit", autoCommitEnabled);
 
-        List<NotificationConsumer<T>> consumers = Collections.singletonList(kafkaConsumer);
+        for (int i = 0; i < numConsumers; i++) {
+            KafkaConsumer existingConsumer = notificationConsumers.size() > i ? notificationConsumers.get(i) : null;
+            KafkaConsumer kafkaConsumer    = getOrCreateKafkaConsumer(existingConsumer, consumerProperties, notificationType, i);
+
+            if (notificationConsumers.size() > i) {
+                notificationConsumers.set(i, kafkaConsumer);
+            } else {
+                notificationConsumers.add(kafkaConsumer);
+            }
+
+            consumers.add(new AtlasKafkaConsumer(notificationType, kafkaConsumer, autoCommitEnabled, pollTimeOutMs));
+        }
 
         LOG.info("<== KafkaNotification.createConsumers(notificationType={}, numConsumers={}, autoCommitEnabled={})", notificationType, numConsumers, autoCommitEnabled);
 
@@ -170,29 +213,33 @@ public class KafkaNotification extends AbstractNotification implements Service {
     public void close() {
         LOG.info("==> KafkaNotification.close()");
 
-        if (producer != null) {
-            producer.close();
-
-            producer = null;
+        for (KafkaProducer producer : producers.values()) {
+            if (producer != null) {
+                try {
+                    producer.close();
+                } catch (Throwable t) {
+                    LOG.error("failed to close Kafka producer. Ignoring", t);
+                }
+            }
         }
 
+        producers.clear();
+
         LOG.info("<== KafkaNotification.close()");
     }
 
 
     // ----- AbstractNotification --------------------------------------------
     @Override
-    public void sendInternal(NotificationType type, List<String> messages) throws NotificationException {
-        if (producer == null) {
-            createProducer();
-        }
+    public void sendInternal(NotificationType notificationType, List<String> messages) throws NotificationException {
+        KafkaProducer producer = getOrCreateProducer(notificationType);
 
-        sendInternalToProducer(producer, type, messages);
+        sendInternalToProducer(producer, notificationType, messages);
     }
 
     @VisibleForTesting
-    void sendInternalToProducer(Producer p, NotificationType type, List<String> messages) throws NotificationException {
-        String               topic           = TOPIC_MAP.get(type);
+    void sendInternalToProducer(Producer p, NotificationType notificationType, List<String> messages) throws NotificationException {
+        String               topic           = PRODUCER_TOPIC_MAP.get(notificationType);
         List<MessageContext> messageContexts = new ArrayList<>();
 
         for (String message : messages) {
@@ -229,53 +276,82 @@ public class KafkaNotification extends AbstractNotification implements Service {
         }
     }
 
+    // Get properties for consumer request
+    @VisibleForTesting
+    public Properties getConsumerProperties(NotificationType notificationType) {
+        // find the configured group id for the given notification type
+        String groupId = properties.getProperty(notificationType.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY);
 
-    public KafkaConsumer getKafkaConsumer(Properties consumerProperties, NotificationType type, boolean autoCommitEnabled) {
-        if (consumer == null || !isKafkaConsumerOpen(consumer)) {
-            try {
-                String topic = TOPIC_MAP.get(type);
+        if (StringUtils.isEmpty(groupId)) {
+            throw new IllegalStateException("No configuration group id set for the notification type " + notificationType);
+        }
 
-                consumerProperties.put("enable.auto.commit", autoCommitEnabled);
+        Properties consumerProperties = new Properties();
+
+        consumerProperties.putAll(properties);
+        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
 
-                this.consumer = new KafkaConsumer(consumerProperties);
+        return consumerProperties;
+    }
+
+    @VisibleForTesting
+    public KafkaConsumer getOrCreateKafkaConsumer(KafkaConsumer existingConsumer, Properties consumerProperties, NotificationType notificationType, int idxConsumer) {
+        KafkaConsumer ret = existingConsumer;
 
-                this.consumer.subscribe(Arrays.asList(topic));
-            } catch (Exception ee) {
-                LOG.error("Exception in getKafkaConsumer ", ee);
+        try {
+            if (ret == null || !isKafkaConsumerOpen(ret)) {
+                String[] topics = CONSUMER_TOPICS_MAP.get(notificationType);
+                String   topic  = topics[idxConsumer % topics.length];
+
+                LOG.debug("Creating new KafkaConsumer for topic : {}, index : {}", topic, idxConsumer);
+
+                ret = new KafkaConsumer(consumerProperties);
+
+                ret.subscribe(Arrays.asList(topic));
             }
+        } catch (Exception ee) {
+            LOG.error("Exception in getKafkaConsumer ", ee);
         }
 
-        return this.consumer;
+        return ret;
     }
 
+    private KafkaProducer getOrCreateProducer(NotificationType notificationType) {
+        LOG.debug("==> KafkaNotification.getOrCreateProducer()");
 
-    @VisibleForTesting
-    public
-        // Get properties for consumer request
-    Properties getConsumerProperties(NotificationType type) {
-        // find the configured group id for the given notification type
-        String groupId = properties.getProperty(type.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY);
+        KafkaProducer ret = producers.get(notificationType);
 
-        if (StringUtils.isEmpty(groupId)) {
-            throw new IllegalStateException("No configuration group id set for the notification type " + type);
-        }
+        if (ret == null) {
+            synchronized (this) {
+                ret = producers.get(notificationType);
 
-        Properties consumerProperties = new Properties();
+                if (ret == null) {
+                    ret = new KafkaProducer(properties);
 
-        consumerProperties.putAll(properties);
-        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+                    producers.put(notificationType, ret);
+                }
+            }
+        }
 
-        return consumerProperties;
+        LOG.debug("<== KafkaNotification.getOrCreateProducer()");
+
+        return ret;
     }
 
-    private synchronized void createProducer() {
-        LOG.info("==> KafkaNotification.createProducer()");
+    public static String[] trimAndPurge(String[] strings)  {
+        List<String> ret = new ArrayList<>();
 
-        if (producer == null) {
-            producer = new KafkaProducer(properties);
+        if (strings != null) {
+            for (int i = 0; i < strings.length; i++) {
+                String str = StringUtils.trim(strings[i]);
+
+                if (StringUtils.isNotEmpty(str)) {
+                    ret.add(str);
+                }
+            }
         }
 
-        LOG.info("<== KafkaNotification.createProducer()");
+        return ret.toArray(new String[ret.size()]);
     }
 
     private class MessageContext {
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
index 847caa3..1af1f3e 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
@@ -54,7 +54,8 @@ import static org.testng.Assert.*;
 public class KafkaConsumerTest {
     private static final String TRAIT_NAME = "MyTrait";
 
-    private final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
+    private static final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
+    private static final String[] ATLAS_HOOK_CONSUMER_TOPICS = KafkaNotification.trimAndPurge(AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC));
 
 
     @Mock
@@ -67,11 +68,25 @@ public class KafkaConsumerTest {
 
     @Test
     public void testReceive() throws Exception {
-        Referenceable                        entity  = getEntity(TRAIT_NAME);
-        EntityUpdateRequest                  message = new EntityUpdateRequest("user1", entity);
+        for (String topic : ATLAS_HOOK_CONSUMER_TOPICS) {
+            String traitName = TRAIT_NAME + "_" + topic;
+            Referenceable entity = getEntity(traitName);
+            EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
+            List<AtlasKafkaMessage<HookNotification>> messageList = testReceiveHelper(message, topic);
+            assertTrue(messageList.size() > 0);
+
+            HookNotification consumedMessage = messageList.get(0).getMessage();
+
+            assertMessagesEqual(message, consumedMessage, entity);
+        }
+    }
+
+
+    private List<AtlasKafkaMessage<HookNotification>> testReceiveHelper(EntityUpdateRequest message, String topic) throws Exception {
+
         String                               json    = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), message));
-        TopicPartition                       tp      = new TopicPartition(ATLAS_HOOK_TOPIC, 0);
-        List<ConsumerRecord<String, String>> klist   = Collections.singletonList(new ConsumerRecord<>(ATLAS_HOOK_TOPIC, 0, 0L, "mykey", json));
+        TopicPartition                       tp      = new TopicPartition(topic, 0);
+        List<ConsumerRecord<String, String>> klist   = Collections.singletonList(new ConsumerRecord<>(topic, 0, 0L, "mykey", json));
         Map                                  mp      = Collections.singletonMap(tp, klist);
         ConsumerRecords                      records = new ConsumerRecords(mp);
 
@@ -81,12 +96,7 @@ public class KafkaConsumerTest {
 
         AtlasKafkaConsumer                        consumer    = new AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, false, 100L);
         List<AtlasKafkaMessage<HookNotification>> messageList = consumer.receive();
-
-        assertTrue(messageList.size() > 0);
-
-        HookNotification consumedMessage  = messageList.get(0).getMessage();
-
-        assertMessagesEqual(message, consumedMessage, entity);
+        return messageList;
     }
 
     @Test
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
index 263903b..9b5891f 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
@@ -74,7 +74,7 @@ public class KafkaNotificationMockTest {
         KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
 
         Producer producer = mock(Producer.class);
-        String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
+        String topicName = kafkaNotification.getProducerTopicName(NotificationInterface.NotificationType.HOOK);
         String message = "This is a test message";
         Future returnValue = mock(Future.class);
         TopicPartition topicPartition = new TopicPartition(topicName, 0);
@@ -96,7 +96,7 @@ public class KafkaNotificationMockTest {
         KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
 
         Producer producer = mock(Producer.class);
-        String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
+        String topicName = kafkaNotification.getProducerTopicName(NotificationInterface.NotificationType.HOOK);
         String message = "This is a test message";
         Future returnValue = mock(Future.class);
         when(returnValue.get()).thenThrow(new RuntimeException("Simulating exception"));
@@ -121,7 +121,7 @@ public class KafkaNotificationMockTest {
         KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
 
         Producer producer = mock(Producer.class);
-        String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
+        String topicName = kafkaNotification.getProducerTopicName(NotificationInterface.NotificationType.HOOK);
         String message1 = "This is a test message1";
         String message2 = "This is a test message2";
         Future returnValue1 = mock(Future.class);
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index c963830..05d0d81 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -183,6 +183,8 @@ public class AbstractNotificationConsumerTest {
     }
 
     private static class TestNotificationConsumer extends AbstractNotificationConsumer<TestMessage> {
+        private static final String TEST_TOPIC_NAME = "TEST_TOPIC";
+
         private final List<TestMessage> messageList;
         private       int              index = 0;
 
@@ -217,7 +219,7 @@ public class AbstractNotificationConsumerTest {
         public List<AtlasKafkaMessage<TestMessage>> receive(long timeoutMilliSeconds) {
             List<AtlasKafkaMessage<TestMessage>> tempMessageList = new ArrayList();
             for(Object json :  messageList) {
-                tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String) json), -1, -1));
+                tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String) json), -1, TEST_TOPIC_NAME, -1));
             }
             return tempMessageList;
         }
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsCounter.java b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsCounter.java
index d5a4412..10319d0 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsCounter.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsCounter.java
@@ -24,6 +24,7 @@ import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.ZoneOffset;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.atlas.util.AtlasMetricsCounter.Period.*;
 
@@ -87,10 +88,10 @@ public class AtlasMetricsCounter {
         }
     }
 
-    public Stats report() {
+    public StatsReport report() {
         updateForTime(clock.instant());
 
-        return new Stats(stats, dayStartTime.toEpochMilli(), hourStartTime.toEpochMilli());
+        return new StatsReport(stats, dayStartTime.toEpochMilli(), hourStartTime.toEpochMilli());
     }
 
     // visible only for testing
@@ -179,16 +180,15 @@ public class AtlasMetricsCounter {
         return LocalDateTime.of(time.toLocalDate().plusDays(1), LocalTime.MIN).toInstant(ZoneOffset.UTC);
     }
 
-
     public static class Stats {
         private static final int NUM_PERIOD = Period.values().length;
 
-        private final long   dayStartTimeMs;
-        private final long   hourStartTimeMs;
-        private final long[] count           = new long[NUM_PERIOD];
-        private final long[] measureSum      = new long[NUM_PERIOD];
-        private final long[] measureMin      = new long[NUM_PERIOD];
-        private final long[] measureMax      = new long[NUM_PERIOD];
+        private final long         dayStartTimeMs;
+        private final long         hourStartTimeMs;
+        private final AtomicLong[] count           = new AtomicLong[NUM_PERIOD];
+        private final AtomicLong[] measureSum      = new AtomicLong[NUM_PERIOD];
+        private final AtomicLong[] measureMin      = new AtomicLong[NUM_PERIOD];
+        private final AtomicLong[] measureMax      = new AtomicLong[NUM_PERIOD];
 
 
         public Stats() {
@@ -200,7 +200,57 @@ public class AtlasMetricsCounter {
             }
         }
 
-        public Stats(Stats other, long dayStartTimeMs, long hourStartTimeMs) {
+        public void addCount(Period period, long num) {
+            count[period.ordinal()].addAndGet(num);
+        }
+
+        public void addMeasure(Period period, long measure) {
+            int idx = period.ordinal();
+
+            measureSum[idx].addAndGet(measure);
+
+            if (measureMin[idx].get() > measure) {
+                measureMin[idx].set(measure);
+            }
+
+            if (measureMax[idx].get() < measure) {
+                measureMax[idx].set(measure);
+            }
+        }
+
+        private void copy(Period src, Period dest) {
+            int srcIdx  = src.ordinal();
+            int destIdx = dest.ordinal();
+
+            count[destIdx].set(count[srcIdx].get());
+            measureSum[destIdx].set(measureSum[srcIdx].get());
+            measureMin[destIdx].set(measureMin[srcIdx].get());
+            measureMax[destIdx].set( measureMax[srcIdx].get());
+        }
+
+        private void reset(Period period) {
+            int idx = period.ordinal();
+
+            count[idx]      = new AtomicLong(0);
+            measureSum[idx] = new AtomicLong(0);
+            measureMin[idx] = new AtomicLong(Long.MAX_VALUE);
+            measureMax[idx] = new AtomicLong(Long.MIN_VALUE);
+        }
+
+    }
+
+    public static class StatsReport {
+        private static final int NUM_PERIOD = Period.values().length;
+
+        private final long   dayStartTimeMs;
+        private final long   hourStartTimeMs;
+        private final long[] count           = new long[NUM_PERIOD];
+        private final long[] measureSum      = new long[NUM_PERIOD];
+        private final long[] measureMin      = new long[NUM_PERIOD];
+        private final long[] measureMax      = new long[NUM_PERIOD];
+
+
+        public StatsReport(Stats other, long dayStartTimeMs, long hourStartTimeMs) {
             this.dayStartTimeMs  = dayStartTimeMs;
             this.hourStartTimeMs = hourStartTimeMs;
 
@@ -229,46 +279,9 @@ public class AtlasMetricsCounter {
             return c != 0 ? (measureSum[idx] / c) : 0;
         }
 
-        public void addCount(Period period, long num) {
-            count[period.ordinal()] += num;
-        }
-
-        public void addMeasure(Period period, long measure) {
-            int idx = period.ordinal();
-
-            measureSum[idx] += measure;
-
-            if (measureMin[idx] > measure) {
-                measureMin[idx] = measure;
-            }
-
-            if (measureMax[idx] < measure) {
-                measureMax[idx] = measure;
-            }
-        }
-
-        private void copy(Period src, Period dest) {
-            int srcIdx  = src.ordinal();
-            int destIdx = dest.ordinal();
-
-            count[destIdx]      = count[srcIdx];
-            measureSum[destIdx] = measureSum[srcIdx];
-            measureMin[destIdx] = measureMin[srcIdx];
-            measureMax[destIdx] = measureMax[srcIdx];
-        }
-
-        private void reset(Period period) {
-            int idx = period.ordinal();
-
-            count[idx]      = 0;
-            measureSum[idx] = 0;
-            measureMin[idx] = Long.MAX_VALUE;
-            measureMax[idx] = Long.MIN_VALUE;
-        }
-
-        private void copy(long[] src, long[] dest) {
+        private void copy(AtomicLong[] src, long[] dest) {
             for (int i = 0; i < dest.length; i++) {
-                dest[i] = src[i];
+                dest[i] = src[i].get();
             }
         }
     }
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
index f658caa..2c78cbc 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
@@ -21,7 +21,7 @@ import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
-import org.apache.atlas.util.AtlasMetricsCounter.Stats;
+import org.apache.atlas.util.AtlasMetricsCounter.StatsReport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,16 +50,15 @@ public class AtlasMetricsUtil {
     private static final String STATUS_CONNECTED     = "connected";
     private static final String STATUS_NOT_CONNECTED = "not-connected";
 
-    private final AtlasGraph          graph;
-    private       long                serverStartTime   = 0;
-    private       long                serverActiveTime  = 0;
-    private       long                msgOffsetStart    = -1;
-    private       long                msgOffsetCurrent  = 0;
-    private final AtlasMetricsCounter messagesProcessed = new AtlasMetricsCounter("messagesProcessed");
-    private final AtlasMetricsCounter messagesFailed    = new AtlasMetricsCounter("messagesFailed");
-    private final AtlasMetricsCounter entityCreates     = new AtlasMetricsCounter("entityCreates");
-    private final AtlasMetricsCounter entityUpdates     = new AtlasMetricsCounter("entityUpdates");
-    private final AtlasMetricsCounter entityDeletes     = new AtlasMetricsCounter("entityDeletes");
+    private final AtlasGraph              graph;
+    private       long                    serverStartTime   = 0;
+    private       long                    serverActiveTime  = 0;
+    private final Map<String, TopicStats> topicStats        = new HashMap<>();
+    private final AtlasMetricsCounter     messagesProcessed = new AtlasMetricsCounter("messagesProcessed");
+    private final AtlasMetricsCounter     messagesFailed    = new AtlasMetricsCounter("messagesFailed");
+    private final AtlasMetricsCounter     entityCreates     = new AtlasMetricsCounter("entityCreates");
+    private final AtlasMetricsCounter     entityUpdates     = new AtlasMetricsCounter("entityUpdates");
+    private final AtlasMetricsCounter     entityDeletes     = new AtlasMetricsCounter("entityDeletes");
 
     @Inject
     public AtlasMetricsUtil(AtlasGraph graph) {
@@ -83,7 +82,7 @@ public class AtlasMetricsUtil {
         serverActiveTime = System.currentTimeMillis();
     }
 
-    public void onNotificationProcessingComplete(long msgOffset, NotificationStat stats) {
+    public void onNotificationProcessingComplete(String topicName, int partition, long msgOffset, NotificationStat stats) {
         messagesProcessed.incrWithMeasure(stats.timeTakenMs);
         entityCreates.incrBy(stats.entityCreates);
         entityUpdates.incrBy(stats.entityUpdates);
@@ -93,21 +92,33 @@ public class AtlasMetricsUtil {
             messagesFailed.incr();
         }
 
-        if (msgOffsetStart == -1) {
-            msgOffsetStart = msgOffset;
+        TopicStats topicStat = topicStats.get(topicName);
+
+        if (topicStat == null) {
+            topicStat = new TopicStats(topicName);
+
+            topicStats.put(topicName, topicStat);
         }
 
-        msgOffsetCurrent = ++msgOffset;
+        TopicPartitionStat partitionStat = topicStat.get(partition);
+
+        if (partitionStat == null) {
+            partitionStat = new TopicPartitionStat(topicName, partition, msgOffset, msgOffset);
+
+            topicStat.set(partition, partitionStat);
+        }
+
+        partitionStat.setCurrentOffset(msgOffset + 1);
     }
 
     public Map<String, Object> getStats() {
         Map<String, Object> ret = new HashMap<>();
 
-        Stats messagesProcessed = this.messagesProcessed.report();
-        Stats messagesFailed    = this.messagesFailed.report();
-        Stats entityCreates     = this.entityCreates.report();
-        Stats entityUpdates     = this.entityUpdates.report();
-        Stats entityDeletes     = this.entityDeletes.report();
+        StatsReport messagesProcessed = this.messagesProcessed.report();
+        StatsReport messagesFailed    = this.messagesFailed.report();
+        StatsReport entityCreates     = this.entityCreates.report();
+        StatsReport entityUpdates     = this.entityUpdates.report();
+        StatsReport entityDeletes     = this.entityDeletes.report();
 
         ret.put(STAT_SERVER_START_TIMESTAMP, serverStartTime);
         ret.put(STAT_SERVER_ACTIVE_TIMESTAMP, serverActiveTime);
@@ -115,8 +126,20 @@ public class AtlasMetricsUtil {
         ret.put(STAT_SERVER_STATUS_BACKEND_STORE, getBackendStoreStatus() ? STATUS_CONNECTED : STATUS_NOT_CONNECTED);
         ret.put(STAT_SERVER_STATUS_INDEX_STORE, getIndexStoreStatus() ? STATUS_CONNECTED : STATUS_NOT_CONNECTED);
 
-        ret.put(STAT_NOTIFY_START_OFFSET, msgOffsetStart);
-        ret.put(STAT_NOTIFY_CURRENT_OFFSET, msgOffsetCurrent);
+        Map<String, Map<String, Long>> topicOffsets = new HashMap<>();
+
+        for (TopicStats tStat : topicStats.values()) {
+            for (TopicPartitionStat tpStat : tStat.partitionStats.values()) {
+                Map<String, Long> tpOffsets = new HashMap<>();
+
+                tpOffsets.put("offsetStart", tpStat.startOffset);
+                tpOffsets.put("offsetCurrent", tpStat.currentOffset);
+
+                topicOffsets.put(tpStat.topicName + "-" + tpStat.partition, tpOffsets);
+            }
+        }
+
+        ret.put(STAT_NOTIFY_TOPIC_OFFSETS, topicOffsets);
         ret.put(STAT_NOTIFY_LAST_MESSAGE_PROCESSED_TIME, this.messagesProcessed.getLastIncrTime().toEpochMilli());
 
         ret.put(STAT_NOTIFY_COUNT_TOTAL,         messagesProcessed.getCount(ALL));
@@ -297,4 +320,58 @@ public class AtlasMetricsUtil {
             return collection != null ? collection.size() : 0;
         }
     }
+
+    /** Holds, for one topic, the TopicPartitionStat stored under each partition number; static: uses no outer state. */
+    static class TopicStats {
+        private final String                           topicName;
+        private final Map<Integer, TopicPartitionStat> partitionStats = new HashMap<>();
+
+        public TopicStats(String topicName) {
+            this.topicName = topicName;
+        }
+
+        public String getTopicName() { return topicName; }
+
+        public Map<Integer, TopicPartitionStat> getPartitionStats() { return partitionStats; }
+
+        /** Returns the stat stored for {@code partition}, or {@code null} if none has been set. */
+        public TopicPartitionStat get(Integer partition) { return partitionStats.get(partition); }
+
+        public void set(Integer partition, TopicPartitionStat partitionStat) { partitionStats.put(partition, partitionStat); }
+    }
+
+    /** Start and current offsets tracked for a single partition of a topic; static because it uses no outer state. */
+    static class TopicPartitionStat {
+        private final String topicName;
+        private final int    partition;
+        private final long   startOffset;
+        private       long   currentOffset; // the only mutable field; replaced through setCurrentOffset()
+
+        public TopicPartitionStat(String topicName, int partition, long startOffset, long currentOffset) {
+            this.topicName     = topicName;
+            this.partition     = partition;
+            this.startOffset   = startOffset;
+            this.currentOffset = currentOffset;
+        }
+
+        public String getTopicName() {
+            return topicName;
+        }
+
+        public int getPartition() {
+            return partition;
+        }
+
+        public long getStartOffset() {
+            return startOffset;
+        }
+
+        public long getCurrentOffset() {
+            return currentOffset;
+        }
+
+        public void setCurrentOffset(long currentOffset) {
+            this.currentOffset = currentOffset;
+        }
+    }
 }
diff --git a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
index b56019e..b2f2633 100644
--- a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
@@ -227,10 +227,10 @@ public class MetricsServiceTest {
     private void processMessage(Instant instant) {
         clock.setInstant(instant);
 
-        metricsUtil.onNotificationProcessingComplete(++msgOffset, new AtlasMetricsUtil.NotificationStat(true, 1));
+        metricsUtil.onNotificationProcessingComplete("ATLAS_HOOK", 0, ++msgOffset, new AtlasMetricsUtil.NotificationStat(true, 1));
 
         for (int i = 0; i < 10; i++) {
-            metricsUtil.onNotificationProcessingComplete(msgOffset++, new AtlasMetricsUtil.NotificationStat(false, 1));
+            metricsUtil.onNotificationProcessingComplete("ATLAS_HOOK", 0, msgOffset++, new AtlasMetricsUtil.NotificationStat(false, 1));
         }
 
         clock.setInstant(null);
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 1f8e810..1c8d72b 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -23,7 +23,6 @@ import kafka.utils.ShutdownableThread;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasClientV2;
-import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.RequestContext;
@@ -118,7 +117,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
 
     private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
-    private static final String ATLAS_HOOK_TOPIC  = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
 
     public static final String CONSUMER_THREADS_PROPERTY         = "atlas.notification.hook.numthreads";
     public static final String CONSUMER_RETRIES_PROPERTY         = "atlas.notification.hook.maxretries";
@@ -701,7 +699,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
                 stats.timeTakenMs = System.currentTimeMillis() - startTime;
 
-                metricsUtil.onNotificationProcessingComplete(kafkaMsg.getOffset(), stats);
+                metricsUtil.onNotificationProcessingComplete(kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset(), stats);
 
                 if (stats.timeTakenMs > largeMessageProcessingTimeThresholdMs) {
                     String strMessage = AbstractNotification.getMessageJson(message);
@@ -785,9 +783,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             try {
                 recordFailedMessages();
 
-                TopicPartition partition = new TopicPartition(ATLAS_HOOK_TOPIC, kafkaMessage.getPartition());
+                consumer.commit(kafkaMessage.getTopicPartition(), kafkaMessage.getOffset() + 1);
 
-                consumer.commit(partition, kafkaMessage.getOffset() + 1);
                 commitSucceessStatus = true;
             } finally {
                 failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset());
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index fb3ff26..b1b0e9f 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -182,7 +182,10 @@ public class NotificationHookConsumerKafkaTest {
 
     ExceptionThrowingCommitConsumer createNewConsumerThatThrowsExceptionInCommit(KafkaNotification kafkaNotification, boolean autoCommitEnabled) {
         Properties prop = kafkaNotification.getConsumerProperties(NotificationInterface.NotificationType.HOOK);
-        KafkaConsumer consumer = kafkaNotification.getKafkaConsumer(prop, NotificationInterface.NotificationType.HOOK, true);
+
+        prop.put("enable.auto.commit", autoCommitEnabled);
+
+        KafkaConsumer consumer = kafkaNotification.getOrCreateKafkaConsumer(null, prop, NotificationInterface.NotificationType.HOOK, 0);
         return new ExceptionThrowingCommitConsumer(NotificationInterface.NotificationType.HOOK, consumer, autoCommitEnabled, 1000);
     }
 
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index 3e35511..ece46a4 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -22,6 +22,7 @@ import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.kafka.AtlasKafkaMessage;
+import org.apache.atlas.kafka.KafkaNotification;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.notification.HookNotification.HookNotificationType;
@@ -136,7 +137,7 @@ public class NotificationHookConsumerTest {
         when(message.getType()).thenReturn(HookNotificationType.ENTITY_CREATE);
         when(message.getEntities()).thenReturn(Arrays.asList(mock));
 
-        hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
+        hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1));
 
         verify(consumer).commit(any(TopicPartition.class), anyInt());
     }
@@ -150,7 +151,7 @@ public class NotificationHookConsumerTest {
 
         when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenThrow(new RuntimeException("Simulating exception in processing message"));
 
-        hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
+        hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1));
 
         verifyZeroInteractions(consumer);
     }