You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/11 18:06:57 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-955] Expose a method to get average record size in KafkaExtra…
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2dd06a3 [GOBBLIN-955] Expose a method to get average record size in KafkaExtra…
2dd06a3 is described below
commit 2dd06a35ba7105dd39cb7e82b665ce542e32c93a
Author: sv2000 <su...@gmail.com>
AuthorDate: Mon Nov 11 10:06:49 2019 -0800
[GOBBLIN-955] Expose a method to get average record size in KafkaExtra…
Closes #2804 from sv2000/producerRate
---
.../extract/kafka/KafkaExtractorStatsTracker.java | 21 +++++++++++++++++++++
.../kafka/KafkaExtractorStatsTrackerTest.java | 12 ++++++++++++
2 files changed, 33 insertions(+)
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index 8173418..db61731 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -296,6 +296,27 @@ public class KafkaExtractorStatsTracker {
}
/**
+ *
+ * @param partitionIdx the index of Kafka partition
+ * @return the average record size of records for a given {@link KafkaPartition}
+ */
+ public long getAvgRecordSize(int partitionIdx) {
+ ExtractorStats stats = this.statsMap.getOrDefault(this.partitions.get(partitionIdx), null);
+ if (stats != null) {
+ if (stats.getAvgRecordSize() != 0) {
+ //Average record size already computed.
+ return stats.getAvgRecordSize();
+ } else {
+ //Compute average record size
+ if (stats.getProcessedRecordCount() != 0) {
+ return stats.getPartitionTotalSize() / stats.getProcessedRecordCount();
+ }
+ }
+ }
+ return 0;
+ }
+
+ /**
* Reset all KafkaExtractor stats.
*/
public void reset() {
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
index abd0447..6dbb387 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
@@ -134,4 +134,16 @@ public class KafkaExtractorStatsTrackerTest {
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getAvgMillisPerRecord() > 0);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getAvgRecordSize(), 100);
}
+
+ @Test(dependsOnMethods = "testUpdateStatisticsForCurrentPartition")
+ public void testGetAvgRecordSize() {
+ Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(0), 100);
+ Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(1), 100);
+ //After reset(), averages must reflect only records processed since the reset.
+ this.extractorStatsTracker.reset();
+ Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(0), 0);
+ long readStartNanos = System.nanoTime();
+ this.extractorStatsTracker.onDecodeableRecord(1, readStartNanos, readStartNanos + 1, 150);
+ Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(1), 150);
+ }
}
\ No newline at end of file