Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/18 04:37:19 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1087] Track and report histogram of observed lag from Gobblin…

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e742cf  [GOBBLIN-1087] Track and report histogram of observed lag from Gobblin…
5e742cf is described below

commit 5e742cfd84d01d368b46c5db16688b870c389871
Author: sv2000 <su...@gmail.com>
AuthorDate: Tue Mar 17 21:37:13 2020 -0700

    [GOBBLIN-1087] Track and report histogram of observed lag from Gobblin…
    
    Closes #2928 from sv2000/observedLag
---
 gobblin-core-base/build.gradle                     |   3 +-
 gobblin-modules/gobblin-kafka-common/build.gradle  |   9 ++
 .../kafka/HdrHistogramPerformanceBenchmark.java    | 154 +++++++++++++++++++++
 .../gobblin/kafka/client/KafkaConsumerRecord.java  |  17 +--
 .../extractor/extract/kafka/KafkaExtractor.java    |  10 +-
 .../extract/kafka/KafkaExtractorStatsTracker.java  |  89 +++++++++++-
 .../extractor/extract/kafka/KafkaSource.java       |  20 ++-
 .../kafka/KafkaExtractorStatsTrackerTest.java      |  58 ++++++--
 gobblin-runtime/build.gradle                       |   3 +-
 gradle/scripts/dependencyDefinitions.gradle        |   3 +
 10 files changed, 339 insertions(+), 27 deletions(-)

diff --git a/gobblin-core-base/build.gradle b/gobblin-core-base/build.gradle
index 80b0f67..11ca360 100644
--- a/gobblin-core-base/build.gradle
+++ b/gobblin-core-base/build.gradle
@@ -38,8 +38,7 @@ dependencies {
 
   testCompile externalDependency.testng
   testCompile externalDependency.mockito
-
-  jmh 'org.openjdk.jmh:jmh-core:1.17.3'
+  testCompile externalDependency.jmh
 }
 
 test {
diff --git a/gobblin-modules/gobblin-kafka-common/build.gradle b/gobblin-modules/gobblin-kafka-common/build.gradle
index d38d7d3..dba07a7 100644
--- a/gobblin-modules/gobblin-kafka-common/build.gradle
+++ b/gobblin-modules/gobblin-kafka-common/build.gradle
@@ -16,6 +16,7 @@
  */
 
 apply plugin: 'java'
+apply plugin: 'me.champeau.gradle.jmh'
 
 dependencies {
   compile project(":gobblin-api")
@@ -33,6 +34,7 @@ dependencies {
   compile externalDependency.commonsPool
   compile externalDependency.guava
   compile externalDependency.gson
+  compile externalDependency.hdrHistogram
   compile externalDependency.jacksonCore
   compile externalDependency.jacksonMapper
   compile externalDependency.slf4j
@@ -44,6 +46,7 @@ dependencies {
   testCompile project(":gobblin-test-utils")
   testCompile externalDependency.mockito
   testCompile externalDependency.testng
+  testCompile externalDependency.jmh
 }
 
 configurations {
@@ -59,4 +62,10 @@ test {
   workingDir rootProject.rootDir
 }
 
+jmh {
+  include = ""
+  zip64 = true
+  duplicateClassesStrategy = "EXCLUDE"
+}
+
 ext.classification="library"
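
Note: with the me.champeau.gradle.jmh plugin applied above, the new benchmark should be runnable through the plugin's standard jmh task, presumably along the lines of:

    ./gradlew :gobblin-modules:gobblin-kafka-common:jmh

(the empty include pattern appears to select all benchmarks in the module; alternatively, the benchmark class below ships a main() method that can be invoked directly).
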
diff --git a/gobblin-modules/gobblin-kafka-common/src/jmh/java/org/apache/gobblin/source/extractor/extract/kafka/HdrHistogramPerformanceBenchmark.java b/gobblin-modules/gobblin-kafka-common/src/jmh/java/org/apache/gobblin/source/extractor/extract/kafka/HdrHistogramPerformanceBenchmark.java
new file mode 100644
index 0000000..23631f9
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/jmh/java/org/apache/gobblin/source/extractor/extract/kafka/HdrHistogramPerformanceBenchmark.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import org.HdrHistogram.Histogram;
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A micro-benchmark that measures the time taken to serialize a {@link Histogram} instance to its String representation.
+ * The benchmark uses a random number generator to draw values from a uniform distribution, an adversarial pattern for a
+ * Histogram because it populates more count buckets than a skewed distribution would. The benchmark thus provides an
+ * upper bound on the memory footprint of the histogram, the serialization time, and the size of the serialized
+ * representation.
+ */
+@Warmup (iterations = 3)
+@Measurement (iterations = 10)
+@BenchmarkMode (value = Mode.AverageTime)
+@Fork (value = 1)
+@OutputTimeUnit (TimeUnit.MILLISECONDS)
+@Slf4j
+public class HdrHistogramPerformanceBenchmark {
+
+  @State (value = Scope.Benchmark)
+  public static class HistogramState {
+    private static long MIN_VALUE = 1;
+    private static long MAX_VALUE = TimeUnit.HOURS.toMillis(24);
+
+    private Histogram histogram1;
+    private Histogram histogram2;
+    private Histogram histogram3;
+    private Histogram histogram4;
+
+    private final RandomDataGenerator random = new RandomDataGenerator();
+
+    @Setup (value = Level.Iteration)
+    public void setUp() {
+      this.histogram1 = buildHistogram(1000000);
+      this.histogram2 = buildHistogram(2000000);
+      this.histogram3 = buildHistogram(4000000);
+      this.histogram4 = buildHistogram(10000000);
+    }
+
+    private Histogram buildHistogram(int size) {
+      Histogram histogram = new Histogram(MIN_VALUE, MAX_VALUE, 3);
+      IntStream.range(0, size).mapToLong(i -> random.nextLong(MIN_VALUE, MAX_VALUE))
+          .forEachOrdered(histogram::recordValue);
+      System.out.println("Estimated memory footprint of histogram is: " + histogram.getEstimatedFootprintInBytes());
+      return histogram;
+    }
+
+    @TearDown (value = Level.Iteration)
+    public void tearDown() {
+      this.histogram1.reset();
+      this.histogram2.reset();
+      this.histogram3.reset();
+      this.histogram4.reset();
+    }
+  }
+
+  @Benchmark
+  public String trackHistogram1MToStringConversion(HistogramState histogramState) {
+    String histogramString = KafkaExtractorStatsTracker.convertHistogramToString(histogramState.histogram1);
+    System.out.println("Histogram serialized string size: " + histogramString.length());
+    return histogramString;
+  }
+
+  @Benchmark
+  public String trackHistogram2MToStringConversion(HistogramState histogramState) {
+    String histogramString = KafkaExtractorStatsTracker.convertHistogramToString(histogramState.histogram2);
+    System.out.println("Histogram serialized string size: " + histogramString.length());
+    return histogramString;
+  }
+
+  @Benchmark
+  public String trackHistogram4MToStringConversion(HistogramState histogramState) {
+    String histogramString = KafkaExtractorStatsTracker.convertHistogramToString(histogramState.histogram3);
+    System.out.println("Histogram serialized string size: " + histogramString.length());
+    return histogramString;
+  }
+
+  @Benchmark
+  public String trackHistogram10MToStringConversion(HistogramState histogramState) {
+    String histogramString = KafkaExtractorStatsTracker.convertHistogramToString(histogramState.histogram4);
+    System.out.println("Histogram serialized string size: " + histogramString.length());
+    return histogramString;
+  }
+
+  @Benchmark
+  public Histogram trackMergeHistogram(HistogramState histogramState) {
+    Histogram histogram = new Histogram(histogramState.MIN_VALUE, histogramState.MAX_VALUE, 3);
+    histogram.add(histogramState.histogram1);
+    histogram.add(histogramState.histogram2);
+    histogram.add(histogramState.histogram3);
+    histogram.add(histogramState.histogram4);
+    return histogram;
+  }
+
+  @Benchmark
+  public Histogram trackBuildHistogram(HistogramState histogramState) {
+    Histogram histogram = new Histogram(histogramState.MIN_VALUE, histogramState.MAX_VALUE, 3);
+    return histogram;
+  }
+
+  @Benchmark
+  public void trackResetHistogram(HistogramState histogramState, Blackhole blackhole) {
+    int dummyVal = 1;
+    histogramState.histogram4.reset();
+    blackhole.consume(dummyVal);
+  }
+
+  public static void main(String[] args) throws Exception {
+    ChainedOptionsBuilder opt = new OptionsBuilder()
+        .include(HdrHistogramPerformanceBenchmark.class.getSimpleName())
+        .warmupIterations(3)
+        .measurementIterations(10);
+    new Runner(opt.build()).run();
+  }
+}
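
A note on why the uniform distribution above is the adversarial case: the allocated footprint of a non-resizing Histogram is fixed by its range and precision, but the compressed log encoding only spends bytes on non-empty count buckets, so uniformly spread values should serialize noticeably larger than concentrated ones. A minimal sketch illustrating this, using the same HdrHistogram and commons-math APIs as the benchmark (the class name and value ranges here are illustrative):

    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;
    import java.util.concurrent.TimeUnit;

    import org.HdrHistogram.Histogram;
    import org.HdrHistogram.HistogramLogWriter;
    import org.apache.commons.math3.random.RandomDataGenerator;

    public class SerializedSizeSketch {
      // Serialize with the same compressed log format used by the benchmark and the stats tracker.
      static int serializedSize(Histogram histogram) throws Exception {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (PrintStream stream = new PrintStream(baos, true, "UTF-8")) {
          new HistogramLogWriter(stream).outputIntervalHistogram(histogram);
        }
        return baos.size();
      }

      public static void main(String[] args) throws Exception {
        long maxValue = TimeUnit.HOURS.toMillis(24);
        RandomDataGenerator random = new RandomDataGenerator();
        Histogram uniform = new Histogram(1, maxValue, 3);
        Histogram skewed = new Histogram(1, maxValue, 3);
        for (int i = 0; i < 1000000; i++) {
          uniform.recordValue(random.nextLong(1, maxValue)); // touches many count buckets
          skewed.recordValue(random.nextLong(1000, 2000));   // concentrates counts in a few buckets
        }
        // Identical allocated footprint, very different compressed sizes.
        System.out.println("uniform: " + serializedSize(uniform) + " bytes");
        System.out.println("skewed:  " + serializedSize(skewed) + " bytes");
      }
    }
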
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java
index cba9fa2..4808436 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java
@@ -16,6 +16,9 @@
  */
 package org.apache.gobblin.kafka.client;
 
+import java.util.concurrent.TimeUnit;
+
+
 /**
  * A kafka message/record consumed from {@link GobblinKafkaConsumerClient}. This interface provides APIs to read message
  * metadata. Extension interfaces like {@link DecodeableKafkaRecord} or {@link ByteArrayBasedKafkaRecord} provide APIs
@@ -51,14 +54,6 @@ public interface KafkaConsumerRecord {
     return false;
   }
 
-  default boolean isTimestampCreateTime() {
-    return false;
-  }
-
-  default boolean isTimestampNone() {
-    return false;
-  }
-
   /**
    * @return Partition id for this record
    */
@@ -69,4 +64,10 @@ public interface KafkaConsumerRecord {
    */
   String getTopic();
 
+  /**
+   * @param fieldName the field name containing the record creation time.
+   * @param timeUnit the timeunit for the timestamp field.
+   * @return the record creation timestamp, if it is available. Defaults to 0.
+   */
+  public default long getRecordCreationTimestamp(String fieldName, TimeUnit timeUnit) { return 0; }
 }
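
The new default method leaves extraction of the creation time to concrete record implementations. Since KafkaExtractorStatsTracker (below) computes observed latency as System.currentTimeMillis() minus this value, implementations are presumably expected to normalize the configured field to epoch milliseconds. A hypothetical sketch of that contract (the payload shape and field name are illustrative, not part of the commit):

    import java.util.Collections;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    public class RecordCreationTimestampSketch {
      // Hypothetical extraction logic: read the configured field from the decoded payload
      // and convert it from the producer's time unit to epoch milliseconds.
      static long getRecordCreationTimestamp(Map<String, Object> payload, String fieldName, TimeUnit timeUnit) {
        Object value = payload.get(fieldName);
        return (value instanceof Number) ? timeUnit.toMillis(((Number) value).longValue()) : 0L;
      }

      public static void main(String[] args) {
        // A producer that stamps records with seconds-granularity creation times.
        Map<String, Object> payload = Collections.<String, Object>singletonMap("createdAt",
            TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
        long tsMillis = getRecordCreationTimestamp(payload, "createdAt", TimeUnit.SECONDS);
        System.out.println("observed lag ms: " + (System.currentTimeMillis() - tsMillis));
      }
    }
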
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index ce0377a..4055fb0 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.source.extractor.extract.kafka;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
@@ -60,6 +61,8 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
 
   private final ClassAliasResolver<GobblinKafkaConsumerClientFactory> kafkaConsumerClientResolver;
   private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+  private final String recordCreationTimestampFieldName;
+  private final TimeUnit recordCreationTimestampUnit;
 
   private Iterator<KafkaConsumerRecord> messageIterator = null;
   @Getter
@@ -105,6 +108,9 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
 
     // The actual high watermark starts with the low watermark
     this.workUnitState.setActualHighWatermark(this.lowWatermark);
+
+    this.recordCreationTimestampFieldName = this.workUnitState.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD, null);
+    this.recordCreationTimestampUnit = TimeUnit.valueOf(this.workUnitState.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, TimeUnit.MILLISECONDS.name()));
   }
 
   @Override
@@ -177,7 +183,9 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
           D record = decodeKafkaMessage(nextValidMessage);
 
           this.statsTracker.onDecodeableRecord(this.currentPartitionIdx, readStartTime, decodeStartTime,
-              nextValidMessage.getValueSizeInBytes(), nextValidMessage.isTimestampLogAppend() ? nextValidMessage.getTimestamp() : 0L);
+              nextValidMessage.getValueSizeInBytes(), nextValidMessage.isTimestampLogAppend() ? nextValidMessage.getTimestamp() : 0L,
+              (this.recordCreationTimestampFieldName != null) ? nextValidMessage
+                  .getRecordCreationTimestamp(this.recordCreationTimestampFieldName, this.recordCreationTimestampUnit) : 0L);
           this.currentPartitionLastSuccessfulRecord = record;
           return record;
         } catch (Throwable t) {
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index 6811fc2..a1f4520 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -16,12 +16,19 @@
  */
 package org.apache.gobblin.source.extractor.extract.kafka;
 
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramLogWriter;
+
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
@@ -46,6 +53,7 @@ public class KafkaExtractorStatsTracker {
   public static final String TOPIC = "topic";
   public static final String PARTITION = "partition";
 
+  private static final String EMPTY_STRING = "";
   private static final String GOBBLIN_KAFKA_NAMESPACE = "gobblin.kafka";
   private static final String KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME = "KafkaExtractorTopicMetadata";
   private static final String LOW_WATERMARK = "lowWatermark";
@@ -64,32 +72,71 @@ public class KafkaExtractorStatsTracker {
   private static final String DECODE_RECORD_TIME = "decodeRecordTime";
   private static final String FETCH_MESSAGE_BUFFER_TIME = "fetchMessageBufferTime";
   private static final String LAST_RECORD_HEADER_TIMESTAMP = "lastRecordHeaderTimestamp";
+  private static final String OBSERVED_LATENCY_HISTOGRAM = "observedLatencyHistogram";
 
   @Getter
   private final Map<KafkaPartition, ExtractorStats> statsMap;
   private final Set<Integer> errorPartitions;
   private final WorkUnitState workUnitState;
   private final TaskEventMetadataGenerator taskEventMetadataGenerator;
+  @Getter
+  private final Histogram observedLatencyHistogram;
   private boolean isSlaConfigured;
   private long recordLevelSlaMillis;
+  //The minimum partition index processed by this task. Statistics aggregated across all partitions processed by the
+  //task (e.g. the observed latency histogram) are reported against this partition index.
+  private int minPartitionIdx = Integer.MAX_VALUE;
 
   //A global count of number of undecodeable messages encountered by the KafkaExtractor across all Kafka
   //TopicPartitions.
   @Getter
   private int undecodableMessageCount = 0;
   private List<KafkaPartition> partitions;
+  private long maxPossibleLatency;
 
   public KafkaExtractorStatsTracker(WorkUnitState state, List<KafkaPartition> partitions) {
     this.workUnitState = state;
     this.partitions = partitions;
     this.statsMap = Maps.newHashMapWithExpectedSize(this.partitions.size());
-    this.partitions.forEach(partition -> this.statsMap.put(partition, new ExtractorStats()));
+    this.partitions.forEach(partition -> {
+      this.statsMap.put(partition, new ExtractorStats());
+      if (partition.getId() < minPartitionIdx) {
+        minPartitionIdx = partition.getId();
+      }
+    });
     this.errorPartitions = Sets.newHashSet();
     if (this.workUnitState.contains(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY)) {
       this.isSlaConfigured = true;
       this.recordLevelSlaMillis = TimeUnit.MINUTES.toMillis(this.workUnitState.getPropAsLong(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY));
     }
     this.taskEventMetadataGenerator = TaskEventMetadataUtils.getTaskEventMetadataGenerator(workUnitState);
+    if (state.getPropAsBoolean(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, KafkaSource.DEFAULT_OBSERVED_LATENCY_MEASUREMENT_ENABLED)) {
+      this.observedLatencyHistogram = buildobservedLatencyHistogram(state);
+    } else {
+      this.observedLatencyHistogram = null;
+    }
+  }
+
+  /**
+   * Constructs a {@link Histogram} from a minimum value, a maximum value, and a precision expressed as a number of
+   * significant digits. The returned {@link Histogram} is not auto-resizing: any outliers above the maximum possible
+   * value are discarded in order to bound the worst-case performance.
+   *
+   * @param state the {@link WorkUnitState} carrying the histogram range and precision configuration.
+   * @return a non auto-resizing {@link Histogram} with a bounded range and precision.
+   */
+  private Histogram buildobservedLatencyHistogram(WorkUnitState state) {
+    this.maxPossibleLatency = TimeUnit.HOURS.toMillis(state.getPropAsInt(KafkaSource.MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS,
+        KafkaSource.DEFAULT_MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS));
+    int numSignificantDigits = state.getPropAsInt(KafkaSource.OBSERVED_LATENCY_PRECISION, KafkaSource.DEFAULT_OBSERVED_LATENCY_PRECISION);
+    if (numSignificantDigits > 5) {
+      log.warn("Max precision must be <= 5; Setting precision for observed latency to 5.");
+      numSignificantDigits = 5;
+    } else if (numSignificantDigits < 1) {
+      log.warn("Max precision must be >= 1; Setting precision to the default value of 3.");
+      numSignificantDigits = 3;
+    }
+    return new Histogram(1, maxPossibleLatency, numSignificantDigits);
   }
 
   public int getErrorPartitionCount() {
@@ -161,14 +208,24 @@ public class KafkaExtractorStatsTracker {
    * @param decodeStartTime the time instant immediately before a record decoding begins.
    * @param recordSizeInBytes the size of the decoded record in bytes.
    * @param logAppendTimestamp the log append time of the {@link org.apache.gobblin.kafka.client.KafkaConsumerRecord}.
+   * @param recordCreationTimestamp the creation time of the {@link org.apache.gobblin.kafka.client.KafkaConsumerRecord} in epoch milliseconds, or 0 if unavailable.
    */
-  public void onDecodeableRecord(int partitionIdx, long readStartTime, long decodeStartTime, long recordSizeInBytes, long logAppendTimestamp) {
+  public void onDecodeableRecord(int partitionIdx, long readStartTime, long decodeStartTime, long recordSizeInBytes, long logAppendTimestamp, long recordCreationTimestamp) {
     this.statsMap.computeIfPresent(this.partitions.get(partitionIdx), (k, v) -> {
       long currentTime = System.nanoTime();
       v.processedRecordCount++;
       v.partitionTotalSize += recordSizeInBytes;
       v.decodeRecordTime += currentTime - decodeStartTime;
       v.readRecordTime += currentTime - readStartTime;
+      if (this.observedLatencyHistogram != null && recordCreationTimestamp > 0) {
+        long observedLatency = System.currentTimeMillis() - recordCreationTimestamp;
+        // Discard outliers larger than maxPossibleLatency to avoid the additional overhead that would otherwise be
+        // incurred by dynamically re-sizing the Histogram when observedLatency exceeds the maximum assumed latency.
+        // Essentially, we trade off accuracy for performance in a pessimistic scenario.
+        if (observedLatency < this.maxPossibleLatency) {
+          this.observedLatencyHistogram.recordValue(observedLatency);
+        }
+      }
       if (this.isSlaConfigured) {
         if (v.slaMissedRecordCount < 0) {
           v.slaMissedRecordCount = 0;
@@ -308,10 +365,35 @@ public class KafkaExtractorStatsTracker {
       tagsForPartition.put(AVG_RECORD_PULL_TIME, Double.toString(-1));
     }
 
+    //Report observed latency histogram as part of the stats for the smallest partition index processed by this task.
+    if ((partitionId == minPartitionIdx) && (this.observedLatencyHistogram != null)) {
+      tagsForPartition.put(OBSERVED_LATENCY_HISTOGRAM, convertHistogramToString(this.observedLatencyHistogram));
+    }
     return tagsForPartition;
   }
 
   /**
+   * A helper method to serialize a {@link Histogram} to its string representation. This method uses the
+   * compressed logging format provided by {@link org.HdrHistogram.HistogramLogWriter} to represent the
+   * Histogram as a string. Readers can use {@link org.HdrHistogram.HistogramLogReader} to deserialize
+   * the string back into a {@link Histogram} object.
+   * @param observedLatencyHistogram the histogram to serialize.
+   * @return the string representation of the histogram, or an empty string if serialization fails.
+   */
+  @VisibleForTesting
+  public static String convertHistogramToString(Histogram observedLatencyHistogram) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (PrintStream stream = new PrintStream(baos, true, Charsets.UTF_8.name())) {
+      HistogramLogWriter histogramLogWriter = new HistogramLogWriter(stream);
+      histogramLogWriter.outputIntervalHistogram(observedLatencyHistogram);
+      return new String(baos.toByteArray(), Charsets.UTF_8);
+    } catch (UnsupportedEncodingException e) {
+      log.error("Exception {} encountered when creating PrintStream; returning empty string", e);
+      return EMPTY_STRING;
+    }
+  }
+
+  /**
    * Emit Tracking events reporting the various statistics to be consumed by a monitoring application.
    * @param context the current {@link MetricContext}
    * @param lowWatermark begin Kafka offset for each topic partition
@@ -398,5 +480,8 @@ public class KafkaExtractorStatsTracker {
     for (int partitionIdx = 0; partitionIdx < this.partitions.size(); partitionIdx++) {
       resetStartFetchEpochTime(partitionIdx);
     }
+    if (this.observedLatencyHistogram != null) {
+      this.observedLatencyHistogram.reset();
+    }
   }
 }
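
On the consuming side, a monitoring application that receives the KafkaExtractorTopicMetadata event can recover the histogram from the observedLatencyHistogram tag with HistogramLogReader, mirroring the round trip in the unit test below, and then read out latency percentiles. A minimal sketch (the class and method names are illustrative):

    import java.io.ByteArrayInputStream;
    import java.nio.charset.StandardCharsets;

    import org.HdrHistogram.Histogram;
    import org.HdrHistogram.HistogramLogReader;

    public class ObservedLatencyReaderSketch {
      // Deserialize the tag value emitted by KafkaExtractorStatsTracker and report percentiles.
      public static void printLagPercentiles(String histogramString) {
        HistogramLogReader logReader = new HistogramLogReader(
            new ByteArrayInputStream(histogramString.getBytes(StandardCharsets.UTF_8)));
        Histogram histogram = (Histogram) logReader.nextIntervalHistogram();
        System.out.println("p50 observed lag ms: " + histogram.getValueAtPercentile(50.0));
        System.out.println("p99 observed lag ms: " + histogram.getValueAtPercentile(99.0));
        System.out.println("max observed lag ms: " + histogram.getMaxValue());
      }
    }
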
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 087077d..0932be6 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -114,7 +114,6 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
   public static final String PREVIOUS_LATEST_OFFSET = "previousLatestOffset";
   public static final String OFFSET_FETCH_EPOCH_TIME = "offsetFetchEpochTime";
   public static final String PREVIOUS_OFFSET_FETCH_EPOCH_TIME = "previousOffsetFetchEpochTime";
-  public static final String NUM_TOPIC_PARTITIONS = "numTopicPartitions";
   public static final String GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS = "gobblin.kafka.consumerClient.class";
   public static final String GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION =
       "gobblin.kafka.extract.allowTableTypeAndNamspaceCustomization";
@@ -125,6 +124,14 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
   public static final boolean DEFAULT_GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE = false;
   public static final String OFFSET_FETCH_TIMER = "offsetFetchTimer";
   public static final String RECORD_LEVEL_SLA_MINUTES_KEY = "gobblin.kafka.recordLevelSlaMinutes";
+  public static final String MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS = "gobblin.kafka.maxobservedLatencyInHours";
+  public static final Integer DEFAULT_MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS = 24;
+  public static final String OBSERVED_LATENCY_PRECISION = "gobblin.kafka.observedLatencyPrecision";
+  public static final Integer DEFAULT_OBSERVED_LATENCY_PRECISION = 3;
+  public static final String OBSERVED_LATENCY_MEASUREMENT_ENABLED = "gobblin.kafka.observedLatencyMeasurementEnabled";
+  public static final Boolean DEFAULT_OBSERVED_LATENCY_MEASUREMENT_ENABLED = false;
+  public static final String RECORD_CREATION_TIMESTAMP_FIELD = "gobblin.kafka.recordCreationTimestampField";
+  public static final String RECORD_CREATION_TIMESTAMP_UNIT = "gobblin.kafka.recordCreationTimestampUnit";
 
   private final Set<String> moveToLatestTopics = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
   private final Map<KafkaPartition, Long> previousOffsets = Maps.newConcurrentMap();
@@ -558,6 +565,17 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     if (state.contains(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY)) {
       workUnit.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, state.getProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY));
     }
+    boolean isobservedLatencyMeasurementEnabled = state.getPropAsBoolean(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, DEFAULT_OBSERVED_LATENCY_MEASUREMENT_ENABLED);
+    if (isobservedLatencyMeasurementEnabled) {
+      Preconditions.checkArgument(state.contains(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD), "Missing config key: " + KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD);
+      workUnit.setProp(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, isobservedLatencyMeasurementEnabled);
+      workUnit.setProp(KafkaSource.MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS,
+          state.getPropAsInt(KafkaSource.MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS, DEFAULT_MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS));
+      workUnit.setProp(KafkaSource.OBSERVED_LATENCY_PRECISION,
+          state.getPropAsInt(KafkaSource.OBSERVED_LATENCY_PRECISION, KafkaSource.DEFAULT_OBSERVED_LATENCY_PRECISION));
+      workUnit.setProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD, state.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD));
+      workUnit.setProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, state.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, TimeUnit.MILLISECONDS.name()));
+    }
   }
 
   private long getPreviousStartFetchEpochTimeForPartition(KafkaPartition partition, SourceState state) {
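
Taken together, a job that opts into observed-latency tracking would carry configuration along these lines (the field name and values are illustrative; only the timestamp field is mandatory once measurement is enabled, as enforced by the Preconditions check above):

    gobblin.kafka.observedLatencyMeasurementEnabled=true
    gobblin.kafka.recordCreationTimestampField=createdAt
    gobblin.kafka.recordCreationTimestampUnit=MILLISECONDS
    gobblin.kafka.maxobservedLatencyInHours=24
    gobblin.kafka.observedLatencyPrecision=3
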
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
index 6502fbe..278ff9b 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
@@ -16,19 +16,23 @@
  */
 package org.apache.gobblin.source.extractor.extract.kafka;
 
+import java.io.ByteArrayInputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramLogReader;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import org.apache.gobblin.configuration.WorkUnitState;
-
+import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableMap;
 
+import org.apache.gobblin.configuration.WorkUnitState;
+
 
 public class KafkaExtractorStatsTrackerTest {
   List<KafkaPartition> kafkaPartitions = new ArrayList<>();
@@ -41,7 +45,8 @@ public class KafkaExtractorStatsTrackerTest {
     kafkaPartitions.add(PARTITION0);
     kafkaPartitions.add(PARTITION1);
     WorkUnitState workUnitState = new WorkUnitState();
-    workUnitState.setProp("gobblin.kafka.recordLevelSlaMinutes", 10L);
+    workUnitState.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, 10L);
+    workUnitState.setProp(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, true);
     this.extractorStatsTracker = new KafkaExtractorStatsTracker(workUnitState, kafkaPartitions);
   }
 
@@ -77,7 +82,9 @@ public class KafkaExtractorStatsTrackerTest {
     long readStartTime = System.nanoTime();
     Thread.sleep(1);
     long decodeStartTime = System.nanoTime();
-    long logAppendTimestamp = System.currentTimeMillis() - 15 * 60 * 1000L;
+    long currentTimeMillis = System.currentTimeMillis();
+    long logAppendTimestamp = currentTimeMillis - 15 * 60 * 1000L;
+    long recordCreationTimestamp = currentTimeMillis - 16 * 60 * 1000L;
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(), 0);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(), 0);
     Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime() == 0);
@@ -85,8 +92,9 @@ public class KafkaExtractorStatsTrackerTest {
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getSlaMissedRecordCount(), -1);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMinLogAppendTime(), -1);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMaxLogAppendTime(), -1);
+    Assert.assertEquals(this.extractorStatsTracker.getObservedLatencyHistogram().getTotalCount(), 0);
 
-    this.extractorStatsTracker.onDecodeableRecord(0, readStartTime, decodeStartTime, 100, logAppendTimestamp);
+    this.extractorStatsTracker.onDecodeableRecord(0, readStartTime, decodeStartTime, 100, logAppendTimestamp, recordCreationTimestamp);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(), 1);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(), 100);
     Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime() > 0);
@@ -94,16 +102,19 @@ public class KafkaExtractorStatsTrackerTest {
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getSlaMissedRecordCount(), 1);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMinLogAppendTime(), logAppendTimestamp);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMaxLogAppendTime(), logAppendTimestamp);
+    Assert.assertEquals(this.extractorStatsTracker.getObservedLatencyHistogram().getTotalCount(), 1);
 
     readStartTime = System.nanoTime();
     Thread.sleep(1);
     decodeStartTime = System.nanoTime();
     long previousLogAppendTimestamp = logAppendTimestamp;
-    logAppendTimestamp = System.currentTimeMillis() - 10;
+    currentTimeMillis = System.currentTimeMillis();
+    logAppendTimestamp = currentTimeMillis - 10;
+    recordCreationTimestamp = currentTimeMillis - 20;
     long previousDecodeRecordTime = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime();
     long previousReadRecordTime = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime();
 
-    this.extractorStatsTracker.onDecodeableRecord(0, readStartTime, decodeStartTime, 100, logAppendTimestamp);
+    this.extractorStatsTracker.onDecodeableRecord(0, readStartTime, decodeStartTime, 100, logAppendTimestamp, recordCreationTimestamp);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(), 2);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(), 200);
     Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime() > previousDecodeRecordTime);
@@ -111,6 +122,7 @@ public class KafkaExtractorStatsTrackerTest {
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getSlaMissedRecordCount(), 1);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMinLogAppendTime(), previousLogAppendTimestamp);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMaxLogAppendTime(), logAppendTimestamp);
+    Assert.assertEquals(this.extractorStatsTracker.getObservedLatencyHistogram().getTotalCount(), 2);
   }
 
   @Test
@@ -145,10 +157,12 @@ public class KafkaExtractorStatsTrackerTest {
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getAvgRecordSize(), 100);
 
     readStartTime = System.nanoTime();
-    long logAppendTimestamp = System.currentTimeMillis() - 10;
+    long currentTimeMillis = System.currentTimeMillis();
+    long logAppendTimestamp = currentTimeMillis - 10;
+    long recordCreationTimestamp = currentTimeMillis - 20;
     Thread.sleep(1);
     long decodeStartTime = System.nanoTime();
-    this.extractorStatsTracker.onDecodeableRecord(1, readStartTime, decodeStartTime, 100, logAppendTimestamp);
+    this.extractorStatsTracker.onDecodeableRecord(1, readStartTime, decodeStartTime, 100, logAppendTimestamp, recordCreationTimestamp);
     this.extractorStatsTracker.updateStatisticsForCurrentPartition(1, readStartTime, 0);
     Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getElapsedTime() > 0);
     Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getAvgMillisPerRecord() > 0);
@@ -156,6 +170,7 @@ public class KafkaExtractorStatsTrackerTest {
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getSlaMissedRecordCount(), 0);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getMinLogAppendTime(), logAppendTimestamp);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getMaxLogAppendTime(), logAppendTimestamp);
+    Assert.assertEquals(this.extractorStatsTracker.getObservedLatencyHistogram().getTotalCount(), 3);
   }
 
   @Test (dependsOnMethods = "testUpdateStatisticsForCurrentPartition")
@@ -166,8 +181,10 @@ public class KafkaExtractorStatsTrackerTest {
     Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(0), 0);
     long readStartTime = System.nanoTime();
     long decodeStartTime = readStartTime + 1;
-    long logAppendTimeStamp = System.currentTimeMillis() - 10;
-    this.extractorStatsTracker.onDecodeableRecord(1, readStartTime, decodeStartTime, 150, logAppendTimeStamp);
+    long currentTimeMillis = System.currentTimeMillis();
+    long logAppendTimestamp = currentTimeMillis - 10;
+    long recordCreationTimestamp = currentTimeMillis - 20;
+    this.extractorStatsTracker.onDecodeableRecord(1, readStartTime, decodeStartTime, 150, logAppendTimestamp, recordCreationTimestamp);
     Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(1), 150);
   }
 
@@ -184,4 +201,23 @@ public class KafkaExtractorStatsTrackerTest {
     Assert.assertEquals(result.get(PARTITION0).get("testKey"), "testValue");
     Assert.assertFalse(result.get(PARTITION1).containsKey("testKey"));
   }
+
+  @Test
+  public void testConvertHistogramToString() {
+    Histogram histogram = new Histogram(1, 100, 3);
+    histogram.recordValue(3);
+    histogram.recordValue(25);
+    histogram.recordValue(25);
+    histogram.recordValue(92);
+    String histogramString = KafkaExtractorStatsTracker.convertHistogramToString(histogram);
+
+    HistogramLogReader logReader = new HistogramLogReader(new ByteArrayInputStream(histogramString.getBytes(
+        Charsets.UTF_8)));
+    Histogram histogram1 = (Histogram) logReader.nextIntervalHistogram();
+    Assert.assertEquals(histogram1.getTotalCount(), 4);
+    Assert.assertEquals(histogram1.getMaxValue(), 92);
+    Assert.assertEquals(histogram1.getCountAtValue(25), 2);
+    Assert.assertEquals(histogram1.getCountAtValue(3), 1);
+    Assert.assertEquals(histogram1.getCountAtValue(92), 1);
+  }
 }
\ No newline at end of file
diff --git a/gobblin-runtime/build.gradle b/gobblin-runtime/build.gradle
index 0ff442a..306036c 100644
--- a/gobblin-runtime/build.gradle
+++ b/gobblin-runtime/build.gradle
@@ -94,8 +94,7 @@ dependencies {
   testCompile externalDependency.curatorTest
   testCompile externalDependency.mockito
   testRuntime externalDependency.derby
-
-  jmh 'org.openjdk.jmh:jmh-core:1.17.3'
+  testCompile externalDependency.jmh
 }
 
 // Begin HACK to get around POM being depenendent on the (empty) gobblin-rest-api instead of gobblin-rest-api-rest-client
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index 7c83a79..6cc9363 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -62,6 +62,7 @@ ext.externalDependency = [
     "hadoopYarnMiniCluster": "org.apache.hadoop:hadoop-minicluster:" + hadoopVersion,
     "hadoopAnnotations": "org.apache.hadoop:hadoop-annotations:" + hadoopVersion,
     "hadoopAws": "org.apache.hadoop:hadoop-aws:2.6.0",
+    "hdrHistogram": "org.hdrhistogram:HdrHistogram:2.1.11",
     "helix": "org.apache.helix:helix-core:0.8.2",
     "hiveCommon": "org.apache.hive:hive-common:" + hiveVersion,
     "hiveService": "org.apache.hive:hive-service:" + hiveVersion,
@@ -74,6 +75,8 @@ ext.externalDependency = [
     "httpcore": "org.apache.httpcomponents:httpcore:4.4.4",
     "httpasyncclient": "org.apache.httpcomponents:httpasyncclient:4.1.3",
     "jgit": "org.eclipse.jgit:org.eclipse.jgit:5.1.1.201809181055-r",
+    "jmh": "org.openjdk.jmh:jmh-core:1.17.3",
+    "jmhAnnotations": "org.openjdk.jmh:jmh-generator-annprocess:1.17.3",
     "kafka08": "org.apache.kafka:kafka_2.11:" + kafka08Version,
     "kafka08Test": "org.apache.kafka:kafka_2.11:" + kafka08Version + ":test",
     "kafka08Client": "org.apache.kafka:kafka-clients:" + kafka08Version,