You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/18 04:37:19 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1087] Track and report histogram of observed lag from Gobblin…
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5e742cf [GOBBLIN-1087] Track and report histogram of observed lag from Gobblin…
5e742cf is described below
commit 5e742cfd84d01d368b46c5db16688b870c389871
Author: sv2000 <su...@gmail.com>
AuthorDate: Tue Mar 17 21:37:13 2020 -0700
[GOBBLIN-1087] Track and report histogram of observed lag from Gobblin…
Closes #2928 from sv2000/observedLag
---
gobblin-core-base/build.gradle | 3 +-
gobblin-modules/gobblin-kafka-common/build.gradle | 9 ++
.../kafka/HdrHistogramPerformanceBenchmark.java | 154 +++++++++++++++++++++
.../gobblin/kafka/client/KafkaConsumerRecord.java | 17 +--
.../extractor/extract/kafka/KafkaExtractor.java | 10 +-
.../extract/kafka/KafkaExtractorStatsTracker.java | 89 +++++++++++-
.../extractor/extract/kafka/KafkaSource.java | 20 ++-
.../kafka/KafkaExtractorStatsTrackerTest.java | 58 ++++++--
gobblin-runtime/build.gradle | 3 +-
gradle/scripts/dependencyDefinitions.gradle | 3 +
10 files changed, 339 insertions(+), 27 deletions(-)
diff --git a/gobblin-core-base/build.gradle b/gobblin-core-base/build.gradle
index 80b0f67..11ca360 100644
--- a/gobblin-core-base/build.gradle
+++ b/gobblin-core-base/build.gradle
@@ -38,8 +38,7 @@ dependencies {
testCompile externalDependency.testng
testCompile externalDependency.mockito
-
- jmh 'org.openjdk.jmh:jmh-core:1.17.3'
+ testCompile externalDependency.jmh
}
test {
diff --git a/gobblin-modules/gobblin-kafka-common/build.gradle b/gobblin-modules/gobblin-kafka-common/build.gradle
index d38d7d3..dba07a7 100644
--- a/gobblin-modules/gobblin-kafka-common/build.gradle
+++ b/gobblin-modules/gobblin-kafka-common/build.gradle
@@ -16,6 +16,7 @@
*/
apply plugin: 'java'
+apply plugin: 'me.champeau.gradle.jmh'
dependencies {
compile project(":gobblin-api")
@@ -33,6 +34,7 @@ dependencies {
compile externalDependency.commonsPool
compile externalDependency.guava
compile externalDependency.gson
+ compile externalDependency.hdrHistogram
compile externalDependency.jacksonCore
compile externalDependency.jacksonMapper
compile externalDependency.slf4j
@@ -44,6 +46,7 @@ dependencies {
testCompile project(":gobblin-test-utils")
testCompile externalDependency.mockito
testCompile externalDependency.testng
+ testCompile externalDependency.jmh
}
configurations {
@@ -59,4 +62,10 @@ test {
workingDir rootProject.rootDir
}
+jmh {
+ include = ""
+ zip64 = true
+ duplicateClassesStrategy = "EXCLUDE"
+}
+
ext.classification="library"
diff --git a/gobblin-modules/gobblin-kafka-common/src/jmh/java/org/apache/gobblin/source/extractor/extract/kafka/HdrHistogramPerformanceBenchmark.java b/gobblin-modules/gobblin-kafka-common/src/jmh/java/org/apache/gobblin/source/extractor/extract/kafka/HdrHistogramPerformanceBenchmark.java
new file mode 100644
index 0000000..23631f9
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/jmh/java/org/apache/gobblin/source/extractor/extract/kafka/HdrHistogramPerformanceBenchmark.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import org.HdrHistogram.Histogram;
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A micro-benchmark to measure the time taken to serialize a {@link Histogram} instance to its String representation, as
+ * well as the time taken to merge, construct and reset histograms. The benchmark uses a random number generator to generate
+ * values according to a uniform distribution, an adversarial pattern for a histogram that is likely to produce more count
+ * buckets in comparison with a skewed distribution. The benchmark provides an upper bound on the memory footprint of the
+ * histogram, the serialization time, as well as the size of the serialized representation.
+ */
+@Warmup (iterations = 3)
+@Measurement (iterations = 10)
+@BenchmarkMode (value = Mode.AverageTime)
+@Fork (value = 1)
+@OutputTimeUnit (TimeUnit.MILLISECONDS)
+@Slf4j
+public class HdrHistogramPerformanceBenchmark {
+
+ @State (value = Scope.Benchmark)
+ public static class HistogramState {
+ private static long MIN_VALUE = 1;
+ private static long MAX_VALUE = TimeUnit.HOURS.toMillis(24);
+
+ private Histogram histogram1;
+ private Histogram histogram2;
+ private Histogram histogram3;
+ private Histogram histogram4;
+
+ private final RandomDataGenerator random = new RandomDataGenerator();
+
+ @Setup (value = Level.Iteration)
+ public void setUp() {
+ this.histogram1 = buildHistogram(1000000);
+ this.histogram2 = buildHistogram(2000000);
+ this.histogram3 = buildHistogram(4000000);
+ this.histogram4 = buildHistogram(10000000);
+ }
+
+ private Histogram buildHistogram(int size) {
+ Histogram histogram = new Histogram(MIN_VALUE, MAX_VALUE, 3);
+ IntStream.range(0, size).mapToLong(i -> random.nextLong(MIN_VALUE, MAX_VALUE))
+ .forEachOrdered(histogram::recordValue);
+ System.out.println("Estimated memory footprint of histogram is: " + histogram.getEstimatedFootprintInBytes());
+ return histogram;
+ }
+
+ @TearDown (value = Level.Iteration)
+ public void tearDown() {
+ this.histogram1.reset();
+ this.histogram2.reset();
+ this.histogram3.reset();
+ this.histogram4.reset();
+ }
+ }
+
+ @Benchmark
+ public String trackHistogram1MToStringConversion(HistogramState histogramState) {
+ String histogramString = KafkaExtractorStatsTracker.convertHistogramToString(histogramState.histogram1);
+ System.out.println("Histogram serialized string size: " + histogramString.length());
+ return histogramString;
+ }
+
+ @Benchmark
+ public String trackHistogram2MToStringConversion(HistogramState histogramState) {
+ String histogramString = KafkaExtractorStatsTracker.convertHistogramToString(histogramState.histogram2);
+ System.out.println("Histogram serialized string size: " + histogramString.length());
+ return histogramString;
+ }
+
+ @Benchmark
+ public String trackHistogram4MToStringConversion(HistogramState histogramState) {
+ String histogramString = KafkaExtractorStatsTracker.convertHistogramToString(histogramState.histogram3);
+ System.out.println("Histogram serialized string size: " + histogramString.length());
+ return histogramString;
+ }
+
+ @Benchmark
+ public String trackHistogram10MToStringConversion(HistogramState histogramState) {
+ String histogramString = KafkaExtractorStatsTracker.convertHistogramToString(histogramState.histogram4);
+ System.out.println("Histogram serialized string size: " + histogramString.length());
+ return histogramString;
+ }
+
+ @Benchmark
+ public Histogram trackMergeHistogram(HistogramState histogramState) {
+ Histogram histogram = new Histogram(histogramState.MIN_VALUE, histogramState.MAX_VALUE, 3);
+ histogram.add(histogramState.histogram1);
+ histogram.add(histogramState.histogram2);
+ histogram.add(histogramState.histogram3);
+ histogram.add(histogramState.histogram4);
+ return histogram;
+ }
+
+ @Benchmark
+ public Histogram trackBuildHistogram(HistogramState histogramState) {
+ Histogram histogram = new Histogram(histogramState.MIN_VALUE, histogramState.MAX_VALUE, 3);
+ return histogram;
+ }
+
+ @Benchmark
+ public void trackResetHistogram(HistogramState histogramState, Blackhole blackhole) {
+ int dummyVal = 1;
+ histogramState.histogram4.reset();
+ blackhole.consume(dummyVal);
+ }
+
+ public static void main(String[] args) throws Exception {
+ ChainedOptionsBuilder opt = new OptionsBuilder()
+ .include(HdrHistogramPerformanceBenchmark.class.getSimpleName())
+ .warmupIterations(3)
+ .measurementIterations(10);
+ new Runner(opt.build()).run();
+ }
+}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java
index cba9fa2..4808436 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java
@@ -16,6 +16,9 @@
*/
package org.apache.gobblin.kafka.client;
+import java.util.concurrent.TimeUnit;
+
+
/**
* A kafka message/record consumed from {@link GobblinKafkaConsumerClient}. This interface provides APIs to read message
* metadata. Extension interfaces like {@link DecodeableKafkaRecord} or {@link ByteArrayBasedKafkaRecord} provide APIs
@@ -51,14 +54,6 @@ public interface KafkaConsumerRecord {
return false;
}
- default boolean isTimestampCreateTime() {
- return false;
- }
-
- default boolean isTimestampNone() {
- return false;
- }
-
/**
* @return Partition id for this record
*/
@@ -69,4 +64,10 @@ public interface KafkaConsumerRecord {
*/
String getTopic();
+ /**
+ * @param fieldName the field name containing the record creation time.
+ * @param timeUnit the timeunit for the timestamp field.
+ * @return the record creation timestamp, if it is available. Defaults to 0.
+ */
+ public default long getRecordCreationTimestamp(String fieldName, TimeUnit timeUnit) { return 0; }
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index ce0377a..4055fb0 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.source.extractor.extract.kafka;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
@@ -60,6 +61,8 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
private final ClassAliasResolver<GobblinKafkaConsumerClientFactory> kafkaConsumerClientResolver;
private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+ private final String recordCreationTimestampFieldName;
+ private final TimeUnit recordCreationTimestampUnit;
private Iterator<KafkaConsumerRecord> messageIterator = null;
@Getter
@@ -105,6 +108,9 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
// The actual high watermark starts with the low watermark
this.workUnitState.setActualHighWatermark(this.lowWatermark);
+
+ this.recordCreationTimestampFieldName = this.workUnitState.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD, null);
+ this.recordCreationTimestampUnit = TimeUnit.valueOf(this.workUnitState.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, TimeUnit.MILLISECONDS.name()));
}
@Override
@@ -177,7 +183,9 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
D record = decodeKafkaMessage(nextValidMessage);
this.statsTracker.onDecodeableRecord(this.currentPartitionIdx, readStartTime, decodeStartTime,
- nextValidMessage.getValueSizeInBytes(), nextValidMessage.isTimestampLogAppend() ? nextValidMessage.getTimestamp() : 0L);
+ nextValidMessage.getValueSizeInBytes(), nextValidMessage.isTimestampLogAppend() ? nextValidMessage.getTimestamp() : 0L,
+ (this.recordCreationTimestampFieldName != null) ? nextValidMessage
+ .getRecordCreationTimestamp(this.recordCreationTimestampFieldName, this.recordCreationTimestampUnit) : 0L);
this.currentPartitionLastSuccessfulRecord = record;
return record;
} catch (Throwable t) {
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index 6811fc2..a1f4520 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -16,12 +16,19 @@
*/
package org.apache.gobblin.source.extractor.extract.kafka;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramLogWriter;
+
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -46,6 +53,7 @@ public class KafkaExtractorStatsTracker {
public static final String TOPIC = "topic";
public static final String PARTITION = "partition";
+ private static final String EMPTY_STRING = "";
private static final String GOBBLIN_KAFKA_NAMESPACE = "gobblin.kafka";
private static final String KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME = "KafkaExtractorTopicMetadata";
private static final String LOW_WATERMARK = "lowWatermark";
@@ -64,32 +72,71 @@ public class KafkaExtractorStatsTracker {
private static final String DECODE_RECORD_TIME = "decodeRecordTime";
private static final String FETCH_MESSAGE_BUFFER_TIME = "fetchMessageBufferTime";
private static final String LAST_RECORD_HEADER_TIMESTAMP = "lastRecordHeaderTimestamp";
+ private static final String OBSERVED_LATENCY_HISTOGRAM = "observedLatencyHistogram";
@Getter
private final Map<KafkaPartition, ExtractorStats> statsMap;
private final Set<Integer> errorPartitions;
private final WorkUnitState workUnitState;
private final TaskEventMetadataGenerator taskEventMetadataGenerator;
+ @Getter
+ private final Histogram observedLatencyHistogram;
private boolean isSlaConfigured;
private long recordLevelSlaMillis;
+ //Minimum partition id among the partitions processed by this task. Statistics that are aggregated across all partitions
+ // (e.g. the observed latency histogram) are reported against this partition id.
+ private int minPartitionIdx = Integer.MAX_VALUE;
//A global count of number of undecodeable messages encountered by the KafkaExtractor across all Kafka
//TopicPartitions.
@Getter
private int undecodableMessageCount = 0;
private List<KafkaPartition> partitions;
+ private long maxPossibleLatency;
public KafkaExtractorStatsTracker(WorkUnitState state, List<KafkaPartition> partitions) {
this.workUnitState = state;
this.partitions = partitions;
this.statsMap = Maps.newHashMapWithExpectedSize(this.partitions.size());
- this.partitions.forEach(partition -> this.statsMap.put(partition, new ExtractorStats()));
+ this.partitions.forEach(partition -> {
+ this.statsMap.put(partition, new ExtractorStats());
+ if (partition.getId() < minPartitionIdx) {
+ minPartitionIdx = partition.getId();
+ }
+ });
this.errorPartitions = Sets.newHashSet();
if (this.workUnitState.contains(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY)) {
this.isSlaConfigured = true;
this.recordLevelSlaMillis = TimeUnit.MINUTES.toMillis(this.workUnitState.getPropAsLong(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY));
}
this.taskEventMetadataGenerator = TaskEventMetadataUtils.getTaskEventMetadataGenerator(workUnitState);
+ if (state.getPropAsBoolean(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, KafkaSource.DEFAULT_OBSERVED_LATENCY_MEASUREMENT_ENABLED)) {
+ this.observedLatencyHistogram = buildobservedLatencyHistogram(state);
+ } else {
+ this.observedLatencyHistogram = null;
+ }
+ }
+
+ /**
+ * A method that constructs a {@link Histogram} object based on a minimum value, a maximum value and precision in terms
+ * of number of significant digits. The returned {@link Histogram} is not an auto-resizing histogram and any outliers
+ * above the maximum possible value are discarded in favor of bounding the worst-case performance.
+ *
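+ * @param state the {@link WorkUnitState} providing the maximum possible observed latency (in hours) and the histogram precision.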
+ * @return a non auto-resizing {@link Histogram} with a bounded range and precision.
+ */
+ private Histogram buildobservedLatencyHistogram(WorkUnitState state) {
+ this.maxPossibleLatency = TimeUnit.HOURS.toMillis(state.getPropAsInt(KafkaSource.MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS,
+ KafkaSource.DEFAULT_MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS));
+ int numSignificantDigits = state.getPropAsInt(KafkaSource.OBSERVED_LATENCY_PRECISION, KafkaSource.DEFAULT_OBSERVED_LATENCY_PRECISION);
+ if (numSignificantDigits > 5) {
+ log.warn("Max precision must be <= 5; Setting precision for observed latency to 5.");
+ numSignificantDigits = 5;
+ } else if (numSignificantDigits < 1) {
+ log.warn("Max precision must be >= 1; Setting precision to the default value of 3.");
+ numSignificantDigits = 3;
+ }
+ return new Histogram(1, maxPossibleLatency, numSignificantDigits);
}
public int getErrorPartitionCount() {
@@ -161,14 +208,24 @@ public class KafkaExtractorStatsTracker {
* @param decodeStartTime the time instant immediately before a record decoding begins.
* @param recordSizeInBytes the size of the decoded record in bytes.
* @param logAppendTimestamp the log append time of the {@link org.apache.gobblin.kafka.client.KafkaConsumerRecord}.
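+ * @param recordCreationTimestamp the creation time of the {@link org.apache.gobblin.kafka.client.KafkaConsumerRecord}, subtracted from {@link System#currentTimeMillis()} to compute the observed latency; non-positive values are not recorded.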
*/
- public void onDecodeableRecord(int partitionIdx, long readStartTime, long decodeStartTime, long recordSizeInBytes, long logAppendTimestamp) {
+ public void onDecodeableRecord(int partitionIdx, long readStartTime, long decodeStartTime, long recordSizeInBytes, long logAppendTimestamp, long recordCreationTimestamp) {
this.statsMap.computeIfPresent(this.partitions.get(partitionIdx), (k, v) -> {
long currentTime = System.nanoTime();
v.processedRecordCount++;
v.partitionTotalSize += recordSizeInBytes;
v.decodeRecordTime += currentTime - decodeStartTime;
v.readRecordTime += currentTime - readStartTime;
+ if (this.observedLatencyHistogram != null && recordCreationTimestamp > 0) {
+ long observedLatency = System.currentTimeMillis() - recordCreationTimestamp;
+ // Discard outliers larger than maxPossibleLatency to avoid additional overhead that may otherwise be incurred due to dynamic
+ // re-sizing of Histogram when observedLatency exceeds the maximum assumed latency. Essentially, we trade-off accuracy for
+ // performance in a pessimistic scenario.
+ if (observedLatency < this.maxPossibleLatency) {
+ this.observedLatencyHistogram.recordValue(observedLatency);
+ }
+ }
if (this.isSlaConfigured) {
if (v.slaMissedRecordCount < 0) {
v.slaMissedRecordCount = 0;
@@ -308,10 +365,35 @@ public class KafkaExtractorStatsTracker {
tagsForPartition.put(AVG_RECORD_PULL_TIME, Double.toString(-1));
}
+ //Report the observed latency histogram (aggregated across all partitions) only in the tags of the partition with the minimum id.
+ if ((partitionId == minPartitionIdx) && (this.observedLatencyHistogram != null)) {
+ tagsForPartition.put(OBSERVED_LATENCY_HISTOGRAM, convertHistogramToString(this.observedLatencyHistogram));
+ }
return tagsForPartition;
}
/**
+ * A helper method to serialize a {@link Histogram} to its string representation. This method uses the
+ * compressed logging format provided by the {@link org.HdrHistogram.HistogramLogWriter}
+ * to represent the Histogram as a string. The readers can use the {@link org.HdrHistogram.HistogramLogReader} to
+ * deserialize the string back to a {@link Histogram} object.
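+ * @param observedLatencyHistogram the {@link Histogram} to serialize.
+ * @return the serialized histogram as a UTF-8 string, or an empty string if the {@link PrintStream} could not be created.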
+ */
+ @VisibleForTesting
+ public static String convertHistogramToString(Histogram observedLatencyHistogram) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (PrintStream stream = new PrintStream(baos, true, Charsets.UTF_8.name())) {
+ HistogramLogWriter histogramLogWriter = new HistogramLogWriter(stream);
+ histogramLogWriter.outputIntervalHistogram(observedLatencyHistogram);
+ return new String(baos.toByteArray(), Charsets.UTF_8);
+ } catch (UnsupportedEncodingException e) {
+ log.error("Exception {} encountered when creating PrintStream; returning empty string", e);
+ return EMPTY_STRING;
+ }
+ }
+
+ /**
* Emit Tracking events reporting the various statistics to be consumed by a monitoring application.
* @param context the current {@link MetricContext}
* @param lowWatermark begin Kafka offset for each topic partition
@@ -398,5 +480,8 @@ public class KafkaExtractorStatsTracker {
for (int partitionIdx = 0; partitionIdx < this.partitions.size(); partitionIdx++) {
resetStartFetchEpochTime(partitionIdx);
}
+ if (this.observedLatencyHistogram != null) {
+ this.observedLatencyHistogram.reset();
+ }
}
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 087077d..0932be6 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -114,7 +114,6 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
public static final String PREVIOUS_LATEST_OFFSET = "previousLatestOffset";
public static final String OFFSET_FETCH_EPOCH_TIME = "offsetFetchEpochTime";
public static final String PREVIOUS_OFFSET_FETCH_EPOCH_TIME = "previousOffsetFetchEpochTime";
- public static final String NUM_TOPIC_PARTITIONS = "numTopicPartitions";
public static final String GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS = "gobblin.kafka.consumerClient.class";
public static final String GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION =
"gobblin.kafka.extract.allowTableTypeAndNamspaceCustomization";
@@ -125,6 +124,14 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
public static final boolean DEFAULT_GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE = false;
public static final String OFFSET_FETCH_TIMER = "offsetFetchTimer";
public static final String RECORD_LEVEL_SLA_MINUTES_KEY = "gobblin.kafka.recordLevelSlaMinutes";
+ public static final String MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS = "gobblin.kafka.maxobservedLatencyInHours";
+ public static final Integer DEFAULT_MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS = 24;
+ public static final String OBSERVED_LATENCY_PRECISION = "gobblin.kafka.observedLatencyPrecision";
+ public static final Integer DEFAULT_OBSERVED_LATENCY_PRECISION = 3;
+ public static final String OBSERVED_LATENCY_MEASUREMENT_ENABLED = "gobblin.kafka.observedLatencyMeasurementEnabled";
+ public static final Boolean DEFAULT_OBSERVED_LATENCY_MEASUREMENT_ENABLED = false;
+ public static final String RECORD_CREATION_TIMESTAMP_FIELD = "gobblin.kafka.recordCreationTimestampField";
+ public static final String RECORD_CREATION_TIMESTAMP_UNIT = "gobblin.kafka.recordCreationTimestampUnit";
private final Set<String> moveToLatestTopics = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
private final Map<KafkaPartition, Long> previousOffsets = Maps.newConcurrentMap();
@@ -558,6 +565,17 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
if (state.contains(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY)) {
workUnit.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, state.getProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY));
}
+ boolean isobservedLatencyMeasurementEnabled = state.getPropAsBoolean(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, DEFAULT_OBSERVED_LATENCY_MEASUREMENT_ENABLED);
+ if (isobservedLatencyMeasurementEnabled) {
+ Preconditions.checkArgument(state.contains(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD), "Missing config key: " + KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD);
+ workUnit.setProp(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, isobservedLatencyMeasurementEnabled);
+ workUnit.setProp(KafkaSource.MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS,
+ state.getPropAsInt(KafkaSource.MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS, DEFAULT_MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS));
+ workUnit.setProp(KafkaSource.OBSERVED_LATENCY_PRECISION,
+ state.getPropAsInt(KafkaSource.OBSERVED_LATENCY_PRECISION, KafkaSource.DEFAULT_OBSERVED_LATENCY_PRECISION));
+ workUnit.setProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD, state.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD));
+ workUnit.setProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, state.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, TimeUnit.MILLISECONDS.name()));
+ }
}
private long getPreviousStartFetchEpochTimeForPartition(KafkaPartition partition, SourceState state) {
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
index 6502fbe..278ff9b 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
@@ -16,19 +16,23 @@
*/
package org.apache.gobblin.source.extractor.extract.kafka;
+import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramLogReader;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import org.apache.gobblin.configuration.WorkUnitState;
-
+import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
+import org.apache.gobblin.configuration.WorkUnitState;
+
public class KafkaExtractorStatsTrackerTest {
List<KafkaPartition> kafkaPartitions = new ArrayList<>();
@@ -41,7 +45,8 @@ public class KafkaExtractorStatsTrackerTest {
kafkaPartitions.add(PARTITION0);
kafkaPartitions.add(PARTITION1);
WorkUnitState workUnitState = new WorkUnitState();
- workUnitState.setProp("gobblin.kafka.recordLevelSlaMinutes", 10L);
+ workUnitState.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, 10L);
+ workUnitState.setProp(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, true);
this.extractorStatsTracker = new KafkaExtractorStatsTracker(workUnitState, kafkaPartitions);
}
@@ -77,7 +82,9 @@ public class KafkaExtractorStatsTrackerTest {
long readStartTime = System.nanoTime();
Thread.sleep(1);
long decodeStartTime = System.nanoTime();
- long logAppendTimestamp = System.currentTimeMillis() - 15 * 60 * 1000L;
+ long currentTimeMillis = System.currentTimeMillis();
+ long logAppendTimestamp = currentTimeMillis - 15 * 60 * 1000L;
+ long recordCreationTimestamp = currentTimeMillis - 16 * 60 * 1000L;
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(), 0);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(), 0);
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime() == 0);
@@ -85,8 +92,9 @@ public class KafkaExtractorStatsTrackerTest {
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getSlaMissedRecordCount(), -1);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMinLogAppendTime(), -1);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMaxLogAppendTime(), -1);
+ Assert.assertEquals(this.extractorStatsTracker.getObservedLatencyHistogram().getTotalCount(), 0);
- this.extractorStatsTracker.onDecodeableRecord(0, readStartTime, decodeStartTime, 100, logAppendTimestamp);
+ this.extractorStatsTracker.onDecodeableRecord(0, readStartTime, decodeStartTime, 100, logAppendTimestamp, recordCreationTimestamp);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(), 1);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(), 100);
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime() > 0);
@@ -94,16 +102,19 @@ public class KafkaExtractorStatsTrackerTest {
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getSlaMissedRecordCount(), 1);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMinLogAppendTime(), logAppendTimestamp);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMaxLogAppendTime(), logAppendTimestamp);
+ Assert.assertEquals(this.extractorStatsTracker.getObservedLatencyHistogram().getTotalCount(), 1);
readStartTime = System.nanoTime();
Thread.sleep(1);
decodeStartTime = System.nanoTime();
long previousLogAppendTimestamp = logAppendTimestamp;
- logAppendTimestamp = System.currentTimeMillis() - 10;
+ currentTimeMillis = System.currentTimeMillis();
+ logAppendTimestamp = currentTimeMillis - 10;
+ recordCreationTimestamp = currentTimeMillis - 20;
long previousDecodeRecordTime = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime();
long previousReadRecordTime = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime();
- this.extractorStatsTracker.onDecodeableRecord(0, readStartTime, decodeStartTime, 100, logAppendTimestamp);
+ this.extractorStatsTracker.onDecodeableRecord(0, readStartTime, decodeStartTime, 100, logAppendTimestamp, recordCreationTimestamp);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(), 2);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(), 200);
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime() > previousDecodeRecordTime);
@@ -111,6 +122,7 @@ public class KafkaExtractorStatsTrackerTest {
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getSlaMissedRecordCount(), 1);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMinLogAppendTime(), previousLogAppendTimestamp);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMaxLogAppendTime(), logAppendTimestamp);
+ Assert.assertEquals(this.extractorStatsTracker.getObservedLatencyHistogram().getTotalCount(), 2);
}
@Test
@@ -145,10 +157,12 @@ public class KafkaExtractorStatsTrackerTest {
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getAvgRecordSize(), 100);
readStartTime = System.nanoTime();
- long logAppendTimestamp = System.currentTimeMillis() - 10;
+ long currentTimeMillis = System.currentTimeMillis();
+ long logAppendTimestamp = currentTimeMillis - 10;
+ long recordCreationTimestamp = currentTimeMillis - 20;
Thread.sleep(1);
long decodeStartTime = System.nanoTime();
- this.extractorStatsTracker.onDecodeableRecord(1, readStartTime, decodeStartTime, 100, logAppendTimestamp);
+ this.extractorStatsTracker.onDecodeableRecord(1, readStartTime, decodeStartTime, 100, logAppendTimestamp, recordCreationTimestamp);
this.extractorStatsTracker.updateStatisticsForCurrentPartition(1, readStartTime, 0);
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getElapsedTime() > 0);
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getAvgMillisPerRecord() > 0);
@@ -156,6 +170,7 @@ public class KafkaExtractorStatsTrackerTest {
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getSlaMissedRecordCount(), 0);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getMinLogAppendTime(), logAppendTimestamp);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getMaxLogAppendTime(), logAppendTimestamp);
+ Assert.assertEquals(this.extractorStatsTracker.getObservedLatencyHistogram().getTotalCount(), 3);
}
@Test (dependsOnMethods = "testUpdateStatisticsForCurrentPartition")
@@ -166,8 +181,10 @@ public class KafkaExtractorStatsTrackerTest {
Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(0), 0);
long readStartTime = System.nanoTime();
long decodeStartTime = readStartTime + 1;
- long logAppendTimeStamp = System.currentTimeMillis() - 10;
- this.extractorStatsTracker.onDecodeableRecord(1, readStartTime, decodeStartTime, 150, logAppendTimeStamp);
+ long currentTimeMillis = System.currentTimeMillis();
+ long logAppendTimestamp = currentTimeMillis - 10;
+ long recordCreationTimestamp = currentTimeMillis - 20;
+ this.extractorStatsTracker.onDecodeableRecord(1, readStartTime, decodeStartTime, 150, logAppendTimestamp, recordCreationTimestamp);
Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(1), 150);
}
@@ -184,4 +201,23 @@ public class KafkaExtractorStatsTrackerTest {
Assert.assertEquals(result.get(PARTITION0).get("testKey"), "testValue");
Assert.assertFalse(result.get(PARTITION1).containsKey("testKey"));
}
+
+ @Test
+ public void testConvertHistogramToString() {
+ Histogram histogram = new Histogram(1, 100, 3);
+ histogram.recordValue(3);
+ histogram.recordValue(25);
+ histogram.recordValue(25);
+ histogram.recordValue(92);
+ String histogramString = KafkaExtractorStatsTracker.convertHistogramToString(histogram);
+
+ HistogramLogReader logReader = new HistogramLogReader(new ByteArrayInputStream(histogramString.getBytes(
+ Charsets.UTF_8)));
+ Histogram histogram1 = (Histogram) logReader.nextIntervalHistogram();
+ Assert.assertEquals(histogram1.getTotalCount(), 4);
+ Assert.assertEquals(histogram1.getMaxValue(), 92);
+ Assert.assertEquals(histogram1.getCountAtValue(25), 2);
+ Assert.assertEquals(histogram1.getCountAtValue(3), 1);
+ Assert.assertEquals(histogram1.getCountAtValue(92), 1);
+ }
}
\ No newline at end of file
diff --git a/gobblin-runtime/build.gradle b/gobblin-runtime/build.gradle
index 0ff442a..306036c 100644
--- a/gobblin-runtime/build.gradle
+++ b/gobblin-runtime/build.gradle
@@ -94,8 +94,7 @@ dependencies {
testCompile externalDependency.curatorTest
testCompile externalDependency.mockito
testRuntime externalDependency.derby
-
- jmh 'org.openjdk.jmh:jmh-core:1.17.3'
+ testCompile externalDependency.jmh
}
// Begin HACK to get around POM being depenendent on the (empty) gobblin-rest-api instead of gobblin-rest-api-rest-client
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index 7c83a79..6cc9363 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -62,6 +62,7 @@ ext.externalDependency = [
"hadoopYarnMiniCluster": "org.apache.hadoop:hadoop-minicluster:" + hadoopVersion,
"hadoopAnnotations": "org.apache.hadoop:hadoop-annotations:" + hadoopVersion,
"hadoopAws": "org.apache.hadoop:hadoop-aws:2.6.0",
+ "hdrHistogram": "org.hdrhistogram:HdrHistogram:2.1.11",
"helix": "org.apache.helix:helix-core:0.8.2",
"hiveCommon": "org.apache.hive:hive-common:" + hiveVersion,
"hiveService": "org.apache.hive:hive-service:" + hiveVersion,
@@ -74,6 +75,8 @@ ext.externalDependency = [
"httpcore": "org.apache.httpcomponents:httpcore:4.4.4",
"httpasyncclient": "org.apache.httpcomponents:httpasyncclient:4.1.3",
"jgit": "org.eclipse.jgit:org.eclipse.jgit:5.1.1.201809181055-r",
+ "jmh": "org.openjdk.jmh:jmh-core:1.17.3",
+ "jmhAnnotations": "org.openjdk.jmh:jmh-generator-annprocess:1.17.3",
"kafka08": "org.apache.kafka:kafka_2.11:" + kafka08Version,
"kafka08Test": "org.apache.kafka:kafka_2.11:" + kafka08Version + ":test",
"kafka08Client": "org.apache.kafka:kafka-clients:" + kafka08Version,