Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/09/17 18:03:48 UTC

incubator-gobblin git commit: [GOBBLIN-589] Add more statistics to KafkaExtractor tracking event

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master ef59a1517 -> fcc4d412a


[GOBBLIN-589] Add more statistics to KafkaExtractor tracking event

- Added emitting of start/stop fetch epoch time statistics as well as
  the partition total size
- Added lagging (carrying forward) and emitting of fetch epoch time and
  watermark statistics from the previous run
- Made changes to allow subclasses of KafkaExtractor to emit statistics
  based on the lastSuccessfulRecord (see the sketch below)

Closes #2455 from cshen98/metrics1
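
For illustration, a minimal sketch of the third change: a subclass hooking
into the now-protected updateStatisticsForCurrentPartition() and the new
currentPartitionLastSuccessfulRecord field. The class name, record type, and
extractTimestamp() helper are hypothetical; imports and the remaining
abstract methods of KafkaExtractor are omitted.

    public class TimestampAwareExtractor extends KafkaExtractor<String, MyRecord> {
      // Hypothetical statistic derived from the last successfully processed record.
      private long lastRecordTimestamp = -1;

      public TimestampAwareExtractor(WorkUnitState state) {
        super(state);
      }

      @Override
      protected void updateStatisticsForCurrentPartition() {
        super.updateStatisticsForCurrentPartition(); // keep the built-in bookkeeping
        if (this.currentPartitionLastSuccessfulRecord != null) {
          // extractTimestamp() is a hypothetical helper for the record type.
          this.lastRecordTimestamp = extractTimestamp(this.currentPartitionLastSuccessfulRecord);
        }
      }

      private long extractTimestamp(MyRecord record) {
        return record.getTimestamp(); // hypothetical accessor
      }
    }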


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/fcc4d412
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/fcc4d412
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/fcc4d412

Branch: refs/heads/master
Commit: fcc4d412afa40a83ec1fccb25d76ea146ff1164b
Parents: ef59a15
Author: Carl Shen <ca...@gmail.com>
Authored: Mon Sep 17 11:03:43 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Sep 17 11:03:43 2018 -0700

----------------------------------------------------------------------
 .../extractor/extract/kafka/KafkaExtractor.java | 173 +++++++++++--------
 .../extractor/extract/kafka/KafkaSource.java    |  82 ++++++++-
 .../extractor/extract/kafka/KafkaUtils.java     |  14 ++
 .../workunit/packer/KafkaWorkUnitPacker.java    |  12 ++
 4 files changed, 209 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcc4d412/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index 0ec3caf..664446f 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -71,6 +70,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   public static final String EXPECTED_HIGH_WATERMARK = "expectedHighWatermark";
   public static final String ELAPSED_TIME = "elapsedTime";
   public static final String PROCESSED_RECORD_COUNT = "processedRecordCount";
+  public static final String PARTITION_TOTAL_SIZE = "partitionTotalSize";
   public static final String AVG_RECORD_PULL_TIME = "avgRecordPullTime";
   public static final String READ_RECORD_TIME = "readRecordTime";
   public static final String DECODE_RECORD_TIME = "decodeRecordTime";
@@ -87,16 +87,17 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   protected final GobblinKafkaConsumerClient kafkaConsumerClient;
   private final ClassAliasResolver<GobblinKafkaConsumerClientFactory> kafkaConsumerClientResolver;
 
-  protected final Stopwatch stopwatch;
-
   protected final Map<KafkaPartition, Integer> decodingErrorCount;
   private final Map<KafkaPartition, Double> avgMillisPerRecord;
   private final Map<KafkaPartition, Long> avgRecordSizes;
   private final Map<KafkaPartition, Long> elapsedTime;
   private final Map<KafkaPartition, Long> processedRecordCount;
+  private final Map<KafkaPartition, Long> partitionTotalSize;
   private final Map<KafkaPartition, Long> decodeRecordTime;
   private final Map<KafkaPartition, Long> fetchMessageBufferTime;
   private final Map<KafkaPartition, Long> readRecordTime;
+  private final Map<KafkaPartition, Long> startFetchEpochTime;
+  private final Map<KafkaPartition, Long> stopFetchEpochTime;
 
   private final Set<Integer> errorPartitions;
   private int undecodableMessageCount = 0;
@@ -105,10 +106,10 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   private int currentPartitionIdx = INITIAL_PARTITION_IDX;
   private long currentPartitionRecordCount = 0;
   private long currentPartitionTotalSize = 0;
-  private long currentPartitionFetchDuration = 0;
   private long currentPartitionDecodeRecordTime = 0;
   private long currentPartitionFetchMessageBufferTime = 0;
   private long currentPartitionReadRecordTime = 0;
+  protected D currentPartitionLastSuccessfulRecord = null;
 
   public KafkaExtractor(WorkUnitState state) {
     super(state);
@@ -130,16 +131,17 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
       throw new RuntimeException(e);
     }
 
-    this.stopwatch = Stopwatch.createUnstarted();
-
     this.decodingErrorCount = Maps.newHashMap();
     this.avgMillisPerRecord = Maps.newHashMapWithExpectedSize(this.partitions.size());
     this.avgRecordSizes = Maps.newHashMapWithExpectedSize(this.partitions.size());
     this.elapsedTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
     this.processedRecordCount = Maps.newHashMapWithExpectedSize(this.partitions.size());
+    this.partitionTotalSize = Maps.newHashMapWithExpectedSize(this.partitions.size());
     this.decodeRecordTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
     this.fetchMessageBufferTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
     this.readRecordTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
+    this.startFetchEpochTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
+    this.stopFetchEpochTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
 
     this.errorPartitions = Sets.newHashSet();
 
@@ -224,6 +226,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
           this.currentPartitionRecordCount++;
           this.currentPartitionTotalSize += nextValidMessage.getValueSizeInBytes();
           this.currentPartitionReadRecordTime += System.nanoTime() - readStartTime;
+          this.currentPartitionLastSuccessfulRecord = record;
           return record;
         } catch (Throwable t) {
           this.errorPartitions.add(this.currentPartitionIdx);
@@ -268,10 +271,11 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
       updateStatisticsForCurrentPartition();
       this.currentPartitionIdx++;
       this.currentPartitionRecordCount = 0;
-      this.currentPartitionFetchDuration = 0;
+      this.currentPartitionTotalSize = 0;
       this.currentPartitionDecodeRecordTime = 0;
       this.currentPartitionFetchMessageBufferTime = 0;
       this.currentPartitionReadRecordTime = 0;
+      this.currentPartitionLastSuccessfulRecord = null;
     }
 
     this.messageIterator = null;
@@ -281,29 +285,36 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
           this.highWatermark.get(this.currentPartitionIdx) - this.nextWatermark.get(this.currentPartitionIdx)));
       switchMetricContextToCurrentPartition();
     }
-    this.stopwatch.start();
+
+    if (!allPartitionsFinished()) {
+      this.startFetchEpochTime.put(this.getCurrentPartition(), System.currentTimeMillis());
+    }
   }
 
-  private void updateStatisticsForCurrentPartition() {
-    this.stopwatch.stop();
+  protected void updateStatisticsForCurrentPartition() {
+    long stopFetchEpochTime = System.currentTimeMillis();
+
+    if (!allPartitionsFinished()) {
+      this.stopFetchEpochTime.put(this.getCurrentPartition(), stopFetchEpochTime);
+    }
 
     if (this.currentPartitionRecordCount != 0) {
-      this.currentPartitionFetchDuration = this.stopwatch.elapsed(TimeUnit.MILLISECONDS);
+      long currentPartitionFetchDuration =
+          stopFetchEpochTime - this.startFetchEpochTime.get(this.getCurrentPartition());
       double avgMillisForCurrentPartition =
-          (double) this.currentPartitionFetchDuration / (double) this.currentPartitionRecordCount;
+          (double) currentPartitionFetchDuration / (double) this.currentPartitionRecordCount;
       this.avgMillisPerRecord.put(this.getCurrentPartition(), avgMillisForCurrentPartition);
 
       long avgRecordSize = this.currentPartitionTotalSize / this.currentPartitionRecordCount;
       this.avgRecordSizes.put(this.getCurrentPartition(), avgRecordSize);
 
-      this.elapsedTime.put(this.getCurrentPartition(), this.currentPartitionFetchDuration);
+      this.elapsedTime.put(this.getCurrentPartition(), currentPartitionFetchDuration);
       this.processedRecordCount.put(this.getCurrentPartition(), this.currentPartitionRecordCount);
+      this.partitionTotalSize.put(this.getCurrentPartition(), this.currentPartitionTotalSize);
       this.decodeRecordTime.put(this.getCurrentPartition(), this.currentPartitionDecodeRecordTime);
       this.fetchMessageBufferTime.put(this.getCurrentPartition(), this.currentPartitionFetchMessageBufferTime);
       this.readRecordTime.put(this.getCurrentPartition(), this.currentPartitionReadRecordTime);
     }
-
-    this.stopwatch.reset();
   }
 
   private void switchMetricContextToCurrentPartition() {
@@ -367,65 +378,13 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
     this.workUnitState.setProp(ConfigurationKeys.ERROR_PARTITION_COUNT, this.errorPartitions.size());
     this.workUnitState.setProp(ConfigurationKeys.ERROR_MESSAGE_UNDECODABLE_COUNT, this.undecodableMessageCount);
 
-    // Commit actual high watermark for each partition
     for (int i = 0; i < this.partitions.size(); i++) {
       LOG.info(String.format("Actual high watermark for partition %s=%d, expected=%d", this.partitions.get(i),
           this.nextWatermark.get(i), this.highWatermark.get(i)));
-
-      Map<String, String> tagsForPartition = Maps.newHashMap();
-      KafkaPartition partition = this.partitions.get(i);
-      tagsForPartition.put(TOPIC, partition.getTopicName());
-      tagsForPartition.put(PARTITION, Integer.toString(partition.getId()));
-      tagsForPartition.put(LOW_WATERMARK, Long.toString(this.lowWatermark.get(i)));
-      tagsForPartition.put(ACTUAL_HIGH_WATERMARK, Long.toString(this.nextWatermark.get(i)));
-      // These are used to compute the load factor,
-      // gobblin consumption rate relative to the kafka production rate.
-      // The gobblin rate is computed as (processed record count/elapsed time)
-      // The kafka rate is computed as (expected high watermark - previous latest offset) /
-      // (current offset fetch epoch time - previous offset fetch epoch time).
-      tagsForPartition.put(EXPECTED_HIGH_WATERMARK, Long.toString(this.highWatermark.get(i)));
-      tagsForPartition.put(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME,
-          this.workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME,
-              i)));
-      tagsForPartition.put(KafkaSource.OFFSET_FETCH_EPOCH_TIME,
-          this.workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.OFFSET_FETCH_EPOCH_TIME, i)));
-      tagsForPartition.put(KafkaSource.PREVIOUS_LATEST_OFFSET,
-          this.workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_LATEST_OFFSET, i)));
-
-      if (this.processedRecordCount.containsKey(partition)) {
-        tagsForPartition.put(PROCESSED_RECORD_COUNT, Long.toString(this.processedRecordCount.get(partition)));
-        tagsForPartition.put(ELAPSED_TIME, Long.toString(this.elapsedTime.get(partition)));
-        tagsForPartition.put(DECODE_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
-            this.decodeRecordTime.get(partition))));
-        tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
-            this.fetchMessageBufferTime.get(partition))));
-        tagsForPartition.put(READ_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
-            this.readRecordTime.get(partition))));
-      } else {
-        tagsForPartition.put(PROCESSED_RECORD_COUNT, "0");
-        tagsForPartition.put(ELAPSED_TIME, "0");
-        tagsForPartition.put(DECODE_RECORD_TIME, "0");
-        tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME, "0");
-        tagsForPartition.put(READ_RECORD_TIME, "0");
-      }
-
-      tagsForPartitionsMap.put(partition, tagsForPartition);
+      tagsForPartitionsMap.put(this.partitions.get(i), createTagsForPartition(i));
     }
     this.workUnitState.setActualHighWatermark(this.nextWatermark);
 
-    // Commit avg time to pull a record for each partition
-    for (KafkaPartition partition : this.partitions) {
-      if (this.avgMillisPerRecord.containsKey(partition)) {
-        double avgMillis = this.avgMillisPerRecord.get(partition);
-        LOG.info(String.format("Avg time to pull a record for partition %s = %f milliseconds", partition, avgMillis));
-        KafkaUtils.setPartitionAvgRecordMillis(this.workUnitState, partition, avgMillis);
-        tagsForPartitionsMap.get(partition).put(AVG_RECORD_PULL_TIME, Double.toString(avgMillis));
-      } else {
-        LOG.info(String.format("Avg time to pull a record for partition %s not recorded", partition));
-        tagsForPartitionsMap.get(partition).put(AVG_RECORD_PULL_TIME, Double.toString(-1));
-      }
-    }
-
     if (isInstrumentationEnabled()) {
       for (Map.Entry<KafkaPartition, Map<String, String>> eventTags : tagsForPartitionsMap.entrySet()) {
         new EventSubmitter.Builder(getMetricContext(), GOBBLIN_KAFKA_NAMESPACE).build()
@@ -436,6 +395,84 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
     this.closer.close();
   }
 
+  protected Map<String, String> createTagsForPartition(int partitionId) {
+    Map<String, String> tagsForPartition = Maps.newHashMap();
+    KafkaPartition partition = this.partitions.get(partitionId);
+
+    tagsForPartition.put(TOPIC, partition.getTopicName());
+    tagsForPartition.put(PARTITION, Integer.toString(partition.getId()));
+    tagsForPartition.put(LOW_WATERMARK, Long.toString(this.lowWatermark.get(partitionId)));
+    tagsForPartition.put(ACTUAL_HIGH_WATERMARK, Long.toString(this.nextWatermark.get(partitionId)));
+
+    // These are used to compute the load factor,
+    // gobblin consumption rate relative to the kafka production rate.
+    // The gobblin rate is computed as (processed record count/elapsed time)
+    // The kafka rate is computed as (expected high watermark - previous latest offset) /
+    // (current offset fetch epoch time - previous offset fetch epoch time).
+    tagsForPartition.put(EXPECTED_HIGH_WATERMARK, Long.toString(this.highWatermark.get(partitionId)));
+    tagsForPartition.put(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME,
+        Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
+            KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME, partitionId)));
+    tagsForPartition.put(KafkaSource.OFFSET_FETCH_EPOCH_TIME,
+        Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
+            KafkaSource.OFFSET_FETCH_EPOCH_TIME, partitionId)));
+    tagsForPartition.put(KafkaSource.PREVIOUS_LATEST_OFFSET,
+        Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
+            KafkaSource.PREVIOUS_LATEST_OFFSET, partitionId)));
+
+    tagsForPartition.put(KafkaSource.PREVIOUS_LOW_WATERMARK,
+        Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
+            KafkaSource.PREVIOUS_LOW_WATERMARK, partitionId)));
+    tagsForPartition.put(KafkaSource.PREVIOUS_HIGH_WATERMARK,
+        Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
+            KafkaSource.PREVIOUS_HIGH_WATERMARK, partitionId)));
+    tagsForPartition.put(KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME,
+        Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
+            KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME, partitionId)));
+    tagsForPartition.put(KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME,
+        Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
+            KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME, partitionId)));
+
+    tagsForPartition.put(KafkaSource.START_FETCH_EPOCH_TIME, Long.toString(this.startFetchEpochTime.get(partition)));
+    tagsForPartition.put(KafkaSource.STOP_FETCH_EPOCH_TIME, Long.toString(this.stopFetchEpochTime.get(partition)));
+    this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.START_FETCH_EPOCH_TIME, partitionId),
+        Long.toString(this.startFetchEpochTime.get(partition)));
+    this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.STOP_FETCH_EPOCH_TIME, partitionId),
+        Long.toString(this.stopFetchEpochTime.get(partition)));
+
+    if (this.processedRecordCount.containsKey(partition)) {
+      tagsForPartition.put(PROCESSED_RECORD_COUNT, Long.toString(this.processedRecordCount.get(partition)));
+      tagsForPartition.put(PARTITION_TOTAL_SIZE, Long.toString(this.partitionTotalSize.get(partition)));
+      tagsForPartition.put(ELAPSED_TIME, Long.toString(this.elapsedTime.get(partition)));
+      tagsForPartition.put(DECODE_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
+          this.decodeRecordTime.get(partition))));
+      tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
+          this.fetchMessageBufferTime.get(partition))));
+      tagsForPartition.put(READ_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
+          this.readRecordTime.get(partition))));
+    } else {
+      tagsForPartition.put(PROCESSED_RECORD_COUNT, "0");
+      tagsForPartition.put(PARTITION_TOTAL_SIZE, "0");
+      tagsForPartition.put(ELAPSED_TIME, "0");
+      tagsForPartition.put(DECODE_RECORD_TIME, "0");
+      tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME, "0");
+      tagsForPartition.put(READ_RECORD_TIME, "0");
+    }
+
+    // Commit avg time to pull a record for each partition
+    if (this.avgMillisPerRecord.containsKey(partition)) {
+      double avgMillis = this.avgMillisPerRecord.get(partition);
+      LOG.info(String.format("Avg time to pull a record for partition %s = %f milliseconds", partition, avgMillis));
+      KafkaUtils.setPartitionAvgRecordMillis(this.workUnitState, partition, avgMillis);
+      tagsForPartition.put(AVG_RECORD_PULL_TIME, Double.toString(avgMillis));
+    } else {
+      LOG.info(String.format("Avg time to pull a record for partition %s not recorded", partition));
+      tagsForPartition.put(AVG_RECORD_PULL_TIME, Double.toString(-1));
+    }
+
+    return tagsForPartition;
+  }
+
   @Deprecated
   @Override
   public long getHighWatermark() {

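For reference, the load factor described in the comment inside
createTagsForPartition can be computed from the emitted tags. A hedged
sketch, not part of this commit; the tag map and the orientation of the
ratio are assumptions:

    // tags: one per-partition map emitted in the tracking event above.
    static double loadFactor(Map<String, String> tags) {
      // Gobblin consumption rate: processed record count / elapsed time (ms).
      double gobblinRate = Double.parseDouble(tags.get("processedRecordCount"))
          / Double.parseDouble(tags.get("elapsedTime"));
      // Kafka production rate: (expected high watermark - previous latest offset)
      // / (offset fetch epoch time - previous offset fetch epoch time).
      double kafkaRate = (Double.parseDouble(tags.get("expectedHighWatermark"))
          - Double.parseDouble(tags.get("previousLatestOffset")))
          / (Double.parseDouble(tags.get("offsetFetchEpochTime"))
              - Double.parseDouble(tags.get("previousOffsetFetchEpochTime")));
      // Values above 1 suggest production outpacing consumption; which rate
      // belongs in the numerator is an assumption here.
      return kafkaRate / gobblinRate;
    }
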
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcc4d412/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index b96412c..e7d7da0 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -104,6 +104,12 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
   public static final String ALL_TOPICS = "all";
   public static final String AVG_RECORD_SIZE = "avg.record.size";
   public static final String AVG_RECORD_MILLIS = "avg.record.millis";
+  public static final String START_FETCH_EPOCH_TIME = "startFetchEpochTime";
+  public static final String STOP_FETCH_EPOCH_TIME = "stopFetchEpochTime";
+  public static final String PREVIOUS_START_FETCH_EPOCH_TIME = "previousStartFetchEpochTime";
+  public static final String PREVIOUS_STOP_FETCH_EPOCH_TIME = "previousStopFetchEpochTime";
+  public static final String PREVIOUS_LOW_WATERMARK = "previousLowWatermark";
+  public static final String PREVIOUS_HIGH_WATERMARK = "previousHighWatermark";
   public static final String PREVIOUS_LATEST_OFFSET = "previousLatestOffset";
   public static final String OFFSET_FETCH_EPOCH_TIME = "offsetFetchEpochTime";
   public static final String PREVIOUS_OFFSET_FETCH_EPOCH_TIME = "previousOffsetFetchEpochTime";
@@ -119,8 +125,11 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
 
   private final Set<String> moveToLatestTopics = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
   private final Map<KafkaPartition, Long> previousOffsets = Maps.newConcurrentMap();
+  private final Map<KafkaPartition, Long> previousLowWatermarks = Maps.newConcurrentMap();
   private final Map<KafkaPartition, Long> previousExpectedHighWatermarks = Maps.newConcurrentMap();
   private final Map<KafkaPartition, Long> previousOffsetFetchEpochTimes = Maps.newConcurrentMap();
+  private final Map<KafkaPartition, Long> previousStartFetchEpochTimes = Maps.newConcurrentMap();
+  private final Map<KafkaPartition, Long> previousStopFetchEpochTimes = Maps.newConcurrentMap();
 
   private final Set<KafkaPartition> partitionsToBeProcessed = Sets.newConcurrentHashSet();
 
@@ -386,6 +395,10 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     boolean previousOffsetNotFound = false;
     try {
       previousOffset = getPreviousOffsetForPartition(partition, state);
+      offsets.setPreviousEndOffset(previousOffset);
+      offsets.setPreviousStartOffset(getPreviousLowWatermark(partition, state));
+      offsets.setPreviousStartFetchEpochTime(getPreviousStartFetchEpochTimeForPartition(partition, state));
+      offsets.setPreviousStopFetchEpochTime(getPreviousStopFetchEpochTimeForPartition(partition, state));
       offsets.setPreviousLatestOffset(getPreviousExpectedHighWatermark(partition, state));
       previousOffsetFetchEpochTime = getPreviousOffsetFetchEpochTimeForPartition(partition, state);
       offsets.setPreviousOffsetFetchEpochTime(previousOffsetFetchEpochTime);
@@ -464,6 +477,18 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     return getWorkUnitForTopicPartition(partition, offsets, topicSpecificState);
   }
 
+  private long getPreviousStartFetchEpochTimeForPartition(KafkaPartition partition, SourceState state) {
+    getAllPreviousOffsetState(state);
+    return this.previousStartFetchEpochTimes.containsKey(partition) ?
+        this.previousStartFetchEpochTimes.get(partition) : 0;
+  }
+
+  private long getPreviousStopFetchEpochTimeForPartition(KafkaPartition partition, SourceState state) {
+    getAllPreviousOffsetState(state);
+    return this.previousStopFetchEpochTimes.containsKey(partition) ?
+        this.previousStopFetchEpochTimes.get(partition) : 0;
+  }
+
   private long getPreviousOffsetFetchEpochTimeForPartition(KafkaPartition partition, SourceState state)
       throws PreviousOffsetNotFoundException {
 
@@ -503,6 +528,19 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
             partition.getId()));
   }
 
+  private long getPreviousLowWatermark(KafkaPartition partition, SourceState state)
+      throws PreviousOffsetNotFoundException {
+
+    getAllPreviousOffsetState(state);
+
+    if (this.previousLowWatermarks.containsKey(partition)) {
+      return this.previousLowWatermarks.get(partition);
+    }
+    throw new PreviousOffsetNotFoundException(String
+        .format("Previous low watermark for topic %s, partition %s not found.", partition.getTopicName(),
+            partition.getId()));
+  }
+
   // need to be synchronized as this.previousOffsets, this.previousExpectedHighWatermarks, and
   // this.previousOffsetFetchEpochTimes need to be initialized once
   private synchronized void getAllPreviousOffsetState(SourceState state) {
@@ -510,8 +548,11 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
       return;
     }
     this.previousOffsets.clear();
+    this.previousLowWatermarks.clear();
     this.previousExpectedHighWatermarks.clear();
     this.previousOffsetFetchEpochTimes.clear();
+    this.previousStartFetchEpochTimes.clear();
+    this.previousStopFetchEpochTimes.clear();
     Map<String, Iterable<WorkUnitState>> workUnitStatesByDatasetUrns = state.getPreviousWorkUnitStatesByDatasetUrns();
 
     if (!workUnitStatesByDatasetUrns.isEmpty() &&
@@ -522,9 +563,11 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
 
     for (WorkUnitState workUnitState : state.getPreviousWorkUnitStates()) {
       List<KafkaPartition> partitions = KafkaUtils.getPartitions(workUnitState);
+      WorkUnit workUnit = workUnitState.getWorkunit();
+
       MultiLongWatermark watermark = workUnitState.getActualHighWatermark(MultiLongWatermark.class);
-      MultiLongWatermark previousExpectedHighWatermark =
-          workUnitState.getWorkunit().getExpectedHighWatermark(MultiLongWatermark.class);
+      MultiLongWatermark previousLowWatermark = workUnit.getLowWatermark(MultiLongWatermark.class);
+      MultiLongWatermark previousExpectedHighWatermark = workUnit.getExpectedHighWatermark(MultiLongWatermark.class);
       Preconditions.checkArgument(partitions.size() == watermark.size(), String
           .format("Num of partitions doesn't match number of watermarks: partitions=%s, watermarks=%s", partitions,
               watermark));
@@ -536,13 +579,22 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
           this.previousOffsets.put(partition, watermark.get(i));
         }
 
+        if (previousLowWatermark.get(i) != ConfigurationKeys.DEFAULT_WATERMARK_VALUE) {
+          this.previousLowWatermarks.put(partition, previousLowWatermark.get(i));
+        }
+
         if (previousExpectedHighWatermark.get(i) != ConfigurationKeys.DEFAULT_WATERMARK_VALUE) {
           this.previousExpectedHighWatermarks.put(partition, previousExpectedHighWatermark.get(i));
         }
 
         this.previousOffsetFetchEpochTimes.put(partition,
-          Long.valueOf(workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.OFFSET_FETCH_EPOCH_TIME, i),
-              "0")));
+            KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(workUnitState, OFFSET_FETCH_EPOCH_TIME, i));
+
+        this.previousStartFetchEpochTimes.put(partition,
+            KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(workUnitState, START_FETCH_EPOCH_TIME, i));
+
+        this.previousStopFetchEpochTimes.put(partition,
+            KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(workUnitState, STOP_FETCH_EPOCH_TIME, i));
       }
     }
 
@@ -608,6 +660,10 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     workUnit.setProp(LEADER_HOSTANDPORT, partition.getLeader().getHostAndPort().toString());
     workUnit.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, offsets.getStartOffset());
     workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, offsets.getLatestOffset());
+    workUnit.setProp(PREVIOUS_START_FETCH_EPOCH_TIME, offsets.getPreviousStartFetchEpochTime());
+    workUnit.setProp(PREVIOUS_STOP_FETCH_EPOCH_TIME, offsets.getPreviousStopFetchEpochTime());
+    workUnit.setProp(PREVIOUS_LOW_WATERMARK, offsets.getPreviousStartOffset());
+    workUnit.setProp(PREVIOUS_HIGH_WATERMARK, offsets.getPreviousEndOffset());
     workUnit.setProp(PREVIOUS_OFFSET_FETCH_EPOCH_TIME, offsets.getPreviousOffsetFetchEpochTime());
     workUnit.setProp(OFFSET_FETCH_EPOCH_TIME, offsets.getOffsetFetchEpochTime());
     workUnit.setProp(PREVIOUS_LATEST_OFFSET, offsets.getPreviousLatestOffset());
@@ -679,6 +735,24 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     @Setter
     private long previousLatestOffset = 0;
 
+    // previous low watermark
+    @Getter
+    @Setter
+    private long previousStartOffset = 0;
+
+    // previous actual high watermark
+    @Getter
+    @Setter
+    private long previousEndOffset = 0;
+
+    @Getter
+    @Setter
+    private long previousStartFetchEpochTime = 0;
+
+    @Getter
+    @Setter
+    private long previousStopFetchEpochTime = 0;
+
     private void startAt(long offset)
         throws StartOffsetOutOfRangeException {
       if (offset < this.earliestOffset || offset > this.latestOffset) {

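With the new PREVIOUS_* properties written onto each work unit above, the
previous run's consumption rate can be estimated downstream. A sketch,
assuming a single-partition work unit (so the un-indexed keys are present)
and Gobblin's State#getPropAsLong(key, default) accessor:

    long prevStart = state.getPropAsLong(KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME, 0L);
    long prevStop = state.getPropAsLong(KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME, 0L);
    long prevLow = state.getPropAsLong(KafkaSource.PREVIOUS_LOW_WATERMARK, 0L);
    long prevHigh = state.getPropAsLong(KafkaSource.PREVIOUS_HIGH_WATERMARK, 0L);
    // Records per millisecond in the previous run; -1 when no previous data exists.
    double prevRate = (prevStop > prevStart)
        ? (double) (prevHigh - prevLow) / (prevStop - prevStart)
        : -1;
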
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcc4d412/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
index 55ecab4..5472134 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.source.extractor.extract.kafka;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
 
 import java.util.List;
 
@@ -168,4 +169,17 @@ public class KafkaUtils {
         getPartitionPropName(partition.getTopicName(), partition.getId()) + "." + KafkaSource.AVG_RECORD_MILLIS,
         millis);
   }
+
+  /**
+   * Get a property as long from a work unit that may or may not be a multiworkunit.
+   * This method is needed because the SingleLevelWorkUnitPacker does not squeeze work units
+   * into a multiworkunit, and thus does not append the partitionId to property keys, while
+   * the BiLevelWorkUnitPacker does.
+   * Return 0 as default if key not found in either form.
+   */
+  public static long getPropAsLongFromSingleOrMultiWorkUnitState(WorkUnitState workUnitState,
+                                                                 String key, int partitionId) {
+    return Long.parseLong(workUnitState.contains(key) ? workUnitState.getProp(key)
+        : workUnitState.getProp(KafkaUtils.getPartitionPropName(key, partitionId), "0"));
+  }
 }

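A usage sketch for the new helper: the same statistic may live under the
plain key (single-level packer) or the per-partition key produced by
getPartitionPropName (bi-level packer), and the helper resolves either
form, falling back to 0:

    // Works for both packing modes; workUnitState and partitionIndex are
    // whatever the caller already has in hand.
    long fetchEpochTime = KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(
        workUnitState, KafkaSource.OFFSET_FETCH_EPOCH_TIME, partitionIndex);
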
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcc4d412/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
index 0d93796..fef3219 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
@@ -225,6 +225,14 @@ public abstract class KafkaWorkUnitPacker {
     // (current latest offset - previous latest offset)/(current epoch time - previous epoch time).
     int index = 0;
     for (WorkUnit wu : multiWorkUnit.getWorkUnits()) {
+      workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME, index),
+          wu.getProp(KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME));
+      workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME, index),
+          wu.getProp(KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME));
+      workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_LOW_WATERMARK, index),
+          wu.getProp(KafkaSource.PREVIOUS_LOW_WATERMARK));
+      workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_HIGH_WATERMARK, index),
+          wu.getProp(KafkaSource.PREVIOUS_HIGH_WATERMARK));
       workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME, index),
           wu.getProp(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME));
       workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.OFFSET_FETCH_EPOCH_TIME, index),
@@ -233,6 +241,10 @@ public abstract class KafkaWorkUnitPacker {
           wu.getProp(KafkaSource.PREVIOUS_LATEST_OFFSET));
       index++;
     }
+    workUnit.removeProp(KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME);
+    workUnit.removeProp(KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME);
+    workUnit.removeProp(KafkaSource.PREVIOUS_LOW_WATERMARK);
+    workUnit.removeProp(KafkaSource.PREVIOUS_HIGH_WATERMARK);
     workUnit.removeProp(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME);
     workUnit.removeProp(KafkaSource.OFFSET_FETCH_EPOCH_TIME);
     workUnit.removeProp(KafkaSource.PREVIOUS_LATEST_OFFSET);
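
To illustrate the squeeze step above: each previous-run property is copied
from the inner work unit onto the squeezed work unit under a per-partition
key, and the un-indexed key is then removed. The exact key shape produced
by getPartitionPropName is an assumption:

    // inner work unit at index 2:     previousLowWatermark = 1200
    // squeezed work unit afterwards:  previousLowWatermark.2 = 1200
    // The removeProp(...) calls then drop the plain keys so only the
    // per-partition forms remain, which is the form
    // getPropAsLongFromSingleOrMultiWorkUnitState falls back to.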