You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/12/09 04:43:12 UTC

[2/2] kafka git commit: KAFKA-2668; Add a metric that records the total number of metrics

KAFKA-2668; Add a metric that records the total number of metrics

onurkaraman, becketqin: do you have time to review this patch? It addresses the ticket that jjkoshy filed as KAFKA-2668.

Author: Dong Lin <li...@gmail.com>

Reviewers: Onur Karaman <ok...@linkedin.com>, Joel Koshy <jj...@gmail.com>, Guozhang Wang <wa...@gmail.com>, Jun Rao <ju...@gmail.com>

Closes #328 from lindong28/KAFKA-2668


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ef92a8ae
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ef92a8ae
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ef92a8ae

Branch: refs/heads/trunk
Commit: ef92a8ae7479560b26edecfa8db79934065f13cf
Parents: ee6b5e0
Author: Dong Lin <li...@gmail.com>
Authored: Tue Dec 8 19:43:05 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Dec 8 19:43:05 2015 -0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |  14 +-
 .../consumer/internals/AbstractCoordinator.java |  51 +++----
 .../consumer/internals/ConsumerCoordinator.java |  28 ++--
 .../clients/consumer/internals/Fetcher.java     |  61 ++++----
 .../kafka/clients/producer/KafkaProducer.java   |  15 +-
 .../clients/producer/internals/BufferPool.java  |  11 +-
 .../producer/internals/RecordAccumulator.java   |  18 ++-
 .../clients/producer/internals/Sender.java      |  47 +++----
 .../org/apache/kafka/common/MetricName.java     |  34 ++++-
 .../kafka/common/metrics/MetricConfig.java      |  13 ++
 .../apache/kafka/common/metrics/Metrics.java    |  95 +++++++++++++
 .../apache/kafka/common/network/Selector.java   |  50 +++----
 .../internals/ConsumerCoordinatorTest.java      |   4 -
 .../clients/consumer/internals/FetcherTest.java |   7 +-
 .../producer/internals/BufferPoolTest.java      |  13 +-
 .../internals/RecordAccumulatorTest.java        |  22 ++-
 .../clients/producer/internals/SenderTest.java  |  38 ++---
 .../kafka/common/metrics/JmxReporterTest.java   |   9 +-
 .../kafka/common/metrics/MetricsTest.java       | 140 ++++++++++---------
 .../kafka/common/network/SelectorTest.java      |   2 +-
 .../kafka/common/network/SslSelectorTest.java   |   5 +-
 .../common/network/SslTransportLayerTest.java   |   7 +-
 .../org/apache/kafka/test/MetricsBench.java     |  11 +-
 .../runtime/distributed/WorkerCoordinator.java  |  23 ++-
 .../runtime/distributed/WorkerGroupMember.java  |  12 +-
 .../distributed/WorkerCoordinatorTest.java      |   3 -
 .../main/scala/kafka/admin/AdminClient.scala    |   1 -
 .../consumer/ZookeeperConsumerConnector.scala   |   9 ++
 .../main/scala/kafka/network/SocketServer.scala |   4 +-
 .../scala/kafka/server/ClientQuotaManager.scala |   6 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   9 ++
 core/src/main/scala/kafka/tools/JmxTool.scala   |   3 -
 .../integration/kafka/api/QuotasTest.scala      |  32 ++---
 .../kafka/server/ClientQuotaManagerTest.scala   |   2 +-
 .../processor/internals/StreamThread.java       |  34 ++---
 35 files changed, 458 insertions(+), 375 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index c559593..912b307 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -531,12 +531,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
             this.time = new SystemTime();
 
-            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
-                    .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
-                            TimeUnit.MILLISECONDS);
             clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
             if (clientId.length() <= 0)
                 clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
+            Map<String, String> metricsTags = new LinkedHashMap<String, String>();
+            metricsTags.put("client-id", clientId);
+            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
+                    .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
+                    .tags(metricsTags);
             List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                     MetricsReporter.class);
             reporters.add(new JmxReporter(JMX_PREFIX));
@@ -546,11 +548,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), 0);
             String metricGrpPrefix = "consumer";
-            Map<String, String> metricsTags = new LinkedHashMap<String, String>();
-            metricsTags.put("client-id", clientId);
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
             NetworkClient netClient = new NetworkClient(
-                    new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags, channelBuilder),
+                    new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
                     this.metadata,
                     clientId,
                     100, // a fixed large enough value will suffice
@@ -573,7 +573,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     this.subscriptions,
                     metrics,
                     metricGrpPrefix,
-                    metricsTags,
                     this.time,
                     retryBackoffMs,
                     new ConsumerCoordinator.DefaultOffsetCommitCallback(),
@@ -606,7 +605,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     this.subscriptions,
                     metrics,
                     metricGrpPrefix,
-                    metricsTags,
                     this.time,
                     this.retryBackoffMs);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 33886ed..c6492bc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -14,7 +14,6 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
@@ -107,7 +106,6 @@ public abstract class AbstractCoordinator implements Closeable {
                                int heartbeatIntervalMs,
                                Metrics metrics,
                                String metricGrpPrefix,
-                               Map<String, String> metricTags,
                                Time time,
                                long retryBackoffMs) {
         this.client = client;
@@ -119,7 +117,7 @@ public abstract class AbstractCoordinator implements Closeable {
         this.sessionTimeoutMs = sessionTimeoutMs;
         this.heartbeat = new Heartbeat(this.sessionTimeoutMs, heartbeatIntervalMs, time.milliseconds());
         this.heartbeatTask = new HeartbeatTask();
-        this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
+        this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
         this.retryBackoffMs = retryBackoffMs;
     }
 
@@ -679,47 +677,39 @@ public abstract class AbstractCoordinator implements Closeable {
         public final Sensor joinLatency;
         public final Sensor syncLatency;
 
-        public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
+        public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
             this.metrics = metrics;
             this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
 
             this.heartbeatLatency = metrics.sensor("heartbeat-latency");
-            this.heartbeatLatency.add(new MetricName("heartbeat-response-time-max",
+            this.heartbeatLatency.add(metrics.metricName("heartbeat-response-time-max",
                 this.metricGrpName,
-                "The max time taken to receive a response to a heartbeat request",
-                tags), new Max());
-            this.heartbeatLatency.add(new MetricName("heartbeat-rate",
+                "The max time taken to receive a response to a heartbeat request"), new Max());
+            this.heartbeatLatency.add(metrics.metricName("heartbeat-rate",
                 this.metricGrpName,
-                "The average number of heartbeats per second",
-                tags), new Rate(new Count()));
+                "The average number of heartbeats per second"), new Rate(new Count()));
 
             this.joinLatency = metrics.sensor("join-latency");
-            this.joinLatency.add(new MetricName("join-time-avg",
+            this.joinLatency.add(metrics.metricName("join-time-avg",
                     this.metricGrpName,
-                    "The average time taken for a group rejoin",
-                    tags), new Avg());
-            this.joinLatency.add(new MetricName("join-time-max",
+                    "The average time taken for a group rejoin"), new Avg());
+            this.joinLatency.add(metrics.metricName("join-time-max",
                     this.metricGrpName,
-                    "The max time taken for a group rejoin",
-                    tags), new Avg());
-            this.joinLatency.add(new MetricName("join-rate",
+                    "The max time taken for a group rejoin"), new Avg());
+            this.joinLatency.add(metrics.metricName("join-rate",
                     this.metricGrpName,
-                    "The number of group joins per second",
-                    tags), new Rate(new Count()));
+                    "The number of group joins per second"), new Rate(new Count()));
 
             this.syncLatency = metrics.sensor("sync-latency");
-            this.syncLatency.add(new MetricName("sync-time-avg",
+            this.syncLatency.add(metrics.metricName("sync-time-avg",
                     this.metricGrpName,
-                    "The average time taken for a group sync",
-                    tags), new Avg());
-            this.syncLatency.add(new MetricName("sync-time-max",
+                    "The average time taken for a group sync"), new Avg());
+            this.syncLatency.add(metrics.metricName("sync-time-max",
                     this.metricGrpName,
-                    "The max time taken for a group sync",
-                    tags), new Avg());
-            this.syncLatency.add(new MetricName("sync-rate",
+                    "The max time taken for a group sync"), new Avg());
+            this.syncLatency.add(metrics.metricName("sync-rate",
                     this.metricGrpName,
-                    "The number of group syncs per second",
-                    tags), new Rate(new Count()));
+                    "The number of group syncs per second"), new Rate(new Count()));
 
             Measurable lastHeartbeat =
                 new Measurable() {
@@ -727,10 +717,9 @@ public abstract class AbstractCoordinator implements Closeable {
                         return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
                     }
                 };
-            metrics.addMetric(new MetricName("last-heartbeat-seconds-ago",
+            metrics.addMetric(metrics.metricName("last-heartbeat-seconds-ago",
                 this.metricGrpName,
-                "The number of seconds since the last controller heartbeat",
-                tags),
+                "The number of seconds since the last controller heartbeat"),
                 lastHeartbeat);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 4ac05a3..41d2a27 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -22,7 +22,6 @@ import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -82,7 +81,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                                SubscriptionState subscriptions,
                                Metrics metrics,
                                String metricGrpPrefix,
-                               Map<String, String> metricTags,
                                Time time,
                                long retryBackoffMs,
                                OffsetCommitCallback defaultOffsetCommitCallback,
@@ -94,7 +92,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 heartbeatIntervalMs,
                 metrics,
                 metricGrpPrefix,
-                metricTags,
                 time,
                 retryBackoffMs);
         this.metadata = metadata;
@@ -109,7 +106,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         addMetadataListener();
 
         this.autoCommitTask = autoCommitEnabled ? new AutoCommitTask(autoCommitIntervalMs) : null;
-        this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
+        this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
     }
 
     @Override
@@ -639,23 +636,20 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         public final Sensor commitLatency;
 
-        public ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
+        public ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
             this.metrics = metrics;
             this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
 
             this.commitLatency = metrics.sensor("commit-latency");
-            this.commitLatency.add(new MetricName("commit-latency-avg",
+            this.commitLatency.add(metrics.metricName("commit-latency-avg",
                 this.metricGrpName,
-                "The average time taken for a commit request",
-                tags), new Avg());
-            this.commitLatency.add(new MetricName("commit-latency-max",
+                "The average time taken for a commit request"), new Avg());
+            this.commitLatency.add(metrics.metricName("commit-latency-max",
                 this.metricGrpName,
-                "The max time taken for a commit request",
-                tags), new Max());
-            this.commitLatency.add(new MetricName("commit-rate",
+                "The max time taken for a commit request"), new Max());
+            this.commitLatency.add(metrics.metricName("commit-rate",
                 this.metricGrpName,
-                "The number of commit calls per second",
-                tags), new Rate(new Count()));
+                "The number of commit calls per second"), new Rate(new Count()));
 
             Measurable numParts =
                 new Measurable() {
@@ -663,11 +657,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                         return subscriptions.assignedPartitions().size();
                     }
                 };
-            metrics.addMetric(new MetricName("assigned-partitions",
+            metrics.addMetric(metrics.metricName("assigned-partitions",
                 this.metricGrpName,
-                "The number of partitions currently assigned to this consumer",
-                tags),
-                numParts);
+                "The number of partitions currently assigned to this consumer"), numParts);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 5708869..e152088 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -99,7 +98,6 @@ public class Fetcher<K, V> {
                    SubscriptionState subscriptions,
                    Metrics metrics,
                    String metricGrpPrefix,
-                   Map<String, String> metricTags,
                    Time time,
                    long retryBackoffMs) {
 
@@ -120,7 +118,7 @@ public class Fetcher<K, V> {
         this.unauthorizedTopics = new HashSet<>();
         this.recordTooLargePartitions = new HashMap<>();
 
-        this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags);
+        this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix);
         this.retryBackoffMs = retryBackoffMs;
     }
 
@@ -656,64 +654,53 @@ public class Fetcher<K, V> {
         public final Sensor fetchThrottleTimeSensor;
 
 
-        public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
+        public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix) {
             this.metrics = metrics;
             this.metricGrpName = metricGrpPrefix + "-fetch-manager-metrics";
 
             this.bytesFetched = metrics.sensor("bytes-fetched");
-            this.bytesFetched.add(new MetricName("fetch-size-avg",
+            this.bytesFetched.add(metrics.metricName("fetch-size-avg",
                 this.metricGrpName,
-                "The average number of bytes fetched per request",
-                tags), new Avg());
-            this.bytesFetched.add(new MetricName("fetch-size-max",
+                "The average number of bytes fetched per request"), new Avg());
+            this.bytesFetched.add(metrics.metricName("fetch-size-max",
                 this.metricGrpName,
-                "The maximum number of bytes fetched per request",
-                tags), new Max());
-            this.bytesFetched.add(new MetricName("bytes-consumed-rate",
+                "The maximum number of bytes fetched per request"), new Max());
+            this.bytesFetched.add(metrics.metricName("bytes-consumed-rate",
                 this.metricGrpName,
-                "The average number of bytes consumed per second",
-                tags), new Rate());
+                "The average number of bytes consumed per second"), new Rate());
 
             this.recordsFetched = metrics.sensor("records-fetched");
-            this.recordsFetched.add(new MetricName("records-per-request-avg",
+            this.recordsFetched.add(metrics.metricName("records-per-request-avg",
                 this.metricGrpName,
-                "The average number of records in each request",
-                tags), new Avg());
-            this.recordsFetched.add(new MetricName("records-consumed-rate",
+                "The average number of records in each request"), new Avg());
+            this.recordsFetched.add(metrics.metricName("records-consumed-rate",
                 this.metricGrpName,
-                "The average number of records consumed per second",
-                tags), new Rate());
+                "The average number of records consumed per second"), new Rate());
 
             this.fetchLatency = metrics.sensor("fetch-latency");
-            this.fetchLatency.add(new MetricName("fetch-latency-avg",
+            this.fetchLatency.add(metrics.metricName("fetch-latency-avg",
                 this.metricGrpName,
-                "The average time taken for a fetch request.",
-                tags), new Avg());
-            this.fetchLatency.add(new MetricName("fetch-latency-max",
+                "The average time taken for a fetch request."), new Avg());
+            this.fetchLatency.add(metrics.metricName("fetch-latency-max",
                 this.metricGrpName,
-                "The max time taken for any fetch request.",
-                tags), new Max());
-            this.fetchLatency.add(new MetricName("fetch-rate",
+                "The max time taken for any fetch request."), new Max());
+            this.fetchLatency.add(metrics.metricName("fetch-rate",
                 this.metricGrpName,
-                "The number of fetch requests per second.",
-                tags), new Rate(new Count()));
+                "The number of fetch requests per second."), new Rate(new Count()));
 
             this.recordsFetchLag = metrics.sensor("records-lag");
-            this.recordsFetchLag.add(new MetricName("records-lag-max",
+            this.recordsFetchLag.add(metrics.metricName("records-lag-max",
                 this.metricGrpName,
-                "The maximum lag in terms of number of records for any partition in this window",
-                tags), new Max());
+                "The maximum lag in terms of number of records for any partition in this window"), new Max());
 
             this.fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
-            this.fetchThrottleTimeSensor.add(new MetricName("fetch-throttle-time-avg",
+            this.fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-avg",
                                                          this.metricGrpName,
-                                                         "The average throttle time in ms",
-                                                         tags), new Avg());
+                                                         "The average throttle time in ms"), new Avg());
 
-            this.fetchThrottleTimeSensor.add(new MetricName("fetch-throttle-time-max",
+            this.fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-max",
                                                          this.metricGrpName,
-                                                         "The maximum throttle time in ms",
-                                                         tags), new Max());
+                                                         "The maximum throttle time in ms"), new Max());
         }
 
         public void recordTopicFetchMetrics(String topic, int bytes, int records) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 38fb6a6..49560b5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -203,12 +203,14 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.producerConfig = config;
             this.time = new SystemTime();
 
-            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
-                    .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
-                            TimeUnit.MILLISECONDS);
             clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
             if (clientId.length() <= 0)
                 clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
+            Map<String, String> metricTags = new LinkedHashMap<String, String>();
+            metricTags.put("client-id", clientId);
+            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
+                    .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
+                    .tags(metricTags);
             List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                     MetricsReporter.class);
             reporters.add(new JmxReporter(JMX_PREFIX));
@@ -256,21 +258,18 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
             }
 
-            Map<String, String> metricTags = new LinkedHashMap<String, String>();
-            metricTags.put("client-id", clientId);
             this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                     this.totalMemorySize,
                     this.compressionType,
                     config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                     retryBackoffMs,
                     metrics,
-                    time,
-                    metricTags);
+                    time);
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
             NetworkClient client = new NetworkClient(
-                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", metricTags, channelBuilder),
+                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
                     this.metadata,
                     clientId,
                     config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index 2a45075..f881e62 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -19,7 +19,6 @@ package org.apache.kafka.clients.producer.internals;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.Deque;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -62,9 +61,8 @@ public final class BufferPool {
      * @param metrics instance of Metrics
      * @param time time instance
      * @param metricGrpName logical group name for metrics
-     * @param metricTags additional key/val attributes for metrics
      */
-    public BufferPool(long memory, int poolableSize, Metrics metrics, Time time , String metricGrpName , Map<String, String> metricTags) {
+    public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
         this.poolableSize = poolableSize;
         this.lock = new ReentrantLock();
         this.free = new ArrayDeque<ByteBuffer>();
@@ -74,10 +72,9 @@ public final class BufferPool {
         this.metrics = metrics;
         this.time = time;
         this.waitTime = this.metrics.sensor("bufferpool-wait-time");
-        MetricName metricName = new MetricName("bufferpool-wait-ratio",
-                                               metricGrpName,
-                                               "The fraction of time an appender waits for space allocation.",
-                                               metricTags);
+        MetricName metricName = metrics.metricName("bufferpool-wait-ratio",
+                                                   metricGrpName,
+                                                   "The fraction of time an appender waits for space allocation.");
         this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index d4a8a23..4b394f9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -85,7 +85,6 @@ public final class RecordAccumulator {
      *        exhausting all retries in a short period of time.
      * @param metrics The metrics
      * @param time The time instance to use
-     * @param metricTags additional key/value attributes of the metric
      */
     public RecordAccumulator(int batchSize,
                              long totalSize,
@@ -93,8 +92,7 @@ public final class RecordAccumulator {
                              long lingerMs,
                              long retryBackoffMs,
                              Metrics metrics,
-                             Time time,
-                             Map<String, String> metricTags) {
+                             Time time) {
         this.drainIndex = 0;
         this.closed = false;
         this.flushesInProgress = new AtomicInteger(0);
@@ -105,14 +103,14 @@ public final class RecordAccumulator {
         this.retryBackoffMs = retryBackoffMs;
         this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
         String metricGrpName = "producer-metrics";
-        this.free = new BufferPool(totalSize, batchSize, metrics, time , metricGrpName , metricTags);
+        this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
         this.incomplete = new IncompleteRecordBatches();
         this.time = time;
-        registerMetrics(metrics, metricGrpName, metricTags);
+        registerMetrics(metrics, metricGrpName);
     }
 
-    private void registerMetrics(Metrics metrics, String metricGrpName, Map<String, String> metricTags) {
-        MetricName metricName = new MetricName("waiting-threads", metricGrpName, "The number of user threads blocked waiting for buffer memory to enqueue their records", metricTags);
+    private void registerMetrics(Metrics metrics, String metricGrpName) {
+        MetricName metricName = metrics.metricName("waiting-threads", metricGrpName, "The number of user threads blocked waiting for buffer memory to enqueue their records");
         Measurable waitingThreads = new Measurable() {
             public double measure(MetricConfig config, long now) {
                 return free.queued();
@@ -120,7 +118,7 @@ public final class RecordAccumulator {
         };
         metrics.addMetric(metricName, waitingThreads);
 
-        metricName = new MetricName("buffer-total-bytes", metricGrpName, "The maximum amount of buffer memory the client can use (whether or not it is currently used).", metricTags);
+        metricName = metrics.metricName("buffer-total-bytes", metricGrpName, "The maximum amount of buffer memory the client can use (whether or not it is currently used).");
         Measurable totalBytes = new Measurable() {
             public double measure(MetricConfig config, long now) {
                 return free.totalMemory();
@@ -128,7 +126,7 @@ public final class RecordAccumulator {
         };
         metrics.addMetric(metricName, totalBytes);
 
-        metricName = new MetricName("buffer-available-bytes", metricGrpName, "The total amount of buffer memory that is not being used (either unallocated or in the free list).", metricTags);
+        metricName = metrics.metricName("buffer-available-bytes", metricGrpName, "The total amount of buffer memory that is not being used (either unallocated or in the free list).");
         Measurable availableBytes = new Measurable() {
             public double measure(MetricConfig config, long now) {
                 return free.availableMemory();
@@ -137,7 +135,7 @@ public final class RecordAccumulator {
         metrics.addMetric(metricName, availableBytes);
 
         Sensor bufferExhaustedRecordSensor = metrics.sensor("buffer-exhausted-records");
-        metricName = new MetricName("buffer-exhausted-rate", metricGrpName, "The average per-second number of record sends that are dropped due to buffer exhaustion", metricTags);
+        metricName = metrics.metricName("buffer-exhausted-rate", metricGrpName, "The average per-second number of record sends that are dropped due to buffer exhaustion");
         bufferExhaustedRecordSensor.add(metricName, new Rate());
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index cada626..b8215e1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -367,65 +367,63 @@ public class Sender implements Runnable {
 
         public SenderMetrics(Metrics metrics) {
             this.metrics = metrics;
-            Map<String, String> metricTags = new LinkedHashMap<String, String>();
-            metricTags.put("client-id", clientId);
             String metricGrpName = "producer-metrics";
 
             this.batchSizeSensor = metrics.sensor("batch-size");
-            MetricName m = new MetricName("batch-size-avg", metricGrpName, "The average number of bytes sent per partition per-request.", metricTags);
+            MetricName m = metrics.metricName("batch-size-avg", metricGrpName, "The average number of bytes sent per partition per-request.");
             this.batchSizeSensor.add(m, new Avg());
-            m = new MetricName("batch-size-max", metricGrpName, "The max number of bytes sent per partition per-request.", metricTags);
+            m = metrics.metricName("batch-size-max", metricGrpName, "The max number of bytes sent per partition per-request.");
             this.batchSizeSensor.add(m, new Max());
 
             this.compressionRateSensor = metrics.sensor("compression-rate");
-            m = new MetricName("compression-rate-avg", metricGrpName, "The average compression rate of record batches.", metricTags);
+            m = metrics.metricName("compression-rate-avg", metricGrpName, "The average compression rate of record batches.");
             this.compressionRateSensor.add(m, new Avg());
 
             this.queueTimeSensor = metrics.sensor("queue-time");
-            m = new MetricName("record-queue-time-avg", metricGrpName, "The average time in ms record batches spent in the record accumulator.", metricTags);
+            m = metrics.metricName("record-queue-time-avg", metricGrpName, "The average time in ms record batches spent in the record accumulator.");
             this.queueTimeSensor.add(m, new Avg());
-            m = new MetricName("record-queue-time-max", metricGrpName, "The maximum time in ms record batches spent in the record accumulator.", metricTags);
+            m = metrics.metricName("record-queue-time-max", metricGrpName, "The maximum time in ms record batches spent in the record accumulator.");
             this.queueTimeSensor.add(m, new Max());
 
             this.requestTimeSensor = metrics.sensor("request-time");
-            m = new MetricName("request-latency-avg", metricGrpName, "The average request latency in ms", metricTags);
+            m = metrics.metricName("request-latency-avg", metricGrpName, "The average request latency in ms");
             this.requestTimeSensor.add(m, new Avg());
-            m = new MetricName("request-latency-max", metricGrpName, "The maximum request latency in ms", metricTags);
+            m = metrics.metricName("request-latency-max", metricGrpName, "The maximum request latency in ms");
             this.requestTimeSensor.add(m, new Max());
 
             this.produceThrottleTimeSensor = metrics.sensor("produce-throttle-time");
-            m = new MetricName("produce-throttle-time-avg", metricGrpName, "The average throttle time in ms", metricTags);
+            m = metrics.metricName("produce-throttle-time-avg", metricGrpName, "The average throttle time in ms");
             this.produceThrottleTimeSensor.add(m, new Avg());
-            m = new MetricName("produce-throttle-time-max", metricGrpName, "The maximum throttle time in ms", metricTags);
+            m = metrics.metricName("produce-throttle-time-max", metricGrpName, "The maximum throttle time in ms");
             this.produceThrottleTimeSensor.add(m, new Max());
 
             this.recordsPerRequestSensor = metrics.sensor("records-per-request");
-            m = new MetricName("record-send-rate", metricGrpName, "The average number of records sent per second.", metricTags);
+            m = metrics.metricName("record-send-rate", metricGrpName, "The average number of records sent per second.");
             this.recordsPerRequestSensor.add(m, new Rate());
-            m = new MetricName("records-per-request-avg", metricGrpName, "The average number of records per request.", metricTags);
+            m = metrics.metricName("records-per-request-avg", metricGrpName, "The average number of records per request.");
             this.recordsPerRequestSensor.add(m, new Avg());
 
             this.retrySensor = metrics.sensor("record-retries");
-            m = new MetricName("record-retry-rate", metricGrpName, "The average per-second number of retried record sends", metricTags);
+            m = metrics.metricName("record-retry-rate", metricGrpName, "The average per-second number of retried record sends");
             this.retrySensor.add(m, new Rate());
 
             this.errorSensor = metrics.sensor("errors");
-            m = new MetricName("record-error-rate", metricGrpName, "The average per-second number of record sends that resulted in errors", metricTags);
+            m = metrics.metricName("record-error-rate", metricGrpName, "The average per-second number of record sends that resulted in errors");
             this.errorSensor.add(m, new Rate());
 
             this.maxRecordSizeSensor = metrics.sensor("record-size-max");
-            m = new MetricName("record-size-max", metricGrpName, "The maximum record size", metricTags);
+            m = metrics.metricName("record-size-max", metricGrpName, "The maximum record size");
             this.maxRecordSizeSensor.add(m, new Max());
-            m = new MetricName("record-size-avg", metricGrpName, "The average record size", metricTags);
+            m = metrics.metricName("record-size-avg", metricGrpName, "The average record size");
             this.maxRecordSizeSensor.add(m, new Avg());
 
-            m = new MetricName("requests-in-flight", metricGrpName, "The current number of in-flight requests awaiting a response.", metricTags);
+            m = metrics.metricName("requests-in-flight", metricGrpName, "The current number of in-flight requests awaiting a response.");
             this.metrics.addMetric(m, new Measurable() {
                 public double measure(MetricConfig config, long now) {
                     return client.inFlightRequestCount();
                 }
             });
-            m = new MetricName("metadata-age", metricGrpName, "The age in seconds of the current producer metadata being used.", metricTags);
+            m = metrics.metricName("metadata-age", metricGrpName, "The age in seconds of the current producer metadata being used.");
             metrics.addMetric(m, new Measurable() {
                 public double measure(MetricConfig config, long now) {
                     return (now - metadata.lastSuccessfulUpdate()) / 1000.0;
@@ -440,32 +438,31 @@ public class Sender implements Runnable {
             Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName);
             if (topicRecordCount == null) {
                 Map<String, String> metricTags = new LinkedHashMap<String, String>();
-                metricTags.put("client-id", clientId);
                 metricTags.put("topic", topic);
                 String metricGrpName = "producer-topic-metrics";
 
                 topicRecordCount = this.metrics.sensor(topicRecordsCountName);
-                MetricName m = new MetricName("record-send-rate", metricGrpName , metricTags);
+                MetricName m = this.metrics.metricName("record-send-rate", metricGrpName, metricTags);
                 topicRecordCount.add(m, new Rate());
 
                 String topicByteRateName = "topic." + topic + ".bytes";
                 Sensor topicByteRate = this.metrics.sensor(topicByteRateName);
-                m = new MetricName("byte-rate", metricGrpName , metricTags);
+                m = this.metrics.metricName("byte-rate", metricGrpName, metricTags);
                 topicByteRate.add(m, new Rate());
 
                 String topicCompressionRateName = "topic." + topic + ".compression-rate";
                 Sensor topicCompressionRate = this.metrics.sensor(topicCompressionRateName);
-                m = new MetricName("compression-rate", metricGrpName , metricTags);
+                m = this.metrics.metricName("compression-rate", metricGrpName, metricTags);
                 topicCompressionRate.add(m, new Avg());
 
                 String topicRetryName = "topic." + topic + ".record-retries";
                 Sensor topicRetrySensor = this.metrics.sensor(topicRetryName);
-                m = new MetricName("record-retry-rate", metricGrpName , metricTags);
+                m = this.metrics.metricName("record-retry-rate", metricGrpName, metricTags);
                 topicRetrySensor.add(m, new Rate());
 
                 String topicErrorName = "topic." + topic + ".record-errors";
                 Sensor topicErrorSensor = this.metrics.sensor(topicErrorName);
-                m = new MetricName("record-error-rate", metricGrpName , metricTags);
+                m = this.metrics.metricName("record-error-rate", metricGrpName, metricTags);
                 topicErrorSensor.add(m, new Rate());
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/clients/src/main/java/org/apache/kafka/common/MetricName.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/MetricName.java b/clients/src/main/java/org/apache/kafka/common/MetricName.java
index ee50f33..2b81030 100644
--- a/clients/src/main/java/org/apache/kafka/common/MetricName.java
+++ b/clients/src/main/java/org/apache/kafka/common/MetricName.java
@@ -18,7 +18,7 @@ import java.util.Map;
 import org.apache.kafka.common.utils.Utils;
 
 /**
- * The <code>MetricName</code> class encapsulates a metric's name, logical group and its related attributes
+ * The <code>MetricName</code> class encapsulates a metric's name, logical group and its related attributes. It should be constructed using metrics.metricName(...).
  * <p>
  * This class captures the following parameters
  * <pre>
@@ -31,23 +31,27 @@ import org.apache.kafka.common.utils.Utils;
  * <p>
  * Ex: standard JMX MBean can be constructed like  <b>domainName:type=group,key1=val1,key2=val2</b>
  * <p>
+ *
  * Usage looks something like this:
  * <pre>{@code
  * // set up metrics:
- * Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors
- * Sensor sensor = metrics.sensor("message-sizes");
  *
  * Map<String, String> metricTags = new LinkedHashMap<String, String>();
  * metricTags.put("client-id", "producer-1");
  * metricTags.put("topic", "topic");
  *
- * MetricName metricName = new MetricName("message-size-avg", "producer-metrics", "average message size", metricTags);
+ * MetricConfig metricConfig = new MetricConfig().tags(metricTags);
+ * Metrics metrics = new Metrics(metricConfig); // this is the global repository of metrics and sensors
+ *
+ * Sensor sensor = metrics.sensor("message-sizes");
+ *
+ * MetricName metricName = metrics.metricName("message-size-avg", "producer-metrics", "average message size");
  * sensor.add(metricName, new Avg());
  *
- * metricName = new MetricName("message-size-max", "producer-metrics", metricTags);
+ * metricName = metrics.metricName("message-size-max", "producer-metrics");
  * sensor.add(metricName, new Max());
  *
- * metricName = new MetricName("message-size-min", "producer-metrics", "message minimum size", "client-id", "my-client", "topic", "my-topic");
+ * metricName = metrics.metricName("message-size-min", "producer-metrics", "message minimum size", "client-id", "my-client", "topic", "my-topic");
  * sensor.add(metricName, new Min());
  *
  * // as messages are sent we record the sizes
@@ -63,6 +67,8 @@ public final class MetricName {
     private int hash = 0;
 
     /**
+     * Please create MetricName by method {@link org.apache.kafka.common.metrics.Metrics#metricName(String, String, String, Map)}
+     *
      * @param name        The name of the metric
      * @param group       logical group name of the metrics to which this metric belongs
      * @param description A human-readable description to include in the metric
@@ -76,11 +82,15 @@ public final class MetricName {
     }
 
     /**
+     * @deprecated This method will be removed in a future release.
+     * Please create MetricName by method {@link org.apache.kafka.common.metrics.Metrics#metricName(String, String, String, String...)}
+     *
      * @param name          The name of the metric
      * @param group         logical group name of the metrics to which this metric belongs
      * @param description   A human-readable description to include in the metric
      * @param keyValue      additional key/value attributes of the metric (must come in pairs)
      */
+    @Deprecated
     public MetricName(String name, String group, String description, String... keyValue) {
         this(name, group, description, getTags(keyValue));
     }
@@ -97,27 +107,39 @@ public final class MetricName {
     }
 
     /**
+     * @deprecated This method will be removed in a future release.
+     * Please create MetricName by method {@link org.apache.kafka.common.metrics.Metrics#metricName(String, String, Map)}
+     *
      * @param name  The name of the metric
      * @param group logical group name of the metrics to which this metric belongs
      * @param tags  key/value attributes of the metric
      */
+    @Deprecated
     public MetricName(String name, String group, Map<String, String> tags) {
         this(name, group, "", tags);
     }
 
     /**
+     * @deprecated This method will be removed in a future release.
+     * Please create MetricName by method {@link org.apache.kafka.common.metrics.Metrics#metricName(String, String, String)}
+     *
      * @param name        The name of the metric
      * @param group       logical group name of the metrics to which this metric belongs
      * @param description A human-readable description to include in the metric
      */
+    @Deprecated
     public MetricName(String name, String group, String description) {
         this(name, group, description, new HashMap<String, String>());
     }
 
     /**
+     * @deprecated This method will be removed in a future release.
+     * Please create MetricName by method {@link org.apache.kafka.common.metrics.Metrics#metricName(String, String)}
+     *
      * @param name  The name of the metric
      * @param group logical group name of the metrics to which this metric belongs
      */
+    @Deprecated
     public MetricName(String name, String group) {
         this(name, group, "", new HashMap<String, String>());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
index dfa1b0a..6bd351d 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.metrics;
 
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -28,6 +30,7 @@ public class MetricConfig {
     private long eventWindow;
     private long timeWindowMs;
     private TimeUnit unit;
+    private Map<String, String> tags;
 
     public MetricConfig() {
         super();
@@ -36,6 +39,7 @@ public class MetricConfig {
         this.eventWindow = Long.MAX_VALUE;
         this.timeWindowMs = TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS);
         this.unit = TimeUnit.SECONDS;
+        this.tags = new LinkedHashMap<>();
     }
 
     public Quota quota() {
@@ -65,6 +69,15 @@ public class MetricConfig {
         return this;
     }
 
+    public Map<String, String> tags() {
+        return this.tags;
+    }
+
+    public MetricConfig tags(Map<String, String> tags) {
+        this.tags = tags;
+        return this;
+    }
+
     public int samples() {
         return this.samples;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index fdb7dac..842e0f7 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -15,6 +15,8 @@ package org.apache.kafka.common.metrics;
 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -81,6 +83,15 @@ public class Metrics implements Closeable {
     }
 
     /**
+     * Create a metrics repository with no metric reporters and the given default configuration.
+     * Expiration of Sensors is disabled.
+     */
+    public Metrics(MetricConfig defaultConfig, Time time) {
+      this(defaultConfig, new ArrayList<MetricsReporter>(0), time);
+    }
+
+
+  /**
      * Create a metrics repository with no reporters and the given default config. This config will be used for any
      * metric that doesn't override its own config. Expiration of Sensors is disabled.
      * @param defaultConfig The default config to use for all metrics that don't override their config
@@ -130,6 +141,90 @@ public class Metrics implements Closeable {
         } else {
             this.metricsScheduler = null;
         }
+
+        addMetric(metricName("count", "kafka-metrics-count", "total number of registered metrics"),
+            new Measurable() {
+                @Override
+                public double measure(MetricConfig config, long now) {
+                    return metrics.size();
+                }
+            });
+    }
+
+    /**
+     * Create a MetricName with the given name, group, description and tags, plus default tags specified in the metric
+     * configuration. Tag in tags takes precedence if the same tag key is specified in the default metric configuration.
+     *
+     * @param name        The name of the metric
+     * @param group       logical group name of the metrics to which this metric belongs
+     * @param description A human-readable description to include in the metric
+     * @param tags        additional key/value attributes of the metric
+     */
+    public MetricName metricName(String name, String group, String description, Map<String, String> tags) {
+        Map<String, String> combinedTag = new LinkedHashMap<>(config.tags());
+        combinedTag.putAll(tags);
+        return new MetricName(name, group, description, combinedTag);
+    }
+
+    /**
+     * Create a MetricName with the given name, group, description, and default tags
+     * specified in the metric configuration.
+     *
+     * @param name        The name of the metric
+     * @param group       logical group name of the metrics to which this metric belongs
+     * @param description A human-readable description to include in the metric
+     */
+    public MetricName metricName(String name, String group, String description) {
+        return metricName(name, group, description, new HashMap<String, String>());
+    }
+
+    /**
+     * Create a MetricName with the given name, group and default tags specified in the metric configuration.
+     *
+     * @param name        The name of the metric
+     * @param group       logical group name of the metrics to which this metric belongs
+     */
+    public MetricName metricName(String name, String group) {
+        return metricName(name, group, "", new HashMap<String, String>());
+    }
+
+    /**
+     * Create a MetricName with the given name, group, description, and keyValue as tags, plus default tags specified in the metric
+     * configuration. Tag in keyValue takes precedence if the same tag key is specified in the default metric configuration.
+     *
+     * @param name          The name of the metric
+     * @param group         logical group name of the metrics to which this metric belongs
+     * @param description   A human-readable description to include in the metric
+     * @param keyValue      additional key/value attributes of the metric (must come in pairs)
+     */
+    public MetricName metricName(String name, String group, String description, String... keyValue) {
+        return metricName(name, group, description, getTags(keyValue));
+    }
+
+    /**
+     * Create a MetricName with the given name, group and tags, plus default tags specified in the metric
+     * configuration. Tag in tags takes precedence if the same tag key is specified in the default metric configuration.
+     *
+     * @param name  The name of the metric
+     * @param group logical group name of the metrics to which this metric belongs
+     * @param tags  key/value attributes of the metric
+     */
+    public MetricName metricName(String name, String group, Map<String, String> tags) {
+        return metricName(name, group, "", tags);
+    }
+
+    private static Map<String, String> getTags(String... keyValue) {
+        if ((keyValue.length % 2) != 0)
+            throw new IllegalArgumentException("keyValue needs to be specified in pairs");
+        Map<String, String> tags = new HashMap<String, String>();
+
+        for (int i = 0; i < keyValue.length; i += 2)
+            tags.put(keyValue[i], keyValue[i + 1]);
+        return tags;
+    }
+
+    public MetricConfig config() {
+        return config;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 639a2be..387c063 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -118,8 +118,8 @@ public class Selector implements Selectable {
         this.metricsPerConnection = metricsPerConnection;
     }
 
-    public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, ChannelBuilder channelBuilder) {
-        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, metricTags, true, channelBuilder);
+    public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) {
+        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, new HashMap<String, String>(), true, channelBuilder);
     }
 
     /**
@@ -568,48 +568,48 @@ public class Selector implements Selectable {
             }
 
             this.connectionClosed = sensor("connections-closed:" + tagsSuffix.toString());
-            MetricName metricName = new MetricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", metricTags);
+            MetricName metricName = metrics.metricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", metricTags);
             this.connectionClosed.add(metricName, new Rate());
 
             this.connectionCreated = sensor("connections-created:" + tagsSuffix.toString());
-            metricName = new MetricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", metricTags);
+            metricName = metrics.metricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", metricTags);
             this.connectionCreated.add(metricName, new Rate());
 
             this.bytesTransferred = sensor("bytes-sent-received:" + tagsSuffix.toString());
-            metricName = new MetricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", metricTags);
+            metricName = metrics.metricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", metricTags);
             bytesTransferred.add(metricName, new Rate(new Count()));
 
             this.bytesSent = sensor("bytes-sent:" + tagsSuffix.toString(), bytesTransferred);
-            metricName = new MetricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", metricTags);
+            metricName = metrics.metricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", metricTags);
             this.bytesSent.add(metricName, new Rate());
-            metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", metricTags);
+            metricName = metrics.metricName("request-rate", metricGrpName, "The average number of requests sent per second.", metricTags);
             this.bytesSent.add(metricName, new Rate(new Count()));
-            metricName = new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", metricTags);
+            metricName = metrics.metricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", metricTags);
             this.bytesSent.add(metricName, new Avg());
-            metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", metricTags);
+            metricName = metrics.metricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", metricTags);
             this.bytesSent.add(metricName, new Max());
 
             this.bytesReceived = sensor("bytes-received:" + tagsSuffix.toString(), bytesTransferred);
-            metricName = new MetricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", metricTags);
+            metricName = metrics.metricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", metricTags);
             this.bytesReceived.add(metricName, new Rate());
-            metricName = new MetricName("response-rate", metricGrpName, "Responses received sent per second.", metricTags);
+            metricName = metrics.metricName("response-rate", metricGrpName, "Responses received sent per second.", metricTags);
             this.bytesReceived.add(metricName, new Rate(new Count()));
 
             this.selectTime = sensor("select-time:" + tagsSuffix.toString());
-            metricName = new MetricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", metricTags);
+            metricName = metrics.metricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", metricTags);
             this.selectTime.add(metricName, new Rate(new Count()));
-            metricName = new MetricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags);
+            metricName = metrics.metricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags);
             this.selectTime.add(metricName, new Avg());
-            metricName = new MetricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", metricTags);
+            metricName = metrics.metricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", metricTags);
             this.selectTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
 
             this.ioTime = sensor("io-time:" + tagsSuffix.toString());
-            metricName = new MetricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags);
+            metricName = metrics.metricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags);
             this.ioTime.add(metricName, new Avg());
-            metricName = new MetricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", metricTags);
+            metricName = metrics.metricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", metricTags);
             this.ioTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
 
-            metricName = new MetricName("connection-count", metricGrpName, "The current number of active connections.", metricTags);
+            metricName = metrics.metricName("connection-count", metricGrpName, "The current number of active connections.", metricTags);
             topLevelMetricNames.add(metricName);
             this.metrics.addMetric(metricName, new Measurable() {
                 public double measure(MetricConfig config, long now) {
@@ -637,27 +637,27 @@ public class Selector implements Selectable {
                     tags.put("node-id", "node-" + connectionId);
 
                     nodeRequest = sensor(nodeRequestName);
-                    MetricName metricName = new MetricName("outgoing-byte-rate", metricGrpName, tags);
+                    MetricName metricName = metrics.metricName("outgoing-byte-rate", metricGrpName, tags);
                     nodeRequest.add(metricName, new Rate());
-                    metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", tags);
+                    metricName = metrics.metricName("request-rate", metricGrpName, "The average number of requests sent per second.", tags);
                     nodeRequest.add(metricName, new Rate(new Count()));
-                    metricName = new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", tags);
+                    metricName = metrics.metricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", tags);
                     nodeRequest.add(metricName, new Avg());
-                    metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", tags);
+                    metricName = metrics.metricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", tags);
                     nodeRequest.add(metricName, new Max());
 
                     String nodeResponseName = "node-" + connectionId + ".bytes-received";
                     Sensor nodeResponse = sensor(nodeResponseName);
-                    metricName = new MetricName("incoming-byte-rate", metricGrpName, tags);
+                    metricName = metrics.metricName("incoming-byte-rate", metricGrpName, tags);
                     nodeResponse.add(metricName, new Rate());
-                    metricName = new MetricName("response-rate", metricGrpName, "The average number of responses received per second.", tags);
+                    metricName = metrics.metricName("response-rate", metricGrpName, "The average number of responses received per second.", tags);
                     nodeResponse.add(metricName, new Rate(new Count()));
 
                     String nodeTimeName = "node-" + connectionId + ".latency";
                     Sensor nodeRequestTime = sensor(nodeTimeName);
-                    metricName = new MetricName("request-latency-avg", metricGrpName, tags);
+                    metricName = metrics.metricName("request-latency-avg", metricGrpName, tags);
                     nodeRequestTime.add(metricName, new Avg());
-                    metricName = new MetricName("request-latency-max", metricGrpName, tags);
+                    metricName = metrics.metricName("request-latency-max", metricGrpName, tags);
                     nodeRequestTime.add(metricName, new Max());
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 9f9682a..3ae1a36 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -60,7 +60,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -89,7 +88,6 @@ public class ConsumerCoordinatorTest {
     private SubscriptionState subscriptions;
     private Metadata metadata;
     private Metrics metrics;
-    private Map<String, String> metricTags = new LinkedHashMap<>();
     private ConsumerNetworkClient consumerClient;
     private MockRebalanceListener rebalanceListener;
     private MockCommitCallback defaultOffsetCommitCallback;
@@ -109,7 +107,6 @@ public class ConsumerCoordinatorTest {
         this.partitionAssignor.clear();
 
         client.setNode(node);
-
         this.coordinator = buildCoordinator(metrics, assignors);
     }
 
@@ -912,7 +909,6 @@ public class ConsumerCoordinatorTest {
                 subscriptions,
                 metrics,
                 "consumer" + groupId,
-                metricTags,
                 time,
                 retryBackoffMs,
                 defaultOffsetCommitCallback,

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 1ffff4a..7e8bd40 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -54,7 +54,6 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -83,7 +82,6 @@ public class FetcherTest {
     private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
     private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE);
     private Metrics metrics = new Metrics(time);
-    private Map<String, String> metricTags = new LinkedHashMap<String, String>();
     private static final double EPSILON = 0.0001;
     private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
 
@@ -484,8 +482,8 @@ public class FetcherTest {
         }
 
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
-        KafkaMetric avgMetric = allMetrics.get(new MetricName("fetch-throttle-time-avg", metricGroup, "", metricTags));
-        KafkaMetric maxMetric = allMetrics.get(new MetricName("fetch-throttle-time-max", metricGroup, "", metricTags));
+        KafkaMetric avgMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-avg", metricGroup, ""));
+        KafkaMetric maxMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-max", metricGroup, ""));
         assertEquals(200, avgMetric.value(), EPSILON);
         assertEquals(300, maxMetric.value(), EPSILON);
     }
@@ -527,7 +525,6 @@ public class FetcherTest {
                 subscriptions,
                 metrics,
                 "consumer" + groupId,
-                metricTags,
                 time,
                 retryBackoffMs);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index f8567e9..b103bee 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -25,9 +25,7 @@ import org.junit.Test;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -38,7 +36,6 @@ public class BufferPoolTest {
     private Metrics metrics = new Metrics(time);
     private final long maxBlockTimeMs =  2000;
     String metricGroup = "TestMetrics";
-    Map<String, String> metricTags = new LinkedHashMap<String, String>();
 
     @After
     public void teardown() {
@@ -52,7 +49,7 @@ public class BufferPoolTest {
     public void testSimple() throws Exception {
         long totalMemory = 64 * 1024;
         int size = 1024;
-        BufferPool pool = new BufferPool(totalMemory, size, metrics, time, metricGroup, metricTags);
+        BufferPool pool = new BufferPool(totalMemory, size, metrics, time, metricGroup);
         ByteBuffer buffer = pool.allocate(size, maxBlockTimeMs);
         assertEquals("Buffer size should equal requested size.", size, buffer.limit());
         assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory());
@@ -79,7 +76,7 @@ public class BufferPoolTest {
      */
     @Test(expected = IllegalArgumentException.class)
     public void testCantAllocateMoreMemoryThanWeHave() throws Exception {
-        BufferPool pool = new BufferPool(1024, 512, metrics, time, metricGroup, metricTags);
+        BufferPool pool = new BufferPool(1024, 512, metrics, time, metricGroup);
         ByteBuffer buffer = pool.allocate(1024, maxBlockTimeMs);
         assertEquals(1024, buffer.limit());
         pool.deallocate(buffer);
@@ -91,7 +88,7 @@ public class BufferPoolTest {
      */
     @Test
     public void testDelayedAllocation() throws Exception {
-        BufferPool pool = new BufferPool(5 * 1024, 1024, metrics, time, metricGroup, metricTags);
+        BufferPool pool = new BufferPool(5 * 1024, 1024, metrics, time, metricGroup);
         ByteBuffer buffer = pool.allocate(1024, maxBlockTimeMs);
         CountDownLatch doDealloc = asyncDeallocate(pool, buffer);
         CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
@@ -140,7 +137,7 @@ public class BufferPoolTest {
      */
     @Test
     public void testBlockTimeout() throws Exception {
-        BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup, metricTags);
+        BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup);
         pool.allocate(1, maxBlockTimeMs);
         try {
             pool.allocate(2, maxBlockTimeMs);
@@ -159,7 +156,7 @@ public class BufferPoolTest {
         final int iterations = 50000;
         final int poolableSize = 1024;
         final long totalMemory = numThreads / 2 * poolableSize;
-        final BufferPool pool = new BufferPool(totalMemory, poolableSize, metrics, time, metricGroup, metricTags);
+        final BufferPool pool = new BufferPool(totalMemory, poolableSize, metrics, time, metricGroup);
         List<StressTestThread> threads = new ArrayList<StressTestThread>();
         for (int i = 0; i < numThreads; i++)
             threads.add(new StressTestThread(pool, iterations));

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 4674a91..723e450 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -65,7 +64,6 @@ public class RecordAccumulatorTest {
     private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
     private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3), Collections.<String>emptySet());
     private Metrics metrics = new Metrics(time);
-    Map<String, String> metricTags = new LinkedHashMap<String, String>();
     private final long maxBlockTimeMs = 1000;
 
     @After
@@ -76,7 +74,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testFull() throws Exception {
         long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time,  metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time);
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
             accum.append(tp1, key, value, null, maxBlockTimeMs);
@@ -100,7 +98,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
-        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time);
         accum.append(tp1, key, new byte[2 * batchSize], null, maxBlockTimeMs);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
     }
@@ -108,7 +106,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testLinger() throws Exception {
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time);
         accum.append(tp1, key, value, null, maxBlockTimeMs);
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
@@ -126,7 +124,7 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testPartialDrain() throws Exception {
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time);
         int appends = 1024 / msgSize + 1;
         List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
@@ -145,7 +143,7 @@ public class RecordAccumulatorTest {
         final int numThreads = 5;
         final int msgs = 10000;
         final int numParts = 2;
-        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time);
         List<Thread> threads = new ArrayList<Thread>();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {
@@ -185,7 +183,7 @@ public class RecordAccumulatorTest {
     public void testNextReadyCheckDelay() throws Exception {
         // Next check time will use lingerMs since this test won't trigger any retries/backoff
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024,  CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024,  CompressionType.NONE, lingerMs, 100L, metrics, time);
         // Just short of going over the limit so we trigger linger time
         int appends = 1024 / msgSize;
 
@@ -219,7 +217,7 @@ public class RecordAccumulatorTest {
     public void testRetryBackoff() throws Exception {
         long lingerMs = Long.MAX_VALUE / 4;
         long retryBackoffMs = Long.MAX_VALUE / 2;
-        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time);
 
         long now = time.milliseconds();
         accum.append(tp1, key, value, null, maxBlockTimeMs);
@@ -256,7 +254,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testFlush() throws Exception {
         long lingerMs = Long.MAX_VALUE;
-        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time);
         for (int i = 0; i < 100; i++)
             accum.append(new TopicPartition(topic, i % 3), key, value, null, maxBlockTimeMs);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
@@ -280,7 +278,7 @@ public class RecordAccumulatorTest {
     public void testAbortIncompleteBatches() throws Exception {
         long lingerMs = Long.MAX_VALUE;
         final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
-        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time);
         class TestCallback implements Callback {
             @Override
             public void onCompletion(RecordMetadata metadata, Exception exception) {
@@ -303,7 +301,7 @@ public class RecordAccumulatorTest {
     public void testExpiredBatches() throws InterruptedException {
         Time time = new SystemTime();
         long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10, 100L, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10, 100L, metrics, time);
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
             accum.append(tp1, key, value, null, maxBlockTimeMs);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index bcc618a..dc61fc2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -57,24 +58,29 @@ public class SenderTest {
     private int batchSize = 16 * 1024;
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
     private Cluster cluster = TestUtils.singletonCluster("test", 1);
-    private Metrics metrics = new Metrics(time);
-    Map<String, String> metricTags = new LinkedHashMap<String, String>();
-    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, metricTags);
-    private Sender sender = new Sender(client,
-                                       metadata,
-                                       this.accumulator,
-                                       MAX_REQUEST_SIZE,
-                                       ACKS_ALL,
-                                       MAX_RETRIES,
-                                       metrics,
-                                       time,
-                                       CLIENT_ID,
-                                       REQUEST_TIMEOUT);
+    private Metrics metrics = null;
+    private RecordAccumulator accumulator = null;
+    private Sender sender = null;
 
     @Before
     public void setup() {
-        metadata.update(cluster, time.milliseconds());
+        Map<String, String> metricTags = new LinkedHashMap<String, String>();
         metricTags.put("client-id", CLIENT_ID);
+        MetricConfig metricConfig = new MetricConfig().tags(metricTags);
+        metrics = new Metrics(metricConfig, time);
+        accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time);
+        sender = new Sender(client,
+                            metadata,
+                            this.accumulator,
+                            MAX_REQUEST_SIZE,
+                            ACKS_ALL,
+                            MAX_RETRIES,
+                            metrics,
+                            time,
+                            CLIENT_ID,
+                            REQUEST_TIMEOUT);
+
+        metadata.update(cluster, time.milliseconds());
     }
 
     @After
@@ -110,8 +116,8 @@ public class SenderTest {
             sender.run(time.milliseconds());
         }
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
-        KafkaMetric avgMetric = allMetrics.get(new MetricName("produce-throttle-time-avg", METRIC_GROUP, "", metricTags));
-        KafkaMetric maxMetric = allMetrics.get(new MetricName("produce-throttle-time-max", METRIC_GROUP, "", metricTags));
+        KafkaMetric avgMetric = allMetrics.get(metrics.metricName("produce-throttle-time-avg", METRIC_GROUP, ""));
+        KafkaMetric maxMetric = allMetrics.get(metrics.metricName("produce-throttle-time-max", METRIC_GROUP, ""));
         assertEquals(200, avgMetric.value(), EPS);
         assertEquals(300, maxMetric.value(), EPS);
     }