You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/26 22:34:22 UTC
kafka git commit: KAFKA-5191: Autogenerate Consumer Fetcher metrics
Repository: kafka
Updated Branches:
refs/heads/trunk ca8915d2e -> 0bc4f75ee
KAFKA-5191: Autogenerate Consumer Fetcher metrics
Autogenerate docs for the Consumer Fetcher's metrics. This is a smaller subset of the original PR https://github.com/apache/kafka/pull/1202.
CC ijuma benstopford hachikuji
Author: James Cheng <jy...@yahoo.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Guozhang Wang <wa...@gmail.com>
Closes #2993 from wushujames/fetcher_metrics_docs
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0bc4f75e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0bc4f75e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0bc4f75e
Branch: refs/heads/trunk
Commit: 0bc4f75eedf14af6d0b2e3e9be62a460b1049d0b
Parents: ca8915d
Author: James Cheng <jy...@yahoo.com>
Authored: Fri May 26 15:34:20 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri May 26 15:34:20 2017 -0700
----------------------------------------------------------------------
build.gradle | 9 +-
.../kafka/clients/consumer/KafkaConsumer.java | 6 +-
.../consumer/internals/ConsumerMetrics.java | 51 +++++++
.../clients/consumer/internals/Fetcher.java | 95 +++++--------
.../internals/FetcherMetricsRegistry.java | 140 +++++++++++++++++++
.../apache/kafka/common/MetricNameTemplate.java | 72 ++++++++++
.../kafka/common/metrics/JmxReporter.java | 6 +-
.../apache/kafka/common/metrics/Metrics.java | 83 +++++++++++
.../clients/consumer/KafkaConsumerTest.java | 5 +-
.../clients/consumer/internals/FetcherTest.java | 28 ++--
.../kafka/common/metrics/MetricsTest.java | 54 +++++++
.../kafka/common/metrics/SampleMetrics.java | 31 ++++
docs/ops.html | 105 +-------------
13 files changed, 498 insertions(+), 187 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 56c643e..07835ca 100644
--- a/build.gradle
+++ b/build.gradle
@@ -617,10 +617,17 @@ project(':core') {
standardOutput = new File(generatedDocsDir, "topic_config.html").newOutputStream()
}
+ task genConsumerMetricsDocs(type: JavaExec) {
+ classpath = sourceSets.test.runtimeClasspath
+ main = 'org.apache.kafka.clients.consumer.internals.ConsumerMetrics'
+ if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
+ standardOutput = new File(generatedDocsDir, "consumer_metrics.html").newOutputStream()
+ }
+
task siteDocsTar(dependsOn: ['genProtocolErrorDocs', 'genProtocolApiKeyDocs', 'genProtocolMessageDocs',
'genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs',
'genTopicConfigDocs', ':connect:runtime:genConnectConfigDocs', ':connect:runtime:genConnectTransformationDocs',
- ':streams:genStreamsConfigDocs'], type: Tar) {
+ ':streams:genStreamsConfigDocs', 'genConsumerMetricsDocs'], type: Tar) {
classifier = 'site-docs'
compression = Compression.GZIP
from project.file("$rootDir/docs")
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 0359071..52d9456 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
+import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.PollCondition;
import org.apache.kafka.clients.consumer.internals.Fetcher;
@@ -662,11 +663,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
String metricGrpPrefix = "consumer";
+ ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer");
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
IsolationLevel isolationLevel = IsolationLevel.valueOf(
config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
- Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricGrpPrefix);
+ Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry.fetcherMetrics);
NetworkClient netClient = new NetworkClient(
new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
@@ -718,7 +720,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.metadata,
this.subscriptions,
metrics,
- metricGrpPrefix,
+ metricsRegistry.fetcherMetrics,
this.time,
this.retryBackoffMs,
isolationLevel);
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java
new file mode 100644
index 0000000..3492323
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.metrics.Metrics;
+
+public class ConsumerMetrics {
+
+ public FetcherMetricsRegistry fetcherMetrics;
+
+ public ConsumerMetrics(Set<String> metricsTags, String metricGrpPrefix) {
+ this.fetcherMetrics = new FetcherMetricsRegistry(metricsTags, metricGrpPrefix);
+ }
+
+ public ConsumerMetrics(String metricGroupPrefix) {
+ this(new HashSet<String>(), metricGroupPrefix);
+ }
+
+ private List<MetricNameTemplate> getAllTemplates() {
+ List<MetricNameTemplate> l = new ArrayList<>();
+ l.addAll(this.fetcherMetrics.getAllTemplates());
+ return l;
+ }
+
+ public static void main(String[] args) {
+ Set<String> tags = new HashSet<>();
+ tags.add("client-id");
+ ConsumerMetrics metrics = new ConsumerMetrics(tags, "consumer");
+ System.out.println(Metrics.toHtmlTable("kafka.consumer", metrics.getAllTemplates()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index cd32850..440ca6e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -125,7 +125,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
Metadata metadata,
SubscriptionState subscriptions,
Metrics metrics,
- String metricGrpPrefix,
+ FetcherMetricsRegistry metricsRegistry,
Time time,
long retryBackoffMs,
IsolationLevel isolationLevel) {
@@ -142,7 +142,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
this.keyDeserializer = ensureExtended(keyDeserializer);
this.valueDeserializer = ensureExtended(valueDeserializer);
this.completedFetches = new ConcurrentLinkedQueue<>();
- this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix);
+ this.sensors = new FetchManagerMetrics(metrics, metricsRegistry);
this.retryBackoffMs = retryBackoffMs;
this.isolationLevel = isolationLevel;
@@ -931,15 +931,12 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
sensors.updatePartitionLagSensors(assignment);
}
- public static Sensor throttleTimeSensor(Metrics metrics, String metricGrpPrefix) {
- String metricGrpName = metricGrpPrefix + FetchManagerMetrics.METRIC_GROUP_SUFFIX;
+ public static Sensor throttleTimeSensor(Metrics metrics, FetcherMetricsRegistry metricsRegistry) {
Sensor fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
- fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-avg",
- metricGrpName,
- "The average throttle time in ms"), new Avg());
- fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-max",
- metricGrpName,
- "The maximum throttle time in ms"), new Max());
+ fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg), new Avg());
+
+ fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeMax), new Max());
+
return fetchThrottleTimeSensor;
}
@@ -1225,54 +1222,35 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
}
private static class FetchManagerMetrics {
- private static final String METRIC_GROUP_SUFFIX = "-fetch-manager-metrics";
private final Metrics metrics;
- private final String metricGrpName;
+ private FetcherMetricsRegistry metricsRegistry;
private final Sensor bytesFetched;
private final Sensor recordsFetched;
private final Sensor fetchLatency;
private final Sensor recordsFetchLag;
private Set<TopicPartition> assignedPartitions;
-
- private FetchManagerMetrics(Metrics metrics, String metricGrpPrefix) {
+
+ private FetchManagerMetrics(Metrics metrics, FetcherMetricsRegistry metricsRegistry) {
this.metrics = metrics;
- this.metricGrpName = metricGrpPrefix + METRIC_GROUP_SUFFIX;
+ this.metricsRegistry = metricsRegistry;
this.bytesFetched = metrics.sensor("bytes-fetched");
- this.bytesFetched.add(metrics.metricName("fetch-size-avg",
- this.metricGrpName,
- "The average number of bytes fetched per request"), new Avg());
- this.bytesFetched.add(metrics.metricName("fetch-size-max",
- this.metricGrpName,
- "The maximum number of bytes fetched per request"), new Max());
- this.bytesFetched.add(metrics.metricName("bytes-consumed-rate",
- this.metricGrpName,
- "The average number of bytes consumed per second"), new Rate());
+ this.bytesFetched.add(metrics.metricInstance(metricsRegistry.fetchSizeAvg), new Avg());
+ this.bytesFetched.add(metrics.metricInstance(metricsRegistry.fetchSizeMax), new Max());
+ this.bytesFetched.add(metrics.metricInstance(metricsRegistry.bytesConsumedRate), new Rate());
this.recordsFetched = metrics.sensor("records-fetched");
- this.recordsFetched.add(metrics.metricName("records-per-request-avg",
- this.metricGrpName,
- "The average number of records in each request"), new Avg());
- this.recordsFetched.add(metrics.metricName("records-consumed-rate",
- this.metricGrpName,
- "The average number of records consumed per second"), new Rate());
+ this.recordsFetched.add(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg), new Avg());
+ this.recordsFetched.add(metrics.metricInstance(metricsRegistry.recordsConsumedRate), new Rate());
this.fetchLatency = metrics.sensor("fetch-latency");
- this.fetchLatency.add(metrics.metricName("fetch-latency-avg",
- this.metricGrpName,
- "The average time taken for a fetch request."), new Avg());
- this.fetchLatency.add(metrics.metricName("fetch-latency-max",
- this.metricGrpName,
- "The max time taken for any fetch request."), new Max());
- this.fetchLatency.add(metrics.metricName("fetch-rate",
- this.metricGrpName,
- "The number of fetch requests per second."), new Rate(new Count()));
+ this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyAvg), new Avg());
+ this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyMax), new Max());
+ this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchRequestRate), new Rate(new Count()));
this.recordsFetchLag = metrics.sensor("records-lag");
- this.recordsFetchLag.add(metrics.metricName("records-lag-max",
- this.metricGrpName,
- "The maximum lag in terms of number of records for any partition in this window"), new Max());
+ this.recordsFetchLag.add(metrics.metricInstance(metricsRegistry.recordsLagMax), new Max());
}
private void recordTopicFetchMetrics(String topic, int bytes, int records) {
@@ -1283,17 +1261,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
Map<String, String> metricTags = Collections.singletonMap("topic", topic.replace('.', '_'));
bytesFetched = this.metrics.sensor(name);
- bytesFetched.add(this.metrics.metricName("fetch-size-avg",
- this.metricGrpName,
- "The average number of bytes fetched per request for topic " + topic,
+ bytesFetched.add(this.metrics.metricInstance(metricsRegistry.topicFetchSizeAvg,
metricTags), new Avg());
- bytesFetched.add(this.metrics.metricName("fetch-size-max",
- this.metricGrpName,
- "The maximum number of bytes fetched per request for topic " + topic,
+ bytesFetched.add(this.metrics.metricInstance(metricsRegistry.topicFetchSizeMax,
metricTags), new Max());
- bytesFetched.add(this.metrics.metricName("bytes-consumed-rate",
- this.metricGrpName,
- "The average number of bytes consumed per second for topic " + topic,
+ bytesFetched.add(this.metrics.metricInstance(metricsRegistry.topicBytesConsumedRate,
metricTags), new Rate());
}
bytesFetched.record(bytes);
@@ -1306,13 +1278,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
metricTags.put("topic", topic.replace('.', '_'));
recordsFetched = this.metrics.sensor(name);
- recordsFetched.add(this.metrics.metricName("records-per-request-avg",
- this.metricGrpName,
- "The average number of records in each request for topic " + topic,
+ recordsFetched.add(this.metrics.metricInstance(metricsRegistry.topicRecordsPerRequestAvg,
metricTags), new Avg());
- recordsFetched.add(this.metrics.metricName("records-consumed-rate",
- this.metricGrpName,
- "The average number of records consumed per second for topic " + topic,
+ recordsFetched.add(this.metrics.metricInstance(metricsRegistry.topicRecordsConsumedRate,
metricTags), new Rate());
}
recordsFetched.record(records);
@@ -1335,14 +1303,15 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
Sensor recordsLag = this.metrics.getSensor(name);
if (recordsLag == null) {
recordsLag = this.metrics.sensor(name);
- recordsLag.add(this.metrics.metricName(name, this.metricGrpName, "The latest lag of the partition"),
- new Value());
+ recordsLag.add(this.metrics.metricName(name,
+ metricsRegistry.partitionRecordsLag.group(),
+ metricsRegistry.partitionRecordsLag.description()), new Value());
recordsLag.add(this.metrics.metricName(name + "-max",
- this.metricGrpName,
- "The max lag of the partition"), new Max());
+ metricsRegistry.partitionRecordsLagMax.group(),
+ metricsRegistry.partitionRecordsLagMax.description()), new Max());
recordsLag.add(this.metrics.metricName(name + "-avg",
- this.metricGrpName,
- "The average lag of the partition"), new Avg());
+ metricsRegistry.partitionRecordsLagAvg.group(),
+ metricsRegistry.partitionRecordsLagAvg.description()), new Avg());
}
recordsLag.record(lag);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
new file mode 100644
index 0000000..0c98342
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kafka.common.MetricNameTemplate;
+
+public class FetcherMetricsRegistry {
+
+ public MetricNameTemplate fetchSizeAvg;
+ public MetricNameTemplate fetchSizeMax;
+ public MetricNameTemplate bytesConsumedRate;
+ public MetricNameTemplate recordsPerRequestAvg;
+ public MetricNameTemplate recordsConsumedRate;
+ public MetricNameTemplate fetchLatencyAvg;
+ public MetricNameTemplate fetchLatencyMax;
+ public MetricNameTemplate fetchRequestRate;
+ public MetricNameTemplate recordsLagMax;
+ public MetricNameTemplate fetchThrottleTimeAvg;
+ public MetricNameTemplate fetchThrottleTimeMax;
+ public MetricNameTemplate topicFetchSizeAvg;
+ public MetricNameTemplate topicFetchSizeMax;
+ public MetricNameTemplate topicBytesConsumedRate;
+ public MetricNameTemplate topicRecordsPerRequestAvg;
+ public MetricNameTemplate topicRecordsConsumedRate;
+ public MetricNameTemplate partitionRecordsLag;
+ public MetricNameTemplate partitionRecordsLagMax;
+ public MetricNameTemplate partitionRecordsLagAvg;
+
+ public FetcherMetricsRegistry() {
+ this(new HashSet<String>(), "");
+ }
+
+ public FetcherMetricsRegistry(String metricGrpPrefix) {
+ this(new HashSet<String>(), metricGrpPrefix);
+ }
+
+ public FetcherMetricsRegistry(Set<String> tags, String metricGrpPrefix) {
+
+ /***** Client level *****/
+ String groupName = metricGrpPrefix + "-fetch-manager-metrics";
+
+ this.fetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName,
+ "The average number of bytes fetched per request", tags);
+
+ this.fetchSizeMax = new MetricNameTemplate("fetch-size-max", groupName,
+ "The maximum number of bytes fetched per request", tags);
+ this.bytesConsumedRate = new MetricNameTemplate("bytes-consumed-rate", groupName,
+ "The average number of bytes consumed per second", tags);
+
+ this.recordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", groupName,
+ "The average number of records in each request", tags);
+ this.recordsConsumedRate = new MetricNameTemplate("records-consumed-rate", groupName,
+ "The average number of records consumed per second", tags);
+
+ this.fetchLatencyAvg = new MetricNameTemplate("fetch-latency-avg", groupName,
+ "The average time taken for a fetch request.", tags);
+ this.fetchLatencyMax = new MetricNameTemplate("fetch-latency-max", groupName,
+ "The max time taken for any fetch request.", tags);
+ this.fetchRequestRate = new MetricNameTemplate("fetch-rate", groupName,
+ "The number of fetch requests per second.", tags);
+
+ this.recordsLagMax = new MetricNameTemplate("records-lag-max", groupName,
+ "The maximum lag in terms of number of records for any partition in this window", tags);
+
+ this.fetchThrottleTimeAvg = new MetricNameTemplate("fetch-throttle-time-avg", groupName,
+ "The average throttle time in ms", tags);
+ this.fetchThrottleTimeMax = new MetricNameTemplate("fetch-throttle-time-max", groupName,
+ "The maximum throttle time in ms", tags);
+
+ /***** Topic level *****/
+ Set<String> topicTags = new HashSet<>(tags);
+ topicTags.add("topic");
+
+ this.topicFetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName,
+ "The average number of bytes fetched per request for a topic", topicTags);
+ this.topicFetchSizeMax = new MetricNameTemplate("fetch-size-max", groupName,
+ "The maximum number of bytes fetched per request for a topic", topicTags);
+ this.topicBytesConsumedRate = new MetricNameTemplate("bytes-consumed-rate", groupName,
+ "The average number of bytes consumed per second for a topic", topicTags);
+
+ this.topicRecordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", groupName,
+ "The average number of records in each request for a topic", topicTags);
+ this.topicRecordsConsumedRate = new MetricNameTemplate("records-consumed-rate", groupName,
+ "The average number of records consumed per second for a topic", topicTags);
+
+ /***** Partition level *****/
+ this.partitionRecordsLag = new MetricNameTemplate("{topic}-{partition}.records-lag", groupName,
+ "The latest lag of the partition", tags);
+ this.partitionRecordsLagMax = new MetricNameTemplate("{topic}-{partition}.records-lag-max", groupName,
+ "The max lag of the partition", tags);
+ this.partitionRecordsLagAvg = new MetricNameTemplate("{topic}-{partition}.records-lag-avg", groupName,
+ "The average lag of the partition", tags);
+
+
+ }
+
+ public List<MetricNameTemplate> getAllTemplates() {
+ return Arrays.asList(
+ fetchSizeAvg,
+ fetchSizeMax,
+ bytesConsumedRate,
+ recordsPerRequestAvg,
+ recordsConsumedRate,
+ fetchLatencyAvg,
+ fetchLatencyMax,
+ fetchRequestRate,
+ recordsLagMax,
+ fetchThrottleTimeAvg,
+ fetchThrottleTimeMax,
+ topicFetchSizeAvg,
+ topicFetchSizeMax,
+ topicBytesConsumedRate,
+ topicRecordsPerRequestAvg,
+ topicRecordsConsumedRate,
+ partitionRecordsLag,
+ partitionRecordsLagAvg,
+ partitionRecordsLagMax
+ );
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java b/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java
new file mode 100644
index 0000000..d768f22
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.kafka.common.utils.Utils;
+
+/**
+ * A template for a MetricName. It contains a name, group, and description, as
+ * well as all the tags that will be used to create the mBean name. Tag values
+ * are omitted from the template, but are filled in at runtime with their
+ * specified values.
+ */
+public class MetricNameTemplate {
+ private final String name;
+ private final String group;
+ private final String description;
+ private Set<String> tags;
+
+ public MetricNameTemplate(String name, String group, String description, Set<String> tags) {
+ this.name = Utils.notNull(name);
+ this.group = Utils.notNull(group);
+ this.description = Utils.notNull(description);
+ this.tags = Utils.notNull(tags);
+ }
+
+ public MetricNameTemplate(String name, String group, String description, String... keys) {
+ this(name, group, description, getTags(keys));
+ }
+
+ private static Set<String> getTags(String... keys) {
+ Set<String> tags = new HashSet<String>();
+
+ for (int i = 0; i < keys.length; i++)
+ tags.add(keys[i]);
+
+ return tags;
+ }
+
+ public String name() {
+ return this.name;
+ }
+
+ public String group() {
+ return this.group;
+ }
+
+ public String description() {
+ return this.description;
+ }
+
+
+ public Set<String> tags() {
+ return tags;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
index 52704d6..67dfaa8 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
@@ -97,7 +97,7 @@ public class JmxReporter implements MetricsReporter {
private KafkaMbean removeAttribute(KafkaMetric metric) {
MetricName metricName = metric.metricName();
- String mBeanName = getMBeanName(metricName);
+ String mBeanName = getMBeanName(prefix, metricName);
KafkaMbean mbean = this.mbeans.get(mBeanName);
if (mbean != null)
mbean.removeAttribute(metricName.name());
@@ -107,7 +107,7 @@ public class JmxReporter implements MetricsReporter {
private KafkaMbean addAttribute(KafkaMetric metric) {
try {
MetricName metricName = metric.metricName();
- String mBeanName = getMBeanName(metricName);
+ String mBeanName = getMBeanName(prefix, metricName);
if (!this.mbeans.containsKey(mBeanName))
mbeans.put(mBeanName, new KafkaMbean(mBeanName));
KafkaMbean mbean = this.mbeans.get(mBeanName);
@@ -122,7 +122,7 @@ public class JmxReporter implements MetricsReporter {
* @param metricName
* @return standard JMX MBean name in the following format domainName:type=metricType,key1=val1,key2=val2
*/
- private String getMBeanName(MetricName metricName) {
+ static String getMBeanName(String prefix, MetricName metricName) {
StringBuilder mBeanName = new StringBuilder();
mBeanName.append(prefix);
mBeanName.append(":type=");
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 3861d30..c4cd676 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.metrics;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
@@ -26,9 +27,13 @@ import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -226,6 +231,65 @@ public class Metrics implements Closeable {
return tags;
}
+ public static String toHtmlTable(String domain, List<MetricNameTemplate> allMetrics) {
+ Map<String, Map<String, String>> beansAndAttributes = new TreeMap<String, Map<String, String>>();
+
+ try (Metrics metrics = new Metrics()) {
+ for (MetricNameTemplate template : allMetrics) {
+ Map<String, String> tags = new TreeMap<String, String>();
+ for (String s : template.tags()) {
+ tags.put(s, "{" + s + "}");
+ }
+
+ MetricName metricName = metrics.metricName(template.name(), template.group(), template.description(), tags);
+ String mBeanName = JmxReporter.getMBeanName(domain, metricName);
+ if (!beansAndAttributes.containsKey(mBeanName)) {
+ beansAndAttributes.put(mBeanName, new TreeMap<String, String>());
+ }
+ Map<String, String> attrAndDesc = beansAndAttributes.get(mBeanName);
+ if (!attrAndDesc.containsKey(template.name())) {
+ attrAndDesc.put(template.name(), template.description());
+ } else {
+ throw new IllegalArgumentException("mBean '" + mBeanName + "' attribute '" + template.name() + "' is defined twice.");
+ }
+ }
+ }
+
+ StringBuilder b = new StringBuilder();
+ b.append("<table class=\"data-table\"><tbody>\n");
+
+ for (Entry<String, Map<String, String>> e : beansAndAttributes.entrySet()) {
+ b.append("<tr>\n");
+ b.append("<td colspan=3 class=\"mbeanName\" style=\"background-color:#ccc; font-weight: bold;\">");
+ b.append(e.getKey());
+ b.append("</td>");
+ b.append("</tr>\n");
+
+ b.append("<tr>\n");
+ b.append("<th style=\"width: 90px\"></th>\n");
+ b.append("<th>Attribute name</th>\n");
+ b.append("<th>Description</th>\n");
+ b.append("</tr>\n");
+
+ for (Entry<String, String> e2 : e.getValue().entrySet()) {
+ b.append("<tr>\n");
+ b.append("<td></td>");
+ b.append("<td>");
+ b.append(e2.getKey());
+ b.append("</td>");
+ b.append("<td>");
+ b.append(e2.getValue());
+ b.append("</td>");
+ b.append("</tr>\n");
+ }
+
+ }
+ b.append("</tbody></table>");
+
+ return b.toString();
+
+ }
+
public MetricConfig config() {
return config;
}
@@ -484,6 +548,25 @@ public class Metrics implements Closeable {
return Collections.unmodifiableMap(childrenSensors);
}
+ public MetricName metricInstance(MetricNameTemplate template, String... keyValue) {
+ return metricInstance(template, getTags(keyValue));
+ }
+
+ public MetricName metricInstance(MetricNameTemplate template, Map<String, String> tags) {
+ // check to make sure that the runtime defined tags contain all the template tags.
+ Set<String> runtimeTagKeys = new HashSet<>(tags.keySet());
+ runtimeTagKeys.addAll(config().tags().keySet());
+
+ Set<String> templateTagKeys = template.tags();
+
+ if (!runtimeTagKeys.equals(templateTagKeys)) {
+ throw new IllegalArgumentException("For '" + template.name() + "', runtime-defined metric tags do not match the tags in the template. " + ""
+ + "Runtime = " + runtimeTagKeys.toString() + " Template = " + templateTagKeys.toString());
+ }
+
+ return this.metricName(template.name(), template.group(), template.description(), tags);
+ }
+
/**
* Close this metrics repository.
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 9117f71..1249896 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
+import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.Fetcher;
@@ -1540,6 +1541,8 @@ public class KafkaConsumerTest {
ConsumerInterceptors<String, String> interceptors = null;
Metrics metrics = new Metrics();
+ ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricGroupPrefix);
+
SubscriptionState subscriptions = new SubscriptionState(autoResetStrategy);
ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, retryBackoffMs, requestTimeoutMs);
ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator(
@@ -1574,7 +1577,7 @@ public class KafkaConsumerTest {
metadata,
subscriptions,
metrics,
- metricGroupPrefix,
+ metricsRegistry.fetcherMetrics,
time,
retryBackoffMs,
IsolationLevel.READ_UNCOMMITTED);
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index ba5b7d5..720079c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -122,6 +122,8 @@ public class FetcherTest {
private Cluster cluster = TestUtils.singletonCluster(topicName, 2);
private Node node = cluster.nodes().get(0);
private Metrics metrics = new Metrics(time);
+ FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry("consumer" + groupId);
+
private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE);
private static final double EPSILON = 0.0001;
@@ -1043,7 +1045,7 @@ public class FetcherTest {
@Test
public void testQuotaMetrics() throws Exception {
MockSelector selector = new MockSelector(time);
- Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, "consumer" + groupId);
+ Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry);
Cluster cluster = TestUtils.singletonCluster("test", 1);
Node node = cluster.nodes().get(0);
NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
@@ -1070,8 +1072,8 @@ public class FetcherTest {
selector.clear();
}
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
- KafkaMetric avgMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-avg", metricGroup, ""));
- KafkaMetric maxMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-max", metricGroup, ""));
+ KafkaMetric avgMetric = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg));
+ KafkaMetric maxMetric = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchThrottleTimeMax));
// Throttle times are ApiVersions=400, Fetch=(100, 200, 300)
assertEquals(250, avgMetric.value(), EPSILON);
assertEquals(400, maxMetric.value(), EPSILON);
@@ -1087,7 +1089,7 @@ public class FetcherTest {
subscriptions.assignFromUser(singleton(tp1));
subscriptions.seek(tp1, 0);
- MetricName maxLagMetric = metrics.metricName("records-lag-max", metricGroup);
+ MetricName maxLagMetric = metrics.metricInstance(metricsRegistry.recordsLagMax);
MetricName partitionLagMetric = metrics.metricName(tp1 + ".records-lag", metricGroup);
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
@@ -1122,8 +1124,8 @@ public class FetcherTest {
subscriptions.seek(tp1, 0);
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
- KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricName("fetch-size-avg", metricGroup));
- KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricName("records-per-request-avg", metricGroup));
+ KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
+ KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
TimestampType.CREATE_TIME, 0L);
@@ -1146,8 +1148,8 @@ public class FetcherTest {
subscriptions.seek(tp1, 1);
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
- KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricName("fetch-size-avg", metricGroup));
- KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricName("records-per-request-avg", metricGroup));
+ KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
+ KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
TimestampType.CREATE_TIME, 0L);
@@ -1173,8 +1175,8 @@ public class FetcherTest {
subscriptions.seek(tp2, 0);
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
- KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricName("fetch-size-avg", metricGroup));
- KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricName("records-per-request-avg", metricGroup));
+ KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
+ KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
TimestampType.CREATE_TIME, 0L);
@@ -1208,8 +1210,8 @@ public class FetcherTest {
subscriptions.seek(tp2, 0);
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
- KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricName("fetch-size-avg", metricGroup));
- KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricName("records-per-request-avg", metricGroup));
+ KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
+ KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
// send the fetch and then seek to a new offset
assertEquals(1, fetcher.sendFetches());
@@ -1816,7 +1818,7 @@ public class FetcherTest {
metadata,
subscriptions,
metrics,
- "consumer" + groupId,
+ metricsRegistry,
time,
retryBackoffMs,
isolationLevel);
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index dac9570..0904a41 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -530,4 +530,58 @@ public class MetricsTest {
private Double measure(Measurable rate, MetricConfig config) {
return rate.measure(config, time.milliseconds());
}
+
+ @Test
+ public void testMetricInstances() {
+ MetricName n1 = metrics.metricInstance(SampleMetrics.METRIC1, "key1", "value1", "key2", "value2");
+ Map<String, String> tags = new HashMap<String, String>();
+ tags.put("key1", "value1");
+ tags.put("key2", "value2");
+ MetricName n2 = metrics.metricInstance(SampleMetrics.METRIC2, tags);
+ assertEquals("metric names created in two different ways should be equal", n1, n2);
+
+ try {
+ metrics.metricInstance(SampleMetrics.METRIC1, "key1");
+ fail("Creating MetricName with an odd number of keyValue should fail");
+ } catch (IllegalArgumentException e) {
+ // this is expected
+ }
+
+ Map<String, String> parentTagsWithValues = new HashMap<>();
+ parentTagsWithValues.put("parent-tag", "parent-tag-value");
+
+ Map<String, String> childTagsWithValues = new HashMap<>();
+ childTagsWithValues.put("child-tag", "child-tag-value");
+
+ try (Metrics inherited = new Metrics(new MetricConfig().tags(parentTagsWithValues), Arrays.asList((MetricsReporter) new JmxReporter()), time, true)) {
+ MetricName inheritedMetric = inherited.metricInstance(SampleMetrics.METRIC_WITH_INHERITED_TAGS, childTagsWithValues);
+
+ Map<String, String> filledOutTags = inheritedMetric.tags();
+ assertEquals("parent-tag should be set properly", filledOutTags.get("parent-tag"), "parent-tag-value");
+ assertEquals("child-tag should be set properly", filledOutTags.get("child-tag"), "child-tag-value");
+
+ try {
+ inherited.metricInstance(SampleMetrics.METRIC_WITH_INHERITED_TAGS, parentTagsWithValues);
+ fail("Creating MetricName should fail if the child metrics are not defined at runtime");
+ } catch (IllegalArgumentException e) {
+ // this is expected
+ }
+
+ try {
+
+ Map<String, String> runtimeTags = new HashMap<>();
+ runtimeTags.put("child-tag", "child-tag-value");
+ runtimeTags.put("tag-not-in-template", "unexpected-value");
+
+ inherited.metricInstance(SampleMetrics.METRIC_WITH_INHERITED_TAGS, runtimeTags);
+ fail("Creating MetricName should fail if there is a tag at runtime that is not in the template");
+ } catch (IllegalArgumentException e) {
+ // this is expected
+ }
+ }
+
+ }
+
+
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/clients/src/test/java/org/apache/kafka/common/metrics/SampleMetrics.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/SampleMetrics.java b/clients/src/test/java/org/apache/kafka/common/metrics/SampleMetrics.java
new file mode 100644
index 0000000..1e3e817
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/SampleMetrics.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.MetricNameTemplate;
+
+/**
+ * A registry of predefined Metrics for the MetricsTest.java class.
+ */
+public class SampleMetrics {
+
+ public static final MetricNameTemplate METRIC1 = new MetricNameTemplate("name", "group", "The first metric used in testMetricName()", "key1", "key2");
+ public static final MetricNameTemplate METRIC2 = new MetricNameTemplate("name", "group", "The second metric used in testMetricName()", "key1", "key2");
+
+ public static final MetricNameTemplate METRIC_WITH_INHERITED_TAGS = new MetricNameTemplate("inherited.tags", "group", "inherited.tags in testMetricName", "parent-tag", "child-tag");
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/docs/ops.html
----------------------------------------------------------------------
diff --git a/docs/ops.html b/docs/ops.html
index eface56..23caeaf 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1224,110 +1224,7 @@
<h5><a id="new_consumer_fetch_monitoring" href="#new_consumer_fetch_monitoring">Consumer Fetch Metrics</a></h5>
- <table class="data-table">
- <tbody>
- <tr>
- <th>Metric/Attribute name</th>
- <th>Description</th>
- <th>Mbean name</th>
- </tr>
- <tr>
- <td>fetch-size-avg</td>
- <td>The average number of bytes fetched per request</td>
- <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
- </tr>
- <tr>
- <td>fetch-size-max</td>
- <td>The maximum number of bytes fetched per request</td>
- <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
- </tr>
- <tr>
- <td>bytes-consumed-rate</td>
- <td>The average number of bytes consumed per second</td>
- <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
- </tr>
- <tr>
- <td>records-per-request-avg</td>
- <td>The average number of records in each request</td>
- <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
- </tr>
- <tr>
- <td>records-consumed-rate</td>
- <td>The average number of records consumed per second</td>
- <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
- </tr>
- <tr>
- <td>fetch-latency-avg</td>
- <td>The average time taken for a fetch request</td>
- <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
- </tr>
- <tr>
- <td>fetch-latency-max</td>
- <td>The max time taken for a fetch request</td>
- <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
- </tr>
- <tr>
- <td>fetch-rate</td>
- <td>The number of fetch requests per second</td>
- <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
- </tr>
- <tr>
- <td>records-lag-max</td>
- <td>The maximum lag in terms of number of records for any partition in this window</td>
- <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
- </tr>
- <tr>
- <td>fetch-throttle-time-avg</td>
- <td>The average throttle time in ms</td>
- <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
- </tr>
- <tr>
- <td>fetch-throttle-time-max</td>
- <td>The maximum throttle time in ms</td>
- <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
- </tr>
- </tbody>
- </table>
-
-
- <h5><a id="topic_fetch_monitoring" href="#topic_fetch_monitoring">Topic-level Fetch Metrics</a></h5>
-
- <table class="data-table">
- <tbody>
- <tr>
- <th>Metric/Attribute name</th>
- <th>Description</th>
- <th>Mbean name</th>
- </tr>
- <tr>
- <td>fetch-size-avg</td>
- <td>The average number of bytes fetched per request for a specific topic.</td>
- <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
- </tr>
- <tr>
- <td>fetch-size-max</td>
- <td>The maximum number of bytes fetched per request for a specific topic.</td>
- <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
- </tr>
- <tr>
- <td>bytes-consumed-rate</td>
- <td>The average number of bytes consumed per second for a specific topic.</td>
- <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
- </tr>
- <tr>
- <td>records-per-request-avg</td>
- <td>The average number of records in each request for a specific topic.</td>
- <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
- </tr>
- <tr>
- <td>records-consumed-rate</td>
- <td>The average number of records consumed per second for a specific topic.</td>
- <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
- </tr>
- </tbody>
- </table>
-
-
+ <!--#include virtual="generated/consumer_metrics.html" -->
<h4><a id="kafka_streams_monitoring" href="#kafka_streams_monitoring">Streams Monitoring</a></h4>