You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/12/09 23:11:04 UTC

kafka git commit: KAFKA-4000; Collect and record per-topic consumer metrics

Repository: kafka
Updated Branches:
  refs/heads/trunk 055ca9b7a -> 7f8edbc8e


KAFKA-4000; Collect and record per-topic consumer metrics

Improve consumer metric collection by collecting and recording metrics per topic.

Author: Vahid Hashemian <va...@us.ibm.com>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #1684 from vahidhashemian/KAFKA-4000


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7f8edbc8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7f8edbc8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7f8edbc8

Branch: refs/heads/trunk
Commit: 7f8edbc8e848d22b1cb016a7208bee1e52a65b73
Parents: 055ca9b
Author: Vahid Hashemian <va...@us.ibm.com>
Authored: Fri Dec 9 14:54:30 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Dec 9 14:55:42 2016 -0800

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     | 45 +++++++++++++++-----
 1 file changed, 34 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7f8edbc8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index e414fcb..4bfe466 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -688,7 +688,6 @@ public class Fetcher<K, V> {
                 }
 
                 recordsCount = parsed.size();
-                this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, recordsCount);
 
                 if (!parsed.isEmpty()) {
                     log.trace("Adding fetched record for partition {} with offset {} to buffered record list", tp, position);
@@ -833,11 +832,11 @@ public class Fetcher<K, V> {
         private final FetchManagerMetrics sensors;
         private final Set<TopicPartition> unrecordedPartitions;
 
-        private int totalBytes;
-        private int totalRecords;
+        private final FetchMetrics fetchMetrics = new FetchMetrics();
+        private final Map<String, FetchMetrics> topicFetchMetrics = new HashMap<>();
 
-        public FetchResponseMetricAggregator(FetchManagerMetrics sensors,
-                                             Set<TopicPartition> partitions) {
+        private FetchResponseMetricAggregator(FetchManagerMetrics sensors,
+                                              Set<TopicPartition> partitions) { // both arguments are stored by reference; the set is not copied
             this.sensors = sensors;
             this.unrecordedPartitions = partitions;
         }
@@ -847,14 +846,38 @@ public class Fetcher<K, V> {
          * and number of records parsed. After all partitions have reported, we write the metric.
          */
         public void record(TopicPartition partition, int bytes, int records) {
-            unrecordedPartitions.remove(partition);
-            totalBytes += bytes;
-            totalRecords += records;
+            this.unrecordedPartitions.remove(partition);
+            this.fetchMetrics.increment(bytes, records);
+
+            // collect and aggregate per-topic metrics
+            String topic = partition.topic();
+            FetchMetrics topicFetchMetric = this.topicFetchMetrics.get(topic);
+            if (topicFetchMetric == null) {
+                topicFetchMetric = new FetchMetrics();
+                this.topicFetchMetrics.put(topic, topicFetchMetric);
+            }
+            topicFetchMetric.increment(bytes, records);
 
-            if (unrecordedPartitions.isEmpty()) {
+            if (this.unrecordedPartitions.isEmpty()) {
                 // once all expected partitions from the fetch have reported in, record the metrics
-                sensors.bytesFetched.record(totalBytes);
-                sensors.recordsFetched.record(totalRecords);
+                this.sensors.bytesFetched.record(this.fetchMetrics.fetchBytes);
+                this.sensors.recordsFetched.record(this.fetchMetrics.fetchRecords);
+
+                // also record per-topic metrics
+                for (Map.Entry<String, FetchMetrics> entry: this.topicFetchMetrics.entrySet()) {
+                    FetchMetrics metric = entry.getValue();
+                    this.sensors.recordTopicFetchMetrics(entry.getKey(), metric.fetchBytes, metric.fetchRecords);
+                }
+            }
+        }
+
+        private static class FetchMetrics { // mutable pair of running totals: bytes and records
+            private int fetchBytes; // sum of every `bytes` value passed to increment()
+            private int fetchRecords; // sum of every `records` value passed to increment()
+
+            protected void increment(int bytes, int records) { // adds the given amounts to both running totals
+                this.fetchBytes += bytes;
+                this.fetchRecords += records;
             }
         }
     }