You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/03/06 00:17:29 UTC

kafka git commit: KAFKA-3233: expose consumer per-topic metrics

Repository: kafka
Updated Branches:
  refs/heads/trunk b7d6fae59 -> 33f330754


KAFKA-3233: expose consumer per-topic metrics

In version of 0.8.2.1, the old consumer will provide the metrics reporter per-topic consumer metrics under group 'ConsumerTopicMetrics'. For example:

*.ConsumerTopicMetrics.clientId.[topic name].BytesPerSec.count
*.ConsumerTopicMetrics.clientId.[topic name].MessagesPerSec.count

These consumer metrics are useful since it helps us monitor consumer rate for each topic. But the new consumer(0.9.0.0) doesn't expose per topic metrics anymore, even though I did find sensor objects in consumer metrics object collecting per-topic metrics.

After investigation, I found that these sensors are not registering any KafkaMetrics.

Author: Yifan Ying <yy...@fitbit.com>

Reviewers: Grant Henke, Jason Gustafson, Guozhang Wang

Closes #939 from happymap/KAFKA-3233


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/33f33075
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/33f33075
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/33f33075

Branch: refs/heads/trunk
Commit: 33f33075468f53bf6d4d1bd79fdb64443ca30dfa
Parents: b7d6fae
Author: Yifan Ying <yy...@fitbit.com>
Authored: Sat Mar 5 15:17:21 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sat Mar 5 15:17:21 2016 -0800

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     | 29 ++++++++++++++++++--
 1 file changed, 27 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/33f33075/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index e2a5548..7a1a720 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -766,18 +766,43 @@ public class Fetcher<K, V> {
         }
 
         public void recordTopicFetchMetrics(String topic, int bytes, int records) {
+            Map<String, String> metricTags = new HashMap<>();
+            metricTags.put("topic", topic.replace(".", "_"));
+
             // record bytes fetched
             String name = "topic." + topic + ".bytes-fetched";
             Sensor bytesFetched = this.metrics.getSensor(name);
-            if (bytesFetched == null)
+            if (bytesFetched == null) {
                 bytesFetched = this.metrics.sensor(name);
+                bytesFetched.add(this.metrics.metricName("fetch-size-avg",
+                        this.metricGrpName,
+                        "The average number of bytes fetched per request for topic " + topic,
+                        metricTags), new Avg());
+                bytesFetched.add(this.metrics.metricName("fetch-size-max",
+                        this.metricGrpName,
+                        "The maximum number of bytes fetched per request for topic " + topic,
+                        metricTags), new Max());
+                bytesFetched.add(this.metrics.metricName("bytes-consumed-rate",
+                        this.metricGrpName,
+                        "The average number of bytes consumed per second for topic " + topic,
+                        metricTags), new Rate());
+            }
             bytesFetched.record(bytes);
 
             // record records fetched
             name = "topic." + topic + ".records-fetched";
             Sensor recordsFetched = this.metrics.getSensor(name);
-            if (recordsFetched == null)
+            if (recordsFetched == null) {
                 recordsFetched = this.metrics.sensor(name);
+                recordsFetched.add(this.metrics.metricName("records-per-request-avg",
+                        this.metricGrpName,
+                        "The average number of records in each request for topic " + topic,
+                        metricTags), new Avg());
+                recordsFetched.add(this.metrics.metricName("records-consumed-rate",
+                        this.metricGrpName,
+                        "The average number of records consumed per second for topic " + topic,
+                        metricTags), new Rate());
+            }
             recordsFetched.record(records);
         }
     }