You are viewing a plain-text version of this content; the link to its canonical version is not preserved in this copy.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/11 18:06:57 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-955] Expose a method to get average record size in KafkaExtractorStatsTracker

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2dd06a3  [GOBBLIN-955] Expose a method to get average record size in KafkaExtractorStatsTracker
2dd06a3 is described below

commit 2dd06a35ba7105dd39cb7e82b665ce542e32c93a
Author: sv2000 <su...@gmail.com>
AuthorDate: Mon Nov 11 10:06:49 2019 -0800

    [GOBBLIN-955] Expose a method to get average record size in KafkaExtractorStatsTracker
    
    Closes #2804 from sv2000/producerRate
---
 .../extract/kafka/KafkaExtractorStatsTracker.java   | 21 +++++++++++++++++++++
 .../kafka/KafkaExtractorStatsTrackerTest.java       | 12 ++++++++++++
 2 files changed, 33 insertions(+)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index 8173418..db61731 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -296,6 +296,27 @@ public class KafkaExtractorStatsTracker {
   }
 
   /**
+   * Returns the average record size for the partition at the given index.
+   * @param partitionIdx the index of the Kafka partition in this tracker's {@code partitions} list
+   * @return the average record size for the given {@link KafkaPartition}, or 0 if it cannot be determined
+   */
+  public long getAvgRecordSize(int partitionIdx) {
+    ExtractorStats stats = this.statsMap.getOrDefault(this.partitions.get(partitionIdx), null); // NOTE(review): same as statsMap.get(..); partitions.get likely throws on an out-of-range index — confirm
+    if (stats != null) {
+      if (stats.getAvgRecordSize() != 0) {
+        // Average record size already computed; reuse it.
+        return stats.getAvgRecordSize();
+      } else {
+        // Not computed yet: derive it from the running totals, guarding against division by zero.
+        if (stats.getProcessedRecordCount() != 0) {
+          return stats.getPartitionTotalSize() / stats.getProcessedRecordCount(); // NOTE(review): integer division if both getters are integral — result truncates
+        }
+      }
+    }
+    return 0; // no stats for this partition, or no records processed yet
+  }
+
+  /**
    * Reset all KafkaExtractor stats.
    */
   public void reset() {
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
index abd0447..6dbb387 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
@@ -134,4 +134,16 @@ public class KafkaExtractorStatsTrackerTest {
     Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getAvgMillisPerRecord() > 0);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getAvgRecordSize(), 100);
   }
+
+  @Test (dependsOnMethods = "testUpdateStatisticsForCurrentPartition")
+  public void testGetAvgRecordSize() {
+    Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(0), 100); // relies on stats left behind by testUpdateStatisticsForCurrentPartition
+    Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(1), 100);
+    this.extractorStatsTracker.reset();
+    Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(0), 0); // after reset(), no average is available, so 0 is returned
+    long readStartTime = System.nanoTime();
+    long decodeStartTime = readStartTime + 1;
+    this.extractorStatsTracker.onDecodeableRecord(1, readStartTime, decodeStartTime, 150); // presumably one record of size 150 on partition 1 — confirm argument meaning
+    Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(1), 150); // exercises the total-size / processed-count fallback path
+  }
 }
\ No newline at end of file