Posted to commits@kafka.apache.org by jk...@apache.org on 2012/12/18 18:44:12 UTC

[24/30] git commit: KAFKA-622 Create mbeans per client; patched by Swapnil; reviewed by Neha Narkhede

KAFKA-622 Create mbeans per client; patched by Swapnil; reviewed by Neha Narkhede

git-svn-id: https://svn.apache.org/repos/asf/kafka/branches/0.8@1415021 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d7c71c09
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d7c71c09
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d7c71c09

Branch: refs/heads/trunk
Commit: d7c71c0949e0cbae8963ca76bc69d64bee8f10af
Parents: e556063
Author: Neha Narkhede <ne...@apache.org>
Authored: Thu Nov 29 01:31:18 2012 +0000
Committer: Neha Narkhede <ne...@apache.org>
Committed: Thu Nov 29 01:31:18 2012 +0000

----------------------------------------------------------------------
 .../src/main/java/kafka/etl/KafkaETLContext.java   |    2 +-
 core/src/main/scala/kafka/api/FetchRequest.scala   |    2 +-
 core/src/main/scala/kafka/client/ClientUtils.scala |    4 +-
 .../kafka/common/InvalidClientIdException.scala    |   22 +++++
 .../kafka/consumer/ConsumerFetcherManager.scala    |    2 +-
 .../scala/kafka/consumer/ConsumerIterator.scala    |    7 +-
 .../scala/kafka/consumer/ConsumerTopicStat.scala   |   40 --------
 .../scala/kafka/consumer/ConsumerTopicStats.scala  |   41 ++++++++
 .../main/scala/kafka/consumer/KafkaStream.scala    |    5 +-
 .../scala/kafka/consumer/PartitionTopicInfo.scala  |    7 +-
 .../main/scala/kafka/consumer/SimpleConsumer.scala |   23 +++--
 .../consumer/ZookeeperConsumerConnector.scala      |   21 +++--
 .../scala/kafka/controller/KafkaController.scala   |    2 +-
 .../kafka/controller/PartitionLeaderSelector.scala |    6 +-
 .../kafka/controller/PartitionStateMachine.scala   |    4 +-
 .../kafka/controller/ReplicaStateMachine.scala     |    2 +-
 .../kafka/javaapi/consumer/SimpleConsumer.scala    |    6 +-
 .../consumer/ZookeeperConsumerConnector.scala      |    1 -
 core/src/main/scala/kafka/log/Log.scala            |    6 +-
 .../kafka/metrics/KafkaCSVMetricsReporter.scala    |    1 -
 .../scala/kafka/producer/BrokerPartitionInfo.scala |    6 +-
 .../scala/kafka/producer/ConsoleProducer.scala     |    1 -
 .../scala/kafka/producer/DefaultPartitioner.scala  |    1 -
 core/src/main/scala/kafka/producer/Producer.scala  |   73 +++++++++------
 .../main/scala/kafka/producer/ProducerPool.scala   |   29 ++++--
 .../main/scala/kafka/producer/SyncProducer.scala   |   14 +--
 .../scala/kafka/producer/SyncProducerConfig.scala  |    2 +-
 .../kafka/producer/async/AsyncProducerStats.scala  |   25 -----
 .../kafka/producer/async/DefaultEventHandler.scala |   14 ++-
 .../kafka/producer/async/ProducerSendThread.scala  |    5 +-
 core/src/main/scala/kafka/serializer/Decoder.scala |    1 -
 core/src/main/scala/kafka/serializer/Encoder.scala |    2 -
 .../scala/kafka/server/AbstractFetcherThread.scala |   45 +++++----
 core/src/main/scala/kafka/server/KafkaApis.scala   |   16 ++--
 .../scala/kafka/server/KafkaRequestHandler.scala   |    8 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |    8 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |    2 +-
 .../scala/kafka/tools/ConsumerOffsetChecker.scala  |    2 +-
 .../main/scala/kafka/tools/GetOffsetShell.scala    |    2 +-
 core/src/main/scala/kafka/tools/MirrorMaker.scala  |    1 -
 .../main/scala/kafka/tools/ReplayLogProducer.scala |    2 +-
 .../scala/kafka/tools/SimpleConsumerShell.scala    |    8 +-
 .../main/scala/kafka/tools/UpdateOffsetsInZK.scala |    2 +-
 .../main/scala/kafka/utils/ClientIdAndTopic.scala  |   64 +++++++++++++
 core/src/main/scala/kafka/utils/Topic.scala        |   41 --------
 .../test/scala/other/kafka/TestKafkaAppender.scala |    1 -
 .../scala/other/kafka/TestZKConsumerOffsets.scala  |    1 -
 .../unit/kafka/consumer/ConsumerIteratorTest.scala |    6 +-
 .../kafka/integration/AutoOffsetResetTest.scala    |    1 -
 .../scala/unit/kafka/integration/FetcherTest.scala |    4 +-
 .../kafka/integration/LazyInitProducerTest.scala   |    2 +-
 .../unit/kafka/integration/PrimitiveApiTest.scala  |    1 -
 .../integration/ProducerConsumerTestHarness.scala  |    6 +-
 .../consumer/ZookeeperConsumerConnectorTest.scala  |    2 +-
 .../test/scala/unit/kafka/log/LogOffsetTest.scala  |    2 +-
 .../unit/kafka/log4j/KafkaLog4jAppenderTest.scala  |    3 +-
 .../unit/kafka/producer/AsyncProducerTest.scala    |   65 ++++++++-----
 .../scala/unit/kafka/producer/ProducerTest.scala   |    5 +-
 .../scala/unit/kafka/server/LogRecoveryTest.scala  |    2 -
 .../unit/kafka/server/ServerShutdownTest.scala     |    7 +-
 .../test/scala/unit/kafka/utils/ClientIdTest.scala |   61 ++++++++++++
 .../java/kafka/examples/SimpleConsumerDemo.java    |    3 +-
 .../kafka/perf/SimpleConsumerPerformance.scala     |    2 +-
 63 files changed, 441 insertions(+), 311 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
index b0e75bc..8e98efc 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
@@ -109,7 +109,7 @@ public class KafkaETLContext {
         
         // read data from queue
         URI uri = _request.getURI();
-        _consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), _timeout, _bufferSize);
+        _consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), _timeout, _bufferSize, "KafkaETLContext");
         
         // get available offset range
         _offsetRange = getOffsetRange();

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 9892fb3..4ed071a 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -33,7 +33,7 @@ object FetchRequest {
   val CurrentVersion = 1.shortValue()
   val DefaultMaxWait = 0
   val DefaultMinBytes = 0
-  val ReplicaFetcherClientId = "replica fetcher"
+  val ReplicaFetcherClientId = "replica-fetcher"
   val DefaultCorrelationId = 0
 
   def readFrom(buffer: ByteBuffer): FetchRequest = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index aeead2d..e6e7200 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -12,14 +12,14 @@ import kafka.utils.{Utils, Logging}
  */
 object ClientUtils extends Logging{
 
-  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker]): TopicMetadataResponse = {
+  def fetchTopicMetadata(topics: Set[String], clientId: String, brokers: Seq[Broker]): TopicMetadataResponse = {
     var fetchMetaDataSucceeded: Boolean = false
     var i: Int = 0
     val topicMetadataRequest = new TopicMetadataRequest(topics.toSeq)
     var topicMetadataResponse: TopicMetadataResponse = null
     var t: Throwable = null
     while(i < brokers.size && !fetchMetaDataSucceeded) {
-      val producer: SyncProducer = ProducerPool.createSyncProducer(None, brokers(i))
+      val producer: SyncProducer = ProducerPool.createSyncProducer(clientId + "-FetchTopicMetadata", brokers(i))
       info("Fetching metadata for topic %s".format(topics))
       try {
         topicMetadataResponse = producer.send(topicMetadataRequest)
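
Every metadata fetch is now tagged with the caller's client id, which flows into the per-client mbean names introduced below. A minimal usage sketch of the new signature; the topic name and client id are placeholders, and real callers obtain the broker list from ZooKeeper:

    import kafka.client.ClientUtils
    import kafka.cluster.Broker

    // Placeholder broker list; callers such as ConsumerFetcherManager and the
    // consumer rebalance path fetch it from ZooKeeper via getAllBrokersInCluster.
    val brokers: Seq[Broker] = Nil
    val response = ClientUtils.fetchTopicMetadata(Set("my-topic"), "my-client", brokers)
    val partitions = response.topicsMetadata.flatMap(_.partitionsMetadata)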

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/common/InvalidClientIdException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/InvalidClientIdException.scala b/core/src/main/scala/kafka/common/InvalidClientIdException.scala
new file mode 100644
index 0000000..edf072d
--- /dev/null
+++ b/core/src/main/scala/kafka/common/InvalidClientIdException.scala
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class InvalidClientIdException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
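
This exception backs the ClientId.validate calls added throughout this patch (SimpleConsumer, ZookeeperConsumerConnector, Producer). The actual validation rules live in the new kafka/utils/ClientIdAndTopic.scala, which this excerpt does not show; the following is only a hypothetical sketch of that kind of check, to illustrate how the exception is used:

    // Hypothetical validator -- the real rules are in kafka.utils.ClientIdAndTopic
    // (exercised by the new ClientIdTest), not shown in this excerpt.
    object ClientIdSketch {
      private val legalChars = """[a-zA-Z0-9_-]*""".r
      def validate(clientId: String) {
        if (clientId == null || !legalChars.pattern.matcher(clientId).matches())
          throw new kafka.common.InvalidClientIdException(
            "ClientId '" + clientId + "' contains characters illegal in metric names")
      }
    }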

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index bdc020b..a80fac9 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -54,7 +54,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
         try {
           trace("Partitions without leader %s".format(noLeaderPartitionSet))
           val brokers = getAllBrokersInCluster(zkClient)
-          val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers).topicsMetadata
+          val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, config.clientId, brokers).topicsMetadata
           val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
           topicsMetadata.foreach(
             tmd => {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index fd80104..c5062fc 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -34,7 +34,8 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
                              consumerTimeoutMs: Int,
                              private val keyDecoder: Decoder[K],
                              private val valueDecoder: Decoder[V],
-                             val enableShallowIterator: Boolean)
+                             val enableShallowIterator: Boolean,
+                             val consumerTopicStats: ConsumerTopicStats)
   extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
 
   private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
@@ -48,8 +49,8 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
     currentTopicInfo.resetConsumeOffset(consumedOffset)
     val topic = currentTopicInfo.topic
     trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
-    ConsumerTopicStat.getConsumerTopicStat(topic).messageRate.mark()
-    ConsumerTopicStat.getConsumerAllTopicStat().messageRate.mark()
+    consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
+    consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
     item
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
index 2c9c204..e69de29 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.consumer
-
-import kafka.utils.{Pool, threadsafe, Logging}
-import java.util.concurrent.TimeUnit
-import kafka.metrics.KafkaMetricsGroup
-
-@threadsafe
-class ConsumerTopicStat(name: String) extends KafkaMetricsGroup {
-  val messageRate = newMeter(name + "MessagesPerSec",  "messages", TimeUnit.SECONDS)
-  val byteRate = newMeter(name + "BytesPerSec",  "bytes", TimeUnit.SECONDS)
-}
-
-object ConsumerTopicStat extends Logging {
-  private val valueFactory = (k: String) => new ConsumerTopicStat(k)
-  private val stats = new Pool[String, ConsumerTopicStat](Some(valueFactory))
-  private val allTopicStat = new ConsumerTopicStat("AllTopics")
-
-  def getConsumerAllTopicStat(): ConsumerTopicStat = allTopicStat
-
-  def getConsumerTopicStat(topic: String): ConsumerTopicStat = {
-    stats.getAndMaybePut(topic + "-")
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
new file mode 100644
index 0000000..2a9d9fb
--- /dev/null
+++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import kafka.utils.{ClientIdAndTopic, Pool, threadsafe, Logging}
+import java.util.concurrent.TimeUnit
+import kafka.metrics.KafkaMetricsGroup
+
+@threadsafe
+class ConsumerTopicMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup {
+  val messageRate = newMeter(clientIdTopic + "-MessagesPerSec",  "messages", TimeUnit.SECONDS)
+  val byteRate = newMeter(clientIdTopic + "-BytesPerSec",  "bytes", TimeUnit.SECONDS)
+}
+
+class ConsumerTopicStats(clientId: String) extends Logging {
+  private val valueFactory = (k: ClientIdAndTopic) => new ConsumerTopicMetrics(k)
+  private val stats = new Pool[ClientIdAndTopic, ConsumerTopicMetrics](Some(valueFactory))
+  private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics"))
+
+  def getConsumerAllTopicStats(): ConsumerTopicMetrics = allTopicStats
+
+  def getConsumerTopicStats(topic: String): ConsumerTopicMetrics = {
+    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
+  }
+}
+
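
Compared with the old ConsumerTopicStat singleton, each connector now owns a stats instance keyed by its client id, so the resulting meter names embed the client id via ClientIdAndTopic. A short sketch of the effect (topic and client ids are placeholders):

    // Two consumers in the same JVM no longer share one global meter.
    val statsA = new ConsumerTopicStats("client-a")
    val statsB = new ConsumerTopicStats("client-b")
    statsA.getConsumerTopicStats("orders").messageRate.mark()   // client-a's mbean
    statsB.getConsumerTopicStats("orders").messageRate.mark()   // client-b's mbean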

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/consumer/KafkaStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/KafkaStream.scala b/core/src/main/scala/kafka/consumer/KafkaStream.scala
index 115d41a..569f6df 100644
--- a/core/src/main/scala/kafka/consumer/KafkaStream.scala
+++ b/core/src/main/scala/kafka/consumer/KafkaStream.scala
@@ -26,11 +26,12 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
                         consumerTimeoutMs: Int,
                         private val keyDecoder: Decoder[K],
                         private val valueDecoder: Decoder[V],
-                        val enableShallowIterator: Boolean)
+                        val enableShallowIterator: Boolean,
+                        val consumerTopicStats: ConsumerTopicStats)
    extends Iterable[MessageAndMetadata[K,V]] with java.lang.Iterable[MessageAndMetadata[K,V]] {
 
   private val iter: ConsumerIterator[K,V] =
-    new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator)
+    new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator, consumerTopicStats)
 
   /**
    *  Create an iterator over messages in the stream.

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
index 5249ddc..8c42d11 100644
--- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
@@ -28,7 +28,8 @@ class PartitionTopicInfo(val topic: String,
                          private val chunkQueue: BlockingQueue[FetchedDataChunk],
                          private val consumedOffset: AtomicLong,
                          private val fetchedOffset: AtomicLong,
-                         private val fetchSize: AtomicInteger) extends Logging {
+                         private val fetchSize: AtomicInteger,
+                         private val consumerTopicStats: ConsumerTopicStats) extends Logging {
 
   debug("initial consumer offset of " + this + " is " + consumedOffset.get)
   debug("initial fetch offset of " + this + " is " + fetchedOffset.get)
@@ -58,8 +59,8 @@ class PartitionTopicInfo(val topic: String,
       chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
       fetchedOffset.set(next)
       debug("updated fetch offset of (%s) to %d".format(this, next))
-      ConsumerTopicStat.getConsumerTopicStat(topic).byteRate.mark(size)
-      ConsumerTopicStat.getConsumerAllTopicStat().byteRate.mark(size)
+      consumerTopicStats.getConsumerTopicStats(topic).byteRate.mark(size)
+      consumerTopicStats.getConsumerAllTopicStats().byteRate.mark(size)
     }
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index d642a67..e42923a 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -31,12 +31,12 @@ import kafka.cluster.Broker
 
 object SimpleConsumer extends Logging {
   def earliestOrLatestOffset(broker: Broker, topic: String, partitionId: Int, earliestOrLatest: Long,
-                             isFromOrdinaryConsumer: Boolean): Long = {
+                             clientId: String, isFromOrdinaryConsumer: Boolean): Long = {
     var simpleConsumer: SimpleConsumer = null
     var producedOffset: Long = -1L
     try {
       simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout,
-                                          ConsumerConfig.SocketBufferSize)
+                                          ConsumerConfig.SocketBufferSize, clientId)
       val topicAndPartition = TopicAndPartition(topic, partitionId)
       val request = if(isFromOrdinaryConsumer)
         new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
@@ -56,14 +56,14 @@ object SimpleConsumer extends Logging {
   }
 
   def earliestOrLatestOffset(zkClient: ZkClient, topic: String, brokerId: Int, partitionId: Int,
-                             earliestOrLatest: Long, isFromOrdinaryConsumer: Boolean = true): Long = {
+                             earliestOrLatest: Long, clientId: String, isFromOrdinaryConsumer: Boolean = true): Long = {
     val cluster = getCluster(zkClient)
     val broker = cluster.getBroker(brokerId) match {
       case Some(b) => b
       case None => throw new KafkaException("Broker " + brokerId + " is unavailable. Cannot issue " +
                                                     "getOffsetsBefore request")
     }
-    earliestOrLatestOffset(broker, topic, partitionId, earliestOrLatest, isFromOrdinaryConsumer)
+    earliestOrLatestOffset(broker, topic, partitionId, earliestOrLatest, clientId, isFromOrdinaryConsumer)
   }
 }
 
@@ -75,10 +75,13 @@ object SimpleConsumer extends Logging {
 class SimpleConsumer(val host: String,
                      val port: Int,
                      val soTimeout: Int,
-                     val bufferSize: Int) extends Logging {
+                     val bufferSize: Int,
+                     val clientId: String) extends Logging {
 
+  ClientId.validate(clientId)
   private val lock = new Object()
   private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
+  private val fetchRequestAndResponseStats = new FetchRequestAndResponseStats(clientId, "host_" + host + "-port_" + port)
 
   private def connect(): BlockingChannel = {
     close
@@ -143,12 +146,12 @@ class SimpleConsumer(val host: String,
    */
   def fetch(request: FetchRequest): FetchResponse = {
     var response: Receive = null
-    FetchRequestAndResponseStat.requestTimer.time {
+    fetchRequestAndResponseStats.requestTimer.time {
       response = sendRequest(request)
     }
     val fetchResponse = FetchResponse.readFrom(response.buffer)
     val fetchedSize = fetchResponse.sizeInBytes
-    FetchRequestAndResponseStat.respondSizeHist.update(fetchedSize)
+    fetchRequestAndResponseStats.respondSizeHist.update(fetchedSize)
     fetchResponse
   }
 
@@ -166,7 +169,7 @@ class SimpleConsumer(val host: String,
   }
 }
 
-object FetchRequestAndResponseStat extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val respondSizeHist = newHistogram("FetchResponseSize")
+class FetchRequestAndResponseStats(clientId: String, brokerInfo: String) extends KafkaMetricsGroup {
+  val requestTimer = new KafkaTimer(newTimer(clientId + "-" + brokerInfo + "-FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val respondSizeHist = newHistogram(clientId + "-" + brokerInfo + "-FetchResponseSize")
 }
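
A SimpleConsumer now validates its client id up front and registers fetch metrics scoped to both the client and the broker it talks to (the timer name combines the clientId with "host_<host>-port_<port>"). A construction sketch with placeholder host, port and client id:

    import kafka.consumer.{ConsumerConfig, SimpleConsumer}

    // The fifth argument is new; an invalid id fails fast with
    // InvalidClientIdException instead of feeding a shared global mbean.
    val consumer = new SimpleConsumer("broker1.example.com", 9092,
                                      ConsumerConfig.SocketTimeout,
                                      ConsumerConfig.SocketBufferSize,
                                      "my-client")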

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 227c90d..fbb82a2 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -35,7 +35,6 @@ import kafka.client.ClientUtils
 import com.yammer.metrics.core.Gauge
 import kafka.api.OffsetRequest
 import kafka.metrics._
-import kafka.producer.ProducerConfig
 
 
 /**
@@ -80,6 +79,8 @@ private[kafka] object ZookeeperConsumerConnector {
 private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                 val enableFetcher: Boolean) // for testing only
         extends ConsumerConnector with Logging with KafkaMetricsGroup {
+
+  ClientId.validate(config.clientId)
   private val isShuttingDown = new AtomicBoolean(false)
   private val rebalanceLock = new Object
   private var fetcher: Option[ConsumerFetcherManager] = None
@@ -94,6 +95,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
   private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
 
+  private val consumerTopicStats = new ConsumerTopicStats(config.clientId)
+
   val consumerIdString = {
     var consumerUuid : String = null
     config.consumerId match {
@@ -195,7 +198,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       threadIdSet.map(_ => {
         val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
         val stream = new KafkaStream[K,V](
-          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator)
+          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator, consumerTopicStats)
         (queue, stream)
       })
     ).flatten.toList
@@ -399,7 +402,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
       val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
       val brokers = getAllBrokersInCluster(zkClient)
-      val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers).topicsMetadata
+      val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, config.clientId, brokers).topicsMetadata
       val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
       val leaderIdForPartitionsMap = new mutable.HashMap[(String, Int), Int]
       topicsMetadata.foreach(m =>{
@@ -595,9 +598,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           case None =>
             config.autoOffsetReset match {
               case OffsetRequest.SmallestTimeString =>
-                SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.EarliestTime)
+                SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.EarliestTime, config.clientId)
               case OffsetRequest.LargestTimeString =>
-                SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.LatestTime)
+                SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.LatestTime, config.clientId)
               case _ =>
                 throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig")
             }
@@ -611,7 +614,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                  queue,
                                                  consumedOffset,
                                                  fetchedOffset,
-                                                 new AtomicInteger(config.fetchSize))
+                                                 new AtomicInteger(config.fetchSize),
+                                                 consumerTopicStats)
       partTopicInfoMap.put(partition, partTopicInfo)
       debug(partTopicInfo + " selected new offset " + offset)
     }
@@ -667,7 +671,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       val q = e._2._1
       topicThreadIdAndQueues.put(topicThreadId, q)
       newGauge(
-        config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
+        config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
         new Gauge[Int] {
           def getValue = q.size
         }
@@ -714,7 +718,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                           config.consumerTimeoutMs, 
                                           keyDecoder, 
                                           valueDecoder, 
-                                          config.enableShallowIterator)
+                                          config.enableShallowIterator,
+                                          consumerTopicStats)
         (queue, stream)
     }).toList
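
Taken together, a connector now validates its client id at construction and stamps it on every consumer metric it registers (message/byte rates and the per-thread FetchQueueSize gauges). A sketch using the standard Consumer factory; the property keys below are assumptions for this era of the config, since the diff itself only shows config.clientId being read:

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}

    val props = new Properties()
    props.put("zk.connect", "localhost:2181")   // assumed key names
    props.put("groupid", "my-group")
    props.put("clientid", "my-client")          // validated by ClientId.validate
    val connector = Consumer.create(new ConsumerConfig(props))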
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 3b1a53f..4840c0c 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -961,7 +961,7 @@ case class PartitionAndReplica(topic: String, partition: Int, replica: Int)
 
 case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int)
 
-object ControllerStat extends KafkaMetricsGroup {
+object ControllerStats extends KafkaMetricsGroup {
   val offlinePartitionRate = newMeter("OfflinePartitionsPerSec",  "partitions", TimeUnit.SECONDS)
   val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec",  "elections", TimeUnit.SECONDS)
   val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 3e5435e..3eb23cd 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -58,12 +58,12 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
               .format(liveAssignedReplicasToThisPartition.mkString(",")))
             liveAssignedReplicasToThisPartition.isEmpty match {
               case true =>
-                ControllerStat.offlinePartitionRate.mark()
+                ControllerStats.offlinePartitionRate.mark()
                 throw new PartitionOfflineException(("No replica for partition " +
                   "([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, controllerContext.liveBrokerIds)) +
                   " Assigned replicas are: [%s]".format(assignedReplicas))
               case false =>
-                ControllerStat.uncleanLeaderElectionRate.mark()
+                ControllerStats.uncleanLeaderElectionRate.mark()
                 val newLeader = liveAssignedReplicasToThisPartition.head
                 warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) +
                   "There's potential data loss")
@@ -78,7 +78,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
           partition))
         (newLeaderAndIsr, liveAssignedReplicasToThisPartition)
       case None =>
-        ControllerStat.offlinePartitionRate.mark()
+        ControllerStats.offlinePartitionRate.mark()
         throw new PartitionOfflineException("Partition [%s, %d] doesn't have".format(topic, partition) +
           "replicas assigned to it")
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 17dfbee..0278782 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -223,7 +223,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
     liveAssignedReplicas.size match {
       case 0 =>
-        ControllerStat.offlinePartitionRate.mark()
+        ControllerStats.offlinePartitionRate.mark()
         throw new StateChangeFailedException(("During state change of partition %s from NEW to ONLINE, assigned replicas are " +
           "[%s], live brokers are [%s]. No assigned replica is alive").format(topicAndPartition,
           replicaAssignment.mkString(","), controllerContext.liveBrokerIds))
@@ -249,7 +249,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
             // read the controller epoch
             val leaderIsrAndEpoch = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic,
               topicAndPartition.partition).get
-            ControllerStat.offlinePartitionRate.mark()
+            ControllerStats.offlinePartitionRate.mark()
             throw new StateChangeFailedException("Error while changing partition %s's state from New to Online"
               .format(topicAndPartition) + " since Leader and isr path already exists with value " +
               "%s and controller epoch %d".format(leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch))

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 5c8aec5..1753947 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -227,7 +227,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   class BrokerChangeListener() extends IZkChildListener with Logging {
     this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: "
     def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
-      ControllerStat.leaderElectionTimer.time {
+      ControllerStats.leaderElectionTimer.time {
         info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(",")))
         if(!isShuttingDown.get()) {
           controllerContext.controllerLock synchronized {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
index 803ec4b..58c7081 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
@@ -29,8 +29,10 @@ import kafka.javaapi.OffsetRequest
 class SimpleConsumer(val host: String,
                      val port: Int,
                      val soTimeout: Int,
-                     val bufferSize: Int) {
-  private val underlying = new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize)
+                     val bufferSize: Int,
+                     val clientId: String) {
+
+  private val underlying = new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize, clientId)
 
   /**
    *  Fetch a set of messages from a topic. This version of the fetch method

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index 2b20aa4..14c4c8a 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -16,7 +16,6 @@
  */
 package kafka.javaapi.consumer
 
-import kafka.message.Message
 import kafka.serializer._
 import kafka.consumer._
 import scala.collection.JavaConversions.asList

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 4cb2445..c2fccec 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic._
 import kafka.utils._
 import scala.math._
 import java.text.NumberFormat
-import kafka.server.BrokerTopicStat
+import kafka.server.BrokerTopicStats
 import kafka.message._
 import kafka.common._
 import kafka.metrics.KafkaMetricsGroup
@@ -244,8 +244,8 @@ private[kafka] class Log(val dir: File,
     if(messageSetInfo.count == 0) {
       (-1L, -1L)
     } else {
-      BrokerTopicStat.getBrokerTopicStat(topicName).messagesInRate.mark(messageSetInfo.count)
-      BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(messageSetInfo.count)
+      BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(messageSetInfo.count)
+      BrokerTopicStats.getBrokerAllTopicStats.messagesInRate.mark(messageSetInfo.count)
 
       // trim any invalid bytes or partial messages before appending it to the on-disk log
       var validMessages = trimInvalidBytes(messages)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
index fda0b24..ea9559f 100644
--- a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
@@ -24,7 +24,6 @@ import com.yammer.metrics.Metrics
 import java.io.File
 import com.yammer.metrics.reporting.CsvReporter
 import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicBoolean
 import kafka.utils.{Utils, VerifiableProperties, Logging}
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index 27bae5e..938504a 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -68,11 +68,11 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
 
   /**
    * It updates the cache by issuing a get topic metadata request to a random broker.
-   * @param topic the topic for which the metadata is to be fetched
+   * @param topics the topics for which the metadata is to be fetched
    */
-  def updateInfo(topics: Set[String]) = {
+  def updateInfo(topics: Set[String]) {
     var topicsMetadata: Seq[TopicMetadata] = Nil
-    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers)
+    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, producerConfig.clientId, brokers)
     topicsMetadata = topicMetadataResponse.topicsMetadata
     // throw partition specific exception
     topicsMetadata.foreach(tmd =>{

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index e7f50e4..8c32115 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -20,7 +20,6 @@ package kafka.producer
 import scala.collection.JavaConversions._
 import joptsimple._
 import java.util.Properties
-import java.util.regex._
 import java.io._
 import kafka.common._
 import kafka.message._

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
index d0a89eb..c82670e 100644
--- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
+++ b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
@@ -17,7 +17,6 @@
 
 package kafka.producer
 
-import kafka.utils.Utils
 
 import kafka.utils._
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index 7b8926c..3bfd563 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -16,7 +16,7 @@
  */
 package kafka.producer
 
-import async.{AsyncProducerStats, DefaultEventHandler, ProducerSendThread, EventHandler}
+import async.{DefaultEventHandler, ProducerSendThread, EventHandler}
 import kafka.utils._
 import java.util.Random
 import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
@@ -27,8 +27,11 @@ import kafka.metrics._
 
 
 class Producer[K,V](config: ProducerConfig,
-                    private val eventHandler: EventHandler[K,V]) // for testing only
-extends Logging {
+                    private val eventHandler: EventHandler[K,V],
+                    private val producerStats: ProducerStats,
+                    private val producerTopicStats: ProducerTopicStats)  // only for unit testing
+  extends Logging {
+
   private val hasShutdown = new AtomicBoolean(false)
   if (config.batchSize > config.queueSize)
     throw new InvalidConfigException("Batch size can't be larger than queue size.")
@@ -47,25 +50,38 @@ extends Logging {
                                                        queue,
                                                        eventHandler, 
                                                        config.queueTime, 
-                                                       config.batchSize)
+                                                       config.batchSize,
+                                                       config.clientId)
       producerSendThread.start()
     case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
   }
 
   KafkaMetricsReporter.startReporters(config.props)
 
+  def this(t: (ProducerConfig, EventHandler[K,V], ProducerStats, ProducerTopicStats)) =
+    this(t._1, t._2, t._3, t._4)
+
   def this(config: ProducerConfig) =
-    this(config,
-         new DefaultEventHandler[K,V](config,
-                                      Utils.createObject[Partitioner[K]](config.partitionerClass, config.props),
-                                      Utils.createObject[Encoder[V]](config.serializerClass, config.props),
-                                      Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
-                                      new ProducerPool(config)))
+    this {
+      ClientId.validate(config.clientId)
+      val producerStats = new ProducerStats(config.clientId)
+      val producerTopicStats = new ProducerTopicStats(config.clientId)
+      (config,
+       new DefaultEventHandler[K,V](config,
+                                    Utils.createObject[Partitioner[K]](config.partitionerClass, config.props),
+                                    Utils.createObject[Encoder[V]](config.serializerClass, config.props),
+                                    Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
+                                    new ProducerPool(config),
+                                    producerStats = producerStats,
+                                    producerTopicStats = producerTopicStats),
+       producerStats,
+       producerTopicStats)
+    }
 
   /**
    * Sends the data, partitioned by key to the topic using either the
    * synchronous or the asynchronous producer
-   * @param producerData the producer data object that encapsulates the topic, key and message data
+   * @param messages the producer data object that encapsulates the topic, key and message data
    */
   def send(messages: KeyedMessage[K,V]*) {
     if (hasShutdown.get)
@@ -79,8 +95,8 @@ extends Logging {
 
   private def recordStats(messages: Seq[KeyedMessage[K,V]]) {
     for (message <- messages) {
-      ProducerTopicStat.getProducerTopicStat(message.topic).messageRate.mark()
-      ProducerTopicStat.getProducerAllTopicStat.messageRate.mark()
+      producerTopicStats.getProducerTopicStats(message.topic).messageRate.mark()
+      producerTopicStats.getProducerAllTopicStats.messageRate.mark()
     }
   }
 
@@ -105,7 +121,7 @@ extends Logging {
           }
       }
       if(!added) {
-        AsyncProducerStats.droppedMessageRate.mark()
+        producerStats.droppedMessageRate.mark()
         error("Event queue is full of unsent messages, could not send event: " + message.toString)
         throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
       }else {
@@ -131,26 +147,27 @@ extends Logging {
 }
 
 @threadsafe
-class ProducerTopicStat(name: String) extends KafkaMetricsGroup {
-  val messageRate = newMeter(name + "MessagesPerSec",  "messages", TimeUnit.SECONDS)
-  val byteRate = newMeter(name + "BytesPerSec",  "bytes", TimeUnit.SECONDS)
+class ProducerTopicMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup {
+  val messageRate = newMeter(clientIdTopic + "-MessagesPerSec",  "messages", TimeUnit.SECONDS)
+  val byteRate = newMeter(clientIdTopic + "-BytesPerSec",  "bytes", TimeUnit.SECONDS)
 }
 
-object ProducerTopicStat {
-  private val valueFactory = (k: String) => new ProducerTopicStat(k)
-  private val stats = new Pool[String, ProducerTopicStat](Some(valueFactory))
-  private val allTopicStat = new ProducerTopicStat("AllTopics")
+class ProducerTopicStats(clientId: String) {
+  private val valueFactory = (k: ClientIdAndTopic) => new ProducerTopicMetrics(k)
+  private val stats = new Pool[ClientIdAndTopic, ProducerTopicMetrics](Some(valueFactory))
+  private val allTopicStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics"))
 
-  def getProducerAllTopicStat(): ProducerTopicStat = allTopicStat
+  def getProducerAllTopicStats(): ProducerTopicMetrics = allTopicStats
 
-  def getProducerTopicStat(topic: String): ProducerTopicStat = {
-    stats.getAndMaybePut(topic + "-")
+  def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
+    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
   }
 }
 
-object ProducerStats extends KafkaMetricsGroup {
-  val serializationErrorRate = newMeter("SerializationErrorsPerSec",  "errors", TimeUnit.SECONDS)
-  val resendRate = newMeter( "ResendsPerSec",  "resends", TimeUnit.SECONDS)
-  val failedSendRate = newMeter("FailedSendsPerSec",  "failed sends", TimeUnit.SECONDS)
+class ProducerStats(clientId: String) extends KafkaMetricsGroup {
+  val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec",  "errors", TimeUnit.SECONDS)
+  val resendRate = newMeter(clientId + "-ResendsPerSec",  "resends", TimeUnit.SECONDS)
+  val failedSendRate = newMeter(clientId + "-FailedSendsPerSec",  "failed sends", TimeUnit.SECONDS)
+  val droppedMessageRate = newMeter(clientId + "-DroppedMessagesPerSec",  "drops", TimeUnit.SECONDS)
 }
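
The producer counters follow the same pattern: one ProducerStats and one ProducerTopicStats per client id, wired up in the public Producer(config) constructor above. A sketch of what the change buys, with a placeholder client id:

    // Independent per-client meters; the old ProducerStats/AsyncProducerStats
    // singletons made all producers in a JVM share these.
    val stats = new ProducerStats("my-client")
    stats.droppedMessageRate.mark()   // marks "my-client-DroppedMessagesPerSec"

    val topicStats = new ProducerTopicStats("my-client")
    topicStats.getProducerTopicStats("orders").byteRate.mark(1024)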
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/producer/ProducerPool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerPool.scala b/core/src/main/scala/kafka/producer/ProducerPool.scala
index eb8ead3..7e78c7e 100644
--- a/core/src/main/scala/kafka/producer/ProducerPool.scala
+++ b/core/src/main/scala/kafka/producer/ProducerPool.scala
@@ -26,13 +26,26 @@ import kafka.api.TopicMetadata
 import kafka.common.UnavailableProducerException
 
 
-object ProducerPool{
-  def createSyncProducer(configOpt: Option[ProducerConfig], broker: Broker): SyncProducer = {
+object ProducerPool {
+  /**
+   * Used in ProducerPool to initiate a SyncProducer connection with a broker.
+   */
+  def createSyncProducer(config: ProducerConfig, broker: Broker): SyncProducer = {
+    val props = new Properties()
+    props.put("host", broker.host)
+    props.put("port", broker.port.toString)
+    props.putAll(config.props.props)
+    new SyncProducer(new SyncProducerConfig(props))
+  }
+
+  /**
+   * Used in ClientUtils to send TopicMetadataRequest to a broker.
+   */
+  def createSyncProducer(clientId: String, broker: Broker): SyncProducer = {
     val props = new Properties()
     props.put("host", broker.host)
     props.put("port", broker.port.toString)
-    if(configOpt.isDefined)
-      props.putAll(configOpt.get.props.props)
+    props.put("client.id", clientId)
     new SyncProducer(new SyncProducerConfig(props))
   }
 }
@@ -41,9 +54,9 @@ class ProducerPool(val config: ProducerConfig) extends Logging {
   private val syncProducers = new HashMap[Int, SyncProducer]
   private val lock = new Object()
 
-  def updateProducer(topicMetaDatas: Seq[TopicMetadata]) {
+  def updateProducer(topicMetadatas: Seq[TopicMetadata]) {
     val newBrokers = new collection.mutable.HashSet[Broker]
-    topicMetaDatas.foreach(tmd => {
+    topicMetadatas.foreach(tmd => {
       tmd.partitionsMetadata.foreach(pmd => {
         if(pmd.leader.isDefined)
           newBrokers+=(pmd.leader.get)
@@ -53,9 +66,9 @@ class ProducerPool(val config: ProducerConfig) extends Logging {
       newBrokers.foreach(b => {
         if(syncProducers.contains(b.id)){
           syncProducers(b.id).close()
-          syncProducers.put(b.id, ProducerPool.createSyncProducer(Some(config), b))
+          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
         } else
-          syncProducers.put(b.id, ProducerPool.createSyncProducer(Some(config), b))
+          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
       })
     }
   }
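
The Option[ProducerConfig] parameter is gone; callers pick the overload that matches what they actually hold. A sketch of both call sites, assuming an existing ProducerConfig named config and a target Broker named broker:

    // ProducerPool.updateProducer has the full config, so all of its
    // properties ride along to the SyncProducer.
    val fromConfig = ProducerPool.createSyncProducer(config, broker)

    // ClientUtils.fetchTopicMetadata holds only a client id string, so just
    // host, port and client.id are set.
    val forMetadata = ProducerPool.createSyncProducer("my-client-FetchTopicMetadata", broker)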

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 15733bb..ea03d51 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -34,14 +34,12 @@ object SyncProducer {
  */
 @threadsafe
 class SyncProducer(val config: SyncProducerConfig) extends Logging {
-  
-  private val MaxConnectBackoffMs = 60000
-  private var sentOnConnection = 0
 
   private val lock = new Object()
   @volatile private var shutdown: Boolean = false
   private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
     config.bufferSize, config.requestTimeoutMs)
+  val producerRequestStats = new ProducerRequestStats(config.clientId, "host_" + config.host + "-port_" + config.port)
 
   trace("Instantiating Scala Sync Producer")
 
@@ -89,9 +87,9 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
    * Send a message
    */
   def send(producerRequest: ProducerRequest): ProducerResponse = {
-    ProducerRequestStat.requestSizeHist.update(producerRequest.sizeInBytes)
+    producerRequestStats.requestSizeHist.update(producerRequest.sizeInBytes)
     var response: Receive = null
-    ProducerRequestStat.requestTimer.time {
+    producerRequestStats.requestTimer.time {
       response = doSend(producerRequest)
     }
     ProducerResponse.readFrom(response.buffer)
@@ -152,7 +150,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
   }
 }
 
-object ProducerRequestStat extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer("ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val requestSizeHist = newHistogram("ProducerRequestSize")
+class ProducerRequestStats(clientId: String, brokerInfo: String) extends KafkaMetricsGroup {
+  val requestTimer = new KafkaTimer(newTimer(clientId + "-" + brokerInfo + "-ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val requestSizeHist = newHistogram(clientId + "-" + brokerInfo + "-ProducerRequestSize")
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
index 6f6a3f3..3e3dc49 100644
--- a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
@@ -44,7 +44,7 @@ trait SyncProducerConfigShared {
   val correlationId = props.getInt("producer.request.correlation_id", SyncProducerConfig.DefaultCorrelationId)
 
   /* the client application sending the producer requests */
-  val clientId = props.getString("producer.request.client_id",SyncProducerConfig.DefaultClientId)
+  val clientId = props.getString("clientid", SyncProducerConfig.DefaultClientId)
 
   /*
    * The required acks of the producer requests - negative value means ack
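
Note the property key changes from "producer.request.client_id" to "clientid". A sketch of setting it directly on a SyncProducerConfig; the host and port values are placeholders:

    import java.util.Properties
    import kafka.producer.SyncProducerConfig

    val props = new Properties()
    props.put("host", "broker1.example.com")
    props.put("port", "9092")
    props.put("clientid", "my-client")   // key read by SyncProducerConfigShared
    val syncConfig = new SyncProducerConfig(props)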

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala b/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
index dd9078e..e69de29 100644
--- a/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
+++ b/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.producer.async
-
-import kafka.metrics.KafkaMetricsGroup
-import java.util.concurrent.TimeUnit
-
-object AsyncProducerStats extends KafkaMetricsGroup {
-  val droppedMessageRate = newMeter("DroppedMessagesPerSec",  "drops", TimeUnit.SECONDS)
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 9be87d0..645402e 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -33,7 +33,9 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
                                private val encoder: Encoder[V],
                                private val keyEncoder: Encoder[K],
                                private val producerPool: ProducerPool,
-                               private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata])
+                               private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata],
+                               private val producerStats: ProducerStats,
+                               private val producerTopicStats: ProducerTopicStats)
   extends EventHandler[K,V] with Logging {
   val isSync = ("sync" == config.producerType)
 
@@ -48,8 +50,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       serializedData.foreach{
         keyed => 
           val dataSize = keyed.message.payloadSize
-          ProducerTopicStat.getProducerTopicStat(keyed.topic).byteRate.mark(dataSize)
-          ProducerTopicStat.getProducerAllTopicStat.byteRate.mark(dataSize)
+          producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
+          producerTopicStats.getProducerAllTopicStats.byteRate.mark(dataSize)
       }
       var outstandingProduceRequests = serializedData
       var remainingRetries = config.producerRetries + 1
@@ -61,11 +63,11 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
           // get topics of the outstanding produce requests and refresh metadata for those
           Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet))
           remainingRetries -= 1
-          ProducerStats.resendRate.mark()
+          producerStats.resendRate.mark()
         }
       }
       if(outstandingProduceRequests.size > 0) {
-        ProducerStats.failedSendRate.mark()
+        producerStats.failedSendRate.mark()
         error("Failed to send the following requests: " + outstandingProduceRequests)
         throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null)
       }
@@ -111,7 +113,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
           serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = null.asInstanceOf[K], message = new Message(bytes = encoder.toBytes(e.message)))
       } catch {
         case t =>
-          ProducerStats.serializationErrorRate.mark()
+          producerStats.serializationErrorRate.mark()
           if (isSync) {
             throw t
           } else {
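
The stats objects are now constructor-injected instead of referenced as
singletons. Their definitions live elsewhere in this patch (Producer.scala);
the sketch below shows an assumed shape consistent with the call sites above,
not the exact committed code:

    import java.util.concurrent.TimeUnit
    import kafka.metrics.KafkaMetricsGroup

    // Assumed shape: three per-client meters matching the usages above.
    class ProducerStats(clientId: String) extends KafkaMetricsGroup {
      val serializationErrorRate =
        newMeter(clientId + "-SerializationErrorsPerSec", "errors", TimeUnit.SECONDS)
      val resendRate = newMeter(clientId + "-ResendsPerSec", "resends", TimeUnit.SECONDS)
      val failedSendRate = newMeter(clientId + "-FailedSendsPerSec", "failed sends", TimeUnit.SECONDS)
    }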

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index c900c45..2b39cab 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -28,12 +28,13 @@ class ProducerSendThread[K,V](val threadName: String,
                               val queue: BlockingQueue[KeyedMessage[K,V]],
                               val handler: EventHandler[K,V],
                               val queueTime: Long,
-                              val batchSize: Int) extends Thread(threadName) with Logging with KafkaMetricsGroup {
+                              val batchSize: Int,
+                              val clientId: String) extends Thread(threadName) with Logging with KafkaMetricsGroup {
 
   private val shutdownLatch = new CountDownLatch(1)
   private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V])
 
-  newGauge("ProducerQueueSize-" + getId,
+  newGauge(clientId + "-ProducerQueueSize-" + getId,
           new Gauge[Int] {
             def getValue = queue.size
           })
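
Prefixing the queue-depth gauge with the client id keeps two producers in the
same JVM from colliding on one mbean. A stand-alone sketch of the same gauge
pattern (queue and names are illustrative):

    import java.util.concurrent.LinkedBlockingQueue
    import com.yammer.metrics.core.Gauge
    import kafka.metrics.KafkaMetricsGroup

    object QueueDepthSketch extends KafkaMetricsGroup {
      private val queue = new LinkedBlockingQueue[String]()
      // Registers e.g. "my-client-ProducerQueueSize-1" as a JMX gauge.
      newGauge("my-client-ProducerQueueSize-1",
               new Gauge[Int] { def getValue = queue.size })
    }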

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/serializer/Decoder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/serializer/Decoder.scala b/core/src/main/scala/kafka/serializer/Decoder.scala
index 6fc3b1d..54d0b93 100644
--- a/core/src/main/scala/kafka/serializer/Decoder.scala
+++ b/core/src/main/scala/kafka/serializer/Decoder.scala
@@ -17,7 +17,6 @@
 
 package kafka.serializer
 
-import kafka.message._
 import kafka.utils.VerifiableProperties
 
 /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/serializer/Encoder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/serializer/Encoder.scala b/core/src/main/scala/kafka/serializer/Encoder.scala
index fa9067f..020e73c 100644
--- a/core/src/main/scala/kafka/serializer/Encoder.scala
+++ b/core/src/main/scala/kafka/serializer/Encoder.scala
@@ -18,8 +18,6 @@
 package kafka.serializer
 
 import kafka.utils.VerifiableProperties
-import kafka.message._
-import kafka.utils.Utils
 
 /**
  * An encoder is a method of turning objects into byte arrays.

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index c2cc3cb..e4520a4 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -27,7 +27,7 @@ import kafka.api.{FetchResponse, FetchResponsePartitionData, FetchRequestBuilder
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.atomic.AtomicLong
-import kafka.utils.{Pool, ShutdownableThread}
+import kafka.utils.{ClientIdAndTopic, Pool, ShutdownableThread}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 
@@ -38,12 +38,13 @@ import java.util.concurrent.locks.ReentrantLock
 abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
                                      fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
   extends ShutdownableThread(name) {
-
   private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
   private val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
-  val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize)
-  val fetcherMetrics = FetcherStat.getFetcherStat(name + "-" + sourceBroker.id)
+  val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
+  val fetcherStats = new FetcherStats(clientId)
+  val fetcherMetrics = fetcherStats.getFetcherStats(name + "-" + sourceBroker.id)
+  val fetcherLagStats = new FetcherLagStats(clientId)
 
   /* callbacks to be defined in subclass */
 
@@ -117,7 +118,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
                     case None => currentOffset.get
                   }
                   partitionMap.put(topicAndPartition, newOffset)
-                  FetcherLagMetrics.getFetcherLagMetrics(topic, partitionId).lag = partitionData.hw - newOffset
+                  fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
                   fetcherMetrics.byteRate.mark(validBytes)
                   // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                   processPartitionData(topicAndPartition, currentOffset.get, partitionData)
@@ -182,10 +183,10 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   }
 }
 
-class FetcherLagMetrics(name: (String, Int)) extends KafkaMetricsGroup {
+class FetcherLagMetrics(clientIdTopicPartition: ClientIdTopicPartition) extends KafkaMetricsGroup {
   private[this] var lagVal = new AtomicLong(-1L)
   newGauge(
-    name._1 + "-" + name._2 + "-ConsumerLag",
+    clientIdTopicPartition + "-ConsumerLag",
     new Gauge[Long] {
       def getValue = lagVal.get
     }
@@ -198,25 +199,29 @@ class FetcherLagMetrics(name: (String, Int)) extends KafkaMetricsGroup {
   def lag = lagVal.get
 }
 
-object FetcherLagMetrics {
-  private val valueFactory = (k: (String, Int)) => new FetcherLagMetrics(k)
-  private val stats = new Pool[(String, Int), FetcherLagMetrics](Some(valueFactory))
+class FetcherLagStats(clientId: String) {
+  private val valueFactory = (k: ClientIdTopicPartition) => new FetcherLagMetrics(k)
+  private val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](Some(valueFactory))
 
-  def getFetcherLagMetrics(topic: String, partitionId: Int): FetcherLagMetrics = {
-    stats.getAndMaybePut( (topic, partitionId) )
+  def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = {
+    stats.getAndMaybePut(new ClientIdTopicPartition(clientId, topic, partitionId))
   }
 }
 
-class FetcherStat(name: String) extends KafkaMetricsGroup {
-  val requestRate = newMeter(name + "RequestsPerSec",  "requests", TimeUnit.SECONDS)
-  val byteRate = newMeter(name + "BytesPerSec",  "bytes", TimeUnit.SECONDS)
+class FetcherMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup {
+  val requestRate = newMeter(clientIdTopic + "-RequestsPerSec",  "requests", TimeUnit.SECONDS)
+  val byteRate = newMeter(clientIdTopic + "-BytesPerSec",  "bytes", TimeUnit.SECONDS)
 }
 
-object FetcherStat {
-  private val valueFactory = (k: String) => new FetcherStat(k)
-  private val stats = new Pool[String, FetcherStat](Some(valueFactory))
+class FetcherStats(clientId: String) {
+  private val valueFactory = (k: ClientIdAndTopic) => new FetcherMetrics(k)
+  private val stats = new Pool[ClientIdAndTopic, FetcherMetrics](Some(valueFactory))
 
-  def getFetcherStat(name: String): FetcherStat = {
-    stats.getAndMaybePut(name)
+  def getFetcherStats(name: String): FetcherMetrics = {
+    stats.getAndMaybePut(new ClientIdAndTopic(clientId, name))
   }
 }
+
+case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) {
+  override def toString = "%s-%s-%d".format(clientId, topic, partitionId)
+}
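
A usage sketch of the per-client lag bookkeeping defined above; topic,
partition and the lag value are illustrative:

    import kafka.server.FetcherLagStats

    // Metrics are keyed by (clientId, topic, partition) and named
    // "<clientId>-<topic>-<partition>-ConsumerLag".
    val lagStats = new FetcherLagStats("ReplicaFetcher-host_broker1-port_9093")
    val lagMetric = lagStats.getFetcherLagStats("my-topic", 0)
    lagMetric.lag = 42L            // hw - fetchedOffset, as in the fetch loop above
    assert(lagMetric.lag == 42L)   // the gauge reports the same value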

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index a14e0a2..cc04ed5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -237,8 +237,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   private def appendToLocalLog(partitionAndData: Map[TopicAndPartition, MessageSet]): Iterable[ProduceResult] = {
     trace("Append [%s] to local log ".format(partitionAndData.toString))
     partitionAndData.map {case (topicAndPartition, messages) =>
-      BrokerTopicStat.getBrokerTopicStat(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
-      BrokerTopicStat.getBrokerAllTopicStat.bytesInRate.mark(messages.sizeInBytes)
+      BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
+      BrokerTopicStats.getBrokerAllTopicStats.bytesInRate.mark(messages.sizeInBytes)
 
       try {
         val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
@@ -255,8 +255,8 @@ class KafkaApis(val requestChannel: RequestChannel,
           Runtime.getRuntime.halt(1)
           null
         case e =>
-          BrokerTopicStat.getBrokerTopicStat(topicAndPartition.topic).failedProduceRequestRate.mark()
-          BrokerTopicStat.getBrokerAllTopicStat.failedProduceRequestRate.mark()
+          BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
+          BrokerTopicStats.getBrokerAllTopicStats.failedProduceRequestRate.mark()
           error("Error processing ProducerRequest on %s:%d".format(topicAndPartition.topic, topicAndPartition.partition), e)
           new ProduceResult(topicAndPartition, e)
        }
@@ -323,8 +323,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         val partitionData =
           try {
             val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
-            BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes)
-            BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes)
+            BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes)
+            BrokerTopicStats.getBrokerAllTopicStats.bytesOutRate.mark(messages.sizeInBytes)
             if (!isFetchFromFollower) {
               new FetchResponsePartitionData(ErrorMapping.NoError, offset, highWatermark, messages)
             } else {
@@ -334,8 +334,8 @@ class KafkaApis(val requestChannel: RequestChannel,
             }
           } catch {
             case t: Throwable =>
-              BrokerTopicStat.getBrokerTopicStat(topic).failedFetchRequestRate.mark()
-              BrokerTopicStat.getBrokerAllTopicStat.failedFetchRequestRate.mark()
+              BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
+              BrokerTopicStats.getBrokerAllTopicStats.failedFetchRequestRate.mark()
               error("error when processing request " + (topic, partition, offset, fetchSize), t)
               new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]),
                                              offset, -1L, MessageSet.Empty)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index d949652..f0c05a5 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -79,14 +79,14 @@ class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup {
   val failedFetchRequestRate = newMeter(name + "FailedFetchRequestsPerSec",  "requests", TimeUnit.SECONDS)
 }
 
-object BrokerTopicStat extends Logging {
+object BrokerTopicStats extends Logging {
   private val valueFactory = (k: String) => new BrokerTopicMetrics(k)
   private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
-  private val allTopicStat = new BrokerTopicMetrics("AllTopics")
+  private val allTopicStats = new BrokerTopicMetrics("AllTopics")
 
-  def getBrokerAllTopicStat(): BrokerTopicMetrics = allTopicStat
+  def getBrokerAllTopicStats(): BrokerTopicMetrics = allTopicStats
 
-  def getBrokerTopicStat(topic: String): BrokerTopicMetrics = {
+  def getBrokerTopicStats(topic: String): BrokerTopicMetrics = {
     stats.getAndMaybePut(topic + "-")
   }
 }
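
A sketch of the double-marking pattern KafkaApis uses above: every event is
recorded on the per-topic meter and on the AllTopics aggregate. Note that
getBrokerTopicStats keys the pool on topic + "-", so the per-topic mbean
comes out as e.g. my-topic-BytesInPerSec:

    import kafka.server.BrokerTopicStats

    // Record 1 KiB of incoming bytes for one topic and for the aggregate.
    BrokerTopicStats.getBrokerTopicStats("my-topic").bytesInRate.mark(1024)
    BrokerTopicStats.getBrokerAllTopicStats.bytesInRate.mark(1024)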

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index e0a86b9..d444d22 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -23,7 +23,7 @@ import kafka.utils._
 import java.util.concurrent._
 import atomic.AtomicBoolean
 import org.I0Itec.zkclient.ZkClient
-import kafka.controller.{ControllerStat, KafkaController}
+import kafka.controller.{ControllerStats, KafkaController}
 
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
@@ -96,9 +96,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
    *  Forces some dynamic jmx beans to be registered on server startup.
    */
   private def registerStats() {
-    BrokerTopicStat.getBrokerAllTopicStat()
-    ControllerStat.offlinePartitionRate
-    ControllerStat.uncleanLeaderElectionRate
+    BrokerTopicStats.getBrokerAllTopicStats()
+    ControllerStats.offlinePartitionRate
+    ControllerStats.uncleanLeaderElectionRate
   }
 
   /**
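
registerStats() works because a Scala object is initialized on first
reference, so merely touching its members registers every metric the object
creates, making the mbeans visible before any traffic arrives. A minimal
illustration of that initialization rule:

    object LazyMetrics {
      val registered = { println("mbeans registered"); true }
    }

    object Demo {
      def main(args: Array[String]) {
        // Nothing happens at load time; the first reference triggers init:
        LazyMetrics.registered   // prints "mbeans registered"
      }
    }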

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 40afcab..34166e4 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -28,7 +28,7 @@ class ReplicaFetcherThread(name:String,
                            brokerConfig: KafkaConfig,
                            replicaMgr: ReplicaManager)
   extends AbstractFetcherThread(name = name,
-                                clientId = FetchRequest.ReplicaFetcherClientId + "- %s:%d".format(sourceBroker.host, sourceBroker.port) ,
+                                clientId = FetchRequest.ReplicaFetcherClientId + "-host_%s-port_%d".format(sourceBroker.host, sourceBroker.port),
                                 sourceBroker = sourceBroker,
                                 socketTimeout = brokerConfig.replicaSocketTimeoutMs,
                                 socketBufferSize = brokerConfig.replicaSocketBufferSize,
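
The client id now encodes the source broker as host_<host>-port_<port>
rather than the earlier "- host:port" form, which keeps it within the legal
client id character set (':' and ' ' would fail the new ClientId validation).
An illustrative value, with an assumed prefix standing in for
FetchRequest.ReplicaFetcherClientId:

    val prefix = "ReplicaFetcher"  // assumed value, for the sketch only
    val clientId = prefix + "-host_%s-port_%d".format("broker1", 9093)
    // clientId == "ReplicaFetcher-host_broker1-port_9093"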

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index 9ca4dc8..db9acc9 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -41,7 +41,7 @@ object ConsumerOffsetChecker extends Logging {
     val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))._1
     val consumer = brokerInfo match {
       case BrokerIpPattern(ip, port) =>
-        Some(new SimpleConsumer(ip, port.toInt, 10000, 100000))
+        Some(new SimpleConsumer(ip, port.toInt, 10000, 100000, "ConsumerOffsetChecker"))
       case _ =>
         error("Could not parse broker info %s".format(brokerInfo))
         None

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/tools/GetOffsetShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index e78d53d..2b9438a 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -67,7 +67,7 @@ object GetOffsetShell {
     val partition = options.valueOf(partitionOpt).intValue
     var time = options.valueOf(timeOpt).longValue
     val nOffsets = options.valueOf(nOffsetsOpt).intValue
-    val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000)
+    val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000, "GetOffsetShell")
     val topicAndPartition = TopicAndPartition(topic, partition)
     val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
     val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
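
The stand-alone tools now pass an explicit client id as the fifth
SimpleConsumer argument, so each tool's fetch metrics are reported under its
own name. A hedged construction sketch (host, port and timeouts are
illustrative):

    import kafka.consumer.SimpleConsumer

    // (host, port, socket timeout ms, socket buffer bytes, client id)
    val consumer = new SimpleConsumer("localhost", 9092, 10000, 100 * 1024, "MyOffsetTool")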

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index aa23559..5c4b3d2 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -17,7 +17,6 @@
 
 package kafka.tools
 
-import kafka.message.Message
 import joptsimple.OptionParser
 import kafka.utils.{Utils, CommandLineUtils, Logging}
 import kafka.producer.{KeyedMessage, ProducerConfig, Producer}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index 79ffcc5..db14c82 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -24,7 +24,7 @@ import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
 import kafka.consumer._
 import kafka.utils.{Logging, ZkUtils}
 import kafka.api.OffsetRequest
-import kafka.message.{CompressionCodec, Message}
+import kafka.message.CompressionCodec
 
 object ReplayLogProducer extends Logging {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index e34a432..addd8db 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -19,12 +19,10 @@ package kafka.tools
 
 import joptsimple._
 import kafka.utils._
-import kafka.producer.ProducerConfig
 import kafka.consumer._
 import kafka.client.ClientUtils
 import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
 import kafka.cluster.Broker
-import java.util.Properties
 import scala.collection.JavaConversions._
 
 /**
@@ -127,7 +125,7 @@ object SimpleConsumerShell extends Logging {
     // getting topic metadata
     info("Getting topic metatdata...")
     val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
-    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers).topicsMetadata
+    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), clientId, metadataTargetBrokers).topicsMetadata
     if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
       System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
       System.exit(1)
@@ -167,7 +165,7 @@ object SimpleConsumerShell extends Logging {
       System.exit(1)
     }
     if(startingOffset < 0)
-      startingOffset = SimpleConsumer.earliestOrLatestOffset(fetchTargetBroker, topic, partitionId, startingOffset, false)
+      startingOffset = SimpleConsumer.earliestOrLatestOffset(fetchTargetBroker, topic, partitionId, startingOffset, clientId, false)
 
     // initializing formatter
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
@@ -175,7 +173,7 @@ object SimpleConsumerShell extends Logging {
 
     info("Starting simple consumer shell to partition [%s, %d], replica [%d], host and port: [%s, %d], from offset [%d]"
                  .format(topic, partitionId, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset))
-    val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024)
+    val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024, clientId)
     val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() {
       def run() {
         var offset = startingOffset

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
index fed7aad..111c9a8 100644
--- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
+++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
@@ -65,7 +65,7 @@ object UpdateOffsetsInZK {
 
       ZkUtils.getBrokerInfo(zkClient, broker) match {
         case Some(brokerInfo) =>
-          val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024)
+          val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024, "UpdateOffsetsInZk")
           val topicAndPartition = TopicAndPartition(topic, partition)
           val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(offsetOption, 1)))
           val offset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala b/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala
new file mode 100644
index 0000000..780339e
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import kafka.common.InvalidTopicException
+import kafka.common.InvalidClientIdException
+import util.matching.Regex
+
+object ClientId {
+  val legalChars = "[a-zA-Z0-9_-]"
+  val maxNameLength = 200 // to prevent hitting filename max length limit
+  private val rgx = new Regex(legalChars + "*")
+
+  def validate(clientId: String) {
+    if (clientId.length > maxNameLength)
+      throw new InvalidClientIdException("ClientId is illegal, can't be longer than " + maxNameLength + " characters")
+
+    rgx.findFirstIn(clientId) match {
+      case Some(t) =>
+        if (!t.equals(clientId))
+          throw new InvalidClientIdException("ClientId " + clientId + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
+      case None => throw new InvalidClientIdException("ClientId " + clientId + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
+    }
+  }
+}
+
+object Topic {
+  val legalChars = "[a-zA-Z0-9_-]"
+  val maxNameLength = 255
+  private val rgx = new Regex(legalChars + "+")
+
+  def validate(topic: String) {
+    if (topic.length <= 0)
+      throw new InvalidTopicException("topic name is illegal, can't be empty")
+    else if (topic.length > maxNameLength)
+      throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters")
+
+    rgx.findFirstIn(topic) match {
+      case Some(t) =>
+        if (!t.equals(topic))
+          throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
+      case None => throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
+    }
+  }
+}
+
+case class ClientIdAndTopic(clientId: String, topic: String) {
+  override def toString = "%s-%s".format(clientId, topic)
+}
\ No newline at end of file
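
Usage sketch for the new validator: a legal client id may use only ASCII
alphanumerics, '_' and '-'; anything else raises InvalidClientIdException:

    import kafka.utils.ClientId
    import kafka.common.InvalidClientIdException

    ClientId.validate("mirror-maker_01")   // passes
    try {
      ClientId.validate("bad client id")   // spaces are rejected
    } catch {
      case e: InvalidClientIdException => println("rejected: " + e.getMessage)
    }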

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/utils/Topic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Topic.scala b/core/src/main/scala/kafka/utils/Topic.scala
index fe79adf..e69de29 100644
--- a/core/src/main/scala/kafka/utils/Topic.scala
+++ b/core/src/main/scala/kafka/utils/Topic.scala
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import kafka.common.InvalidTopicException
-import util.matching.Regex
-
-object Topic {
-  val legalChars = "[a-zA-Z0-9_-]"
-  val maxNameLength = 255
-  private val rgx = new Regex(legalChars + "+")
-
-  def validate(topic: String) {
-    if (topic.length <= 0)
-      throw new InvalidTopicException("topic name is illegal, can't be empty")
-    else if (topic.length > maxNameLength)
-      throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters")
-
-    rgx.findFirstIn(topic) match {
-      case Some(t) =>
-        if (!t.equals(topic))
-          throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
-      case None => throw new InvalidTopicException("topic name " + topic + " is illegal,  contains a character other than ASCII alphanumerics, _ and -")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/test/scala/other/kafka/TestKafkaAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestKafkaAppender.scala b/core/src/test/scala/other/kafka/TestKafkaAppender.scala
index bd09d78..ab807a1 100644
--- a/core/src/test/scala/other/kafka/TestKafkaAppender.scala
+++ b/core/src/test/scala/other/kafka/TestKafkaAppender.scala
@@ -17,7 +17,6 @@
 
 package kafka
 
-import message.Message
 import org.apache.log4j.PropertyConfigurator
 import kafka.utils.Logging
 import serializer.Encoder

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
index 7d48458..5b72eed 100644
--- a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
+++ b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
@@ -18,7 +18,6 @@
 package kafka
 
 import consumer._
-import message.Message
 import utils.Utils
 import java.util.concurrent.CountDownLatch