You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/26 22:34:22 UTC

kafka git commit: KAFKA-5191: Autogenerate Consumer Fetcher metrics

Repository: kafka
Updated Branches:
  refs/heads/trunk ca8915d2e -> 0bc4f75ee


KAFKA-5191: Autogenerate Consumer Fetcher metrics

Autogenerate docs for the Consumer Fetcher's metrics. This change is a subset of the original PR https://github.com/apache/kafka/pull/1202.

CC ijuma benstopford hachikuji

Author: James Cheng <jy...@yahoo.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Guozhang Wang <wa...@gmail.com>

Closes #2993 from wushujames/fetcher_metrics_docs


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0bc4f75e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0bc4f75e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0bc4f75e

Branch: refs/heads/trunk
Commit: 0bc4f75eedf14af6d0b2e3e9be62a460b1049d0b
Parents: ca8915d
Author: James Cheng <jy...@yahoo.com>
Authored: Fri May 26 15:34:20 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri May 26 15:34:20 2017 -0700

----------------------------------------------------------------------
 build.gradle                                    |   9 +-
 .../kafka/clients/consumer/KafkaConsumer.java   |   6 +-
 .../consumer/internals/ConsumerMetrics.java     |  51 +++++++
 .../clients/consumer/internals/Fetcher.java     |  95 +++++--------
 .../internals/FetcherMetricsRegistry.java       | 140 +++++++++++++++++++
 .../apache/kafka/common/MetricNameTemplate.java |  72 ++++++++++
 .../kafka/common/metrics/JmxReporter.java       |   6 +-
 .../apache/kafka/common/metrics/Metrics.java    |  83 +++++++++++
 .../clients/consumer/KafkaConsumerTest.java     |   5 +-
 .../clients/consumer/internals/FetcherTest.java |  28 ++--
 .../kafka/common/metrics/MetricsTest.java       |  54 +++++++
 .../kafka/common/metrics/SampleMetrics.java     |  31 ++++
 docs/ops.html                                   | 105 +-------------
 13 files changed, 498 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 56c643e..07835ca 100644
--- a/build.gradle
+++ b/build.gradle
@@ -617,10 +617,17 @@ project(':core') {
     standardOutput = new File(generatedDocsDir, "topic_config.html").newOutputStream()
   }
 
+  task genConsumerMetricsDocs(type: JavaExec) {
+    classpath = sourceSets.test.runtimeClasspath
+    main = 'org.apache.kafka.clients.consumer.internals.ConsumerMetrics'
+    if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
+    standardOutput = new File(generatedDocsDir, "consumer_metrics.html").newOutputStream()
+  }
+
   task siteDocsTar(dependsOn: ['genProtocolErrorDocs', 'genProtocolApiKeyDocs', 'genProtocolMessageDocs',
                                'genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs',
                                'genTopicConfigDocs', ':connect:runtime:genConnectConfigDocs', ':connect:runtime:genConnectTransformationDocs',
-                               ':streams:genStreamsConfigDocs'], type: Tar) {
+                               ':streams:genStreamsConfigDocs', 'genConsumerMetricsDocs'], type: Tar) {
     classifier = 'site-docs'
     compression = Compression.GZIP
     from project.file("$rootDir/docs")

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 0359071..52d9456 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
 import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
+import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.PollCondition;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
@@ -662,11 +663,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
             String metricGrpPrefix = "consumer";
+            ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer");
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
 
             IsolationLevel isolationLevel = IsolationLevel.valueOf(
                     config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
-            Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricGrpPrefix);
+            Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry.fetcherMetrics);
 
             NetworkClient netClient = new NetworkClient(
                     new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
@@ -718,7 +720,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     this.metadata,
                     this.subscriptions,
                     metrics,
-                    metricGrpPrefix,
+                    metricsRegistry.fetcherMetrics,
                     this.time,
                     this.retryBackoffMs,
                     isolationLevel);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java
new file mode 100644
index 0000000..3492323
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.metrics.Metrics;
+
+public class ConsumerMetrics {
+    
+    public FetcherMetricsRegistry fetcherMetrics;
+    
+    public ConsumerMetrics(Set<String> metricsTags, String metricGrpPrefix) {
+        this.fetcherMetrics = new FetcherMetricsRegistry(metricsTags, metricGrpPrefix);
+    }
+
+    public ConsumerMetrics(String metricGroupPrefix) {
+        this(new HashSet<String>(), metricGroupPrefix);
+    }
+
+    private List<MetricNameTemplate> getAllTemplates() {
+        List<MetricNameTemplate> l = new ArrayList<>();
+        l.addAll(this.fetcherMetrics.getAllTemplates());
+        return l;
+    }
+
+    public static void main(String[] args) {
+        Set<String> tags = new HashSet<>();
+        tags.add("client-id");
+        ConsumerMetrics metrics = new ConsumerMetrics(tags, "consumer");
+        System.out.println(Metrics.toHtmlTable("kafka.consumer", metrics.getAllTemplates()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index cd32850..440ca6e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -125,7 +125,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                    Metadata metadata,
                    SubscriptionState subscriptions,
                    Metrics metrics,
-                   String metricGrpPrefix,
+                   FetcherMetricsRegistry metricsRegistry,
                    Time time,
                    long retryBackoffMs,
                    IsolationLevel isolationLevel) {
@@ -142,7 +142,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         this.keyDeserializer = ensureExtended(keyDeserializer);
         this.valueDeserializer = ensureExtended(valueDeserializer);
         this.completedFetches = new ConcurrentLinkedQueue<>();
-        this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix);
+        this.sensors = new FetchManagerMetrics(metrics, metricsRegistry);
         this.retryBackoffMs = retryBackoffMs;
         this.isolationLevel = isolationLevel;
 
@@ -931,15 +931,12 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         sensors.updatePartitionLagSensors(assignment);
     }
 
-    public static Sensor throttleTimeSensor(Metrics metrics, String metricGrpPrefix) {
-        String metricGrpName = metricGrpPrefix + FetchManagerMetrics.METRIC_GROUP_SUFFIX;
+    public static Sensor throttleTimeSensor(Metrics metrics, FetcherMetricsRegistry metricsRegistry) {
         Sensor fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
-        fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-avg",
-                                                     metricGrpName,
-                                                     "The average throttle time in ms"), new Avg());
-        fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-max",
-                                                     metricGrpName,
-                                                     "The maximum throttle time in ms"), new Max());
+        fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg), new Avg());
+
+        fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeMax), new Max());
+
         return fetchThrottleTimeSensor;
     }
 
@@ -1225,54 +1222,35 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
     }
 
     private static class FetchManagerMetrics {
-        private static final String METRIC_GROUP_SUFFIX = "-fetch-manager-metrics";
         private final Metrics metrics;
-        private final String metricGrpName;
+        private FetcherMetricsRegistry metricsRegistry;
         private final Sensor bytesFetched;
         private final Sensor recordsFetched;
         private final Sensor fetchLatency;
         private final Sensor recordsFetchLag;
 
         private Set<TopicPartition> assignedPartitions;
-
-        private FetchManagerMetrics(Metrics metrics, String metricGrpPrefix) {
+        
+        private FetchManagerMetrics(Metrics metrics, FetcherMetricsRegistry metricsRegistry) {
             this.metrics = metrics;
-            this.metricGrpName = metricGrpPrefix + METRIC_GROUP_SUFFIX;
+            this.metricsRegistry = metricsRegistry;
 
             this.bytesFetched = metrics.sensor("bytes-fetched");
-            this.bytesFetched.add(metrics.metricName("fetch-size-avg",
-                this.metricGrpName,
-                "The average number of bytes fetched per request"), new Avg());
-            this.bytesFetched.add(metrics.metricName("fetch-size-max",
-                this.metricGrpName,
-                "The maximum number of bytes fetched per request"), new Max());
-            this.bytesFetched.add(metrics.metricName("bytes-consumed-rate",
-                this.metricGrpName,
-                "The average number of bytes consumed per second"), new Rate());
+            this.bytesFetched.add(metrics.metricInstance(metricsRegistry.fetchSizeAvg), new Avg());
+            this.bytesFetched.add(metrics.metricInstance(metricsRegistry.fetchSizeMax), new Max());
+            this.bytesFetched.add(metrics.metricInstance(metricsRegistry.bytesConsumedRate), new Rate());
 
             this.recordsFetched = metrics.sensor("records-fetched");
-            this.recordsFetched.add(metrics.metricName("records-per-request-avg",
-                this.metricGrpName,
-                "The average number of records in each request"), new Avg());
-            this.recordsFetched.add(metrics.metricName("records-consumed-rate",
-                this.metricGrpName,
-                "The average number of records consumed per second"), new Rate());
+            this.recordsFetched.add(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg), new Avg());
+            this.recordsFetched.add(metrics.metricInstance(metricsRegistry.recordsConsumedRate), new Rate());
 
             this.fetchLatency = metrics.sensor("fetch-latency");
-            this.fetchLatency.add(metrics.metricName("fetch-latency-avg",
-                this.metricGrpName,
-                "The average time taken for a fetch request."), new Avg());
-            this.fetchLatency.add(metrics.metricName("fetch-latency-max",
-                this.metricGrpName,
-                "The max time taken for any fetch request."), new Max());
-            this.fetchLatency.add(metrics.metricName("fetch-rate",
-                this.metricGrpName,
-                "The number of fetch requests per second."), new Rate(new Count()));
+            this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyAvg), new Avg());
+            this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyMax), new Max());
+            this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchRequestRate), new Rate(new Count()));
 
             this.recordsFetchLag = metrics.sensor("records-lag");
-            this.recordsFetchLag.add(metrics.metricName("records-lag-max",
-                this.metricGrpName,
-                "The maximum lag in terms of number of records for any partition in this window"), new Max());
+            this.recordsFetchLag.add(metrics.metricInstance(metricsRegistry.recordsLagMax), new Max());
         }
 
         private void recordTopicFetchMetrics(String topic, int bytes, int records) {
@@ -1283,17 +1261,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                 Map<String, String> metricTags = Collections.singletonMap("topic", topic.replace('.', '_'));
 
                 bytesFetched = this.metrics.sensor(name);
-                bytesFetched.add(this.metrics.metricName("fetch-size-avg",
-                        this.metricGrpName,
-                        "The average number of bytes fetched per request for topic " + topic,
+                bytesFetched.add(this.metrics.metricInstance(metricsRegistry.topicFetchSizeAvg,
                         metricTags), new Avg());
-                bytesFetched.add(this.metrics.metricName("fetch-size-max",
-                        this.metricGrpName,
-                        "The maximum number of bytes fetched per request for topic " + topic,
+                bytesFetched.add(this.metrics.metricInstance(metricsRegistry.topicFetchSizeMax,
                         metricTags), new Max());
-                bytesFetched.add(this.metrics.metricName("bytes-consumed-rate",
-                        this.metricGrpName,
-                        "The average number of bytes consumed per second for topic " + topic,
+                bytesFetched.add(this.metrics.metricInstance(metricsRegistry.topicBytesConsumedRate,
                         metricTags), new Rate());
             }
             bytesFetched.record(bytes);
@@ -1306,13 +1278,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                 metricTags.put("topic", topic.replace('.', '_'));
 
                 recordsFetched = this.metrics.sensor(name);
-                recordsFetched.add(this.metrics.metricName("records-per-request-avg",
-                        this.metricGrpName,
-                        "The average number of records in each request for topic " + topic,
+                recordsFetched.add(this.metrics.metricInstance(metricsRegistry.topicRecordsPerRequestAvg,
                         metricTags), new Avg());
-                recordsFetched.add(this.metrics.metricName("records-consumed-rate",
-                        this.metricGrpName,
-                        "The average number of records consumed per second for topic " + topic,
+                recordsFetched.add(this.metrics.metricInstance(metricsRegistry.topicRecordsConsumedRate,
                         metricTags), new Rate());
             }
             recordsFetched.record(records);
@@ -1335,14 +1303,15 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             Sensor recordsLag = this.metrics.getSensor(name);
             if (recordsLag == null) {
                 recordsLag = this.metrics.sensor(name);
-                recordsLag.add(this.metrics.metricName(name, this.metricGrpName, "The latest lag of the partition"),
-                               new Value());
+                recordsLag.add(this.metrics.metricName(name, 
+                        metricsRegistry.partitionRecordsLag.group(), 
+                        metricsRegistry.partitionRecordsLag.description()), new Value());
                 recordsLag.add(this.metrics.metricName(name + "-max",
-                        this.metricGrpName,
-                        "The max lag of the partition"), new Max());
+                        metricsRegistry.partitionRecordsLagMax.group(),
+                        metricsRegistry.partitionRecordsLagMax.description()), new Max());
                 recordsLag.add(this.metrics.metricName(name + "-avg",
-                        this.metricGrpName,
-                        "The average lag of the partition"), new Avg());
+                        metricsRegistry.partitionRecordsLagAvg.group(),
+                        metricsRegistry.partitionRecordsLagAvg.description()), new Avg());
             }
             recordsLag.record(lag);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
new file mode 100644
index 0000000..0c98342
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kafka.common.MetricNameTemplate;
+
+public class FetcherMetricsRegistry {
+
+    public MetricNameTemplate fetchSizeAvg;
+    public MetricNameTemplate fetchSizeMax;
+    public MetricNameTemplate bytesConsumedRate;
+    public MetricNameTemplate recordsPerRequestAvg;
+    public MetricNameTemplate recordsConsumedRate;
+    public MetricNameTemplate fetchLatencyAvg;
+    public MetricNameTemplate fetchLatencyMax;
+    public MetricNameTemplate fetchRequestRate;
+    public MetricNameTemplate recordsLagMax;
+    public MetricNameTemplate fetchThrottleTimeAvg;
+    public MetricNameTemplate fetchThrottleTimeMax;
+    public MetricNameTemplate topicFetchSizeAvg;
+    public MetricNameTemplate topicFetchSizeMax;
+    public MetricNameTemplate topicBytesConsumedRate;
+    public MetricNameTemplate topicRecordsPerRequestAvg;
+    public MetricNameTemplate topicRecordsConsumedRate;
+    public MetricNameTemplate partitionRecordsLag;
+    public MetricNameTemplate partitionRecordsLagMax;
+    public MetricNameTemplate partitionRecordsLagAvg;
+
+    public FetcherMetricsRegistry() {
+        this(new HashSet<String>(), "");
+    }
+
+    public FetcherMetricsRegistry(String metricGrpPrefix) {
+        this(new HashSet<String>(), metricGrpPrefix);
+    }
+
+    public FetcherMetricsRegistry(Set<String> tags, String metricGrpPrefix) {
+        
+        /***** Client level *****/
+        String groupName = metricGrpPrefix + "-fetch-manager-metrics";
+                
+        this.fetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName, 
+                "The average number of bytes fetched per request", tags);
+
+        this.fetchSizeMax = new MetricNameTemplate("fetch-size-max", groupName, 
+                "The maximum number of bytes fetched per request", tags);
+        this.bytesConsumedRate = new MetricNameTemplate("bytes-consumed-rate", groupName, 
+                "The average number of bytes consumed per second", tags);
+
+        this.recordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", groupName, 
+                "The average number of records in each request", tags);
+        this.recordsConsumedRate = new MetricNameTemplate("records-consumed-rate", groupName, 
+                "The average number of records consumed per second", tags);
+
+        this.fetchLatencyAvg = new MetricNameTemplate("fetch-latency-avg", groupName, 
+                "The average time taken for a fetch request.", tags);
+        this.fetchLatencyMax = new MetricNameTemplate("fetch-latency-max", groupName, 
+                "The max time taken for any fetch request.", tags);
+        this.fetchRequestRate = new MetricNameTemplate("fetch-rate", groupName, 
+                "The number of fetch requests per second.", tags);
+
+        this.recordsLagMax = new MetricNameTemplate("records-lag-max", groupName, 
+                "The maximum lag in terms of number of records for any partition in this window", tags);
+
+        this.fetchThrottleTimeAvg = new MetricNameTemplate("fetch-throttle-time-avg", groupName, 
+                "The average throttle time in ms", tags);
+        this.fetchThrottleTimeMax = new MetricNameTemplate("fetch-throttle-time-max", groupName, 
+                "The maximum throttle time in ms", tags);
+
+        /***** Topic level *****/
+        Set<String> topicTags = new HashSet<>(tags);
+        topicTags.add("topic");
+
+        this.topicFetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName, 
+                "The average number of bytes fetched per request for a topic", topicTags);
+        this.topicFetchSizeMax = new MetricNameTemplate("fetch-size-max", groupName, 
+                "The maximum number of bytes fetched per request for a topic", topicTags);
+        this.topicBytesConsumedRate = new MetricNameTemplate("bytes-consumed-rate", groupName, 
+                "The average number of bytes consumed per second for a topic", topicTags);
+
+        this.topicRecordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", groupName, 
+                "The average number of records in each request for a topic", topicTags);
+        this.topicRecordsConsumedRate = new MetricNameTemplate("records-consumed-rate", groupName, 
+                "The average number of records consumed per second for a topic", topicTags);
+        
+        /***** Partition level *****/
+        this.partitionRecordsLag = new MetricNameTemplate("{topic}-{partition}.records-lag", groupName, 
+                "The latest lag of the partition", tags);
+        this.partitionRecordsLagMax = new MetricNameTemplate("{topic}-{partition}.records-lag-max", groupName, 
+                "The max lag of the partition", tags);
+        this.partitionRecordsLagAvg = new MetricNameTemplate("{topic}-{partition}.records-lag-avg", groupName, 
+                "The average lag of the partition", tags);
+        
+    
+    }
+    
+    public List<MetricNameTemplate> getAllTemplates() {
+        return Arrays.asList(
+            fetchSizeAvg,
+            fetchSizeMax,
+            bytesConsumedRate,
+            recordsPerRequestAvg,
+            recordsConsumedRate,
+            fetchLatencyAvg,
+            fetchLatencyMax,
+            fetchRequestRate,
+            recordsLagMax,
+            fetchThrottleTimeAvg,
+            fetchThrottleTimeMax,
+            topicFetchSizeAvg,
+            topicFetchSizeMax,
+            topicBytesConsumedRate,
+            topicRecordsPerRequestAvg,
+            topicRecordsConsumedRate,
+            partitionRecordsLag,
+            partitionRecordsLagAvg,
+            partitionRecordsLagMax
+        );
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java b/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java
new file mode 100644
index 0000000..d768f22
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.kafka.common.utils.Utils;
+
+/**
+ * A template for a MetricName. It contains a name, group, and description, as
+ * well as all the tags that will be used to create the mBean name. Tag values
+ * are omitted from the template, but are filled in at runtime with their
+ * specified values.
+ */
+public class MetricNameTemplate {
+    private final String name;
+    private final String group;
+    private final String description;
+    private Set<String> tags;
+
+    public MetricNameTemplate(String name, String group, String description, Set<String> tags) {
+        this.name = Utils.notNull(name);
+        this.group = Utils.notNull(group);
+        this.description = Utils.notNull(description);
+        this.tags = Utils.notNull(tags);
+    }
+    
+    public MetricNameTemplate(String name, String group, String description, String... keys) {
+        this(name, group, description, getTags(keys));
+    }
+
+    private static Set<String> getTags(String... keys) {
+        Set<String> tags = new HashSet<String>();
+        
+        for (int i = 0; i < keys.length; i++)
+            tags.add(keys[i]);
+
+        return tags;
+    }
+
+    public String name() {
+        return this.name;
+    }
+
+    public String group() {
+        return this.group;
+    }
+
+    public String description() {
+        return this.description;
+    }
+
+
+    public Set<String> tags() {
+        return tags;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
index 52704d6..67dfaa8 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
@@ -97,7 +97,7 @@ public class JmxReporter implements MetricsReporter {
 
     private KafkaMbean removeAttribute(KafkaMetric metric) {
         MetricName metricName = metric.metricName();
-        String mBeanName = getMBeanName(metricName);
+        String mBeanName = getMBeanName(prefix, metricName);
         KafkaMbean mbean = this.mbeans.get(mBeanName);
         if (mbean != null)
             mbean.removeAttribute(metricName.name());
@@ -107,7 +107,7 @@ public class JmxReporter implements MetricsReporter {
     private KafkaMbean addAttribute(KafkaMetric metric) {
         try {
             MetricName metricName = metric.metricName();
-            String mBeanName = getMBeanName(metricName);
+            String mBeanName = getMBeanName(prefix, metricName);
             if (!this.mbeans.containsKey(mBeanName))
                 mbeans.put(mBeanName, new KafkaMbean(mBeanName));
             KafkaMbean mbean = this.mbeans.get(mBeanName);
@@ -122,7 +122,7 @@ public class JmxReporter implements MetricsReporter {
      * @param metricName
      * @return standard JMX MBean name in the following format domainName:type=metricType,key1=val1,key2=val2
      */
-    private String getMBeanName(MetricName metricName) {
+    static String getMBeanName(String prefix, MetricName metricName) {
         StringBuilder mBeanName = new StringBuilder();
         mBeanName.append(prefix);
         mBeanName.append(":type=");

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 3861d30..c4cd676 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.metrics;
 
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
@@ -26,9 +27,13 @@ import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -226,6 +231,65 @@ public class Metrics implements Closeable {
         return tags;
     }
 
+    /**
+     * Renders the given metric name templates as an HTML table grouped by JMX MBean name.
+     * Every template tag is filled in with a "{tag}" placeholder; MBeans and attributes come out sorted
+     * because both levels are collected in TreeMaps.
+     *
+     * @param domain the prefix handed to JmxReporter.getMBeanName for each MBean name
+     * @param allMetrics the templates to list
+     * @return the HTML markup of the table
+     * @throws IllegalArgumentException if the same attribute name occurs twice for one MBean name
+     */
+    public static String toHtmlTable(String domain, List<MetricNameTemplate> allMetrics) {
+        Map<String, Map<String, String>> attributesByMBean = new TreeMap<String, Map<String, String>>();
+
+        // The Metrics instance is only used to build MetricName objects and is closed when the block exits.
+        try (Metrics metrics = new Metrics()) {
+            for (MetricNameTemplate template : allMetrics) {
+                Map<String, String> placeholderTags = new TreeMap<String, String>();
+                for (String tag : template.tags())
+                    placeholderTags.put(tag, "{" + tag + "}");
+
+                MetricName metricName = metrics.metricName(template.name(), template.group(), template.description(), placeholderTags);
+                String mBeanName = JmxReporter.getMBeanName(domain, metricName);
+
+                Map<String, String> descriptions = attributesByMBean.get(mBeanName);
+                if (descriptions == null) {
+                    descriptions = new TreeMap<String, String>();
+                    attributesByMBean.put(mBeanName, descriptions);
+                }
+                if (descriptions.containsKey(template.name()))
+                    throw new IllegalArgumentException("mBean '" + mBeanName + "' attribute '" + template.name() + "' is defined twice.");
+                descriptions.put(template.name(), template.description());
+            }
+        }
+
+        StringBuilder html = new StringBuilder();
+        html.append("<table class=\"data-table\"><tbody>\n");
+        for (Entry<String, Map<String, String>> mBean : attributesByMBean.entrySet()) {
+            // Row spanning all columns that carries the MBean name.
+            html.append("<tr>\n")
+                .append("<td colspan=3 class=\"mbeanName\" style=\"background-color:#ccc; font-weight: bold;\">")
+                .append(mBean.getKey())
+                .append("</td>")
+                .append("</tr>\n");
+            // Column headings; the first column is an empty spacer.
+            html.append("<tr>\n")
+                .append("<th style=\"width: 90px\"></th>\n")
+                .append("<th>Attribute name</th>\n")
+                .append("<th>Description</th>\n")
+                .append("</tr>\n");
+            for (Entry<String, String> attr : mBean.getValue().entrySet()) {
+                html.append("<tr>\n")
+                    .append("<td></td>")
+                    .append("<td>").append(attr.getKey()).append("</td>")
+                    .append("<td>").append(attr.getValue()).append("</td>")
+                    .append("</tr>\n");
+            }
+        }
+        html.append("</tbody></table>");
+        return html.toString();
+    }
+
     public MetricConfig config() {
         return config;
     }
@@ -484,6 +548,25 @@ public class Metrics implements Closeable {
         return Collections.unmodifiableMap(childrenSensors);
     }
 
+    public MetricName metricInstance(MetricNameTemplate template, String... keyValue) {
+        return metricInstance(template, getTags(keyValue));
+    }
+
+    /**
+     * Creates a MetricName from a template, first verifying that the tags available at runtime are exactly
+     * the tags the template declares.
+     *
+     * @param template the template supplying the name, group, description and expected tag keys
+     * @param tags the tags given by the caller; the keys of config().tags() are added to them for the check
+     * @return the metric name created via metricName(name, group, description, tags)
+     * @throws IllegalArgumentException if the combined runtime tag keys differ from the template's tag keys
+     */
+    public MetricName metricInstance(MetricNameTemplate template, Map<String, String> tags) {
+        // The comparison is on key sets only: caller-supplied keys plus the keys configured on this registry.
+        Set<String> runtimeTagKeys = new HashSet<>(tags.keySet());
+        runtimeTagKeys.addAll(config().tags().keySet());
+
+        Set<String> templateTagKeys = template.tags();
+        if (!runtimeTagKeys.equals(templateTagKeys)) {
+            throw new IllegalArgumentException("For '" + template.name() + "', runtime-defined metric tags do not match the tags in the template. "
+                    + "Runtime = " + runtimeTagKeys + " Template = " + templateTagKeys);
+        }
+
+        return this.metricName(template.name(), template.group(), template.description(), tags);
+    }
+
     /**
      * Close this metrics repository.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 9117f71..1249896 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
 import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
+import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
@@ -1540,6 +1541,8 @@ public class KafkaConsumerTest {
         ConsumerInterceptors<String, String> interceptors = null;
 
         Metrics metrics = new Metrics();
+        ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricGroupPrefix);
+
         SubscriptionState subscriptions = new SubscriptionState(autoResetStrategy);
         ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, retryBackoffMs, requestTimeoutMs);
         ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator(
@@ -1574,7 +1577,7 @@ public class KafkaConsumerTest {
                 metadata,
                 subscriptions,
                 metrics,
-                metricGroupPrefix,
+                metricsRegistry.fetcherMetrics,
                 time,
                 retryBackoffMs,
                 IsolationLevel.READ_UNCOMMITTED);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index ba5b7d5..720079c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -122,6 +122,8 @@ public class FetcherTest {
     private Cluster cluster = TestUtils.singletonCluster(topicName, 2);
     private Node node = cluster.nodes().get(0);
     private Metrics metrics = new Metrics(time);
+    FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry("consumer" + groupId);
+
     private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
     private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE);
     private static final double EPSILON = 0.0001;
@@ -1043,7 +1045,7 @@ public class FetcherTest {
     @Test
     public void testQuotaMetrics() throws Exception {
         MockSelector selector = new MockSelector(time);
-        Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics,  "consumer" + groupId);
+        Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry);
         Cluster cluster = TestUtils.singletonCluster("test", 1);
         Node node = cluster.nodes().get(0);
         NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
@@ -1070,8 +1072,8 @@ public class FetcherTest {
             selector.clear();
         }
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
-        KafkaMetric avgMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-avg", metricGroup, ""));
-        KafkaMetric maxMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-max", metricGroup, ""));
+        KafkaMetric avgMetric = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg));
+        KafkaMetric maxMetric = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchThrottleTimeMax));
         // Throttle times are ApiVersions=400, Fetch=(100, 200, 300)
         assertEquals(250, avgMetric.value(), EPSILON);
         assertEquals(400, maxMetric.value(), EPSILON);
@@ -1087,7 +1089,7 @@ public class FetcherTest {
         subscriptions.assignFromUser(singleton(tp1));
         subscriptions.seek(tp1, 0);
 
-        MetricName maxLagMetric = metrics.metricName("records-lag-max", metricGroup);
+        MetricName maxLagMetric = metrics.metricInstance(metricsRegistry.recordsLagMax);
         MetricName partitionLagMetric = metrics.metricName(tp1 + ".records-lag", metricGroup);
 
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
@@ -1122,8 +1124,8 @@ public class FetcherTest {
         subscriptions.seek(tp1, 0);
 
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
-        KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricName("fetch-size-avg", metricGroup));
-        KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricName("records-per-request-avg", metricGroup));
+        KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
+        KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
 
         MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
                 TimestampType.CREATE_TIME, 0L);
@@ -1146,8 +1148,8 @@ public class FetcherTest {
         subscriptions.seek(tp1, 1);
 
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
-        KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricName("fetch-size-avg", metricGroup));
-        KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricName("records-per-request-avg", metricGroup));
+        KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
+        KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
 
         MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
                 TimestampType.CREATE_TIME, 0L);
@@ -1173,8 +1175,8 @@ public class FetcherTest {
         subscriptions.seek(tp2, 0);
 
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
-        KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricName("fetch-size-avg", metricGroup));
-        KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricName("records-per-request-avg", metricGroup));
+        KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
+        KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
 
         MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
                 TimestampType.CREATE_TIME, 0L);
@@ -1208,8 +1210,8 @@ public class FetcherTest {
         subscriptions.seek(tp2, 0);
 
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
-        KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricName("fetch-size-avg", metricGroup));
-        KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricName("records-per-request-avg", metricGroup));
+        KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
+        KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
 
         // send the fetch and then seek to a new offset
         assertEquals(1, fetcher.sendFetches());
@@ -1816,7 +1818,7 @@ public class FetcherTest {
                 metadata,
                 subscriptions,
                 metrics,
-                "consumer" + groupId,
+                metricsRegistry,
                 time,
                 retryBackoffMs,
                 isolationLevel);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index dac9570..0904a41 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -530,4 +530,58 @@ public class MetricsTest {
     private Double measure(Measurable rate, MetricConfig config) {
         return rate.measure(config, time.milliseconds());
     }
+    
+    /**
+     * Checks that metricInstance() yields equal MetricNames whether the tags are passed as varargs or as a
+     * map, rejects an odd number of key/value arguments, and requires the runtime tags (caller-supplied
+     * plus those set on the MetricConfig) to match the template's tags exactly.
+     */
+    @Test
+    public void testMetricInstances() {
+        MetricName n1 = metrics.metricInstance(SampleMetrics.METRIC1, "key1", "value1", "key2", "value2");
+        Map<String, String> tags = new HashMap<String, String>();
+        tags.put("key1", "value1");
+        tags.put("key2", "value2");
+        MetricName n2 = metrics.metricInstance(SampleMetrics.METRIC2, tags);
+        assertEquals("metric names created in two different ways should be equal", n1, n2);
+
+        try {
+            metrics.metricInstance(SampleMetrics.METRIC1, "key1");
+            fail("Creating MetricName with an odd number of keyValue should fail");
+        } catch (IllegalArgumentException e) {
+            // this is expected
+        }
+
+        // "parent-tag" is configured on the Metrics instance itself; "child-tag" is passed per call.
+        Map<String, String> parentTagsWithValues = new HashMap<>();
+        parentTagsWithValues.put("parent-tag", "parent-tag-value");
+
+        Map<String, String> childTagsWithValues = new HashMap<>();
+        childTagsWithValues.put("child-tag", "child-tag-value");
+
+        try (Metrics inherited = new Metrics(new MetricConfig().tags(parentTagsWithValues), Arrays.asList((MetricsReporter) new JmxReporter()), time, true)) {
+            MetricName inheritedMetric = inherited.metricInstance(SampleMetrics.METRIC_WITH_INHERITED_TAGS, childTagsWithValues);
+
+            // assertEquals takes (message, expected, actual); keep the literal in the expected slot so a
+            // failure reports the values the right way round.
+            Map<String, String> filledOutTags = inheritedMetric.tags();
+            assertEquals("parent-tag should be set properly", "parent-tag-value", filledOutTags.get("parent-tag"));
+            assertEquals("child-tag should be set properly", "child-tag-value", filledOutTags.get("child-tag"));
+
+            try {
+                // Only the parent tag is supplied, so the template's "child-tag" is missing at runtime.
+                inherited.metricInstance(SampleMetrics.METRIC_WITH_INHERITED_TAGS, parentTagsWithValues);
+                fail("Creating MetricName should fail if the child metrics are not defined at runtime");
+            } catch (IllegalArgumentException e) {
+                // this is expected
+            }
+
+            try {
+                Map<String, String> runtimeTags = new HashMap<>();
+                runtimeTags.put("child-tag", "child-tag-value");
+                runtimeTags.put("tag-not-in-template", "unexpected-value");
+
+                inherited.metricInstance(SampleMetrics.METRIC_WITH_INHERITED_TAGS, runtimeTags);
+                fail("Creating MetricName should fail if there is a tag at runtime that is not in the template");
+            } catch (IllegalArgumentException e) {
+                // this is expected
+            }
+        }
+    }
+
+    
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/clients/src/test/java/org/apache/kafka/common/metrics/SampleMetrics.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/SampleMetrics.java b/clients/src/test/java/org/apache/kafka/common/metrics/SampleMetrics.java
new file mode 100644
index 0000000..1e3e817
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/SampleMetrics.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.MetricNameTemplate;
+
+/**
+ * A registry of predefined Metrics for the MetricsTest.java class.
+ */
+public class SampleMetrics {
+
+    public static final MetricNameTemplate METRIC1 = new MetricNameTemplate("name", "group", "The first metric used in testMetricName()", "key1", "key2");
+    public static final MetricNameTemplate METRIC2 = new MetricNameTemplate("name", "group", "The second metric used in testMetricName()", "key1", "key2");
+
+    public static final MetricNameTemplate METRIC_WITH_INHERITED_TAGS = new MetricNameTemplate("inherited.tags", "group", "inherited.tags in testMetricName", "parent-tag", "child-tag");
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc4f75e/docs/ops.html
----------------------------------------------------------------------
diff --git a/docs/ops.html b/docs/ops.html
index eface56..23caeaf 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1224,110 +1224,7 @@
 
   <h5><a id="new_consumer_fetch_monitoring" href="#new_consumer_fetch_monitoring">Consumer Fetch Metrics</a></h5>
 
-  <table class="data-table">
-    <tbody>
-      <tr>
-        <th>Metric/Attribute name</th>
-        <th>Description</th>
-        <th>Mbean name</th>
-      </tr>
-      <tr>
-        <td>fetch-size-avg</td>
-        <td>The average number of bytes fetched per request</td>
-        <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>fetch-size-max</td>
-        <td>The maximum number of bytes fetched per request</td>
-        <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>bytes-consumed-rate</td>
-        <td>The average number of bytes consumed per second</td>
-        <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>records-per-request-avg</td>
-        <td>The average number of records in each request</td>
-        <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>records-consumed-rate</td>
-        <td>The average number of records consumed per second</td>
-        <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>fetch-latency-avg</td>
-        <td>The average time taken for a fetch request</td>
-        <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>fetch-latency-max</td>
-        <td>The max time taken for a fetch request</td>
-        <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>fetch-rate</td>
-        <td>The number of fetch requests per second</td>
-        <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>records-lag-max</td>
-        <td>The maximum lag in terms of number of records for any partition in this window</td>
-        <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>fetch-throttle-time-avg</td>
-        <td>The average throttle time in ms</td>
-        <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>fetch-throttle-time-max</td>
-        <td>The maximum throttle time in ms</td>
-        <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
-      </tr>
-    </tbody>
-  </table>
-
-
-  <h5><a id="topic_fetch_monitoring" href="#topic_fetch_monitoring">Topic-level Fetch Metrics</a></h5>
-
-  <table class="data-table">
-    <tbody>
-      <tr>
-        <th>Metric/Attribute name</th>
-        <th>Description</th>
-        <th>Mbean name</th>
-      </tr>
-      <tr>
-        <td>fetch-size-avg</td>
-        <td>The average number of bytes fetched per request for a specific topic.</td>
-        <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>fetch-size-max</td>
-        <td>The maximum number of bytes fetched per request for a specific topic.</td>
-        <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>bytes-consumed-rate</td>
-        <td>The average number of bytes consumed per second for a specific topic.</td>
-        <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>records-per-request-avg</td>
-        <td>The average number of records in each request for a specific topic.</td>
-        <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>records-consumed-rate</td>
-        <td>The average number of records consumed per second for a specific topic.</td>
-        <td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
-      </tr>
-    </tbody>
-  </table>
-
-
+  <!--#include virtual="generated/consumer_metrics.html" -->
 
   <h4><a id="kafka_streams_monitoring" href="#kafka_streams_monitoring">Streams Monitoring</a></h4>