Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:52 UTC

[34/50] incubator-gobblin git commit: [GOBBLIN-408] Add more info to the KafkaExtractorTopicMetadata event for tracking execution times and rates

[GOBBLIN-408] Add more info to the KafkaExtractorTopicMetadata event for tracking execution times and rates

Closes #2285 from htran1/kafka_load_factor


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a3189d73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a3189d73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a3189d73

Branch: refs/heads/0.12.0
Commit: a3189d73360c13412d91d42bea05f6ded1e4006a
Parents: 11182dc
Author: Hung Tran <hu...@linkedin.com>
Authored: Tue Feb 20 11:16:32 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Feb 20 11:16:32 2018 -0800

----------------------------------------------------------------------
 .../extractor/extract/kafka/KafkaExtractor.java | 81 ++++++++++++++++--
 .../extractor/extract/kafka/KafkaSource.java    | 87 ++++++++++++++++++--
 .../workunit/packer/KafkaWorkUnitPacker.java    | 18 ++++
 3 files changed, 173 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a3189d73/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index 1ff0159..0ec3caf 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -69,7 +69,12 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   public static final String LOW_WATERMARK = "lowWatermark";
   public static final String ACTUAL_HIGH_WATERMARK = "actualHighWatermark";
   public static final String EXPECTED_HIGH_WATERMARK = "expectedHighWatermark";
+  public static final String ELAPSED_TIME = "elapsedTime";
+  public static final String PROCESSED_RECORD_COUNT = "processedRecordCount";
   public static final String AVG_RECORD_PULL_TIME = "avgRecordPullTime";
+  public static final String READ_RECORD_TIME = "readRecordTime";
+  public static final String DECODE_RECORD_TIME = "decodeRecordTime";
+  public static final String FETCH_MESSAGE_BUFFER_TIME = "fetchMessageBufferTime";
   public static final String GOBBLIN_KAFKA_NAMESPACE = "gobblin.kafka";
   public static final String KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME = "KafkaExtractorTopicMetadata";
 
@@ -87,6 +92,11 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   protected final Map<KafkaPartition, Integer> decodingErrorCount;
   private final Map<KafkaPartition, Double> avgMillisPerRecord;
   private final Map<KafkaPartition, Long> avgRecordSizes;
+  private final Map<KafkaPartition, Long> elapsedTime;
+  private final Map<KafkaPartition, Long> processedRecordCount;
+  private final Map<KafkaPartition, Long> decodeRecordTime;
+  private final Map<KafkaPartition, Long> fetchMessageBufferTime;
+  private final Map<KafkaPartition, Long> readRecordTime;
 
   private final Set<Integer> errorPartitions;
   private int undecodableMessageCount = 0;
@@ -95,6 +105,10 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   private int currentPartitionIdx = INITIAL_PARTITION_IDX;
   private long currentPartitionRecordCount = 0;
   private long currentPartitionTotalSize = 0;
+  private long currentPartitionFetchDuration = 0;
+  private long currentPartitionDecodeRecordTime = 0;
+  private long currentPartitionFetchMessageBufferTime = 0;
+  private long currentPartitionReadRecordTime = 0;
 
   public KafkaExtractor(WorkUnitState state) {
     super(state);
@@ -121,6 +135,11 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
     this.decodingErrorCount = Maps.newHashMap();
     this.avgMillisPerRecord = Maps.newHashMapWithExpectedSize(this.partitions.size());
     this.avgRecordSizes = Maps.newHashMapWithExpectedSize(this.partitions.size());
+    this.elapsedTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
+    this.processedRecordCount = Maps.newHashMapWithExpectedSize(this.partitions.size());
+    this.decodeRecordTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
+    this.fetchMessageBufferTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
+    this.readRecordTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
 
     this.errorPartitions = Sets.newHashSet();
 
@@ -142,6 +161,8 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   @SuppressWarnings("unchecked")
   @Override
   public D readRecordImpl(D reuse) throws DataRecordException, IOException {
+    long readStartTime = System.nanoTime();
+
     while (!allPartitionsFinished()) {
       if (currentPartitionFinished()) {
         moveToNextPartition();
@@ -149,7 +170,9 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
       }
       if (this.messageIterator == null || !this.messageIterator.hasNext()) {
         try {
+          long fetchStartTime = System.nanoTime();
           this.messageIterator = fetchNextMessageBuffer();
+          this.currentPartitionFetchMessageBufferTime += System.nanoTime() - fetchStartTime;
         } catch (Exception e) {
           LOG.error(String.format("Failed to fetch next message buffer for partition %s. Will skip this partition.",
               getCurrentPartition()), e);
@@ -178,6 +201,9 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
         this.nextWatermark.set(this.currentPartitionIdx, nextValidMessage.getNextOffset());
         try {
           D record = null;
+          // track time for decode/convert depending on the record type
+          long decodeStartTime = System.nanoTime();
+
           if (nextValidMessage instanceof ByteArrayBasedKafkaRecord) {
             record = decodeRecord((ByteArrayBasedKafkaRecord)nextValidMessage);
           } else if (nextValidMessage instanceof DecodeableKafkaRecord){
@@ -194,8 +220,10 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
                     + " or DecodeableKafkaRecord");
           }
 
+          this.currentPartitionDecodeRecordTime += System.nanoTime() - decodeStartTime;
           this.currentPartitionRecordCount++;
           this.currentPartitionTotalSize += nextValidMessage.getValueSizeInBytes();
+          this.currentPartitionReadRecordTime += System.nanoTime() - readStartTime;
           return record;
         } catch (Throwable t) {
           this.errorPartitions.add(this.currentPartitionIdx);
@@ -208,6 +236,8 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
       }
     }
     LOG.info("Finished pulling topic " + this.topicName);
+
+    this.currentPartitionReadRecordTime += System.nanoTime() - readStartTime;
     return null;
   }
 
@@ -235,10 +265,13 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
       LOG.info("Pulling topic " + this.topicName);
       this.currentPartitionIdx = 0;
     } else {
-      computeAvgMillisPerRecordForCurrentPartition();
+      updateStatisticsForCurrentPartition();
       this.currentPartitionIdx++;
       this.currentPartitionRecordCount = 0;
-      this.currentPartitionTotalSize = 0;
+      this.currentPartitionFetchDuration = 0;
+      this.currentPartitionDecodeRecordTime = 0;
+      this.currentPartitionFetchMessageBufferTime = 0;
+      this.currentPartitionReadRecordTime = 0;
     }
 
     this.messageIterator = null;
@@ -251,16 +284,25 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
     this.stopwatch.start();
   }
 
-  private void computeAvgMillisPerRecordForCurrentPartition() {
+  private void updateStatisticsForCurrentPartition() {
     this.stopwatch.stop();
+
     if (this.currentPartitionRecordCount != 0) {
+      this.currentPartitionFetchDuration = this.stopwatch.elapsed(TimeUnit.MILLISECONDS);
       double avgMillisForCurrentPartition =
-          (double) this.stopwatch.elapsed(TimeUnit.MILLISECONDS) / (double) this.currentPartitionRecordCount;
+          (double) this.currentPartitionFetchDuration / (double) this.currentPartitionRecordCount;
       this.avgMillisPerRecord.put(this.getCurrentPartition(), avgMillisForCurrentPartition);
 
       long avgRecordSize = this.currentPartitionTotalSize / this.currentPartitionRecordCount;
       this.avgRecordSizes.put(this.getCurrentPartition(), avgRecordSize);
+
+      this.elapsedTime.put(this.getCurrentPartition(), this.currentPartitionFetchDuration);
+      this.processedRecordCount.put(this.getCurrentPartition(), this.currentPartitionRecordCount);
+      this.decodeRecordTime.put(this.getCurrentPartition(), this.currentPartitionDecodeRecordTime);
+      this.fetchMessageBufferTime.put(this.getCurrentPartition(), this.currentPartitionFetchMessageBufferTime);
+      this.readRecordTime.put(this.getCurrentPartition(), this.currentPartitionReadRecordTime);
     }
+
     this.stopwatch.reset();
   }
 
@@ -317,7 +359,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   @Override
   public void close() throws IOException {
 
-    computeAvgMillisPerRecordForCurrentPartition();
+    updateStatisticsForCurrentPartition();
 
     Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap = Maps.newHashMap();
 
@@ -336,7 +378,36 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
       tagsForPartition.put(PARTITION, Integer.toString(partition.getId()));
       tagsForPartition.put(LOW_WATERMARK, Long.toString(this.lowWatermark.get(i)));
       tagsForPartition.put(ACTUAL_HIGH_WATERMARK, Long.toString(this.nextWatermark.get(i)));
+      // These are used to compute the load factor, i.e. the Gobblin
+      // consumption rate relative to the Kafka production rate.
+      // The Gobblin rate is computed as (processed record count / elapsed time).
+      // The Kafka rate is computed as (expected high watermark - previous latest offset) /
+      // (current offset fetch epoch time - previous offset fetch epoch time).
       tagsForPartition.put(EXPECTED_HIGH_WATERMARK, Long.toString(this.highWatermark.get(i)));
+      tagsForPartition.put(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME,
+          this.workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME,
+              i)));
+      tagsForPartition.put(KafkaSource.OFFSET_FETCH_EPOCH_TIME,
+          this.workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.OFFSET_FETCH_EPOCH_TIME, i)));
+      tagsForPartition.put(KafkaSource.PREVIOUS_LATEST_OFFSET,
+          this.workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_LATEST_OFFSET, i)));
+
+      if (this.processedRecordCount.containsKey(partition)) {
+        tagsForPartition.put(PROCESSED_RECORD_COUNT, Long.toString(this.processedRecordCount.get(partition)));
+        tagsForPartition.put(ELAPSED_TIME, Long.toString(this.elapsedTime.get(partition)));
+        tagsForPartition.put(DECODE_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
+            this.decodeRecordTime.get(partition))));
+        tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
+            this.fetchMessageBufferTime.get(partition))));
+        tagsForPartition.put(READ_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
+            this.readRecordTime.get(partition))));
+      } else {
+        tagsForPartition.put(PROCESSED_RECORD_COUNT, "0");
+        tagsForPartition.put(ELAPSED_TIME, "0");
+        tagsForPartition.put(DECODE_RECORD_TIME, "0");
+        tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME, "0");
+        tagsForPartition.put(READ_RECORD_TIME, "0");
+      }
 
       tagsForPartitionsMap.put(partition, tagsForPartition);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a3189d73/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 69ebea6..b96412c 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -104,6 +104,9 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
   public static final String ALL_TOPICS = "all";
   public static final String AVG_RECORD_SIZE = "avg.record.size";
   public static final String AVG_RECORD_MILLIS = "avg.record.millis";
+  public static final String PREVIOUS_LATEST_OFFSET = "previousLatestOffset";
+  public static final String OFFSET_FETCH_EPOCH_TIME = "offsetFetchEpochTime";
+  public static final String PREVIOUS_OFFSET_FETCH_EPOCH_TIME = "previousOffsetFetchEpochTime";
   public static final String GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS = "gobblin.kafka.consumerClient.class";
   public static final String GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION =
       "gobblin.kafka.extract.allowTableTypeAndNamspaceCustomization";
@@ -116,6 +119,8 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
 
   private final Set<String> moveToLatestTopics = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
   private final Map<KafkaPartition, Long> previousOffsets = Maps.newConcurrentMap();
+  private final Map<KafkaPartition, Long> previousExpectedHighWatermarks = Maps.newConcurrentMap();
+  private final Map<KafkaPartition, Long> previousOffsetFetchEpochTimes = Maps.newConcurrentMap();
 
   private final Set<KafkaPartition> partitionsToBeProcessed = Sets.newConcurrentHashSet();
 
@@ -298,7 +303,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
       Map<String, State> topicSpecificStateMap, SourceState state) {
 
     // in case the previous offset not been set
-    getAllPreviousOffsets(state);
+    getAllPreviousOffsetState(state);
 
     // For each partition that has a previous offset, create an empty WorkUnit for it if
     // it is not in this.partitionsToBeProcessed.
@@ -309,6 +314,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
         if (!this.isDatasetStateEnabled.get() || this.topicsToProcess.contains(topicName)) {
           long previousOffset = entry.getValue();
           WorkUnit emptyWorkUnit = createEmptyWorkUnit(partition, previousOffset,
+              this.previousOffsetFetchEpochTimes.get(partition),
               Optional.fromNullable(topicSpecificStateMap.get(partition.getTopicName())));
 
           if (workUnits.containsKey(topicName)) {
@@ -368,6 +374,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     boolean failedToGetKafkaOffsets = false;
 
     try (Timer.Context context = this.metricContext.timer(OFFSET_FETCH_TIMER).time()) {
+      offsets.setOffsetFetchEpochTime(System.currentTimeMillis());
       offsets.setEarliestOffset(this.kafkaConsumerClient.get().getEarliestOffset(partition));
       offsets.setLatestOffset(this.kafkaConsumerClient.get().getLatestOffset(partition));
     } catch (KafkaOffsetRetrievalFailureException e) {
@@ -375,9 +382,13 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     }
 
     long previousOffset = 0;
+    long previousOffsetFetchEpochTime = 0;
     boolean previousOffsetNotFound = false;
     try {
       previousOffset = getPreviousOffsetForPartition(partition, state);
+      offsets.setPreviousLatestOffset(getPreviousExpectedHighWatermark(partition, state));
+      previousOffsetFetchEpochTime = getPreviousOffsetFetchEpochTimeForPartition(partition, state);
+      offsets.setPreviousOffsetFetchEpochTime(previousOffsetFetchEpochTime);
     } catch (PreviousOffsetNotFoundException e) {
       previousOffsetNotFound = true;
     }
@@ -392,7 +403,8 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
       LOG.warn(String
           .format("Failed to retrieve earliest and/or latest offset for partition %s. This partition will be skipped.",
               partition));
-      return previousOffsetNotFound ? null : createEmptyWorkUnit(partition, previousOffset, topicSpecificState);
+      return previousOffsetNotFound ? null : createEmptyWorkUnit(partition, previousOffset, previousOffsetFetchEpochTime,
+          topicSpecificState);
     }
 
     if (shouldMoveToLatestOffset(partition, state)) {
@@ -444,7 +456,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
           offsets.startAtEarliestOffset();
         } else {
           LOG.warn(offsetOutOfRangeMsg + "This partition will be skipped.");
-          return createEmptyWorkUnit(partition, previousOffset, topicSpecificState);
+          return createEmptyWorkUnit(partition, previousOffset, previousOffsetFetchEpochTime, topicSpecificState);
         }
       }
     }
@@ -452,10 +464,24 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     return getWorkUnitForTopicPartition(partition, offsets, topicSpecificState);
   }
 
+  private long getPreviousOffsetFetchEpochTimeForPartition(KafkaPartition partition, SourceState state)
+      throws PreviousOffsetNotFoundException {
+
+    getAllPreviousOffsetState(state);
+
+    if (this.previousOffsetFetchEpochTimes.containsKey(partition)) {
+      return this.previousOffsetFetchEpochTimes.get(partition);
+    }
+
+    throw new PreviousOffsetNotFoundException(String
+        .format("Previous offset fetch epoch time for topic %s, partition %s not found.", partition.getTopicName(),
+            partition.getId()));
+  }
+
   private long getPreviousOffsetForPartition(KafkaPartition partition, SourceState state)
       throws PreviousOffsetNotFoundException {
 
-    getAllPreviousOffsets(state);
+    getAllPreviousOffsetState(state);
 
     if (this.previousOffsets.containsKey(partition)) {
       return this.previousOffsets.get(partition);
@@ -464,12 +490,28 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
         .format("Previous offset for topic %s, partition %s not found.", partition.getTopicName(), partition.getId()));
   }
 
-  // need to be synchronized as this.previousOffsets need to be initialized once
-  private synchronized void getAllPreviousOffsets(SourceState state) {
+  private long getPreviousExpectedHighWatermark(KafkaPartition partition, SourceState state)
+      throws PreviousOffsetNotFoundException {
+
+    getAllPreviousOffsetState(state);
+
+    if (this.previousExpectedHighWatermarks.containsKey(partition)) {
+      return this.previousExpectedHighWatermarks.get(partition);
+    }
+    throw new PreviousOffsetNotFoundException(String
+        .format("Previous expected high watermark for topic %s, partition %s not found.", partition.getTopicName(),
+            partition.getId()));
+  }
+
+  // needs to be synchronized as this.previousOffsets, this.previousExpectedHighWatermarks, and
+  // this.previousOffsetFetchEpochTimes need to be initialized only once
+  private synchronized void getAllPreviousOffsetState(SourceState state) {
     if (this.doneGettingAllPreviousOffsets) {
       return;
     }
     this.previousOffsets.clear();
+    this.previousExpectedHighWatermarks.clear();
+    this.previousOffsetFetchEpochTimes.clear();
     Map<String, Iterable<WorkUnitState>> workUnitStatesByDatasetUrns = state.getPreviousWorkUnitStatesByDatasetUrns();
 
     if (!workUnitStatesByDatasetUrns.isEmpty() &&
@@ -481,13 +523,26 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     for (WorkUnitState workUnitState : state.getPreviousWorkUnitStates()) {
       List<KafkaPartition> partitions = KafkaUtils.getPartitions(workUnitState);
       MultiLongWatermark watermark = workUnitState.getActualHighWatermark(MultiLongWatermark.class);
+      MultiLongWatermark previousExpectedHighWatermark =
+          workUnitState.getWorkunit().getExpectedHighWatermark(MultiLongWatermark.class);
       Preconditions.checkArgument(partitions.size() == watermark.size(), String
           .format("Num of partitions doesn't match number of watermarks: partitions=%s, watermarks=%s", partitions,
               watermark));
+
       for (int i = 0; i < partitions.size(); i++) {
+        KafkaPartition partition = partitions.get(i);
+
         if (watermark.get(i) != ConfigurationKeys.DEFAULT_WATERMARK_VALUE) {
-          this.previousOffsets.put(partitions.get(i), watermark.get(i));
+          this.previousOffsets.put(partition, watermark.get(i));
+        }
+
+        if (previousExpectedHighWatermark.get(i) != ConfigurationKeys.DEFAULT_WATERMARK_VALUE) {
+          this.previousExpectedHighWatermarks.put(partition, previousExpectedHighWatermark.get(i));
         }
+
+        this.previousOffsetFetchEpochTimes.put(partition,
+          Long.valueOf(workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.OFFSET_FETCH_EPOCH_TIME, i),
+              "0")));
       }
     }
 
@@ -511,12 +566,13 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
   }
 
   // thread safe
-  private WorkUnit createEmptyWorkUnit(KafkaPartition partition, long previousOffset,
+  private WorkUnit createEmptyWorkUnit(KafkaPartition partition, long previousOffset, long previousFetchEpochTime,
       Optional<State> topicSpecificState) {
     Offsets offsets = new Offsets();
     offsets.setEarliestOffset(previousOffset);
     offsets.setLatestOffset(previousOffset);
     offsets.startAtEarliestOffset();
+    offsets.setOffsetFetchEpochTime(previousFetchEpochTime);
     return getWorkUnitForTopicPartition(partition, offsets, topicSpecificState);
   }
 
@@ -552,6 +608,9 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     workUnit.setProp(LEADER_HOSTANDPORT, partition.getLeader().getHostAndPort().toString());
     workUnit.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, offsets.getStartOffset());
     workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, offsets.getLatestOffset());
+    workUnit.setProp(PREVIOUS_OFFSET_FETCH_EPOCH_TIME, offsets.getPreviousOffsetFetchEpochTime());
+    workUnit.setProp(OFFSET_FETCH_EPOCH_TIME, offsets.getOffsetFetchEpochTime());
+    workUnit.setProp(PREVIOUS_LATEST_OFFSET, offsets.getPreviousLatestOffset());
 
     // Add lineage info
     DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_KAFKA, partition.getTopicName());
@@ -608,6 +667,18 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     @Setter
     private long latestOffset = 0;
 
+    @Getter
+    @Setter
+    private long offsetFetchEpochTime = 0;
+
+    @Getter
+    @Setter
+    private long previousOffsetFetchEpochTime = 0;
+
+    @Getter
+    @Setter
+    private long previousLatestOffset = 0;
+
     private void startAt(long offset)
         throws StartOffsetOutOfRangeException {
       if (offset < this.earliestOffset || offset > this.latestOffset) {
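
To make the new bookkeeping concrete, a small self-contained sketch of the
epoch-time round trip between runs follows. The Maps stand in for Gobblin's
State objects; the key strings match the new KafkaSource constants, but the
class and everything else here is hypothetical.

import java.util.HashMap;
import java.util.Map;

public class EpochTimeRoundTrip {
  public static void main(String[] args) {
    // Run N: KafkaSource stamps the work unit when offsets are fetched.
    Map<String, String> runN = new HashMap<>();
    runN.put("offsetFetchEpochTime", Long.toString(System.currentTimeMillis()));
    runN.put("previousOffsetFetchEpochTime", "0"); // no prior run

    // Run N+1: getAllPreviousOffsetState() reads run N's stamp back from the
    // previous WorkUnitState, so it becomes this run's "previous" time.
    Map<String, String> runN1 = new HashMap<>();
    runN1.put("previousOffsetFetchEpochTime", runN.get("offsetFetchEpochTime"));
    runN1.put("offsetFetchEpochTime", Long.toString(System.currentTimeMillis()));

    // The gap between the two stamps is the denominator of the Kafka rate.
    long deltaMillis = Long.parseLong(runN1.get("offsetFetchEpochTime"))
        - Long.parseLong(runN1.get("previousOffsetFetchEpochTime"));
    System.out.println("Kafka rate window (ms): " + deltaMillis);
  }
}

This is also why createEmptyWorkUnit now carries previousFetchEpochTime
forward: a skipped partition keeps a valid baseline for the next run.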

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a3189d73/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
index 8d03f4f..0d93796 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
@@ -219,6 +219,24 @@ public abstract class KafkaWorkUnitPacker {
     workUnit.removeProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY);
     workUnit.removeProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY);
     workUnit.setWatermarkInterval(interval);
+
+    // Update offset fetch epoch time and previous latest offset. These are used to compute the load factor,
+    // i.e. the Gobblin consumption rate relative to the Kafka production rate. The Kafka rate is computed as
+    // (current latest offset - previous latest offset) / (current epoch time - previous epoch time).
+    int index = 0;
+    for (WorkUnit wu : multiWorkUnit.getWorkUnits()) {
+      workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME, index),
+          wu.getProp(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME));
+      workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.OFFSET_FETCH_EPOCH_TIME, index),
+          wu.getProp(KafkaSource.OFFSET_FETCH_EPOCH_TIME));
+      workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_LATEST_OFFSET, index),
+          wu.getProp(KafkaSource.PREVIOUS_LATEST_OFFSET));
+      index++;
+    }
+    workUnit.removeProp(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME);
+    workUnit.removeProp(KafkaSource.OFFSET_FETCH_EPOCH_TIME);
+    workUnit.removeProp(KafkaSource.PREVIOUS_LATEST_OFFSET);
+
     // Remove the original partition information
     workUnit.removeProp(KafkaSource.PARTITION_ID);
     workUnit.removeProp(KafkaSource.LEADER_ID);
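
As a rough illustration of the squeezing step above, the sketch below shows
the per-partition key shapes this loop produces, assuming
KafkaUtils.getPartitionPropName joins the base key and the partition index
with a dot; the actual format is defined in KafkaUtils and is not part of
this diff, and the helper below is hypothetical.

public class PartitionPropNames {
  // Assumed key format: baseKey + "." + partitionIndex.
  static String partitionPropName(String key, int index) {
    return key + "." + index;
  }

  public static void main(String[] args) {
    String[] keys = {"previousOffsetFetchEpochTime", "offsetFetchEpochTime", "previousLatestOffset"};
    // Two partitions packed into one squeezed work unit.
    for (int index = 0; index < 2; index++) {
      for (String key : keys) {
        System.out.println(partitionPropName(key, index));
      }
    }
    // Prints previousOffsetFetchEpochTime.0 ... previousLatestOffset.1, which is
    // why the un-indexed scalar props are removed once the indexed copies exist.
  }
}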