Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:52 UTC
[34/50] incubator-gobblin git commit: [GOBBLIN-408] Add more info to the KafkaExtractorTopicMetadata event for tracking execution times and rates
[GOBBLIN-408] Add more info to the KafkaExtractorTopicMetadata event for tracking execution times and rates
Closes #2285 from htran1/kafka_load_factor
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a3189d73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a3189d73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a3189d73
Branch: refs/heads/0.12.0
Commit: a3189d73360c13412d91d42bea05f6ded1e4006a
Parents: 11182dc
Author: Hung Tran <hu...@linkedin.com>
Authored: Tue Feb 20 11:16:32 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Feb 20 11:16:32 2018 -0800
----------------------------------------------------------------------
.../extractor/extract/kafka/KafkaExtractor.java | 81 ++++++++++++++++--
.../extractor/extract/kafka/KafkaSource.java | 87 ++++++++++++++++++--
.../workunit/packer/KafkaWorkUnitPacker.java | 18 ++++
3 files changed, 173 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a3189d73/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index 1ff0159..0ec3caf 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -69,7 +69,12 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
public static final String LOW_WATERMARK = "lowWatermark";
public static final String ACTUAL_HIGH_WATERMARK = "actualHighWatermark";
public static final String EXPECTED_HIGH_WATERMARK = "expectedHighWatermark";
+ public static final String ELAPSED_TIME = "elapsedTime";
+ public static final String PROCESSED_RECORD_COUNT = "processedRecordCount";
public static final String AVG_RECORD_PULL_TIME = "avgRecordPullTime";
+ public static final String READ_RECORD_TIME = "readRecordTime";
+ public static final String DECODE_RECORD_TIME = "decodeRecordTime";
+ public static final String FETCH_MESSAGE_BUFFER_TIME = "fetchMessageBufferTime";
public static final String GOBBLIN_KAFKA_NAMESPACE = "gobblin.kafka";
public static final String KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME = "KafkaExtractorTopicMetadata";
@@ -87,6 +92,11 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
protected final Map<KafkaPartition, Integer> decodingErrorCount;
private final Map<KafkaPartition, Double> avgMillisPerRecord;
private final Map<KafkaPartition, Long> avgRecordSizes;
+ private final Map<KafkaPartition, Long> elapsedTime;
+ private final Map<KafkaPartition, Long> processedRecordCount;
+ private final Map<KafkaPartition, Long> decodeRecordTime;
+ private final Map<KafkaPartition, Long> fetchMessageBufferTime;
+ private final Map<KafkaPartition, Long> readRecordTime;
private final Set<Integer> errorPartitions;
private int undecodableMessageCount = 0;
@@ -95,6 +105,10 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
private int currentPartitionIdx = INITIAL_PARTITION_IDX;
private long currentPartitionRecordCount = 0;
private long currentPartitionTotalSize = 0;
+ private long currentPartitionFetchDuration = 0;
+ private long currentPartitionDecodeRecordTime = 0;
+ private long currentPartitionFetchMessageBufferTime = 0;
+ private long currentPartitionReadRecordTime = 0;
public KafkaExtractor(WorkUnitState state) {
super(state);
@@ -121,6 +135,11 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
this.decodingErrorCount = Maps.newHashMap();
this.avgMillisPerRecord = Maps.newHashMapWithExpectedSize(this.partitions.size());
this.avgRecordSizes = Maps.newHashMapWithExpectedSize(this.partitions.size());
+ this.elapsedTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
+ this.processedRecordCount = Maps.newHashMapWithExpectedSize(this.partitions.size());
+ this.decodeRecordTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
+ this.fetchMessageBufferTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
+ this.readRecordTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
this.errorPartitions = Sets.newHashSet();
@@ -142,6 +161,8 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
@SuppressWarnings("unchecked")
@Override
public D readRecordImpl(D reuse) throws DataRecordException, IOException {
+ long readStartTime = System.nanoTime();
+
while (!allPartitionsFinished()) {
if (currentPartitionFinished()) {
moveToNextPartition();
@@ -149,7 +170,9 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
}
if (this.messageIterator == null || !this.messageIterator.hasNext()) {
try {
+ long fetchStartTime = System.nanoTime();
this.messageIterator = fetchNextMessageBuffer();
+ this.currentPartitionFetchMessageBufferTime += System.nanoTime() - fetchStartTime;
} catch (Exception e) {
LOG.error(String.format("Failed to fetch next message buffer for partition %s. Will skip this partition.",
getCurrentPartition()), e);
@@ -178,6 +201,9 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
this.nextWatermark.set(this.currentPartitionIdx, nextValidMessage.getNextOffset());
try {
D record = null;
+ // track time for decode/convert depending on the record type
+ long decodeStartTime = System.nanoTime();
+
if (nextValidMessage instanceof ByteArrayBasedKafkaRecord) {
record = decodeRecord((ByteArrayBasedKafkaRecord)nextValidMessage);
} else if (nextValidMessage instanceof DecodeableKafkaRecord){
@@ -194,8 +220,10 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
+ " or DecodeableKafkaRecord");
}
+ this.currentPartitionDecodeRecordTime += System.nanoTime() - decodeStartTime;
this.currentPartitionRecordCount++;
this.currentPartitionTotalSize += nextValidMessage.getValueSizeInBytes();
+ this.currentPartitionReadRecordTime += System.nanoTime() - readStartTime;
return record;
} catch (Throwable t) {
this.errorPartitions.add(this.currentPartitionIdx);
@@ -208,6 +236,8 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
}
}
LOG.info("Finished pulling topic " + this.topicName);
+
+ this.currentPartitionReadRecordTime += System.nanoTime() - readStartTime;
return null;
}
@@ -235,10 +265,13 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
LOG.info("Pulling topic " + this.topicName);
this.currentPartitionIdx = 0;
} else {
- computeAvgMillisPerRecordForCurrentPartition();
+ updateStatisticsForCurrentPartition();
this.currentPartitionIdx++;
this.currentPartitionRecordCount = 0;
- this.currentPartitionTotalSize = 0;
+ this.currentPartitionFetchDuration = 0;
+ this.currentPartitionDecodeRecordTime = 0;
+ this.currentPartitionFetchMessageBufferTime = 0;
+ this.currentPartitionReadRecordTime = 0;
}
this.messageIterator = null;
@@ -251,16 +284,25 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
this.stopwatch.start();
}
- private void computeAvgMillisPerRecordForCurrentPartition() {
+ private void updateStatisticsForCurrentPartition() {
this.stopwatch.stop();
+
if (this.currentPartitionRecordCount != 0) {
+ this.currentPartitionFetchDuration = this.stopwatch.elapsed(TimeUnit.MILLISECONDS);
double avgMillisForCurrentPartition =
- (double) this.stopwatch.elapsed(TimeUnit.MILLISECONDS) / (double) this.currentPartitionRecordCount;
+ (double) this.currentPartitionFetchDuration / (double) this.currentPartitionRecordCount;
this.avgMillisPerRecord.put(this.getCurrentPartition(), avgMillisForCurrentPartition);
long avgRecordSize = this.currentPartitionTotalSize / this.currentPartitionRecordCount;
this.avgRecordSizes.put(this.getCurrentPartition(), avgRecordSize);
+
+ this.elapsedTime.put(this.getCurrentPartition(), this.currentPartitionFetchDuration);
+ this.processedRecordCount.put(this.getCurrentPartition(), this.currentPartitionRecordCount);
+ this.decodeRecordTime.put(this.getCurrentPartition(), this.currentPartitionDecodeRecordTime);
+ this.fetchMessageBufferTime.put(this.getCurrentPartition(), this.currentPartitionFetchMessageBufferTime);
+ this.readRecordTime.put(this.getCurrentPartition(), this.currentPartitionReadRecordTime);
}
+
this.stopwatch.reset();
}
@@ -317,7 +359,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
@Override
public void close() throws IOException {
- computeAvgMillisPerRecordForCurrentPartition();
+ updateStatisticsForCurrentPartition();
Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap = Maps.newHashMap();
@@ -336,7 +378,36 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
tagsForPartition.put(PARTITION, Integer.toString(partition.getId()));
tagsForPartition.put(LOW_WATERMARK, Long.toString(this.lowWatermark.get(i)));
tagsForPartition.put(ACTUAL_HIGH_WATERMARK, Long.toString(this.nextWatermark.get(i)));
+ // These are used to compute the load factor,
+ // gobblin consumption rate relative to the kafka production rate.
+ // The gobblin rate is computed as (processed record count/elapsed time)
+ // The kafka rate is computed as (expected high watermark - previous latest offset) /
+ // (current offset fetch epoch time - previous offset fetch epoch time).
tagsForPartition.put(EXPECTED_HIGH_WATERMARK, Long.toString(this.highWatermark.get(i)));
+ tagsForPartition.put(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME,
+ this.workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME,
+ i)));
+ tagsForPartition.put(KafkaSource.OFFSET_FETCH_EPOCH_TIME,
+ this.workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.OFFSET_FETCH_EPOCH_TIME, i)));
+ tagsForPartition.put(KafkaSource.PREVIOUS_LATEST_OFFSET,
+ this.workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_LATEST_OFFSET, i)));
+
+ if (this.processedRecordCount.containsKey(partition)) {
+ tagsForPartition.put(PROCESSED_RECORD_COUNT, Long.toString(this.processedRecordCount.get(partition)));
+ tagsForPartition.put(ELAPSED_TIME, Long.toString(this.elapsedTime.get(partition)));
+ tagsForPartition.put(DECODE_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
+ this.decodeRecordTime.get(partition))));
+ tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
+ this.fetchMessageBufferTime.get(partition))));
+ tagsForPartition.put(READ_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
+ this.readRecordTime.get(partition))));
+ } else {
+ tagsForPartition.put(PROCESSED_RECORD_COUNT, "0");
+ tagsForPartition.put(ELAPSED_TIME, "0");
+ tagsForPartition.put(DECODE_RECORD_TIME, "0");
+ tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME, "0");
+ tagsForPartition.put(READ_RECORD_TIME, "0");
+ }
tagsForPartitionsMap.put(partition, tagsForPartition);
}
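
A note on the load-factor comment above: the commit states the two rates but leaves the final ratio implicit. Below is a minimal, self-contained sketch (not part of this commit; the class and method names are hypothetical) of how a consumer of the KafkaExtractorTopicMetadata event could combine the new tags. Since the commit does not fix the orientation of the ratio, this sketch takes production rate over consumption rate, so values above 1.0 mean the extractor is falling behind the topic.

    import java.util.Map;

    public final class LoadFactorSketch {
      /**
       * Gobblin consumption rate = processedRecordCount / elapsedTime (records per ms).
       * Kafka production rate = (expectedHighWatermark - previousLatestOffset)
       *     / (offsetFetchEpochTime - previousOffsetFetchEpochTime), also records per ms.
       * The tag names match the constants emitted above.
       */
      public static double loadFactor(Map<String, String> tagsForPartition) {
        long processed = Long.parseLong(tagsForPartition.get("processedRecordCount"));
        long elapsedMillis = Long.parseLong(tagsForPartition.get("elapsedTime"));
        long expectedHighWatermark = Long.parseLong(tagsForPartition.get("expectedHighWatermark"));
        long previousLatestOffset = Long.parseLong(tagsForPartition.get("previousLatestOffset"));
        long fetchEpoch = Long.parseLong(tagsForPartition.get("offsetFetchEpochTime"));
        long previousFetchEpoch = Long.parseLong(tagsForPartition.get("previousOffsetFetchEpochTime"));

        // Partitions that processed nothing are tagged with zeros; skip them.
        if (processed == 0 || elapsedMillis == 0 || fetchEpoch <= previousFetchEpoch) {
          return 0.0;
        }
        double consumptionRate = (double) processed / elapsedMillis;
        double productionRate = (double) (expectedHighWatermark - previousLatestOffset)
            / (fetchEpoch - previousFetchEpoch);
        return productionRate / consumptionRate;
      }
    }
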
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a3189d73/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 69ebea6..b96412c 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -104,6 +104,9 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
public static final String ALL_TOPICS = "all";
public static final String AVG_RECORD_SIZE = "avg.record.size";
public static final String AVG_RECORD_MILLIS = "avg.record.millis";
+ public static final String PREVIOUS_LATEST_OFFSET = "previousLatestOffset";
+ public static final String OFFSET_FETCH_EPOCH_TIME = "offsetFetchEpochTime";
+ public static final String PREVIOUS_OFFSET_FETCH_EPOCH_TIME = "previousOffsetFetchEpochTime";
public static final String GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS = "gobblin.kafka.consumerClient.class";
public static final String GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION =
"gobblin.kafka.extract.allowTableTypeAndNamspaceCustomization";
@@ -116,6 +119,8 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
private final Set<String> moveToLatestTopics = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
private final Map<KafkaPartition, Long> previousOffsets = Maps.newConcurrentMap();
+ private final Map<KafkaPartition, Long> previousExpectedHighWatermarks = Maps.newConcurrentMap();
+ private final Map<KafkaPartition, Long> previousOffsetFetchEpochTimes = Maps.newConcurrentMap();
private final Set<KafkaPartition> partitionsToBeProcessed = Sets.newConcurrentHashSet();
@@ -298,7 +303,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
Map<String, State> topicSpecificStateMap, SourceState state) {
// in case the previous offset not been set
- getAllPreviousOffsets(state);
+ getAllPreviousOffsetState(state);
// For each partition that has a previous offset, create an empty WorkUnit for it if
// it is not in this.partitionsToBeProcessed.
@@ -309,6 +314,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
if (!this.isDatasetStateEnabled.get() || this.topicsToProcess.contains(topicName)) {
long previousOffset = entry.getValue();
WorkUnit emptyWorkUnit = createEmptyWorkUnit(partition, previousOffset,
+ this.previousOffsetFetchEpochTimes.get(partition),
Optional.fromNullable(topicSpecificStateMap.get(partition.getTopicName())));
if (workUnits.containsKey(topicName)) {
@@ -368,6 +374,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
boolean failedToGetKafkaOffsets = false;
try (Timer.Context context = this.metricContext.timer(OFFSET_FETCH_TIMER).time()) {
+ offsets.setOffsetFetchEpochTime(System.currentTimeMillis());
offsets.setEarliestOffset(this.kafkaConsumerClient.get().getEarliestOffset(partition));
offsets.setLatestOffset(this.kafkaConsumerClient.get().getLatestOffset(partition));
} catch (KafkaOffsetRetrievalFailureException e) {
@@ -375,9 +382,13 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
}
long previousOffset = 0;
+ long previousOffsetFetchEpochTime = 0;
boolean previousOffsetNotFound = false;
try {
previousOffset = getPreviousOffsetForPartition(partition, state);
+ offsets.setPreviousLatestOffset(getPreviousExpectedHighWatermark(partition, state));
+ previousOffsetFetchEpochTime = getPreviousOffsetFetchEpochTimeForPartition(partition, state);
+ offsets.setPreviousOffsetFetchEpochTime(previousOffsetFetchEpochTime);
} catch (PreviousOffsetNotFoundException e) {
previousOffsetNotFound = true;
}
@@ -392,7 +403,8 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
LOG.warn(String
.format("Failed to retrieve earliest and/or latest offset for partition %s. This partition will be skipped.",
partition));
- return previousOffsetNotFound ? null : createEmptyWorkUnit(partition, previousOffset, topicSpecificState);
+ return previousOffsetNotFound ? null : createEmptyWorkUnit(partition, previousOffset, previousOffsetFetchEpochTime,
+ topicSpecificState);
}
if (shouldMoveToLatestOffset(partition, state)) {
@@ -444,7 +456,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
offsets.startAtEarliestOffset();
} else {
LOG.warn(offsetOutOfRangeMsg + "This partition will be skipped.");
- return createEmptyWorkUnit(partition, previousOffset, topicSpecificState);
+ return createEmptyWorkUnit(partition, previousOffset, previousOffsetFetchEpochTime, topicSpecificState);
}
}
}
@@ -452,10 +464,24 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
return getWorkUnitForTopicPartition(partition, offsets, topicSpecificState);
}
+ private long getPreviousOffsetFetchEpochTimeForPartition(KafkaPartition partition, SourceState state)
+ throws PreviousOffsetNotFoundException {
+
+ getAllPreviousOffsetState(state);
+
+ if (this.previousOffsetFetchEpochTimes.containsKey(partition)) {
+ return this.previousOffsetFetchEpochTimes.get(partition);
+ }
+
+ throw new PreviousOffsetNotFoundException(String
+ .format("Previous offset fetch epoch time for topic %s, partition %s not found.", partition.getTopicName(),
+ partition.getId()));
+ }
+
private long getPreviousOffsetForPartition(KafkaPartition partition, SourceState state)
throws PreviousOffsetNotFoundException {
- getAllPreviousOffsets(state);
+ getAllPreviousOffsetState(state);
if (this.previousOffsets.containsKey(partition)) {
return this.previousOffsets.get(partition);
@@ -464,12 +490,28 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
.format("Previous offset for topic %s, partition %s not found.", partition.getTopicName(), partition.getId()));
}
- // need to be synchronized as this.previousOffsets need to be initialized once
- private synchronized void getAllPreviousOffsets(SourceState state) {
+ private long getPreviousExpectedHighWatermark(KafkaPartition partition, SourceState state)
+ throws PreviousOffsetNotFoundException {
+
+ getAllPreviousOffsetState(state);
+
+ if (this.previousExpectedHighWatermarks.containsKey(partition)) {
+ return this.previousExpectedHighWatermarks.get(partition);
+ }
+ throw new PreviousOffsetNotFoundException(String
+ .format("Previous expected high watermark for topic %s, partition %s not found.", partition.getTopicName(),
+ partition.getId()));
+ }
+
+ // need to be synchronized as this.previousOffsets, this.previousExpectedHighWatermarks, and
+ // this.previousOffsetFetchEpochTimes need to be initialized once
+ private synchronized void getAllPreviousOffsetState(SourceState state) {
if (this.doneGettingAllPreviousOffsets) {
return;
}
this.previousOffsets.clear();
+ this.previousExpectedHighWatermarks.clear();
+ this.previousOffsetFetchEpochTimes.clear();
Map<String, Iterable<WorkUnitState>> workUnitStatesByDatasetUrns = state.getPreviousWorkUnitStatesByDatasetUrns();
if (!workUnitStatesByDatasetUrns.isEmpty() &&
@@ -481,13 +523,26 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
for (WorkUnitState workUnitState : state.getPreviousWorkUnitStates()) {
List<KafkaPartition> partitions = KafkaUtils.getPartitions(workUnitState);
MultiLongWatermark watermark = workUnitState.getActualHighWatermark(MultiLongWatermark.class);
+ MultiLongWatermark previousExpectedHighWatermark =
+ workUnitState.getWorkunit().getExpectedHighWatermark(MultiLongWatermark.class);
Preconditions.checkArgument(partitions.size() == watermark.size(), String
.format("Num of partitions doesn't match number of watermarks: partitions=%s, watermarks=%s", partitions,
watermark));
+
for (int i = 0; i < partitions.size(); i++) {
+ KafkaPartition partition = partitions.get(i);
+
if (watermark.get(i) != ConfigurationKeys.DEFAULT_WATERMARK_VALUE) {
- this.previousOffsets.put(partitions.get(i), watermark.get(i));
+ this.previousOffsets.put(partition, watermark.get(i));
+ }
+
+ if (previousExpectedHighWatermark.get(i) != ConfigurationKeys.DEFAULT_WATERMARK_VALUE) {
+ this.previousExpectedHighWatermarks.put(partition, previousExpectedHighWatermark.get(i));
}
+
+ this.previousOffsetFetchEpochTimes.put(partition,
+ Long.valueOf(workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.OFFSET_FETCH_EPOCH_TIME, i),
+ "0")));
}
}
@@ -511,12 +566,13 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
}
// thread safe
- private WorkUnit createEmptyWorkUnit(KafkaPartition partition, long previousOffset,
+ private WorkUnit createEmptyWorkUnit(KafkaPartition partition, long previousOffset, long previousFetchEpochTime,
Optional<State> topicSpecificState) {
Offsets offsets = new Offsets();
offsets.setEarliestOffset(previousOffset);
offsets.setLatestOffset(previousOffset);
offsets.startAtEarliestOffset();
+ offsets.setOffsetFetchEpochTime(previousFetchEpochTime);
return getWorkUnitForTopicPartition(partition, offsets, topicSpecificState);
}
@@ -552,6 +608,9 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
workUnit.setProp(LEADER_HOSTANDPORT, partition.getLeader().getHostAndPort().toString());
workUnit.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, offsets.getStartOffset());
workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, offsets.getLatestOffset());
+ workUnit.setProp(PREVIOUS_OFFSET_FETCH_EPOCH_TIME, offsets.getPreviousOffsetFetchEpochTime());
+ workUnit.setProp(OFFSET_FETCH_EPOCH_TIME, offsets.getOffsetFetchEpochTime());
+ workUnit.setProp(PREVIOUS_LATEST_OFFSET, offsets.getPreviousLatestOffset());
// Add lineage info
DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_KAFKA, partition.getTopicName());
@@ -608,6 +667,18 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
@Setter
private long latestOffset = 0;
+ @Getter
+ @Setter
+ private long offsetFetchEpochTime = 0;
+
+ @Getter
+ @Setter
+ private long previousOffsetFetchEpochTime = 0;
+
+ @Getter
+ @Setter
+ private long previousLatestOffset = 0;
+
private void startAt(long offset)
throws StartOffsetOutOfRangeException {
if (offset < this.earliestOffset || offset > this.latestOffset) {
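
The comment on getAllPreviousOffsetState above is the load-bearing part of this hunk: three maps must be populated from the previous run's state exactly once, and concurrent callers may race to trigger that load. Here is a stripped-down sketch of that initialize-once pattern, with the Gobblin types swapped for plain Java so it stands alone (the state-loading body is elided):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class PreviousStateCache {
      private final Map<String, Long> previousOffsets = new ConcurrentHashMap<>();
      private final Map<String, Long> previousOffsetFetchEpochTimes = new ConcurrentHashMap<>();
      private boolean doneGettingAllPreviousOffsets = false;

      // Synchronized so concurrent callers populate the maps exactly once;
      // every per-partition accessor calls this first, as the getters above do.
      private synchronized void loadOnce() {
        if (this.doneGettingAllPreviousOffsets) {
          return;
        }
        this.previousOffsets.clear();
        this.previousOffsetFetchEpochTimes.clear();
        // ... populate both maps from the previous run's work unit states ...
        this.doneGettingAllPreviousOffsets = true;
      }

      public long getPreviousOffset(String partition) {
        loadOnce();
        Long offset = this.previousOffsets.get(partition);
        if (offset == null) {
          throw new IllegalStateException("Previous offset for " + partition + " not found.");
        }
        return offset;
      }
    }
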
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a3189d73/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
index 8d03f4f..0d93796 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
@@ -219,6 +219,24 @@ public abstract class KafkaWorkUnitPacker {
workUnit.removeProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY);
workUnit.removeProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY);
workUnit.setWatermarkInterval(interval);
+
+ // Update offset fetch epoch time and previous latest offset. These are used to compute the load factor,
+ // gobblin consumption rate relative to the kafka production rate. The kafka rate is computed as
+ // (current latest offset - previous latest offset)/(current epoch time - previous epoch time).
+ int index = 0;
+ for (WorkUnit wu : multiWorkUnit.getWorkUnits()) {
+ workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME, index),
+ wu.getProp(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME));
+ workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.OFFSET_FETCH_EPOCH_TIME, index),
+ wu.getProp(KafkaSource.OFFSET_FETCH_EPOCH_TIME));
+ workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_LATEST_OFFSET, index),
+ wu.getProp(KafkaSource.PREVIOUS_LATEST_OFFSET));
+ index++;
+ }
+ workUnit.removeProp(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME);
+ workUnit.removeProp(KafkaSource.OFFSET_FETCH_EPOCH_TIME);
+ workUnit.removeProp(KafkaSource.PREVIOUS_LATEST_OFFSET);
+
// Remove the original partition information
workUnit.removeProp(KafkaSource.PARTITION_ID);
workUnit.removeProp(KafkaSource.LEADER_ID);
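
For readers tracing the prop flow: the loop above copies each squashed work unit's three offset props onto the merged work unit under a partition-indexed name, then removes the unindexed originals so only per-partition values survive squashing. Below is a toy version of that re-keying with plain maps; it assumes KafkaUtils.getPartitionPropName simply appends "." plus the index, which this diff does not show, and the epoch values are made up:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class PropRekeySketch {
      // Assumed format of KafkaUtils.getPartitionPropName: "<name>.<index>".
      static String partitionPropName(String name, int index) {
        return name + "." + index;
      }

      public static void main(String[] args) {
        // Two child work units being squashed into one merged work unit.
        List<Map<String, String>> children = new ArrayList<>();
        children.add(Map.of("offsetFetchEpochTime", "1519154192000"));
        children.add(Map.of("offsetFetchEpochTime", "1519154192117"));

        Map<String, String> merged = new HashMap<>();
        int index = 0;
        for (Map<String, String> child : children) {
          merged.put(partitionPropName("offsetFetchEpochTime", index),
              child.get("offsetFetchEpochTime"));
          index++;
        }
        // Mirror of workUnit.removeProp(KafkaSource.OFFSET_FETCH_EPOCH_TIME) above.
        merged.remove("offsetFetchEpochTime");

        // e.g. {offsetFetchEpochTime.0=1519154192000, offsetFetchEpochTime.1=1519154192117}
        System.out.println(merged);
      }
    }
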