Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/06 21:31:56 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-945] Refactor Kafka extractor statistics tracking to allow co…

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c7ff20  [GOBBLIN-945] Refactor Kafka extractor statistics tracking to allow co…
8c7ff20 is described below

commit 8c7ff20666f6f059bc1bd41e2b8a32e13d298f34
Author: sv2000 <su...@gmail.com>
AuthorDate: Wed Nov 6 13:31:49 2019 -0800

    [GOBBLIN-945] Refactor Kafka extractor statistics tracking to allow co…
    
    Closes #2795 from sv2000/statsTracker
---
 .../kafka/client/Kafka08ConsumerClient.java        |  18 +-
 .../kafka/client/GobblinKafkaConsumerClient.java   |  20 ++
 .../extract/kafka/KafkaAvroExtractor.java          |   6 +-
 .../extractor/extract/kafka/KafkaExtractor.java    | 265 +++---------------
 .../extract/kafka/KafkaExtractorStatsTracker.java  | 307 +++++++++++++++++++++
 .../extractor/extract/kafka/KafkaPartition.java    |  13 +-
 .../extractor/extract/kafka/KafkaSource.java       |  18 +-
 .../kafka/KafkaExtractorStatsTrackerTest.java      | 137 +++++++++
 8 files changed, 533 insertions(+), 251 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/client/Kafka08ConsumerClient.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/client/Kafka08ConsumerClient.java
index df08ed9..2ed5b70 100644
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/client/Kafka08ConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/client/Kafka08ConsumerClient.java
@@ -24,7 +24,14 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
-import java.util.regex.Pattern;
+
+import com.google.common.base.Function;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.net.HostAndPort;
+import com.typesafe.config.Config;
 
 import kafka.api.PartitionFetchInfo;
 import kafka.api.PartitionOffsetRequestInfo;
@@ -41,19 +48,10 @@ import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.message.MessageAndOffset;
 import lombok.extern.slf4j.Slf4j;
 
-import com.google.common.base.Function;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.net.HostAndPort;
-import com.typesafe.config.Config;
-
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.DatasetFilterUtils;
 
 /**
  * A {@link GobblinKafkaConsumerClient} that uses kafka 08 scala consumer client. All the code has been moved from the
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
index f94530b..4026f33 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
@@ -17,6 +17,7 @@
 package org.apache.gobblin.kafka.client;
 
 import java.io.Closeable;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -73,6 +74,25 @@ public interface GobblinKafkaConsumerClient extends Closeable {
   public long getLatestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException;
 
   /**
+   * Get the latest available offsets for a {@link Collection} of {@link KafkaPartition}s. NOTE: The default implementation
+   * is not efficient, i.e. it makes one getLatestOffset() call per {@link KafkaPartition}. Individual implementations
+   * of {@link GobblinKafkaConsumerClient} should override this method to use bulk APIs of the underlying KafkaConsumer
+   * to retrieve the latest offsets for a collection of partitions in a single call.
+   *
+   * @param partitions the partitions for which the latest offsets are retrieved
+   * @return a map from each {@link KafkaPartition} to its latest available offset
+   * @throws KafkaOffsetRetrievalFailureException if the underlying kafka-client does not support retrieving the latest offsets
+   */
+  public default Map<KafkaPartition, Long> getLatestOffsets(Collection<KafkaPartition> partitions)
+      throws KafkaOffsetRetrievalFailureException {
+    Map<KafkaPartition, Long> offsetMap = Maps.newHashMap();
+    for (KafkaPartition partition: partitions) {
+      offsetMap.put(partition, getLatestOffset(partition));
+    }
+    return offsetMap;
+  }
+
+  /**
    * API to consume records from kafka starting from <code>nextOffset</code> till <code>maxOffset</code>.
    * If <code>nextOffset</code> is greater than <code>maxOffset</code>, returns a null.
    * <code>nextOffset</code>
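
The new getLatestOffsets() default above simply calls getLatestOffset() once per partition. As a rough sketch of the kind of override the javadoc asks for, a client backed by the modern Apache Kafka consumer could batch the lookup through Consumer#endOffsets(); the surrounding class, the this.consumer field, and the imports (org.apache.kafka.common.TopicPartition, java.util.stream.Collectors, com.google.common.collect.Maps) are assumptions for illustration, not part of this commit:

  // Hypothetical override in a KafkaConsumer-backed GobblinKafkaConsumerClient (illustration only).
  @Override
  public Map<KafkaPartition, Long> getLatestOffsets(Collection<KafkaPartition> partitions)
      throws KafkaOffsetRetrievalFailureException {
    // Translate Gobblin partitions to Kafka TopicPartitions so all end offsets are fetched in one round trip.
    Map<TopicPartition, KafkaPartition> byTopicPartition = partitions.stream()
        .collect(Collectors.toMap(p -> new TopicPartition(p.getTopicName(), p.getId()), p -> p));
    Map<KafkaPartition, Long> latestOffsets = Maps.newHashMapWithExpectedSize(byTopicPartition.size());
    // endOffsets() returns, for each partition, the offset of the next record that would be appended.
    this.consumer.endOffsets(byTopicPartition.keySet())
        .forEach((tp, offset) -> latestOffsets.put(byTopicPartition.get(tp), offset));
    return latestOffsets;
  }
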
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaAvroExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaAvroExtractor.java
index 4547cfe..bbe69b5 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaAvroExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaAvroExtractor.java
@@ -19,8 +19,6 @@ package org.apache.gobblin.source.extractor.extract.kafka;
 
 import java.io.IOException;
 
-import lombok.extern.slf4j.Slf4j;
-
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericData.Record;
@@ -31,6 +29,8 @@ import org.apache.avro.io.Decoder;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.kafka.client.ByteArrayBasedKafkaRecord;
 import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
@@ -116,7 +116,7 @@ public abstract class KafkaAvroExtractor<K> extends KafkaExtractor<Schema, Gener
       record = convertRecord(record);
       return record;
     } catch (IOException e) {
-      log.error(String.format("Error during decoding record for partition %s: ", this.getCurrentPartition()));
+      log.error(String.format("Error during decoding record for partition %s: ", getCurrentPartition()));
       throw e;
     }
   }
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index 3ca24bb..5ea38b7 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -20,19 +20,15 @@ package org.apache.gobblin.source.extractor.extract.kafka;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.gobblin.runtime.JobShutdownException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+
+import lombok.Getter;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
@@ -43,7 +39,7 @@ import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
 import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory;
 import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
 import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.JobShutdownException;
 import org.apache.gobblin.source.extractor.DataRecordException;
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.source.extractor.extract.EventBasedExtractor;
@@ -61,25 +57,19 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
 
   private static final Logger LOG = LoggerFactory.getLogger(KafkaExtractor.class);
 
+
+  private final ClassAliasResolver<GobblinKafkaConsumerClientFactory> kafkaConsumerClientResolver;
+  private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+
+  private Iterator<KafkaConsumerRecord> messageIterator = null;
+  @Getter
+  private int currentPartitionIdx = INITIAL_PARTITION_IDX;
+  @Getter
+  private long readStartTime;
+
   protected static final int INITIAL_PARTITION_IDX = -1;
-  protected static final Long MAX_LOG_DECODING_ERRORS = 5L;
 
-  // Constants for event submission
-  public static final String TOPIC = "topic";
-  public static final String PARTITION = "partition";
-  public static final String LOW_WATERMARK = "lowWatermark";
-  public static final String ACTUAL_HIGH_WATERMARK = "actualHighWatermark";
-  public static final String EXPECTED_HIGH_WATERMARK = "expectedHighWatermark";
-  public static final String ELAPSED_TIME = "elapsedTime";
-  public static final String PROCESSED_RECORD_COUNT = "processedRecordCount";
-  public static final String UNDECODABLE_MESSAGE_COUNT = "undecodableMessageCount";
-  public static final String PARTITION_TOTAL_SIZE = "partitionTotalSize";
-  public static final String AVG_RECORD_PULL_TIME = "avgRecordPullTime";
-  public static final String READ_RECORD_TIME = "readRecordTime";
-  public static final String DECODE_RECORD_TIME = "decodeRecordTime";
-  public static final String FETCH_MESSAGE_BUFFER_TIME = "fetchMessageBufferTime";
-  public static final String GOBBLIN_KAFKA_NAMESPACE = "gobblin.kafka";
-  public static final String KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME = "KafkaExtractorTopicMetadata";
+  protected static final Long MAX_LOG_DECODING_ERRORS = 5L;
 
   protected final WorkUnitState workUnitState;
   protected final String topicName;
@@ -87,35 +77,11 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   protected final MultiLongWatermark lowWatermark;
   protected final MultiLongWatermark highWatermark;
   protected final MultiLongWatermark nextWatermark;
+  protected final KafkaExtractorStatsTracker statsTracker;
   protected final GobblinKafkaConsumerClient kafkaConsumerClient;
-  private final ClassAliasResolver<GobblinKafkaConsumerClientFactory> kafkaConsumerClientResolver;
-
-  protected final Map<KafkaPartition, Long> decodingErrorCount;
-  private final Map<KafkaPartition, Double> avgMillisPerRecord;
-  private final Map<KafkaPartition, Long> avgRecordSizes;
-  private final Map<KafkaPartition, Long> elapsedTime;
-  private final Map<KafkaPartition, Long> processedRecordCount;
-  private final Map<KafkaPartition, Long> partitionTotalSize;
-  private final Map<KafkaPartition, Long> decodeRecordTime;
-  private final Map<KafkaPartition, Long> fetchMessageBufferTime;
-  private final Map<KafkaPartition, Long> readRecordTime;
-  private final Map<KafkaPartition, Long> startFetchEpochTime;
-  private final Map<KafkaPartition, Long> stopFetchEpochTime;
-
-  private final Set<Integer> errorPartitions;
-  private int undecodableMessageCount = 0;
 
-  private Iterator<KafkaConsumerRecord> messageIterator = null;
-  private int currentPartitionIdx = INITIAL_PARTITION_IDX;
-  private long currentPartitionRecordCount = 0;
-  private long currentPartitionTotalSize = 0;
-  private long currentPartitionDecodeRecordTime = 0;
-  private long currentPartitionFetchMessageBufferTime = 0;
-  private long currentPartitionReadRecordTime = 0;
   protected D currentPartitionLastSuccessfulRecord = null;
 
-  private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
-
   public KafkaExtractor(WorkUnitState state) {
     super(state);
     this.workUnitState = state;
@@ -135,20 +101,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
     } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
       throw new RuntimeException(e);
     }
-
-    this.decodingErrorCount = Maps.newHashMap();
-    this.avgMillisPerRecord = Maps.newHashMapWithExpectedSize(this.partitions.size());
-    this.avgRecordSizes = Maps.newHashMapWithExpectedSize(this.partitions.size());
-    this.elapsedTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
-    this.processedRecordCount = Maps.newHashMapWithExpectedSize(this.partitions.size());
-    this.partitionTotalSize = Maps.newHashMapWithExpectedSize(this.partitions.size());
-    this.decodeRecordTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
-    this.fetchMessageBufferTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
-    this.readRecordTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
-    this.startFetchEpochTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
-    this.stopFetchEpochTime= Maps.newHashMapWithExpectedSize(this.partitions.size());
-
-    this.errorPartitions = Sets.newHashSet();
+    this.statsTracker = new KafkaExtractorStatsTracker(state, partitions);
 
     // The actual high watermark starts with the low watermark
     this.workUnitState.setActualHighWatermark(this.lowWatermark);
@@ -161,6 +114,13 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
     return tags;
   }
 
+  protected KafkaPartition getCurrentPartition() {
+    Preconditions.checkElementIndex(this.currentPartitionIdx, this.partitions.size(),
+        "KafkaExtractor has finished extracting all partitions. There's no current partition.");
+    return this.partitions.get(this.currentPartitionIdx);
+  }
+
+
   /**
    * Return the next decodable record from the current partition. If the current partition has no more
    * decodable record, move on to the next partition. If all partitions have been processed, return null.
@@ -172,7 +132,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
       return null;
     }
 
-    long readStartTime = System.nanoTime();
+    this.readStartTime = System.nanoTime();
 
     while (!allPartitionsFinished()) {
       if (currentPartitionFinished()) {
@@ -183,7 +143,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
         try {
           long fetchStartTime = System.nanoTime();
           this.messageIterator = fetchNextMessageBuffer();
-          this.currentPartitionFetchMessageBufferTime += System.nanoTime() - fetchStartTime;
+          this.statsTracker.onFetchNextMessageBuffer(this.currentPartitionIdx, fetchStartTime);
         } catch (Exception e) {
           LOG.error(String.format("Failed to fetch next message buffer for partition %s. Will skip this partition.",
               getCurrentPartition()), e);
@@ -216,25 +176,18 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
 
           D record = decodeKafkaMessage(nextValidMessage);
 
-          this.currentPartitionDecodeRecordTime += System.nanoTime() - decodeStartTime;
-          this.currentPartitionRecordCount++;
-          this.currentPartitionTotalSize += nextValidMessage.getValueSizeInBytes();
-          this.currentPartitionReadRecordTime += System.nanoTime() - readStartTime;
+          this.statsTracker.onDecodeableRecord(this.currentPartitionIdx, readStartTime, decodeStartTime, nextValidMessage.getValueSizeInBytes());
           this.currentPartitionLastSuccessfulRecord = record;
           return record;
         } catch (Throwable t) {
-          this.errorPartitions.add(this.currentPartitionIdx);
-          this.undecodableMessageCount++;
+          statsTracker.onUndecodeableRecord(this.currentPartitionIdx);
           if (shouldLogError()) {
             LOG.error(String.format("A record from partition %s cannot be decoded.", getCurrentPartition()), t);
           }
-          incrementErrorCount();
         }
       }
     }
     LOG.info("Finished pulling topic " + this.topicName);
-
-    this.currentPartitionReadRecordTime += System.nanoTime() - readStartTime;
     return null;
   }
 
@@ -275,7 +228,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
     if (this.currentPartitionIdx == INITIAL_PARTITION_IDX) {
       return true;
     } else if (this.nextWatermark.get(this.currentPartitionIdx) >= this.highWatermark.get(this.currentPartitionIdx)) {
-      LOG.info("Finished pulling partition " + this.getCurrentPartition());
+      LOG.info("Finished pulling partition " + getCurrentPartition());
       return true;
     } else {
       return false;
@@ -291,60 +244,33 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
       LOG.info("Pulling topic " + this.topicName);
       this.currentPartitionIdx = 0;
     } else {
-      updateStatisticsForCurrentPartition();
+      this.statsTracker.updateStatisticsForCurrentPartition(currentPartitionIdx, readStartTime, getLastSuccessfulRecordHeaderTimestamp());
       this.currentPartitionIdx++;
-      this.currentPartitionRecordCount = 0;
-      this.currentPartitionTotalSize = 0;
-      this.currentPartitionDecodeRecordTime = 0;
-      this.currentPartitionFetchMessageBufferTime = 0;
-      this.currentPartitionReadRecordTime = 0;
       this.currentPartitionLastSuccessfulRecord = null;
     }
 
     this.messageIterator = null;
     if (this.currentPartitionIdx < this.partitions.size()) {
-      LOG.info(String.format("Pulling partition %s from offset %d to %d, range=%d", this.getCurrentPartition(),
+      LOG.info(String.format("Pulling partition %s from offset %d to %d, range=%d", getCurrentPartition(),
           this.nextWatermark.get(this.currentPartitionIdx), this.highWatermark.get(this.currentPartitionIdx),
           this.highWatermark.get(this.currentPartitionIdx) - this.nextWatermark.get(this.currentPartitionIdx)));
       switchMetricContextToCurrentPartition();
     }
 
     if (!allPartitionsFinished()) {
-      this.startFetchEpochTime.put(this.getCurrentPartition(), System.currentTimeMillis());
+      this.statsTracker.resetStartFetchEpochTime(currentPartitionIdx);
     }
   }
 
-  protected void updateStatisticsForCurrentPartition() {
-    long stopFetchEpochTime = System.currentTimeMillis();
-
-    if (!allPartitionsFinished()) {
-      this.stopFetchEpochTime.put(this.getCurrentPartition(), stopFetchEpochTime);
-    }
-
-    if (this.currentPartitionRecordCount != 0) {
-      long currentPartitionFetchDuration =
-          stopFetchEpochTime - this.startFetchEpochTime.get(this.getCurrentPartition());
-      double avgMillisForCurrentPartition =
-          (double) currentPartitionFetchDuration / (double) this.currentPartitionRecordCount;
-      this.avgMillisPerRecord.put(this.getCurrentPartition(), avgMillisForCurrentPartition);
-
-      long avgRecordSize = this.currentPartitionTotalSize / this.currentPartitionRecordCount;
-      this.avgRecordSizes.put(this.getCurrentPartition(), avgRecordSize);
-
-      this.elapsedTime.put(this.getCurrentPartition(), currentPartitionFetchDuration);
-      this.processedRecordCount.put(this.getCurrentPartition(), this.currentPartitionRecordCount);
-      this.partitionTotalSize.put(this.getCurrentPartition(), this.currentPartitionTotalSize);
-      this.decodeRecordTime.put(this.getCurrentPartition(), this.currentPartitionDecodeRecordTime);
-      this.fetchMessageBufferTime.put(this.getCurrentPartition(), this.currentPartitionFetchMessageBufferTime);
-      this.readRecordTime.put(this.getCurrentPartition(), this.currentPartitionReadRecordTime);
-    }
+  protected long getLastSuccessfulRecordHeaderTimestamp() {
+    return 0;
   }
 
   private void switchMetricContextToCurrentPartition() {
     if (this.currentPartitionIdx >= this.partitions.size()) {
       return;
     }
-    int currentPartitionId = this.getCurrentPartition().getId();
+    int currentPartitionId = getCurrentPartition().getId();
     switchMetricContext(Lists.<Tag<?>> newArrayList(new Tag<>("kafka_partition", currentPartitionId)));
   }
 
@@ -354,22 +280,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   }
 
   private boolean shouldLogError() {
-    return !this.decodingErrorCount.containsKey(getCurrentPartition())
-        || this.decodingErrorCount.get(getCurrentPartition()) <= MAX_LOG_DECODING_ERRORS;
-  }
-
-  private void incrementErrorCount() {
-    if (this.decodingErrorCount.containsKey(getCurrentPartition())) {
-      this.decodingErrorCount.put(getCurrentPartition(), this.decodingErrorCount.get(getCurrentPartition()) + 1);
-    } else {
-      this.decodingErrorCount.put(getCurrentPartition(), 1L);
-    }
-  }
-
-  protected KafkaPartition getCurrentPartition() {
-    Preconditions.checkElementIndex(this.currentPartitionIdx, this.partitions.size(),
-        "KafkaExtractor has finished extracting all partitions. There's no current partition.");
-    return this.partitions.get(this.currentPartitionIdx);
+    return this.statsTracker.getDecodingErrorCount(this.currentPartitionIdx) <= MAX_LOG_DECODING_ERRORS;
   }
 
   protected abstract D decodeRecord(ByteArrayBasedKafkaRecord kafkaConsumerRecord) throws IOException;
@@ -392,112 +303,16 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
 
   @Override
   public void close() throws IOException {
-    if (currentPartitionIdx != INITIAL_PARTITION_IDX) {
-      updateStatisticsForCurrentPartition();
+    if (!allPartitionsFinished()) {
+      this.statsTracker.updateStatisticsForCurrentPartition(currentPartitionIdx, readStartTime, getLastSuccessfulRecordHeaderTimestamp());
     }
-
-    Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap = Maps.newHashMap();
-
     // Add error partition count and error message count to workUnitState
-    this.workUnitState.setProp(ConfigurationKeys.ERROR_PARTITION_COUNT, this.errorPartitions.size());
-    this.workUnitState.setProp(ConfigurationKeys.ERROR_MESSAGE_UNDECODABLE_COUNT, this.undecodableMessageCount);
-
-    for (int i = 0; i < this.partitions.size(); i++) {
-      LOG.info(String.format("Actual high watermark for partition %s=%d, expected=%d", this.partitions.get(i),
-          this.nextWatermark.get(i), this.highWatermark.get(i)));
-      tagsForPartitionsMap.put(this.partitions.get(i), createTagsForPartition(i));
-    }
+    this.workUnitState.setProp(ConfigurationKeys.ERROR_PARTITION_COUNT, this.statsTracker.getErrorPartitionCount());
+    this.workUnitState.setProp(ConfigurationKeys.ERROR_MESSAGE_UNDECODABLE_COUNT, this.statsTracker.getUndecodableMessageCount());
     this.workUnitState.setActualHighWatermark(this.nextWatermark);
-
     if (isInstrumentationEnabled()) {
-      for (Map.Entry<KafkaPartition, Map<String, String>> eventTags : tagsForPartitionsMap.entrySet()) {
-        new EventSubmitter.Builder(getMetricContext(), GOBBLIN_KAFKA_NAMESPACE).build()
-            .submit(KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME, eventTags.getValue());
-      }
+      this.statsTracker.emitTrackingEvents(getMetricContext(), this.lowWatermark, this.highWatermark, this.nextWatermark);
     }
-
-    this.closer.close();
-  }
-
-  protected Map<String, String> createTagsForPartition(int partitionId) {
-    Map<String, String> tagsForPartition = Maps.newHashMap();
-    KafkaPartition partition = this.partitions.get(partitionId);
-
-    tagsForPartition.put(TOPIC, partition.getTopicName());
-    tagsForPartition.put(PARTITION, Integer.toString(partition.getId()));
-    tagsForPartition.put(LOW_WATERMARK, Long.toString(this.lowWatermark.get(partitionId)));
-    tagsForPartition.put(ACTUAL_HIGH_WATERMARK, Long.toString(this.nextWatermark.get(partitionId)));
-
-    // These are used to compute the load factor,
-    // gobblin consumption rate relative to the kafka production rate.
-    // The gobblin rate is computed as (processed record count/elapsed time)
-    // The kafka rate is computed as (expected high watermark - previous latest offset) /
-    // (current offset fetch epoch time - previous offset fetch epoch time).
-    tagsForPartition.put(EXPECTED_HIGH_WATERMARK, Long.toString(this.highWatermark.get(partitionId)));
-    tagsForPartition.put(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME,
-        Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
-            KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME, partitionId)));
-    tagsForPartition.put(KafkaSource.OFFSET_FETCH_EPOCH_TIME,
-        Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
-            KafkaSource.OFFSET_FETCH_EPOCH_TIME, partitionId)));
-    tagsForPartition.put(KafkaSource.PREVIOUS_LATEST_OFFSET,
-        Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
-            KafkaSource.PREVIOUS_LATEST_OFFSET, partitionId)));
-
-    tagsForPartition.put(KafkaSource.PREVIOUS_LOW_WATERMARK,
-        Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
-            KafkaSource.PREVIOUS_LOW_WATERMARK, partitionId)));
-    tagsForPartition.put(KafkaSource.PREVIOUS_HIGH_WATERMARK,
-        Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
-            KafkaSource.PREVIOUS_HIGH_WATERMARK, partitionId)));
-    tagsForPartition.put(KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME,
-        Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
-            KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME, partitionId)));
-    tagsForPartition.put(KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME,
-        Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
-            KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME, partitionId)));
-
-    tagsForPartition.put(KafkaSource.START_FETCH_EPOCH_TIME, Long.toString(this.startFetchEpochTime.getOrDefault(partition, 0L)));
-    tagsForPartition.put(KafkaSource.STOP_FETCH_EPOCH_TIME, Long.toString(this.stopFetchEpochTime.getOrDefault(partition, 0L)));
-    this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.START_FETCH_EPOCH_TIME, partitionId),
-        Long.toString(this.startFetchEpochTime.getOrDefault(partition, 0L)));
-    this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.STOP_FETCH_EPOCH_TIME, partitionId),
-        Long.toString(this.stopFetchEpochTime.getOrDefault(partition, 0L)));
-
-    if (this.processedRecordCount.containsKey(partition)) {
-      tagsForPartition.put(PROCESSED_RECORD_COUNT, Long.toString(this.processedRecordCount.get(partition)));
-      tagsForPartition.put(PARTITION_TOTAL_SIZE, Long.toString(this.partitionTotalSize.get(partition)));
-      tagsForPartition.put(ELAPSED_TIME, Long.toString(this.elapsedTime.get(partition)));
-      tagsForPartition.put(DECODE_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
-          this.decodeRecordTime.get(partition))));
-      tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
-          this.fetchMessageBufferTime.get(partition))));
-      tagsForPartition.put(READ_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
-          this.readRecordTime.get(partition))));
-    } else {
-      tagsForPartition.put(PROCESSED_RECORD_COUNT, "0");
-      tagsForPartition.put(PARTITION_TOTAL_SIZE, "0");
-      tagsForPartition.put(ELAPSED_TIME, "0");
-      tagsForPartition.put(DECODE_RECORD_TIME, "0");
-      tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME, "0");
-      tagsForPartition.put(READ_RECORD_TIME, "0");
-    }
-
-    tagsForPartition.put(UNDECODABLE_MESSAGE_COUNT,
-        Long.toString(this.decodingErrorCount.getOrDefault(partition, 0L)));
-
-    // Commit avg time to pull a record for each partition
-    if (this.avgMillisPerRecord.containsKey(partition)) {
-      double avgMillis = this.avgMillisPerRecord.get(partition);
-      LOG.info(String.format("Avg time to pull a record for partition %s = %f milliseconds", partition, avgMillis));
-      KafkaUtils.setPartitionAvgRecordMillis(this.workUnitState, partition, avgMillis);
-      tagsForPartition.put(AVG_RECORD_PULL_TIME, Double.toString(avgMillis));
-    } else {
-      LOG.info(String.format("Avg time to pull a record for partition %s not recorded", partition));
-      tagsForPartition.put(AVG_RECORD_PULL_TIME, Double.toString(-1));
-    }
-
-    return tagsForPartition;
   }
 
   @Deprecated
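
With this refactoring, KafkaExtractor keeps only the watermarks and the consumer client, and delegates all per-partition bookkeeping to the new KafkaExtractorStatsTracker introduced below. A condensed, hypothetical sketch of the call sequence the extractor now drives for a single partition (the workUnitState and partitions variables are assumed, and the timestamps and sizes are made up for illustration):

  // Sketch of the tracker lifecycle, mirroring readRecordImpl(), moveToNextPartition() and close().
  KafkaExtractorStatsTracker tracker = new KafkaExtractorStatsTracker(workUnitState, partitions);
  tracker.resetStartFetchEpochTime(0);                          // entering partition 0

  long readStartTime = System.nanoTime();
  long fetchStartTime = System.nanoTime();
  tracker.onFetchNextMessageBuffer(0, fetchStartTime);          // after fetchNextMessageBuffer() returns

  long decodeStartTime = System.nanoTime();
  tracker.onDecodeableRecord(0, readStartTime, decodeStartTime, 512L);  // a 512-byte record decoded
  // tracker.onUndecodeableRecord(0);                           // called instead when decoding throws

  // When the partition is exhausted (or on close()):
  tracker.updateStatisticsForCurrentPartition(0, readStartTime, 0L);
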
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
new file mode 100644
index 0000000..8173418
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+
+
+/**
+ * A class that tracks KafkaExtractor statistics such as record decode time, number of processed records, number of undecodable records, etc.
+ *
+ */
+@Slf4j
+public class KafkaExtractorStatsTracker {
+  // Constants for event submission
+  public static final String TOPIC = "topic";
+  public static final String PARTITION = "partition";
+
+  private static final String GOBBLIN_KAFKA_NAMESPACE = "gobblin.kafka";
+  private static final String KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME = "KafkaExtractorTopicMetadata";
+  private static final String LOW_WATERMARK = "lowWatermark";
+  private static final String ACTUAL_HIGH_WATERMARK = "actualHighWatermark";
+  private static final String EXPECTED_HIGH_WATERMARK = "expectedHighWatermark";
+  private static final String ELAPSED_TIME = "elapsedTime";
+  private static final String PROCESSED_RECORD_COUNT = "processedRecordCount";
+  private static final String UNDECODABLE_MESSAGE_COUNT = "undecodableMessageCount";
+  private static final String PARTITION_TOTAL_SIZE = "partitionTotalSize";
+  private static final String AVG_RECORD_PULL_TIME = "avgRecordPullTime";
+  private static final String AVG_RECORD_SIZE = "avgRecordSize";
+  private static final String READ_RECORD_TIME = "readRecordTime";
+  private static final String DECODE_RECORD_TIME = "decodeRecordTime";
+  private static final String FETCH_MESSAGE_BUFFER_TIME = "fetchMessageBufferTime";
+  private static final String LAST_RECORD_HEADER_TIMESTAMP = "lastRecordHeaderTimestamp";
+
+  @Getter
+  private final Map<KafkaPartition, ExtractorStats> statsMap;
+  private final Set<Integer> errorPartitions;
+  private final WorkUnitState workUnitState;
+
+  //A global count of the number of undecodable messages encountered by the KafkaExtractor across all Kafka
+  //TopicPartitions.
+  @Getter
+  private int undecodableMessageCount = 0;
+  private List<KafkaPartition> partitions;
+
+  public KafkaExtractorStatsTracker(WorkUnitState state, List<KafkaPartition> partitions) {
+    this.workUnitState = state;
+    this.partitions = partitions;
+    this.statsMap = Maps.newHashMapWithExpectedSize(this.partitions.size());
+    this.partitions.forEach(partition -> this.statsMap.put(partition, new ExtractorStats()));
+    this.errorPartitions = Sets.newHashSet();
+  }
+
+  public int getErrorPartitionCount() {
+    return this.errorPartitions.size();
+  }
+
+  /**
+   * A Java POJO that encapsulates various extractor stats.
+   */
+  @Data
+  public static class ExtractorStats {
+    private long decodingErrorCount = -1L;
+    private double avgMillisPerRecord = -1;
+    private long avgRecordSize;
+    private long elapsedTime;
+    private long processedRecordCount;
+    private long partitionTotalSize;
+    private long decodeRecordTime;
+    private long fetchMessageBufferTime;
+    private long readRecordTime;
+    private long startFetchEpochTime;
+    private long stopFetchEpochTime;
+    private long lastSuccessfulRecordHeaderTimestamp;
+  }
+
+  /**
+   * Get the decoding error count for a given partition.
+   * @param partitionIdx index of Kafka topic partition.
+   * @return the number of undecodable records for the given partition index, or -1 if no decoding errors have occurred.
+   */
+  public Long getDecodingErrorCount(int partitionIdx) {
+    return this.statsMap.get(this.partitions.get(partitionIdx)).getDecodingErrorCount();
+  }
+
+  /**
+   * Called when the KafkaExtractor encounters an undecodeable record.
+   */
+  public void onUndecodeableRecord(int partitionIdx) {
+    this.errorPartitions.add(partitionIdx);
+    this.undecodableMessageCount++;
+    incrementErrorCount(partitionIdx);
+  }
+
+  private void incrementErrorCount(int partitionIdx) {
+    this.statsMap.computeIfPresent(this.partitions.get(partitionIdx), (k, v) -> {
+      if (v.decodingErrorCount < 0) {
+        v.decodingErrorCount = 1;
+      } else {
+        v.decodingErrorCount++;
+      }
+      return v;
+    });
+  }
+
+  public void resetStartFetchEpochTime(int partitionIdx) {
+    this.statsMap.computeIfPresent(this.partitions.get(partitionIdx), (k, v) -> {
+      v.startFetchEpochTime = System.currentTimeMillis();
+      return v;
+    });
+  }
+
+  /**
+   * A method that is called when a Kafka record is successfully decoded.
+   * @param partitionIdx the index of the Kafka partition.
+   * @param readStartTime the start time when readRecord() is invoked.
+   * @param decodeStartTime the time instant immediately before a record decoding begins.
+   * @param recordSizeInBytes the size of the decoded record in bytes.
+   */
+  public void onDecodeableRecord(int partitionIdx, long readStartTime, long decodeStartTime, long recordSizeInBytes) {
+    this.statsMap.computeIfPresent(this.partitions.get(partitionIdx), (k, v) -> {
+      long currentTime = System.nanoTime();
+      v.processedRecordCount++;
+      v.partitionTotalSize += recordSizeInBytes;
+      v.decodeRecordTime += currentTime - decodeStartTime;
+      v.readRecordTime += currentTime - readStartTime;
+      return v;
+    });
+  }
+
+  /**
+   * A method that is called after a batch of records has been fetched from Kafka e.g. via a consumer.poll().
+   * @param partitionIdx the index of Kafka partition
+   * @param fetchStartTime the time instant immediately before fetching records from Kafka.
+   */
+  public void onFetchNextMessageBuffer(int partitionIdx, long fetchStartTime) {
+    this.statsMap.computeIfPresent(this.partitions.get(partitionIdx), (k, v) -> {
+      v.fetchMessageBufferTime += System.nanoTime() - fetchStartTime;
+      return v;
+    });
+  }
+
+  /**
+   * A method that is called when a partition has been fully processed.
+   * @param partitionIdx the index of Kafka partition
+   * @param readStartTime the start time when readRecord() is invoked.
+   */
+  void onPartitionReadComplete(int partitionIdx, long readStartTime) {
+    this.statsMap.computeIfPresent(this.partitions.get(partitionIdx), (k, v) -> {
+      v.readRecordTime += System.nanoTime() - readStartTime;
+      return v;
+    });
+  }
+
+  /**
+   * A method that is invoked to update the statistics for the current partition. In the batch mode of execution, this is
+   * invoked when a partition has been processed and before the next partition can be processed. In the streaming mode of
+   * execution, this method is invoked on every flush.
+   * @param partitionIdx the index of Kafka partition
+   * @param readStartTime the start time when readRecord() is invoked.
+   */
+  public void updateStatisticsForCurrentPartition(int partitionIdx, long readStartTime, long lastSuccessfulRecordHeaderTimestamp) {
+    long stopFetchEpochTime = System.currentTimeMillis();
+    this.statsMap.computeIfPresent(this.partitions.get(partitionIdx), (k, v) -> {
+      v.stopFetchEpochTime = stopFetchEpochTime;
+      if (v.processedRecordCount != 0) {
+        v.elapsedTime = stopFetchEpochTime - this.statsMap.get(this.partitions.get(partitionIdx)).getStartFetchEpochTime();
+        //Compute average stats
+        v.avgMillisPerRecord = (double) v.elapsedTime / (double) v.processedRecordCount;
+        v.avgRecordSize = this.statsMap.get(this.partitions.get(partitionIdx)).getPartitionTotalSize() / v.processedRecordCount;
+        v.lastSuccessfulRecordHeaderTimestamp = lastSuccessfulRecordHeaderTimestamp;
+      }
+      return v;
+    });
+    onPartitionReadComplete(partitionIdx, readStartTime);
+  }
+
+  private Map<String, String> createTagsForPartition(int partitionId, MultiLongWatermark lowWatermark, MultiLongWatermark highWatermark, MultiLongWatermark nextWatermark) {
+    Map<String, String> tagsForPartition = Maps.newHashMap();
+    KafkaPartition partition = this.partitions.get(partitionId);
+
+    tagsForPartition.put(TOPIC, partition.getTopicName());
+    tagsForPartition.put(PARTITION, Integer.toString(partition.getId()));
+    tagsForPartition.put(LOW_WATERMARK, Long.toString(lowWatermark.get(partitionId)));
+    tagsForPartition.put(ACTUAL_HIGH_WATERMARK, Long.toString(nextWatermark.get(partitionId)));
+
+    // These are used to compute the load factor,
+    // gobblin consumption rate relative to the kafka production rate.
+    // The gobblin rate is computed as (processed record count/elapsed time)
+    // The kafka rate is computed as (expected high watermark - previous latest offset) /
+    // (current offset fetch epoch time - previous offset fetch epoch time).
+    tagsForPartition.put(EXPECTED_HIGH_WATERMARK, Long.toString(highWatermark.get(partitionId)));
+    tagsForPartition.put(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME,
+        Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
+            KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME, partitionId)));
+    tagsForPartition.put(KafkaSource.OFFSET_FETCH_EPOCH_TIME,
+        Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
+            KafkaSource.OFFSET_FETCH_EPOCH_TIME, partitionId)));
+    tagsForPartition.put(KafkaSource.PREVIOUS_LATEST_OFFSET,
+        Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
+            KafkaSource.PREVIOUS_LATEST_OFFSET, partitionId)));
+
+    tagsForPartition.put(KafkaSource.PREVIOUS_LOW_WATERMARK,
+        Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
+            KafkaSource.PREVIOUS_LOW_WATERMARK, partitionId)));
+    tagsForPartition.put(KafkaSource.PREVIOUS_HIGH_WATERMARK,
+        Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
+            KafkaSource.PREVIOUS_HIGH_WATERMARK, partitionId)));
+    tagsForPartition.put(KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME,
+        Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
+            KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME, partitionId)));
+    tagsForPartition.put(KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME,
+        Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
+            KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME, partitionId)));
+
+    ExtractorStats stats = this.statsMap.getOrDefault(partition, new ExtractorStats());
+
+    tagsForPartition.put(KafkaSource.START_FETCH_EPOCH_TIME, Long.toString(stats.getStartFetchEpochTime()));
+    tagsForPartition.put(KafkaSource.STOP_FETCH_EPOCH_TIME, Long.toString(stats.getStopFetchEpochTime()));
+    this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.START_FETCH_EPOCH_TIME, partitionId),
+        Long.toString(stats.getStartFetchEpochTime()));
+    this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.STOP_FETCH_EPOCH_TIME, partitionId),
+        Long.toString(stats.getStopFetchEpochTime()));
+    tagsForPartition.put(PROCESSED_RECORD_COUNT, Long.toString(stats.getProcessedRecordCount()));
+    tagsForPartition.put(PARTITION_TOTAL_SIZE, Long.toString(stats.getPartitionTotalSize()));
+    tagsForPartition.put(AVG_RECORD_SIZE, Long.toString(stats.getAvgRecordSize()));
+    tagsForPartition.put(ELAPSED_TIME, Long.toString(stats.getElapsedTime()));
+    tagsForPartition.put(DECODE_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(stats.getDecodeRecordTime())));
+    tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME,
+        Long.toString(TimeUnit.NANOSECONDS.toMillis(stats.getFetchMessageBufferTime())));
+    tagsForPartition.put(READ_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(stats.getReadRecordTime())));
+    tagsForPartition.put(UNDECODABLE_MESSAGE_COUNT, Long.toString(stats.getDecodingErrorCount()));
+    tagsForPartition.put(LAST_RECORD_HEADER_TIMESTAMP, Long.toString(stats.getLastSuccessfulRecordHeaderTimestamp()));
+
+    // Commit avg time to pull a record for each partition
+    double avgMillis = stats.getAvgMillisPerRecord();
+    if (avgMillis >= 0) {
+      log.info(String.format("Avg time to pull a record for partition %s = %f milliseconds", partition, avgMillis));
+      KafkaUtils.setPartitionAvgRecordMillis(this.workUnitState, partition, avgMillis);
+      tagsForPartition.put(AVG_RECORD_PULL_TIME, Double.toString(avgMillis));
+    } else {
+      log.info(String.format("Avg time to pull a record for partition %s not recorded", partition));
+      tagsForPartition.put(AVG_RECORD_PULL_TIME, Double.toString(-1));
+    }
+
+    return tagsForPartition;
+  }
+
+  /**
+   * Emit tracking events reporting the various statistics to be consumed by a monitoring application.
+   * @param context the current {@link MetricContext}
+   * @param lowWatermark the beginning Kafka offset for each topic partition
+   * @param highWatermark the expected last Kafka offset for each topic partition to be consumed by the Extractor
+   * @param nextWatermark the offset of next valid message for each Kafka topic partition consumed by the Extractor
+   */
+  public void emitTrackingEvents(MetricContext context, MultiLongWatermark lowWatermark, MultiLongWatermark highWatermark,
+      MultiLongWatermark nextWatermark) {
+    Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap = Maps.newHashMap();
+    for (int i = 0; i < this.partitions.size(); i++) {
+      log.info(String.format("Actual high watermark for partition %s=%d, expected=%d", this.partitions.get(i),
+          nextWatermark.get(i), highWatermark.get(i)));
+      tagsForPartitionsMap
+          .put(this.partitions.get(i), createTagsForPartition(i, lowWatermark, highWatermark, nextWatermark));
+    }
+    for (Map.Entry<KafkaPartition, Map<String, String>> eventTags : tagsForPartitionsMap.entrySet()) {
+      new EventSubmitter.Builder(context, GOBBLIN_KAFKA_NAMESPACE).build()
+          .submit(KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME, eventTags.getValue());
+    }
+  }
+
+  /**
+   * Reset all KafkaExtractor stats.
+   */
+  public void reset() {
+    this.partitions.forEach(partition -> this.statsMap.put(partition, new ExtractorStats()));
+    for (int partitionIdx = 0; partitionIdx < this.partitions.size(); partitionIdx++) {
+      resetStartFetchEpochTime(partitionIdx);
+    }
+  }
+}
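
The javadoc above notes that, in streaming mode, updateStatisticsForCurrentPartition() is invoked on every flush. A plausible flush handler (hypothetical, not part of this commit) would close out the current window, emit the tracking events, and then reset the tracker for the next interval:

  // Hypothetical flush hook for a streaming Kafka extractor (illustration only).
  void onFlush(KafkaExtractorStatsTracker tracker, MetricContext metricContext,
      MultiLongWatermark lowWatermark, MultiLongWatermark highWatermark, MultiLongWatermark nextWatermark,
      int currentPartitionIdx, long readStartTime) {
    // Close out the stats accumulated since the last flush for the partition being read.
    tracker.updateStatisticsForCurrentPartition(currentPartitionIdx, readStartTime, 0L);
    // One KafkaExtractorTopicMetadata event is submitted per partition.
    tracker.emitTrackingEvents(metricContext, lowWatermark, highWatermark, nextWatermark);
    // Start a fresh accumulation window for the next flush interval.
    tracker.reset();
  }
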
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaPartition.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaPartition.java
index 0a11bbf..29cea5b 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaPartition.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaPartition.java
@@ -31,6 +31,7 @@ public final class KafkaPartition {
   private final int id;
   private final String topicName;
   private KafkaLeader leader;
+  private int hashCode;
 
   public static class Builder {
     private int id = 0;
@@ -103,10 +104,14 @@ public final class KafkaPartition {
 
   @Override
   public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + this.id;
-    result = prime * result + ((this.topicName == null) ? 0 : this.topicName.hashCode());
+    int result = hashCode;
+    if (result == 0) {
+      final int prime = 31;
+      result = 1;
+      result = prime * result + this.id;
+      result = prime * result + ((this.topicName == null) ? 0 : this.topicName.hashCode());
+      hashCode = result;
+    }
     return result;
   }
 
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 62485f0..5fa53ea 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -30,14 +30,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
-import org.apache.gobblin.dataset.DatasetConstants;
-import org.apache.gobblin.dataset.DatasetDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.codahale.metrics.Timer;
-import com.google.common.base.Joiner;
 import com.google.common.base.Function;
+import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
@@ -48,12 +46,19 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.typesafe.config.Config;
 
+import lombok.Getter;
+import lombok.Setter;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
 import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory;
+import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.source.extractor.extract.EventBasedSource;
 import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker;
@@ -66,13 +71,7 @@ import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.DatasetFilterUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.dataset.DatasetUtils;
-import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.metrics.MetricContext;
-
-import lombok.Getter;
-import lombok.Setter;
 
-import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
 
 
@@ -115,6 +114,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
   public static final String PREVIOUS_LATEST_OFFSET = "previousLatestOffset";
   public static final String OFFSET_FETCH_EPOCH_TIME = "offsetFetchEpochTime";
   public static final String PREVIOUS_OFFSET_FETCH_EPOCH_TIME = "previousOffsetFetchEpochTime";
+  public static final String NUM_TOPIC_PARTITIONS = "numTopicPartitions";
   public static final String GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS = "gobblin.kafka.consumerClient.class";
   public static final String GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION =
       "gobblin.kafka.extract.allowTableTypeAndNamspaceCustomization";
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
new file mode 100644
index 0000000..abd0447
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+
+
+public class KafkaExtractorStatsTrackerTest {
+  List<KafkaPartition> kafkaPartitions = new ArrayList<>();
+  private KafkaExtractorStatsTracker extractorStatsTracker;
+
+  @BeforeClass
+  public void setUp() {
+    kafkaPartitions.add(new KafkaPartition.Builder().withTopicName("test-topic").withId(0).build());
+    kafkaPartitions.add(new KafkaPartition.Builder().withTopicName("test-topic").withId(1).build());
+    this.extractorStatsTracker = new KafkaExtractorStatsTracker(new WorkUnitState(), kafkaPartitions);
+  }
+
+  @Test
+  public void testOnUndecodeableRecord() {
+    //Ensure that error counters are initialized correctly
+    Assert.assertEquals(this.extractorStatsTracker.getErrorPartitionCount(), 0);
+    Assert.assertEquals(this.extractorStatsTracker.getDecodingErrorCount(0).longValue(), -1);
+    Assert.assertEquals(this.extractorStatsTracker.getDecodingErrorCount(1).longValue(), -1);
+
+    //Ensure that error counters are updated correctly after 1st call to KafkaExtractorStatsTracker#onUndecodeableRecord()
+    this.extractorStatsTracker.onUndecodeableRecord(0);
+    Assert.assertEquals(this.extractorStatsTracker.getDecodingErrorCount(0).longValue(), 1);
+    Assert.assertEquals(this.extractorStatsTracker.getDecodingErrorCount(1).longValue(), -1);
+    Assert.assertEquals(this.extractorStatsTracker.getErrorPartitionCount(), 1);
+
+    //Ensure that error counters are updated correctly after 2nd call to KafkaExtractorStatsTracker#onUndecodeableRecord()
+    this.extractorStatsTracker.onUndecodeableRecord(0);
+    Assert.assertEquals(this.extractorStatsTracker.getDecodingErrorCount(0).longValue(), 2);
+    Assert.assertEquals(this.extractorStatsTracker.getDecodingErrorCount(1).longValue(), -1);
+    Assert.assertEquals(this.extractorStatsTracker.getErrorPartitionCount(), 1);
+  }
+
+  @Test
+  public void testResetStartFetchEpochTime() {
+    long currentTime = System.currentTimeMillis();
+    this.extractorStatsTracker.resetStartFetchEpochTime(1);
+    Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getStartFetchEpochTime() >= currentTime);
+  }
+
+  @Test
+  public void testOnDecodeableRecord() throws InterruptedException {
+    long readStartTime = System.nanoTime();
+    Thread.sleep(1);
+    long decodeStartTime = System.nanoTime();
+
+    Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(), 0);
+    Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(), 0);
+    Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime() == 0);
+    Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime() == 0);
+
+    this.extractorStatsTracker.onDecodeableRecord(0, readStartTime, decodeStartTime, 100);
+    Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(), 1);
+    Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(), 100);
+    Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime() > 0);
+    Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime() > 0);
+
+    readStartTime = System.nanoTime();
+    Thread.sleep(1);
+    decodeStartTime = System.nanoTime();
+    long previousDecodeRecordTime = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime();
+    long previousReadRecordTime = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime();
+
+    this.extractorStatsTracker.onDecodeableRecord(0, readStartTime, decodeStartTime, 100);
+    Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(), 2);
+    Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(), 200);
+    Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime() > previousDecodeRecordTime);
+    Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime() > previousReadRecordTime);
+  }
+
+  @Test
+  public void testOnFetchNextMessageBuffer() throws InterruptedException {
+    Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getFetchMessageBufferTime(), 0);
+    long fetchStartTime = System.nanoTime();
+    Thread.sleep(1);
+    this.extractorStatsTracker.onFetchNextMessageBuffer(1, fetchStartTime);
+    Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getFetchMessageBufferTime() > 0);
+  }
+
+  @Test
+  public void testOnPartitionReadComplete() throws InterruptedException {
+    Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getReadRecordTime(), 0);
+    long readStartTime = System.nanoTime();
+    Thread.sleep(1);
+    this.extractorStatsTracker.onPartitionReadComplete(1, readStartTime);
+    Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getReadRecordTime() > 0);
+  }
+
+  @Test (dependsOnMethods = "testOnDecodeableRecord")
+  public void testUpdateStatisticsForCurrentPartition()
+      throws InterruptedException {
+    long readStartTime = System.nanoTime();
+    Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getStopFetchEpochTime(), 0);
+    Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getElapsedTime(), 0);
+    Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getAvgMillisPerRecord() < 0);
+    this.extractorStatsTracker.updateStatisticsForCurrentPartition(0, readStartTime, 0);
+    Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getStopFetchEpochTime() > 0);
+    Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getElapsedTime() > 0);
+    Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getAvgMillisPerRecord() > 0);
+    Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getAvgRecordSize(), 100);
+
+    readStartTime = System.nanoTime();
+    Thread.sleep(1);
+    long decodeStartTime = System.nanoTime();
+    this.extractorStatsTracker.onDecodeableRecord(1, readStartTime, decodeStartTime, 100);
+    this.extractorStatsTracker.updateStatisticsForCurrentPartition(1, readStartTime, 0);
+    Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getElapsedTime() > 0);
+    Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getAvgMillisPerRecord() > 0);
+    Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getAvgRecordSize(), 100);
+  }
+}
\ No newline at end of file