You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this plain-text rendering.
Posted to commits@gobblin.apache.org by ib...@apache.org on 2017/10/12 22:40:47 UTC

incubator-gobblin git commit: [GOBBLIN-285] Kafka extractor computes avgMillisPerRecord even when a partition pull is interrupted

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master c5e83a331 -> 07c86f2a7


[GOBBLIN-285] Kafka extractor computes avgMillisPerRecord even when a partition pull is interrupted

Closes #2138 from ibuenros/avgpulltime-fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/07c86f2a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/07c86f2a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/07c86f2a

Branch: refs/heads/master
Commit: 07c86f2a7401c4a81d7104eb668dd8826b609520
Parents: c5e83a3
Author: ibuenros <is...@gmail.com>
Authored: Thu Oct 12 15:40:40 2017 -0700
Committer: Issac Buenrostro <ib...@apache.org>
Committed: Thu Oct 12 15:40:40 2017 -0700

----------------------------------------------------------------------
 .../extractor/extract/kafka/KafkaExtractor.java | 26 ++++++++++++--------
 1 file changed, 16 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/07c86f2a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index ac1a7f2..1ff0159 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -235,19 +235,10 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
       LOG.info("Pulling topic " + this.topicName);
       this.currentPartitionIdx = 0;
     } else {
-      this.stopwatch.stop();
-      if (this.currentPartitionRecordCount != 0) {
-        double avgMillisForCurrentPartition =
-            (double) this.stopwatch.elapsed(TimeUnit.MILLISECONDS) / (double) this.currentPartitionRecordCount;
-        this.avgMillisPerRecord.put(this.getCurrentPartition(), avgMillisForCurrentPartition);
-
-        long avgRecordSize = this.currentPartitionTotalSize / this.currentPartitionRecordCount;
-        this.avgRecordSizes.put(this.getCurrentPartition(), avgRecordSize);
-      }
+      computeAvgMillisPerRecordForCurrentPartition();
       this.currentPartitionIdx++;
       this.currentPartitionRecordCount = 0;
       this.currentPartitionTotalSize = 0;
-      this.stopwatch.reset();
     }
 
     this.messageIterator = null;
@@ -260,6 +251,19 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
     this.stopwatch.start();
   }
 
+  private void computeAvgMillisPerRecordForCurrentPartition() {
+    this.stopwatch.stop();
+    if (this.currentPartitionRecordCount != 0) {
+      double avgMillisForCurrentPartition =
+          (double) this.stopwatch.elapsed(TimeUnit.MILLISECONDS) / (double) this.currentPartitionRecordCount;
+      this.avgMillisPerRecord.put(this.getCurrentPartition(), avgMillisForCurrentPartition);
+
+      long avgRecordSize = this.currentPartitionTotalSize / this.currentPartitionRecordCount;
+      this.avgRecordSizes.put(this.getCurrentPartition(), avgRecordSize);
+    }
+    this.stopwatch.reset();
+  }
+
   private void switchMetricContextToCurrentPartition() {
     if (this.currentPartitionIdx >= this.partitions.size()) {
       return;
@@ -313,6 +317,8 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   @Override
   public void close() throws IOException {
 
+    computeAvgMillisPerRecordForCurrentPartition();
+
     Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap = Maps.newHashMap();
 
     // Add error partition count and error message count to workUnitState