Posted to commits@kafka.apache.org by ju...@apache.org on 2014/11/18 03:35:57 UTC

[1/2] kafka git commit: kafka-1481; Stop using dashes AND underscores as separators in MBean names; patched by Vladimir Tretyakov; reviewed by Joel Koshy and Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/0.8.2 7e1907e32 -> 117a02de0


http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
new file mode 100644
index 0000000..3cf23b3
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import com.yammer.metrics.Metrics
+import junit.framework.Assert._
+import kafka.integration.KafkaServerTestHarness
+import kafka.server._
+import scala.collection._
+import org.scalatest.junit.JUnit3Suite
+import kafka.message._
+import kafka.serializer._
+import kafka.utils._
+import kafka.utils.TestUtils._
+
+class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
+  val zookeeperConnect = TestZKUtils.zookeeperConnect
+  val numNodes = 2
+  val numParts = 2
+  val topic = "topic1"
+  val configs =
+    for (props <- TestUtils.createBrokerConfigs(numNodes))
+    yield new KafkaConfig(props) {
+      override val zkConnect = zookeeperConnect
+      override val numPartitions = numParts
+    }
+  val nMessages = 2
+
+  override def tearDown() {
+    super.tearDown()
+  }
+
+  def testMetricsLeak() {
+    // create topic topic1 with 1 partition on broker 0
+    createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
+    // force creation of the metrics that are not client-specific
+    createAndShutdownStep("group0", "consumer0", "producer0")
+
+    val countOfStaticMetrics = Metrics.defaultRegistry().allMetrics().keySet().size
+
+    for (i <- 0 to 5) {
+      createAndShutdownStep("group" + i % 3, "consumer" + i % 2, "producer" + i % 2)
+      assertEquals(countOfStaticMetrics, Metrics.defaultRegistry().allMetrics().keySet().size)
+    }
+  }
+
+  def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = {
+    val sentMessages1 = sendMessages(configs, topic, producerId, nMessages, "batch1", NoCompressionCodec, 1)
+    // create a consumer
+    val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumerId))
+    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
+    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
+    val receivedMessages1 = getMessages(nMessages, topicMessageStreams1)
+
+    zkConsumerConnector1.shutdown()
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index dd3640f..0da774d 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -26,7 +26,6 @@ import java.util.Properties
 
 import org.apache.kafka.common.utils.Utils._
 
-import collection.mutable.Map
 import collection.mutable.ListBuffer
 
 import org.I0Itec.zkclient.ZkClient
@@ -36,7 +35,7 @@ import kafka.producer._
 import kafka.message._
 import kafka.api._
 import kafka.cluster.Broker
-import kafka.consumer.ConsumerConfig
+import kafka.consumer.{KafkaStream, ConsumerConfig}
 import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
 import kafka.common.TopicAndPartition
 import kafka.admin.AdminUtils
@@ -47,6 +46,8 @@ import junit.framework.AssertionFailedError
 import junit.framework.Assert._
 import org.apache.kafka.clients.producer.KafkaProducer
 
+import scala.collection.Map
+
 /**
  * Utility functions to help with testing
  */
@@ -483,7 +484,7 @@ object TestUtils extends Logging {
     val data = topics.flatMap(topic =>
       partitions.map(partition => (TopicAndPartition(topic,  partition), message))
     )
-    new ProducerRequest(correlationId, clientId, acks.toShort, timeout, Map(data:_*))
+    new ProducerRequest(correlationId, clientId, acks.toShort, timeout, collection.mutable.Map(data:_*))
   }
 
   def makeLeaderForPartition(zkClient: ZkClient, topic: String,
@@ -720,6 +721,73 @@ object TestUtils extends Logging {
       time = time,
       brokerState = new BrokerState())
   }
+
+  def sendMessagesToPartition(configs: Seq[KafkaConfig],
+                              topic: String,
+                              partition: Int,
+                              numMessages: Int,
+                              compression: CompressionCodec = NoCompressionCodec): List[String] = {
+    val header = "test-%d".format(partition)
+    val props = new Properties()
+    props.put("compression.codec", compression.codec.toString)
+    val producer: Producer[Int, String] =
+      createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
+        encoder = classOf[StringEncoder].getName,
+        keyEncoder = classOf[IntEncoder].getName,
+        partitioner = classOf[FixedValuePartitioner].getName,
+        producerProps = props)
+
+    val ms = 0.until(numMessages).map(x => header + "-" + x)
+    producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
+    debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition))
+    producer.close()
+    ms.toList
+  }
+
+  def sendMessages(configs: Seq[KafkaConfig],
+                   topic: String,
+                   producerId: String,
+                   messagesPerNode: Int,
+                   header: String,
+                   compression: CompressionCodec,
+                   numParts: Int): List[String]= {
+    var messages: List[String] = Nil
+    val props = new Properties()
+    props.put("compression.codec", compression.codec.toString)
+    props.put("client.id", producerId)
+    val   producer: Producer[Int, String] =
+      createProducer(brokerList = TestUtils.getBrokerListStrFromConfigs(configs),
+        encoder = classOf[StringEncoder].getName,
+        keyEncoder = classOf[IntEncoder].getName,
+        partitioner = classOf[FixedValuePartitioner].getName,
+        producerProps = props)
+
+    for (partition <- 0 until numParts) {
+      val ms = 0.until(messagesPerNode).map(x => header + "-" + partition + "-" + x)
+      producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
+      messages ++= ms
+      debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition))
+    }
+    producer.close()
+    messages
+  }
+
+  def getMessages(nMessagesPerThread: Int,
+                  topicMessageStreams: Map[String, List[KafkaStream[String, String]]]): List[String] = {
+    var messages: List[String] = Nil
+    for ((topic, messageStreams) <- topicMessageStreams) {
+      for (messageStream <- messageStreams) {
+        val iterator = messageStream.iterator
+        for (i <- 0 until nMessagesPerThread) {
+          assertTrue(iterator.hasNext)
+          val message = iterator.next.message
+          messages ::= message
+          debug("received message: " + message)
+        }
+      }
+    }
+    messages.reverse
+  }
 }
 
 object TestZKUtils {


[2/2] kafka git commit: kafka-1481; Stop using dashes AND underscores as separators in MBean names; patched by Vladimir Tretyakov; reviewed by Joel Koshy and Jun Rao

Posted by ju...@apache.org.
kafka-1481; Stop using dashes AND underscores as separators in MBean names; patched by Vladimir Tretyakov; reviewed by Joel Koshy and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/117a02de
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/117a02de
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/117a02de

Branch: refs/heads/0.8.2
Commit: 117a02de06bab60e179345d6cd82d56e5b725189
Parents: 7e1907e
Author: Vladimir Tretyakov <vl...@sematext.com>
Authored: Mon Nov 17 18:35:45 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Nov 17 18:35:45 2014 -0800

----------------------------------------------------------------------
 build.gradle                                    |   6 +
 .../main/scala/kafka/cluster/Partition.scala    |  13 +-
 core/src/main/scala/kafka/common/AppInfo.scala  |  66 +++++++++
 .../scala/kafka/common/ClientIdAndBroker.scala  |  14 +-
 .../scala/kafka/common/ClientIdAndTopic.scala   |  14 +-
 .../kafka/consumer/ConsumerFetcherThread.scala  |   2 +-
 .../kafka/consumer/ConsumerTopicStats.scala     |  17 ++-
 .../consumer/FetchRequestAndResponseStats.scala |  28 ++--
 .../scala/kafka/consumer/SimpleConsumer.scala   |   5 +-
 .../consumer/ZookeeperConsumerConnector.scala   |  37 ++---
 core/src/main/scala/kafka/log/Log.scala         |  38 ++++--
 .../scala/kafka/metrics/KafkaMetricsGroup.scala | 135 +++++++++++--------
 .../scala/kafka/network/RequestChannel.scala    |  27 ++--
 .../main/scala/kafka/network/SocketServer.scala |   2 +-
 .../main/scala/kafka/producer/Producer.scala    |   3 +-
 .../kafka/producer/ProducerRequestStats.scala   |  23 ++--
 .../scala/kafka/producer/ProducerStats.scala    |   7 +-
 .../kafka/producer/ProducerTopicStats.scala     |  23 ++--
 .../scala/kafka/producer/SyncProducer.scala     |  12 +-
 .../producer/async/ProducerSendThread.scala     |   5 +-
 .../kafka/server/AbstractFetcherManager.scala   |  31 ++---
 .../kafka/server/AbstractFetcherThread.scala    |  36 ++---
 .../main/scala/kafka/server/DelayedFetch.scala  |   2 +-
 .../scala/kafka/server/DelayedProduce.scala     |   2 +-
 .../scala/kafka/server/DelayedRequestKey.scala  |  38 ------
 .../src/main/scala/kafka/server/KafkaApis.scala |   7 +-
 .../kafka/server/KafkaRequestHandler.scala      |  25 ++--
 .../main/scala/kafka/server/KafkaServer.scala   |   1 -
 .../kafka/server/KafkaServerStartable.scala     |   2 +
 .../kafka/server/ProducerRequestPurgatory.scala |  22 +--
 .../scala/kafka/server/ReplicaManager.scala     |   8 +-
 .../scala/kafka/tools/ProducerPerformance.scala |   1 -
 .../ZookeeperConsumerConnectorTest.scala        | 101 +++-----------
 .../scala/unit/kafka/metrics/MetricsTest.scala  |  72 ++++++++++
 .../test/scala/unit/kafka/utils/TestUtils.scala |  74 +++++++++-
 35 files changed, 555 insertions(+), 344 deletions(-)
----------------------------------------------------------------------
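
The pattern running through the diffs below: identifiers such as clientId, topic, broker host/port, and partition are no longer concatenated into the metric name with dashes or underscores; instead they are passed as a tag map that KafkaMetricsGroup renders into key=value properties of the MBean name. A minimal before/after sketch, lifted from the ZookeeperConsumerConnector change further down (surrounding class omitted; illustration only):

    // inside a class that mixes in kafka.metrics.KafkaMetricsGroup (e.g. ZookeeperConsumerConnector)

    // old: identifier embedded in the metric name itself
    //   newMeter(config.clientId + "-KafkaCommitsPerSec", "commits", TimeUnit.SECONDS)

    // new: plain metric name plus a tag map that becomes part of the MBean name
    private val kafkaCommitMeter =
      newMeter("KafkaCommitsPerSec", "commits", TimeUnit.SECONDS, Map("clientId" -> config.clientId))

    // resulting MBean name, per explicitMetricName in KafkaMetricsGroup below (roughly):
    //   kafka.consumer:type=ZookeeperConsumerConnector,name=KafkaCommitsPerSec,clientId=<clientId>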


http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 17b9969..665771b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -263,6 +263,12 @@ project(':core') {
     dependsOn 'copyDependantLibs'
   }
 
+  jar.manifest {
+    attributes(
+      'Version': "${version}"
+    )
+  }
+
   task testJar(type: Jar) {
     classifier = 'test'
     from sourceSets.test.output

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index e88ecf2..419d336 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -21,7 +21,7 @@ import kafka.admin.AdminUtils
 import kafka.utils._
 import kafka.api.{PartitionStateInfo, LeaderAndIsr}
 import kafka.log.LogConfig
-import kafka.server.{TopicPartitionRequestKey, LogOffsetMetadata, OffsetManager, ReplicaManager}
+import kafka.server.{LogOffsetMetadata, OffsetManager, ReplicaManager}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.KafkaController
 import kafka.message.ByteBufferMessageSet
@@ -29,7 +29,6 @@ import kafka.message.ByteBufferMessageSet
 import java.io.IOException
 import java.util.concurrent.locks.ReentrantReadWriteLock
 import kafka.utils.Utils.{inReadLock,inWriteLock}
-import scala.Some
 import scala.collection.immutable.Set
 
 import com.yammer.metrics.core.Gauge
@@ -62,13 +61,13 @@ class Partition(val topic: String,
 
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
 
-  newGauge(
-    topic + "-" + partitionId + "-UnderReplicated",
+  newGauge("UnderReplicated",
     new Gauge[Int] {
       def value = {
         if (isUnderReplicated) 1 else 0
       }
-    }
+    },
+    Map("topic" -> topic, "partition" -> partitionId.toString)
   )
 
   def isUnderReplicated(): Boolean = {
@@ -309,7 +308,7 @@ class Partition(val topic: String,
       leaderReplica.highWatermark = newHighWatermark
       debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
       // some delayed requests may be unblocked after HW changed
-      val requestKey = new TopicPartitionRequestKey(this.topic, this.partitionId)
+      val requestKey = new TopicAndPartition(this.topic, this.partitionId)
       replicaManager.unblockDelayedFetchRequests(requestKey)
       replicaManager.unblockDelayedProduceRequests(requestKey)
     } else {
@@ -379,7 +378,7 @@ class Partition(val topic: String,
 
           val info = log.append(messages, assignOffsets = true)
           // probably unblock some follower fetch requests since log end offset has been updated
-          replicaManager.unblockDelayedFetchRequests(new TopicPartitionRequestKey(this.topic, this.partitionId))
+          replicaManager.unblockDelayedFetchRequests(new TopicAndPartition(this.topic, this.partitionId))
           // we may need to increment high watermark since ISR could be down to 1
           maybeIncrementLeaderHW(leaderReplica)
           info

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/common/AppInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/AppInfo.scala b/core/src/main/scala/kafka/common/AppInfo.scala
new file mode 100644
index 0000000..d642ca5
--- /dev/null
+++ b/core/src/main/scala/kafka/common/AppInfo.scala
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+import java.net.URL
+import java.util.jar.{Attributes, Manifest}
+
+import com.yammer.metrics.core.Gauge
+import kafka.metrics.KafkaMetricsGroup
+
+object AppInfo extends KafkaMetricsGroup {
+  private var isRegistered = false
+  private val lock = new Object()
+
+  def registerInfo(): Unit = {
+    lock.synchronized {
+      if (isRegistered) {
+        return
+      }
+    }
+
+    try {
+      val clazz = AppInfo.getClass
+      val className = clazz.getSimpleName + ".class"
+      val classPath = clazz.getResource(className).toString
+      if (!classPath.startsWith("jar")) {
+        // Class not from JAR
+        return
+      }
+      val manifestPath = classPath.substring(0, classPath.lastIndexOf("!") + 1) + "/META-INF/MANIFEST.MF"
+
+      val mf = new Manifest
+      mf.read(new URL(manifestPath).openStream())
+      val version = mf.getMainAttributes.get(new Attributes.Name("Version")).toString
+
+      newGauge("Version",
+        new Gauge[String] {
+          def value = {
+            version
+          }
+        })
+
+      lock.synchronized {
+        isRegistered = true
+      }
+    } catch {
+      case e: Exception =>
+        warn("Can't read Kafka version from MANIFEST.MF. Possible cause: %s".format(e))
+    }
+  }
+}
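
Taken together with the build.gradle change above, the Version attribute written into the jar manifest is what AppInfo.registerInfo() reads back and exposes as a gauge. A rough way to confirm the registration from the Yammer registry (a sketch, assuming registerInfo() has already run and the class was loaded from a jar carrying the manifest attribute):

    import com.yammer.metrics.Metrics
    import scala.collection.JavaConversions._

    // look for the Version gauge that AppInfo registers; per KafkaMetricsGroup's naming,
    // its MBean should appear roughly as kafka.common:type=AppInfo,name=Version
    val versionMetric = Metrics.defaultRegistry().allMetrics().keySet()
      .find(m => m.getGroup == "kafka.common" && m.getType == "AppInfo" && m.getName == "Version")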

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/common/ClientIdAndBroker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ClientIdAndBroker.scala b/core/src/main/scala/kafka/common/ClientIdAndBroker.scala
index 93223a9..3b09041 100644
--- a/core/src/main/scala/kafka/common/ClientIdAndBroker.scala
+++ b/core/src/main/scala/kafka/common/ClientIdAndBroker.scala
@@ -8,7 +8,7 @@ package kafka.common
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,6 +21,14 @@ package kafka.common
  * Convenience case class since (clientId, brokerInfo) pairs are used to create
  * SyncProducer Request Stats and SimpleConsumer Request and Response Stats.
  */
-case class ClientIdAndBroker(clientId: String, brokerInfo: String) {
-  override def toString = "%s-%s".format(clientId, brokerInfo)
+
+trait ClientIdBroker {
+}
+
+case class ClientIdAndBroker(clientId: String, brokerHost: String, brokerPort: Int) extends ClientIdBroker {
+  override def toString = "%s-%s-%d".format(clientId, brokerHost, brokerPort)
+}
+
+case class ClientIdAllBrokers(clientId: String) extends ClientIdBroker {
+  override def toString = "%s-%s".format(clientId, "AllBrokers")
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/common/ClientIdAndTopic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ClientIdAndTopic.scala b/core/src/main/scala/kafka/common/ClientIdAndTopic.scala
index 7acf9e7..5825aad 100644
--- a/core/src/main/scala/kafka/common/ClientIdAndTopic.scala
+++ b/core/src/main/scala/kafka/common/ClientIdAndTopic.scala
@@ -1,5 +1,3 @@
-package kafka.common
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -17,11 +15,21 @@ package kafka.common
  * limitations under the License.
  */
 
+package kafka.common
+
 /**
  * Convenience case class since (clientId, topic) pairs are used in the creation
  * of many Stats objects.
  */
-case class ClientIdAndTopic(clientId: String, topic: String) {
+trait ClientIdTopic {
+}
+
+case class ClientIdAndTopic(clientId: String, topic: String) extends ClientIdTopic {
   override def toString = "%s-%s".format(clientId, topic)
 }
 
+case class ClientIdAllTopics(clientId: String) extends ClientIdTopic {
+  override def toString = "%s-%s".format(clientId, "AllTopics")
+}
+
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index f8c1b4e..ee6139c 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -30,7 +30,7 @@ class ConsumerFetcherThread(name: String,
                             partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                             val consumerFetcherManager: ConsumerFetcherManager)
         extends AbstractFetcherThread(name = name, 
-                                      clientId = config.clientId + "-" + name,
+                                      clientId = config.clientId,
                                       sourceBroker = sourceBroker,
                                       socketTimeout = config.socketTimeoutMs,
                                       socketBufferSize = config.socketReceiveBufferBytes,

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
index f63e6c5..01797ff 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
@@ -20,12 +20,17 @@ package kafka.consumer
 import kafka.utils.{Pool, threadsafe, Logging}
 import java.util.concurrent.TimeUnit
 import kafka.metrics.KafkaMetricsGroup
-import kafka.common.ClientIdAndTopic
+import kafka.common.{ClientIdTopic, ClientIdAllTopics, ClientIdAndTopic}
 
 @threadsafe
-class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
-  val messageRate = newMeter(metricId + "MessagesPerSec",  "messages", TimeUnit.SECONDS)
-  val byteRate = newMeter(metricId + "BytesPerSec",  "bytes", TimeUnit.SECONDS)
+class ConsumerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
+  val tags = metricId match {
+    case ClientIdAndTopic(clientId, topic) => Map("clientId" -> clientId, "topic" -> topic)
+    case ClientIdAllTopics(clientId) => Map("clientId" -> clientId)
+  }
+
+  val messageRate = newMeter("MessagesPerSec", "messages", TimeUnit.SECONDS, tags)
+  val byteRate = newMeter("BytesPerSec", "bytes", TimeUnit.SECONDS, tags)
 }
 
 /**
@@ -35,12 +40,12 @@ class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup
 class ConsumerTopicStats(clientId: String) extends Logging {
   private val valueFactory = (k: ClientIdAndTopic) => new ConsumerTopicMetrics(k)
   private val stats = new Pool[ClientIdAndTopic, ConsumerTopicMetrics](Some(valueFactory))
-  private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics
+  private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAllTopics(clientId)) // to differentiate from a topic named AllTopics
 
   def getConsumerAllTopicStats(): ConsumerTopicMetrics = allTopicStats
 
   def getConsumerTopicStats(topic: String): ConsumerTopicMetrics = {
-    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-"))
+    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
index 5243f41..3df55e1 100644
--- a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
+++ b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
@@ -19,13 +19,21 @@ package kafka.consumer
 
 import java.util.concurrent.TimeUnit
 
-import kafka.common.ClientIdAndBroker
+import kafka.common.{ClientIdAllBrokers, ClientIdBroker, ClientIdAndBroker}
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.utils.Pool
 
-class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(metricId + "FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val requestSizeHist = newHistogram(metricId + "FetchResponseSize")
+class FetchRequestAndResponseMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup {
+  val tags = metricId match {
+    case ClientIdAndBroker(clientId, brokerHost, brokerPort) =>
+      Map("clientId" -> clientId, "brokerHost" -> brokerHost,
+      "brokerPort" -> brokerPort.toString)
+    case ClientIdAllBrokers(clientId) =>
+      Map("clientId" -> clientId)
+  }
+
+  val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags))
+  val requestSizeHist = newHistogram("FetchResponseSize", biased = true, tags)
 }
 
 /**
@@ -33,14 +41,14 @@ class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaM
  * @param clientId ClientId of the given consumer
  */
 class FetchRequestAndResponseStats(clientId: String) {
-  private val valueFactory = (k: ClientIdAndBroker) => new FetchRequestAndResponseMetrics(k)
-  private val stats = new Pool[ClientIdAndBroker, FetchRequestAndResponseMetrics](Some(valueFactory))
-  private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "AllBrokers"))
+  private val valueFactory = (k: ClientIdBroker) => new FetchRequestAndResponseMetrics(k)
+  private val stats = new Pool[ClientIdBroker, FetchRequestAndResponseMetrics](Some(valueFactory))
+  private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAllBrokers(clientId))
 
   def getFetchRequestAndResponseAllBrokersStats(): FetchRequestAndResponseMetrics = allBrokersStats
 
-  def getFetchRequestAndResponseStats(brokerInfo: String): FetchRequestAndResponseMetrics = {
-    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-"))
+  def getFetchRequestAndResponseStats(brokerHost: String, brokerPort: Int): FetchRequestAndResponseMetrics = {
+    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerHost, brokerPort))
   }
 }
 
@@ -56,7 +64,7 @@ object FetchRequestAndResponseStatsRegistry {
   }
 
   def removeConsumerFetchRequestAndResponseStats(clientId: String) {
-    val pattern = (clientId + "-ConsumerFetcherThread.*").r
+    val pattern = (".*" + clientId + ".*").r
     val keys = globalStats.keys
     for (key <- keys) {
       pattern.findFirstIn(key) match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index d349a30..e53ee51 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -36,7 +36,6 @@ class SimpleConsumer(val host: String,
   ConsumerConfig.validateClientId(clientId)
   private val lock = new Object()
   private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
-  val brokerInfo = "host_%s-port_%s".format(host, port)
   private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
   private var isClosed = false
 
@@ -106,7 +105,7 @@ class SimpleConsumer(val host: String,
    */
   def fetch(request: FetchRequest): FetchResponse = {
     var response: Receive = null
-    val specificTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseStats(brokerInfo).requestTimer
+    val specificTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestTimer
     val aggregateTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestTimer
     aggregateTimer.time {
       specificTimer.time {
@@ -115,7 +114,7 @@ class SimpleConsumer(val host: String,
     }
     val fetchResponse = FetchResponse.readFrom(response.buffer)
     val fetchedSize = fetchResponse.sizeInBytes
-    fetchRequestAndResponseStats.getFetchRequestAndResponseStats(brokerInfo).requestSizeHist.update(fetchedSize)
+    fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestSizeHist.update(fetchedSize)
     fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize)
     fetchResponse
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index f476973..fe9d8e0 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -104,9 +104,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
 
   // useful for tracking migration of consumers to store offsets in kafka
-  private val kafkaCommitMeter = newMeter(config.clientId + "-KafkaCommitsPerSec", "commits", TimeUnit.SECONDS)
-  private val zkCommitMeter = newMeter(config.clientId + "-ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS)
-  private val rebalanceTimer = new KafkaTimer(newTimer(config.clientId + "-RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  private val kafkaCommitMeter = newMeter("KafkaCommitsPerSec", "commits", TimeUnit.SECONDS, Map("clientId" -> config.clientId))
+  private val zkCommitMeter = newMeter("ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS, Map("clientId" -> config.clientId))
+  private val rebalanceTimer = new KafkaTimer(newTimer("RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, Map("clientId" -> config.clientId)))
 
   val consumerIdString = {
     var consumerUuid : String = null
@@ -138,6 +138,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   }
 
   KafkaMetricsReporter.startReporters(config.props)
+  AppInfo.registerInfo()
 
   def this(config: ConsumerConfig) = this(config, true)
 
@@ -516,14 +517,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     private var isWatcherTriggered = false
     private val lock = new ReentrantLock
     private val cond = lock.newCondition()
-    
+
     @volatile private var allTopicsOwnedPartitionsCount = 0
-    newGauge(config.clientId + "-" + config.groupId + "-AllTopicsOwnedPartitionsCount", new Gauge[Int] {
-      def value() = allTopicsOwnedPartitionsCount
-    })
+    newGauge("OwnedPartitionsCount",
+      new Gauge[Int] {
+        def value() = allTopicsOwnedPartitionsCount
+      },
+      Map("clientId" -> config.clientId, "groupId" -> config.groupId))
 
-    private def ownedPartitionsCountMetricName(topic: String) =
-      "%s-%s-%s-OwnedPartitionsCount".format(config.clientId, config.groupId, topic)
+    private def ownedPartitionsCountMetricTags(topic: String) = Map("clientId" -> config.clientId, "groupId" -> config.groupId, "topic" -> topic)
 
     private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
       override def run() {
@@ -576,7 +578,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         for(partition <- infos.keys) {
           deletePartitionOwnershipFromZK(topic, partition)
         }
-        removeMetric(ownedPartitionsCountMetricName(topic))
+        removeMetric("OwnedPartitionsCount", ownedPartitionsCountMetricTags(topic))
         localTopicRegistry.remove(topic)
       }
       allTopicsOwnedPartitionsCount = 0
@@ -679,9 +681,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
             partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic }
                                       .foreach { case (topic, partitionThreadPairs) =>
-              newGauge(ownedPartitionsCountMetricName(topic), new Gauge[Int] {
-                def value() = partitionThreadPairs.size
-              })
+              newGauge("OwnedPartitionsCount",
+                new Gauge[Int] {
+                  def value() = partitionThreadPairs.size
+                },
+                ownedPartitionsCountMetricTags(topic))
             }
 
             topicRegistry = currentTopicRegistry
@@ -863,10 +867,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       topicThreadIdAndQueues.put(topicThreadId, q)
       debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString))
       newGauge(
-        config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
+        "FetchQueueSize",
         new Gauge[Int] {
           def value = q.size
-        }
+        },
+        Map("clientId" -> config.clientId,
+          "topic" -> topicThreadId._1,
+          "threadId" -> topicThreadId._2.threadId.toString)
       )
     })
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 157d673..ec19215 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -73,17 +73,31 @@ class Log(val dir: File,
 
   info("Completed load of log %s with log end offset %d".format(name, logEndOffset))
 
-  newGauge(name + "-" + "NumLogSegments",
-           new Gauge[Int] { def value = numberOfSegments })
-
-  newGauge(name + "-" + "LogStartOffset",
-           new Gauge[Long] { def value = logStartOffset })
-
-  newGauge(name + "-" + "LogEndOffset",
-           new Gauge[Long] { def value = logEndOffset })
-           
-  newGauge(name + "-" + "Size", 
-           new Gauge[Long] {def value = size})
+  val tags = Map("topic" -> topicAndPartition.topic, "partition" -> topicAndPartition.partition.toString)
+
+  newGauge("NumLogSegments",
+    new Gauge[Int] {
+      def value = numberOfSegments
+    },
+    tags)
+
+  newGauge("LogStartOffset",
+    new Gauge[Long] {
+      def value = logStartOffset
+    },
+    tags)
+
+  newGauge("LogEndOffset",
+    new Gauge[Long] {
+      def value = logEndOffset
+    },
+    tags)
+
+  newGauge("Size",
+    new Gauge[Long] {
+      def value = size
+    },
+    tags)
 
   /** The name of this log */
   def name  = dir.getName()
@@ -153,7 +167,7 @@ class Log(val dir: File,
 
     if(logSegments.size == 0) {
       // no existing segments, create a new mutable segment beginning at offset 0
-      segments.put(0, new LogSegment(dir = dir, 
+      segments.put(0L, new LogSegment(dir = dir,
                                      startOffset = 0,
                                      indexIntervalBytes = config.indexInterval, 
                                      maxIndexSize = config.maxIndexSize,

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 2313a57..e9e4918 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -35,29 +35,52 @@ trait KafkaMetricsGroup extends Logging {
    * Creates a new MetricName object for gauges, meters, etc. created for this
    * metrics group.
    * @param name Descriptive name of the metric.
+   * @param tags Additional attributes which mBean will have.
    * @return Sanitized metric name object.
    */
-  private def metricName(name: String) = {
+  private def metricName(name: String, tags: scala.collection.Map[String, String] = Map.empty) = {
     val klass = this.getClass
     val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
     val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
-    new MetricName(pkg, simpleName, name)
+
+    explicitMetricName(pkg, simpleName, name, tags)
   }
 
-  def newGauge[T](name: String, metric: Gauge[T]) =
-    Metrics.defaultRegistry().newGauge(metricName(name), metric)
 
-  def newMeter(name: String, eventType: String, timeUnit: TimeUnit) =
-    Metrics.defaultRegistry().newMeter(metricName(name), eventType, timeUnit)
+  private def explicitMetricName(group: String, typeName: String, name: String, tags: scala.collection.Map[String, String] = Map.empty) = {
+    val nameBuilder: StringBuilder = new StringBuilder
+
+    nameBuilder.append(group)
+
+    nameBuilder.append(":type=")
+
+    nameBuilder.append(typeName)
 
-  def newHistogram(name: String, biased: Boolean = true) =
-    Metrics.defaultRegistry().newHistogram(metricName(name), biased)
+    if (name.length > 0) {
+      nameBuilder.append(",name=")
+      nameBuilder.append(name)
+    }
+
+    KafkaMetricsGroup.toMBeanName(tags).map(mbeanName => nameBuilder.append(",").append(mbeanName))
+
+    new MetricName(group, typeName, name, null, nameBuilder.toString())
+  }
 
-  def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit) =
-    Metrics.defaultRegistry().newTimer(metricName(name), durationUnit, rateUnit)
+  def newGauge[T](name: String, metric: Gauge[T], tags: scala.collection.Map[String, String] = Map.empty) =
+    Metrics.defaultRegistry().newGauge(metricName(name, tags), metric)
+
+  def newMeter(name: String, eventType: String, timeUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty) =
+    Metrics.defaultRegistry().newMeter(metricName(name, tags), eventType, timeUnit)
+
+  def newHistogram(name: String, biased: Boolean = true, tags: scala.collection.Map[String, String] = Map.empty) =
+    Metrics.defaultRegistry().newHistogram(metricName(name, tags), biased)
+
+  def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty) =
+    Metrics.defaultRegistry().newTimer(metricName(name, tags), durationUnit, rateUnit)
+
+  def removeMetric(name: String, tags: scala.collection.Map[String, String] = Map.empty) =
+    Metrics.defaultRegistry().removeMetric(metricName(name, tags))
 
-  def removeMetric(name: String) =
-    Metrics.defaultRegistry().removeMetric(metricName(name))
 
 }
 
@@ -68,72 +91,75 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
    */
   private val consumerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName](
     // kafka.consumer.ZookeeperConsumerConnector
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-FetchQueueSize"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-KafkaCommitsPerSec"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-ZooKeeperCommitsPerSec"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-RebalanceRateAndTime"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-OwnedPartitionsCount"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "AllTopicsOwnedPartitionsCount"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "FetchQueueSize"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "KafkaCommitsPerSec"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "ZooKeeperCommitsPerSec"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "RebalanceRateAndTime"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "OwnedPartitionsCount"),
 
     // kafka.consumer.ConsumerFetcherManager
-    new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MaxLag"),
-    new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MinFetchRate"),
+    new MetricName("kafka.consumer", "ConsumerFetcherManager", "MaxLag"),
+    new MetricName("kafka.consumer", "ConsumerFetcherManager", "MinFetchRate"),
 
     // kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread
-    new MetricName("kafka.server", "FetcherLagMetrics", "-ConsumerLag"),
+    new MetricName("kafka.server", "FetcherLagMetrics", "ConsumerLag"),
 
     // kafka.consumer.ConsumerTopicStats <-- kafka.consumer.{ConsumerIterator, PartitionTopicInfo}
-    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-MessagesPerSec"),
-    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsMessagesPerSec"),
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "MessagesPerSec"),
 
     // kafka.consumer.ConsumerTopicStats
-    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-BytesPerSec"),
-    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsBytesPerSec"),
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "BytesPerSec"),
 
     // kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread
-    new MetricName("kafka.server", "FetcherStats", "-BytesPerSec"),
-    new MetricName("kafka.server", "FetcherStats", "-RequestsPerSec"),
+    new MetricName("kafka.server", "FetcherStats", "BytesPerSec"),
+    new MetricName("kafka.server", "FetcherStats", "RequestsPerSec"),
 
     // kafka.consumer.FetchRequestAndResponseStats <-- kafka.consumer.SimpleConsumer
-    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchResponseSize"),
-    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchRequestRateAndTimeMs"),
-    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchResponseSize"),
-    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchRequestRateAndTimeMs"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchResponseSize"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchRequestRateAndTimeMs"),
 
     /**
      * ProducerRequestStats <-- SyncProducer
      * metric for SyncProducer in fetchTopicMetaData() needs to be removed when consumer is closed.
      */
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize")
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize")
   )
 
-  private val producerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName] (
+  private val producerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName](
     // kafka.producer.ProducerStats <-- DefaultEventHandler <-- Producer
-    new MetricName("kafka.producer", "ProducerStats", "-SerializationErrorsPerSec"),
-    new MetricName("kafka.producer", "ProducerStats", "-ResendsPerSec"),
-    new MetricName("kafka.producer", "ProducerStats", "-FailedSendsPerSec"),
+    new MetricName("kafka.producer", "ProducerStats", "SerializationErrorsPerSec"),
+    new MetricName("kafka.producer", "ProducerStats", "ResendsPerSec"),
+    new MetricName("kafka.producer", "ProducerStats", "FailedSendsPerSec"),
 
     // kafka.producer.ProducerSendThread
-    new MetricName("kafka.producer.async", "ProducerSendThread", "-ProducerQueueSize"),
+    new MetricName("kafka.producer.async", "ProducerSendThread", "ProducerQueueSize"),
 
     // kafka.producer.ProducerTopicStats <-- kafka.producer.{Producer, async.DefaultEventHandler}
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-MessagesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-DroppedMessagesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-BytesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsMessagesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsDroppedMessagesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsBytesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "MessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "DroppedMessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "BytesPerSec"),
 
     // kafka.producer.ProducerRequestStats <-- SyncProducer
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize")
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize")
   )
 
+  private def toMBeanName(tags: collection.Map[String, String]): Option[String] = {
+    val filteredTags = tags
+      .filter { case (tagKey, tagValue) => tagValue != ""}
+    if (filteredTags.nonEmpty) {
+      val tagsString = filteredTags
+        .map { case (key, value) => "%s=%s".format(key, value)}
+        .mkString(",")
+
+      Some(tagsString)
+    }
+    else {
+      None
+    }
+  }
+
   def removeAllConsumerMetrics(clientId: String) {
     FetchRequestAndResponseStatsRegistry.removeConsumerFetchRequestAndResponseStats(clientId)
     ConsumerTopicStatsRegistry.removeConsumerTopicStat(clientId)
@@ -150,18 +176,19 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
 
   private def removeAllMetricsInList(metricNameList: immutable.List[MetricName], clientId: String) {
     metricNameList.foreach(metric => {
-      val pattern = (clientId + ".*" + metric.getName +".*").r
+      val pattern = (".*clientId=" + clientId + ".*").r
       val registeredMetrics = scala.collection.JavaConversions.asScalaSet(Metrics.defaultRegistry().allMetrics().keySet())
       for (registeredMetric <- registeredMetrics) {
         if (registeredMetric.getGroup == metric.getGroup &&
+          registeredMetric.getName == metric.getName &&
           registeredMetric.getType == metric.getType) {
-          pattern.findFirstIn(registeredMetric.getName) match {
+          pattern.findFirstIn(registeredMetric.getMBeanName) match {
             case Some(_) => {
               val beforeRemovalSize = Metrics.defaultRegistry().allMetrics().keySet().size
               Metrics.defaultRegistry().removeMetric(registeredMetric)
               val afterRemovalSize = Metrics.defaultRegistry().allMetrics().keySet().size
               trace("Removing metric %s. Metrics registry size reduced from %d to %d".format(
-                  registeredMetric, beforeRemovalSize, afterRemovalSize))
+                registeredMetric, beforeRemovalSize, afterRemovalSize))
             }
             case _ =>
           }
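
For reference, the explicitMetricName/toMBeanName pair above composes the MBean name as the group, then ":type=" plus the class's simple name, then an optional ",name=" part, then any non-empty tags as ",key=value" pairs. A simplified, stand-alone sketch of that composition (not the full method; illustration only):

    // simplified sketch of the name composition in explicitMetricName/toMBeanName
    def mbeanName(group: String, typeName: String, name: String,
                  tags: Map[String, String] = Map.empty): String = {
      val base = group + ":type=" + typeName + (if (name.nonEmpty) ",name=" + name else "")
      val tagString = tags.filter { case (_, v) => v != "" }
        .map { case (k, v) => "%s=%s".format(k, v) }
        .mkString(",")
      if (tagString.isEmpty) base else base + "," + tagString
    }

    // mbeanName("kafka.consumer", "ConsumerTopicMetrics", "MessagesPerSec",
    //           Map("clientId" -> "consumer0", "topic" -> "topic1"))
    // => kafka.consumer:type=ConsumerTopicMetrics,name=MessagesPerSec,clientId=consumer0,topic=topic1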

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 4560d8f..7b1db3d 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -125,12 +125,12 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
     def value = responseQueues.foldLeft(0) {(total, q) => total + q.size()}
   })
 
-  for(i <- 0 until numProcessors) {
-    newGauge(
-      "Processor-" + i + "-ResponseQueueSize",
+  for (i <- 0 until numProcessors) {
+    newGauge("ResponseQueueSize",
       new Gauge[Int] {
         def value = responseQueues(i).size()
-      }
+      },
+      Map("processor" -> i.toString)
     )
   }
 
@@ -187,24 +187,25 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
 
 object RequestMetrics {
   val metricsMap = new scala.collection.mutable.HashMap[String, RequestMetrics]
-  val consumerFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Consumer"
-  val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Follower"
+  val consumerFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "Consumer"
+  val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "Follower"
   (RequestKeys.keyToNameAndDeserializerMap.values.map(e => e._1)
     ++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name)))
 }
 
 class RequestMetrics(name: String) extends KafkaMetricsGroup {
-  val requestRate = newMeter(name + "-RequestsPerSec",  "requests", TimeUnit.SECONDS)
+  val tags = Map("request" -> name)
+  val requestRate = newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags)
   // time a request spent in a request queue
-  val requestQueueTimeHist = newHistogram(name + "-RequestQueueTimeMs")
+  val requestQueueTimeHist = newHistogram("RequestQueueTimeMs", biased = true, tags)
   // time a request takes to be processed at the local broker
-  val localTimeHist = newHistogram(name + "-LocalTimeMs")
+  val localTimeHist = newHistogram("LocalTimeMs", biased = true, tags)
   // time a request takes to wait on remote brokers (only relevant to fetch and produce requests)
-  val remoteTimeHist = newHistogram(name + "-RemoteTimeMs")
+  val remoteTimeHist = newHistogram("RemoteTimeMs", biased = true, tags)
   // time a response spent in a response queue
-  val responseQueueTimeHist = newHistogram(name + "-ResponseQueueTimeMs")
+  val responseQueueTimeHist = newHistogram("ResponseQueueTimeMs", biased = true, tags)
   // time to send the response to the requester
-  val responseSendTimeHist = newHistogram(name + "-ResponseSendTimeMs")
-  val totalTimeHist = newHistogram(name + "-TotalTimeMs")
+  val responseSendTimeHist = newHistogram("ResponseSendTimeMs", biased = true, tags)
+  val totalTimeHist = newHistogram("TotalTimeMs", biased = true, tags)
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index cee76b3..e451592 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -67,7 +67,7 @@ class SocketServer(val brokerId: Int,
                                     time, 
                                     maxRequestSize, 
                                     aggregateIdleMeter,
-                                    newMeter("NetworkProcessor-" + i + "-IdlePercent", "percent", TimeUnit.NANOSECONDS),
+                                    newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)),
                                     numProcessorThreads, 
                                     requestChannel,
                                     quotas,

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index cd634f6..e38d2fa 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -19,7 +19,7 @@ package kafka.producer
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
 
-import kafka.common.QueueFullException
+import kafka.common.{AppInfo, QueueFullException}
 import kafka.metrics._
 import kafka.producer.async.{DefaultEventHandler, EventHandler, ProducerSendThread}
 import kafka.serializer.Encoder
@@ -53,6 +53,7 @@ class Producer[K,V](val config: ProducerConfig,
   private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
 
   KafkaMetricsReporter.startReporters(config.props)
+  AppInfo.registerInfo()
 
   def this(config: ProducerConfig) =
     this(config,

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
index 1c46d72..026e93a 100644
--- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
@@ -19,11 +19,16 @@ package kafka.producer
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 import java.util.concurrent.TimeUnit
 import kafka.utils.Pool
-import kafka.common.ClientIdAndBroker
+import kafka.common.{ClientIdAllBrokers, ClientIdBroker, ClientIdAndBroker}
 
-class ProducerRequestMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(metricId + "ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val requestSizeHist = newHistogram(metricId + "ProducerRequestSize")
+class ProducerRequestMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup {
+  val tags = metricId match {
+    case ClientIdAndBroker(clientId, brokerHost, brokerPort) => Map("clientId" -> clientId, "brokerHost" -> brokerHost, "brokerPort" -> brokerPort.toString)
+    case ClientIdAllBrokers(clientId) => Map("clientId" -> clientId)
+  }
+
+  val requestTimer = new KafkaTimer(newTimer("ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags))
+  val requestSizeHist = newHistogram("ProducerRequestSize", biased = true, tags)
 }
 
 /**
@@ -31,14 +36,14 @@ class ProducerRequestMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGr
  * @param clientId ClientId of the given producer
  */
 class ProducerRequestStats(clientId: String) {
-  private val valueFactory = (k: ClientIdAndBroker) => new ProducerRequestMetrics(k)
-  private val stats = new Pool[ClientIdAndBroker, ProducerRequestMetrics](Some(valueFactory))
-  private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAndBroker(clientId, "AllBrokers"))
+  private val valueFactory = (k: ClientIdBroker) => new ProducerRequestMetrics(k)
+  private val stats = new Pool[ClientIdBroker, ProducerRequestMetrics](Some(valueFactory))
+  private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAllBrokers(clientId))
 
   def getProducerRequestAllBrokersStats(): ProducerRequestMetrics = allBrokersStats
 
-  def getProducerRequestStats(brokerInfo: String): ProducerRequestMetrics = {
-    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-"))
+  def getProducerRequestStats(brokerHost: String, brokerPort: Int): ProducerRequestMetrics = {
+    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerHost, brokerPort))
   }
 }
 

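ProducerRequestMetrics now derives its tag map by pattern-matching on the ClientIdBroker hierarchy instead of concatenating host and port into the metric name. A standalone sketch of that derivation, with the case classes modelled on the kafka.common definitions this patch uses (the real definitions are outside this hunk):

  // Sketch: trimmed-down model of the ClientIdBroker hierarchy and the tag
  // derivation performed by ProducerRequestMetrics above.
  sealed trait ClientIdBroker
  case class ClientIdAndBroker(clientId: String, brokerHost: String, brokerPort: Int) extends ClientIdBroker
  case class ClientIdAllBrokers(clientId: String) extends ClientIdBroker

  object TagDerivationDemo extends App {
    def tagsFor(id: ClientIdBroker): Map[String, String] = id match {
      case ClientIdAndBroker(clientId, host, port) =>
        Map("clientId" -> clientId, "brokerHost" -> host, "brokerPort" -> port.toString)
      case ClientIdAllBrokers(clientId) =>
        Map("clientId" -> clientId)
    }

    // Per-broker metrics are tagged with the broker host and port; the
    // all-brokers aggregate carries only the clientId.
    println(tagsFor(ClientIdAndBroker("producer-1", "broker1", 9092)))
    println(tagsFor(ClientIdAllBrokers("producer-1")))
  }
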
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/producer/ProducerStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerStats.scala b/core/src/main/scala/kafka/producer/ProducerStats.scala
index 35e3aae..1d0fa88 100644
--- a/core/src/main/scala/kafka/producer/ProducerStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerStats.scala
@@ -21,9 +21,10 @@ import java.util.concurrent.TimeUnit
 import kafka.utils.Pool
 
 class ProducerStats(clientId: String) extends KafkaMetricsGroup {
-  val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec",  "errors", TimeUnit.SECONDS)
-  val resendRate = newMeter(clientId + "-ResendsPerSec",  "resends", TimeUnit.SECONDS)
-  val failedSendRate = newMeter(clientId + "-FailedSendsPerSec",  "failed sends", TimeUnit.SECONDS)
+  val tags: Map[String, String] = Map("clientId" -> clientId)
+  val serializationErrorRate = newMeter("SerializationErrorsPerSec", "errors", TimeUnit.SECONDS, tags)
+  val resendRate = newMeter("ResendsPerSec", "resends", TimeUnit.SECONDS, tags)
+  val failedSendRate = newMeter("FailedSendsPerSec", "failed sends", TimeUnit.SECONDS, tags)
 }
 
 /**

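The three meters above show the core of this change: the clientId is no longer glued into the meter name with a dash, it travels as a tag on a fixed name. How those tags end up in the JMX object name is decided by KafkaMetricsGroup and the reporters elsewhere in this commit, so the rendering below is only a hedged illustration of the before/after shape, not the literal strings Kafka emits:

  object NamingDemo extends App {
    val clientId = "producer-1"

    // Old style: identifiers concatenated into the metric name itself.
    val oldName = clientId + "-SerializationErrorsPerSec"

    // New style: a fixed metric name plus a tag map rendered as key=value properties.
    val tags = Map("clientId" -> clientId)
    val newName = "SerializationErrorsPerSec" + tags.map { case (k, v) => "," + k + "=" + v }.mkString

    println(oldName)  // producer-1-SerializationErrorsPerSec
    println(newName)  // SerializationErrorsPerSec,clientId=producer-1
  }
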
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
index 9bb1419..97594c8 100644
--- a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
@@ -17,16 +17,21 @@
 package kafka.producer
 
 import kafka.metrics.KafkaMetricsGroup
-import kafka.common.ClientIdAndTopic
+import kafka.common.{ClientIdTopic, ClientIdAllTopics, ClientIdAndTopic}
 import kafka.utils.{Pool, threadsafe}
 import java.util.concurrent.TimeUnit
 
 
 @threadsafe
-class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
-  val messageRate = newMeter(metricId + "MessagesPerSec", "messages", TimeUnit.SECONDS)
-  val byteRate = newMeter(metricId + "BytesPerSec", "bytes", TimeUnit.SECONDS)
-  val droppedMessageRate = newMeter(metricId + "DroppedMessagesPerSec", "drops", TimeUnit.SECONDS)
+class ProducerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
+  val tags = metricId match {
+    case ClientIdAndTopic(clientId, topic) => Map("clientId" -> clientId, "topic" -> topic)
+    case ClientIdAllTopics(clientId) => Map("clientId" -> clientId)
+  }
+
+  val messageRate = newMeter("MessagesPerSec", "messages", TimeUnit.SECONDS, tags)
+  val byteRate = newMeter("BytesPerSec", "bytes", TimeUnit.SECONDS, tags)
+  val droppedMessageRate = newMeter("DroppedMessagesPerSec", "drops", TimeUnit.SECONDS, tags)
 }
 
 /**
@@ -34,14 +39,14 @@ class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup
  * @param clientId The clientId of the given producer client.
  */
 class ProducerTopicStats(clientId: String) {
-  private val valueFactory = (k: ClientIdAndTopic) => new ProducerTopicMetrics(k)
-  private val stats = new Pool[ClientIdAndTopic, ProducerTopicMetrics](Some(valueFactory))
-  private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics
+  private val valueFactory = (k: ClientIdTopic) => new ProducerTopicMetrics(k)
+  private val stats = new Pool[ClientIdTopic, ProducerTopicMetrics](Some(valueFactory))
+  private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAllTopics(clientId)) // to differentiate from a topic named AllTopics
 
   def getProducerAllTopicsStats(): ProducerTopicMetrics = allTopicsStats
 
   def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
-    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-"))
+    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 35e9e8c..0f09951 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -39,7 +39,6 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
   @volatile private var shutdown: Boolean = false
   private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
     config.sendBufferBytes, config.requestTimeoutMs)
-  val brokerInfo = "host_%s-port_%s".format(config.host, config.port)
   val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId)
 
   trace("Instantiating Scala Sync Producer with properties: %s".format(config.props))
@@ -93,11 +92,11 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
    */
   def send(producerRequest: ProducerRequest): ProducerResponse = {
     val requestSize = producerRequest.sizeInBytes
-    producerRequestStats.getProducerRequestStats(brokerInfo).requestSizeHist.update(requestSize)
+    producerRequestStats.getProducerRequestStats(config.host, config.port).requestSizeHist.update(requestSize)
     producerRequestStats.getProducerRequestAllBrokersStats.requestSizeHist.update(requestSize)
 
     var response: Receive = null
-    val specificTimer = producerRequestStats.getProducerRequestStats(brokerInfo).requestTimer
+    val specificTimer = producerRequestStats.getProducerRequestStats(config.host, config.port).requestTimer
     val aggregateTimer = producerRequestStats.getProducerRequestAllBrokersStats.requestTimer
     aggregateTimer.time {
       specificTimer.time {
@@ -134,7 +133,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
       case e: Exception => error("Error on disconnect: ", e)
     }
   }
-    
+
   private def connect(): BlockingChannel = {
     if (!blockingChannel.isConnected && !shutdown) {
       try {
@@ -156,5 +155,4 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
       connect()
     }
   }
-}
-
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 42e9c74..2ccf82a 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -34,10 +34,11 @@ class ProducerSendThread[K,V](val threadName: String,
   private val shutdownLatch = new CountDownLatch(1)
   private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V])
 
-  newGauge(clientId + "-ProducerQueueSize",
+  newGauge("ProducerQueueSize",
           new Gauge[Int] {
             def value = queue.size
-          })
+          },
+          Map("clientId" -> clientId))
 
   override def run {
     try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 9390edf..20c00cb 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -26,7 +26,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.common.TopicAndPartition
 import com.yammer.metrics.core.Gauge
 
-abstract class AbstractFetcherManager(protected val name: String, metricPrefix: String, numFetchers: Int = 1)
+abstract class AbstractFetcherManager(protected val name: String, clientId: String, numFetchers: Int = 1)
   extends Logging with KafkaMetricsGroup {
   // map of (source broker_id, fetcher_id per source broker) => fetcher
   private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]
@@ -34,7 +34,7 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
   this.logIdent = "[" + name + "] "
 
   newGauge(
-    metricPrefix + "-MaxLag",
+    "MaxLag",
     new Gauge[Long] {
       // current max lag across all fetchers/topics/partitions
       def value = fetcherThreadMap.foldLeft(0L)((curMaxAll, fetcherThreadMapEntry) => {
@@ -42,24 +42,25 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
           curMaxThread.max(fetcherLagStatsEntry._2.lag)
         }).max(curMaxAll)
       })
-    }
+    },
+    Map("clientId" -> clientId)
   )
 
   newGauge(
-    metricPrefix + "-MinFetchRate",
-    {
-      new Gauge[Double] {
-        // current min fetch rate across all fetchers/topics/partitions
-        def value = {
-          val headRate: Double =
-            fetcherThreadMap.headOption.map(_._2.fetcherStats.requestRate.oneMinuteRate).getOrElse(0)
-
-          fetcherThreadMap.foldLeft(headRate)((curMinAll, fetcherThreadMapEntry) => {
-            fetcherThreadMapEntry._2.fetcherStats.requestRate.oneMinuteRate.min(curMinAll)
-          })
-        }
+  "MinFetchRate", {
+    new Gauge[Double] {
+      // current min fetch rate across all fetchers/topics/partitions
+      def value = {
+        val headRate: Double =
+          fetcherThreadMap.headOption.map(_._2.fetcherStats.requestRate.oneMinuteRate).getOrElse(0)
+
+        fetcherThreadMap.foldLeft(headRate)((curMinAll, fetcherThreadMapEntry) => {
+          fetcherThreadMapEntry._2.fetcherStats.requestRate.oneMinuteRate.min(curMinAll)
+        })
       }
     }
+  },
+  Map("clientId" -> clientId)
   )
 
   private def getFetcherId(topic: String, partitionId: Int) : Int = {

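Both gauges above keep their existing logic and only gain a clientId tag. The MinFetchRate gauge seeds its fold with the first fetcher's one-minute rate (or 0 when no fetchers exist) and then takes the minimum across all of them; the same fold on plain numbers, as a quick sanity check of that logic:

  object MinFetchRateDemo extends App {
    // Stand-ins for fetcherThreadMap.values.map(_.fetcherStats.requestRate.oneMinuteRate).
    def minFetchRate(oneMinuteRates: Iterable[Double]): Double = {
      val headRate = oneMinuteRates.headOption.getOrElse(0.0)
      oneMinuteRates.foldLeft(headRate)((curMin, rate) => rate.min(curMin))
    }

    println(minFetchRate(Seq(42.0, 17.5, 23.1)))  // 17.5
    println(minFetchRate(Seq.empty))              // 0.0
  }
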
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 2e9532e..8c281d4 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -26,9 +26,7 @@ import kafka.utils.Utils.inLock
 import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset}
 import kafka.metrics.KafkaMetricsGroup
 
-import scala.collection.mutable
-import scala.collection.Set
-import scala.collection.Map
+import scala.collection.{mutable, Set, Map}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.atomic.AtomicLong
@@ -46,8 +44,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   private val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
-  private val brokerInfo = "host_%s-port_%s".format(sourceBroker.host, sourceBroker.port)
-  private val metricId = new ClientIdAndBroker(clientId, brokerInfo)
+  private val metricId = new ClientIdAndBroker(clientId, sourceBroker.host, sourceBroker.port)
   val fetcherStats = new FetcherStats(metricId)
   val fetcherLagStats = new FetcherLagStats(metricId)
   val fetchRequestBuilder = new FetchRequestBuilder().
@@ -204,13 +201,15 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   }
 }
 
-class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMetricsGroup {
+class FetcherLagMetrics(metricId: ClientIdTopicPartition) extends KafkaMetricsGroup {
   private[this] val lagVal = new AtomicLong(-1L)
-  newGauge(
-    metricId + "-ConsumerLag",
+  newGauge("ConsumerLag",
     new Gauge[Long] {
       def value = lagVal.get
-    }
+    },
+    Map("clientId" -> metricId.clientId,
+      "topic" -> metricId.topic,
+      "partition" -> metricId.partitionId.toString)
   )
 
   def lag_=(newLag: Long) {
@@ -221,20 +220,25 @@ class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMet
 }
 
 class FetcherLagStats(metricId: ClientIdAndBroker) {
-  private val valueFactory = (k: ClientIdBrokerTopicPartition) => new FetcherLagMetrics(k)
-  val stats = new Pool[ClientIdBrokerTopicPartition, FetcherLagMetrics](Some(valueFactory))
+  private val valueFactory = (k: ClientIdTopicPartition) => new FetcherLagMetrics(k)
+  val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](Some(valueFactory))
 
   def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = {
-    stats.getAndMaybePut(new ClientIdBrokerTopicPartition(metricId.clientId, metricId.brokerInfo, topic, partitionId))
+    stats.getAndMaybePut(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
   }
 }
 
 class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
-  val requestRate = newMeter(metricId + "-RequestsPerSec", "requests", TimeUnit.SECONDS)
-  val byteRate = newMeter(metricId + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
+  val tags = Map("clientId" -> metricId.clientId,
+    "brokerHost" -> metricId.brokerHost,
+    "brokerPort" -> metricId.brokerPort.toString)
+
+  val requestRate = newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags)
+
+  val byteRate = newMeter("BytesPerSec", "bytes", TimeUnit.SECONDS, tags)
 }
 
-case class ClientIdBrokerTopicPartition(clientId: String, brokerInfo: String, topic: String, partitionId: Int) {
-  override def toString = "%s-%s-%s-%d".format(clientId, brokerInfo, topic, partitionId)
+case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) {
+  override def toString = "%s-%s-%d".format(clientId, topic, partitionId)
 }
 

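With ClientIdTopicPartition replacing ClientIdBrokerTopicPartition, per-partition lag metrics are keyed and tagged by clientId, topic and partition only, so the exported ConsumerLag metric is no longer tied to a particular source broker. A standalone sketch of that keyed lookup, using getOrElseUpdate in place of kafka.utils.Pool.getAndMaybePut and a bare stand-in for FetcherLagMetrics:

  import scala.collection.mutable

  // Sketch: the new lag-metric key, as added above.
  case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int)

  object FetcherLagDemo extends App {
    class LagMetrics { var lag: Long = -1L }  // stand-in for FetcherLagMetrics

    private val stats = mutable.Map.empty[ClientIdTopicPartition, LagMetrics]

    def getFetcherLagStats(clientId: String, topic: String, partitionId: Int): LagMetrics =
      stats.getOrElseUpdate(ClientIdTopicPartition(clientId, topic, partitionId), new LagMetrics)

    getFetcherLagStats("ReplicaFetcherThread-0-1", "orders", 0).lag = 42L
    println(getFetcherLagStats("ReplicaFetcherThread-0-1", "orders", 0).lag)  // 42
  }
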
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index e0f14e2..7861240 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -37,7 +37,7 @@ import scala.collection.Seq
  *   - should return whatever data is available.
  */
 
-class DelayedFetch(override val keys: Seq[TopicPartitionRequestKey],
+class DelayedFetch(override val keys: Seq[TopicAndPartition],
                    override val request: RequestChannel.Request,
                    override val delayMs: Long,
                    val fetch: FetchRequest,

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 9481508..a647761 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -35,7 +35,7 @@ import scala.collection.Seq
   *   B.2 - else, at least requiredAcks replicas should be caught up to this request.
   */
 
-class DelayedProduce(override val keys: Seq[TopicPartitionRequestKey],
+class DelayedProduce(override val keys: Seq[TopicAndPartition],
                      override val request: RequestChannel.Request,
                      override val delayMs: Long,
                      val produce: ProducerRequest,

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/server/DelayedRequestKey.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedRequestKey.scala b/core/src/main/scala/kafka/server/DelayedRequestKey.scala
deleted file mode 100644
index 628ef59..0000000
--- a/core/src/main/scala/kafka/server/DelayedRequestKey.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.common.TopicAndPartition
-
-/**
- * Keys used for delayed request metrics recording
- */
-trait DelayedRequestKey {
-  def keyLabel: String
-}
-
-object DelayedRequestKey {
-  val globalLabel = "All"
-}
-
-case class TopicPartitionRequestKey(topic: String, partition: Int) extends DelayedRequestKey {
-
-  def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)
-
-  override def keyLabel = "%s-%d".format(topic, partition)
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 85498b4..2f00992 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -201,8 +201,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
     } else {
       // create a list of (topic, partition) pairs to use as keys for this delayed request
-      val producerRequestKeys = produceRequest.data.keys.map(
-        topicAndPartition => new TopicPartitionRequestKey(topicAndPartition)).toSeq
+      val producerRequestKeys = produceRequest.data.keys.toSeq
       val statuses = localProduceResults.map(r =>
         r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
       val delayedRequest =  new DelayedProduce(
@@ -330,7 +329,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       debug("Putting fetch request with correlation id %d from client %s into purgatory".format(fetchRequest.correlationId,
         fetchRequest.clientId))
       // create a list of (topic, partition) pairs to use as keys for this delayed request
-      val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new TopicPartitionRequestKey(_))
+      val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq
       val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest,
         dataRead.mapValues(_.offset))
 
@@ -350,7 +349,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
         // for producer requests with ack > 1, we need to check
         // if they can be unblocked after some follower's log end offsets have moved
-        replicaManager.unblockDelayedProduceRequests(new TopicPartitionRequestKey(topicAndPartition))
+        replicaManager.unblockDelayedProduceRequests(topicAndPartition)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 00bcc06..e4053fb 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -93,23 +93,28 @@ class KafkaRequestHandlerPool(val brokerId: Int,
   }
 }
 
-class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup {
-  val messagesInRate = newMeter(name + "MessagesInPerSec",  "messages", TimeUnit.SECONDS)
-  val bytesInRate = newMeter(name + "BytesInPerSec",  "bytes", TimeUnit.SECONDS)
-  val bytesOutRate = newMeter(name + "BytesOutPerSec",  "bytes", TimeUnit.SECONDS)
-  val bytesRejectedRate = newMeter(name + "BytesRejectedPerSec",  "bytes", TimeUnit.SECONDS)
-  val failedProduceRequestRate = newMeter(name + "FailedProduceRequestsPerSec",  "requests", TimeUnit.SECONDS)
-  val failedFetchRequestRate = newMeter(name + "FailedFetchRequestsPerSec",  "requests", TimeUnit.SECONDS)
+class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
+  val tags: scala.collection.Map[String, String] = name match {
+    case None => scala.collection.Map.empty
+    case Some(topic) => Map("topic" -> topic)
+  }
+
+  val messagesInRate = newMeter("MessagesInPerSec", "messages", TimeUnit.SECONDS, tags)
+  val bytesInRate = newMeter("BytesInPerSec", "bytes", TimeUnit.SECONDS, tags)
+  val bytesOutRate = newMeter("BytesOutPerSec", "bytes", TimeUnit.SECONDS, tags)
+  val bytesRejectedRate = newMeter("BytesRejectedPerSec", "bytes", TimeUnit.SECONDS, tags)
+  val failedProduceRequestRate = newMeter("FailedProduceRequestsPerSec", "requests", TimeUnit.SECONDS, tags)
+  val failedFetchRequestRate = newMeter("FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS, tags)
 }
 
 object BrokerTopicStats extends Logging {
-  private val valueFactory = (k: String) => new BrokerTopicMetrics(k)
+  private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k))
   private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
-  private val allTopicsStats = new BrokerTopicMetrics("AllTopics")
+  private val allTopicsStats = new BrokerTopicMetrics(None)
 
   def getBrokerAllTopicsStats(): BrokerTopicMetrics = allTopicsStats
 
   def getBrokerTopicStats(topic: String): BrokerTopicMetrics = {
-    stats.getAndMaybePut(topic + "-")
+    stats.getAndMaybePut(topic)
   }
 }

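BrokerTopicMetrics now takes an Option[String]: Some(topic) produces a per-topic metric tagged with the topic, while None produces the broker-wide aggregate with no tags at all. That removes both the trailing "-" separator and the "AllTopics" sentinel name, which could collide with a real topic called AllTopics. A standalone sketch of that mapping:

  object BrokerTopicTagDemo extends App {
    // None -> broker-wide aggregate (no tags); Some(topic) -> per-topic metric.
    def tagsFor(topic: Option[String]): Map[String, String] = topic match {
      case None    => Map.empty
      case Some(t) => Map("topic" -> t)
    }

    println(tagsFor(Some("orders")))  // Map(topic -> orders)
    println(tagsFor(None))            // Map()
  }
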
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 4de8123..1bf7d10 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -25,7 +25,6 @@ import kafka.utils._
 import java.util.concurrent._
 import atomic.{AtomicInteger, AtomicBoolean}
 import java.io.File
-import java.net.BindException
 import org.I0Itec.zkclient.ZkClient
 import kafka.controller.{ControllerStats, KafkaController}
 import kafka.cluster.Broker

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/server/KafkaServerStartable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
index cd64bbe..1c1b75b 100644
--- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
@@ -17,6 +17,7 @@
 
 package kafka.server
 
+import kafka.common.AppInfo
 import kafka.utils.Logging
 
 
@@ -26,6 +27,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
   def startup() {
     try {
       server.startup()
+      AppInfo.registerInfo()
     }
     catch {
       case e: Throwable =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
index d4a7d4a..e7ff411 100644
--- a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
@@ -17,6 +17,7 @@
 
 package kafka.server
 
+import kafka.common.TopicAndPartition
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.Pool
 import kafka.network.{BoundedByteBufferSend, RequestChannel}
@@ -30,19 +31,24 @@ class ProducerRequestPurgatory(replicaManager: ReplicaManager, offsetManager: Of
   extends RequestPurgatory[DelayedProduce](replicaManager.config.brokerId, replicaManager.config.producerPurgatoryPurgeIntervalRequests) {
   this.logIdent = "[ProducerRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
 
-  private class DelayedProducerRequestMetrics(keyLabel: String = DelayedRequestKey.globalLabel) extends KafkaMetricsGroup {
-    val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
+  private class DelayedProducerRequestMetrics(metricId: Option[TopicAndPartition]) extends KafkaMetricsGroup {
+    val tags: scala.collection.Map[String, String] = metricId match {
+      case Some(topicAndPartition) => Map("topic" -> topicAndPartition.topic, "partition" -> topicAndPartition.partition.toString)
+      case None => Map.empty
+    }
+
+    val expiredRequestMeter = newMeter("ExpiresPerSecond", "requests", TimeUnit.SECONDS, tags)
   }
 
   private val producerRequestMetricsForKey = {
-    val valueFactory = (k: DelayedRequestKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-")
-    new Pool[DelayedRequestKey, DelayedProducerRequestMetrics](Some(valueFactory))
+    val valueFactory = (k: TopicAndPartition) => new DelayedProducerRequestMetrics(Some(k))
+    new Pool[TopicAndPartition, DelayedProducerRequestMetrics](Some(valueFactory))
   }
 
-  private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics
+  private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics(None)
 
-  private def recordDelayedProducerKeyExpired(key: DelayedRequestKey) {
-    val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key)
+  private def recordDelayedProducerKeyExpired(metricId: TopicAndPartition) {
+    val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(metricId)
     List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark())
   }
 
@@ -57,7 +63,7 @@ class ProducerRequestPurgatory(replicaManager: ReplicaManager, offsetManager: Of
   def expire(delayedProduce: DelayedProduce) {
     debug("Expiring produce request %s.".format(delayedProduce.produce))
     for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending)
-      recordDelayedProducerKeyExpired(new TopicPartitionRequestKey(topicPartition))
+      recordDelayedProducerKeyExpired(topicPartition)
     respond(delayedProduce)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 0637fab..1afb0cb 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -113,10 +113,10 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Unblock some delayed produce requests with the request key
    */
-  def unblockDelayedProduceRequests(key: DelayedRequestKey) {
+  def unblockDelayedProduceRequests(key: TopicAndPartition) {
     val satisfied = producerRequestPurgatory.update(key)
     debug("Request key %s unblocked %d producer requests."
-      .format(key.keyLabel, satisfied.size))
+      .format(key, satisfied.size))
 
     // send any newly unblocked responses
     satisfied.foreach(producerRequestPurgatory.respond(_))
@@ -125,9 +125,9 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Unblock some delayed fetch requests with the request key
    */
-  def unblockDelayedFetchRequests(key: DelayedRequestKey) {
+  def unblockDelayedFetchRequests(key: TopicAndPartition) {
     val satisfied = fetchRequestPurgatory.update(key)
-    debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, satisfied.size))
+    debug("Request key %s unblocked %d fetch requests.".format(key, satisfied.size))
 
     // send any newly unblocked responses
     satisfied.foreach(fetchRequestPurgatory.respond(_))

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/tools/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index f61c7c7..f2dc4ed 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicLong
 import java.util._
 import java.text.SimpleDateFormat
 import java.math.BigInteger
-import scala.collection.immutable.List
 
 import org.apache.log4j.Logger
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index e1d8711..bad099a 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -26,8 +26,7 @@ import kafka.message._
 import kafka.serializer._
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils._
-import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
-import java.util.{Collections, Properties}
+import java.util.Collections
 import org.apache.log4j.{Logger, Level}
 import kafka.utils.TestUtils._
 import kafka.common.MessageStreamsExistException
@@ -89,8 +88,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     zkConsumerConnector0.shutdown
 
     // send some messages to each broker
-    val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++
-                        sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
+    val sentMessages1 = sendMessagesToPartition(configs, topic, 0, nMessages) ++
+                        sendMessagesToPartition(configs, topic, 1, nMessages)
 
     // wait to make sure the topic and partition have a leader for the successful case
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
@@ -123,8 +122,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
     val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
     // send some messages to each broker
-    val sentMessages2 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++
-                        sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
+    val sentMessages2 = sendMessagesToPartition(configs, topic, 0, nMessages) ++
+                        sendMessagesToPartition(configs, topic, 1, nMessages)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -144,8 +143,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
     val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
     // send some messages to each broker
-    val sentMessages3 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++
-                        sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
+    val sentMessages3 = sendMessagesToPartition(configs, topic, 0, nMessages) ++
+                        sendMessagesToPartition(configs, topic, 1, nMessages)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -178,8 +177,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     requestHandlerLogger.setLevel(Level.FATAL)
 
     // send some messages to each broker
-    val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++
-                        sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
+    val sentMessages1 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++
+                        sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -211,8 +210,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
     val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
     // send some messages to each broker
-    val sentMessages2 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++
-                        sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
+    val sentMessages2 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++
+                        sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -232,8 +231,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
     val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder())
     // send some messages to each broker
-    val sentMessages3 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++
-                        sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
+    val sentMessages3 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++
+                        sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -254,8 +253,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
   def testCompressionSetConsumption() {
     // send some messages to each broker
-    val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec) ++
-                       sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec)
+    val sentMessages = sendMessagesToPartition(configs, topic, 0, 200, DefaultCompressionCodec) ++
+                       sendMessagesToPartition(configs, topic, 1, 200, DefaultCompressionCodec)
 
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
@@ -280,8 +279,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     requestHandlerLogger.setLevel(Level.FATAL)
 
     // send some messages to each broker
-    val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec) ++
-                       sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, NoCompressionCodec)
+    val sentMessages = sendMessagesToPartition(configs, topic, 0, nMessages, NoCompressionCodec) ++
+                       sendMessagesToPartition(configs, topic, 1, nMessages, NoCompressionCodec)
 
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
@@ -321,7 +320,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
 
     // send some messages to each broker
-    val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1)
+    val sentMessages1 = sendMessages(configs, topic, "producer1", nMessages, "batch1", NoCompressionCodec, 1)
 
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
@@ -345,70 +344,6 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     zkClient.close()
   }
 
-  def sendMessagesToBrokerPartition(config: KafkaConfig,
-                                    topic: String,
-                                    partition: Int,
-                                    numMessages: Int,
-                                    compression: CompressionCodec = NoCompressionCodec): List[String] = {
-    val header = "test-%d-%d".format(config.brokerId, partition)
-    val props = new Properties()
-    props.put("compression.codec", compression.codec.toString)
-    val producer: Producer[Int, String] =
-      createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
-                     encoder = classOf[StringEncoder].getName,
-                     keyEncoder = classOf[IntEncoder].getName,
-                     partitioner = classOf[FixedValuePartitioner].getName,
-                     producerProps = props)
-
-    val ms = 0.until(numMessages).map(x => header + config.brokerId + "-" + partition + "-" + x)
-    producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
-    debug("Sent %d messages to broker %d for partition [%s,%d]".format(ms.size, config.brokerId, topic, partition))
-    producer.close()
-    ms.toList
-  }
-
-  def sendMessages(config: KafkaConfig,
-                   messagesPerNode: Int,
-                   header: String,
-                   compression: CompressionCodec,
-                   numParts: Int): List[String]= {
-    var messages: List[String] = Nil
-    val props = new Properties()
-    props.put("compression.codec", compression.codec.toString)
-    val producer: Producer[Int, String] =
-      createProducer(brokerList = TestUtils.getBrokerListStrFromConfigs(configs),
-                     encoder = classOf[StringEncoder].getName,
-                     keyEncoder = classOf[IntEncoder].getName,
-                     partitioner = classOf[FixedValuePartitioner].getName,
-                     producerProps = props)
-
-    for (partition <- 0 until numParts) {
-      val ms = 0.until(messagesPerNode).map(x => header + config.brokerId + "-" + partition + "-" + x)
-      producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
-      messages ++= ms
-      debug("Sent %d messages to broker %d for partition [%s,%d]".format(ms.size, config.brokerId, topic, partition))
-    }
-    producer.close()
-    messages
-  }
-
-  def getMessages(nMessagesPerThread: Int,
-                  topicMessageStreams: Map[String,List[KafkaStream[String, String]]]): List[String]= {
-    var messages: List[String] = Nil
-    for((topic, messageStreams) <- topicMessageStreams) {
-      for (messageStream <- messageStreams) {
-        val iterator = messageStream.iterator
-        for(i <- 0 until nMessagesPerThread) {
-          assertTrue(iterator.hasNext)
-          val message = iterator.next.message
-          messages ::= message
-          debug("received message: " + message)
-        }
-      }
-    }
-    messages.reverse
-  }
-
   def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
     val children = zkClient.getChildren(path)
     Collections.sort(children)