You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/03/06 00:17:29 UTC
kafka git commit: KAFKA-3233: expose consumer per-topic metrics
Repository: kafka
Updated Branches:
refs/heads/trunk b7d6fae59 -> 33f330754
KAFKA-3233: expose consumer per-topic metrics
In version of 0.8.2.1, the old consumer will provide the metrics reporter per-topic consumer metrics under group 'ConsumerTopicMetrics'. For example:
*.ConsumerTopicMetrics.clientId.[topic name].BytesPerSec.count
*.ConsumerTopicMetrics.clientId.[topic name].MessagesPerSec.count
These consumer metrics are useful since it helps us monitor consumer rate for each topic. But the new consumer(0.9.0.0) doesn't expose per topic metrics anymore, even though I did find sensor objects in consumer metrics object collecting per-topic metrics.
After investigation, I found that these sensors are not registering any KafkaMetrics.
Author: Yifan Ying <yy...@fitbit.com>
Reviewers: Grant Henke, Jason Gustafson, Guozhang Wang
Closes #939 from happymap/KAFKA-3233
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/33f33075
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/33f33075
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/33f33075
Branch: refs/heads/trunk
Commit: 33f33075468f53bf6d4d1bd79fdb64443ca30dfa
Parents: b7d6fae
Author: Yifan Ying <yy...@fitbit.com>
Authored: Sat Mar 5 15:17:21 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sat Mar 5 15:17:21 2016 -0800
----------------------------------------------------------------------
.../clients/consumer/internals/Fetcher.java | 29 ++++++++++++++++++--
1 file changed, 27 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/33f33075/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index e2a5548..7a1a720 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -766,18 +766,43 @@ public class Fetcher<K, V> {
}
public void recordTopicFetchMetrics(String topic, int bytes, int records) {
+ Map<String, String> metricTags = new HashMap<>();
+ metricTags.put("topic", topic.replace(".", "_"));
+
// record bytes fetched
String name = "topic." + topic + ".bytes-fetched";
Sensor bytesFetched = this.metrics.getSensor(name);
- if (bytesFetched == null)
+ if (bytesFetched == null) {
bytesFetched = this.metrics.sensor(name);
+ bytesFetched.add(this.metrics.metricName("fetch-size-avg",
+ this.metricGrpName,
+ "The average number of bytes fetched per request for topic " + topic,
+ metricTags), new Avg());
+ bytesFetched.add(this.metrics.metricName("fetch-size-max",
+ this.metricGrpName,
+ "The maximum number of bytes fetched per request for topic " + topic,
+ metricTags), new Max());
+ bytesFetched.add(this.metrics.metricName("bytes-consumed-rate",
+ this.metricGrpName,
+ "The average number of bytes consumed per second for topic " + topic,
+ metricTags), new Rate());
+ }
bytesFetched.record(bytes);
// record records fetched
name = "topic." + topic + ".records-fetched";
Sensor recordsFetched = this.metrics.getSensor(name);
- if (recordsFetched == null)
+ if (recordsFetched == null) {
recordsFetched = this.metrics.sensor(name);
+ recordsFetched.add(this.metrics.metricName("records-per-request-avg",
+ this.metricGrpName,
+ "The average number of records in each request for topic " + topic,
+ metricTags), new Avg());
+ recordsFetched.add(this.metrics.metricName("records-consumed-rate",
+ this.metricGrpName,
+ "The average number of records consumed per second for topic " + topic,
+ metricTags), new Rate());
+ }
recordsFetched.record(records);
}
}