You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ib...@apache.org on 2017/10/12 22:40:47 UTC
incubator-gobblin git commit: [GOBBLIN-285] Kafka extractor computes
avgMillisPerRecord even when a partition pull is interrupted
Repository: incubator-gobblin
Updated Branches:
refs/heads/master c5e83a331 -> 07c86f2a7
[GOBBLIN-285] Kafka extractor computes avgMillisPerRecord even when a partition pull is interrupted
Closes #2138 from ibuenros/avgpulltime-fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/07c86f2a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/07c86f2a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/07c86f2a
Branch: refs/heads/master
Commit: 07c86f2a7401c4a81d7104eb668dd8826b609520
Parents: c5e83a3
Author: ibuenros <is...@gmail.com>
Authored: Thu Oct 12 15:40:40 2017 -0700
Committer: Issac Buenrostro <ib...@apache.org>
Committed: Thu Oct 12 15:40:40 2017 -0700
----------------------------------------------------------------------
.../extractor/extract/kafka/KafkaExtractor.java | 26 ++++++++++++--------
1 file changed, 16 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/07c86f2a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index ac1a7f2..1ff0159 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -235,19 +235,10 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
LOG.info("Pulling topic " + this.topicName);
this.currentPartitionIdx = 0;
} else {
- this.stopwatch.stop();
- if (this.currentPartitionRecordCount != 0) {
- double avgMillisForCurrentPartition =
- (double) this.stopwatch.elapsed(TimeUnit.MILLISECONDS) / (double) this.currentPartitionRecordCount;
- this.avgMillisPerRecord.put(this.getCurrentPartition(), avgMillisForCurrentPartition);
-
- long avgRecordSize = this.currentPartitionTotalSize / this.currentPartitionRecordCount;
- this.avgRecordSizes.put(this.getCurrentPartition(), avgRecordSize);
- }
+ computeAvgMillisPerRecordForCurrentPartition();
this.currentPartitionIdx++;
this.currentPartitionRecordCount = 0;
this.currentPartitionTotalSize = 0;
- this.stopwatch.reset();
}
this.messageIterator = null;
@@ -260,6 +251,19 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
this.stopwatch.start();
}
+ private void computeAvgMillisPerRecordForCurrentPartition() {
+ this.stopwatch.stop();
+ if (this.currentPartitionRecordCount != 0) {
+ double avgMillisForCurrentPartition =
+ (double) this.stopwatch.elapsed(TimeUnit.MILLISECONDS) / (double) this.currentPartitionRecordCount;
+ this.avgMillisPerRecord.put(this.getCurrentPartition(), avgMillisForCurrentPartition);
+
+ long avgRecordSize = this.currentPartitionTotalSize / this.currentPartitionRecordCount;
+ this.avgRecordSizes.put(this.getCurrentPartition(), avgRecordSize);
+ }
+ this.stopwatch.reset();
+ }
+
private void switchMetricContextToCurrentPartition() {
if (this.currentPartitionIdx >= this.partitions.size()) {
return;
@@ -313,6 +317,8 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
@Override
public void close() throws IOException {
+ computeAvgMillisPerRecordForCurrentPartition();
+
Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap = Maps.newHashMap();
// Add error partition count and error message count to workUnitState