You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/09/17 18:03:48 UTC
incubator-gobblin git commit: [GOBBLIN-589] Add more statistics to
KafkaExtractor tracking event
Repository: incubator-gobblin
Updated Branches:
refs/heads/master ef59a1517 -> fcc4d412a
[GOBBLIN-589] Add more statistics to KafkaExtractor tracking event
- Added emission of start/stop fetch epoch time statistics, as well as
partition total size
- Added lagging and emitting of fetch epoch times and watermark
statistics from the previous run
- Made changes to allow subclasses of KafkaExtractor to emit
statistics based on the lastSuccessfulRecord
Closes #2455 from cshen98/metrics1
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/fcc4d412
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/fcc4d412
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/fcc4d412
Branch: refs/heads/master
Commit: fcc4d412afa40a83ec1fccb25d76ea146ff1164b
Parents: ef59a15
Author: Carl Shen <ca...@gmail.com>
Authored: Mon Sep 17 11:03:43 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Sep 17 11:03:43 2018 -0700
----------------------------------------------------------------------
.../extractor/extract/kafka/KafkaExtractor.java | 173 +++++++++++--------
.../extractor/extract/kafka/KafkaSource.java | 82 ++++++++-
.../extractor/extract/kafka/KafkaUtils.java | 14 ++
.../workunit/packer/KafkaWorkUnitPacker.java | 12 ++
4 files changed, 209 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcc4d412/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index 0ec3caf..664446f 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -71,6 +70,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
public static final String EXPECTED_HIGH_WATERMARK = "expectedHighWatermark";
public static final String ELAPSED_TIME = "elapsedTime";
public static final String PROCESSED_RECORD_COUNT = "processedRecordCount";
+ public static final String PARTITION_TOTAL_SIZE = "partitionTotalSize";
public static final String AVG_RECORD_PULL_TIME = "avgRecordPullTime";
public static final String READ_RECORD_TIME = "readRecordTime";
public static final String DECODE_RECORD_TIME = "decodeRecordTime";
@@ -87,16 +87,17 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
protected final GobblinKafkaConsumerClient kafkaConsumerClient;
private final ClassAliasResolver<GobblinKafkaConsumerClientFactory> kafkaConsumerClientResolver;
- protected final Stopwatch stopwatch;
-
protected final Map<KafkaPartition, Integer> decodingErrorCount;
private final Map<KafkaPartition, Double> avgMillisPerRecord;
private final Map<KafkaPartition, Long> avgRecordSizes;
private final Map<KafkaPartition, Long> elapsedTime;
private final Map<KafkaPartition, Long> processedRecordCount;
+ private final Map<KafkaPartition, Long> partitionTotalSize;
private final Map<KafkaPartition, Long> decodeRecordTime;
private final Map<KafkaPartition, Long> fetchMessageBufferTime;
private final Map<KafkaPartition, Long> readRecordTime;
+ private final Map<KafkaPartition, Long> startFetchEpochTime;
+ private final Map<KafkaPartition, Long> stopFetchEpochTime;
private final Set<Integer> errorPartitions;
private int undecodableMessageCount = 0;
@@ -105,10 +106,10 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
private int currentPartitionIdx = INITIAL_PARTITION_IDX;
private long currentPartitionRecordCount = 0;
private long currentPartitionTotalSize = 0;
- private long currentPartitionFetchDuration = 0;
private long currentPartitionDecodeRecordTime = 0;
private long currentPartitionFetchMessageBufferTime = 0;
private long currentPartitionReadRecordTime = 0;
+ protected D currentPartitionLastSuccessfulRecord = null;
public KafkaExtractor(WorkUnitState state) {
super(state);
@@ -130,16 +131,17 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
throw new RuntimeException(e);
}
- this.stopwatch = Stopwatch.createUnstarted();
-
this.decodingErrorCount = Maps.newHashMap();
this.avgMillisPerRecord = Maps.newHashMapWithExpectedSize(this.partitions.size());
this.avgRecordSizes = Maps.newHashMapWithExpectedSize(this.partitions.size());
this.elapsedTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
this.processedRecordCount = Maps.newHashMapWithExpectedSize(this.partitions.size());
+ this.partitionTotalSize = Maps.newHashMapWithExpectedSize(this.partitions.size());
this.decodeRecordTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
this.fetchMessageBufferTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
this.readRecordTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
+ this.startFetchEpochTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
+ this.stopFetchEpochTime= Maps.newHashMapWithExpectedSize(this.partitions.size());
this.errorPartitions = Sets.newHashSet();
@@ -224,6 +226,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
this.currentPartitionRecordCount++;
this.currentPartitionTotalSize += nextValidMessage.getValueSizeInBytes();
this.currentPartitionReadRecordTime += System.nanoTime() - readStartTime;
+ this.currentPartitionLastSuccessfulRecord = record;
return record;
} catch (Throwable t) {
this.errorPartitions.add(this.currentPartitionIdx);
@@ -268,10 +271,11 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
updateStatisticsForCurrentPartition();
this.currentPartitionIdx++;
this.currentPartitionRecordCount = 0;
- this.currentPartitionFetchDuration = 0;
+ this.currentPartitionTotalSize = 0;
this.currentPartitionDecodeRecordTime = 0;
this.currentPartitionFetchMessageBufferTime = 0;
this.currentPartitionReadRecordTime = 0;
+ this.currentPartitionLastSuccessfulRecord = null;
}
this.messageIterator = null;
@@ -281,29 +285,36 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
this.highWatermark.get(this.currentPartitionIdx) - this.nextWatermark.get(this.currentPartitionIdx)));
switchMetricContextToCurrentPartition();
}
- this.stopwatch.start();
+
+ if (!allPartitionsFinished()) {
+ this.startFetchEpochTime.put(this.getCurrentPartition(), System.currentTimeMillis());
+ }
}
- private void updateStatisticsForCurrentPartition() {
- this.stopwatch.stop();
+ protected void updateStatisticsForCurrentPartition() {
+ long stopFetchEpochTime = System.currentTimeMillis();
+
+ if (!allPartitionsFinished()) {
+ this.stopFetchEpochTime.put(this.getCurrentPartition(), stopFetchEpochTime);
+ }
if (this.currentPartitionRecordCount != 0) {
- this.currentPartitionFetchDuration = this.stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ long currentPartitionFetchDuration =
+ stopFetchEpochTime - this.startFetchEpochTime.get(this.getCurrentPartition());
double avgMillisForCurrentPartition =
- (double) this.currentPartitionFetchDuration / (double) this.currentPartitionRecordCount;
+ (double) currentPartitionFetchDuration / (double) this.currentPartitionRecordCount;
this.avgMillisPerRecord.put(this.getCurrentPartition(), avgMillisForCurrentPartition);
long avgRecordSize = this.currentPartitionTotalSize / this.currentPartitionRecordCount;
this.avgRecordSizes.put(this.getCurrentPartition(), avgRecordSize);
- this.elapsedTime.put(this.getCurrentPartition(), this.currentPartitionFetchDuration);
+ this.elapsedTime.put(this.getCurrentPartition(), currentPartitionFetchDuration);
this.processedRecordCount.put(this.getCurrentPartition(), this.currentPartitionRecordCount);
+ this.partitionTotalSize.put(this.getCurrentPartition(), this.currentPartitionTotalSize);
this.decodeRecordTime.put(this.getCurrentPartition(), this.currentPartitionDecodeRecordTime);
this.fetchMessageBufferTime.put(this.getCurrentPartition(), this.currentPartitionFetchMessageBufferTime);
this.readRecordTime.put(this.getCurrentPartition(), this.currentPartitionReadRecordTime);
}
-
- this.stopwatch.reset();
}
private void switchMetricContextToCurrentPartition() {
@@ -367,65 +378,13 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
this.workUnitState.setProp(ConfigurationKeys.ERROR_PARTITION_COUNT, this.errorPartitions.size());
this.workUnitState.setProp(ConfigurationKeys.ERROR_MESSAGE_UNDECODABLE_COUNT, this.undecodableMessageCount);
- // Commit actual high watermark for each partition
for (int i = 0; i < this.partitions.size(); i++) {
LOG.info(String.format("Actual high watermark for partition %s=%d, expected=%d", this.partitions.get(i),
this.nextWatermark.get(i), this.highWatermark.get(i)));
-
- Map<String, String> tagsForPartition = Maps.newHashMap();
- KafkaPartition partition = this.partitions.get(i);
- tagsForPartition.put(TOPIC, partition.getTopicName());
- tagsForPartition.put(PARTITION, Integer.toString(partition.getId()));
- tagsForPartition.put(LOW_WATERMARK, Long.toString(this.lowWatermark.get(i)));
- tagsForPartition.put(ACTUAL_HIGH_WATERMARK, Long.toString(this.nextWatermark.get(i)));
- // These are used to compute the load factor,
- // gobblin consumption rate relative to the kafka production rate.
- // The gobblin rate is computed as (processed record count/elapsed time)
- // The kafka rate is computed as (expected high watermark - previous latest offset) /
- // (current offset fetch epoch time - previous offset fetch epoch time).
- tagsForPartition.put(EXPECTED_HIGH_WATERMARK, Long.toString(this.highWatermark.get(i)));
- tagsForPartition.put(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME,
- this.workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME,
- i)));
- tagsForPartition.put(KafkaSource.OFFSET_FETCH_EPOCH_TIME,
- this.workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.OFFSET_FETCH_EPOCH_TIME, i)));
- tagsForPartition.put(KafkaSource.PREVIOUS_LATEST_OFFSET,
- this.workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_LATEST_OFFSET, i)));
-
- if (this.processedRecordCount.containsKey(partition)) {
- tagsForPartition.put(PROCESSED_RECORD_COUNT, Long.toString(this.processedRecordCount.get(partition)));
- tagsForPartition.put(ELAPSED_TIME, Long.toString(this.elapsedTime.get(partition)));
- tagsForPartition.put(DECODE_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
- this.decodeRecordTime.get(partition))));
- tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
- this.fetchMessageBufferTime.get(partition))));
- tagsForPartition.put(READ_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
- this.readRecordTime.get(partition))));
- } else {
- tagsForPartition.put(PROCESSED_RECORD_COUNT, "0");
- tagsForPartition.put(ELAPSED_TIME, "0");
- tagsForPartition.put(DECODE_RECORD_TIME, "0");
- tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME, "0");
- tagsForPartition.put(READ_RECORD_TIME, "0");
- }
-
- tagsForPartitionsMap.put(partition, tagsForPartition);
+ tagsForPartitionsMap.put(this.partitions.get(i), createTagsForPartition(i));
}
this.workUnitState.setActualHighWatermark(this.nextWatermark);
- // Commit avg time to pull a record for each partition
- for (KafkaPartition partition : this.partitions) {
- if (this.avgMillisPerRecord.containsKey(partition)) {
- double avgMillis = this.avgMillisPerRecord.get(partition);
- LOG.info(String.format("Avg time to pull a record for partition %s = %f milliseconds", partition, avgMillis));
- KafkaUtils.setPartitionAvgRecordMillis(this.workUnitState, partition, avgMillis);
- tagsForPartitionsMap.get(partition).put(AVG_RECORD_PULL_TIME, Double.toString(avgMillis));
- } else {
- LOG.info(String.format("Avg time to pull a record for partition %s not recorded", partition));
- tagsForPartitionsMap.get(partition).put(AVG_RECORD_PULL_TIME, Double.toString(-1));
- }
- }
-
if (isInstrumentationEnabled()) {
for (Map.Entry<KafkaPartition, Map<String, String>> eventTags : tagsForPartitionsMap.entrySet()) {
new EventSubmitter.Builder(getMetricContext(), GOBBLIN_KAFKA_NAMESPACE).build()
@@ -436,6 +395,84 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
this.closer.close();
}
+  /**
+   * Builds the tracking-event tags for a single partition handled by this extractor.
+   *
+   * <p>Besides building the tag map, this method writes the partition's start/stop fetch epoch times to the
+   * work unit state and, when an average pull time was recorded, passes it to
+   * {@code KafkaUtils.setPartitionAvgRecordMillis}.
+   *
+   * @param partitionId index of the partition within {@code this.partitions}; this is not the Kafka partition
+   *        id, which is emitted separately under the {@code PARTITION} tag
+   * @return a new mutable map from tag name to string value for the partition
+   */
+  protected Map<String, String> createTagsForPartition(int partitionId) {
+    Map<String, String> tagsForPartition = Maps.newHashMap();
+    KafkaPartition partition = this.partitions.get(partitionId);
+
+    tagsForPartition.put(TOPIC, partition.getTopicName());
+    tagsForPartition.put(PARTITION, Integer.toString(partition.getId()));
+    tagsForPartition.put(LOW_WATERMARK, Long.toString(this.lowWatermark.get(partitionId)));
+    tagsForPartition.put(ACTUAL_HIGH_WATERMARK, Long.toString(this.nextWatermark.get(partitionId)));
+
+    // These are used to compute the load factor,
+    // gobblin consumption rate relative to the kafka production rate.
+    // The gobblin rate is computed as (processed record count/elapsed time)
+    // The kafka rate is computed as (expected high watermark - previous latest offset) /
+    // (current offset fetch epoch time - previous offset fetch epoch time).
+    tagsForPartition.put(EXPECTED_HIGH_WATERMARK, Long.toString(this.highWatermark.get(partitionId)));
+    putWorkUnitLongTag(tagsForPartition, KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME, partitionId);
+    putWorkUnitLongTag(tagsForPartition, KafkaSource.OFFSET_FETCH_EPOCH_TIME, partitionId);
+    putWorkUnitLongTag(tagsForPartition, KafkaSource.PREVIOUS_LATEST_OFFSET, partitionId);
+
+    // Watermarks and fetch times stored on the work unit under the "previous*" keys.
+    putWorkUnitLongTag(tagsForPartition, KafkaSource.PREVIOUS_LOW_WATERMARK, partitionId);
+    putWorkUnitLongTag(tagsForPartition, KafkaSource.PREVIOUS_HIGH_WATERMARK, partitionId);
+    putWorkUnitLongTag(tagsForPartition, KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME, partitionId);
+    putWorkUnitLongTag(tagsForPartition, KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME, partitionId);
+
+    // A partition the extractor never switched to (or whose statistics were never finalized) has no entry in
+    // the fetch-time maps. Fall back to 0 rather than unboxing null, which would throw a NullPointerException.
+    long startFetchMillis =
+        this.startFetchEpochTime.containsKey(partition) ? this.startFetchEpochTime.get(partition) : 0L;
+    long stopFetchMillis =
+        this.stopFetchEpochTime.containsKey(partition) ? this.stopFetchEpochTime.get(partition) : 0L;
+
+    tagsForPartition.put(KafkaSource.START_FETCH_EPOCH_TIME, Long.toString(startFetchMillis));
+    tagsForPartition.put(KafkaSource.STOP_FETCH_EPOCH_TIME, Long.toString(stopFetchMillis));
+    this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.START_FETCH_EPOCH_TIME, partitionId),
+        Long.toString(startFetchMillis));
+    this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.STOP_FETCH_EPOCH_TIME, partitionId),
+        Long.toString(stopFetchMillis));
+
+    if (this.processedRecordCount.containsKey(partition)) {
+      tagsForPartition.put(PROCESSED_RECORD_COUNT, Long.toString(this.processedRecordCount.get(partition)));
+      tagsForPartition.put(PARTITION_TOTAL_SIZE, Long.toString(this.partitionTotalSize.get(partition)));
+      tagsForPartition.put(ELAPSED_TIME, Long.toString(this.elapsedTime.get(partition)));
+      // The per-phase timers are accumulated in nanoseconds; the event reports milliseconds.
+      tagsForPartition.put(DECODE_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
+          this.decodeRecordTime.get(partition))));
+      tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
+          this.fetchMessageBufferTime.get(partition))));
+      tagsForPartition.put(READ_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
+          this.readRecordTime.get(partition))));
+    } else {
+      tagsForPartition.put(PROCESSED_RECORD_COUNT, "0");
+      tagsForPartition.put(PARTITION_TOTAL_SIZE, "0");
+      tagsForPartition.put(ELAPSED_TIME, "0");
+      tagsForPartition.put(DECODE_RECORD_TIME, "0");
+      tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME, "0");
+      tagsForPartition.put(READ_RECORD_TIME, "0");
+    }
+
+    // Commit avg time to pull a record for each partition
+    if (this.avgMillisPerRecord.containsKey(partition)) {
+      double avgMillis = this.avgMillisPerRecord.get(partition);
+      LOG.info(String.format("Avg time to pull a record for partition %s = %f milliseconds", partition, avgMillis));
+      KafkaUtils.setPartitionAvgRecordMillis(this.workUnitState, partition, avgMillis);
+      tagsForPartition.put(AVG_RECORD_PULL_TIME, Double.toString(avgMillis));
+    } else {
+      LOG.info(String.format("Avg time to pull a record for partition %s not recorded", partition));
+      tagsForPartition.put(AVG_RECORD_PULL_TIME, Double.toString(-1));
+    }
+
+    return tagsForPartition;
+  }
+
+  /** Copies a numeric work-unit property (plain or partition-suffixed key, 0 when absent) into the tag map. */
+  private void putWorkUnitLongTag(Map<String, String> tags, String key, int partitionId) {
+    tags.put(key, Long.toString(
+        KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState, key, partitionId)));
+  }
+
+
@Deprecated
@Override
public long getHighWatermark() {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcc4d412/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index b96412c..e7d7da0 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -104,6 +104,12 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
public static final String ALL_TOPICS = "all";
public static final String AVG_RECORD_SIZE = "avg.record.size";
public static final String AVG_RECORD_MILLIS = "avg.record.millis";
+ public static final String START_FETCH_EPOCH_TIME = "startFetchEpochTime";
+ public static final String STOP_FETCH_EPOCH_TIME = "stopFetchEpochTime";
+ public static final String PREVIOUS_START_FETCH_EPOCH_TIME = "previousStartFetchEpochTime";
+ public static final String PREVIOUS_STOP_FETCH_EPOCH_TIME = "previousStopFetchEpochTime";
+ public static final String PREVIOUS_LOW_WATERMARK = "previousLowWatermark";
+ public static final String PREVIOUS_HIGH_WATERMARK = "previousHighWatermark";
public static final String PREVIOUS_LATEST_OFFSET = "previousLatestOffset";
public static final String OFFSET_FETCH_EPOCH_TIME = "offsetFetchEpochTime";
public static final String PREVIOUS_OFFSET_FETCH_EPOCH_TIME = "previousOffsetFetchEpochTime";
@@ -119,8 +125,11 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
private final Set<String> moveToLatestTopics = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
private final Map<KafkaPartition, Long> previousOffsets = Maps.newConcurrentMap();
+ private final Map<KafkaPartition, Long> previousLowWatermarks = Maps.newConcurrentMap();
private final Map<KafkaPartition, Long> previousExpectedHighWatermarks = Maps.newConcurrentMap();
private final Map<KafkaPartition, Long> previousOffsetFetchEpochTimes = Maps.newConcurrentMap();
+ private final Map<KafkaPartition, Long> previousStartFetchEpochTimes = Maps.newConcurrentMap();
+ private final Map<KafkaPartition, Long> previousStopFetchEpochTimes = Maps.newConcurrentMap();
private final Set<KafkaPartition> partitionsToBeProcessed = Sets.newConcurrentHashSet();
@@ -386,6 +395,10 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
boolean previousOffsetNotFound = false;
try {
previousOffset = getPreviousOffsetForPartition(partition, state);
+ offsets.setPreviousEndOffset(previousOffset);
+ offsets.setPreviousStartOffset(getPreviousLowWatermark(partition, state));
+ offsets.setPreviousStartFetchEpochTime(getPreviousStartFetchEpochTimeForPartition(partition, state));
+ offsets.setPreviousStopFetchEpochTime(getPreviousStopFetchEpochTimeForPartition(partition, state));
offsets.setPreviousLatestOffset(getPreviousExpectedHighWatermark(partition, state));
previousOffsetFetchEpochTime = getPreviousOffsetFetchEpochTimeForPartition(partition, state);
offsets.setPreviousOffsetFetchEpochTime(previousOffsetFetchEpochTime);
@@ -464,6 +477,18 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
return getWorkUnitForTopicPartition(partition, offsets, topicSpecificState);
}
+  /**
+   * Returns the start fetch epoch time loaded from the previous work unit states for the given partition,
+   * or 0 when none was loaded for it.
+   */
+  private long getPreviousStartFetchEpochTimeForPartition(KafkaPartition partition, SourceState state) {
+    getAllPreviousOffsetState(state);
+    if (!this.previousStartFetchEpochTimes.containsKey(partition)) {
+      return 0;
+    }
+    return this.previousStartFetchEpochTimes.get(partition);
+  }
+
+  /**
+   * Returns the stop fetch epoch time loaded from the previous work unit states for the given partition,
+   * or 0 when none was loaded for it.
+   */
+  private long getPreviousStopFetchEpochTimeForPartition(KafkaPartition partition, SourceState state) {
+    getAllPreviousOffsetState(state);
+    if (!this.previousStopFetchEpochTimes.containsKey(partition)) {
+      return 0;
+    }
+    return this.previousStopFetchEpochTimes.get(partition);
+  }
+
private long getPreviousOffsetFetchEpochTimeForPartition(KafkaPartition partition, SourceState state)
throws PreviousOffsetNotFoundException {
@@ -503,6 +528,19 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
partition.getId()));
}
+  /**
+   * Looks up the low watermark loaded from the previous work unit states for the given partition.
+   *
+   * @throws PreviousOffsetNotFoundException if no previous low watermark was loaded for the partition
+   */
+  private long getPreviousLowWatermark(KafkaPartition partition, SourceState state)
+      throws PreviousOffsetNotFoundException {
+
+    getAllPreviousOffsetState(state);
+
+    if (!this.previousLowWatermarks.containsKey(partition)) {
+      String message = String.format("Previous low watermark for topic %s, partition %s not found.",
+          partition.getTopicName(), partition.getId());
+      throw new PreviousOffsetNotFoundException(message);
+    }
+    return this.previousLowWatermarks.get(partition);
+  }
+
// need to be synchronized as this.previousOffsets, this.previousExpectedHighWatermarks, and
// this.previousOffsetFetchEpochTimes need to be initialized once
private synchronized void getAllPreviousOffsetState(SourceState state) {
@@ -510,8 +548,11 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
return;
}
this.previousOffsets.clear();
+ this.previousLowWatermarks.clear();
this.previousExpectedHighWatermarks.clear();
this.previousOffsetFetchEpochTimes.clear();
+ this.previousStartFetchEpochTimes.clear();
+ this.previousStopFetchEpochTimes.clear();
Map<String, Iterable<WorkUnitState>> workUnitStatesByDatasetUrns = state.getPreviousWorkUnitStatesByDatasetUrns();
if (!workUnitStatesByDatasetUrns.isEmpty() &&
@@ -522,9 +563,11 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
for (WorkUnitState workUnitState : state.getPreviousWorkUnitStates()) {
List<KafkaPartition> partitions = KafkaUtils.getPartitions(workUnitState);
+ WorkUnit workUnit = workUnitState.getWorkunit();
+
MultiLongWatermark watermark = workUnitState.getActualHighWatermark(MultiLongWatermark.class);
- MultiLongWatermark previousExpectedHighWatermark =
- workUnitState.getWorkunit().getExpectedHighWatermark(MultiLongWatermark.class);
+ MultiLongWatermark previousLowWatermark = workUnit.getLowWatermark(MultiLongWatermark.class);
+ MultiLongWatermark previousExpectedHighWatermark = workUnit.getExpectedHighWatermark(MultiLongWatermark.class);
Preconditions.checkArgument(partitions.size() == watermark.size(), String
.format("Num of partitions doesn't match number of watermarks: partitions=%s, watermarks=%s", partitions,
watermark));
@@ -536,13 +579,22 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
this.previousOffsets.put(partition, watermark.get(i));
}
+ if (previousLowWatermark.get(i) != ConfigurationKeys.DEFAULT_WATERMARK_VALUE) {
+ this.previousLowWatermarks.put(partition, previousLowWatermark.get(i));
+ }
+
if (previousExpectedHighWatermark.get(i) != ConfigurationKeys.DEFAULT_WATERMARK_VALUE) {
this.previousExpectedHighWatermarks.put(partition, previousExpectedHighWatermark.get(i));
}
this.previousOffsetFetchEpochTimes.put(partition,
- Long.valueOf(workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.OFFSET_FETCH_EPOCH_TIME, i),
- "0")));
+ KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(workUnitState, OFFSET_FETCH_EPOCH_TIME, i));
+
+ this.previousStartFetchEpochTimes.put(partition,
+ KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(workUnitState, START_FETCH_EPOCH_TIME, i));
+
+ this.previousStopFetchEpochTimes.put(partition,
+ KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(workUnitState, STOP_FETCH_EPOCH_TIME, i));
}
}
@@ -608,6 +660,10 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
workUnit.setProp(LEADER_HOSTANDPORT, partition.getLeader().getHostAndPort().toString());
workUnit.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, offsets.getStartOffset());
workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, offsets.getLatestOffset());
+ workUnit.setProp(PREVIOUS_START_FETCH_EPOCH_TIME, offsets.getPreviousStartFetchEpochTime());
+ workUnit.setProp(PREVIOUS_STOP_FETCH_EPOCH_TIME, offsets.getPreviousStopFetchEpochTime());
+ workUnit.setProp(PREVIOUS_LOW_WATERMARK, offsets.getPreviousStartOffset());
+ workUnit.setProp(PREVIOUS_HIGH_WATERMARK, offsets.getPreviousEndOffset());
workUnit.setProp(PREVIOUS_OFFSET_FETCH_EPOCH_TIME, offsets.getPreviousOffsetFetchEpochTime());
workUnit.setProp(OFFSET_FETCH_EPOCH_TIME, offsets.getOffsetFetchEpochTime());
workUnit.setProp(PREVIOUS_LATEST_OFFSET, offsets.getPreviousLatestOffset());
@@ -679,6 +735,24 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
@Setter
private long previousLatestOffset = 0;
+ // previous low watermark
+ @Getter
+ @Setter
+ private long previousStartOffset = 0;
+
+ // previous actual high watermark
+ @Getter
+ @Setter
+ private long previousEndOffset = 0;
+
+ @Getter
+ @Setter
+ private long previousStartFetchEpochTime = 0;
+
+ @Getter
+ @Setter
+ private long previousStopFetchEpochTime = 0;
+
private void startAt(long offset)
throws StartOffsetOutOfRangeException {
if (offset < this.earliestOffset || offset > this.latestOffset) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcc4d412/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
index 55ecab4..5472134 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.source.extractor.extract.kafka;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
import java.util.List;
@@ -168,4 +169,17 @@ public class KafkaUtils {
getPartitionPropName(partition.getTopicName(), partition.getId()) + "." + KafkaSource.AVG_RECORD_MILLIS,
millis);
}
+
+  /**
+   * Reads a property as a long from a work unit state that may or may not stem from a multiworkunit.
+   *
+   * <p>The SingleLevelWorkUnitPacker does not squeeze work units into a multiworkunit and therefore leaves
+   * property keys unsuffixed, whereas the BiLevelWorkUnitPacker appends the partitionId to each key. The
+   * plain key is consulted first; otherwise the partition-suffixed key is read, with 0 as the default when
+   * the key is found in neither form.
+   *
+   * @param workUnitState the state to read from
+   * @param key the unsuffixed property name
+   * @param partitionId the id used to build the partition-suffixed form of the key
+   * @return the property value parsed as a long
+   */
+  public static long getPropAsLongFromSingleOrMultiWorkUnitState(WorkUnitState workUnitState,
+      String key, int partitionId) {
+    String rawValue;
+    if (workUnitState.contains(key)) {
+      rawValue = workUnitState.getProp(key);
+    } else {
+      rawValue = workUnitState.getProp(KafkaUtils.getPartitionPropName(key, partitionId), "0");
+    }
+    return Long.parseLong(rawValue);
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcc4d412/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
index 0d93796..fef3219 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
@@ -225,6 +225,14 @@ public abstract class KafkaWorkUnitPacker {
// (current latest offset - previous latest offset)/(current epoch time - previous epoch time).
int index = 0;
for (WorkUnit wu : multiWorkUnit.getWorkUnits()) {
+ workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME, index),
+ wu.getProp(KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME));
+ workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME, index),
+ wu.getProp(KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME));
+ workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_LOW_WATERMARK, index),
+ wu.getProp(KafkaSource.PREVIOUS_LOW_WATERMARK));
+ workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_HIGH_WATERMARK, index),
+ wu.getProp(KafkaSource.PREVIOUS_HIGH_WATERMARK));
workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME, index),
wu.getProp(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME));
workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.OFFSET_FETCH_EPOCH_TIME, index),
@@ -233,6 +241,10 @@ public abstract class KafkaWorkUnitPacker {
wu.getProp(KafkaSource.PREVIOUS_LATEST_OFFSET));
index++;
}
+ workUnit.removeProp(KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME);
+ workUnit.removeProp(KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME);
+ workUnit.removeProp(KafkaSource.PREVIOUS_LOW_WATERMARK);
+ workUnit.removeProp(KafkaSource.PREVIOUS_HIGH_WATERMARK);
workUnit.removeProp(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME);
workUnit.removeProp(KafkaSource.OFFSET_FETCH_EPOCH_TIME);
workUnit.removeProp(KafkaSource.PREVIOUS_LATEST_OFFSET);