Posted to commits@kafka.apache.org by ju...@apache.org on 2015/01/13 07:09:56 UTC

[2/2] kafka git commit: KAFKA-1723; make the metrics name in new producer more standard; patched by Manikumar Reddy; reviewed by Jay Kreps and Jun Rao

KAFKA-1723; make the metrics name in new producer more standard; patched by Manikumar Reddy; reviewed by Jay Kreps and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/688e38ce
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/688e38ce
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/688e38ce

Branch: refs/heads/trunk
Commit: 688e38ce45a7358a1e0bb359aa9b1a698a841619
Parents: 6f4dea9
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Mon Jan 12 22:01:43 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Jan 12 22:02:02 2015 -0800

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 .../apache/kafka/clients/consumer/Consumer.java |   5 +-
 .../kafka/clients/consumer/KafkaConsumer.java   |   3 +-
 .../kafka/clients/consumer/MockConsumer.java    |   3 +-
 .../kafka/clients/producer/KafkaProducer.java   |  19 +-
 .../kafka/clients/producer/MockProducer.java    |   7 +-
 .../apache/kafka/clients/producer/Producer.java |   4 +-
 .../clients/producer/internals/BufferPool.java  |  28 +--
 .../producer/internals/RecordAccumulator.java   |  25 +--
 .../clients/producer/internals/Sender.java      |  81 ++++++---
 .../java/org/apache/kafka/common/Metric.java    |   9 +-
 .../org/apache/kafka/common/MetricName.java     | 179 +++++++++++++++++++
 .../kafka/common/metrics/CompoundStat.java      |  14 +-
 .../kafka/common/metrics/JmxReporter.java       |  60 ++++---
 .../kafka/common/metrics/KafkaMetric.java       |  18 +-
 .../apache/kafka/common/metrics/Metrics.java    |  58 ++----
 .../org/apache/kafka/common/metrics/Sensor.java |  51 ++----
 .../kafka/common/metrics/stats/Percentile.java  |  18 +-
 .../kafka/common/metrics/stats/Percentiles.java |   2 +-
 .../apache/kafka/common/network/Selector.java   |  93 ++++++----
 .../kafka/clients/producer/BufferPoolTest.java  |  14 +-
 .../clients/producer/RecordAccumulatorTest.java |  16 +-
 .../kafka/clients/producer/SenderTest.java      |  11 +-
 .../kafka/common/metrics/JmxReporterTest.java   |  13 +-
 .../kafka/common/metrics/MetricsTest.java       |  87 +++++----
 .../kafka/common/network/SelectorTest.java      |   3 +-
 .../org/apache/kafka/test/MetricsBench.java     |  11 +-
 27 files changed, 533 insertions(+), 300 deletions(-)
----------------------------------------------------------------------
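
For orientation, here is a minimal sketch of the naming change this patch makes, using the producer's batch-size-avg metric as an example. The client id value is illustrative, and the snippet simply exercises the Metrics/Sensor/MetricName APIs as they look after this commit; it is not code taken from the patch itself.

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Avg;

    public class MetricNamingSketch {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();
            Sensor sensor = metrics.sensor("batch-size");

            // Before this patch: a flat dotted string, e.g.
            //   sensor.add("batch-size-avg", "The average number of bytes ...", new Avg());
            // which JmxReporter split on '.' to derive an MBean name and attribute.

            // After this patch: name + group + tags, which JmxReporter reports as
            //   kafka.producer:type=producer-metrics,client-id=my-producer
            Map<String, String> tags = new LinkedHashMap<String, String>();
            tags.put("client-id", "my-producer");
            MetricName name = new MetricName("batch-size-avg",
                                             "producer-metrics",
                                             "The average number of bytes sent per partition per-request.",
                                             tags);
            sensor.add(name, new Avg());
            sensor.record(4096.0);
        }
    }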


http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index ba52288..c9ac433 100644
--- a/build.gradle
+++ b/build.gradle
@@ -370,6 +370,7 @@ project(':clients') {
 
   javadoc {
     include "**/org/apache/kafka/clients/producer/*"
+    include "**/org/apache/kafka/common/*"
     include "**/org/apache/kafka/common/errors/*"
     include "**/org/apache/kafka/common/serialization/*"
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 1bce501..c0c636b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -18,6 +18,7 @@ import java.util.Map;
 
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.MetricName;
 
 /**
  * @see KafkaConsumer
@@ -111,11 +112,11 @@ public interface Consumer<K,V> extends Closeable {
      * @return The offsets for messages that were written to the server before the specified timestamp.
      */
     public Map<TopicPartition, Long> offsetsBeforeTime(long timestamp, Collection<TopicPartition> partitions);
-    
+
     /**
      * Return a map of metrics maintained by the consumer
      */
-    public Map<String, ? extends Metric> metrics();
+    public Map<MetricName, ? extends Metric> metrics();
 
     /**
      * Close this consumer

http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index a5fedce..76efc21 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -16,6 +16,7 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -627,7 +628,7 @@ public class KafkaConsumer<K,V> implements Consumer<K,V> {
     }
 
     @Override
-    public Map<String, ? extends Metric> metrics() {
+    public Map<MetricName, ? extends Metric> metrics() {
         return Collections.unmodifiableMap(this.metrics.metrics());
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 8cab16c..fa88ac1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -25,6 +25,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.MetricName;
 
 /**
  * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka.
@@ -179,7 +180,7 @@ public class MockConsumer implements Consumer<byte[], byte[]> {
     }
 
     @Override
-    public Map<String, ? extends Metric> metrics() {        
+    public Map<MetricName, ? extends Metric> metrics() {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index d3abeb1..c79149a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -17,6 +17,7 @@ import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.producer.internals.Metadata;
@@ -35,6 +36,7 @@ import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
@@ -77,6 +79,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
     private final Serializer<K> keySerializer;
     private final Serializer<V> valueSerializer;
     private final ProducerConfig producerConfig;
+    private static final AtomicInteger producerAutoId = new AtomicInteger(1);
 
     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -159,7 +162,9 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
                                                       .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
                                                                   TimeUnit.MILLISECONDS);
         String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
-        String jmxPrefix = "kafka.producer." + (clientId.length() > 0 ? clientId + "." : "");
+        if(clientId.length() <= 0)
+          clientId = "producer-" + producerAutoId.getAndIncrement();
+        String jmxPrefix = "kafka.producer";
         List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                                                                         MetricsReporter.class);
         reporters.add(new JmxReporter(jmxPrefix));
@@ -171,17 +176,20 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
         this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
         this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
         this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
+        Map<String, String> metricTags = new LinkedHashMap<String, String>();
+        metricTags.put("client-id", clientId);
         this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                                                  this.totalMemorySize,
                                                  config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                                                  retryBackoffMs,
                                                  config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
                                                  metrics,
-                                                 time);
+                                                 time,
+                                                 metricTags);
         List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
         this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
 
-        NetworkClient client = new NetworkClient(new Selector(this.metrics, time),
+        NetworkClient client = new NetworkClient(new Selector(this.metrics, time , "producer", metricTags),
                                                  this.metadata,
                                                  clientId,
                                                  config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
@@ -196,7 +204,8 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
                                  config.getInt(ProducerConfig.RETRIES_CONFIG),
                                  config.getInt(ProducerConfig.TIMEOUT_CONFIG),
                                  this.metrics,
-                                 new SystemTime());
+                                 new SystemTime(),
+                                 clientId);
         String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
         this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
         this.ioThread.start();
@@ -398,7 +407,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
     }
 
     @Override
-    public Map<String, ? extends Metric> metrics() {
+    public Map<MetricName, ? extends Metric> metrics() {
         return Collections.unmodifiableMap(this.metrics.metrics());
     }
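
With this change metrics() returns Map<MetricName, ? extends Metric>, so callers can select metrics by group and tags instead of parsing dotted strings. A rough sketch of consuming the new map follows; the bootstrap address and the byte-array serializer settings are illustrative placeholders and are not part of this patch.

    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    public class ProducerMetricsSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);

            // The map is now keyed by MetricName (name + group + tags) rather than a flat string.
            for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
                MetricName name = entry.getKey();
                if ("producer-metrics".equals(name.group()))
                    System.out.println(name.name() + " " + name.tags() + " = " + entry.getValue().value());
            }
            producer.close();
        }
    }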
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 34624c3..904976f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -28,10 +28,7 @@ import java.util.concurrent.Future;
 import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
 import org.apache.kafka.clients.producer.internals.Partitioner;
 import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.*;
 
 
 /**
@@ -136,7 +133,7 @@ public class MockProducer implements Producer<byte[], byte[]> {
         return this.cluster.partitionsForTopic(topic);
     }
 
-    public Map<String, Metric> metrics() {
+    public Map<MetricName, Metric> metrics() {
         return Collections.emptyMap();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 5baa606..6b2471f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -23,11 +23,11 @@ import java.util.concurrent.Future;
 
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.MetricName;
 
 
 /**
  * The interface for the {@link KafkaProducer}
- * 
  * @see KafkaProducer
  * @see MockProducer
  */
@@ -55,7 +55,7 @@ public interface Producer<K,V> extends Closeable {
     /**
      * Return a map of metrics maintained by the producer
      */
-    public Map<String, ? extends Metric> metrics();
+    public Map<MetricName, ? extends Metric> metrics();
 
     /**
      * Close this producer

http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index aa91e14..8d4156d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -16,19 +16,21 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
-import org.apache.kafka.clients.producer.BufferExhaustedException;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.utils.Time;
-
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.Deque;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.kafka.clients.producer.BufferExhaustedException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.utils.Time;
+
 
 /**
  * A pool of ByteBuffers kept under a given memory limit. This class is fairly specific to the needs of the producer. In
@@ -61,8 +63,12 @@ public final class BufferPool {
      * @param blockOnExhaustion This controls the behavior when the buffer pool is out of memory. If true the
      *        {@link #allocate(int)} call will block and wait for memory to be returned to the pool. If false
      *        {@link #allocate(int)} will throw an exception if the buffer is out of memory.
+     * @param metrics instance of Metrics
+     * @param time time instance
+     * @param metricGrpName logical group name for metrics
+     * @param metricTags additional key/val attributes for metrics
      */
-    public BufferPool(long memory, int poolableSize, boolean blockOnExhaustion, Metrics metrics, Time time) {
+    public BufferPool(long memory, int poolableSize, boolean blockOnExhaustion, Metrics metrics, Time time , String metricGrpName , Map<String, String> metricTags) {
         this.poolableSize = poolableSize;
         this.blockOnExhaustion = blockOnExhaustion;
         this.lock = new ReentrantLock();
@@ -73,9 +79,11 @@ public final class BufferPool {
         this.metrics = metrics;
         this.time = time;
         this.waitTime = this.metrics.sensor("bufferpool-wait-time");
-        this.waitTime.add("bufferpool-wait-ratio",
-                          "The fraction of time an appender waits for space allocation.",
-                          new Rate(TimeUnit.NANOSECONDS));
+        MetricName metricName = new MetricName("bufferpool-wait-ratio",
+                                              metricGrpName,
+                                              "The fraction of time an appender waits for space allocation.",
+                                              metricTags);
+        this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
    }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index c15485d..50889e4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
@@ -76,6 +77,7 @@ public final class RecordAccumulator {
      *        memory
      * @param metrics The metrics
      * @param time The time instance to use
+     * @param metricTags additional key/value attributes of the metric
      */
     public RecordAccumulator(int batchSize,
                              long totalSize,
@@ -83,35 +85,38 @@ public final class RecordAccumulator {
                              long retryBackoffMs,
                              boolean blockOnBufferFull,
                              Metrics metrics,
-                             Time time) {
+                             Time time,
+                             Map<String, String> metricTags) {
         this.drainIndex = 0;
         this.closed = false;
         this.batchSize = batchSize;
         this.lingerMs = lingerMs;
         this.retryBackoffMs = retryBackoffMs;
         this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
-        this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time);
+        String metricGrpName = "producer-metrics";
+        this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time , metricGrpName , metricTags);
         this.time = time;
-        registerMetrics(metrics);
+        registerMetrics(metrics, metricGrpName, metricTags);
     }
 
-    private void registerMetrics(Metrics metrics) {
-        metrics.addMetric("waiting-threads",
-                          "The number of user threads blocked waiting for buffer memory to enqueue their records",
+    private void registerMetrics(Metrics metrics, String metricGrpName, Map<String, String> metricTags) {
+
+        MetricName metricName = new MetricName("waiting-threads", metricGrpName, "The number of user threads blocked waiting for buffer memory to enqueue their records", metricTags);
+        metrics.addMetric(metricName,
                           new Measurable() {
                               public double measure(MetricConfig config, long now) {
                                   return free.queued();
                               }
                           });
-        metrics.addMetric("buffer-total-bytes",
-                          "The maximum amount of buffer memory the client can use (whether or not it is currently used).",
+        metricName = new MetricName("buffer-total-bytes", metricGrpName, "The maximum amount of buffer memory the client can use (whether or not it is currently used).", metricTags);
+        metrics.addMetric(metricName,
                           new Measurable() {
                               public double measure(MetricConfig config, long now) {
                                   return free.totalMemory();
                               }
                           });
-        metrics.addMetric("buffer-available-bytes",
-                          "The total amount of buffer memory that is not being used (either unallocated or in the free list).",
+        metricName = new MetricName("buffer-available-bytes", metricGrpName, "The total amount of buffer memory that is not being used (either unallocated or in the free list).", metricTags);
+        metrics.addMetric(metricName,
                           new Measurable() {
                               public double measure(MetricConfig config, long now) {
                                   return free.availableMemory();

http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 84a7a07..ccc03d8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -16,6 +16,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -29,6 +30,7 @@ import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -82,6 +84,9 @@ public class Sender implements Runnable {
     /* metrics */
     private final SenderMetrics sensors;
 
+    /* the client id of this producer, used to tag its metrics */
+    private String clientId;
+
     public Sender(KafkaClient client,
                   Metadata metadata,
                   RecordAccumulator accumulator,
@@ -90,7 +95,8 @@ public class Sender implements Runnable {
                   int retries,
                   int requestTimeout,
                   Metrics metrics,
-                  Time time) {
+                  Time time,
+                  String clientId) {
         this.client = client;
         this.accumulator = accumulator;
         this.metadata = metadata;
@@ -100,6 +106,7 @@ public class Sender implements Runnable {
         this.acks = acks;
         this.retries = retries;
         this.time = time;
+        this.clientId = clientId;
         this.sensors = new SenderMetrics(metrics);
     }
 
@@ -324,46 +331,60 @@ public class Sender implements Runnable {
 
         public SenderMetrics(Metrics metrics) {
             this.metrics = metrics;
+            Map<String, String> metricTags = new LinkedHashMap<String, String>();
+            metricTags.put("client-id", clientId);
+            String metricGrpName = "producer-metrics";
 
             this.batchSizeSensor = metrics.sensor("batch-size");
-            this.batchSizeSensor.add("batch-size-avg", "The average number of bytes sent per partition per-request.", new Avg());
-            this.batchSizeSensor.add("batch-size-max", "The max number of bytes sent per partition per-request.", new Max());
+            MetricName m = new MetricName("batch-size-avg", metricGrpName, "The average number of bytes sent per partition per-request.", metricTags);
+            this.batchSizeSensor.add(m, new Avg());
+            m = new MetricName("batch-size-max", metricGrpName, "The max number of bytes sent per partition per-request.", metricTags);
+            this.batchSizeSensor.add(m, new Max());
 
             this.compressionRateSensor = metrics.sensor("compression-rate");
-            this.compressionRateSensor.add("compression-rate-avg", "The average compression rate of record batches.", new Avg());
+            m = new MetricName("compression-rate-avg", metricGrpName, "The average compression rate of record batches.", metricTags);
+            this.compressionRateSensor.add(m, new Avg());
 
             this.queueTimeSensor = metrics.sensor("queue-time");
-            this.queueTimeSensor.add("record-queue-time-avg",
-                                     "The average time in ms record batches spent in the record accumulator.",
-                                     new Avg());
-            this.queueTimeSensor.add("record-queue-time-max",
-                                     "The maximum time in ms record batches spent in the record accumulator.",
-                                     new Max());
+            m = new MetricName("record-queue-time-avg", metricGrpName, "The average time in ms record batches spent in the record accumulator.", metricTags);
+            this.queueTimeSensor.add(m, new Avg());
+            m = new MetricName("record-queue-time-max", metricGrpName, "The maximum time in ms record batches spent in the record accumulator.", metricTags);
+            this.queueTimeSensor.add(m, new Max());
 
             this.requestTimeSensor = metrics.sensor("request-time");
-            this.requestTimeSensor.add("request-latency-avg", "The average request latency in ms", new Avg());
-            this.requestTimeSensor.add("request-latency-max", "The maximum request latency in ms", new Max());
+            m = new MetricName("request-latency-avg", metricGrpName, "The average request latency in ms", metricTags);
+            this.requestTimeSensor.add(m, new Avg());
+            m = new MetricName("request-latency-max", metricGrpName, "The maximum request latency in ms", metricTags);
+            this.requestTimeSensor.add(m, new Max());
 
             this.recordsPerRequestSensor = metrics.sensor("records-per-request");
-            this.recordsPerRequestSensor.add("record-send-rate", "The average number of records sent per second.", new Rate());
-            this.recordsPerRequestSensor.add("records-per-request-avg", "The average number of records per request.", new Avg());
+            m = new MetricName("record-send-rate", metricGrpName, "The average number of records sent per second.", metricTags);
+            this.recordsPerRequestSensor.add(m, new Rate());
+            m = new MetricName("records-per-request-avg", metricGrpName, "The average number of records per request.", metricTags);
+            this.recordsPerRequestSensor.add(m, new Avg());
 
             this.retrySensor = metrics.sensor("record-retries");
-            this.retrySensor.add("record-retry-rate", "The average per-second number of retried record sends", new Rate());
+            m = new MetricName("record-retry-rate", metricGrpName, "The average per-second number of retried record sends", metricTags);
+            this.retrySensor.add(m, new Rate());
 
             this.errorSensor = metrics.sensor("errors");
-            this.errorSensor.add("record-error-rate", "The average per-second number of record sends that resulted in errors", new Rate());
+            m = new MetricName("record-error-rate", metricGrpName, "The average per-second number of record sends that resulted in errors", metricTags);
+            this.errorSensor.add(m, new Rate());
 
             this.maxRecordSizeSensor = metrics.sensor("record-size-max");
-            this.maxRecordSizeSensor.add("record-size-max", "The maximum record size", new Max());
-            this.maxRecordSizeSensor.add("record-size-avg", "The average record size", new Avg());
+            m = new MetricName("record-size-max", metricGrpName, "The maximum record size", metricTags);
+            this.maxRecordSizeSensor.add(m, new Max());
+            m = new MetricName("record-size-avg", metricGrpName, "The average record size", metricTags);
+            this.maxRecordSizeSensor.add(m, new Avg());
 
-            this.metrics.addMetric("requests-in-flight", "The current number of in-flight requests awaiting a response.", new Measurable() {
+            m = new MetricName("requests-in-flight", metricGrpName, "The current number of in-flight requests awaiting a response.", metricTags);
+            this.metrics.addMetric(m, new Measurable() {
                 public double measure(MetricConfig config, long now) {
                     return client.inFlightRequestCount();
                 }
             });
-            metrics.addMetric("metadata-age", "The age in seconds of the current producer metadata being used.", new Measurable() {
+            m = new MetricName("metadata-age", metricGrpName, "The age in seconds of the current producer metadata being used.", metricTags);
+            metrics.addMetric(m, new Measurable() {
                 public double measure(MetricConfig config, long now) {
                     return (now - metadata.lastUpdate()) / 1000.0;
                 }
@@ -376,24 +397,34 @@ public class Sender implements Runnable {
             String topicRecordsCountName = "topic." + topic + ".records-per-batch";
             Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName);
             if (topicRecordCount == null) {
+                Map<String, String> metricTags = new LinkedHashMap<String, String>();
+                metricTags.put("client-id", clientId);
+                metricTags.put("topic", topic);
+                String metricGrpName = "producer-topic-metrics";
+
                 topicRecordCount = this.metrics.sensor(topicRecordsCountName);
-                topicRecordCount.add("topic." + topic + ".record-send-rate", new Rate());
+                MetricName m = new MetricName("record-send-rate", metricGrpName , metricTags);
+                topicRecordCount.add(m, new Rate());
 
                 String topicByteRateName = "topic." + topic + ".bytes";
                 Sensor topicByteRate = this.metrics.sensor(topicByteRateName);
-                topicByteRate.add("topic." + topic + ".byte-rate", new Rate());
+                m = new MetricName("byte-rate", metricGrpName , metricTags);
+                topicByteRate.add(m, new Rate());
 
                 String topicCompressionRateName = "topic." + topic + ".compression-rate";
                 Sensor topicCompressionRate = this.metrics.sensor(topicCompressionRateName);
-                topicCompressionRate.add("topic." + topic + ".compression-rate", new Avg());
+                m = new MetricName("compression-rate", metricGrpName , metricTags);
+                topicCompressionRate.add(m, new Avg());
 
                 String topicRetryName = "topic." + topic + ".record-retries";
                 Sensor topicRetrySensor = this.metrics.sensor(topicRetryName);
-                topicRetrySensor.add("topic." + topic + ".record-retry-rate", new Rate());
+                m = new MetricName("record-retry-rate", metricGrpName , metricTags);
+                topicRetrySensor.add(m, new Rate());
 
                 String topicErrorName = "topic." + topic + ".record-errors";
                 Sensor topicErrorSensor = this.metrics.sensor(topicErrorName);
-                topicErrorSensor.add("topic." + topic + ".record-error-rate", new Rate());
+                m = new MetricName("record-error-rate", metricGrpName , metricTags);
+                topicErrorSensor.add(m, new Rate());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/clients/src/main/java/org/apache/kafka/common/Metric.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Metric.java b/clients/src/main/java/org/apache/kafka/common/Metric.java
index b023e8e..d4ef77e 100644
--- a/clients/src/main/java/org/apache/kafka/common/Metric.java
+++ b/clients/src/main/java/org/apache/kafka/common/Metric.java
@@ -22,14 +22,9 @@ package org.apache.kafka.common;
 public interface Metric {
 
     /**
-     * A unique name for this metric
+     * A name for this metric
      */
-    public String name();
-
-    /**
-     * A description of what is measured...this will be "" if no description was given
-     */
-    public String description();
+    public MetricName metricName();
 
     /**
      * The value of the metric

http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/clients/src/main/java/org/apache/kafka/common/MetricName.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/MetricName.java b/clients/src/main/java/org/apache/kafka/common/MetricName.java
new file mode 100644
index 0000000..4e810d5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/MetricName.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.common.utils.Utils;
+
+/**
+ * The <code>MetricName</code> class encapsulates a metric's name, logical group and its related attributes
+ * <p/>
+ * This class captures the following parameters
+ * <pre>
+ *  <b>name</b> The name of the metric
+ *  <b>group</b> logical group name of the metrics to which this metric belongs.
+ *  <b>description</b> A human-readable description to include in the metric. This is optional.
+ *  <b>tags</b> additional key/value attributes of the metric. This is optional.
+ *   </pre>
+ * The group and tags parameters can be used to create unique metric names while reporting in JMX or any custom reporting.
+ *
+ * Ex: a standard JMX MBean name can be constructed like <b>domainName:type=group,key1=val1,key2=val2</b>
+ *
+ * Usage looks something like this:
+ * <pre>
+ * // set up metrics:
+ * Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors
+ * Sensor sensor = metrics.sensor(&quot;message-sizes&quot;);
+ * Map<String, String> metricTags = new LinkedHashMap<String, String>();
+ * metricTags.put("client-id", "producer-1");
+ * metricTags.put("topic", "topic");
+ * MetricName metricName = new MetricName(&quot;message-size-avg&quot;, &quot;producer-metrics&quot;, "average message size", metricTags);
+ * sensor.add(metricName, new Avg());
+ * metricName = new MetricName(&quot;message-size-max&quot;, &quot;producer-metrics&quot;,metricTags);
+ * sensor.add(metricName, new Max());
+ *
+ * // as messages are sent we record the sizes
+ * sensor.record(messageSize);
+ * </pre>
+ */
+public final class MetricName {
+
+    private final String name;
+    private final String group;
+    private final String description;
+    private Map<String, String> tags;
+    private int hash = 0;
+
+    /**
+     * @param name        The name of the metric
+     * @param group       logical group name of the metrics to which this metric belongs
+     * @param description A human-readable description to include in the metric
+     * @param tags        additional key/value attributes of the metric
+     */
+    public MetricName(String name, String group, String description, Map<String, String> tags) {
+        this.name = Utils.notNull(name);
+        this.group = Utils.notNull(group);
+        this.description = Utils.notNull(description);
+        this.tags = Utils.notNull(tags);
+    }
+
+    /**
+     * @param name          The name of the metric
+     * @param group         logical group name of the metrics to which this metric belongs
+     * @param description   A human-readable description to include in the metric
+     * @param keyValue      additional key/value attributes of the metric (must come in pairs)
+     */
+    public MetricName(String name, String group, String description, String... keyValue) {
+        this(name, group, description, getTags(keyValue));
+    }
+
+    private static Map<String, String> getTags(String... keyValue) {
+        if ((keyValue.length % 2) != 0)
+            throw new IllegalArgumentException("keyValue needs to be specified in pairs");
+        Map<String, String> tags = new HashMap<String, String>();
+
+        for (int i = 0; i < keyValue.length; i += 2)
+            tags.put(keyValue[i], keyValue[i + 1]);
+        return tags;
+    }
+
+    /**
+     * @param name  The name of the metric
+     * @param group logical group name of the metrics to which this metric belongs
+     * @param tags  key/value attributes of the metric
+     */
+    public MetricName(String name, String group, Map<String, String> tags) {
+        this(name, group, "", tags);
+    }
+
+    /**
+     * @param name        The name of the metric
+     * @param group       logical group name of the metrics to which this metric belongs
+     * @param description A human-readable description to include in the metric
+     */
+    public MetricName(String name, String group, String description) {
+        this(name, group, description, new HashMap<String, String>());
+    }
+
+    /**
+     * @param name  The name of the metric
+     * @param group logical group name of the metrics to which this metric belongs
+     */
+    public MetricName(String name, String group) {
+        this(name, group, "", new HashMap<String, String>());
+    }
+
+    public String name() {
+        return this.name;
+    }
+
+    public String group() {
+        return this.group;
+    }
+
+    public Map<String, String> tags() {
+        return this.tags;
+    }
+
+    public String description() {
+        return this.description;
+    }
+
+    @Override
+    public int hashCode() {
+        if (hash != 0)
+            return hash;
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((group == null) ? 0 : group.hashCode());
+        result = prime * result + ((name == null) ? 0 : name.hashCode());
+        result = prime * result + ((tags == null) ? 0 : tags.hashCode());
+        this.hash = result;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        MetricName other = (MetricName) obj;
+        if (group == null) {
+            if (other.group != null)
+                return false;
+        } else if (!group.equals(other.group))
+            return false;
+        if (name == null) {
+            if (other.name != null)
+                return false;
+        } else if (!name.equals(other.name))
+            return false;
+        if (tags == null) {
+            if (other.tags != null)
+                return false;
+        } else if (!tags.equals(other.tags))
+            return false;
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "MetricName [name=" + name + ", group=" + group + ", description="
+                + description + ", tags=" + tags + "]";
+    }
+}
\ No newline at end of file
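
Since MetricName is now the key of the metrics registry, equality is worth spelling out: name, group and tags participate in equals()/hashCode(), the description does not. A small sketch (the tag values and topic name are made up for illustration):

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.kafka.common.MetricName;

    public class MetricNameEqualitySketch {
        public static void main(String[] args) {
            Map<String, String> tags = new LinkedHashMap<String, String>();
            tags.put("client-id", "producer-1");
            tags.put("topic", "orders");

            MetricName a = new MetricName("record-send-rate", "producer-topic-metrics", tags);
            MetricName b = new MetricName("record-send-rate", "producer-topic-metrics",
                                          "The average number of records sent per second.", tags);

            // The description is excluded from equality, so a and b identify the same
            // metric and would collide as keys in Metrics.metrics().
            System.out.println(a.equals(b));                   // true
            System.out.println(a.hashCode() == b.hashCode());  // true
            System.out.println(b);                             // MetricName [name=record-send-rate, group=producer-topic-metrics, ...]
        }
    }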

http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/clients/src/main/java/org/apache/kafka/common/metrics/CompoundStat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/CompoundStat.java b/clients/src/main/java/org/apache/kafka/common/metrics/CompoundStat.java
index 29185a6..e0969aa 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/CompoundStat.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/CompoundStat.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.metrics;
 
+import org.apache.kafka.common.MetricName;
+
 import java.util.List;
 
 /**
@@ -28,25 +30,19 @@ public interface CompoundStat extends Stat {
 
     public static class NamedMeasurable {
 
-        private final String name;
-        private final String description;
+        private final MetricName name;
         private final Measurable stat;
 
-        public NamedMeasurable(String name, String description, Measurable stat) {
+        public NamedMeasurable(MetricName name, Measurable stat) {
             super();
             this.name = name;
-            this.description = description;
             this.stat = stat;
         }
 
-        public String name() {
+        public MetricName name() {
             return name;
         }
 
-        public String description() {
-            return description;
-        }
-
         public Measurable stat() {
             return stat;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
index 3c31201..9c20538 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
@@ -32,6 +32,7 @@ import javax.management.ObjectName;
 import javax.management.ReflectionException;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,18 +81,39 @@ public class JmxReporter implements MetricsReporter {
 
     private KafkaMbean addAttribute(KafkaMetric metric) {
         try {
-            String[] names = split(prefix + metric.name());
-            String qualifiedName = names[0] + "." + names[1];
-            if (!this.mbeans.containsKey(qualifiedName))
-                mbeans.put(qualifiedName, new KafkaMbean(names[0], names[1]));
-            KafkaMbean mbean = this.mbeans.get(qualifiedName);
-            mbean.setAttribute(names[2], metric);
+            MetricName metricName = metric.metricName();
+            String mBeanName = getMBeanName(metricName);
+            if (!this.mbeans.containsKey(mBeanName))
+                mbeans.put(mBeanName, new KafkaMbean(mBeanName));
+            KafkaMbean mbean = this.mbeans.get(mBeanName);
+            mbean.setAttribute(metricName.name() , metric);
             return mbean;
         } catch (JMException e) {
-            throw new KafkaException("Error creating mbean attribute " + metric.name(), e);
+            throw new KafkaException("Error creating mbean attribute for metricName :" + metric.metricName(), e);
         }
     }
 
+  /**
+   * @param metricName
+   * @return standard JMX MBean name in the following format
+   *       domainName:type=metricType,key1=val1,key2=val2
+   */
+  private String getMBeanName(MetricName metricName) {
+    StringBuilder mBeanName = new StringBuilder();
+    mBeanName.append(prefix);
+    mBeanName.append(":type=");
+    mBeanName.append(metricName.group());
+    for (Map.Entry<String, String> entry : metricName.tags().entrySet()) {
+      if(entry.getKey().length() <= 0 || entry.getValue().length() <= 0)
+         continue;
+      mBeanName.append(",");
+      mBeanName.append(entry.getKey());
+      mBeanName.append("=");
+      mBeanName.append(entry.getValue());
+    }
+    return mBeanName.toString();
+  }
+
     public void close() {
         synchronized (lock) {
             for (KafkaMbean mbean : this.mbeans.values())
@@ -118,29 +140,13 @@ public class JmxReporter implements MetricsReporter {
         }
     }
 
-    private String[] split(String name) {
-        int attributeStart = name.lastIndexOf('.');
-        if (attributeStart < 0)
-            throw new IllegalArgumentException("No MBean name in metric name: " + name);
-        String attributeName = name.substring(attributeStart + 1, name.length());
-        String remainder = name.substring(0, attributeStart);
-        int beanStart = remainder.lastIndexOf('.');
-        if (beanStart < 0)
-            return new String[] { "", remainder, attributeName };
-        String packageName = remainder.substring(0, beanStart);
-        String beanName = remainder.substring(beanStart + 1, remainder.length());
-        return new String[] { packageName, beanName, attributeName };
-    }
-
     private static class KafkaMbean implements DynamicMBean {
-        private final String beanName;
         private final ObjectName objectName;
         private final Map<String, KafkaMetric> metrics;
 
-        public KafkaMbean(String packageName, String beanName) throws MalformedObjectNameException {
-            this.beanName = beanName;
+        public KafkaMbean(String mbeanName) throws MalformedObjectNameException {
             this.metrics = new HashMap<String, KafkaMetric>();
-            this.objectName = new ObjectName(packageName + ":type=" + beanName);
+            this.objectName = new ObjectName(mbeanName);
         }
 
         public ObjectName name() {
@@ -179,10 +185,10 @@ public class JmxReporter implements MetricsReporter {
             for (Map.Entry<String, KafkaMetric> entry : this.metrics.entrySet()) {
                 String attribute = entry.getKey();
                 KafkaMetric metric = entry.getValue();
-                attrs[i] = new MBeanAttributeInfo(attribute, double.class.getName(), metric.description(), true, false, false);
+                attrs[i] = new MBeanAttributeInfo(attribute, double.class.getName(), metric.metricName().description(), true, false, false);
                 i += 1;
             }
-            return new MBeanInfo(beanName, "", attrs, null, null, null);
+            return new MBeanInfo(this.getClass().getName(), "", attrs, null, null, null);
         }
 
         @Override
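
For reference, the MBean name produced by the private getMBeanName() above can be sketched as below; the prefix and tag values are illustrative, and the real reporter additionally skips tags with empty keys or values.

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.kafka.common.MetricName;

    public class MBeanNameSketch {
        public static void main(String[] args) {
            Map<String, String> tags = new LinkedHashMap<String, String>();
            tags.put("client-id", "producer-1");
            MetricName metricName = new MetricName("record-send-rate", "producer-metrics",
                    "The average number of records sent per second.", tags);

            // Same shape as JmxReporter.getMBeanName(): prefix:type=<group>,key1=val1,...
            StringBuilder mBeanName = new StringBuilder("kafka.producer");
            mBeanName.append(":type=").append(metricName.group());
            for (Map.Entry<String, String> entry : metricName.tags().entrySet())
                mBeanName.append(",").append(entry.getKey()).append("=").append(entry.getValue());

            // Prints: kafka.producer:type=producer-metrics,client-id=producer-1
            // The metric's name() ("record-send-rate") becomes an attribute on that MBean.
            System.out.println(mBeanName);
        }
    }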

http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
index a7458b5..89df1a4 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
@@ -17,21 +17,20 @@
 package org.apache.kafka.common.metrics;
 
 import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.utils.Time;
 
 public final class KafkaMetric implements Metric {
 
-    private final String name;
-    private final String description;
+    private MetricName metricName;
     private final Object lock;
     private final Time time;
     private final Measurable measurable;
     private MetricConfig config;
 
-    KafkaMetric(Object lock, String name, String description, Measurable measurable, MetricConfig config, Time time) {
+    KafkaMetric(Object lock, MetricName metricName, Measurable measurable, MetricConfig config, Time time) {
         super();
-        this.name = name;
-        this.description = description;
+        this.metricName = metricName;
         this.lock = lock;
         this.measurable = measurable;
         this.config = config;
@@ -43,13 +42,8 @@ public final class KafkaMetric implements Metric {
     }
 
     @Override
-    public String name() {
-        return this.name;
-    }
-
-    @Override
-    public String description() {
-        return this.description;
+    public MetricName metricName() {
+        return this.metricName;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 49be401..b3d3d7c 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -17,6 +17,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.utils.CopyOnWriteMap;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
@@ -36,8 +37,10 @@ import org.apache.kafka.common.utils.Utils;
  * // set up metrics:
  * Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors
  * Sensor sensor = metrics.sensor(&quot;message-sizes&quot;);
- * sensor.add(&quot;kafka.producer.message-sizes.avg&quot;, new Avg());
- * sensor.add(&quot;kafka.producer.message-sizes.max&quot;, new Max());
+ * MetricName metricName = new MetricName(&quot;message-size-avg&quot;, &quot;producer-metrics&quot;);
+ * sensor.add(metricName, new Avg());
+ * metricName = new MetricName(&quot;message-size-max&quot;, &quot;producer-metrics&quot;);
+ * sensor.add(metricName, new Max());
  * 
  * // as messages are sent we record the sizes
  * sensor.record(messageSize);
@@ -46,7 +49,7 @@ import org.apache.kafka.common.utils.Utils;
 public class Metrics {
 
     private final MetricConfig config;
-    private final ConcurrentMap<String, KafkaMetric> metrics;
+    private final ConcurrentMap<MetricName, KafkaMetric> metrics;
     private final ConcurrentMap<String, Sensor> sensors;
     private final List<MetricsReporter> reporters;
     private final Time time;
@@ -83,7 +86,7 @@ public class Metrics {
     public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time) {
         this.config = defaultConfig;
         this.sensors = new CopyOnWriteMap<String, Sensor>();
-        this.metrics = new CopyOnWriteMap<String, KafkaMetric>();
+        this.metrics = new CopyOnWriteMap<MetricName, KafkaMetric>();
         this.reporters = Utils.notNull(reporters);
         this.time = time;
         for (MetricsReporter reporter : reporters)
@@ -139,47 +142,23 @@ public class Metrics {
     /**
      * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
      * This is a way to expose existing values as metrics.
-     * @param name The name of the metric
+     * @param metricName The name of the metric
      * @param measurable The measurable that will be measured by this metric
      */
-    public void addMetric(String name, Measurable measurable) {
-        addMetric(name, "", measurable);
+    public void addMetric(MetricName metricName, Measurable measurable) {
+        addMetric(metricName, null, measurable);
     }
 
     /**
      * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
      * This is a way to expose existing values as metrics.
-     * @param name The name of the metric
-     * @param description A human-readable description to include in the metric
-     * @param measurable The measurable that will be measured by this metric
-     */
-    public void addMetric(String name, String description, Measurable measurable) {
-        addMetric(name, description, null, measurable);
-    }
-
-    /**
-     * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
-     * This is a way to expose existing values as metrics.
-     * @param name The name of the metric
-     * @param config The configuration to use when measuring this measurable
-     * @param measurable The measurable that will be measured by this metric
-     */
-    public void addMetric(String name, MetricConfig config, Measurable measurable) {
-        addMetric(name, "", config, measurable);
-    }
-
-    /**
-     * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
-     * This is a way to expose existing values as metrics.
-     * @param name The name of the metric
-     * @param description A human-readable description to include in the metric
+     * @param metricName The name of the metric
      * @param config The configuration to use when measuring this measurable
      * @param measurable The measurable that will be measured by this metric
      */
-    public synchronized void addMetric(String name, String description, MetricConfig config, Measurable measurable) {
+    public synchronized void addMetric(MetricName metricName, MetricConfig config, Measurable measurable) {
         KafkaMetric m = new KafkaMetric(new Object(),
-                                        Utils.notNull(name),
-                                        Utils.notNull(description),
+                                        Utils.notNull(metricName),
                                         Utils.notNull(measurable),
                                         config == null ? this.config : config,
                                         time);
@@ -195,17 +174,18 @@ public class Metrics {
     }
 
     synchronized void registerMetric(KafkaMetric metric) {
-        if (this.metrics.containsKey(metric.name()))
-            throw new IllegalArgumentException("A metric named '" + metric.name() + "' already exists, can't register another one.");
-        this.metrics.put(metric.name(), metric);
+        MetricName metricName = metric.metricName();
+        if (this.metrics.containsKey(metricName))
+            throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");
+        this.metrics.put(metricName, metric);
         for (MetricsReporter reporter : reporters)
             reporter.metricChange(metric);
     }
 
     /**
-     * Get all the metrics currently maintained indexed by metric name
+     * Get all the metrics currently maintained indexed by metricName
      */
-    public Map<String, KafkaMetric> metrics() {
+    public Map<MetricName, KafkaMetric> metrics() {
         return this.metrics;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 25c1faf..e53cfaa 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -18,6 +18,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.CompoundStat.NamedMeasurable;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -112,7 +113,7 @@ public final class Sensor {
                 Quota quota = config.quota();
                 if (quota != null) {
                     if (!quota.acceptable(metric.value(timeMs)))
-                        throw new QuotaViolationException("Metric " + metric.name() + " is in violation of its quota of " + quota.bound());
+                        throw new QuotaViolationException(metric.metricName() + " is in violation of its quota of " + quota.bound());
                 }
             }
         }
@@ -134,55 +135,33 @@ public final class Sensor {
     public synchronized void add(CompoundStat stat, MetricConfig config) {
         this.stats.add(Utils.notNull(stat));
         for (NamedMeasurable m : stat.stats()) {
-            KafkaMetric metric = new KafkaMetric(this, m.name(), m.description(), m.stat(), config == null ? this.config : config, time);
+            KafkaMetric metric = new KafkaMetric(this, m.name(), m.stat(), config == null ? this.config : config, time);
             this.registry.registerMetric(metric);
             this.metrics.add(metric);
         }
     }
 
     /**
-     * Add a metric with default configuration and no description. Equivalent to
-     * {@link Sensor#add(String, String, MeasurableStat, MetricConfig) add(name, "", stat, null)}
-     * 
-     */
-    public void add(String name, MeasurableStat stat) {
-        add(name, stat, null);
-    }
-
-    /**
-     * Add a metric with default configuration. Equivalent to
-     * {@link Sensor#add(String, String, MeasurableStat, MetricConfig) add(name, description, stat, null)}
-     * 
-     */
-    public void add(String name, String description, MeasurableStat stat) {
-        add(name, description, stat, null);
-    }
-
-    /**
-     * Add a metric to this sensor with no description. Equivalent to
-     * {@link Sensor#add(String, String, MeasurableStat, MetricConfig) add(name, "", stat, config)}
-     * @param name
-     * @param stat
-     * @param config
+     * Register a metric with this sensor
+     * @param metricName The name of the metric
+     * @param stat The statistic to keep
      */
-    public void add(String name, MeasurableStat stat, MetricConfig config) {
-        add(name, "", stat, config);
+    public void add(MetricName metricName, MeasurableStat stat) {
+        add(metricName, stat, null);
     }
 
     /**
      * Register a metric with this sensor
-     * @param name The name of the metric
-     * @param description A description used when reporting the value
+     * @param metricName The name of the metric
      * @param stat The statistic to keep
      * @param config A special configuration for this metric. If null use the sensor default configuration.
      */
-    public synchronized void add(String name, String description, MeasurableStat stat, MetricConfig config) {
-        KafkaMetric metric = new KafkaMetric(this,
-                                             Utils.notNull(name),
-                                             Utils.notNull(description),
-                                             Utils.notNull(stat),
-                                             config == null ? this.config : config,
-                                             time);
+    public synchronized void add(MetricName metricName, MeasurableStat stat, MetricConfig config) {
+        KafkaMetric metric = new KafkaMetric(new Object(),
+                                             Utils.notNull(metricName),
+                                             Utils.notNull(stat),
+                                             config == null ? this.config : config,
+                                             time);
         this.registry.registerMetric(metric);
         this.metrics.add(metric);
         this.stats.add(stat);

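Similarly, a small sketch (again not part of the patch) of registering sensor stats under the new MetricName-based add; the sensor name, group and tags are placeholders:

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Avg;
    import org.apache.kafka.common.metrics.stats.Max;

    public class SensorAddSketch {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();
            Map<String, String> tags = new LinkedHashMap<String, String>();
            tags.put("client-id", "demo-client");

            // One sensor, several stats; each stat now carries a fully qualified MetricName.
            Sensor requestLatency = metrics.sensor("demo.request-latency");
            requestLatency.add(new MetricName("request-latency-avg", "demo-metrics", "The average request latency in ms.", tags), new Avg());
            requestLatency.add(new MetricName("request-latency-max", "demo-metrics", "The maximum request latency in ms.", tags), new Max());

            // Values recorded on the sensor are folded into every registered stat.
            requestLatency.record(42.0);
            requestLatency.record(17.0);
        }
    }
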
http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentile.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentile.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentile.java
index 7365ceb..fb741ae 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentile.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentile.java
@@ -16,31 +16,23 @@
  */
 package org.apache.kafka.common.metrics.stats;
 
+import org.apache.kafka.common.MetricName;
+
 public class Percentile {
 
-    private final String name;
-    private final String description;
+    private final MetricName name;
     private final double percentile;
 
-    public Percentile(String name, double percentile) {
-        this(name, "", percentile);
-    }
-
-    public Percentile(String name, String description, double percentile) {
+    public Percentile(MetricName name, double percentile) {
         super();
         this.name = name;
-        this.description = description;
         this.percentile = percentile;
     }
 
-    public String name() {
+    public MetricName name() {
         return this.name;
     }
 
-    public String description() {
-        return this.description;
-    }
-
     public double percentile() {
         return this.percentile;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
index c70d577..78c93e8 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
@@ -59,7 +59,7 @@ public class Percentiles extends SampledStat implements CompoundStat {
         List<NamedMeasurable> ms = new ArrayList<NamedMeasurable>(this.percentiles.length);
         for (Percentile percentile : this.percentiles) {
             final double pct = percentile.percentile();
-            ms.add(new NamedMeasurable(percentile.name(), percentile.description(), new Measurable() {
+            ms.add(new NamedMeasurable(percentile.name(), new Measurable() {
                 public double measure(MetricConfig config, long now) {
                     return value(config, now, pct / 100.0);
                 }

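A hedged sketch of how the MetricName-based Percentile plugs into a Percentiles compound stat. This assumes the pre-existing Percentiles constructor (size in bytes, min, max, BucketSizing, Percentile...) and BucketSizing enum; the names, group and value range below are illustrative:

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Percentile;
    import org.apache.kafka.common.metrics.stats.Percentiles;
    import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;

    public class PercentilesSketch {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();
            Map<String, String> tags = new LinkedHashMap<String, String>();
            tags.put("client-id", "demo-client");

            Sensor latency = metrics.sensor("demo.latency");
            // Each percentile is now named with a MetricName rather than a bare string.
            Percentiles percentiles = new Percentiles(1000,        // histogram size in bytes (assumed parameter)
                                                      0.0, 500.0,  // expected value range (assumed parameters)
                                                      BucketSizing.LINEAR,
                                                      new Percentile(new MetricName("latency-p50", "demo-metrics", tags), 50.0),
                                                      new Percentile(new MetricName("latency-p99", "demo-metrics", tags), 99.0));
            latency.add(percentiles, null); // null -> fall back to the sensor's default MetricConfig
            latency.record(123.0);
        }
    }
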
http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 4dd2cdf..74d695b 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -23,6 +23,7 @@ import java.nio.channels.UnresolvedAddressException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -31,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -81,17 +83,21 @@ public class Selector implements Selectable {
     private final List<Integer> connected;
     private final Time time;
     private final SelectorMetrics sensors;
+    private final String metricGrpPrefix;
+    private final Map<String, String> metricTags;
 
     /**
      * Create a new selector
      */
-    public Selector(Metrics metrics, Time time) {
+    public Selector(Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags) {
         try {
             this.selector = java.nio.channels.Selector.open();
         } catch (IOException e) {
             throw new KafkaException(e);
         }
         this.time = time;
+        this.metricGrpPrefix = metricGrpPrefix;
+        this.metricTags = metricTags;
         this.keys = new HashMap<Integer, SelectionKey>();
         this.completedSends = new ArrayList<NetworkSend>();
         this.completedReceives = new ArrayList<NetworkReceive>();
@@ -410,42 +416,52 @@ public class Selector implements Selectable {
 
         public SelectorMetrics(Metrics metrics) {
             this.metrics = metrics;
+            String metricGrpName = metricGrpPrefix + "-metrics";
 
             this.connectionClosed = this.metrics.sensor("connections-closed");
-            this.connectionClosed.add("connection-close-rate", "Connections closed per second in the window.", new Rate());
+            MetricName metricName = new MetricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", metricTags);
+            this.connectionClosed.add(metricName, new Rate());
 
             this.connectionCreated = this.metrics.sensor("connections-created");
-            this.connectionCreated.add("connection-creation-rate", "New connections established per second in the window.", new Rate());
+            metricName = new MetricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", metricTags);
+            this.connectionCreated.add(metricName, new Rate());
 
             this.bytesTransferred = this.metrics.sensor("bytes-sent-received");
-            bytesTransferred.add("network-io-rate",
-                                 "The average number of network operations (reads or writes) on all connections per second.",
-                                 new Rate(new Count()));
+            metricName = new MetricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", metricTags);
+            bytesTransferred.add(metricName, new Rate(new Count()));
 
             this.bytesSent = this.metrics.sensor("bytes-sent", bytesTransferred);
-            this.bytesSent.add("outgoing-byte-rate", "The average number of outgoing bytes sent per second to all servers.", new Rate());
-            this.bytesSent.add("request-rate", "The average number of requests sent per second.", new Rate(new Count()));
-            this.bytesSent.add("request-size-avg", "The average size of all requests in the window..", new Avg());
-            this.bytesSent.add("request-size-max", "The maximum size of any request sent in the window.", new Max());
+            metricName = new MetricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", metricTags);
+            this.bytesSent.add(metricName, new Rate());
+            metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", metricTags);
+            this.bytesSent.add(metricName, new Rate(new Count()));
+            metricName = new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", metricTags);
+            this.bytesSent.add(metricName, new Avg());
+            metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", metricTags);
+            this.bytesSent.add(metricName, new Max());
 
             this.bytesReceived = this.metrics.sensor("bytes-received", bytesTransferred);
-            this.bytesReceived.add("incoming-byte-rate", "Bytes/second read off all sockets", new Rate());
-            this.bytesReceived.add("response-rate", "Responses received sent per second.", new Rate(new Count()));
+            metricName = new MetricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", metricTags);
+            this.bytesReceived.add(metricName, new Rate());
+            metricName = new MetricName("response-rate", metricGrpName, "Responses received sent per second.", metricTags);
+            this.bytesReceived.add(metricName, new Rate(new Count()));
 
             this.selectTime = this.metrics.sensor("select-time");
-            this.selectTime.add("select-rate",
-                                "Number of times the I/O layer checked for new I/O to perform per second",
-                                new Rate(new Count()));
-            this.selectTime.add("io-wait-time-ns-avg",
-                                "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.",
-                                new Avg());
-            this.selectTime.add("io-wait-ratio", "The fraction of time the I/O thread spent waiting.", new Rate(TimeUnit.NANOSECONDS));
+            metricName = new MetricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", metricTags);
+            this.selectTime.add(metricName, new Rate(new Count()));
+            metricName = new MetricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags);
+            this.selectTime.add(metricName, new Avg());
+            metricName = new MetricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", metricTags);
+            this.selectTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
 
             this.ioTime = this.metrics.sensor("io-time");
-            this.ioTime.add("io-time-ns-avg", "The average length of time for I/O per select call in nanoseconds.", new Avg());
-            this.ioTime.add("io-ratio", "The fraction of time the I/O thread spent doing I/O", new Rate(TimeUnit.NANOSECONDS));
+            metricName = new MetricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags);
+            this.ioTime.add(metricName, new Avg());
+            metricName = new MetricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", metricTags);
+            this.ioTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
 
-            this.metrics.addMetric("connection-count", "The current number of active connections.", new Measurable() {
+            metricName = new MetricName("connection-count", metricGrpName, "The current number of active connections.", metricTags);
+            this.metrics.addMetric(metricName, new Measurable() {
                 public double measure(MetricConfig config, long now) {
                     return keys.size();
                 }
@@ -459,25 +475,34 @@ public class Selector implements Selectable {
                 String nodeRequestName = "node-" + node + ".bytes-sent";
                 Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
                 if (nodeRequest == null) {
+                    String metricGrpName = metricGrpPrefix + "-node-metrics";
+
+                    Map<String, String> tags = new LinkedHashMap<String, String>(metricTags);
+                    tags.put("node-id", "node-" + node);
+
                     nodeRequest = this.metrics.sensor(nodeRequestName);
-                    nodeRequest.add("node-" + node + ".outgoing-byte-rate", new Rate());
-                    nodeRequest.add("node-" + node + ".request-rate",
-                                    "The average number of requests sent per second.",
-                                    new Rate(new Count()));
-                    nodeRequest.add("node-" + node + ".request-size-avg", "The average size of all requests in the window..", new Avg());
-                    nodeRequest.add("node-" + node + ".request-size-max", "The maximum size of any request sent in the window.", new Max());
+                    MetricName metricName = new MetricName("outgoing-byte-rate", metricGrpName, tags);
+                    nodeRequest.add(metricName, new Rate());
+                    metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", tags);
+                    nodeRequest.add(metricName, new Rate(new Count()));
+                    metricName = new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", tags);
+                    nodeRequest.add(metricName, new Avg());
+                    metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", tags);
+                    nodeRequest.add(metricName, new Max());
 
                     String nodeResponseName = "node-" + node + ".bytes-received";
                     Sensor nodeResponse = this.metrics.sensor(nodeResponseName);
-                    nodeResponse.add("node-" + node + ".incoming-byte-rate", new Rate());
-                    nodeResponse.add("node-" + node + ".response-rate",
-                                     "The average number of responses received per second.",
-                                     new Rate(new Count()));
+                    metricName = new MetricName("incoming-byte-rate", metricGrpName, tags);
+                    nodeResponse.add(metricName, new Rate());
+                    metricName = new MetricName("response-rate", metricGrpName, "The average number of responses received per second.", tags);
+                    nodeResponse.add(metricName, new Rate(new Count()));
 
                     String nodeTimeName = "node-" + node + ".latency";
                     Sensor nodeRequestTime = this.metrics.sensor(nodeTimeName);
-                    nodeRequestTime.add("node-" + node + ".request-latency-avg", new Avg());
-                    nodeRequestTime.add("node-" + node + ".request-latency-max", new Max());
+                    metricName = new MetricName("request-latency-avg", metricGrpName, tags);
+                    nodeRequestTime.add(metricName, new Avg());
+                    metricName = new MetricName("request-latency-max", metricGrpName, tags);
+                    nodeRequestTime.add(metricName, new Max());
                 }
             }
         }

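To make the new per-node tagging pattern in Selector concrete, here is an illustrative-only sketch (group prefix, client id and node id are made up): every node shares the same metric name and group, and only the node-id tag differs.

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Rate;

    public class NodeTagSketch {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();
            String metricGrpPrefix = "producer";                 // group prefix normally supplied by the client (assumed value)
            Map<String, String> baseTags = new LinkedHashMap<String, String>();
            baseTags.put("client-id", "demo-client");

            int node = 1;                                        // broker node id (assumed value)
            Map<String, String> tags = new LinkedHashMap<String, String>(baseTags);
            tags.put("node-id", "node-" + node);

            Sensor bytesSent = metrics.sensor("node-" + node + ".bytes-sent");
            bytesSent.add(new MetricName("outgoing-byte-rate", metricGrpPrefix + "-node-metrics", tags), new Rate());
            bytesSent.record(512.0);
        }
    }
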
http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
index fe3c13f..1236803 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
@@ -24,7 +24,9 @@ import org.junit.Test;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -33,6 +35,8 @@ import static org.junit.Assert.*;
 public class BufferPoolTest {
     private MockTime time = new MockTime();
     private Metrics metrics = new Metrics(time);
+    String metricGroup = "TestMetrics";
+    Map<String, String> metricTags = new LinkedHashMap<String, String>();
 
     /**
      * Test the simple non-blocking allocation paths
@@ -41,7 +45,7 @@ public class BufferPoolTest {
     public void testSimple() throws Exception {
         int totalMemory = 64 * 1024;
         int size = 1024;
-        BufferPool pool = new BufferPool(totalMemory, size, false, metrics, time);
+        BufferPool pool = new BufferPool(totalMemory, size, false, metrics, time, metricGroup, metricTags);
         ByteBuffer buffer = pool.allocate(size);
         assertEquals("Buffer size should equal requested size.", size, buffer.limit());
         assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory());
@@ -68,7 +72,7 @@ public class BufferPoolTest {
      */
     @Test(expected = IllegalArgumentException.class)
     public void testCantAllocateMoreMemoryThanWeHave() throws Exception {
-        BufferPool pool = new BufferPool(1024, 512, true, metrics, time);
+        BufferPool pool = new BufferPool(1024, 512, true, metrics, time, metricGroup, metricTags);
         ByteBuffer buffer = pool.allocate(1024);
         assertEquals(1024, buffer.limit());
         pool.deallocate(buffer);
@@ -77,7 +81,7 @@ public class BufferPoolTest {
 
     @Test
     public void testNonblockingMode() throws Exception {
-        BufferPool pool = new BufferPool(2, 1, false, metrics, time);
+        BufferPool pool = new BufferPool(2, 1, false, metrics, time, metricGroup, metricTags);
         pool.allocate(1);
         try {
             pool.allocate(2);
@@ -92,7 +96,7 @@ public class BufferPoolTest {
      */
     @Test
     public void testDelayedAllocation() throws Exception {
-        BufferPool pool = new BufferPool(5 * 1024, 1024, true, metrics, time);
+        BufferPool pool = new BufferPool(5 * 1024, 1024, true, metrics, time, metricGroup, metricTags);
         ByteBuffer buffer = pool.allocate(1024);
         CountDownLatch doDealloc = asyncDeallocate(pool, buffer);
         CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
@@ -141,7 +145,7 @@ public class BufferPoolTest {
         final int iterations = 50000;
         final int poolableSize = 1024;
         final int totalMemory = numThreads / 2 * poolableSize;
-        final BufferPool pool = new BufferPool(totalMemory, poolableSize, true, metrics, time);
+        final BufferPool pool = new BufferPool(totalMemory, poolableSize, true, metrics, time, metricGroup, metricTags);
         List<StressTestThread> threads = new ArrayList<StressTestThread>();
         for (int i = 0; i < numThreads; i++)
             threads.add(new StressTestThread(pool, iterations));

http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
index 2c99324..e2bb8da 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
@@ -22,7 +22,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.kafka.clients.producer.internals.RecordAccumulator;
@@ -59,11 +61,13 @@ public class RecordAccumulatorTest {
     private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
     private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3));
     private Metrics metrics = new Metrics(time);
+    String metricGroup = "TestMetrics";
+    Map<String, String> metricTags = new LinkedHashMap<String, String>();
 
     @Test
     public void testFull() throws Exception {
         long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags);
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
             accum.append(tp1, key, value, CompressionType.NONE, null);
@@ -86,7 +90,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
-        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time);
+        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time, metricTags);
         accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
     }
@@ -94,7 +98,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testLinger() throws Exception {
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
         accum.append(tp1, key, value, CompressionType.NONE, null);
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
@@ -111,7 +115,7 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testPartialDrain() throws Exception {
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags);
         int appends = 1024 / msgSize + 1;
         List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
@@ -129,7 +133,7 @@ public class RecordAccumulatorTest {
         final int numThreads = 5;
         final int msgs = 10000;
         final int numParts = 2;
-        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time);
+        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time, metricTags);
         List<Thread> threads = new ArrayList<Thread>();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {
@@ -169,7 +173,7 @@ public class RecordAccumulatorTest {
     public void testNextReadyCheckDelay() throws Exception {
         // Next check time will use lingerMs since this test won't trigger any retries/backoff
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
         // Just short of going over the limit so we trigger linger time
         int appends = 1024 / msgSize;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
index ef2ca65..66cbdf5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
@@ -16,6 +16,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -50,7 +52,8 @@ public class SenderTest {
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
     private Cluster cluster = TestUtils.singletonCluster("test", 1);
     private Metrics metrics = new Metrics(time);
-    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, false, metrics, time);
+    Map<String, String> metricTags = new LinkedHashMap<String, String>();
+    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, false, metrics, time, metricTags);
     private Sender sender = new Sender(client,
                                        metadata,
                                        this.accumulator,
@@ -59,7 +62,8 @@ public class SenderTest {
                                        MAX_RETRIES,
                                        REQUEST_TIMEOUT_MS,
                                        metrics,
-                                       time);
+                                       time,
+                                       "clientId");
 
     @Before
     public void setup() {
@@ -93,7 +97,8 @@ public class SenderTest {
                                    maxRetries,
                                    REQUEST_TIMEOUT_MS,
                                    new Metrics(),
-                                   time);
+                                   time,
+                                   "clientId");
         // do a successful retry
         Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
         sender.run(time.milliseconds()); // connect

http://git-wip-us.apache.org/repos/asf/kafka/blob/688e38ce/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
index 2f43c49..07b1b60 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
@@ -16,10 +16,7 @@
  */
 package org.apache.kafka.common.metrics;
 
-
-import org.apache.kafka.common.metrics.JmxReporter;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Total;
 import org.junit.Test;
@@ -31,10 +28,10 @@ public class JmxReporterTest {
         Metrics metrics = new Metrics();
         metrics.addReporter(new JmxReporter());
         Sensor sensor = metrics.sensor("kafka.requests");
-        sensor.add("pack.bean1.avg", new Avg());
-        sensor.add("pack.bean2.total", new Total());
+        sensor.add(new MetricName("pack.bean1.avg", "grp1"), new Avg());
+        sensor.add(new MetricName("pack.bean2.total", "grp2"), new Total());
         Sensor sensor2 = metrics.sensor("kafka.blah");
-        sensor2.add("pack.bean1.some", new Total());
-        sensor2.add("pack.bean2.some", new Total());
+        sensor2.add(new MetricName("pack.bean1.some", "grp1"), new Total());
+        sensor2.add(new MetricName("pack.bean2.some", "grp1"), new Total());
     }
 }