You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/12/09 23:11:04 UTC
kafka git commit: KAFKA-4000; Collect and record per-topic consumer metrics
Repository: kafka
Updated Branches:
refs/heads/trunk 055ca9b7a -> 7f8edbc8e
KAFKA-4000; Collect and record per-topic consumer metrics
Improve consumer metric collection by collecting and recording metrics per topic.
Author: Vahid Hashemian <va...@us.ibm.com>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #1684 from vahidhashemian/KAFKA-4000
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7f8edbc8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7f8edbc8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7f8edbc8
Branch: refs/heads/trunk
Commit: 7f8edbc8e848d22b1cb016a7208bee1e52a65b73
Parents: 055ca9b
Author: Vahid Hashemian <va...@us.ibm.com>
Authored: Fri Dec 9 14:54:30 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Dec 9 14:55:42 2016 -0800
----------------------------------------------------------------------
.../clients/consumer/internals/Fetcher.java | 45 +++++++++++++++-----
1 file changed, 34 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7f8edbc8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index e414fcb..4bfe466 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -688,7 +688,6 @@ public class Fetcher<K, V> {
}
recordsCount = parsed.size();
- this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, recordsCount);
if (!parsed.isEmpty()) {
log.trace("Adding fetched record for partition {} with offset {} to buffered record list", tp, position);
@@ -833,11 +832,11 @@ public class Fetcher<K, V> {
private final FetchManagerMetrics sensors;
private final Set<TopicPartition> unrecordedPartitions;
- private int totalBytes;
- private int totalRecords;
+ private final FetchMetrics fetchMetrics = new FetchMetrics();
+ private final Map<String, FetchMetrics> topicFetchMetrics = new HashMap<>();
- public FetchResponseMetricAggregator(FetchManagerMetrics sensors,
- Set<TopicPartition> partitions) {
+ private FetchResponseMetricAggregator(FetchManagerMetrics sensors,
+ Set<TopicPartition> partitions) {
this.sensors = sensors;
this.unrecordedPartitions = partitions;
}
@@ -847,14 +846,38 @@ public class Fetcher<K, V> {
* and number of records parsed. After all partitions have reported, we write the metric.
*/
public void record(TopicPartition partition, int bytes, int records) {
- unrecordedPartitions.remove(partition);
- totalBytes += bytes;
- totalRecords += records;
+ this.unrecordedPartitions.remove(partition);
+ this.fetchMetrics.increment(bytes, records);
+
+ // collect and aggregate per-topic metrics
+ String topic = partition.topic();
+ FetchMetrics topicFetchMetric = this.topicFetchMetrics.get(topic);
+ if (topicFetchMetric == null) {
+ topicFetchMetric = new FetchMetrics();
+ this.topicFetchMetrics.put(topic, topicFetchMetric);
+ }
+ topicFetchMetric.increment(bytes, records);
- if (unrecordedPartitions.isEmpty()) {
+ if (this.unrecordedPartitions.isEmpty()) {
// once all expected partitions from the fetch have reported in, record the metrics
- sensors.bytesFetched.record(totalBytes);
- sensors.recordsFetched.record(totalRecords);
+ this.sensors.bytesFetched.record(topicFetchMetric.fetchBytes);
+ this.sensors.recordsFetched.record(topicFetchMetric.fetchRecords);
+
+ // also record per-topic metrics
+ for (Map.Entry<String, FetchMetrics> entry: this.topicFetchMetrics.entrySet()) {
+ FetchMetrics metric = entry.getValue();
+ this.sensors.recordTopicFetchMetrics(entry.getKey(), metric.fetchBytes, metric.fetchRecords);
+ }
+ }
+ }
+
+ private static class FetchMetrics {
+ private int fetchBytes;
+ private int fetchRecords;
+
+ protected void increment(int bytes, int records) {
+ this.fetchBytes += bytes;
+ this.fetchRecords += records;
}
}
}