Posted to commits@kafka.apache.org by jk...@apache.org on 2012/12/18 18:44:12 UTC

[9/30] git commit: KAFKA-646 Provide aggregate stats at the high level Producer and ZookeeperConsumerConnector level; reviewed by Neha Narkhede

KAFKA-646 Provide aggregate stats at the high level Producer and ZookeeperConsumerConnector level; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8ad9018e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8ad9018e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8ad9018e

Branch: refs/heads/trunk
Commit: 8ad9018ea197676bdc4e57ccf985dc752e54e5c7
Parents: f0d2633
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Mon Dec 17 14:05:17 2012 -0800
Committer: Neha Narkhede <ne...@apache.org>
Committed: Mon Dec 17 14:05:17 2012 -0800

----------------------------------------------------------------------
 .../scala/kafka/admin/CreateTopicCommand.scala     |    1 +
 .../kafka/common/InvalidClientIdException.scala    |   22 -----
 .../main/scala/kafka/consumer/ConsumerConfig.scala |   31 +++++++-
 .../scala/kafka/consumer/ConsumerIterator.scala    |    3 +-
 .../scala/kafka/consumer/ConsumerTopicStats.scala  |   26 +++++-
 .../main/scala/kafka/consumer/KafkaStream.scala    |    4 +-
 .../scala/kafka/consumer/PartitionTopicInfo.scala  |    4 +-
 .../main/scala/kafka/consumer/SimpleConsumer.scala |   22 +++---
 .../consumer/ZookeeperConsumerConnector.scala      |   14 +--
 core/src/main/scala/kafka/producer/Producer.scala  |   61 +++-----------
 .../main/scala/kafka/producer/ProducerConfig.scala |   29 +++++++
 .../main/scala/kafka/producer/SyncProducer.scala   |   22 +++---
 .../kafka/producer/async/DefaultEventHandler.scala |    7 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |   46 ++++++-----
 .../main/scala/kafka/utils/ClientIdAndTopic.scala  |   64 ---------------
 .../unit/kafka/consumer/ConsumerIteratorTest.scala |    6 +-
 .../scala/unit/kafka/integration/FetcherTest.scala |    6 +-
 .../unit/kafka/producer/AsyncProducerTest.scala    |   40 +++-------
 .../test/scala/unit/kafka/utils/ClientIdTest.scala |   60 --------------
 .../test/scala/unit/kafka/utils/TestUtils.scala    |    1 -
 .../test/scala/unit/kafka/utils/TopicTest.scala    |   61 --------------
 21 files changed, 172 insertions(+), 358 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
index e304fee..fd3a397 100644
--- a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
@@ -21,6 +21,7 @@ import joptsimple.OptionParser
 import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
 import scala.collection.mutable
+import kafka.common.Topic
 
 object CreateTopicCommand extends Logging {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/common/InvalidClientIdException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/InvalidClientIdException.scala b/core/src/main/scala/kafka/common/InvalidClientIdException.scala
index edf072d..e69de29 100644
--- a/core/src/main/scala/kafka/common/InvalidClientIdException.scala
+++ b/core/src/main/scala/kafka/common/InvalidClientIdException.scala
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-class InvalidClientIdException(message: String) extends RuntimeException(message) {
-  def this() = this(null)
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 660a977..50a2cd4 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -19,9 +19,10 @@ package kafka.consumer
 
 import java.util.Properties
 import kafka.api.OffsetRequest
-import kafka.utils.{VerifiableProperties, ZKConfig}
+import kafka.utils._
+import kafka.common.{InvalidConfigException, Config}
 
-object ConsumerConfig {
+object ConsumerConfig extends Config {
   val SocketTimeout = 30 * 1000
   val SocketBufferSize = 64*1024
   val FetchSize = 1024 * 1024
@@ -43,6 +44,28 @@ object ConsumerConfig {
   val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
   val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
   val DefaultClientId = ""
+
+  def validate(config: ConsumerConfig) {
+    validateClientId(config.clientId)
+    validateGroupId(config.groupId)
+    validateAutoOffsetReset(config.autoOffsetReset)
+  }
+
+  def validateClientId(clientId: String) {
+    validateChars("clientid", clientId)
+  }
+
+  def validateGroupId(groupId: String) {
+    validateChars("groupid", groupId)
+  }
+
+  def validateAutoOffsetReset(autoOffsetReset: String) {
+    autoOffsetReset match {
+      case OffsetRequest.SmallestTimeString =>
+      case OffsetRequest.LargestTimeString =>
+      case _ => throw new InvalidConfigException("Wrong value " + autoOffsetReset + " of autoOffsetReset in ConsumerConfig")
+    }
+  }
 }
 
 class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
@@ -109,8 +132,10 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   val enableShallowIterator = props.getBoolean("shallowiterator.enable", false)
 
   /**
-   * Cliient id is specified by the kafka consumer client, used to distinguish different clients
+   * Client id is specified by the kafka consumer client, used to distinguish different clients
    */
   val clientId = props.getString("clientid", groupId)
+
+  validate(this)
 }
 

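Both ConsumerConfig and ProducerConfig now extend a shared Config trait and funnel clientId/groupId checks through its validateChars helper. The Config trait itself is not part of this diff; the sketch below is an assumption about its shape, reconstructed from the ClientId/Topic validators this commit deletes from kafka/utils/ClientIdAndTopic.scala (further down):

    package kafka.common

    import scala.util.matching.Regex

    // Hypothetical reconstruction of the Config trait, not the committed source.
    trait Config {
      private val legalChars = "[a-zA-Z0-9_-]"
      private val maxNameLength = 200   // mirrors the old ClientId limit
      private val rgx = new Regex(legalChars + "*")

      def validateChars(prop: String, value: String) {
        if (value.length > maxNameLength)
          throw new InvalidConfigException(prop + " " + value +
            " is illegal, can't be longer than " + maxNameLength + " characters")
        rgx.findFirstIn(value) match {
          case Some(t) if t.equals(value) => // every character is legal
          case _ => throw new InvalidConfigException(prop + " " + value +
            " is illegal, contains a character other than ASCII alphanumerics, _ and -")
        }
      }
    }
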
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index c5062fc..746a4bd 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -35,12 +35,13 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
                              private val keyDecoder: Decoder[K],
                              private val valueDecoder: Decoder[V],
                              val enableShallowIterator: Boolean,
-                             val consumerTopicStats: ConsumerTopicStats)
+                             val clientId: String)
   extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
 
   private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
   private var currentTopicInfo: PartitionTopicInfo = null
   private var consumedOffset: Long = -1L
+  private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId)
 
   override def next(): MessageAndMetadata[K, V] = {
     val item = super.next()

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
index 2a9d9fb..bb38f35 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
@@ -17,20 +17,25 @@
 
 package kafka.consumer
 
-import kafka.utils.{ClientIdAndTopic, Pool, threadsafe, Logging}
+import kafka.utils.{Pool, threadsafe, Logging}
 import java.util.concurrent.TimeUnit
 import kafka.metrics.KafkaMetricsGroup
+import kafka.common.ClientIdAndTopic
 
 @threadsafe
-class ConsumerTopicMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup {
-  val messageRate = newMeter(clientIdTopic + "-MessagesPerSec",  "messages", TimeUnit.SECONDS)
-  val byteRate = newMeter(clientIdTopic + "-BytesPerSec",  "bytes", TimeUnit.SECONDS)
+class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
+  val messageRate = newMeter(metricId + "-MessagesPerSec",  "messages", TimeUnit.SECONDS)
+  val byteRate = newMeter(metricId + "-BytesPerSec",  "bytes", TimeUnit.SECONDS)
 }
 
+/**
+ * Tracks metrics for each topic the given consumer client has consumed data from.
+ * @param clientId The clientId of the given consumer client.
+ */
 class ConsumerTopicStats(clientId: String) extends Logging {
   private val valueFactory = (k: ClientIdAndTopic) => new ConsumerTopicMetrics(k)
   private val stats = new Pool[ClientIdAndTopic, ConsumerTopicMetrics](Some(valueFactory))
-  private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics"))
+  private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "All.Topics")) // to differentiate from a topic named AllTopics
 
   def getConsumerAllTopicStats(): ConsumerTopicMetrics = allTopicStats
 
@@ -39,3 +44,14 @@ class ConsumerTopicStats(clientId: String) extends Logging {
   }
 }
 
+/**
+ * Stores the topic stats information of each consumer client in a (clientId -> ConsumerTopicStats) map.
+ */
+object ConsumerTopicStatsRegistry {
+  private val valueFactory = (k: String) => new ConsumerTopicStats(k)
+  private val globalStats = new Pool[String, ConsumerTopicStats](Some(valueFactory))
+
+  def getConsumerTopicStat(clientId: String) = {
+    globalStats.getAndMaybePut(clientId)
+  }
+}
\ No newline at end of file

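ConsumerTopicStatsRegistry is the heart of the aggregation change: instead of each ZookeeperConsumerConnector constructing its own ConsumerTopicStats and threading it through every KafkaStream, ConsumerIterator and PartitionTopicInfo, those components now pass only the clientId and look the stats object up, so all of them share one instance per client. A minimal standalone sketch of the idiom (modern Scala, with ConcurrentHashMap standing in for kafka.utils.Pool):

    import java.util.concurrent.ConcurrentHashMap

    object RegistrySketch {
      class Stats(val clientId: String)            // stand-in for ConsumerTopicStats

      private val registry = new ConcurrentHashMap[String, Stats]()

      // Equivalent of Pool.getAndMaybePut: create on first lookup, reuse after.
      def getStats(clientId: String): Stats =
        registry.computeIfAbsent(clientId, id => new Stats(id))

      def main(args: Array[String]): Unit = {
        val a = getStats("console-consumer")
        val b = getStats("console-consumer")
        assert(a eq b)   // same instance, so every call site feeds one set of metrics
      }
    }
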
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/consumer/KafkaStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/KafkaStream.scala b/core/src/main/scala/kafka/consumer/KafkaStream.scala
index 569f6df..d4e0e96 100644
--- a/core/src/main/scala/kafka/consumer/KafkaStream.scala
+++ b/core/src/main/scala/kafka/consumer/KafkaStream.scala
@@ -27,11 +27,11 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
                         private val keyDecoder: Decoder[K],
                         private val valueDecoder: Decoder[V],
                         val enableShallowIterator: Boolean,
-                        val consumerTopicStats: ConsumerTopicStats)
+                        val clientId: String)
    extends Iterable[MessageAndMetadata[K,V]] with java.lang.Iterable[MessageAndMetadata[K,V]] {
 
   private val iter: ConsumerIterator[K,V] =
-    new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator, consumerTopicStats)
+    new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator, clientId)
 
   /**
    *  Create an iterator over messages in the stream.

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
index 8c42d11..6003cab 100644
--- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
@@ -29,11 +29,13 @@ class PartitionTopicInfo(val topic: String,
                          private val consumedOffset: AtomicLong,
                          private val fetchedOffset: AtomicLong,
                          private val fetchSize: AtomicInteger,
-                         private val consumerTopicStats: ConsumerTopicStats) extends Logging {
+                         private val clientId: String) extends Logging {
 
   debug("initial consumer offset of " + this + " is " + consumedOffset.get)
   debug("initial fetch offset of " + this + " is " + fetchedOffset.get)
 
+  private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId)
+
   def getConsumeOffset() = consumedOffset.get
 
   def getFetchOffset() = fetchedOffset.get

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 5e1e6ab..6b83deb 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -20,8 +20,6 @@ package kafka.consumer
 import kafka.api._
 import kafka.network._
 import kafka.utils._
-import java.util.concurrent.TimeUnit
-import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 import kafka.utils.ZkUtils._
 import collection.immutable
 import kafka.common.{TopicAndPartition, KafkaException}
@@ -87,10 +85,11 @@ class SimpleConsumer(val host: String,
                      val bufferSize: Int,
                      val clientId: String) extends Logging {
 
-  ClientId.validate(clientId)
+  ConsumerConfig.validateClientId(clientId)
   private val lock = new Object()
   private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
-  private val fetchRequestAndResponseStats = new FetchRequestAndResponseStats(clientId + "-host_%s-port_%s".format(host, port))
+  val brokerInfo = "host_%s-port_%s".format(host, port)
+  private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
 
   private def connect(): BlockingChannel = {
     close
@@ -155,12 +154,17 @@ class SimpleConsumer(val host: String,
    */
   def fetch(request: FetchRequest): FetchResponse = {
     var response: Receive = null
-    fetchRequestAndResponseStats.requestTimer.time {
-      response = sendRequest(request)
+    val specificTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseStats(brokerInfo).requestTimer
+    val aggregateTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestTimer
+    aggregateTimer.time {
+      specificTimer.time {
+        response = sendRequest(request)
+      }
     }
     val fetchResponse = FetchResponse.readFrom(response.buffer)
     val fetchedSize = fetchResponse.sizeInBytes
-    fetchRequestAndResponseStats.respondSizeHist.update(fetchedSize)
+    fetchRequestAndResponseStats.getFetchRequestAndResponseStats(brokerInfo).requestSizeHist.update(fetchedSize)
+    fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize)
     fetchResponse
   }
 
@@ -178,7 +182,3 @@ class SimpleConsumer(val host: String,
   }
 }
 
-class FetchRequestAndResponseStats(clientId: String) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(clientId + "-FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val respondSizeHist = newHistogram(clientId + "-FetchResponseSize")
-}

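fetch() now times every request twice, nesting the per-broker timer inside the all-brokers aggregate timer so that a single network round trip updates both metrics. A toy sketch of the nesting; Timer stands in for KafkaTimer, and the metric names are illustrative (the per-broker one follows the naming of the deleted FetchRequestAndResponseStats class):

    object NestedTimerSketch {
      // Stand-in for KafkaTimer: measures a block and reports it by name.
      class Timer(name: String) {
        def time[A](f: => A): A = {
          val start = System.nanoTime
          try f
          finally printf("%s: %d us%n", name, (System.nanoTime - start) / 1000)
        }
      }

      def main(args: Array[String]): Unit = {
        val specificTimer  = new Timer("clientId-host_localhost-port_9092-FetchRequestRateAndTimeMs")
        val aggregateTimer = new Timer("clientId-AllBrokers-FetchRequestRateAndTimeMs")
        // One request, two measurements: the aggregate time includes the specific time.
        val response = aggregateTimer.time {
          specificTimer.time {
            Thread.sleep(5)                // pretend network round trip
            "fetch response"
          }
        }
        println(response)
      }
    }
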
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 43e9fa6..aee9293 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -20,7 +20,7 @@ package kafka.consumer
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import locks.ReentrantLock
-import scala.collection._
+import collection._
 import kafka.cluster._
 import kafka.utils._
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
@@ -35,6 +35,7 @@ import kafka.client.ClientUtils
 import com.yammer.metrics.core.Gauge
 import kafka.api.OffsetRequest
 import kafka.metrics._
+import scala.Some
 
 
 /**
@@ -80,7 +81,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                 val enableFetcher: Boolean) // for testing only
         extends ConsumerConnector with Logging with KafkaMetricsGroup {
 
-  ClientId.validate(config.clientId)
   private val isShuttingDown = new AtomicBoolean(false)
   private val rebalanceLock = new Object
   private var fetcher: Option[ConsumerFetcherManager] = None
@@ -95,8 +95,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
   private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
 
-  private val consumerTopicStats = new ConsumerTopicStats(config.clientId)
-
   val consumerIdString = {
     var consumerUuid : String = null
     config.consumerId match {
@@ -198,7 +196,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       threadIdSet.map(_ => {
         val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
         val stream = new KafkaStream[K,V](
-          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator, consumerTopicStats)
+          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator, config.clientId)
         (queue, stream)
       })
     ).flatten.toList
@@ -601,8 +599,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                 SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.EarliestTime, config.clientId)
               case OffsetRequest.LargestTimeString =>
                 SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.LatestTime, config.clientId)
-              case _ =>
-                throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig")
             }
         }
       val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
@@ -615,7 +611,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                  consumedOffset,
                                                  fetchedOffset,
                                                  new AtomicInteger(config.fetchSize),
-                                                 consumerTopicStats)
+                                                 config.clientId)
       partTopicInfoMap.put(partition, partTopicInfo)
       debug(partTopicInfo + " selected new offset " + offset)
     }
@@ -719,7 +715,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                           keyDecoder, 
                                           valueDecoder, 
                                           config.enableShallowIterator,
-                                          consumerTopicStats)
+                                          config.clientId)
         (queue, stream)
     }).toList
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index 3bfd563..a183525 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -22,20 +22,15 @@ import java.util.Random
 import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
 import kafka.serializer.Encoder
 import java.util.concurrent.atomic.AtomicBoolean
-import kafka.common.{QueueFullException, InvalidConfigException}
+import kafka.common.QueueFullException
 import kafka.metrics._
 
 
 class Producer[K,V](config: ProducerConfig,
-                    private val eventHandler: EventHandler[K,V],
-                    private val producerStats: ProducerStats,
-                    private val producerTopicStats: ProducerTopicStats)  // only for unit testing
+                    private val eventHandler: EventHandler[K,V])  // only for unit testing
   extends Logging {
 
   private val hasShutdown = new AtomicBoolean(false)
-  if (config.batchSize > config.queueSize)
-    throw new InvalidConfigException("Batch size can't be larger than queue size.")
-
   private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueSize)
 
   private val random = new Random
@@ -53,30 +48,20 @@ class Producer[K,V](config: ProducerConfig,
                                                        config.batchSize,
                                                        config.clientId)
       producerSendThread.start()
-    case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
   }
 
-  KafkaMetricsReporter.startReporters(config.props)
+  private val producerStats = ProducerStatsRegistry.getProducerStats(config.clientId)
+  private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
 
-  def this(t: (ProducerConfig, EventHandler[K,V], ProducerStats, ProducerTopicStats)) =
-    this(t._1, t._2, t._3, t._4)
+  KafkaMetricsReporter.startReporters(config.props)
 
   def this(config: ProducerConfig) =
-    this {
-      ClientId.validate(config.clientId)
-      val producerStats = new ProducerStats(config.clientId)
-      val producerTopicStats = new ProducerTopicStats(config.clientId)
-      (config,
-       new DefaultEventHandler[K,V](config,
-                                    Utils.createObject[Partitioner[K]](config.partitionerClass, config.props),
-                                    Utils.createObject[Encoder[V]](config.serializerClass, config.props),
-                                    Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
-                                    new ProducerPool(config),
-                                    producerStats = producerStats,
-                                    producerTopicStats = producerTopicStats),
-       producerStats,
-       producerTopicStats)
-    }
+    this(config,
+         new DefaultEventHandler[K,V](config,
+                                      Utils.createObject[Partitioner[K]](config.partitionerClass, config.props),
+                                      Utils.createObject[Encoder[V]](config.serializerClass, config.props),
+                                      Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
+                                      new ProducerPool(config)))
 
   /**
    * Sends the data, partitioned by key to the topic using either the
@@ -146,28 +131,4 @@ class Producer[K,V](config: ProducerConfig,
   }
 }
 
-@threadsafe
-class ProducerTopicMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup {
-  val messageRate = newMeter(clientIdTopic + "-MessagesPerSec",  "messages", TimeUnit.SECONDS)
-  val byteRate = newMeter(clientIdTopic + "-BytesPerSec",  "bytes", TimeUnit.SECONDS)
-}
-
-class ProducerTopicStats(clientId: String) {
-  private val valueFactory = (k: ClientIdAndTopic) => new ProducerTopicMetrics(k)
-  private val stats = new Pool[ClientIdAndTopic, ProducerTopicMetrics](Some(valueFactory))
-  private val allTopicStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics"))
-
-  def getProducerAllTopicStats(): ProducerTopicMetrics = allTopicStats
-
-  def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
-    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
-  }
-}
-
-class ProducerStats(clientId: String) extends KafkaMetricsGroup {
-  val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec",  "errors", TimeUnit.SECONDS)
-  val resendRate = newMeter(clientId + "-ResendsPerSec",  "resends", TimeUnit.SECONDS)
-  val failedSendRate = newMeter(clientId + "-FailedSendsPerSec",  "failed sends", TimeUnit.SECONDS)
-  val droppedMessageRate = newMeter(clientId + "-DroppedMessagesPerSec",  "drops", TimeUnit.SECONDS)
-}
 

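ProducerStats and ProducerTopicStats move out of this file into ProducerStatsRegistry and ProducerTopicStatsRegistry (the registry sources are not shown in this diff). Assuming they follow the same Pool.getAndMaybePut pattern as ConsumerTopicStatsRegistry above, repeated lookups for one clientId return the same object, so a Producer and its DefaultEventHandler, or several Producer instances that share a client id, all feed one set of metrics:

    // Illustrative check, assuming registry semantics like ConsumerTopicStatsRegistry:
    val a = ProducerStatsRegistry.getProducerStats("mirror-producer")
    val b = ProducerStatsRegistry.getProducerStats("mirror-producer")
    assert(a eq b)   // one ProducerStats per clientId, shared across instances
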
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/producer/ProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala
index 30b1dc3..e559187 100644
--- a/core/src/main/scala/kafka/producer/ProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala
@@ -21,9 +21,36 @@ import async.AsyncProducerConfig
 import java.util.Properties
 import kafka.utils.{Utils, VerifiableProperties}
 import kafka.message.{CompressionCodec, NoCompressionCodec}
+import kafka.common.{InvalidConfigException, Config}
+
+object ProducerConfig extends Config {
+  def validate(config: ProducerConfig) {
+    validateClientId(config.clientId)
+    validateBatchSize(config.batchSize, config.queueSize)
+    validateProducerType(config.producerType)
+  }
+
+  def validateClientId(clientId: String) {
+    validateChars("clientid", clientId)
+  }
+
+  def validateBatchSize(batchSize: Int, queueSize: Int) {
+    if (batchSize > queueSize)
+      throw new InvalidConfigException("Batch size = " + batchSize + " can't be larger than queue size = " + queueSize)
+  }
+
+  def validateProducerType(producerType: String) {
+    producerType match {
+      case "sync" =>
+      case "async"=>
+      case _ => throw new InvalidConfigException("Invalid value " + producerType + " for producer.type, valid values are sync/async")
+    }
+  }
+}
 
 class ProducerConfig private (val props: VerifiableProperties)
         extends AsyncProducerConfig with SyncProducerConfigShared {
+  import ProducerConfig._
 
   def this(originalProps: Properties) {
     this(new VerifiableProperties(originalProps))
@@ -85,4 +112,6 @@ class ProducerConfig private (val props: VerifiableProperties)
   val producerRetries = props.getInt("producer.num.retries", 3)
 
   val producerRetryBackoffMs = props.getInt("producer.retry.backoff.ms", 100)
+
+  validate(this)
 }

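With validate(this) running at the end of the constructor, bad values now fail when the config is built instead of surfacing later inside Producer. A hedged usage sketch; the exact set of required property keys for this era of the config is assumed, not taken from this diff:

    import java.util.Properties
    import kafka.common.InvalidConfigException
    import kafka.producer.ProducerConfig

    val props = new Properties()
    props.put("broker.list", "localhost:9092")                      // assumed required key
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("producer.type", "asynch")                            // neither "sync" nor "async"

    try {
      new ProducerConfig(props)    // validateProducerType throws during construction
    } catch {
      case e: InvalidConfigException => println("rejected: " + e.getMessage)
    }
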
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 3183ceb..0ef320b 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -21,8 +21,6 @@ import kafka.api._
 import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive}
 import kafka.utils._
 import java.util.Random
-import java.util.concurrent.TimeUnit
-import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 
 object SyncProducer {
   val RequestKey: Short = 0
@@ -39,7 +37,8 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
   @volatile private var shutdown: Boolean = false
   private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
     config.bufferSize, config.requestTimeoutMs)
-  val producerRequestStats = new ProducerRequestStats(config.clientId + "-host_%s-port_%s".format(config.host, config.port))
+  val brokerInfo = "host_%s-port_%s".format(config.host, config.port)
+  val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId)
 
   trace("Instantiating Scala Sync Producer")
 
@@ -87,10 +86,17 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
    * Send a message
    */
   def send(producerRequest: ProducerRequest): ProducerResponse = {
-    producerRequestStats.requestSizeHist.update(producerRequest.sizeInBytes)
+    val requestSize = producerRequest.sizeInBytes
+    producerRequestStats.getProducerRequestStats(brokerInfo).requestSizeHist.update(requestSize)
+    producerRequestStats.getProducerRequestAllBrokersStats.requestSizeHist.update(requestSize)
+
     var response: Receive = null
-    producerRequestStats.requestTimer.time {
-      response = doSend(producerRequest)
+    val specificTimer = producerRequestStats.getProducerRequestStats(brokerInfo).requestTimer
+    val aggregateTimer = producerRequestStats.getProducerRequestAllBrokersStats.requestTimer
+    aggregateTimer.time {
+      specificTimer.time {
+        response = doSend(producerRequest)
+      }
     }
     ProducerResponse.readFrom(response.buffer)
   }
@@ -150,7 +156,3 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
   }
 }
 
-class ProducerRequestStats(clientId: String) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(clientId + "-ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val requestSizeHist = newHistogram(clientId + "-ProducerRequestSize")
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 7d0f609..4f04862 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -33,9 +33,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
                                private val encoder: Encoder[V],
                                private val keyEncoder: Encoder[K],
                                private val producerPool: ProducerPool,
-                               private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata],
-                               private val producerStats: ProducerStats,
-                               private val producerTopicStats: ProducerTopicStats)
+                               private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata])
   extends EventHandler[K,V] with Logging {
   val isSync = ("sync" == config.producerType)
 
@@ -45,6 +43,9 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
 
   private val lock = new Object()
 
+  private val producerStats = ProducerStatsRegistry.getProducerStats(config.clientId)
+  private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
+
   def handle(events: Seq[KeyedMessage[K,V]]) {
     lock synchronized {
       val serializedData = serialize(events)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 6d73c82..96f0df6 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -19,7 +19,7 @@ package kafka.server
 
 import kafka.cluster.Broker
 import kafka.consumer.SimpleConsumer
-import kafka.common.{TopicAndPartition, ErrorMapping}
+import kafka.common.{ClientIdAndBroker, TopicAndPartition, ErrorMapping}
 import collection.mutable
 import kafka.message.ByteBufferMessageSet
 import kafka.message.MessageAndOffset
@@ -27,7 +27,7 @@ import kafka.api.{FetchResponse, FetchResponsePartitionData, FetchRequestBuilder
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.atomic.AtomicLong
-import kafka.utils.{ClientIdAndTopic, Pool, ShutdownableThread}
+import kafka.utils.{Pool, ShutdownableThread}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 
@@ -43,9 +43,10 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   private val partitionMapCond = partitionMapLock.newCondition()
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
   private val brokerInfo = "host_%s-port_%s".format(sourceBroker.host, sourceBroker.port)
-  val fetcherStats = new FetcherStats(clientId + "-" + brokerInfo)
+  private val metricId = new ClientIdAndBroker(clientId, brokerInfo)
+  val fetcherStats = new FetcherStats(metricId)
   val fetcherMetrics = fetcherStats.getFetcherStats(name + "-" + sourceBroker.id)
-  val fetcherLagStats = new FetcherLagStats(clientId + "-" + brokerInfo)
+  val fetcherLagStats = new FetcherLagStats(metricId)
 
   /* callbacks to be defined in subclass */
 
@@ -66,7 +67,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
 
   override def doWork() {
     val fetchRequestuilder = new FetchRequestBuilder().
-            clientId(clientId + "-" + brokerInfo).
+            clientId(clientId).
             replicaId(fetcherBrokerId).
             maxWait(maxWait).
             minBytes(minBytes)
@@ -184,10 +185,10 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   }
 }
 
-class FetcherLagMetrics(clientIdTopicPartition: ClientIdTopicPartition) extends KafkaMetricsGroup {
+class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMetricsGroup {
   private[this] var lagVal = new AtomicLong(-1L)
   newGauge(
-    clientIdTopicPartition + "-ConsumerLag",
+    metricId + "-ConsumerLag",
     new Gauge[Long] {
       def getValue = lagVal.get
     }
@@ -200,29 +201,34 @@ class FetcherLagMetrics(clientIdTopicPartition: ClientIdTopicPartition) extends
   def lag = lagVal.get
 }
 
-class FetcherLagStats(clientId: String) {
-  private val valueFactory = (k: ClientIdTopicPartition) => new FetcherLagMetrics(k)
-  private val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](Some(valueFactory))
+class FetcherLagStats(metricId: ClientIdAndBroker) {
+  private val valueFactory = (k: ClientIdBrokerTopicPartition) => new FetcherLagMetrics(k)
+  private val stats = new Pool[ClientIdBrokerTopicPartition, FetcherLagMetrics](Some(valueFactory))
 
   def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = {
-    stats.getAndMaybePut(new ClientIdTopicPartition(clientId, topic, partitionId))
+    stats.getAndMaybePut(new ClientIdBrokerTopicPartition(metricId.clientId, metricId.brokerInfo, topic, partitionId))
   }
 }
 
-class FetcherMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup {
-  val requestRate = newMeter(clientIdTopic + "-RequestsPerSec",  "requests", TimeUnit.SECONDS)
-  val byteRate = newMeter(clientIdTopic + "-BytesPerSec",  "bytes", TimeUnit.SECONDS)
+class FetcherMetrics(metricId: ClientIdBrokerTopic) extends KafkaMetricsGroup {
+  val requestRate = newMeter(metricId + "-RequestsPerSec",  "requests", TimeUnit.SECONDS)
+  val byteRate = newMeter(metricId + "-BytesPerSec",  "bytes", TimeUnit.SECONDS)
 }
 
-class FetcherStats(clientId: String) {
-  private val valueFactory = (k: ClientIdAndTopic) => new FetcherMetrics(k)
-  private val stats = new Pool[ClientIdAndTopic, FetcherMetrics](Some(valueFactory))
+class FetcherStats(metricId: ClientIdAndBroker) {
+  private val valueFactory = (k: ClientIdBrokerTopic) => new FetcherMetrics(k)
+  private val stats = new Pool[ClientIdBrokerTopic, FetcherMetrics](Some(valueFactory))
 
   def getFetcherStats(name: String): FetcherMetrics = {
-    stats.getAndMaybePut(new ClientIdAndTopic(clientId, name))
+    stats.getAndMaybePut(new ClientIdBrokerTopic(metricId.clientId, metricId.brokerInfo, name))
   }
 }
 
-case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) {
-  override def toString = "%s-%s-%d".format(clientId, topic, partitionId)
+case class ClientIdBrokerTopic(clientId: String, brokerInfo: String, topic: String) {
+  override def toString = "%s-%s-%s".format(clientId, brokerInfo, topic)
 }
+
+case class ClientIdBrokerTopicPartition(clientId: String, brokerInfo: String, topic: String, partitionId: Int) {
+  override def toString = "%s-%s-%s-%d".format(clientId, brokerInfo, topic, partitionId)
+}
+

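Folding brokerInfo into these case classes is what pushes the broker into the metric names; each metric name is just the case class's toString plus a suffix. For example (clientId value illustrative):

    val id = ClientIdBrokerTopicPartition("ReplicaFetcherThread", "host_kafka01-port_9092", "clicks", 3)
    println(id.toString + "-ConsumerLag")
    // prints: ReplicaFetcherThread-host_kafka01-port_9092-clicks-3-ConsumerLag
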
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala b/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala
index 780339e..e69de29 100644
--- a/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala
+++ b/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import kafka.common.InvalidTopicException
-import kafka.common.InvalidClientIdException
-import util.matching.Regex
-
-object ClientId {
-  val legalChars = "[a-zA-Z0-9_-]"
-  val maxNameLength = 200 // to prevent hitting filename max length limit
-  private val rgx = new Regex(legalChars + "*")
-
-  def validate(clientId: String) {
-    if (clientId.length > maxNameLength)
-      throw new InvalidClientIdException("ClientId is illegal, can't be longer than " + maxNameLength + " characters")
-
-    rgx.findFirstIn(clientId) match {
-      case Some(t) =>
-        if (!t.equals(clientId))
-          throw new InvalidClientIdException("ClientId " + clientId + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
-      case None => throw new InvalidClientIdException("ClientId " + clientId + " is illegal,  contains a character other than ASCII alphanumerics, _ and -")
-    }
-  }
-}
-
-object Topic {
-  val legalChars = "[a-zA-Z0-9_-]"
-  val maxNameLength = 255
-  private val rgx = new Regex(legalChars + "+")
-
-  def validate(topic: String) {
-    if (topic.length <= 0)
-      throw new InvalidTopicException("topic name is illegal, can't be empty")
-    else if (topic.length > maxNameLength)
-      throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters")
-
-    rgx.findFirstIn(topic) match {
-      case Some(t) =>
-        if (!t.equals(topic))
-          throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
-      case None => throw new InvalidTopicException("topic name " + topic + " is illegal,  contains a character other than ASCII alphanumerics, _ and -")
-    }
-  }
-}
-
-case class ClientIdAndTopic(clientId: String, topic:String) {
-  override def toString = "%s-%s".format(clientId, topic)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 246b1ec..0b5363f 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -56,7 +56,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
                                                            new AtomicLong(consumedOffset),
                                                            new AtomicLong(0),
                                                            new AtomicInteger(0),
-                                                           new ConsumerTopicStats("")))
+                                                           ""))
   val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
 
   override def setUp() {
@@ -80,8 +80,8 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
                                                     new StringDecoder(), 
                                                     new StringDecoder(),
                                                     enableShallowIterator = false,
-                                                    consumerTopicStats = new ConsumerTopicStats(""))
-    var receivedMessages = (0 until 5).map(i => iter.next.message).toList
+                                                    clientId = "")
+    val receivedMessages = (0 until 5).map(i => iter.next.message).toList
 
     assertFalse(iter.hasNext)
     assertEquals(1, queue.size) // This is only the shutdown command.

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 021f419..61d9fc9 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -50,7 +50,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
                                                            new AtomicLong(0),
                                                            new AtomicLong(0),
                                                            new AtomicInteger(0),
-                                                           new ConsumerTopicStats("")))
+                                                           ""))
 
   var fetcher: ConsumerFetcherManager = null
 
@@ -84,7 +84,9 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
   def sendMessages(messagesPerNode: Int): Int = {
     var count = 0
     for(conf <- configs) {
-      val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), new DefaultEncoder(), new StringEncoder())
+      val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
+                                                                             new DefaultEncoder(),
+                                                                             new StringEncoder())
       val ms = 0.until(messagesPerNode).map(x => (conf.brokerId * 5 + x).toString.getBytes).toArray
       messages += conf.brokerId -> ms
       producer.send(ms.map(m => KeyedMessage[String, Array[Byte]](topic, topic, m)):_*)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 90a7ed8..4767618 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -68,10 +68,7 @@ class AsyncProducerTest extends JUnit3Suite {
 
     val config = new ProducerConfig(props)
     val produceData = getProduceData(12)
-    val producer = new Producer[String, String](config,
-                                                mockEventHandler,
-                                                new ProducerStats(""),
-                                                new ProducerTopicStats(""))
+    val producer = new Producer[String, String](config, mockEventHandler)
     try {
       // send all 10 messages, should hit the batch size and then reach broker
       producer.send(produceData: _*)
@@ -192,9 +189,7 @@ class AsyncProducerTest extends JUnit3Suite {
                                                       encoder = null.asInstanceOf[Encoder[String]],
                                                       keyEncoder = new IntEncoder(),
                                                       producerPool = producerPool,
-                                                      topicPartitionInfos = topicPartitionInfos,
-                                                      producerStats = new ProducerStats(""),
-                                                      producerTopicStats = new ProducerTopicStats(""))
+                                                      topicPartitionInfos = topicPartitionInfos)
 
     val topic1Broker1Data = 
       ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 0, new Message("msg1".getBytes)),
@@ -233,9 +228,7 @@ class AsyncProducerTest extends JUnit3Suite {
                                                          encoder = new StringEncoder,
                                                          keyEncoder = new StringEncoder,
                                                          producerPool = producerPool,
-                                                         topicPartitionInfos = topicPartitionInfos,
-                                                         producerStats = new ProducerStats(""),
-                                                         producerTopicStats = new ProducerTopicStats(""))
+                                                         topicPartitionInfos = topicPartitionInfos)
 
     val serializedData = handler.serialize(produceData)
     val deserializedData = serializedData.map(d => new KeyedMessage[String,String](d.topic, Utils.readString(d.message.payload)))
@@ -263,9 +256,7 @@ class AsyncProducerTest extends JUnit3Suite {
                                                          encoder = null.asInstanceOf[Encoder[String]],
                                                          keyEncoder = null.asInstanceOf[Encoder[String]],
                                                          producerPool = producerPool,
-                                                         topicPartitionInfos = topicPartitionInfos,
-                                                         producerStats = new ProducerStats(""),
-                                                         producerTopicStats = new ProducerTopicStats(""))
+                                                         topicPartitionInfos = topicPartitionInfos)
     try {
       handler.partitionAndCollate(producerDataList)
       fail("Should fail with UnknownTopicOrPartitionException")
@@ -296,9 +287,7 @@ class AsyncProducerTest extends JUnit3Suite {
                                                          encoder = new StringEncoder,
                                                          keyEncoder = new StringEncoder,
                                                          producerPool = producerPool,
-                                                         topicPartitionInfos = topicPartitionInfos,
-                                                         producerStats = new ProducerStats(""),
-                                                         producerTopicStats = new ProducerTopicStats(""))
+                                                         topicPartitionInfos = topicPartitionInfos)
     try {
       handler.handle(producerDataList)
       fail("Should fail with NoBrokersForPartitionException")
@@ -345,9 +334,7 @@ class AsyncProducerTest extends JUnit3Suite {
                                                          encoder = null.asInstanceOf[Encoder[String]],
                                                          keyEncoder = null.asInstanceOf[Encoder[String]],
                                                          producerPool = producerPool,
-                                                         topicPartitionInfos = topicPartitionInfos,
-                                                         producerStats = new ProducerStats(""),
-                                                         producerTopicStats = new ProducerTopicStats(""))
+                                                         topicPartitionInfos = topicPartitionInfos)
     val producerDataList = new ArrayBuffer[KeyedMessage[String,Message]]
     producerDataList.append(new KeyedMessage[String,Message]("topic1", new Message("msg1".getBytes)))
     producerDataList.append(new KeyedMessage[String,Message]("topic2", new Message("msg2".getBytes)))
@@ -390,14 +377,9 @@ class AsyncProducerTest extends JUnit3Suite {
                                                          encoder = new StringEncoder,
                                                          keyEncoder = new StringEncoder,
                                                          producerPool = producerPool,
-                                                         topicPartitionInfos = topicPartitionInfos,
-                                                         producerStats = new ProducerStats(""),
-                                                         producerTopicStats = new ProducerTopicStats(""))
-
-    val producer = new Producer[String, String](config,
-                                                handler,
-                                                new ProducerStats(""),
-                                                new ProducerTopicStats(""))
+                                                         topicPartitionInfos = topicPartitionInfos)
+
+    val producer = new Producer[String, String](config, handler)
     try {
       // send all 10 messages, should create 2 batches and 2 syncproducer calls
       producer.send(msgs.map(m => new KeyedMessage[String,String](topic, m)): _*)
@@ -453,9 +435,7 @@ class AsyncProducerTest extends JUnit3Suite {
                                                       encoder = new StringEncoder(),
                                                       keyEncoder = new NullEncoder[Int](),
                                                       producerPool = producerPool,
-                                                      topicPartitionInfos = topicPartitionInfos,
-                                                      producerStats = new ProducerStats(""),
-                                                      producerTopicStats = new ProducerTopicStats(""))
+                                                      topicPartitionInfos = topicPartitionInfos)
     val data = msgs.map(m => new KeyedMessage[Int,String](topic1, 0, m)) ++ msgs.map(m => new KeyedMessage[Int,String](topic1, 1, m))
     handler.handle(data)
     handler.close()

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala b/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala
index 794dcdc..e69de29 100644
--- a/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import junit.framework.Assert._
-import collection.mutable.ArrayBuffer
-import kafka.common.InvalidClientIdException
-import org.junit.Test
-
-class ClientIdTest {
-
-  @Test
-  def testInvalidClientIds() {
-    val invalidclientIds = new ArrayBuffer[String]()
-    var longName = "ATCG"
-    for (i <- 1 to 6)
-      longName += longName
-    invalidclientIds += longName
-    val badChars = Array('/', '\\', ',', '\0', ':', "\"", '\'', ';', '*', '?', '.', ' ', '\t', '\r', '\n', '=')
-    for (weirdChar <- badChars) {
-      invalidclientIds += "Is" + weirdChar + "funny"
-    }
-
-    for (i <- 0 until invalidclientIds.size) {
-      try {
-        ClientId.validate(invalidclientIds(i))
-        fail("Should throw InvalidClientIdException.")
-      }
-      catch {
-        case e: InvalidClientIdException => "This is good."
-      }
-    }
-
-    val validClientIds = new ArrayBuffer[String]()
-    validClientIds += ("valid", "CLIENT", "iDs", "ar6", "VaL1d", "_0-9_", "")
-    for (i <- 0 until validClientIds.size) {
-      try {
-        ClientId.validate(validClientIds(i))
-      }
-      catch {
-        case e: Exception => fail("Should not throw exception.")
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index a145f2a..cebb371 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -24,7 +24,6 @@ import java.nio.channels._
 import java.util.Random
 import java.util.Properties
 import junit.framework.Assert._
-import kafka.api._
 import kafka.server._
 import kafka.producer._
 import kafka.message._

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/test/scala/unit/kafka/utils/TopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TopicTest.scala b/core/src/test/scala/unit/kafka/utils/TopicTest.scala
index e567be5..e69de29 100644
--- a/core/src/test/scala/unit/kafka/utils/TopicTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/TopicTest.scala
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import junit.framework.Assert._
-import collection.mutable.ArrayBuffer
-import kafka.common.InvalidTopicException
-import org.junit.Test
-
-class TopicTest {
-
-  @Test
-  def testInvalidTopicNames() {
-    val invalidTopicNames = new ArrayBuffer[String]()
-    invalidTopicNames += ("", ".", "..")
-    var longName = "ATCG"
-    for (i <- 1 to 6)
-      longName += longName
-    invalidTopicNames += longName
-    val badChars = Array('/', '\\', ',', '\0', ':', "\"", '\'', ';', '*', '?', '.')
-    for (weirdChar <- badChars) {
-      invalidTopicNames += "Is" + weirdChar + "funny"
-    }
-
-    for (i <- 0 until invalidTopicNames.size) {
-      try {
-        Topic.validate(invalidTopicNames(i))
-        fail("Should throw InvalidTopicException.")
-      }
-      catch {
-        case e: InvalidTopicException => "This is good."
-      }
-    }
-
-    val validTopicNames = new ArrayBuffer[String]()
-    validTopicNames += ("valid", "TOPIC", "nAmEs", "ar6", "VaL1d", "_0-9_")
-    for (i <- 0 until validTopicNames.size) {
-      try {
-        Topic.validate(validTopicNames(i))
-      }
-      catch {
-        case e: Exception => fail("Should not throw exception.")
-      }
-    }
-  }
-}