You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/05/25 00:33:31 UTC

[kafka] branch trunk updated: KAFKA-6921; Remove old Scala producer and related code

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7132a85  KAFKA-6921; Remove old Scala producer and related code
7132a85 is described below

commit 7132a85fc394bc0627fe1763c17cb523d8a8ff37
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Thu May 24 17:32:49 2018 -0700

    KAFKA-6921; Remove old Scala producer and related code
    
    * Removed Scala producers, request classes, kafka.tools.ProducerPerformance, encoders
    and tests.
    * Updated ConsoleProducer to remove Scala producer support (removed `BaseProducer`
    and several options that are not used by the Java producer).
    * Updated a few Scala consumer tests to use the new producer (including a minor
    refactor of `produceMessages` methods in `TestUtils`).
    * Updated `ClientUtils.fetchTopicMetadata` to use `SimpleConsumer` instead of
    `SyncProducer`.
    * Removed `TestKafkaAppender` as it looks useless and it defined an `Encoder`.
    * Minor import clean-ups.
    
    No new tests added since behaviour should remain the same after these changes.
    
    Author: Ismael Juma <is...@juma.me.uk>
    
    Reviewers: Manikumar Reddy O <ma...@gmail.com>, Dong Lin <li...@gmail.com>
    
    Closes #5045 from ijuma/kafka-6921-remove-old-producer
---
 .../src/main/scala/kafka/api/ProducerRequest.scala | 148 ------
 .../main/scala/kafka/api/ProducerResponse.scala    | 110 -----
 core/src/main/scala/kafka/client/ClientUtils.scala |  48 +-
 .../scala/kafka/javaapi/producer/Producer.scala    |  52 ---
 .../scala/kafka/metrics/KafkaMetricsGroup.scala    |  10 -
 .../main/scala/kafka/producer/BaseProducer.scala   |  74 ---
 .../scala/kafka/producer/BrokerPartitionInfo.scala | 105 -----
 .../kafka/producer/ByteArrayPartitioner.scala      |  30 --
 .../scala/kafka/producer/DefaultPartitioner.scala  |  30 --
 .../main/scala/kafka/producer/KeyedMessage.scala   |  44 --
 .../main/scala/kafka/producer/Partitioner.scala    |  35 --
 core/src/main/scala/kafka/producer/Producer.scala  | 142 ------
 .../kafka/producer/ProducerClosedException.scala   |  22 -
 .../main/scala/kafka/producer/ProducerConfig.scala | 121 -----
 .../main/scala/kafka/producer/ProducerPool.scala   |  90 ----
 .../kafka/producer/ProducerRequestStats.scala      |  69 ---
 .../main/scala/kafka/producer/ProducerStats.scala  |  46 --
 .../scala/kafka/producer/ProducerTopicStats.scala  |  69 ---
 .../main/scala/kafka/producer/SyncProducer.scala   | 169 -------
 .../scala/kafka/producer/SyncProducerConfig.scala  |  72 ---
 .../kafka/producer/async/AsyncProducerConfig.scala |  49 --
 .../kafka/producer/async/DefaultEventHandler.scala | 359 ---------------
 .../scala/kafka/producer/async/EventHandler.scala  |  37 --
 .../async/IllegalQueueStateException.scala         |  26 --
 .../producer/async/MissingConfigException.scala    |  24 -
 .../kafka/producer/async/ProducerSendThread.scala  | 114 -----
 core/src/main/scala/kafka/serializer/Encoder.scala |  79 ----
 .../main/scala/kafka/tools/ConsoleProducer.scala   |  94 +---
 .../scala/kafka/tools/ProducerPerformance.scala    | 308 -------------
 .../kafka/api/CustomQuotaCallbackTest.scala        |   1 -
 .../server/DynamicBrokerReconfigurationTest.scala  |   7 -
 .../test/scala/other/kafka/TestKafkaAppender.scala |  50 --
 .../scala/unit/kafka/admin/DeleteTopicTest.scala   |   2 +-
 .../admin/ReassignPartitionsClusterTest.scala      |  20 +-
 .../kafka/admin/ResetConsumerGroupOffsetTest.scala |  65 +--
 .../api/RequestResponseSerializationTest.scala     |  36 +-
 .../test/scala/unit/kafka/common/ConfigTest.scala  |  32 --
 .../consumer/ZookeeperConsumerConnectorTest.scala  |  41 +-
 .../kafka/integration/AutoOffsetResetTest.scala    |  17 +-
 .../scala/unit/kafka/integration/FetcherTest.scala |   4 +-
 .../unit/kafka/integration/PrimitiveApiTest.scala  | 276 -----------
 .../integration/ProducerConsumerTestHarness.scala  |  48 --
 .../integration/UncleanLeaderElectionTest.scala    |   6 -
 .../consumer/ZookeeperConsumerConnectorTest.scala  |  44 +-
 .../scala/unit/kafka/metrics/MetricsTest.scala     |  41 +-
 .../unit/kafka/producer/AsyncProducerTest.scala    | 505 ---------------------
 .../scala/unit/kafka/producer/ProducerTest.scala   | 348 --------------
 .../unit/kafka/producer/SyncProducerTest.scala     | 253 -----------
 .../kafka/server/AbstractFetcherThreadTest.scala   |   1 -
 .../kafka/server/DescribeLogDirsRequestTest.scala  |   2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   2 +-
 .../server/ReplicaAlterLogDirsThreadTest.scala     |   8 +-
 .../kafka/server/ReplicaFetcherThreadTest.scala    |   2 -
 .../unit/kafka/tools/ConsoleProducerTest.scala     |  10 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 160 +------
 docs/upgrade.html                                  |   5 +
 .../apache/kafka/streams/scala/TopologyTest.scala  |   7 +-
 .../services/performance/producer_performance.py   |   1 -
 58 files changed, 168 insertions(+), 4402 deletions(-)

diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
deleted file mode 100644
index 9cdb14b..0000000
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio._
-
-import kafka.api.ApiUtils._
-import kafka.common._
-import kafka.message._
-import org.apache.kafka.common.protocol.ApiKeys
-
-@deprecated("This object has been deprecated and will be removed in a future release.", "1.0.0")
-object ProducerRequest {
-  val CurrentVersion = 2.shortValue
-
-  def readFrom(buffer: ByteBuffer): ProducerRequest = {
-    val versionId: Short = buffer.getShort
-    val correlationId: Int = buffer.getInt
-    val clientId: String = readShortString(buffer)
-    val requiredAcks: Short = buffer.getShort
-    val ackTimeoutMs: Int = buffer.getInt
-    //build the topic structure
-    val topicCount = buffer.getInt
-    val partitionDataPairs = (1 to topicCount).flatMap(_ => {
-      // process topic
-      val topic = readShortString(buffer)
-      val partitionCount = buffer.getInt
-      (1 to partitionCount).map(_ => {
-        val partition = buffer.getInt
-        val messageSetSize = buffer.getInt
-        val messageSetBuffer = new Array[Byte](messageSetSize)
-        buffer.get(messageSetBuffer,0,messageSetSize)
-        (TopicAndPartition(topic, partition), new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer)))
-      })
-    })
-
-    ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, collection.mutable.Map(partitionDataPairs:_*))
-  }
-}
-
-@deprecated("This object has been deprecated and will be removed in a future release.", "1.0.0")
-case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
-                           correlationId: Int,
-                           clientId: String,
-                           requiredAcks: Short,
-                           ackTimeoutMs: Int,
-                           data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet])
-    extends RequestOrResponse(Some(ApiKeys.PRODUCE.id)) {
-
-  /**
-   * Partitions the data into a map of maps (one for each topic).
-   */
-  private lazy val dataGroupedByTopic = data.groupBy(_._1.topic)
-  val topicPartitionMessageSizeMap = data.map(r => r._1 -> r._2.sizeInBytes).toMap
-
-  def this(correlationId: Int,
-           clientId: String,
-           requiredAcks: Short,
-           ackTimeoutMs: Int,
-           data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) =
-    this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putShort(versionId)
-    buffer.putInt(correlationId)
-    writeShortString(buffer, clientId)
-    buffer.putShort(requiredAcks)
-    buffer.putInt(ackTimeoutMs)
-
-    //save the topic structure
-    buffer.putInt(dataGroupedByTopic.size) //the number of topics
-    dataGroupedByTopic.foreach {
-      case (topic, topicAndPartitionData) =>
-        writeShortString(buffer, topic) //write the topic
-        buffer.putInt(topicAndPartitionData.size) //the number of partitions
-        topicAndPartitionData.foreach(partitionAndData => {
-          val partition = partitionAndData._1.partition
-          val partitionMessageData = partitionAndData._2
-          val bytes = partitionMessageData.buffer
-          buffer.putInt(partition)
-          buffer.putInt(bytes.limit())
-          buffer.put(bytes)
-          bytes.rewind
-        })
-    }
-  }
-
-  def sizeInBytes: Int = {
-    2 + /* versionId */
-    4 + /* correlationId */
-    shortStringLength(clientId) + /* client id */
-    2 + /* requiredAcks */
-    4 + /* ackTimeoutMs */
-    4 + /* number of topics */
-    dataGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
-      foldedTopics +
-      shortStringLength(currTopic._1) +
-      4 + /* the number of partitions */
-      {
-        currTopic._2.foldLeft(0)((foldedPartitions, currPartition) => {
-          foldedPartitions +
-          4 + /* partition id */
-          4 + /* byte-length of serialized messages */
-          currPartition._2.sizeInBytes
-        })
-      }
-    })
-  }
-
-  def numPartitions = data.size
-
-  override def toString: String = {
-    describe(true)
-  }
-
-  override def describe(details: Boolean): String = {
-    val producerRequest = new StringBuilder
-    producerRequest.append("Name: " + this.getClass.getSimpleName)
-    producerRequest.append("; Version: " + versionId)
-    producerRequest.append("; CorrelationId: " + correlationId)
-    producerRequest.append("; ClientId: " + clientId)
-    producerRequest.append("; RequiredAcks: " + requiredAcks)
-    producerRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
-    if(details)
-      producerRequest.append("; TopicAndPartition: " + topicPartitionMessageSizeMap.mkString(","))
-    producerRequest.toString()
-  }
-
-  def emptyData(){
-    data.clear()
-  }
-}
-
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
deleted file mode 100644
index 2d3c9cc..0000000
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-import kafka.message.Message
-import org.apache.kafka.common.protocol.Errors
-
-import scala.collection.Map
-import kafka.common.TopicAndPartition
-import kafka.api.ApiUtils._
-
-@deprecated("This object has been deprecated and will be removed in a future release.", "1.0.0")
-object ProducerResponse {
-  // readFrom assumes that the response is written using V2 format
-  def readFrom(buffer: ByteBuffer): ProducerResponse = {
-    val correlationId = buffer.getInt
-    val topicCount = buffer.getInt
-    val statusPairs = (1 to topicCount).flatMap(_ => {
-      val topic = readShortString(buffer)
-      val partitionCount = buffer.getInt
-      (1 to partitionCount).map(_ => {
-        val partition = buffer.getInt
-        val error = Errors.forCode(buffer.getShort)
-        val offset = buffer.getLong
-        val timestamp = buffer.getLong
-        (TopicAndPartition(topic, partition), ProducerResponseStatus(error, offset, timestamp))
-      })
-    })
-
-    val throttleTime = buffer.getInt
-    ProducerResponse(correlationId, Map(statusPairs:_*), ProducerRequest.CurrentVersion, throttleTime)
-  }
-}
-
-case class ProducerResponseStatus(var error: Errors, offset: Long, timestamp: Long = Message.NoTimestamp)
-
-@deprecated("This object has been deprecated and will be removed in a future release.", "1.0.0")
-case class ProducerResponse(correlationId: Int,
-                            status: Map[TopicAndPartition, ProducerResponseStatus],
-                            requestVersion: Int = 0,
-                            throttleTime: Int = 0)
-    extends RequestOrResponse() {
-
-  /**
-   * Partitions the status map into a map of maps (one for each topic).
-   */
-  private lazy val statusGroupedByTopic = status.groupBy(_._1.topic)
-
-  def hasError = status.values.exists(_.error != Errors.NONE)
-
-  val sizeInBytes = {
-    val throttleTimeSize = if (requestVersion > 0) 4 else 0
-    val groupedStatus = statusGroupedByTopic
-    4 + /* correlation id */
-    4 + /* topic count */
-    groupedStatus.foldLeft (0) ((foldedTopics, currTopic) => {
-      foldedTopics +
-      shortStringLength(currTopic._1) +
-      4 + /* partition count for this topic */
-      currTopic._2.size * {
-        4 + /* partition id */
-        2 + /* error code */
-        8 + /* offset */
-        8 /* timestamp */
-      }
-    }) +
-    throttleTimeSize
-  }
-
-  def writeTo(buffer: ByteBuffer) {
-    val groupedStatus = statusGroupedByTopic
-    buffer.putInt(correlationId)
-    buffer.putInt(groupedStatus.size) // topic count
-
-    groupedStatus.foreach(topicStatus => {
-      val (topic, errorsAndOffsets) = topicStatus
-      writeShortString(buffer, topic)
-      buffer.putInt(errorsAndOffsets.size) // partition count
-      errorsAndOffsets.foreach {
-        case((TopicAndPartition(_, partition), ProducerResponseStatus(error, nextOffset, timestamp))) =>
-          buffer.putInt(partition)
-          buffer.putShort(error.code)
-          buffer.putLong(nextOffset)
-          buffer.putLong(timestamp)
-      }
-    })
-    // Throttle time is only supported on V1 style requests
-    if (requestVersion > 0)
-      buffer.putInt(throttleTime)
-  }
-
-  override def describe(details: Boolean):String = { toString }
-}
-
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 5573256..53f3895 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -21,16 +21,15 @@ import org.apache.kafka.common.protocol.Errors
 import scala.collection._
 import kafka.cluster._
 import kafka.api._
-import kafka.producer._
 import kafka.common.{BrokerEndPointNotAvailableException, KafkaException}
 import kafka.utils.{CoreUtils, Logging}
-import java.util.Properties
 
 import util.Random
 import kafka.network.BlockingChannel
 import kafka.utils.ZkUtils
 import java.io.IOException
 
+import kafka.consumer.SimpleConsumer
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
  /**
@@ -39,28 +38,32 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 @deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 object ClientUtils extends Logging {
 
-  /**
-   * Used by the producer to send a metadata request since it has access to the ProducerConfig
+   /**
+   * Send a metadata request
    * @param topics The topics for which the metadata needs to be fetched
-   * @param brokers The brokers in the cluster as configured on the producer through metadata.broker.list
-   * @param producerConfig The producer's config
+   * @param brokers The brokers in the cluster as configured on the client
+   * @param clientId The client's identifier
    * @return topic metadata response
    */
-  @deprecated("This method has been deprecated and will be removed in a future release.", "0.10.0.0")
-  def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndPoint], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
+  def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndPoint], clientId: String, timeoutMs: Int,
+                                 correlationId: Int = 0): TopicMetadataResponse = {
     var fetchMetaDataSucceeded: Boolean = false
     var i: Int = 0
-    val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
+    val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, clientId,
+      topics.toSeq)
     var topicMetadataResponse: TopicMetadataResponse = null
     var t: Throwable = null
     // shuffle the list of brokers before sending metadata requests so that most requests don't get routed to the
     // same broker
     val shuffledBrokers = Random.shuffle(brokers)
     while(i < shuffledBrokers.size && !fetchMetaDataSucceeded) {
-      val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))
-      info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i), correlationId, topics.size, topics))
+      val broker = shuffledBrokers(i)
+      val consumer = new SimpleConsumer(broker.host, broker.port, timeoutMs, BlockingChannel.UseDefaultBufferSize,
+        clientId)
+      info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i),
+        correlationId, topics.size, topics))
       try {
-        topicMetadataResponse = producer.send(topicMetadataRequest)
+        topicMetadataResponse = consumer.send(topicMetadataRequest)
         fetchMetaDataSucceeded = true
       }
       catch {
@@ -70,10 +73,10 @@ object ClientUtils extends Logging {
           t = e
       } finally {
         i = i + 1
-        producer.close()
+        consumer.close()
       }
     }
-    if(!fetchMetaDataSucceeded) {
+    if (!fetchMetaDataSucceeded) {
       throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, shuffledBrokers), t)
     } else {
       debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
@@ -82,23 +85,6 @@ object ClientUtils extends Logging {
   }
 
   /**
-   * Used by a non-producer client to send a metadata request
-   * @param topics The topics for which the metadata needs to be fetched
-   * @param brokers The brokers in the cluster as configured on the client
-   * @param clientId The client's identifier
-   * @return topic metadata response
-   */
-  def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndPoint], clientId: String, timeoutMs: Int,
-                         correlationId: Int = 0): TopicMetadataResponse = {
-    val props = new Properties()
-    props.put("metadata.broker.list", brokers.map(_.connectionString).mkString(","))
-    props.put("client.id", clientId)
-    props.put("request.timeout.ms", timeoutMs.toString)
-    val producerConfig = new ProducerConfig(props)
-    fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
-  }
-
-  /**
    * Parse a list of broker urls in the form host1:port1, host2:port2, ...
    */
   def parseBrokerList(brokerListStr: String): Seq[BrokerEndPoint] = {
diff --git a/core/src/main/scala/kafka/javaapi/producer/Producer.scala b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
deleted file mode 100644
index b0b40b9..0000000
--- a/core/src/main/scala/kafka/javaapi/producer/Producer.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.javaapi.producer
-
-import kafka.producer.ProducerConfig
-import kafka.producer.KeyedMessage
-import scala.collection.mutable
-
-@deprecated("This class has been deprecated and will be removed in a future release. " +
-            "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
-class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for testing only
-{
-  def this(config: ProducerConfig) = this(new kafka.producer.Producer[K,V](config))
-  /**
-   * Sends the data to a single topic, partitioned by key, using either the
-   * synchronous or the asynchronous producer
-   * @param message the producer data object that encapsulates the topic, key and message data
-   */
-  def send(message: KeyedMessage[K,V]) {
-    underlying.send(message)
-  }
-
-  /**
-   * Use this API to send data to multiple topics
-   * @param messages list of producer data objects that encapsulate the topic, key and message data
-   */
-  def send(messages: java.util.List[KeyedMessage[K,V]]) {
-    import collection.JavaConversions._
-    underlying.send((messages: mutable.Buffer[KeyedMessage[K,V]]).toSeq: _*)
-  }
-
-  /**
-   * Close API to close the producer pool connections to all Kafka brokers. Also closes
-   * the zookeeper client connection if one exists
-   */
-  def close() = underlying.close()
-}
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 75eaafd..f95d0ad 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.{Gauge, MetricName}
 import kafka.consumer.{ConsumerTopicStatsRegistry, FetchRequestAndResponseStatsRegistry}
-import kafka.producer.{ProducerRequestStatsRegistry, ProducerStatsRegistry, ProducerTopicStatsRegistry}
 import kafka.utils.Logging
 import org.apache.kafka.common.utils.Sanitizer
 
@@ -179,18 +178,9 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
   def removeAllConsumerMetrics(clientId: String) {
     FetchRequestAndResponseStatsRegistry.removeConsumerFetchRequestAndResponseStats(clientId)
     ConsumerTopicStatsRegistry.removeConsumerTopicStat(clientId)
-    ProducerRequestStatsRegistry.removeProducerRequestStats(clientId)
     removeAllMetricsInList(KafkaMetricsGroup.consumerMetricNameList, clientId)
   }
 
-  @deprecated("This method has been deprecated and will be removed in a future release.", "0.10.0.0")
-  def removeAllProducerMetrics(clientId: String) {
-    ProducerRequestStatsRegistry.removeProducerRequestStats(clientId)
-    ProducerTopicStatsRegistry.removeProducerTopicStats(clientId)
-    ProducerStatsRegistry.removeProducerStats(clientId)
-    removeAllMetricsInList(KafkaMetricsGroup.producerMetricNameList, clientId)
-  }
-
   private def removeAllMetricsInList(metricNameList: immutable.List[MetricName], clientId: String) {
     metricNameList.foreach(metric => {
       val pattern = (".*clientId=" + clientId + ".*").r
diff --git a/core/src/main/scala/kafka/producer/BaseProducer.scala b/core/src/main/scala/kafka/producer/BaseProducer.scala
deleted file mode 100644
index 83d9aa7..0000000
--- a/core/src/main/scala/kafka/producer/BaseProducer.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.producer
-
-import java.util.Properties
-
-// A base producer used whenever we need to have options for both old and new producers;
-// this class will be removed once we fully rolled out 0.9
-@deprecated("This trait has been deprecated and will be removed in a future release. " +
-            "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
-trait BaseProducer {
-  def send(topic: String, key: Array[Byte], value: Array[Byte])
-  def close()
-}
-
-@deprecated("This class has been deprecated and will be removed in a future release. " +
-            "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
-class NewShinyProducer(producerProps: Properties) extends BaseProducer {
-  import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-  import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
-
-  // decide whether to send synchronously based on producer properties
-  val sync = producerProps.getProperty("producer.type", "async").equals("sync")
-
-  val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
-
-  override def send(topic: String, key: Array[Byte], value: Array[Byte]) {
-    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic, key, value)
-    if(sync) {
-      this.producer.send(record).get()
-    } else {
-      this.producer.send(record,
-        new ErrorLoggingCallback(topic, key, value, false))
-    }
-  }
-
-  override def close() {
-    this.producer.close()
-  }
-}
-
-@deprecated("This class has been deprecated and will be removed in a future release. " +
-            "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
-class OldProducer(producerProps: Properties) extends BaseProducer {
-
-  // default to byte array partitioner
-  if (producerProps.getProperty("partitioner.class") == null)
-    producerProps.setProperty("partitioner.class", classOf[kafka.producer.ByteArrayPartitioner].getName)
-  val producer = new kafka.producer.Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps))
-
-  override def send(topic: String, key: Array[Byte], value: Array[Byte]) {
-    this.producer.send(new KeyedMessage[Array[Byte], Array[Byte]](topic, key, value))
-  }
-
-  override def close() {
-    this.producer.close()
-  }
-}
-
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
deleted file mode 100644
index e77a50c..0000000
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package kafka.producer
-
-import org.apache.kafka.common.protocol.Errors
-
-import collection.mutable.HashMap
-import kafka.api.TopicMetadata
-import kafka.common.KafkaException
-import kafka.utils.Logging
-import kafka.client.ClientUtils
-
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-class BrokerPartitionInfo(producerConfig: ProducerConfig,
-                          producerPool: ProducerPool,
-                          topicPartitionInfo: HashMap[String, TopicMetadata])
-        extends Logging {
-  val brokerList = producerConfig.brokerList
-  val brokers = ClientUtils.parseBrokerList(brokerList)
-
-  /**
-   * Return a sequence of (brokerId, numPartitions).
-   * @param topic the topic for which this information is to be returned
-   * @return a sequence of (brokerId, numPartitions). Returns a zero-length
-   * sequence if no brokers are available.
-   */
-  def getBrokerPartitionInfo(topic: String, correlationId: Int): Seq[PartitionAndLeader] = {
-    debug("Getting broker partition info for topic %s".format(topic))
-    // check if the cache has metadata for this topic
-    val topicMetadata = topicPartitionInfo.get(topic)
-    val metadata: TopicMetadata =
-      topicMetadata match {
-        case Some(m) => m
-        case None =>
-          // refresh the topic metadata cache
-          updateInfo(Set(topic), correlationId)
-          val topicMetadata = topicPartitionInfo.get(topic)
-          topicMetadata match {
-            case Some(m) => m
-            case None => throw new KafkaException("Failed to fetch topic metadata for topic: " + topic)
-          }
-      }
-    val partitionMetadata = metadata.partitionsMetadata
-    if(partitionMetadata.isEmpty) {
-      if(metadata.error != Errors.NONE) {
-        throw new KafkaException(metadata.error.exception)
-      } else {
-        throw new KafkaException("Topic metadata %s has empty partition metadata and no error code".format(metadata))
-      }
-    }
-    partitionMetadata.map { m =>
-      m.leader match {
-        case Some(leader) =>
-          debug("Partition [%s,%d] has leader %d".format(topic, m.partitionId, leader.id))
-          new PartitionAndLeader(topic, m.partitionId, Some(leader.id))
-        case None =>
-          debug("Partition [%s,%d] does not have a leader yet".format(topic, m.partitionId))
-          new PartitionAndLeader(topic, m.partitionId, None)
-      }
-    }.sortWith((s, t) => s.partitionId < t.partitionId)
-  }
-
-  /**
-   * It updates the cache by issuing a get topic metadata request to a random broker.
-   * @param topics the topics for which the metadata is to be fetched
-   */
-  def updateInfo(topics: Set[String], correlationId: Int) {
-    var topicsMetadata: Seq[TopicMetadata] = Nil
-    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
-    topicsMetadata = topicMetadataResponse.topicsMetadata
-    // throw partition specific exception
-    topicsMetadata.foreach(tmd =>{
-      trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
-      if(tmd.error == Errors.NONE) {
-        topicPartitionInfo.put(tmd.topic, tmd)
-      } else
-        warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, tmd.error.exception.getClass))
-      tmd.partitionsMetadata.foreach(pmd =>{
-        if (pmd.error != Errors.NONE && pmd.error == Errors.LEADER_NOT_AVAILABLE) {
-          warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,
-            pmd.error.exception.getClass))
-        } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
-      })
-    })
-    producerPool.updateProducer(topicsMetadata)
-  }
-
-}
-
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int])
diff --git a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
deleted file mode 100755
index 7848456..0000000
--- a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.producer
-
-
-import kafka.utils._
-import org.apache.kafka.common.utils.Utils
-
-@deprecated("This class has been deprecated and will be removed in a future release. " +
-            "It has been replaced by org.apache.kafka.clients.producer.internals.DefaultPartitioner.", "0.10.0.0")
-class ByteArrayPartitioner(props: VerifiableProperties = null) extends Partitioner {
-  def partition(key: Any, numPartitions: Int): Int = {
-    Utils.abs(java.util.Arrays.hashCode(key.asInstanceOf[Array[Byte]])) % numPartitions
-  }
-}
diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
deleted file mode 100755
index f793811..0000000
--- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.producer
-
-
-import kafka.utils._
-import org.apache.kafka.common.utils.Utils
-
-@deprecated("This class has been deprecated and will be removed in a future release. " +
-            "It has been replaced by org.apache.kafka.clients.producer.internals.DefaultPartitioner.", "0.10.0.0")
-class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
-  def partition(key: Any, numPartitions: Int): Int = {
-    Utils.abs(key.hashCode) % numPartitions
-  }
-}
diff --git a/core/src/main/scala/kafka/producer/KeyedMessage.scala b/core/src/main/scala/kafka/producer/KeyedMessage.scala
deleted file mode 100644
index 84ea232..0000000
--- a/core/src/main/scala/kafka/producer/KeyedMessage.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.producer
-
-/**
- * A topic, key, and value.
- * If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.
- */
-@deprecated("This class has been deprecated and will be removed in a future release. " +
-            "Please use org.apache.kafka.clients.producer.ProducerRecord instead.", "0.10.0.0")
-case class KeyedMessage[K, V](topic: String, key: K, partKey: Any, message: V) {
-  if(topic == null)
-    throw new IllegalArgumentException("Topic cannot be null.")
-  
-  def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)
-  
-  def this(topic: String, key: K, message: V) = this(topic, key, key, message)
-  
-  def partitionKey = {
-    if(partKey != null)
-      partKey
-    else if(hasKey)
-      key
-    else
-      null  
-  }
-  
-  def hasKey = key != null
-}
diff --git a/core/src/main/scala/kafka/producer/Partitioner.scala b/core/src/main/scala/kafka/producer/Partitioner.scala
deleted file mode 100644
index 5d24692..0000000
--- a/core/src/main/scala/kafka/producer/Partitioner.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package kafka.producer
-
-/**
- * A partitioner controls the mapping between user-provided keys and kafka partitions. Users can implement a custom
- * partitioner to change this mapping.
- * 
- * Implementations will be constructed via reflection and are required to have a constructor that takes a single 
- * VerifiableProperties instance--this allows passing configuration properties into the partitioner implementation.
- */
-@deprecated("This trait has been deprecated and will be removed in a future release. " +
-            "Please use org.apache.kafka.clients.producer.Partitioner instead.", "0.10.0.0")
-trait Partitioner {
-  /**
-   * Uses the key to calculate a partition bucket id for routing
-   * the data to the appropriate broker partition
-   * @return an integer between 0 and numPartitions-1
-   */
-  def partition(key: Any, numPartitions: Int): Int
-}
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
deleted file mode 100755
index d6cf4c8..0000000
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.producer
-
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
-
-import kafka.common.{AppInfo, QueueFullException}
-import kafka.metrics._
-import kafka.producer.async.{DefaultEventHandler, EventHandler, ProducerSendThread}
-import kafka.serializer.Encoder
-import kafka.utils._
-
-@deprecated("This class has been deprecated and will be removed in a future release. " +
-            "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
-class Producer[K,V](val config: ProducerConfig,
-                    private val eventHandler: EventHandler[K,V])  // only for unit testing
-  extends Logging {
-
-  private val hasShutdown = new AtomicBoolean(false)
-  private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)
-
-  private var sync: Boolean = true
-  private var producerSendThread: ProducerSendThread[K,V] = null
-  private val lock = new Object()
-
-  config.producerType match {
-    case "sync" =>
-    case "async" =>
-      sync = false
-      producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
-                                                       queue,
-                                                       eventHandler,
-                                                       config.queueBufferingMaxMs,
-                                                       config.batchNumMessages,
-                                                       config.clientId)
-      producerSendThread.start()
-  }
-
-  private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
-
-  KafkaMetricsReporter.startReporters(config.props)
-  AppInfo.registerInfo()
-
-  def this(config: ProducerConfig) =
-    this(config,
-         new DefaultEventHandler[K,V](config,
-                                      CoreUtils.createObject[Partitioner](config.partitionerClass, config.props),
-                                      CoreUtils.createObject[Encoder[V]](config.serializerClass, config.props),
-                                      CoreUtils.createObject[Encoder[K]](config.keySerializerClass, config.props),
-                                      new ProducerPool(config)))
-
-  /**
-   * Sends the data, partitioned by key to the topic using either the
-   * synchronous or the asynchronous producer
-   * @param messages the producer data object that encapsulates the topic, key and message data
-   */
-  def send(messages: KeyedMessage[K,V]*) {
-    lock synchronized {
-      if (hasShutdown.get)
-        throw new ProducerClosedException
-      recordStats(messages)
-      if (sync)
-        eventHandler.handle(messages)
-      else
-        asyncSend(messages)
-    }
-  }
-
-
-  private def recordStats(messages: Seq[KeyedMessage[K,V]]) {
-    for (message <- messages) {
-      producerTopicStats.getProducerTopicStats(message.topic).messageRate.mark()
-      producerTopicStats.getProducerAllTopicsStats.messageRate.mark()
-    }
-  }
-
-  private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
-    for (message <- messages) {
-      val added = config.queueEnqueueTimeoutMs match {
-        case 0  =>
-          queue.offer(message)
-        case _  =>
-          try {
-            if (config.queueEnqueueTimeoutMs < 0) {
-              queue.put(message)
-              true
-            } else {
-              queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
-            }
-          }
-          catch {
-            case _: InterruptedException =>
-              false
-          }
-      }
-      if(!added) {
-        producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
-        producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
-        throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
-      }else {
-        trace("Added to send queue an event: " + message.toString)
-        trace("Remaining queue size: " + queue.remainingCapacity)
-      }
-    }
-  }
-
-  /**
-   * Close API to close the producer pool connections to all Kafka brokers. Also closes
-   * the zookeeper client connection if one exists
-   */
-  def close() = {
-    lock synchronized {
-      val canShutdown = hasShutdown.compareAndSet(false, true)
-      if(canShutdown) {
-        info("Shutting down producer")
-        val startTime = System.nanoTime()
-        KafkaMetricsGroup.removeAllProducerMetrics(config.clientId)
-        if (producerSendThread != null)
-          producerSendThread.shutdown
-        eventHandler.close()
-        info("Producer shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms")
-      }
-    }
-  }
-}
-
-
diff --git a/core/src/main/scala/kafka/producer/ProducerClosedException.scala b/core/src/main/scala/kafka/producer/ProducerClosedException.scala
deleted file mode 100644
index 4f2f731..0000000
--- a/core/src/main/scala/kafka/producer/ProducerClosedException.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.producer
-
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-class ProducerClosedException() extends RuntimeException("producer already closed") {
-}
diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala
deleted file mode 100755
index c2715d0..0000000
--- a/core/src/main/scala/kafka/producer/ProducerConfig.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.producer
-
-import async.AsyncProducerConfig
-import java.util.Properties
-import kafka.utils.{CoreUtils, VerifiableProperties}
-import kafka.message.NoCompressionCodec
-import kafka.common.{InvalidConfigException, Config}
-
-@deprecated("This object has been deprecated and will be removed in a future release. " +
-            "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
-object ProducerConfig extends Config {
-  def validate(config: ProducerConfig) {
-    validateClientId(config.clientId)
-    validateBatchSize(config.batchNumMessages, config.queueBufferingMaxMessages)
-    validateProducerType(config.producerType)
-  }
-
-  def validateClientId(clientId: String) {
-    validateChars("client.id", clientId)
-  }
-
-  def validateBatchSize(batchSize: Int, queueSize: Int) {
-    if (batchSize > queueSize)
-      throw new InvalidConfigException("Batch size = " + batchSize + " can't be larger than queue size = " + queueSize)
-  }
-
-  def validateProducerType(producerType: String) {
-    producerType match {
-      case "sync" =>
-      case "async"=>
-      case _ => throw new InvalidConfigException("Invalid value " + producerType + " for producer.type, valid values are sync/async")
-    }
-  }
-}
-
-@deprecated("This class has been deprecated and will be removed in a future release. " +
-            "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
-class ProducerConfig private (val props: VerifiableProperties)
-        extends AsyncProducerConfig with SyncProducerConfigShared {
-  import ProducerConfig._
-
-  def this(originalProps: Properties) {
-    this(new VerifiableProperties(originalProps))
-    props.verify()
-  }
-
-  /** This is for bootstrapping and the producer will only use it for getting metadata
-   * (topics, partitions and replicas). The socket connections for sending the actual data
-   * will be established based on the broker information returned in the metadata. The
-   * format is host1:port1,host2:port2, and the list can be a subset of brokers or
-   * a VIP pointing to a subset of brokers.
-   */
-  val brokerList = props.getString("metadata.broker.list")
-
-  /** the partitioner class for partitioning events amongst sub-topics */
-  val partitionerClass = props.getString("partitioner.class", "kafka.producer.DefaultPartitioner")
-
-  /** this parameter specifies whether the messages are sent asynchronously *
-   * or not. Valid values are - async for asynchronous send                 *
-   *                            sync for synchronous send                   */
-  val producerType = props.getString("producer.type", "sync")
-
-  /**
-   * This parameter allows you to specify the compression codec for all data generated *
-   * by this producer. The default is NoCompressionCodec
-   */
-  val compressionCodec = props.getCompressionCodec("compression.codec", NoCompressionCodec)
-
-  /** This parameter allows you to set whether compression should be turned *
-   *  on for particular topics
-   *
-   *  If the compression codec is anything other than NoCompressionCodec,
-   *
-   *    Enable compression only for specified topics if any
-   *
-   *    If the list of compressed topics is empty, then enable the specified compression codec for all topics
-   *
-   *  If the compression codec is NoCompressionCodec, compression is disabled for all topics
-   */
-  val compressedTopics = CoreUtils.parseCsvList(props.getString("compressed.topics", null))
-
-  /** The leader may be unavailable transiently, which can fail the sending of a message.
-    *  This property specifies the number of retries when such failures occur.
-    */
-  val messageSendMaxRetries = props.getInt("message.send.max.retries", 3)
-
-  /** Before each retry, the producer refreshes the metadata of relevant topics. Since leader
-    * election takes a bit of time, this property specifies the amount of time that the producer
-    * waits before refreshing the metadata.
-    */
-  val retryBackoffMs = props.getInt("retry.backoff.ms", 100)
-
-  /**
-   * The producer generally refreshes the topic metadata from brokers when there is a failure
-   * (partition missing, leader not available...). It will also poll regularly (default: every 10min
-   * so 600000ms). If you set this to a negative value, metadata will only get refreshed on failure.
-   * If you set this to zero, the metadata will get refreshed after each message sent (not recommended)
-   * Important note: the refresh happen only AFTER the message is sent, so if the producer never sends
-   * a message the metadata is never refreshed
-   */
-  val topicMetadataRefreshIntervalMs = props.getInt("topic.metadata.refresh.interval.ms", 600000)
-
-  validate(this)
-}
diff --git a/core/src/main/scala/kafka/producer/ProducerPool.scala b/core/src/main/scala/kafka/producer/ProducerPool.scala
deleted file mode 100644
index 6d4e4b7..0000000
--- a/core/src/main/scala/kafka/producer/ProducerPool.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.producer
-
-import java.util.Properties
-
-import kafka.api.TopicMetadata
-import kafka.cluster.BrokerEndPoint
-import kafka.common.UnavailableProducerException
-import kafka.utils.Logging
-import kafka.utils.Implicits._
-
-import scala.collection.mutable.HashMap
-
-@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0")
-object ProducerPool {
-  /**
-   * Used in ProducerPool to initiate a SyncProducer connection with a broker.
-   */
-  def createSyncProducer(config: ProducerConfig, broker: BrokerEndPoint): SyncProducer = {
-    val props = new Properties()
-    props.put("host", broker.host)
-    props.put("port", broker.port.toString)
-    props ++= config.props.props
-    new SyncProducer(new SyncProducerConfig(props))
-  }
-}
-
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-class ProducerPool(val config: ProducerConfig) extends Logging {
-  private val syncProducers = new HashMap[Int, SyncProducer]
-  private val lock = new Object()
-
-  def updateProducer(topicMetadata: Seq[TopicMetadata]) {
-    val newBrokers = new collection.mutable.HashSet[BrokerEndPoint]
-    topicMetadata.foreach(tmd => {
-      tmd.partitionsMetadata.foreach(pmd => {
-        if(pmd.leader.isDefined) {
-          newBrokers += pmd.leader.get
-        }
-      })
-    })
-    lock synchronized {
-      newBrokers.foreach(b => {
-        if(syncProducers.contains(b.id)){
-          syncProducers(b.id).close()
-          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
-        } else
-          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
-      })
-    }
-  }
-
-  def getProducer(brokerId: Int) : SyncProducer = {
-    lock.synchronized {
-      val producer = syncProducers.get(brokerId)
-      producer match {
-        case Some(p) => p
-        case None => throw new UnavailableProducerException("Sync producer for broker id %d does not exist".format(brokerId))
-      }
-    }
-  }
-
-  /**
-   * Closes all the producers in the pool
-   */
-  def close() = {
-    lock.synchronized {
-      info("Closing all sync producers")
-      val iter = syncProducers.values.iterator
-      while(iter.hasNext)
-        iter.next.close
-    }
-  }
-}
diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
deleted file mode 100644
index 92bbbcf..0000000
--- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.producer
-
-import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
-import java.util.concurrent.TimeUnit
-import kafka.utils.Pool
-import kafka.common.{ClientIdAllBrokers, ClientIdBroker, ClientIdAndBroker}
-
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-class ProducerRequestMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup {
-  val tags = metricId match {
-    case ClientIdAndBroker(clientId, brokerHost, brokerPort) => Map("clientId" -> clientId, "brokerHost" -> brokerHost, "brokerPort" -> brokerPort.toString)
-    case ClientIdAllBrokers(clientId) => Map("clientId" -> clientId)
-  }
-
-  val requestTimer = new KafkaTimer(newTimer("ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags))
-  val requestSizeHist = newHistogram("ProducerRequestSize", biased = true, tags)
-  val throttleTimeStats = newTimer("ProducerRequestThrottleRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags)
-}
-
-/**
- * Tracks metrics of requests made by a given producer client to all brokers.
- * @param clientId ClientId of the given producer
- */
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-class ProducerRequestStats(clientId: String) {
-  private val valueFactory = (k: ClientIdBroker) => new ProducerRequestMetrics(k)
-  private val stats = new Pool[ClientIdBroker, ProducerRequestMetrics](Some(valueFactory))
-  private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAllBrokers(clientId))
-
-  def getProducerRequestAllBrokersStats(): ProducerRequestMetrics = allBrokersStats
-
-  def getProducerRequestStats(brokerHost: String, brokerPort: Int): ProducerRequestMetrics = {
-    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerHost, brokerPort))
-  }
-}
-
-/**
- * Stores the request stats information of each producer client in a (clientId -> ProducerRequestStats) map.
- */
-@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0")
-object ProducerRequestStatsRegistry {
-  private val valueFactory = (k: String) => new ProducerRequestStats(k)
-  private val globalStats = new Pool[String, ProducerRequestStats](Some(valueFactory))
-
-  def getProducerRequestStats(clientId: String) = {
-    globalStats.getAndMaybePut(clientId)
-  }
-
-  def removeProducerRequestStats(clientId: String) {
-    globalStats.remove(clientId)
-  }
-}
-
diff --git a/core/src/main/scala/kafka/producer/ProducerStats.scala b/core/src/main/scala/kafka/producer/ProducerStats.scala
deleted file mode 100644
index 9466f26..0000000
--- a/core/src/main/scala/kafka/producer/ProducerStats.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.producer
-
-import kafka.metrics.KafkaMetricsGroup
-import java.util.concurrent.TimeUnit
-import kafka.utils.Pool
-
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-class ProducerStats(clientId: String) extends KafkaMetricsGroup {
-  val tags: Map[String, String] = Map("clientId" -> clientId)
-  val serializationErrorRate = newMeter("SerializationErrorsPerSec", "errors", TimeUnit.SECONDS, tags)
-  val resendRate = newMeter("ResendsPerSec", "resends", TimeUnit.SECONDS, tags)
-  val failedSendRate = newMeter("FailedSendsPerSec", "failed sends", TimeUnit.SECONDS, tags)
-}
-
-/**
- * Stores metrics of serialization and message sending activity of each producer client in a (clientId -> ProducerStats) map.
- */
-@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0")
-object ProducerStatsRegistry {
-  private val valueFactory = (k: String) => new ProducerStats(k)
-  private val statsRegistry = new Pool[String, ProducerStats](Some(valueFactory))
-
-  def getProducerStats(clientId: String) = {
-    statsRegistry.getAndMaybePut(clientId)
-  }
-
-  def removeProducerStats(clientId: String) {
-    statsRegistry.remove(clientId)
-  }
-}
diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
deleted file mode 100644
index 7bb9610..0000000
--- a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.producer
-
-import kafka.metrics.KafkaMetricsGroup
-import kafka.common.{ClientIdTopic, ClientIdAllTopics, ClientIdAndTopic}
-import kafka.utils.{Pool, threadsafe}
-import java.util.concurrent.TimeUnit
-
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-@threadsafe
-class ProducerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
-  val tags = metricId match {
-    case ClientIdAndTopic(clientId, topic) => Map("clientId" -> clientId, "topic" -> topic)
-    case ClientIdAllTopics(clientId) => Map("clientId" -> clientId)
-  }
-
-  val messageRate = newMeter("MessagesPerSec", "messages", TimeUnit.SECONDS, tags)
-  val byteRate = newMeter("BytesPerSec", "bytes", TimeUnit.SECONDS, tags)
-  val droppedMessageRate = newMeter("DroppedMessagesPerSec", "drops", TimeUnit.SECONDS, tags)
-}
-
-/**
- * Tracks metrics for each topic the given producer client has produced data to.
- * @param clientId The clientId of the given producer client.
- */
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-class ProducerTopicStats(clientId: String) {
-  private val valueFactory = (k: ClientIdTopic) => new ProducerTopicMetrics(k)
-  private val stats = new Pool[ClientIdTopic, ProducerTopicMetrics](Some(valueFactory))
-  private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAllTopics(clientId)) // to differentiate from a topic named AllTopics
-
-  def getProducerAllTopicsStats(): ProducerTopicMetrics = allTopicsStats
-
-  def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
-    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
-  }
-}
-
-/**
- * Stores the topic stats information of each producer client in a (clientId -> ProducerTopicStats) map.
- */
-@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0")
-object ProducerTopicStatsRegistry {
-  private val valueFactory = (k: String) => new ProducerTopicStats(k)
-  private val globalStats = new Pool[String, ProducerTopicStats](Some(valueFactory))
-
-  def getProducerTopicStats(clientId: String) = {
-    globalStats.getAndMaybePut(clientId)
-  }
-
-  def removeProducerTopicStats(clientId: String) {
-    globalStats.remove(clientId)
-  }
-}
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
deleted file mode 100644
index b132293..0000000
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.producer
-
-import java.util.Random
-import java.util.concurrent.TimeUnit
-
-import kafka.api._
-import kafka.network.{RequestOrResponseSend, BlockingChannel}
-import kafka.utils._
-import org.apache.kafka.common.network.NetworkReceive
-import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.utils.Utils._
-
-@deprecated("This object has been deprecated and will be removed in a future release. " +
-            "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
-object SyncProducer {
-  val RequestKey: Short = 0
-  val randomGenerator = new Random
-}
-
-/*
- * Send a message set.
- */
-@threadsafe
-@deprecated("This class has been deprecated and will be removed in a future release. " +
-            "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
-class SyncProducer(val config: SyncProducerConfig) extends Logging {
-
-  private val lock = new Object()
-  @volatile private var shutdown: Boolean = false
-  private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
-    config.sendBufferBytes, config.requestTimeoutMs)
-  val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId)
-
-  trace("Instantiating Scala Sync Producer with properties: %s".format(config.props))
-
-  private def verifyRequest(request: RequestOrResponse) = {
-    /**
-     * This seems a little convoluted, but the idea is to turn on verification simply changing log4j settings
-     * Also, when verification is turned on, care should be taken to see that the logs don't fill up with unnecessary
-     * data. So, leaving the rest of the logging at TRACE, while errors should be logged at ERROR level
-     */
-    if (isDebugEnabled) {
-      val buffer = new RequestOrResponseSend("", request).buffer
-      trace("verifying sendbuffer of size " + buffer.limit())
-      val requestTypeId = buffer.getShort()
-      if(requestTypeId == ApiKeys.PRODUCE.id) {
-        val request = ProducerRequest.readFrom(buffer)
-        trace(request.toString)
-      }
-    }
-  }
-
-  /**
-   * Common functionality for the public send methods
-   */
-  private def doSend(request: RequestOrResponse, readResponse: Boolean = true): NetworkReceive = {
-    lock synchronized {
-      verifyRequest(request)
-      getOrMakeConnection()
-
-      var response: NetworkReceive = null
-      try {
-        blockingChannel.send(request)
-        if(readResponse)
-          response = blockingChannel.receive()
-        else
-          trace("Skipping reading response")
-      } catch {
-        case e: java.io.IOException =>
-          // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry
-          disconnect()
-          throw e
-        case e: Throwable => throw e
-      }
-      response
-    }
-  }
-
-  /**
-   * Send a message. If the producerRequest had required.request.acks=0, then the
-   * returned response object is null
-   */
-  def send(producerRequest: ProducerRequest): ProducerResponse = {
-    val requestSize = producerRequest.sizeInBytes
-    producerRequestStats.getProducerRequestStats(config.host, config.port).requestSizeHist.update(requestSize)
-    producerRequestStats.getProducerRequestAllBrokersStats.requestSizeHist.update(requestSize)
-
-    var response: NetworkReceive = null
-    val specificTimer = producerRequestStats.getProducerRequestStats(config.host, config.port).requestTimer
-    val aggregateTimer = producerRequestStats.getProducerRequestAllBrokersStats.requestTimer
-    aggregateTimer.time {
-      specificTimer.time {
-        response = doSend(producerRequest, producerRequest.requiredAcks != 0)
-      }
-    }
-    if(producerRequest.requiredAcks != 0) {
-      val producerResponse = ProducerResponse.readFrom(response.payload)
-      producerRequestStats.getProducerRequestStats(config.host, config.port).throttleTimeStats.update(producerResponse.throttleTime, TimeUnit.MILLISECONDS)
-      producerRequestStats.getProducerRequestAllBrokersStats.throttleTimeStats.update(producerResponse.throttleTime, TimeUnit.MILLISECONDS)
-      producerResponse
-    }
-    else
-      null
-  }
-
-  def send(request: TopicMetadataRequest): TopicMetadataResponse = {
-    val response = doSend(request)
-    TopicMetadataResponse.readFrom(response.payload)
-  }
-
-  def close() = {
-    lock synchronized {
-      disconnect()
-      shutdown = true
-    }
-  }
-
-  /**
-   * Disconnect from current channel, closing connection.
-   * Side effect: channel field is set to null on successful disconnect
-   */
-  private def disconnect() {
-    try {
-      info("Disconnecting from " + formatAddress(config.host, config.port))
-      blockingChannel.disconnect()
-    } catch {
-      case e: Exception => error("Error on disconnect: ", e)
-    }
-  }
-
-  private def connect(): BlockingChannel = {
-    if (!blockingChannel.isConnected && !shutdown) {
-      try {
-        blockingChannel.connect()
-        info("Connected to " + formatAddress(config.host, config.port) + " for producing")
-      } catch {
-        case e: Exception => {
-          disconnect()
-          error("Producer connection to " + formatAddress(config.host, config.port) + " unsuccessful", e)
-          throw e
-        }
-      }
-    }
-    blockingChannel
-  }
-
-  private def getOrMakeConnection() {
-    if(!blockingChannel.isConnected) {
-      connect()
-    }
-  }
-}
diff --git a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
deleted file mode 100644
index 207779c..0000000
--- a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.producer
-
-import java.util.Properties
-import kafka.utils.VerifiableProperties
-
-@deprecated("This class has been deprecated and will be removed in a future release. " +
-            "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
-class SyncProducerConfig private (val props: VerifiableProperties) extends SyncProducerConfigShared {
-  def this(originalProps: Properties) {
-    this(new VerifiableProperties(originalProps))
-    // no need to verify the property since SyncProducerConfig is supposed to be used internally
-  }
-
-  /** the broker to which the producer sends events */
-  val host = props.getString("host")
-
-  /** the port on which the broker is running */
-  val port = props.getInt("port")
-}
-
-@deprecated("This trait has been deprecated and will be removed in a future release. " +
-            "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
-trait SyncProducerConfigShared {
-  val props: VerifiableProperties
-  
-  val sendBufferBytes = props.getInt("send.buffer.bytes", 100*1024)
-
-  /* the client application sending the producer requests */
-  val clientId = props.getString("client.id", SyncProducerConfig.DefaultClientId)
-
-  /*
-   * The number of acknowledgments the producer requires the leader to have received before considering a request complete.
-   * This controls the durability of the messages sent by the producer.
-   *
-   * request.required.acks = 0 - means the producer will not wait for any acknowledgement from the leader.
-   * request.required.acks = 1 - means the leader will write the message to its local log and immediately acknowledge
-   * request.required.acks = -1 - means the leader will wait for acknowledgement from all in-sync replicas before acknowledging the write
-   */
-
-  val requestRequiredAcks = props.getShortInRange("request.required.acks", SyncProducerConfig.DefaultRequiredAcks,(-1,1))
-
-  /*
-   * The ack timeout of the producer requests. Value must be non-negative and non-zero
-   */
-  val requestTimeoutMs = props.getIntInRange("request.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs,
-                                             (1, Integer.MAX_VALUE))
-}
-
-@deprecated("This object has been deprecated and will be removed in a future release. " +
-            "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
-object SyncProducerConfig {
-  val DefaultClientId = ""
-  val DefaultRequiredAcks : Short = 0
-  val DefaultAckTimeoutMs = 10000
-}
diff --git a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
deleted file mode 100644
index cc3a79d..0000000
--- a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package kafka.producer.async
-
-import kafka.utils.VerifiableProperties
-
-@deprecated("This trait has been deprecated and will be removed in a future release. " +
-            "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
-trait AsyncProducerConfig {
-  val props: VerifiableProperties
-
-  /* maximum time, in milliseconds, for buffering data on the producer queue */
-  val queueBufferingMaxMs = props.getInt("queue.buffering.max.ms", 5000)
-
-  /** the maximum size of the blocking queue for buffering on the producer */
-  val queueBufferingMaxMessages = props.getInt("queue.buffering.max.messages", 10000)
-
-  /**
-   * Timeout for event enqueue:
-   * 0: events will be enqueued immediately or dropped if the queue is full
-   * -ve: enqueue will block indefinitely if the queue is full
-   * +ve: enqueue will block up to this many milliseconds if the queue is full
-   */
-  val queueEnqueueTimeoutMs = props.getInt("queue.enqueue.timeout.ms", -1)
-
-  /** the number of messages batched at the producer */
-  val batchNumMessages = props.getInt("batch.num.messages", 200)
-
-  /** the serializer class for values */
-  val serializerClass = props.getString("serializer.class", "kafka.serializer.DefaultEncoder")
-  
-  /** the serializer class for keys (defaults to the same as for values) */
-  val keySerializerClass = props.getString("key.serializer.class", serializerClass)
-  
-}
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
deleted file mode 100755
index 8c7465f..0000000
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ /dev/null
@@ -1,359 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.producer.async
-
-import kafka.common._
-import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}
-import kafka.producer._
-import kafka.serializer.Encoder
-import kafka.utils._
-import org.apache.kafka.common.errors.{LeaderNotAvailableException, UnknownTopicOrPartitionException}
-import org.apache.kafka.common.protocol.Errors
-
-import scala.util.Random
-import scala.collection.{Map, Seq}
-import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
-import java.util.concurrent.atomic._
-
-import kafka.api.{ProducerRequest, TopicMetadata}
-import org.apache.kafka.common.utils.{Time, Utils}
-import org.slf4j.event.Level
-
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-class DefaultEventHandler[K,V](config: ProducerConfig,
-                               private val partitioner: Partitioner,
-                               private val encoder: Encoder[V],
-                               private val keyEncoder: Encoder[K],
-                               private val producerPool: ProducerPool,
-                               private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata],
-                               private val time: Time = Time.SYSTEM)
-  extends EventHandler[K,V] with Logging {
-
-  val isSync = ("sync" == config.producerType)
-
-  val correlationId = new AtomicInteger(0)
-  val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos)
-
-  private val topicMetadataRefreshInterval = config.topicMetadataRefreshIntervalMs
-  private var lastTopicMetadataRefreshTime = 0L
-  private val topicMetadataToRefresh = Set.empty[String]
-  private val sendPartitionPerTopicCache = HashMap.empty[String, Int]
-
-  private val producerStats = ProducerStatsRegistry.getProducerStats(config.clientId)
-  private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
-
-  def handle(events: Seq[KeyedMessage[K,V]]) {
-    val serializedData = serialize(events)
-    serializedData.foreach {
-      keyed =>
-        val dataSize = keyed.message.payloadSize
-        producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
-        producerTopicStats.getProducerAllTopicsStats.byteRate.mark(dataSize)
-    }
-    var outstandingProduceRequests = serializedData
-    var remainingRetries = config.messageSendMaxRetries + 1
-    val correlationIdStart = correlationId.get()
-    debug("Handling %d events".format(events.size))
-    while (remainingRetries > 0 && outstandingProduceRequests.nonEmpty) {
-      topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
-      if (topicMetadataRefreshInterval >= 0 &&
-          Time.SYSTEM.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
-        CoreUtils.swallow(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement), this, Level.ERROR)
-        sendPartitionPerTopicCache.clear()
-        topicMetadataToRefresh.clear
-        lastTopicMetadataRefreshTime = Time.SYSTEM.milliseconds
-      }
-      outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
-      if (outstandingProduceRequests.nonEmpty) {
-        info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
-        // back off and update the topic metadata cache before attempting another send operation
-        Thread.sleep(config.retryBackoffMs)
-        // get topics of the outstanding produce requests and refresh metadata for those
-        CoreUtils.swallow(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement), this, Level.ERROR)
-        sendPartitionPerTopicCache.clear()
-        remainingRetries -= 1
-        producerStats.resendRate.mark()
-      }
-    }
-    if(outstandingProduceRequests.nonEmpty) {
-      producerStats.failedSendRate.mark()
-      val correlationIdEnd = correlationId.get()
-      error("Failed to send requests for topics %s with correlation ids in [%d,%d]"
-        .format(outstandingProduceRequests.map(_.topic).toSet.mkString(","),
-        correlationIdStart, correlationIdEnd-1))
-      throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null)
-    }
-  }
-
-  private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
-    val partitionedDataOpt = partitionAndCollate(messages)
-    partitionedDataOpt match {
-      case Some(partitionedData) =>
-        val failedProduceRequests = new ArrayBuffer[KeyedMessage[K, Message]]
-        for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
-          if (isTraceEnabled) {
-            messagesPerBrokerMap.foreach(partitionAndEvent =>
-              trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
-          }
-          val messageSetPerBrokerOpt = groupMessagesToSet(messagesPerBrokerMap)
-          messageSetPerBrokerOpt match {
-            case Some(messageSetPerBroker) =>
-              val failedTopicPartitions = send(brokerid, messageSetPerBroker)
-              failedTopicPartitions.foreach(topicPartition => {
-                messagesPerBrokerMap.get(topicPartition).foreach(failedProduceRequests.appendAll)
-              })
-            case None => // failed to group messages
-              messagesPerBrokerMap.values.foreach(m => failedProduceRequests.appendAll(m))
-          }
-        }
-        failedProduceRequests
-      case None => // failed to collate messages
-        messages
-    }
-  }
-
-  def serialize(events: Seq[KeyedMessage[K,V]]): Seq[KeyedMessage[K,Message]] = {
-    val serializedMessages = new ArrayBuffer[KeyedMessage[K,Message]](events.size)
-    events.foreach{e =>
-      try {
-        if(e.hasKey)
-          serializedMessages += new KeyedMessage[K,Message](
-            topic = e.topic,
-            key = e.key,
-            partKey = e.partKey,
-            message = new Message(key = keyEncoder.toBytes(e.key),
-                                  bytes = encoder.toBytes(e.message),
-                                  timestamp = time.milliseconds,
-                                  magicValue = Message.MagicValue_V1))
-        else
-          serializedMessages += new KeyedMessage[K,Message](
-            topic = e.topic,
-            key = e.key,
-            partKey = e.partKey,
-            message = new Message(bytes = encoder.toBytes(e.message),
-                                  timestamp = time.milliseconds,
-                                  magicValue = Message.MagicValue_V1))
-      } catch {
-        case t: Throwable =>
-          producerStats.serializationErrorRate.mark()
-          if (isSync) {
-            throw t
-          } else {
-            // currently, if in async mode, we just log the serialization error. We need to revisit
-            // this when doing kafka-496
-            error("Error serializing message for topic %s".format(e.topic), t)
-          }
-      }
-    }
-    serializedMessages
-  }
-
-  def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
-    val ret = new HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
-    try {
-      for (message <- messages) {
-        val topicPartitionsList = getPartitionListForTopic(message)
-        val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList)
-        val brokerPartition = topicPartitionsList(partitionIndex)
-
-        // postpone the failure until the send operation, so that requests for other brokers are handled correctly
-        val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)
-
-        var dataPerBroker: HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] = null
-        ret.get(leaderBrokerId) match {
-          case Some(element) =>
-            dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
-          case None =>
-            dataPerBroker = new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]
-            ret.put(leaderBrokerId, dataPerBroker)
-        }
-
-        val topicAndPartition = TopicAndPartition(message.topic, brokerPartition.partitionId)
-        var dataPerTopicPartition: ArrayBuffer[KeyedMessage[K,Message]] = null
-        dataPerBroker.get(topicAndPartition) match {
-          case Some(element) =>
-            dataPerTopicPartition = element.asInstanceOf[ArrayBuffer[KeyedMessage[K,Message]]]
-          case None =>
-            dataPerTopicPartition = new ArrayBuffer[KeyedMessage[K,Message]]
-            dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
-        }
-        dataPerTopicPartition.append(message)
-      }
-      Some(ret)
-    }catch {    // Swallow recoverable exceptions and return None so that they can be retried.
-      case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None
-      case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None
-      case oe: Throwable => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
-    }
-  }
-
-  private def getPartitionListForTopic(m: KeyedMessage[K,Message]): Seq[PartitionAndLeader] = {
-    val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(m.topic, correlationId.getAndIncrement)
-    debug("Broker partitions registered for topic: %s are %s"
-      .format(m.topic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
-    val totalNumPartitions = topicPartitionsList.length
-    if(totalNumPartitions == 0)
-      throw new NoBrokersForPartitionException("Partition key = " + m.key)
-    topicPartitionsList
-  }
-
-  /**
-   * Retrieves the partition id and throws an UnknownTopicOrPartitionException if
-   * the value of partition is not between 0 and numPartitions-1
-   * @param topic The topic
-   * @param key the partition key
-   * @param topicPartitionList the list of available partitions
-   * @return the partition id
-   */
-  private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
-    val numPartitions = topicPartitionList.size
-    if(numPartitions <= 0)
-      throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
-    val partition =
-      if(key == null) {
-        // If the key is null, we don't really need a partitioner
-        // So we look up in the send partition cache for the topic to decide the target partition
-        val id = sendPartitionPerTopicCache.get(topic)
-        id match {
-          case Some(partitionId) =>
-            // directly return the partitionId without checking availability of the leader,
-            // since we want to postpone the failure until the send operation anyways
-            partitionId
-          case None =>
-            val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
-            if (availablePartitions.isEmpty)
-              throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
-            val index = Utils.abs(Random.nextInt) % availablePartitions.size
-            val partitionId = availablePartitions(index).partitionId
-            sendPartitionPerTopicCache.put(topic, partitionId)
-            partitionId
-        }
-      } else
-        partitioner.partition(key, numPartitions)
-    if(partition < 0 || partition >= numPartitions)
-      throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
-        "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
-    trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
-    partition
-  }
-
-  /**
-   * Constructs and sends the produce request based on a map from (topic, partition) -> messages
-   *
-   * @param brokerId the broker that will receive the request
-   * @param messagesPerTopic the messages as a map from (topic, partition) -> messages
-   * @return the set (topic, partitions) messages which incurred an error sending or processing
-   */
-  private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) = {
-    if(brokerId < 0) {
-      warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.keys.mkString(",")))
-      messagesPerTopic.keys.toSeq
-    } else if(messagesPerTopic.nonEmpty) {
-      val currentCorrelationId = correlationId.getAndIncrement
-      val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,
-        config.requestTimeoutMs, messagesPerTopic)
-      var failedTopicPartitions = Seq.empty[TopicAndPartition]
-      try {
-        val syncProducer = producerPool.getProducer(brokerId)
-        debug("Producer sending messages with correlation id %d for topics %s to broker %d on %s:%d"
-          .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))
-        val response = syncProducer.send(producerRequest)
-        debug("Producer sent messages with correlation id %d for topics %s to broker %d on %s:%d"
-          .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))
-        if(response != null) {
-          if (response.status.size != producerRequest.data.size)
-            throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest))
-          if (isTraceEnabled) {
-            val successfullySentData = response.status.filter(_._2.error == Errors.NONE)
-            successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
-              trace("Successfully sent message: %s".format(if(message.message.isNull) null else message.message.toString()))))
-          }
-          val failedPartitionsAndStatus = response.status.filter(_._2.error != Errors.NONE).toSeq
-          failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1)
-          if(failedTopicPartitions.nonEmpty) {
-            val errorString = failedPartitionsAndStatus
-              .sortWith((p1, p2) => p1._1.topic.compareTo(p2._1.topic) < 0 ||
-                                    (p1._1.topic.compareTo(p2._1.topic) == 0 && p1._1.partition < p2._1.partition))
-              .map{
-                case(topicAndPartition, status) =>
-                  topicAndPartition.toString + ": " + status.error.exceptionName
-              }.mkString(",")
-            warn("Produce request with correlation id %d failed due to %s".format(currentCorrelationId, errorString))
-          }
-          failedTopicPartitions
-        } else {
-          Seq.empty[TopicAndPartition]
-        }
-      } catch {
-        case t: Throwable =>
-          warn("Failed to send producer request with correlation id %d to broker %d with data for partitions %s"
-            .format(currentCorrelationId, brokerId, messagesPerTopic.keys.mkString(",")), t)
-          messagesPerTopic.keys.toSeq
-      }
-    } else {
-      List.empty
-    }
-  }
-
-  private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K, Message]]]) = {
-    /** enforce the compressed.topics config here.
-      * If the compression codec is anything other than NoCompressionCodec,
-      * Enable compression only for specified topics if any
-      * If the list of compressed topics is empty, then enable the specified compression codec for all topics
-      * If the compression codec is NoCompressionCodec, compression is disabled for all topics
-      */
-    try {
-      val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>
-        val rawMessages = messages.map(_.message)
-        (topicAndPartition,
-          config.compressionCodec match {
-            case NoCompressionCodec =>
-              debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
-              new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
-            case _ =>
-              config.compressedTopics.size match {
-                case 0 =>
-                  debug("Sending %d messages with compression codec %d to %s"
-                    .format(messages.size, config.compressionCodec.codec, topicAndPartition))
-                  new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
-                case _ =>
-                  if (config.compressedTopics.contains(topicAndPartition.topic)) {
-                    debug("Sending %d messages with compression codec %d to %s"
-                      .format(messages.size, config.compressionCodec.codec, topicAndPartition))
-                    new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
-                  }
-                  else {
-                    debug("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
-                      .format(messages.size, topicAndPartition, config.compressedTopics.toString))
-                    new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
-                  }
-              }
-          }
-          )
-      }
-      Some(messagesPerTopicPartition)
-    } catch {
-      case t: Throwable => error("Failed to group messages", t); None
-    }
-  }
-
-  def close() {
-    if (producerPool != null)
-      producerPool.close
-  }
-}
diff --git a/core/src/main/scala/kafka/producer/async/EventHandler.scala b/core/src/main/scala/kafka/producer/async/EventHandler.scala
deleted file mode 100644
index 44fb1eb..0000000
--- a/core/src/main/scala/kafka/producer/async/EventHandler.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package kafka.producer.async
-
-import kafka.producer.KeyedMessage
-
-/**
- * Handler that dispatches the batched data from the queue.
- */
-@deprecated("This trait has been deprecated and will be removed in a future release.", "0.10.0.0")
-trait EventHandler[K,V] {
-
-  /**
-   * Callback to dispatch the batched data and send it to a Kafka server
-   * @param events the data sent to the producer
-  */
-  def handle(events: Seq[KeyedMessage[K,V]])
-
-  /**
-   * Cleans up and shuts down the event handler
-  */
-  def close(): Unit
-}
diff --git a/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala b/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala
deleted file mode 100644
index 7779715..0000000
--- a/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.producer.async
-
-/**
- * Indicates that the given config parameter has invalid value
- */
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-class IllegalQueueStateException(message: String) extends RuntimeException(message) {
-  def this() = this(null)
-}
diff --git a/core/src/main/scala/kafka/producer/async/MissingConfigException.scala b/core/src/main/scala/kafka/producer/async/MissingConfigException.scala
deleted file mode 100644
index a42678b..0000000
--- a/core/src/main/scala/kafka/producer/async/MissingConfigException.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.producer.async
-
-/* Indicates any missing configuration parameter */
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-class MissingConfigException(message: String) extends RuntimeException(message) {
-  def this() = this(null)
-}
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
deleted file mode 100644
index 0377093..0000000
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.producer.async
-
-import kafka.utils.Logging
-import java.util.concurrent.{BlockingQueue, CountDownLatch, TimeUnit}
-
-import collection.mutable.ArrayBuffer
-import kafka.producer.KeyedMessage
-import kafka.metrics.KafkaMetricsGroup
-import com.yammer.metrics.core.Gauge
-import org.apache.kafka.common.utils.Time
-
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-class ProducerSendThread[K,V](val threadName: String,
-                              val queue: BlockingQueue[KeyedMessage[K,V]],
-                              val handler: EventHandler[K,V],
-                              val queueTime: Long,
-                              val batchSize: Int,
-                              val clientId: String) extends Thread(threadName) with Logging with KafkaMetricsGroup {
-
-  private val shutdownLatch = new CountDownLatch(1)
-  private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V])
-
-  newGauge("ProducerQueueSize",
-          new Gauge[Int] {
-            def value = queue.size
-          },
-          Map("clientId" -> clientId))
-
-  override def run {
-    try {
-      processEvents
-    }catch {
-      case e: Throwable => error("Error in sending events: ", e)
-    }finally {
-      shutdownLatch.countDown
-    }
-  }
-
-  def shutdown(): Unit = {
-    info("Begin shutting down ProducerSendThread")
-    queue.put(shutdownCommand)
-    shutdownLatch.await
-    info("Shutdown ProducerSendThread complete")
-  }
-
-  private def processEvents() {
-    var lastSend = Time.SYSTEM.milliseconds
-    var events = new ArrayBuffer[KeyedMessage[K,V]]
-    var full: Boolean = false
-
-    // drain the queue until you get a shutdown command
-    Iterator.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - Time.SYSTEM.milliseconds), TimeUnit.MILLISECONDS))
-                      .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
-      currentQueueItem =>
-        val elapsed = Time.SYSTEM.milliseconds - lastSend
-        // check if the queue time is reached. This happens when the poll method above returns after a timeout and
-        // returns a null object
-        val expired = currentQueueItem == null
-        if(currentQueueItem != null) {
-          trace("Dequeued item for topic %s, partition key: %s, data: %s"
-              .format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))
-          events += currentQueueItem
-        }
-
-        // check if the batch size is reached
-        full = events.size >= batchSize
-
-        if(full || expired) {
-          if(expired)
-            debug(elapsed + " ms elapsed. Queue time reached. Sending..")
-          if(full)
-            debug("Batch full. Sending..")
-          // if either queue time has reached or batch size has reached, dispatch to event handler
-          tryToHandle(events)
-          lastSend = Time.SYSTEM.milliseconds
-          events = new ArrayBuffer[KeyedMessage[K,V]]
-        }
-    }
-    // send the last batch of events
-    tryToHandle(events)
-    if(queue.size > 0)
-      throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
-        .format(queue.size))
-  }
-
-  def tryToHandle(events: Seq[KeyedMessage[K,V]]) {
-    val size = events.size
-    try {
-      debug("Handling " + size + " events")
-      if(size > 0)
-        handler.handle(events)
-    }catch {
-      case e: Throwable => error("Error in handling batch of " + size + " events", e)
-    }
-  }
-
-}
diff --git a/core/src/main/scala/kafka/serializer/Encoder.scala b/core/src/main/scala/kafka/serializer/Encoder.scala
deleted file mode 100644
index b1277e1..0000000
--- a/core/src/main/scala/kafka/serializer/Encoder.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.serializer
-
-import java.nio.ByteBuffer
-
-import kafka.utils.VerifiableProperties
-
-/**
- * An encoder is a method of turning objects into byte arrays.
- * An implementation is required to provide a constructor that
- * takes a VerifiableProperties instance.
- */
-trait Encoder[T] {
-  def toBytes(t: T): Array[Byte]
-}
-
-/**
- * The default implementation is a no-op, it just returns the same array it takes in
- */
-class DefaultEncoder(props: VerifiableProperties = null) extends Encoder[Array[Byte]] {
-  override def toBytes(value: Array[Byte]): Array[Byte] = value
-}
-
-class NullEncoder[T](props: VerifiableProperties = null) extends Encoder[T] {
-  override def toBytes(value: T): Array[Byte] = null
-}
-
-/**
- * The string encoder takes an optional parameter serializer.encoding which controls
- * the character set used in encoding the string into bytes.
- */
-class StringEncoder(props: VerifiableProperties = null) extends Encoder[String] {
-  val encoding = 
-    if(props == null) 
-      "UTF8" 
-    else 
-      props.getString("serializer.encoding", "UTF8")
-  
-  override def toBytes(s: String): Array[Byte] = 
-    if(s == null)
-      null
-    else
-      s.getBytes(encoding)
-}
-
-/**
-  * The long encoder translates longs into bytes.
-  */
-class LongEncoder(props: VerifiableProperties = null) extends Encoder[Long] {
-  override def toBytes(l: Long): Array[Byte] =
-    ByteBuffer.allocate(8).putLong(l).array()
-}
-
-/**
-  * The integer encoder translates integers into bytes.
-  */
-class IntegerEncoder(props: VerifiableProperties = null) extends Encoder[Integer] {
-  override def toBytes(i: Integer): Array[Byte] =
-    if(i == null)
-      null
-    else
-      ByteBuffer.allocate(4).putInt(i).array()
-}
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 39bb0ff..e5b72a3 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -19,35 +19,29 @@ package kafka.tools
 
 import kafka.common._
 import kafka.message._
-import kafka.serializer._
 import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
 import kafka.utils.Implicits._
-import kafka.producer.{NewShinyProducer, OldProducer}
 import java.util.Properties
 import java.io._
 import java.nio.charset.StandardCharsets
 
 import joptsimple._
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConverters._
 
 object ConsoleProducer {
 
-  def main(args: Array[String]) {
+  def main(args: Array[String]): Unit = {
 
     try {
         val config = new ProducerConfig(args)
         val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
         reader.init(System.in, getReaderProps(config))
 
-        val producer =
-          if(config.useOldProducer) {
-            new OldProducer(getOldProducerProps(config))
-          } else {
-            new NewShinyProducer(getNewProducerProps(config))
-          }
+        val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps(config))
 
         Runtime.getRuntime.addShutdownHook(new Thread() {
           override def run() {
@@ -55,12 +49,12 @@ object ConsoleProducer {
           }
         })
 
-        var message: ProducerRecord[Array[Byte], Array[Byte]] = null
+        var record: ProducerRecord[Array[Byte], Array[Byte]] = null
         do {
-          message = reader.readMessage()
-          if (message != null)
-            producer.send(message.topic, message.key, message.value)
-        } while (message != null)
+          record = reader.readMessage()
+          if (record != null)
+            send(producer, record, config.sync)
+        } while (record != null)
     } catch {
       case e: joptsimple.OptionException =>
         System.err.println(e.getMessage)
@@ -72,47 +66,28 @@ object ConsoleProducer {
     Exit.exit(0)
   }
 
+  private def send(producer: KafkaProducer[Array[Byte], Array[Byte]],
+                         record: ProducerRecord[Array[Byte], Array[Byte]], sync: Boolean): Unit = {
+    if (sync)
+      producer.send(record).get()
+    else
+      producer.send(record, new ErrorLoggingCallback(record.topic, record.key, record.value, false))
+  }
+
   def getReaderProps(config: ProducerConfig): Properties = {
     val props = new Properties
-    props.put("topic",config.topic)
+    props.put("topic", config.topic)
     props ++= config.cmdLineProps
     props
   }
 
-  def getOldProducerProps(config: ProducerConfig): Properties = {
-    val props = producerProps(config)
-
-    props.put("metadata.broker.list", config.brokerList)
-    props.put("compression.codec", config.compressionCodec)
-    props.put("producer.type", if(config.sync) "sync" else "async")
-    props.put("batch.num.messages", config.batchSize.toString)
-    props.put("message.send.max.retries", config.messageSendMaxRetries.toString)
-    props.put("retry.backoff.ms", config.retryBackoffMs.toString)
-    props.put("queue.buffering.max.ms", config.sendTimeout.toString)
-    props.put("queue.buffering.max.messages", config.queueSize.toString)
-    props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString)
-    props.put("request.required.acks", config.requestRequiredAcks)
-    props.put("request.timeout.ms", config.requestTimeoutMs.toString)
-    props.put("key.serializer.class", config.keyEncoderClass)
-    props.put("serializer.class", config.valueEncoderClass)
-    props.put("send.buffer.bytes", config.socketBuffer.toString)
-    props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString)
-    props.put("client.id", "console-producer")
-
-    props
-  }
-
-  private def producerProps(config: ProducerConfig): Properties = {
+  def producerProps(config: ProducerConfig): Properties = {
     val props =
       if (config.options.has(config.producerConfigOpt))
         Utils.loadProps(config.options.valueOf(config.producerConfigOpt))
       else new Properties
-    props ++= config.extraProducerProps
-    props
-  }
 
-  def getNewProducerProps(config: ProducerConfig): Properties = {
-    val props = producerProps(config)
+    props ++= config.extraProducerProps
 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
     props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec)
@@ -168,17 +143,6 @@ object ConsoleProducer {
       .describedAs("timeout_ms")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(1000)
-    val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum amount of " +
-      " messages will queue awaiting sufficient batch size.")
-      .withRequiredArg
-      .describedAs("queue_size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(10000)
-    val queueEnqueueTimeoutMsOpt = parser.accepts("queue-enqueuetimeout-ms", "Timeout for event enqueue")
-      .withRequiredArg
-      .describedAs("queue enqueuetimeout ms")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(Int.MaxValue)
     val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests")
       .withRequiredArg
       .describedAs("request required acks")
@@ -214,16 +178,6 @@ object ConsoleProducer {
       .describedAs("memory in bytes per partition")
       .ofType(classOf[java.lang.Long])
       .defaultsTo(16 * 1024L)
-    val valueEncoderOpt = parser.accepts("value-serializer", "The class name of the message encoder implementation to use for serializing values.")
-      .withRequiredArg
-      .describedAs("encoder_class")
-      .ofType(classOf[java.lang.String])
-      .defaultsTo(classOf[DefaultEncoder].getName)
-    val keyEncoderOpt = parser.accepts("key-serializer", "The class name of the message encoder implementation to use for serializing keys.")
-      .withRequiredArg
-      .describedAs("encoder_class")
-      .ofType(classOf[java.lang.String])
-      .defaultsTo(classOf[DefaultEncoder].getName)
     val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
       "By default each line is read as a separate message.")
       .withRequiredArg
@@ -248,14 +202,12 @@ object ConsoleProducer {
       .withRequiredArg
       .describedAs("config file")
       .ofType(classOf[String])
-    val useOldProducerOpt = parser.accepts("old-producer", "Use the old producer implementation.")
 
     val options = parser.parse(args : _*)
-    if(args.length == 0)
+    if (args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "Read data from standard input and publish it to Kafka.")
     CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, brokerListOpt)
 
-    val useOldProducer = options.has(useOldProducerOpt)
     val topic = options.valueOf(topicOpt)
     val brokerList = options.valueOf(brokerListOpt)
     ToolsUtils.validatePortOrDie(parser,brokerList)
@@ -268,14 +220,10 @@ object ConsoleProducer {
                            else NoCompressionCodec.name
     val batchSize = options.valueOf(batchSizeOpt)
     val sendTimeout = options.valueOf(sendTimeoutOpt)
-    val queueSize = options.valueOf(queueSizeOpt)
-    val queueEnqueueTimeoutMs = options.valueOf(queueEnqueueTimeoutMsOpt)
     val requestRequiredAcks = options.valueOf(requestRequiredAcksOpt)
     val requestTimeoutMs = options.valueOf(requestTimeoutMsOpt)
     val messageSendMaxRetries = options.valueOf(messageSendMaxRetriesOpt)
     val retryBackoffMs = options.valueOf(retryBackoffMsOpt)
-    val keyEncoderClass = options.valueOf(keyEncoderOpt)
-    val valueEncoderClass = options.valueOf(valueEncoderOpt)
     val readerClass = options.valueOf(messageReaderOpt)
     val socketBuffer = options.valueOf(socketBufferSizeOpt)
     val cmdLineProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt).asScala)
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
deleted file mode 100644
index f96200d..0000000
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ /dev/null
@@ -1,308 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import kafka.metrics.KafkaMetricsReporter
-import kafka.producer.{NewShinyProducer, OldProducer}
-import kafka.utils.{CommandLineUtils, Exit, Logging, ToolsUtils, VerifiableProperties}
-import kafka.utils.Implicits._
-import kafka.message.CompressionCodec
-import kafka.serializer._
-import java.util.concurrent.{CountDownLatch, Executors}
-import java.util.concurrent.atomic.AtomicLong
-import java.util._
-import java.text.SimpleDateFormat
-import java.math.BigInteger
-import java.nio.charset.StandardCharsets
-
-import org.apache.kafka.common.utils.Utils
-
-/**
- * Load test for the producer
- */
-@deprecated("This class will be replaced by org.apache.kafka.tools.ProducerPerformance after the old producer client is removed", "0.9.0.0")
-object ProducerPerformance extends Logging {
-
-  def main(args: Array[String]) {
-    val config = new ProducerPerfConfig(args)
-    if (!config.isFixedSize)
-      logger.info("WARN: Throughput will be slower due to changing message size per request")
-
-    val totalBytesSent = new AtomicLong(0)
-    val totalMessagesSent = new AtomicLong(0)
-    val executor = Executors.newFixedThreadPool(config.numThreads)
-    val allDone = new CountDownLatch(config.numThreads)
-    val startMs = System.currentTimeMillis
-    val rand = new java.util.Random
-
-    if (!config.hideHeader)
-      println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " +
-        "total.data.sent.in.nMsg, nMsg.sec")
-
-    for (i <- 0 until config.numThreads) {
-      executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand))
-    }
-
-    allDone.await()
-    val endMs = System.currentTimeMillis
-    val elapsedSecs = (endMs - startMs) / 1000.0
-    val totalMBSent = (totalBytesSent.get * 1.0) / (1024 * 1024)
-    println("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f".format(
-      config.dateFormat.format(startMs), config.dateFormat.format(endMs),
-      config.compressionCodec.codec, config.messageSize, config.batchSize, totalMBSent,
-      totalMBSent / elapsedSecs, totalMessagesSent.get, totalMessagesSent.get / elapsedSecs))
-    Exit.exit(0)
-  }
-
-  class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) {
-    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: broker info the list of broker host and port for bootstrap.")
-      .withRequiredArg
-      .describedAs("hostname:port,..,hostname:port")
-      .ofType(classOf[String])
-    val producerConfigOpt = parser.accepts("producer.config", "Producer config properties file.")
-      .withRequiredArg
-      .describedAs("config file")
-      .ofType(classOf[String])
-    val topicsOpt = parser.accepts("topics", "REQUIRED: The comma separated list of topics to produce to")
-      .withRequiredArg
-      .describedAs("topic1,topic2..")
-      .ofType(classOf[String])
-    val producerRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The producer request timeout in ms")
-      .withRequiredArg()
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(3000)
-    val producerNumRetriesOpt = parser.accepts("producer-num-retries", "The producer retries number")
-      .withRequiredArg()
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(3)
-    val producerRetryBackOffMsOpt = parser.accepts("producer-retry-backoff-ms", "The producer retry backoff time in milliseconds")
-      .withRequiredArg()
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(100)
-    val producerRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " +
-      "to complete")
-      .withRequiredArg()
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(-1)
-    val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.")
-    val syncOpt = parser.accepts("sync", "If set, messages are sent synchronously.")
-    val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
-      .withRequiredArg
-      .describedAs("number of threads")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1)
-    val initialMessageIdOpt = parser.accepts("initial-message-id", "The is used for generating test data, If set, messages will be tagged with an " +
-      "ID and sent by producer starting from this ID sequentially. Message content will be String type and " +
-      "in the form of 'Message:000...1:xxx...'")
-      .withRequiredArg()
-      .describedAs("initial message id")
-      .ofType(classOf[java.lang.Integer])
-    val messageSendGapMsOpt = parser.accepts("message-send-gap-ms", "If set, the send thread will wait for specified time between two sends")
-      .withRequiredArg()
-      .describedAs("message send time gap")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(0)
-    val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
-    val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
-      "set, the csv metrics will be outputted here")
-      .withRequiredArg
-      .describedAs("metrics directory")
-      .ofType(classOf[java.lang.String])
-    val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.")
-    val messageSizeOpt = parser.accepts("message-size", "The size of each message.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(100)
-    val batchSizeOpt = parser.accepts("batch-size", "Number of messages to write in a single batch.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(200)
-    val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed")
-      .withRequiredArg
-      .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2, LZ4CompressionCodec as 3")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(0)
-
-    val options = parser.parse(args: _*)
-    CommandLineUtils.checkRequiredArgs(parser, options, topicsOpt, brokerListOpt, numMessagesOpt)
-
-    val topicsStr = options.valueOf(topicsOpt)
-    val topics = topicsStr.split(",")
-    val numMessages = options.valueOf(numMessagesOpt).longValue
-    val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
-    val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
-    val hideHeader = options.has(hideHeaderOpt)
-    val brokerList = options.valueOf(brokerListOpt)
-    ToolsUtils.validatePortOrDie(parser,brokerList)
-    val messageSize = options.valueOf(messageSizeOpt).intValue
-    var isFixedSize = !options.has(varyMessageSizeOpt)
-    var isSync = options.has(syncOpt)
-    var batchSize = options.valueOf(batchSizeOpt).intValue
-    var numThreads = options.valueOf(numThreadsOpt).intValue
-    val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue)
-    val seqIdMode = options.has(initialMessageIdOpt)
-    var initialMessageId: Int = 0
-    if (seqIdMode)
-      initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
-    val producerRequestTimeoutMs = options.valueOf(producerRequestTimeoutMsOpt).intValue()
-    val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue()
-    val producerNumRetries = options.valueOf(producerNumRetriesOpt).intValue()
-    val producerRetryBackoffMs = options.valueOf(producerRetryBackOffMsOpt).intValue()
-    val useNewProducer = options.has(useNewProducerOpt)
-
-    val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt)
-
-    val producerProps = if (options.has(producerConfigOpt))
-      Utils.loadProps(options.valueOf(producerConfigOpt))
-    else
-      new Properties()
-
-    if (csvMetricsReporterEnabled) {
-      val props = new Properties()
-      props.put("kafka.metrics.polling.interval.secs", "1")
-      props.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter")
-      if (options.has(metricsDirectoryOpt))
-        props.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt))
-      else
-        props.put("kafka.csv.metrics.dir", "kafka_metrics")
-      props.put("kafka.csv.metrics.reporter.enabled", "true")
-      val verifiableProps = new VerifiableProperties(props)
-      KafkaMetricsReporter.startReporters(verifiableProps)
-    }
-
-    val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue()
-  }
-
-  class ProducerThread(val threadId: Int,
-    val config: ProducerPerfConfig,
-    val totalBytesSent: AtomicLong,
-    val totalMessagesSent: AtomicLong,
-    val allDone: CountDownLatch,
-    val rand: Random) extends Runnable {
-    val seqIdNumDigit = 10 // no. of digits for max int value
-
-    val messagesPerThread = config.numMessages / config.numThreads
-    debug("Messages per thread = " + messagesPerThread)
-    val props = new Properties()
-    val producer =
-      if (config.useNewProducer) {
-        import org.apache.kafka.clients.producer.ProducerConfig
-        props ++= config.producerProps
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
-        props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString)
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-performance")
-        props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString)
-        props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString)
-        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString)
-        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name)
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-        new NewShinyProducer(props)
-      } else {
-        props ++= config.producerProps
-        props.put("metadata.broker.list", config.brokerList)
-        props.put("compression.codec", config.compressionCodec.codec.toString)
-        props.put("send.buffer.bytes", (64 * 1024).toString)
-        if (!config.isSync) {
-          props.put("producer.type", "async")
-          props.put("batch.num.messages", config.batchSize.toString)
-          props.put("queue.enqueue.timeout.ms", "-1")
-        }
-        props.put("client.id", "producer-performance")
-        props.put("request.required.acks", config.producerRequestRequiredAcks.toString)
-        props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString)
-        props.put("message.send.max.retries", config.producerNumRetries.toString)
-        props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString)
-        props.put("serializer.class", classOf[DefaultEncoder].getName)
-        props.put("key.serializer.class", classOf[NullEncoder[Long]].getName)
-        new OldProducer(props)
-      }
-
-    // generate the sequential message ID
-    private val SEP = ":" // message field separator
-    private val messageIdLabel = "MessageID"
-    private val threadIdLabel = "ThreadID"
-    private val topicLabel = "Topic"
-    private var leftPaddedSeqId: String = ""
-
-    private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = {
-      // Each thread gets a unique range of sequential no. for its ids.
-      // Eg. 1000 msg in 10 threads => 100 msg per thread
-      // thread 0 IDs :   0 ~  99
-      // thread 1 IDs : 100 ~ 199
-      // thread 2 IDs : 200 ~ 299
-      // . . .
-      leftPaddedSeqId = String.format("%0" + seqIdNumDigit + "d", long2Long(msgId))
-
-      val msgHeader = topicLabel + SEP +
-        topic + SEP +
-        threadIdLabel + SEP +
-        threadId + SEP +
-        messageIdLabel + SEP +
-        leftPaddedSeqId + SEP
-
-      val seqMsgString = String.format("%1$-" + msgSize + "s", msgHeader).replace(' ', 'x')
-      debug(seqMsgString)
-      seqMsgString.getBytes(StandardCharsets.UTF_8)
-    }
-
-    private def generateProducerData(topic: String, messageId: Long): Array[Byte] = {
-      val msgSize = if (config.isFixedSize) config.messageSize else 1 + rand.nextInt(config.messageSize)
-      if (config.seqIdMode) {
-        val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId
-        generateMessageWithSeqId(topic, seqId, msgSize)
-      } else {
-        new Array[Byte](msgSize)
-      }
-    }
-
-    override def run {
-      var bytesSent = 0L
-      var nSends = 0
-      var i: Long = 0L
-      var message: Array[Byte] = null
-
-      while (i < messagesPerThread) {
-        try {
-          config.topics.foreach(
-            topic => {
-              message = generateProducerData(topic, i)
-              producer.send(topic, BigInteger.valueOf(i).toByteArray, message)
-              bytesSent += message.size
-              nSends += 1
-              if (config.messageSendGapMs > 0)
-                Thread.sleep(config.messageSendGapMs)
-            })
-        } catch {
-          case e: Throwable => error("Error when sending message " + new String(message, StandardCharsets.UTF_8), e)
-        }
-        i += 1
-      }
-      try {
-        producer.close()
-      } catch {
-        case e: Throwable => error("Error when closing producer", e)
-      }
-      totalBytesSent.addAndGet(bytesSent)
-      totalMessagesSent.addAndGet(nSends)
-      allDone.countDown()
-    }
-  }
-}
diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index 018971b..ffe7ffd 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -31,7 +31,6 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.{Cluster, Reconfigurable}
 import org.apache.kafka.common.config.SaslConfigs
-import org.apache.kafka.common.errors.SaslAuthenticationException
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth._
 import org.apache.kafka.common.security.scram.ScramCredential
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index a4854ae..45b3fdc 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -1068,13 +1068,6 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       (s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)), expectFailure)
   }
 
-  private def alterSslTruststore(adminClient: AdminClient, props: Properties, listener: String, expectFailure: Boolean  = false): Unit = {
-    val configPrefix = listenerPrefix(listener)
-    val newProps = securityProps(props, TRUSTSTORE_PROPS, configPrefix)
-    reconfigureServers(newProps, perBrokerConfig = true,
-      (s"$configPrefix$SSL_TRUSTSTORE_LOCATION_CONFIG", props.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)), expectFailure)
-  }
-
   private def alterSslKeystoreUsingConfigCommand(props: Properties, listener: String): Unit = {
     val configPrefix = listenerPrefix(listener)
     val newProps = securityProps(props, KEYSTORE_PROPS, configPrefix)
diff --git a/core/src/test/scala/other/kafka/TestKafkaAppender.scala b/core/src/test/scala/other/kafka/TestKafkaAppender.scala
deleted file mode 100644
index ecdcac0..0000000
--- a/core/src/test/scala/other/kafka/TestKafkaAppender.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka
-
-import org.apache.log4j.PropertyConfigurator
-import kafka.utils.{Exit, Logging}
-import serializer.Encoder
-
-object TestKafkaAppender extends Logging {
-  
-  def main(args:Array[String]) {
-    
-    if(args.length < 1) {
-      println("USAGE: " + TestKafkaAppender.getClass.getName + " log4j_config")
-      Exit.exit(1)
-    }
-
-    try {
-      PropertyConfigurator.configure(args(0))
-    } catch {
-      case e: Exception =>
-        System.err.println("KafkaAppender could not be initialized ! Exiting..")
-        e.printStackTrace()
-        Exit.exit(1)
-    }
-
-    for (_ <- 1 to 10)
-      info("test")    
-  }
-}
-
-class AppenderStringSerializer(encoding: String = "UTF-8") extends Encoder[AnyRef] {
-  def toBytes(event: AnyRef): Array[Byte] = event.toString.getBytes(encoding)
-}
-
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 4c033c4..057814b 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -24,7 +24,7 @@ import org.junit.Assert._
 import org.junit.{After, Test}
 import java.util.Properties
 
-import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition}
+import kafka.common.TopicAlreadyMarkedForDeletionException
 import kafka.controller.{OfflineReplica, PartitionAndReplica, ReplicaDeletionSuccessful}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index b42d7f7..4f40b27 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -34,6 +34,8 @@ import scala.collection.Seq
 import scala.util.Random
 import java.io.File
 
+import org.apache.kafka.clients.producer.ProducerRecord
+
 class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
   val partitionId = 0
   var servers: Seq[KafkaServer] = null
@@ -271,9 +273,9 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     //Given throttle set so replication will take a certain number of secs
     val initialThrottle = Throttle(10 * 1000 * 1000, -1, () => zkUpdateDelay)
     val expectedDurationSecs = 5
-    val numMessages: Int = 500
-    val msgSize: Int = 100 * 1000
-    produceMessages(servers, topicName, numMessages, acks = 0, msgSize)
+    val numMessages = 500
+    val msgSize = 100 * 1000
+    produceMessages(topicName, numMessages, acks = 0, msgSize)
     assertEquals(expectedDurationSecs, numMessages * msgSize / initialThrottle.interBrokerLimit)
 
     //Start rebalance which will move replica on 100 -> replica on 102
@@ -321,8 +323,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 
     //Given throttle set so replication will take a while
     val throttle: Long = 1000 * 1000
-    produceMessages(servers, "topic1", 100, acks = 0, 100 * 1000)
-    produceMessages(servers, "topic2", 100, acks = 0, 100 * 1000)
+    produceMessages("topic1", 100, acks = 0, 100 * 1000)
+    produceMessages("topic2", 100, acks = 0, 100 * 1000)
 
     //Start rebalance
     val newAssignment = Map(
@@ -358,7 +360,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 
     //Given throttle set so replication will take at least 20 sec (we won't wait this long)
     val initialThrottle: Long = 1000 * 1000
-    produceMessages(servers, topicName, numMessages = 200, acks = 0, valueBytes = 100 * 1000)
+    produceMessages(topicName, numMessages = 200, acks = 0, valueLength = 100 * 1000)
 
     //Start rebalance
     val newAssignment = generateAssignment(zkClient, Array(101, 102), json(topicName), true)._1
@@ -630,4 +632,10 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     val topicStr = topic.map { t => "{\"topic\": \"" + t + "\"}" }.mkString(",")
     s"""{"topics": [$topicStr],"version":1}"""
   }
+
+  private def produceMessages(topic: String, numMessages: Int, acks: Int, valueLength: Int): Unit = {
+    val records = (0 until numMessages).map(_ => new ProducerRecord[Array[Byte], Array[Byte]](topic,
+      new Array[Byte](valueLength)))
+    TestUtils.produceMessages(servers, records, acks)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index 9674486..04fc428 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -19,6 +19,7 @@ import java.util.{Calendar, Date, Properties}
 import kafka.admin.ConsumerGroupCommand.ConsumerGroupService
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
 import org.junit.Test
@@ -93,7 +94,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsNewConsumerExistingTopic(): Unit = {
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "new.group", "--topic", topic,
       "--to-offset", "50")
-    TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
+    produceMessages(topic, 100)
     resetAndAssertOffsets(args, expectedOffset = 50, dryRun = true)
     resetAndAssertOffsets(args ++ Array("--dry-run"), expectedOffset = 50, dryRun = true)
     resetAndAssertOffsets(args ++ Array("--execute"), expectedOffset = 50, group = "new.group")
@@ -105,7 +106,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
     val calendar = Calendar.getInstance()
     calendar.add(Calendar.DATE, -1)
 
-    TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
+    produceMessages(topic, 100)
 
     val executor = addConsumerGroupExecutor(numConsumers = 1, topic)
     awaitConsumerProgress(count = 100L)
@@ -120,9 +121,9 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsToZonedDateTime() {
     val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
 
-    TestUtils.produceMessages(servers, topic, 50, acks = 1, 100 * 1000)
+    produceMessages(topic, 50)
     val checkpoint = new Date()
-    TestUtils.produceMessages(servers, topic, 50, acks = 1, 100 * 1000)
+    produceMessages(topic, 50)
 
     val executor = addConsumerGroupExecutor(numConsumers = 1, topic)
     awaitConsumerProgress(count = 100L)
@@ -137,7 +138,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsByDuration() {
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
       "--by-duration", "PT1M", "--execute")
-    produceConsumeAndShutdown(totalMessages = 100)
+    produceConsumeAndShutdown(topic, totalMessages = 100)
     resetAndAssertOffsets(args, expectedOffset = 0)
   }
 
@@ -145,7 +146,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsByDurationToEarliest() {
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
       "--by-duration", "PT0.1S", "--execute")
-    produceConsumeAndShutdown(totalMessages = 100)
+    produceConsumeAndShutdown(topic, totalMessages = 100)
     resetAndAssertOffsets(args, expectedOffset = 100)
   }
 
@@ -153,7 +154,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsToEarliest() {
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
       "--to-earliest", "--execute")
-    produceConsumeAndShutdown(totalMessages = 100)
+    produceConsumeAndShutdown(topic, totalMessages = 100)
     resetAndAssertOffsets(args, expectedOffset = 0)
   }
 
@@ -161,8 +162,8 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsToLatest() {
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
       "--to-latest", "--execute")
-    produceConsumeAndShutdown(totalMessages = 100)
-    TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
+    produceConsumeAndShutdown(topic, totalMessages = 100)
+    produceMessages(topic, 100)
     resetAndAssertOffsets(args, expectedOffset = 200)
   }
 
@@ -170,8 +171,8 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsToCurrentOffset() {
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
       "--to-current", "--execute")
-    produceConsumeAndShutdown(totalMessages = 100)
-    TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
+    produceConsumeAndShutdown(topic, totalMessages = 100)
+    produceMessages(topic, 100)
     resetAndAssertOffsets(args, expectedOffset = 100)
   }
 
@@ -179,7 +180,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsToSpecificOffset() {
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
       "--to-offset", "1", "--execute")
-    produceConsumeAndShutdown(totalMessages = 100)
+    produceConsumeAndShutdown(topic, totalMessages = 100)
     resetAndAssertOffsets(args, expectedOffset = 1)
   }
 
@@ -187,8 +188,8 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsShiftPlus() {
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
       "--shift-by", "50", "--execute")
-    produceConsumeAndShutdown(totalMessages = 100)
-    TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
+    produceConsumeAndShutdown(topic, totalMessages = 100)
+    produceMessages(topic, 100)
     resetAndAssertOffsets(args, expectedOffset = 150)
   }
 
@@ -196,8 +197,8 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsShiftMinus() {
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
       "--shift-by", "-50", "--execute")
-    produceConsumeAndShutdown(totalMessages = 100)
-    TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
+    produceConsumeAndShutdown(topic, totalMessages = 100)
+    produceMessages(topic, 100)
     resetAndAssertOffsets(args, expectedOffset = 50)
   }
 
@@ -205,8 +206,8 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsShiftByLowerThanEarliest() {
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
       "--shift-by", "-150", "--execute")
-    produceConsumeAndShutdown(totalMessages = 100)
-    TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
+    produceConsumeAndShutdown(topic, totalMessages = 100)
+    produceMessages(topic, 100)
     resetAndAssertOffsets(args, expectedOffset = 0)
   }
 
@@ -214,8 +215,8 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsShiftByHigherThanLatest() {
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
       "--shift-by", "150", "--execute")
-    produceConsumeAndShutdown(totalMessages = 100)
-    TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
+    produceConsumeAndShutdown(topic, totalMessages = 100)
+    produceMessages(topic, 100)
     resetAndAssertOffsets(args, expectedOffset = 200)
   }
 
@@ -223,7 +224,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsToEarliestOnOneTopic() {
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", topic,
       "--to-earliest", "--execute")
-    produceConsumeAndShutdown(totalMessages = 100)
+    produceConsumeAndShutdown(topic, totalMessages = 100)
     resetAndAssertOffsets(args, expectedOffset = 0)
   }
 
@@ -236,7 +237,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
       s"$topic:1", "--to-earliest", "--execute")
     val consumerGroupCommand = getConsumerGroupService(args)
 
-    produceConsumeAndShutdown(totalMessages = 100, numConsumers = 2, topic)
+    produceConsumeAndShutdown(topic, totalMessages = 100, numConsumers = 2)
     val priorCommittedOffsets = committedOffsets(topic = topic)
 
     val tp0 = new TopicPartition(topic, 0)
@@ -258,8 +259,8 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
       "--topic", topic2, "--to-earliest", "--execute")
     val consumerGroupCommand = getConsumerGroupService(args)
 
-    produceConsumeAndShutdown(100, 1, topic1)
-    produceConsumeAndShutdown(100, 1, topic2)
+    produceConsumeAndShutdown(topic1, 100, 1)
+    produceConsumeAndShutdown(topic2, 100, 1)
 
     val tp1 = new TopicPartition(topic1, 0)
     val tp2 = new TopicPartition(topic2, 0)
@@ -285,8 +286,8 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
       s"$topic1:1", "--topic", s"$topic2:1", "--to-earliest", "--execute")
     val consumerGroupCommand = getConsumerGroupService(args)
 
-    produceConsumeAndShutdown(100, 2, topic1)
-    produceConsumeAndShutdown(100, 2, topic2)
+    produceConsumeAndShutdown(topic1, 100, 2)
+    produceConsumeAndShutdown(topic2, 100, 2)
 
     val priorCommittedOffsets1 = committedOffsets(topic1)
     val priorCommittedOffsets2 = committedOffsets(topic2)
@@ -314,7 +315,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
       "--to-offset", "2", "--export")
     val consumerGroupCommand = getConsumerGroupService(cgcArgs)
 
-    produceConsumeAndShutdown(100, 2, topic)
+    produceConsumeAndShutdown(topic, 100, 2)
 
     val file = File.createTempFile("reset", ".csv")
     file.deleteOnExit()
@@ -334,8 +335,14 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
     adminZkClient.deleteTopic(topic)
   }
 
-  private def produceConsumeAndShutdown(totalMessages: Int, numConsumers: Int = 1, topic: String = topic) {
-    TestUtils.produceMessages(servers, topic, totalMessages, acks = 1, 100 * 1000)
+  private def produceMessages(topic: String, numMessages: Int): Unit = {
+    val records = (0 until numMessages).map(_ => new ProducerRecord[Array[Byte], Array[Byte]](topic,
+      new Array[Byte](100 * 1000)))
+    TestUtils.produceMessages(servers, records, acks = 1)
+  }
+
+  private def produceConsumeAndShutdown(topic: String, totalMessages: Int, numConsumers: Int = 1) {
+    produceMessages(topic, totalMessages)
     val executor =  addConsumerGroupExecutor(numConsumers, topic)
     awaitConsumerProgress(topic, totalMessages)
     executor.shutdown()
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 4bcf61d..33f9352 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -80,16 +80,6 @@ object SerializationTestUtils {
   private val brokers = List(createBroker(0, "localhost", 1011), createBroker(0, "localhost", 1012),
     createBroker(0, "localhost", 1013))
 
-  def createTestProducerRequest: ProducerRequest = {
-    new ProducerRequest(1, "client 1", 0, 1000, topicDataProducerRequest)
-  }
-
-  def createTestProducerResponse: ProducerResponse =
-    ProducerResponse(1, Map(
-      TopicAndPartition(topic1, 0) -> ProducerResponseStatus(Errors.forCode(0.toShort), 10001),
-      TopicAndPartition(topic2, 0) -> ProducerResponseStatus(Errors.forCode(0.toShort), 20001)
-    ), ProducerRequest.CurrentVersion, 100)
-
   def createTestFetchRequest: FetchRequest = new FetchRequest(requestInfo = requestInfos.toVector)
 
   def createTestFetchResponse: FetchResponse = FetchResponse(1, topicDataFetchResponse.toVector)
@@ -163,8 +153,6 @@ object SerializationTestUtils {
 }
 
 class RequestResponseSerializationTest extends JUnitSuite {
-  private val producerRequest = SerializationTestUtils.createTestProducerRequest
-  private val producerResponse = SerializationTestUtils.createTestProducerResponse
   private val fetchRequest = SerializationTestUtils.createTestFetchRequest
   private val offsetRequest = SerializationTestUtils.createTestOffsetRequest
   private val offsetResponse = SerializationTestUtils.createTestOffsetResponse
@@ -182,8 +170,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
   def testSerializationAndDeserialization() {
 
     val requestsAndResponses =
-      collection.immutable.Seq(producerRequest, producerResponse,
-                               fetchRequest, offsetRequest, offsetResponse,
+      collection.immutable.Seq(fetchRequest, offsetRequest, offsetResponse,
                                offsetCommitRequestV0, offsetCommitRequestV1, offsetCommitRequestV2, offsetCommitResponse,
                                offsetFetchRequest, offsetFetchResponse,
                                consumerMetadataRequest, consumerMetadataResponse,
@@ -202,27 +189,6 @@ class RequestResponseSerializationTest extends JUnitSuite {
   }
 
   @Test
-  def testProduceResponseVersion() {
-    val oldClientResponse = ProducerResponse(1, Map(
-      TopicAndPartition("t1", 0) -> ProducerResponseStatus(Errors.NONE, 10001),
-      TopicAndPartition("t2", 0) -> ProducerResponseStatus(Errors.NONE, 20001)
-    ))
-
-    val newClientResponse = ProducerResponse(1, Map(
-      TopicAndPartition("t1", 0) -> ProducerResponseStatus(Errors.NONE, 10001),
-      TopicAndPartition("t2", 0) -> ProducerResponseStatus(Errors.NONE, 20001)
-    ), 1, 100)
-
-    // new response should have 4 bytes more than the old response since delayTime is an INT32
-    assertEquals(oldClientResponse.sizeInBytes + 4, newClientResponse.sizeInBytes)
-
-    val buffer = ByteBuffer.allocate(newClientResponse.sizeInBytes)
-    newClientResponse.writeTo(buffer)
-    buffer.rewind()
-    assertEquals(ProducerResponse.readFrom(buffer).throttleTime, 100)
-  }
-
-  @Test
   def testFetchResponseVersion() {
     val oldClientResponse = FetchResponse(1, Map(
       TopicAndPartition("t1", 0) -> new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("first message".getBytes)))
diff --git a/core/src/test/scala/unit/kafka/common/ConfigTest.scala b/core/src/test/scala/unit/kafka/common/ConfigTest.scala
index 2d20b1e..a506d52 100644
--- a/core/src/test/scala/unit/kafka/common/ConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ConfigTest.scala
@@ -20,43 +20,11 @@ package kafka.common
 import org.junit.Assert._
 import collection.mutable.ArrayBuffer
 import org.junit.Test
-import kafka.producer.ProducerConfig
 import kafka.consumer.ConsumerConfig
 
 class ConfigTest {
 
   @Test
-  @deprecated("This test is deprecated and it will be removed in a future release.", "0.10.0.0")
-  def testInvalidClientIds() {
-    val invalidClientIds = new ArrayBuffer[String]()
-    val badChars = Array('/', '\\', ',', '\u0000', ':', "\"", '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '=')
-    for (weirdChar <- badChars) {
-      invalidClientIds += "Is" + weirdChar + "illegal"
-    }
-
-    for (i <- 0 until invalidClientIds.size) {
-      try {
-        ProducerConfig.validateClientId(invalidClientIds(i))
-        fail("Should throw InvalidClientIdException.")
-      }
-      catch {
-        case _: InvalidConfigException => // This is good
-      }
-    }
-
-    val validClientIds = new ArrayBuffer[String]()
-    validClientIds += ("valid", "CLIENT", "iDs", "ar6", "VaL1d", "_0-9_.", "")
-    for (i <- 0 until validClientIds.size) {
-      try {
-        ProducerConfig.validateClientId(validClientIds(i))
-      }
-      catch {
-        case _: Exception => fail("Should not throw exception.")
-      }
-    }
-  }
-
-  @Test
   def testInvalidGroupIds() {
     val invalidGroupIds = new ArrayBuffer[String]()
     val badChars = Array('/', '\\', ',', '\u0000', ':', "\"", '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '=')
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index b4381a4..91d0af4 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -23,11 +23,11 @@ import org.junit.Assert._
 import kafka.common.MessageStreamsExistException
 import kafka.integration.KafkaServerTestHarness
 import kafka.javaapi.consumer.ConsumerRebalanceListener
-import kafka.message._
 import kafka.serializer._
 import kafka.server._
 import kafka.utils.TestUtils._
 import kafka.utils._
+import org.apache.kafka.common.record.CompressionType
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.log4j.{Level, Logger}
 import org.junit.{After, Before, Test}
@@ -97,8 +97,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     zkConsumerConnector0.shutdown
 
     // send some messages to each broker
-    val sentMessages1 = sendMessages(servers, topic, nMessages, 0) ++
-      sendMessages(servers, topic, nMessages, 1)
+    val sentMessages1 = produceMessages(nMessages, acks = 0) ++ produceMessages(nMessages, acks = 1)
 
     // wait to make sure the topic and partition have a leader for the successful case
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
@@ -131,8 +130,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
     val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
     // send some messages to each broker
-    val sentMessages2 = sendMessages(servers, topic, nMessages, 0) ++
-                         sendMessages(servers, topic, nMessages, 1)
+    val sentMessages2 = produceMessages(nMessages, acks = 0) ++ produceMessages(nMessages, acks = 1)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -152,8 +150,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
     zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
     // send some messages to each broker
-    val sentMessages3 = sendMessages(servers, topic, nMessages, 0) ++
-                        sendMessages(servers, topic, nMessages, 1)
+    val sentMessages3 = produceMessages(nMessages, acks = 0) ++ produceMessages(nMessages, acks = 1)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -180,14 +177,19 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
+  private def produceMessages(numMessages: Int, acks: Int = -1,
+                              compressionType: CompressionType = CompressionType.NONE): Seq[String] = {
+    TestUtils.generateAndProduceMessages(servers, topic, numMessages, acks, compressionType)
+  }
+
   @Test
   def testCompression() {
     val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
     requestHandlerLogger.setLevel(Level.FATAL)
 
     // send some messages to each broker
-    val sentMessages1 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++
-                        sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec)
+    val sentMessages1 = produceMessages(nMessages, acks = 0, CompressionType.GZIP) ++
+                        produceMessages(nMessages, acks = 1, CompressionType.GZIP)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -219,8 +221,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
     val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
     // send some messages to each broker
-    val sentMessages2 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++
-                        sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec)
+    val sentMessages2 = produceMessages(nMessages, acks = 0, CompressionType.GZIP) ++
+                        produceMessages(nMessages, acks = 1, CompressionType.GZIP)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -240,8 +242,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
     zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder())
     // send some messages to each broker
-    val sentMessages3 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++
-                        sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec)
+    val sentMessages3 = produceMessages(nMessages, acks = 0, CompressionType.GZIP) ++
+                        produceMessages(nMessages, acks = 1, CompressionType.GZIP)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -263,8 +265,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
   @Test
   def testCompressionSetConsumption() {
     // send some messages to each broker
-    val sentMessages = sendMessages(servers, topic, 200, 0, DefaultCompressionCodec) ++
-                       sendMessages(servers, topic, 200, 1, DefaultCompressionCodec)
+    val sentMessages = produceMessages(200, acks = 0, CompressionType.GZIP) ++
+                       produceMessages(200, acks = 1, CompressionType.GZIP)
 
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
@@ -290,8 +292,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     requestHandlerLogger.setLevel(Level.FATAL)
 
     // send some messages to each broker
-    val sentMessages = sendMessages(servers, topic, nMessages, 0, NoCompressionCodec) ++
-                       sendMessages(servers, topic, nMessages, 1, NoCompressionCodec)
+    val sentMessages = produceMessages(nMessages, acks = 0) ++ produceMessages(nMessages, acks = 1)
 
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
@@ -332,7 +333,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     createTopic(topic, numPartitions = 1, replicationFactor = 1)
 
     // send some messages to each broker
-    val sentMessages1 = sendMessages(servers, topic, nMessages)
+    val sentMessages1 = produceMessages(nMessages)
 
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
@@ -359,8 +360,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
   @Test
   def testConsumerRebalanceListener() {
     // Send messages to create topic
-    sendMessages(servers, topic, nMessages, 0)
-    sendMessages(servers, topic, nMessages, 1)
+    produceMessages(nMessages, acks = 0)
+    produceMessages(nMessages, acks = 1)
 
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index a49523f..5abc352 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -17,13 +17,12 @@
 
 package kafka.integration
 
-import kafka.utils.{ZKGroupTopicDirs, Logging}
-import kafka.consumer.{ConsumerTimeoutException, ConsumerConfig, ConsumerConnector, Consumer}
+import kafka.utils.{Logging, ZKGroupTopicDirs}
+import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, ConsumerTimeoutException}
 import kafka.server._
 import kafka.utils.TestUtils
-import kafka.serializer._
-import kafka.producer.{Producer, KeyedMessage}
-
+import kafka.utils.TestUtils.createNewProducer
+import org.apache.kafka.clients.producer.ProducerRecord
 import org.junit.{After, Before, Test}
 import org.apache.log4j.{Level, Logger}
 import org.junit.Assert._
@@ -79,12 +78,10 @@ class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
   def resetAndConsume(numMessages: Int, resetTo: String, offset: Long): Int = {
     createTopic(topic, 1, 1)
 
-    val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(
-      TestUtils.getBrokerListStrFromServers(servers),
-      keyEncoder = classOf[StringEncoder].getName)
+    val producer = createNewProducer(TestUtils.getBrokerListStrFromServers(servers), retries = 5)
 
-    for(_ <- 0 until numMessages)
-      producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes))
+    val futures = (0 until numMessages).map(_ => producer.send(new ProducerRecord(topic, topic.getBytes, "test".getBytes)))
+    futures.foreach(_.get)
 
     // update offset in ZooKeeper for consumer to jump "forward" in time
     val dirs = new ZKGroupTopicDirs(group, topic)
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 0a8c49f..18373f2 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -77,11 +77,11 @@ class FetcherTest extends KafkaServerTestHarness {
   @Test
   def testFetcher() {
     val perNode = 2
-    var count = TestUtils.produceMessages(servers, topic, perNode).size
+    var count = TestUtils.generateAndProduceMessages(servers, topic, perNode).size
 
     fetch(count)
     assertQueueEmpty()
-    count = TestUtils.produceMessages(servers, topic, perNode).size
+    count = TestUtils.generateAndProduceMessages(servers, topic, perNode).size
     fetch(count)
     assertQueueEmpty()
   }
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
deleted file mode 100755
index 0cf95e9..0000000
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ /dev/null
@@ -1,276 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.integration
-
-import java.nio.ByteBuffer
-
-import org.junit.Assert._
-import kafka.api.{FetchRequest, FetchRequestBuilder, PartitionFetchInfo}
-import kafka.server.{KafkaConfig, KafkaRequestHandler}
-import kafka.producer.{KeyedMessage, Producer}
-import org.apache.log4j.{Level, Logger}
-import org.junit.Test
-
-import scala.collection._
-import kafka.common.{ErrorMapping, OffsetOutOfRangeException, TopicAndPartition, UnknownTopicOrPartitionException}
-import kafka.utils.{StaticPartitioner, TestUtils}
-import kafka.serializer.StringEncoder
-import java.util.Properties
-
-import org.apache.kafka.common.TopicPartition
-
-/**
- * End to end tests of the primitive apis against a local server
- */
-@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
-class PrimitiveApiTest extends ProducerConsumerTestHarness {
-  val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
-
-  def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
-
-  @Test
-  def testFetchRequestCanProperlySerialize() {
-    val request = new FetchRequestBuilder()
-      .clientId("test-client")
-      .maxWait(10001)
-      .minBytes(4444)
-      .addFetch("topic1", 0, 0, 10000)
-      .addFetch("topic2", 1, 1024, 9999)
-      .addFetch("topic1", 1, 256, 444)
-      .build()
-    val serializedBuffer = ByteBuffer.allocate(request.sizeInBytes)
-    request.writeTo(serializedBuffer)
-    serializedBuffer.rewind()
-    val deserializedRequest = FetchRequest.readFrom(serializedBuffer)
-    assertEquals(request, deserializedRequest)
-  }
-
-  @Test
-  def testEmptyFetchRequest() {
-    val partitionRequests = immutable.Map[TopicAndPartition, PartitionFetchInfo]()
-    val request = new FetchRequest(requestInfo = partitionRequests.toVector)
-    val fetched = consumer.fetch(request)
-    assertTrue(!fetched.hasError && fetched.data.isEmpty)
-  }
-
-  @Test
-  def testDefaultEncoderProducerAndFetch() {
-    val topic = "test-topic"
-
-    producer.send(new KeyedMessage[String, String](topic, "test-message"))
-
-    val replica = servers.head.replicaManager.getReplica(new TopicPartition(topic, 0)).get
-    assertTrue("HighWatermark should equal logEndOffset with just 1 replica",
-               replica.logEndOffset.messageOffset > 0 && replica.logEndOffset.equals(replica.highWatermark))
-
-    val request = new FetchRequestBuilder()
-      .clientId("test-client")
-      .addFetch(topic, 0, 0, 10000)
-      .build()
-    val fetched = consumer.fetch(request)
-    assertEquals("Returned correlationId doesn't match that in request.", 0, fetched.correlationId)
-
-    val messageSet = fetched.messageSet(topic, 0)
-    assertTrue(messageSet.iterator.hasNext)
-
-    val fetchedMessageAndOffset = messageSet.head
-    assertEquals("test-message", TestUtils.readString(fetchedMessageAndOffset.message.payload, "UTF-8"))
-  }
-
-  @Test
-  def testDefaultEncoderProducerAndFetchWithCompression() {
-    val topic = "test-topic"
-    val props = new Properties()
-    props.put("compression.codec", "gzip")
-
-    val stringProducer1 = TestUtils.createProducer[String, String](
-      TestUtils.getBrokerListStrFromServers(servers),
-      encoder = classOf[StringEncoder].getName,
-      keyEncoder = classOf[StringEncoder].getName,
-      partitioner = classOf[StaticPartitioner].getName,
-      producerProps = props)
-
-    stringProducer1.send(new KeyedMessage[String, String](topic, "test-message"))
-
-    val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
-    val messageSet = fetched.messageSet(topic, 0)
-    assertTrue(messageSet.iterator.hasNext)
-
-    val fetchedMessageAndOffset = messageSet.head
-    assertEquals("test-message", TestUtils.readString(fetchedMessageAndOffset.message.payload, "UTF-8"))
-  }
-
-  private def produceAndMultiFetch(producer: Producer[String, String]) {
-    for(topic <- List("test1", "test2", "test3", "test4"))
-      createTopic(topic)
-
-    // send some messages
-    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
-    {
-      val messages = new mutable.HashMap[String, Seq[String]]
-      val builder = new FetchRequestBuilder()
-      for( (topic, partition) <- topics) {
-        val messageList = List("a_" + topic, "b_" + topic)
-        val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
-        messages += topic -> messageList
-        producer.send(producerData:_*)
-        builder.addFetch(topic, partition, 0, 10000)
-      }
-
-      val request = builder.build()
-      val response = consumer.fetch(request)
-      for((topic, partition) <- topics) {
-        val fetched = response.messageSet(topic, partition)
-        assertEquals(messages(topic), fetched.map(messageAndOffset => TestUtils.readString(messageAndOffset.message.payload)))
-      }
-    }
-
-    // temporarily set request handler logger to a higher level
-    requestHandlerLogger.setLevel(Level.FATAL)
-
-    {
-      // send some invalid offsets
-      val builder = new FetchRequestBuilder()
-      for((topic, partition) <- topics)
-        builder.addFetch(topic, partition, -1, 10000)
-
-      try {
-        val request = builder.build()
-        val response = consumer.fetch(request)
-        response.data.foreach(pdata => ErrorMapping.maybeThrowException(pdata._2.error.code))
-        fail("Expected exception when fetching message with invalid offset")
-      } catch {
-        case _: OffsetOutOfRangeException => // This is good.
-      }
-    }
-
-    {
-      // send some invalid partitions
-      val builder = new FetchRequestBuilder()
-      for((topic, _) <- topics)
-        builder.addFetch(topic, -1, 0, 10000)
-
-      try {
-        val request = builder.build()
-        val response = consumer.fetch(request)
-        response.data.foreach(pdata => ErrorMapping.maybeThrowException(pdata._2.error.code))
-        fail("Expected exception when fetching message with invalid partition")
-      } catch {
-        case _: UnknownTopicOrPartitionException => // This is good.
-      }
-    }
-
-    // restore set request handler logger to a higher level
-    requestHandlerLogger.setLevel(Level.ERROR)
-  }
-
-  @Test
-  def testProduceAndMultiFetch() {
-    produceAndMultiFetch(producer)
-  }
-
-  private def multiProduce(producer: Producer[String, String]) {
-    val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
-    topics.keys.map(topic => createTopic(topic))
-
-    val messages = new mutable.HashMap[String, Seq[String]]
-    val builder = new FetchRequestBuilder()
-    for((topic, partition) <- topics) {
-      val messageList = List("a_" + topic, "b_" + topic)
-      val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
-      messages += topic -> messageList
-      producer.send(producerData:_*)
-      builder.addFetch(topic, partition, 0, 10000)
-    }
-
-    val request = builder.build()
-    val response = consumer.fetch(request)
-    for((topic, partition) <- topics) {
-      val fetched = response.messageSet(topic, partition)
-      assertEquals(messages(topic), fetched.map(messageAndOffset => TestUtils.readString(messageAndOffset.message.payload)))
-    }
-  }
-
-  @Test
-  def testMultiProduce() {
-    multiProduce(producer)
-  }
-
-  @Test
-  def testConsumerEmptyTopic() {
-    val newTopic = "new-topic"
-    createTopic(newTopic, numPartitions = 1, replicationFactor = 1)
-
-    val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build())
-    assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
-  }
-
-  @Test
-  def testPipelinedProduceRequests() {
-    val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
-    topics.keys.map(topic => createTopic(topic))
-    val props = new Properties()
-    props.put("request.required.acks", "0")
-    val pipelinedProducer: Producer[String, String] =
-      TestUtils.createProducer[String, String](
-        TestUtils.getBrokerListStrFromServers(servers),
-        encoder = classOf[StringEncoder].getName,
-        keyEncoder = classOf[StringEncoder].getName,
-        partitioner = classOf[StaticPartitioner].getName,
-        producerProps = props)
-
-    // send some messages
-    val messages = new mutable.HashMap[String, Seq[String]]
-    val builder = new FetchRequestBuilder()
-    for( (topic, partition) <- topics) {
-      val messageList = List("a_" + topic, "b_" + topic)
-      val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
-      messages += topic -> messageList
-      pipelinedProducer.send(producerData:_*)
-      builder.addFetch(topic, partition, 0, 10000)
-    }
-
-    // wait until the messages are published
-    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(new TopicPartition("test1", 0)).get.logEndOffset == 2 },
-                            "Published messages should be in the log")
-    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(new TopicPartition("test2", 0)).get.logEndOffset == 2 },
-                            "Published messages should be in the log")
-    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(new TopicPartition("test3", 0)).get.logEndOffset == 2 },
-                            "Published messages should be in the log")
-    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(new TopicPartition("test4", 0)).get.logEndOffset == 2 },
-                            "Published messages should be in the log")
-
-    val replicaId = servers.head.config.brokerId
-    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica(new TopicPartition("test1", 0), replicaId).get.highWatermark.messageOffset == 2 },
-                            "High watermark should equal to log end offset")
-    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica(new TopicPartition("test2", 0), replicaId).get.highWatermark.messageOffset == 2 },
-                            "High watermark should equal to log end offset")
-    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica(new TopicPartition("test3", 0), replicaId).get.highWatermark.messageOffset == 2 },
-                            "High watermark should equal to log end offset")
-    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica(new TopicPartition("test4", 0), replicaId).get.highWatermark.messageOffset == 2 },
-                            "High watermark should equal to log end offset")
-
-    // test if the consumer received the messages in the correct order when producer has enabled request pipelining
-    val request = builder.build()
-    val response = consumer.fetch(request)
-    for( (topic, partition) <- topics) {
-      val fetched = response.messageSet(topic, partition)
-      assertEquals(messages(topic), fetched.map(messageAndOffset => TestUtils.readString(messageAndOffset.message.payload)))
-    }
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
deleted file mode 100644
index e3115e1..0000000
--- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.integration
-
-import kafka.consumer.SimpleConsumer
-import org.junit.{After, Before}
-import kafka.producer.Producer
-import kafka.utils.{StaticPartitioner, TestUtils}
-import kafka.serializer.StringEncoder
-
-@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
-trait ProducerConsumerTestHarness extends KafkaServerTestHarness {
-  val host = "localhost"
-  var producer: Producer[String, String] = null
-  var consumer: SimpleConsumer = null
-
-  @Before
-  override def setUp() {
-    super.setUp
-    producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromServers(servers),
-      encoder = classOf[StringEncoder].getName,
-      keyEncoder = classOf[StringEncoder].getName,
-      partitioner = classOf[StaticPartitioner].getName)
-    consumer = new SimpleConsumer(host, TestUtils.boundPort(servers.head), 1000000, 64 * 1024, "")
-  }
-
-  @After
-  override def tearDown() {
-    producer.close()
-    consumer.close()
-    super.tearDown
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 608f3a6..a15ddb8 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -54,8 +54,6 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
 
   val kafkaApisLogger = Logger.getLogger(classOf[kafka.server.KafkaApis])
   val networkProcessorLogger = Logger.getLogger(classOf[kafka.network.Processor])
-  val syncProducerLogger = Logger.getLogger(classOf[kafka.producer.SyncProducer])
-  val eventHandlerLogger = Logger.getLogger(classOf[kafka.producer.async.DefaultEventHandler[Object, Object]])
 
   @Before
   override def setUp() {
@@ -73,8 +71,6 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     // temporarily set loggers to a higher level so that tests run quietly
     kafkaApisLogger.setLevel(Level.FATAL)
     networkProcessorLogger.setLevel(Level.FATAL)
-    syncProducerLogger.setLevel(Level.FATAL)
-    eventHandlerLogger.setLevel(Level.FATAL)
   }
 
   @After
@@ -85,8 +81,6 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     // restore log levels
     kafkaApisLogger.setLevel(Level.ERROR)
     networkProcessorLogger.setLevel(Level.ERROR)
-    syncProducerLogger.setLevel(Level.ERROR)
-    eventHandlerLogger.setLevel(Level.ERROR)
 
     super.tearDown()
   }
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index dbd9118..695b1b6 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -19,18 +19,17 @@ package kafka.javaapi.consumer
 
 import java.util.Properties
 
-import kafka.server._
 import kafka.serializer._
+import kafka.server._
 import kafka.integration.KafkaServerTestHarness
-import kafka.producer.KeyedMessage
-import kafka.javaapi.producer.Producer
-import kafka.utils.IntEncoder
 import kafka.utils.{Logging, TestUtils}
-import kafka.consumer.{KafkaStream, ConsumerConfig}
+import kafka.consumer.{ConsumerConfig, KafkaStream}
 import kafka.common.MessageStreamsExistException
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
 import org.junit.Test
 
-import scala.collection.JavaConversions
+import scala.collection.JavaConverters._
 
 import org.apache.log4j.{Level, Logger}
 import org.junit.Assert._
@@ -65,14 +64,14 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
-    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Map(topic -> numNodes*numParts/2)), new StringDecoder(), new StringDecoder())
+    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map[String, Integer](topic -> numNodes*numParts/2).asJava, new StringDecoder(), new StringDecoder())
 
     val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
     assertEquals(sentMessages1.sorted, receivedMessages1.sorted)
 
     // call createMesssageStreams twice should throw MessageStreamsExistException
     try {
-      zkConsumerConnector1.createMessageStreams(toJavaMap(Map(topic -> numNodes*numParts/2)), new StringDecoder(), new StringDecoder())
+      zkConsumerConnector1.createMessageStreams(Map[String, Integer](topic -> numNodes*numParts/2).asJava, new StringDecoder(), new StringDecoder())
       fail("Should fail with MessageStreamsExistException")
     } catch {
       case _: MessageStreamsExistException => // expected
@@ -86,35 +85,22 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
                    messagesPerNode: Int,
                    header: String): List[String] = {
     var messages: List[String] = Nil
-    for(server <- servers) {
-      val producer: kafka.producer.Producer[Int, String] =
-        TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
-          encoder = classOf[StringEncoder].getName,
-          keyEncoder = classOf[IntEncoder].getName)
-      val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer)
+    val producer = TestUtils.createNewProducer[Integer, String](TestUtils.getBrokerListStrFromServers(servers),
+      keySerializer = new IntegerSerializer, valueSerializer = new StringSerializer)
+    for (server <- servers) {
       for (partition <- 0 until numParts) {
-        val ms = 0.until(messagesPerNode).map(x => header + server.config.brokerId + "-" + partition + "-" + x)
+        val ms = (0 until messagesPerNode).map(x => header + server.config.brokerId + "-" + partition + "-" + x)
         messages ++= ms
-        import JavaConversions._
-        javaProducer.send(ms.map(new KeyedMessage[Int, String](topic, partition, _)): java.util.List[KeyedMessage[Int, String]])
+        ms.map(new ProducerRecord[Integer, String](topic, partition, partition, _)).map(producer.send).foreach(_.get)
       }
-      javaProducer.close
     }
+    producer.close()
     messages
   }
 
   def getMessages(nMessagesPerThread: Int,
                   jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String, String]]]): List[String] = {
-    var messages: List[String] = Nil
-    import scala.collection.JavaConversions._
-    val topicMessageStreams = jTopicMessageStreams.mapValues(_.toList)
-    messages = TestUtils.getMessages(topicMessageStreams, nMessagesPerThread)
-    messages
-  }
-
-  private def toJavaMap(scalaMap: Map[String, Int]): java.util.Map[String, java.lang.Integer] = {
-    val javaMap = new java.util.HashMap[String, java.lang.Integer]()
-    scalaMap.foreach(m => javaMap.put(m._1, m._2.asInstanceOf[java.lang.Integer]))
-    javaMap
+    val topicMessageStreams = jTopicMessageStreams.asScala.mapValues(_.asScala.toList)
+    TestUtils.getMessages(topicMessageStreams, nMessagesPerThread)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 4227764..2423e4c 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -26,14 +26,11 @@ import org.junit.Test
 import org.junit.Assert._
 import kafka.integration.KafkaServerTestHarness
 import kafka.server._
-import kafka.serializer._
 import kafka.utils._
-import kafka.utils.TestUtils._
 
 import scala.collection._
 import scala.collection.JavaConverters._
 import scala.util.matching.Regex
-import kafka.consumer.{ConsumerConfig, ZookeeperConsumerConnector}
 import kafka.log.LogConfig
 import org.apache.kafka.common.TopicPartition
 
@@ -50,26 +47,6 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   val nMessages = 2
 
   @Test
-  @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
-  def testMetricsLeak() {
-    val topic = "test-metrics-leak"
-    // create topic topic1 with 1 partition on broker 0
-    createTopic(topic, numPartitions = 1, replicationFactor = 1)
-    // force creation not client's specific metrics.
-    createAndShutdownStep(topic, "group0", "consumer0", "producer0")
-
-    //this assertion is only used for creating the metrics for DelayedFetchMetrics, it should never fail, but should not be removed
-    assertNotNull(DelayedFetchMetrics)
-
-    val countOfStaticMetrics = Metrics.defaultRegistry.allMetrics.keySet.size
-
-    for (i <- 0 to 5) {
-      createAndShutdownStep(topic, "group" + i % 3, "consumer" + i % 2, "producer" + i % 2)
-      assertEquals(countOfStaticMetrics, Metrics.defaultRegistry.allMetrics.keySet.size)
-    }
-  }
-
-  @Test
   def testMetricsReporterAfterDeletingTopic() {
     val topic = "test-topic-metric"
     adminZkClient.createTopic(topic, 1, 1)
@@ -84,7 +61,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     adminZkClient.createTopic(topic, 2, 1)
     // Produce a few messages to create the metrics
     // Don't consume messages as it may cause metrics to be re-created causing the test to fail, see KAFKA-5238
-    TestUtils.produceMessages(servers, topic, nMessages)
+    TestUtils.generateAndProduceMessages(servers, topic, nMessages)
     assertTrue("Topic metrics don't exist", topicMetricGroups(topic).nonEmpty)
     servers.foreach(s => assertNotNull(s.brokerTopicStats.topicStats(topic)))
     adminZkClient.deleteTopic(topic)
@@ -108,18 +85,6 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     assert(metric.getMBeanName.endsWith(expectedMBeanName))
   }
 
-  @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
-  def createAndShutdownStep(topic: String, group: String, consumerId: String, producerId: String): Unit = {
-    sendMessages(servers, topic, nMessages)
-    // create a consumer
-    val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumerId))
-    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
-    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder, new StringDecoder)
-    getMessages(topicMessageStreams1, nMessages)
-
-    zkConsumerConnector1.shutdown()
-  }
-
   @Test
   def testBrokerTopicMetricsBytesInOut(): Unit = {
     val topic = "test-bytes-in-out"
@@ -132,7 +97,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, "2")
     createTopic(topic, 1, numNodes, topicConfig)
     // Produce a few messages to create the metrics
-    TestUtils.produceMessages(servers, topic, nMessages)
+    TestUtils.generateAndProduceMessages(servers, topic, nMessages)
 
     // Check the log size for each broker so that we can distinguish between failures caused by replication issues
     // versus failures caused by the metrics
@@ -151,7 +116,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     val initialBytesOut = meterCount(bytesOut)
 
     // Produce a few messages to make the metrics tick
-    TestUtils.produceMessages(servers, topic, nMessages)
+    TestUtils.generateAndProduceMessages(servers, topic, nMessages)
 
     assertTrue(meterCount(replicationBytesIn) > initialReplicationBytesIn)
     assertTrue(meterCount(replicationBytesOut) > initialReplicationBytesOut)
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
deleted file mode 100755
index 370a1ad..0000000
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ /dev/null
@@ -1,505 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.producer
-
-import java.util.Properties
-import java.util.concurrent.LinkedBlockingQueue
-
-import org.apache.kafka.common.protocol.Errors
-import org.junit.Assert.{assertEquals, assertTrue}
-import org.easymock.EasyMock
-import org.junit.Test
-import kafka.api._
-import kafka.cluster.BrokerEndPoint
-import kafka.common._
-import kafka.message._
-import kafka.producer.async._
-import kafka.serializer._
-import kafka.server.KafkaConfig
-import kafka.utils.TestUtils._
-
-import scala.collection.Map
-import scala.collection.mutable.ArrayBuffer
-import kafka.utils._
-import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.utils.Time
-
-@deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0")
-class AsyncProducerTest {
-
-  class NegativePartitioner(props: VerifiableProperties = null) extends Partitioner {
-    def partition(data: Any, numPartitions: Int): Int = -1
-  }
-
-  // One of the few cases we can just set a fixed port because the producer is mocked out here since this uses mocks
-  val props = Seq(createBrokerConfig(1, "127.0.0.1:1", port = 65534))
-  val configs = props.map(KafkaConfig.fromProps)
-  val brokerList = configs.map { config =>
-    val endPoint = config.advertisedListeners.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get
-    org.apache.kafka.common.utils.Utils.formatAddress(endPoint.host, endPoint.port)
-  }.mkString(",")
-
-  @Test
-  def testProducerQueueSize() {
-    // a mock event handler that blocks
-    val mockEventHandler = new EventHandler[String,String] {
-
-      def handle(events: Seq[KeyedMessage[String,String]]) {
-        Thread.sleep(500)
-      }
-
-      def close(): Unit = ()
-    }
-
-    val props = new Properties()
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("metadata.broker.list", brokerList)
-    props.put("producer.type", "async")
-    props.put("queue.buffering.max.messages", "10")
-    props.put("batch.num.messages", "1")
-    props.put("queue.enqueue.timeout.ms", "0")
-
-    val config = new ProducerConfig(props)
-    val produceData = getProduceData(12)
-    val producer = new Producer[String, String](config, mockEventHandler)
-    try {
-      // send all 10 messages, should hit the batch size and then reach broker
-      producer.send(produceData: _*)
-      fail("Queue should be full")
-    }
-    catch {
-      case _: QueueFullException => //expected
-    }finally {
-      producer.close()
-    }
-  }
-
-  @Test
-  def testProduceAfterClosed() {
-    val produceData = getProduceData(10)
-    val producer = createProducer[String, String](
-      brokerList,
-      encoder = classOf[StringEncoder].getName)
-
-    producer.close
-
-    try {
-      producer.send(produceData: _*)
-      fail("should complain that producer is already closed")
-    }
-    catch {
-      case _: ProducerClosedException => //expected
-    }
-  }
-
-  @Test
-  def testBatchSize() {
-    /**
-     *  Send a total of 10 messages with batch size of 5. Expect 2 calls to the handler, one for each batch.
-     */
-    val producerDataList = getProduceData(10)
-    val mockHandler = EasyMock.createStrictMock(classOf[DefaultEventHandler[String,String]])
-    mockHandler.handle(producerDataList.take(5))
-    EasyMock.expectLastCall
-    mockHandler.handle(producerDataList.takeRight(5))
-    EasyMock.expectLastCall
-    EasyMock.replay(mockHandler)
-
-    val queue = new LinkedBlockingQueue[KeyedMessage[String,String]](10)
-    val producerSendThread =
-      new ProducerSendThread[String,String]("thread1", queue, mockHandler, Integer.MAX_VALUE, 5, "")
-    producerSendThread.start()
-
-    for (producerData <- producerDataList)
-      queue.put(producerData)
-
-    producerSendThread.shutdown
-    EasyMock.verify(mockHandler)
-  }
-
-  @Test
-  def testQueueTimeExpired() {
-    /**
-     *  Send a total of 2 messages with batch size of 5 and queue time of 200ms.
-     *  Expect 1 calls to the handler after 200ms.
-     */
-    val producerDataList = getProduceData(2)
-    val mockHandler = EasyMock.createStrictMock(classOf[DefaultEventHandler[String,String]])
-    mockHandler.handle(producerDataList)
-    EasyMock.expectLastCall
-    EasyMock.replay(mockHandler)
-
-    val queueExpirationTime = 200
-    val queue = new LinkedBlockingQueue[KeyedMessage[String,String]](10)
-    val producerSendThread =
-      new ProducerSendThread[String,String]("thread1", queue, mockHandler, queueExpirationTime, 5, "")
-    producerSendThread.start()
-
-    for (producerData <- producerDataList)
-      queue.put(producerData)
-
-    Thread.sleep(queueExpirationTime + 100)
-    EasyMock.verify(mockHandler)
-    producerSendThread.shutdown
-  }
-
-  @Test
-  def testPartitionAndCollateEvents() {
-    val producerDataList = new ArrayBuffer[KeyedMessage[Int,Message]]
-    // use bogus key and partition key override for some messages
-    producerDataList.append(new KeyedMessage[Int,Message]("topic1", key = 0, message = new Message("msg1".getBytes)))
-    producerDataList.append(new KeyedMessage[Int,Message]("topic2", key = -99, partKey = 1, message = new Message("msg2".getBytes)))
-    producerDataList.append(new KeyedMessage[Int,Message]("topic1", key = 2, message = new Message("msg3".getBytes)))
-    producerDataList.append(new KeyedMessage[Int,Message]("topic1", key = -101, partKey = 3, message = new Message("msg4".getBytes)))
-    producerDataList.append(new KeyedMessage[Int,Message]("topic2", key = 4, message = new Message("msg5".getBytes)))
-
-    val props = new Properties()
-    props.put("metadata.broker.list", brokerList)
-    val broker1 = new BrokerEndPoint(0, "localhost", 9092)
-    val broker2 = new BrokerEndPoint(1, "localhost", 9093)
-
-    // form expected partitions metadata
-    val partition1Metadata = new PartitionMetadata(0, Some(broker1), List(broker1, broker2))
-    val partition2Metadata = new PartitionMetadata(1, Some(broker2), List(broker1, broker2))
-    val topic1Metadata = new TopicMetadata("topic1", List(partition1Metadata, partition2Metadata))
-    val topic2Metadata = new TopicMetadata("topic2", List(partition1Metadata, partition2Metadata))
-
-    val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
-    topicPartitionInfos.put("topic1", topic1Metadata)
-    topicPartitionInfos.put("topic2", topic2Metadata)
-
-    val intPartitioner = new Partitioner {
-      def partition(key: Any, numPartitions: Int): Int = key.asInstanceOf[Int] % numPartitions
-    }
-    val config = new ProducerConfig(props)
-
-    val producerPool = new ProducerPool(config)
-    val handler = new DefaultEventHandler[Int,String](config,
-                                                      partitioner = intPartitioner,
-                                                      encoder = null.asInstanceOf[Encoder[String]],
-                                                      keyEncoder = new IntEncoder(),
-                                                      producerPool = producerPool,
-                                                      topicPartitionInfos = topicPartitionInfos)
-
-    val topic1Broker1Data =
-      ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 0, new Message("msg1".getBytes)),
-                                             new KeyedMessage[Int,Message]("topic1", 2, new Message("msg3".getBytes)))
-    val topic1Broker2Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", -101, 3, new Message("msg4".getBytes)))
-    val topic2Broker1Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic2", 4, new Message("msg5".getBytes)))
-    val topic2Broker2Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic2", -99, 1, new Message("msg2".getBytes)))
-    val expectedResult = Some(Map(
-        0 -> Map(
-              TopicAndPartition("topic1", 0) -> topic1Broker1Data,
-              TopicAndPartition("topic2", 0) -> topic2Broker1Data),
-        1 -> Map(
-              TopicAndPartition("topic1", 1) -> topic1Broker2Data,
-              TopicAndPartition("topic2", 1) -> topic2Broker2Data)
-      ))
-
-    val actualResult = handler.partitionAndCollate(producerDataList)
-    assertEquals(expectedResult, actualResult)
-  }
-
-  @Test
-  def testSerializeEvents() {
-    val produceData = TestUtils.getMsgStrings(5).map(m => new KeyedMessage[String,String]("topic1",m))
-    val props = new Properties()
-    props.put("metadata.broker.list", brokerList)
-    val config = new ProducerConfig(props)
-    // form expected partitions metadata
-    val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
-    val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
-    topicPartitionInfos.put("topic1", topic1Metadata)
-
-    val producerPool = new ProducerPool(config)
-
-    val handler = new DefaultEventHandler[String,String](config,
-                                                         partitioner = null.asInstanceOf[Partitioner],
-                                                         encoder = new StringEncoder,
-                                                         keyEncoder = new StringEncoder,
-                                                         producerPool = producerPool,
-                                                         topicPartitionInfos = topicPartitionInfos)
-
-    val serializedData = handler.serialize(produceData)
-    val deserializedData = serializedData.map(d => new KeyedMessage[String,String](d.topic, TestUtils.readString(d.message.payload)))
-
-    // Test that the serialize handles seq from a Stream
-    val streamedSerializedData = handler.serialize(Stream(produceData:_*))
-    val deserializedStreamData = streamedSerializedData.map(d => new KeyedMessage[String,String](d.topic, TestUtils.readString(d.message.payload)))
-
-    TestUtils.checkEquals(produceData.iterator, deserializedData.iterator)
-    TestUtils.checkEquals(produceData.iterator, deserializedStreamData.iterator)
-  }
-
-  @Test
-  def testInvalidPartition() {
-    val producerDataList = new ArrayBuffer[KeyedMessage[String,Message]]
-    producerDataList.append(new KeyedMessage[String,Message]("topic1", "key1", new Message("msg1".getBytes)))
-    val props = new Properties()
-    props.put("metadata.broker.list", brokerList)
-    val config = new ProducerConfig(props)
-
-    // form expected partitions metadata
-    val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
-
-    val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
-    topicPartitionInfos.put("topic1", topic1Metadata)
-
-    val producerPool = new ProducerPool(config)
-
-    val handler = new DefaultEventHandler[String,String](config,
-                                                         partitioner = new NegativePartitioner,
-                                                         encoder = null.asInstanceOf[Encoder[String]],
-                                                         keyEncoder = null.asInstanceOf[Encoder[String]],
-                                                         producerPool = producerPool,
-                                                         topicPartitionInfos = topicPartitionInfos)
-    try {
-      handler.partitionAndCollate(producerDataList)
-    }
-    catch {
-      // should not throw any exception
-      case _: Throwable => fail("Should not throw any exception")
-
-    }
-  }
-
-  @Test
-  def testNoBroker() {
-    val props = new Properties()
-    props.put("metadata.broker.list", brokerList)
-
-    val config = new ProducerConfig(props)
-    // create topic metadata with 0 partitions
-    val topic1Metadata = new TopicMetadata("topic1", Seq.empty)
-
-    val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
-    topicPartitionInfos.put("topic1", topic1Metadata)
-
-    val producerPool = new ProducerPool(config)
-
-    val producerDataList = new ArrayBuffer[KeyedMessage[String,String]]
-    producerDataList.append(new KeyedMessage[String,String]("topic1", "msg1"))
-    val handler = new DefaultEventHandler[String,String](config,
-                                                         partitioner = null.asInstanceOf[Partitioner],
-                                                         encoder = new StringEncoder,
-                                                         keyEncoder = new StringEncoder,
-                                                         producerPool = producerPool,
-                                                         topicPartitionInfos = topicPartitionInfos)
-    try {
-      handler.handle(producerDataList)
-      fail("Should fail with FailedToSendMessageException")
-    }
-    catch {
-      case _: FailedToSendMessageException => // we retry on any exception now
-    }
-  }
-
-  @Test
-  def testIncompatibleEncoder() {
-    val props = new Properties()
-    // no need to retry since the send will always fail
-    props.put("message.send.max.retries", "0")
-    val producer= createProducer[String, String](
-      brokerList = brokerList,
-      encoder = classOf[DefaultEncoder].getName,
-      keyEncoder = classOf[DefaultEncoder].getName,
-      producerProps = props)
-
-    try {
-      producer.send(getProduceData(1): _*)
-      fail("Should fail with ClassCastException due to incompatible Encoder")
-    } catch {
-      case _: ClassCastException =>
-    } finally {
-      producer.close()
-    }
-  }
-
-  @Test
-  def testRandomPartitioner() {
-    val props = new Properties()
-    props.put("metadata.broker.list", brokerList)
-    val config = new ProducerConfig(props)
-
-    // create topic metadata with 0 partitions
-    val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
-    val topic2Metadata = getTopicMetadata("topic2", 0, 0, "localhost", 9092)
-
-    val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
-    topicPartitionInfos.put("topic1", topic1Metadata)
-    topicPartitionInfos.put("topic2", topic2Metadata)
-
-    val producerPool = new ProducerPool(config)
-    val handler = new DefaultEventHandler[String,String](config,
-                                                         partitioner = null.asInstanceOf[Partitioner],
-                                                         encoder = null.asInstanceOf[Encoder[String]],
-                                                         keyEncoder = null.asInstanceOf[Encoder[String]],
-                                                         producerPool = producerPool,
-                                                         topicPartitionInfos = topicPartitionInfos)
-    val producerDataList = new ArrayBuffer[KeyedMessage[String,Message]]
-    producerDataList.append(new KeyedMessage[String,Message]("topic1", new Message("msg1".getBytes)))
-    producerDataList.append(new KeyedMessage[String,Message]("topic2", new Message("msg2".getBytes)))
-    producerDataList.append(new KeyedMessage[String,Message]("topic1", new Message("msg3".getBytes)))
-
-    val partitionedDataOpt = handler.partitionAndCollate(producerDataList)
-    partitionedDataOpt match {
-      case Some(partitionedData) =>
-        for (dataPerBroker <- partitionedData.values) {
-          for (tp <- dataPerBroker.keys)
-            assertTrue(tp.partition == 0)
-        }
-      case None =>
-        fail("Failed to collate requests by topic, partition")
-    }
-  }
-
-  @Test
-  def testFailedSendRetryLogic() {
-    val props = new Properties()
-    props.put("metadata.broker.list", brokerList)
-    props.put("request.required.acks", "1")
-    props.put("serializer.class", classOf[StringEncoder].getName)
-    props.put("key.serializer.class", classOf[NullEncoder[Int]].getName)
-    props.put("producer.num.retries", "3")
-
-    val config = new ProducerConfig(props)
-
-    val topic1 = "topic1"
-    val topic1Metadata = getTopicMetadata(topic1, Array(0, 1), 0, "localhost", 9092)
-    val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
-    topicPartitionInfos.put("topic1", topic1Metadata)
-
-    val msgs = TestUtils.getMsgStrings(2)
-
-    import SyncProducerConfig.{DefaultAckTimeoutMs, DefaultClientId}
-
-    // produce request for topic1 and partitions 0 and 1.  Let the first request fail
-    // entirely.  The second request will succeed for partition 1 but fail for partition 0.
-    // On the third try for partition 0, let it succeed.
-    val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1,
-      correlationId = 5, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
-    val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1,
-      correlationId = 11, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
-    val response1 = ProducerResponse(0,
-      Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(Errors.NOT_LEADER_FOR_PARTITION, 0L)),
-          (TopicAndPartition("topic1", 1), ProducerResponseStatus(Errors.NONE, 0L))))
-    val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), acks = 1, correlationId = 15,
-      timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
-    val response2 = ProducerResponse(0,
-      Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(Errors.NONE, 0L))))
-    val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
-    // don't care about config mock
-    val myProps = new Properties()
-    myProps.put("host", "localhost")
-    myProps.put("port", "9092")
-    val myConfig = new SyncProducerConfig(myProps)
-    EasyMock.expect(mockSyncProducer.config).andReturn(myConfig).anyTimes()
-    EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException
-    EasyMock.expect(mockSyncProducer.send(request2)).andReturn(response1)
-    EasyMock.expect(mockSyncProducer.send(request3)).andReturn(response2)
-    EasyMock.replay(mockSyncProducer)
-
-    val producerPool = EasyMock.createMock(classOf[ProducerPool])
-    EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer).times(3)
-    EasyMock.expect(producerPool.close())
-    EasyMock.replay(producerPool)
-    val time = new Time {
-      override def nanoseconds: Long = 0L
-      override def milliseconds: Long = 0L
-      override def sleep(ms: Long): Unit = {}
-      override def hiResClockMs: Long = 0L
-    }
-    val handler = new DefaultEventHandler(config,
-                                          partitioner = new FixedValuePartitioner(),
-                                          encoder = new StringEncoder(),
-                                          keyEncoder = new NullEncoder[Int](),
-                                          producerPool = producerPool,
-                                          topicPartitionInfos = topicPartitionInfos,
-                                          time = time)
-    val data = msgs.map(m => new KeyedMessage(topic1, 0, m)) ++ msgs.map(m => new KeyedMessage(topic1, 1, m))
-    handler.handle(data)
-    handler.close()
-
-    EasyMock.verify(mockSyncProducer)
-    EasyMock.verify(producerPool)
-  }
-
-  @Test
-  def testJavaProducer() {
-    val topic = "topic1"
-    val msgs = TestUtils.getMsgStrings(5)
-    val scalaProducerData = msgs.map(m => new KeyedMessage[String, String](topic, m))
-    val javaProducerData: java.util.List[KeyedMessage[String, String]] = {
-      import scala.collection.JavaConversions._
-      scalaProducerData
-    }
-
-    val mockScalaProducer = EasyMock.createMock(classOf[kafka.producer.Producer[String, String]])
-    mockScalaProducer.send(scalaProducerData.head)
-    EasyMock.expectLastCall()
-    mockScalaProducer.send(scalaProducerData: _*)
-    EasyMock.expectLastCall()
-    EasyMock.replay(mockScalaProducer)
-
-    val javaProducer = new kafka.javaapi.producer.Producer[String, String](mockScalaProducer)
-    javaProducer.send(javaProducerData.get(0))
-    javaProducer.send(javaProducerData)
-
-    EasyMock.verify(mockScalaProducer)
-  }
-
-  @Test
-  def testInvalidConfiguration() {
-    val props = new Properties()
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("producer.type", "async")
-    try {
-      new ProducerConfig(props)
-      fail("should complain about wrong config")
-    }
-    catch {
-      case _: IllegalArgumentException => //expected
-    }
-  }
-
-  def getProduceData(nEvents: Int): Seq[KeyedMessage[String,String]] = {
-    val producerDataList = new ArrayBuffer[KeyedMessage[String,String]]
-    for (i <- 0 until nEvents)
-      producerDataList.append(new KeyedMessage[String,String]("topic1", null, "msg" + i))
-    producerDataList
-  }
-
-  private def getTopicMetadata(topic: String, partition: Int, brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = {
-    getTopicMetadata(topic, List(partition), brokerId, brokerHost, brokerPort)
-  }
-
-  private def getTopicMetadata(topic: String, partition: Seq[Int], brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = {
-    val broker1 = new BrokerEndPoint(brokerId, brokerHost, brokerPort)
-    new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1))))
-  }
-
-  def messagesToSet(messages: Seq[String]): ByteBufferMessageSet = {
-    new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => new Message(m.getBytes, 0L, Message.MagicValue_V1)): _*)
-  }
-
-  def messagesToSet(key: Array[Byte], messages: Seq[Array[Byte]]): ByteBufferMessageSet = {
-    new ByteBufferMessageSet(
-      NoCompressionCodec,
-      messages.map(m => new Message(key = key, bytes = m, timestamp = 0L, magicValue = Message.MagicValue_V1)): _*)
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
deleted file mode 100755
index dc2a5ed..0000000
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ /dev/null
@@ -1,348 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.producer
-
-import java.nio.ByteBuffer
-import java.util
-import java.util.Properties
-
-import kafka.api.FetchRequestBuilder
-import kafka.common.FailedToSendMessageException
-import kafka.consumer.SimpleConsumer
-import kafka.message.{Message, MessageAndOffset}
-import kafka.serializer.StringEncoder
-import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
-import kafka.utils._
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.common.utils.Time
-import org.apache.log4j.{Level, Logger}
-import org.junit.Assert._
-import org.junit.{After, Before, Test}
-import org.scalatest.exceptions.TestFailedException
-
-@deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0")
-class ProducerTest extends ZooKeeperTestHarness with Logging{
-  private val brokerId1 = 0
-  private val brokerId2 = 1
-  private var server1: KafkaServer = null
-  private var server2: KafkaServer = null
-  private var consumer1: SimpleConsumer = null
-  private var consumer2: SimpleConsumer = null
-  private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
-  private var servers = List.empty[KafkaServer]
-
-  // Creation of consumers is deferred until they are actually needed. This allows us to kill brokers that use random
-  // ports and then get a consumer instance that will be pointed at the correct port
-  def getConsumer1() = {
-    if (consumer1 == null)
-      consumer1 = new SimpleConsumer("localhost", TestUtils.boundPort(server1), 1000000, 64*1024, "")
-    consumer1
-  }
-
-  def getConsumer2() = {
-    if (consumer2 == null)
-      consumer2 = new SimpleConsumer("localhost", TestUtils.boundPort(server2), 1000000, 64*1024, "")
-    consumer2
-  }
-
-  @Before
-  override def setUp() {
-    super.setUp()
-    // set up 2 brokers with 4 partitions each
-    val props1 = TestUtils.createBrokerConfig(brokerId1, zkConnect, false)
-    props1.put("num.partitions", "4")
-    val config1 = KafkaConfig.fromProps(props1)
-    val props2 = TestUtils.createBrokerConfig(brokerId2, zkConnect, false)
-    props2.put("num.partitions", "4")
-    val config2 = KafkaConfig.fromProps(props2)
-    server1 = TestUtils.createServer(config1)
-    server2 = TestUtils.createServer(config2)
-    servers = List(server1,server2)
-
-    // temporarily set request handler logger to a higher level
-    requestHandlerLogger.setLevel(Level.FATAL)
-  }
-
-  @After
-  override def tearDown() {
-    // restore set request handler logger to a higher level
-    requestHandlerLogger.setLevel(Level.ERROR)
-
-    if (consumer1 != null)
-      consumer1.close()
-    if (consumer2 != null)
-      consumer2.close()
-
-    TestUtils.shutdownServers(Seq(server1, server2))
-    super.tearDown()
-  }
-
-  @Test
-  def testUpdateBrokerPartitionInfo() {
-    val topic = "new-topic"
-    TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = servers)
-
-    val props = new Properties()
-    // no need to retry since the send will always fail
-    props.put("message.send.max.retries", "0")
-    val producer1 = TestUtils.createProducer[String, String](
-        brokerList = "localhost:80,localhost:81",
-        encoder = classOf[StringEncoder].getName,
-        keyEncoder = classOf[StringEncoder].getName,
-        producerProps = props)
-
-    try {
-      producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
-      fail("Test should fail because the broker list provided are not valid")
-    } catch {
-      case _: FailedToSendMessageException => // this is expected
-    } finally producer1.close()
-
-    val producer2 = TestUtils.createProducer[String, String](
-      brokerList = "localhost:80," + TestUtils.getBrokerListStrFromServers(Seq(server1)),
-      encoder = classOf[StringEncoder].getName,
-      keyEncoder = classOf[StringEncoder].getName)
-
-    try{
-      producer2.send(new KeyedMessage[String, String](topic, "test", "test1"))
-    } catch {
-      case e: Throwable => fail("Should succeed sending the message", e)
-    } finally {
-      producer2.close()
-    }
-
-    val producer3 =  TestUtils.createProducer[String, String](
-      brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)),
-      encoder = classOf[StringEncoder].getName,
-      keyEncoder = classOf[StringEncoder].getName)
-
-    try{
-      producer3.send(new KeyedMessage[String, String](topic, "test", "test1"))
-    } catch {
-      case e: Throwable => fail("Should succeed sending the message", e)
-    } finally {
-      producer3.close()
-    }
-  }
-
-  @Test
-  def testSendToNewTopic() {
-    val props1 = new util.Properties()
-    props1.put("request.required.acks", "-1")
-
-    val topic = "new-topic"
-    // create topic with 1 partition and await leadership
-    TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = servers)
-
-    val producer1 = TestUtils.createProducer[String, String](
-      brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)),
-      encoder = classOf[StringEncoder].getName,
-      keyEncoder = classOf[StringEncoder].getName,
-      partitioner = classOf[StaticPartitioner].getName,
-      producerProps = props1)
-    val startTime = System.currentTimeMillis()
-    // Available partition ids should be 0.
-    producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
-    producer1.send(new KeyedMessage[String, String](topic, "test", "test2"))
-    val endTime = System.currentTimeMillis()
-    // get the leader
-    val leaderOpt = zkClient.getLeaderForPartition(new TopicPartition(topic, 0))
-    assertTrue("Leader for topic new-topic partition 0 should exist", leaderOpt.isDefined)
-    val leader = leaderOpt.get
-
-    val messageSet = if(leader == server1.config.brokerId) {
-      val response1 = getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
-      response1.messageSet("new-topic", 0).iterator.toBuffer
-    }else {
-      val response2 = getConsumer2().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
-      response2.messageSet("new-topic", 0).iterator.toBuffer
-    }
-    assertEquals("Should have fetched 2 messages", 2, messageSet.size)
-    // Message 1
-    assertTrue(ByteBuffer.wrap("test1".getBytes).equals(messageSet.head.message.payload))
-    assertTrue(ByteBuffer.wrap("test".getBytes).equals(messageSet.head.message.key))
-    assertTrue(messageSet.head.message.timestamp >= startTime && messageSet.head.message.timestamp < endTime)
-    assertEquals(TimestampType.CREATE_TIME, messageSet.head.message.timestampType)
-    assertEquals(Message.MagicValue_V1, messageSet.head.message.magic)
-
-    // Message 2
-    assertTrue(ByteBuffer.wrap("test2".getBytes).equals(messageSet(1).message.payload))
-    assertTrue(ByteBuffer.wrap("test".getBytes).equals(messageSet(1).message.key))
-    assertTrue(messageSet(1).message.timestamp >= startTime && messageSet(1).message.timestamp < endTime)
-    assertEquals(TimestampType.CREATE_TIME, messageSet(1).message.timestampType)
-    assertEquals(Message.MagicValue_V1, messageSet(1).message.magic)
-    producer1.close()
-
-    val props2 = new util.Properties()
-    props2.put("request.required.acks", "3")
-    // no need to retry since the send will always fail
-    props2.put("message.send.max.retries", "0")
-
-    try {
-      val producer2 = TestUtils.createProducer[String, String](
-        brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)),
-        encoder = classOf[StringEncoder].getName,
-        keyEncoder = classOf[StringEncoder].getName,
-        partitioner = classOf[StaticPartitioner].getName,
-        producerProps = props2)
-        producer2.close
-        fail("we don't support request.required.acks greater than 1")
-    }
-    catch {
-      case _: IllegalArgumentException =>  // this is expected
-    }
-  }
-
-
-  @Test
-  def testSendWithDeadBroker() {
-    val props = new Properties()
-    props.put("request.required.acks", "1")
-    // No need to retry since the topic will be created beforehand and normal send will succeed on the first try.
-    // Reducing the retries will save the time on the subsequent failure test.
-    props.put("message.send.max.retries", "0")
-
-    val topic = "new-topic"
-    // create topic
-    TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0), 1->Seq(0), 2->Seq(0), 3->Seq(0)),
-                          servers = servers)
-
-    val producer = TestUtils.createProducer[String, String](
-      brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)),
-      encoder = classOf[StringEncoder].getName,
-      keyEncoder = classOf[StringEncoder].getName,
-      partitioner = classOf[StaticPartitioner].getName,
-      producerProps = props)
-    val startTime = System.currentTimeMillis()
-    try {
-      // Available partition ids should be 0, 1, 2 and 3, all lead and hosted only
-      // on broker 0
-      producer.send(new KeyedMessage[String, String](topic, "test", "test1"))
-    } catch {
-      case e: Throwable => fail("Unexpected exception: " + e)
-    }
-    val endTime = System.currentTimeMillis()
-    // kill the broker
-    server1.shutdown
-    server1.awaitShutdown()
-
-    try {
-      // These sends should fail since there are no available brokers
-      producer.send(new KeyedMessage[String, String](topic, "test", "test1"))
-      fail("Should fail since no leader exists for the partition.")
-    } catch {
-      case e : TestFailedException => throw e // catch and re-throw the failure message
-      case _: Throwable => // otherwise success
-    }
-
-    // restart server 1
-    server1.startup()
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
-    TestUtils.waitUntilLeaderIsKnown(servers, topic, 0)
-
-    try {
-      // cross check if broker 1 got the messages
-      val response1 = getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
-      val messageSet1 = response1.messageSet(topic, 0).iterator
-      assertTrue("Message set should have 1 message", messageSet1.hasNext)
-      val message = messageSet1.next.message
-      assertTrue(ByteBuffer.wrap("test1".getBytes).equals(message.payload))
-      assertTrue(ByteBuffer.wrap("test".getBytes).equals(message.key))
-      assertTrue(message.timestamp >= startTime && message.timestamp < endTime)
-      assertEquals(TimestampType.CREATE_TIME, message.timestampType)
-      assertEquals(Message.MagicValue_V1, message.magic)
-      assertFalse("Message set should have another message", messageSet1.hasNext)
-    } catch {
-      case e: Exception => fail("Not expected", e)
-    }
-    producer.close
-  }
-
-  @Test
-  def testAsyncSendCanCorrectlyFailWithTimeout() {
-    val topic = "new-topic"
-    // create topics in ZK
-    TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0, 1)), servers = servers)
-
-    val timeoutMs = 500
-    val props = new Properties()
-    props.put("request.timeout.ms", timeoutMs.toString)
-    props.put("request.required.acks", "1")
-    props.put("message.send.max.retries", "0")
-    props.put("client.id","ProducerTest-testAsyncSendCanCorrectlyFailWithTimeout")
-    val producer = TestUtils.createProducer[String, String](
-      brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)),
-      encoder = classOf[StringEncoder].getName,
-      keyEncoder = classOf[StringEncoder].getName,
-      partitioner = classOf[StaticPartitioner].getName,
-      producerProps = props)
-
-    // do a simple test to make sure plumbing is okay
-    try {
-      // this message should be assigned to partition 0 whose leader is on broker 0
-      producer.send(new KeyedMessage(topic, "test", "test"))
-      // cross check if the broker received the messages
-      // we need the loop because the broker won't return the message until it has been replicated and the producer is
-      // using acks=1
-      var messageSet1: Iterator[MessageAndOffset] = null
-      TestUtils.waitUntilTrue(() => {
-        val response1 = getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
-        messageSet1 = response1.messageSet(topic, 0).iterator
-        messageSet1.hasNext
-      }, "Message set should have 1 message")
-      assertEquals(ByteBuffer.wrap("test".getBytes), messageSet1.next.message.payload)
-
-      // stop IO threads and request handling, but leave networking operational
-      // any requests should be accepted and queue up, but not handled
-      server1.requestHandlerPool.shutdown()
-
-      val t1 = Time.SYSTEM.milliseconds
-      try {
-        // this message should be assigned to partition 0 whose leader is on broker 0, but
-        // broker 0 will not respond within timeoutMs millis.
-        producer.send(new KeyedMessage(topic, "test", "test"))
-        fail("Exception should have been thrown")
-      } catch {
-        case _: FailedToSendMessageException => /* success */
-      }
-      val t2 = Time.SYSTEM.milliseconds
-      // make sure we don't wait fewer than timeoutMs
-      assertTrue((t2-t1) >= timeoutMs)
-
-    } finally producer.close()
-  }
-
-  @Test
-  def testSendNullMessage() {
-    val producer = TestUtils.createProducer[String, String](
-      brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)),
-      encoder = classOf[StringEncoder].getName,
-      keyEncoder = classOf[StringEncoder].getName,
-      partitioner = classOf[StaticPartitioner].getName)
-
-    try {
-      TestUtils.createTopic(zkClient, "new-topic", 2, 1, servers)
-      producer.send(new KeyedMessage("new-topic", "key", null))
-    } finally {
-      producer.close()
-    }
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
deleted file mode 100644
index 5c1d4da..0000000
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ /dev/null
@@ -1,253 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.producer
-
-import java.net.SocketTimeoutException
-import java.util.Properties
-
-import kafka.api.{ProducerRequest, ProducerResponseStatus}
-import kafka.common.TopicAndPartition
-import kafka.integration.KafkaServerTestHarness
-import kafka.message._
-import kafka.server.KafkaConfig
-import kafka.utils._
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.record.{DefaultRecordBatch, DefaultRecord}
-import org.junit.Test
-import org.junit.Assert._
-
-@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
-class SyncProducerTest extends KafkaServerTestHarness {
-  private val messageBytes =  new Array[Byte](2)
-  // turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down request handler pool.
-  def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, zkConnect, false).head))
-
-  private def produceRequest(topic: String,
-    partition: Int,
-    message: ByteBufferMessageSet,
-    acks: Int,
-    timeout: Int = SyncProducerConfig.DefaultAckTimeoutMs,
-    correlationId: Int = 0,
-    clientId: String = SyncProducerConfig.DefaultClientId): ProducerRequest = {
-    TestUtils.produceRequest(topic, partition, message, acks, timeout, correlationId, clientId)
-  }
-
-  @Test
-  def testReachableServer() {
-    val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(boundPort(server))
-
-    val producer = new SyncProducer(new SyncProducerConfig(props))
-
-    val firstStart = Time.SYSTEM.milliseconds
-    var response = producer.send(produceRequest("test", 0,
-      new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
-    assertNotNull(response)
-    assertTrue((Time.SYSTEM.milliseconds - firstStart) < 12000)
-
-    val secondStart = Time.SYSTEM.milliseconds
-    response = producer.send(produceRequest("test", 0,
-      new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
-    assertNotNull(response)
-    assertTrue((Time.SYSTEM.milliseconds - secondStart) < 12000)
-
-    response = producer.send(produceRequest("test", 0,
-      new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
-    assertNotNull(response)
-  }
-
-  @Test
-  def testEmptyProduceRequest() {
-    val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(boundPort(server))
-
-
-    val correlationId = 0
-    val clientId = SyncProducerConfig.DefaultClientId
-    val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
-    val ack: Short = 1
-    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
-
-    val producer = new SyncProducer(new SyncProducerConfig(props))
-    val response = producer.send(emptyRequest)
-    assertTrue(response != null)
-    assertTrue(!response.hasError && response.status.isEmpty)
-  }
-
-  @Test
-  def testMessageSizeTooLarge() {
-    val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(boundPort(server))
-
-    val producer = new SyncProducer(new SyncProducerConfig(props))
-    createTopic("test", numPartitions = 1, replicationFactor = 1)
-
-    val message1 = new Message(new Array[Byte](configs.head.messageMaxBytes + 1))
-    val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1)
-    val response1 = producer.send(produceRequest("test", 0, messageSet1, acks = 1))
-
-    assertEquals(1, response1.status.count(_._2.error != Errors.NONE))
-    assertEquals(Errors.MESSAGE_TOO_LARGE, response1.status(TopicAndPartition("test", 0)).error)
-    assertEquals(-1L, response1.status(TopicAndPartition("test", 0)).offset)
-
-    val safeSize = configs.head.messageMaxBytes - DefaultRecordBatch.RECORD_BATCH_OVERHEAD - DefaultRecord.MAX_RECORD_OVERHEAD
-    val message2 = new Message(new Array[Byte](safeSize))
-    val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2)
-    val response2 = producer.send(produceRequest("test", 0, messageSet2, acks = 1))
-
-    assertEquals(1, response1.status.count(_._2.error != Errors.NONE))
-    assertEquals(Errors.NONE, response2.status(TopicAndPartition("test", 0)).error)
-    assertEquals(0, response2.status(TopicAndPartition("test", 0)).offset)
-  }
-
-  @Test
-  def testMessageSizeTooLargeWithAckZero() {
-    val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(boundPort(server))
-
-    props.put("request.required.acks", "0")
-
-    val producer = new SyncProducer(new SyncProducerConfig(props))
-    adminZkClient.createTopic("test", 1, 1)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0)
-
-    // This message will be dropped silently since message size too large.
-    producer.send(produceRequest("test", 0,
-      new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs.head.messageMaxBytes + 1))), acks = 0))
-
-    // Send another message whose size is large enough to exceed the buffer size so
-    // the socket buffer will be flushed immediately;
-    // this send should fail since the socket has been closed
-    try {
-      producer.send(produceRequest("test", 0,
-        new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs.head.messageMaxBytes + 1))), acks = 0))
-    } catch {
-      case _ : java.io.IOException => // success
-    }
-  }
-
-  @Test
-  def testProduceCorrectlyReceivesResponse() {
-    val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(boundPort(server))
-
-    val producer = new SyncProducer(new SyncProducerConfig(props))
-    val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
-
-    // #1 - test that we get an error when partition does not belong to broker in response
-    val request = TestUtils.produceRequestWithAcks(Array("topic1", "topic2", "topic3"), Array(0), messages, 1,
-      timeout = SyncProducerConfig.DefaultAckTimeoutMs, clientId = SyncProducerConfig.DefaultClientId)
-    val response = producer.send(request)
-
-    assertNotNull(response)
-    assertEquals(request.correlationId, response.correlationId)
-    assertEquals(3, response.status.size)
-    response.status.values.foreach {
-      case ProducerResponseStatus(error, nextOffset, timestamp) =>
-        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, error)
-        assertEquals(-1L, nextOffset)
-        assertEquals(Message.NoTimestamp, timestamp)
-    }
-
-    // #2 - test that we get correct offsets when partition is owned by broker
-    adminZkClient.createTopic("topic1", 1, 1)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic1", 0)
-    adminZkClient.createTopic("topic3", 1, 1)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic3", 0)
-
-    val response2 = producer.send(request)
-    assertNotNull(response2)
-    assertEquals(request.correlationId, response2.correlationId)
-    assertEquals(3, response2.status.size)
-
-    // the first and last message should have been accepted by broker
-    assertEquals(Errors.NONE, response2.status(TopicAndPartition("topic1", 0)).error)
-    assertEquals(Errors.NONE, response2.status(TopicAndPartition("topic3", 0)).error)
-    assertEquals(0, response2.status(TopicAndPartition("topic1", 0)).offset)
-    assertEquals(0, response2.status(TopicAndPartition("topic3", 0)).offset)
-
-    // the middle message should have been rejected because broker doesn't lead partition
-    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
-                        response2.status(TopicAndPartition("topic2", 0)).error)
-    assertEquals(-1, response2.status(TopicAndPartition("topic2", 0)).offset)
-  }
-
-  @Test
-  def testProducerCanTimeout() {
-    val timeoutMs = 500
-
-    val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(boundPort(server))
-    val producer = new SyncProducer(new SyncProducerConfig(props))
-
-    val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
-    val request = produceRequest("topic1", 0, messages, acks = 1)
-
-    // stop IO threads and request handling, but leave networking operational
-    // any requests should be accepted and queue up, but not handled
-    server.requestHandlerPool.shutdown()
-
-    val t1 = Time.SYSTEM.milliseconds
-    try {
-      producer.send(request)
-      fail("Should have received timeout exception since request handling is stopped.")
-    } catch {
-      case _: SocketTimeoutException => /* success */
-    }
-    val t2 = Time.SYSTEM.milliseconds
-    // make sure we don't wait fewer than timeoutMs for a response
-    assertTrue((t2-t1) >= timeoutMs)
-  }
-
-  @Test
-  def testProduceRequestWithNoResponse() {
-    val server = servers.head
-
-    val port = TestUtils.boundPort(server)
-    val props = TestUtils.getSyncProducerConfig(port)
-    val correlationId = 0
-    val clientId = SyncProducerConfig.DefaultClientId
-    val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
-    val ack: Short = 0
-    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
-    val producer = new SyncProducer(new SyncProducerConfig(props))
-    val response = producer.send(emptyRequest)
-    assertTrue(response == null)
-  }
-
-  @Test
-  def testNotEnoughReplicas()  {
-    val topicName = "minisrtest"
-    val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(boundPort(server))
-
-    props.put("request.required.acks", "-1")
-
-    val producer = new SyncProducer(new SyncProducerConfig(props))
-    val topicProps = new Properties()
-    topicProps.put("min.insync.replicas","2")
-    adminZkClient.createTopic(topicName, 1, 1,topicProps)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topicName, 0)
-
-    val response = producer.send(produceRequest(topicName, 0,
-      new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)),-1))
-
-    assertEquals(Errors.NOT_ENOUGH_REPLICAS, response.status(TopicAndPartition(topicName, 0)).error)
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index bf6db2f..7ad9371 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -21,7 +21,6 @@ import AbstractFetcherThread._
 import com.yammer.metrics.Metrics
 import kafka.cluster.BrokerEndPoint
 import kafka.server.AbstractFetcherThread.{FetchRequest, PartitionData}
-import kafka.server.OffsetTruncationState
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
diff --git a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
index 8d1eb2c..35e3262 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
@@ -41,7 +41,7 @@ class DescribeLogDirsRequestTest extends BaseRequestTest {
     val offlineDir = new File(servers.head.config.logDirs.tail.head).getAbsolutePath
     servers.head.replicaManager.handleLogDirFailure(offlineDir)
     createTopic(topic, partitionNum, 1)
-    TestUtils.produceMessages(servers, topic, 10)
+    TestUtils.generateAndProduceMessages(servers, topic, 10)
 
     val request = new DescribeLogDirsRequest.Builder(null).build()
     val response = connectAndSend(request, ApiKeys.DESCRIBE_LOG_DIRS, controllerSocketServer)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 96f74a0..8212100 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -568,7 +568,7 @@ class KafkaApisTest {
     capturedResponse
   }
 
-  private def setupBasicMetadataCache(topic: String, numPartitions: Int = 1): Unit = {
+  private def setupBasicMetadataCache(topic: String, numPartitions: Int): Unit = {
     val replicas = List(0.asInstanceOf[Integer]).asJava
     val partitionState = new UpdateMetadataRequest.PartitionState(1, 0, 1, replicas, 0, replicas, Collections.emptyList())
     val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index c7a07ec..dcbeb21 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -21,24 +21,20 @@ import kafka.api.Request
 import kafka.cluster.{BrokerEndPoint, Replica, Partition}
 import kafka.log.LogManager
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
-import kafka.server.FetchPartitionData
 import kafka.server.epoch.LeaderEpochCache
 import org.apache.kafka.common.errors.{ReplicaNotAvailableException, KafkaStorageException}
 import kafka.utils.{DelayedItem, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{EpochEndOffset, FetchResponse, FetchMetadata => JFetchMetadata}
-import org.apache.kafka.common.requests.FetchResponse.PartitionData
+import org.apache.kafka.common.requests.EpochEndOffset
 import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH_OFFSET, UNDEFINED_EPOCH}
-import org.apache.kafka.common.utils.SystemTime
 import org.easymock.EasyMock._
 import org.easymock.{Capture, CaptureType, EasyMock, IAnswer}
 import org.junit.Assert._
 import org.junit.Test
 
 import scala.collection.JavaConverters._
-import scala.collection.Seq
-import scala.collection.{Map, mutable}
+import scala.collection.{Seq, Map}
 
 class ReplicaAlterLogDirsThreadTest {
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index f8f4948..ac5b7ed 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -215,8 +215,6 @@ class ReplicaFetcherThreadTest {
     // Create a capture to track what partitions/offsets are truncated
     val truncateToCapture: Capture[Long] = newCapture(CaptureType.ALL)
 
-    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
-
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
     val quota = createNiceMock(classOf[ReplicationQuotaManager])
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
index da80c0d..0e4b5b3 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
@@ -17,7 +17,6 @@
 
 package kafka.tools
 
-import kafka.producer.ProducerConfig
 import ConsoleProducer.LineMessageReader
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.junit.{Assert, Test}
@@ -45,18 +44,11 @@ class ConsoleProducerTest {
     val config = new ConsoleProducer.ProducerConfig(validArgs)
     // New ProducerConfig constructor is package private, so we can't call it directly
     // Creating new Producer to validate instead
-    val producer = new KafkaProducer(ConsoleProducer.getNewProducerProps(config))
+    val producer = new KafkaProducer(ConsoleProducer.producerProps(config))
     producer.close()
   }
 
   @Test
-  @deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0")
-  def testValidConfigsOldProducer() {
-    val config = new ConsoleProducer.ProducerConfig(validArgs)
-    new ProducerConfig(ConsoleProducer.getOldProducerProps(config))
-  }
-
-  @Test
   def testInvalidConfigs() {
     try {
       new ConsoleProducer.ProducerConfig(invalidArgs)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 16b7e87..ec6c756 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -28,13 +28,10 @@ import javax.net.ssl.X509TrustManager
 
 import kafka.api._
 import kafka.cluster.{Broker, EndPoint}
-import kafka.common.TopicAndPartition
 import kafka.consumer.{ConsumerConfig, ConsumerTimeoutException, KafkaStream}
 import kafka.log._
 import kafka.message._
-import kafka.producer._
 import kafka.security.auth.{Acl, Authorizer, Resource}
-import kafka.serializer.{DefaultEncoder, Encoder, StringEncoder}
 import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpointFile
 import Implicits._
@@ -507,28 +504,6 @@ object TestUtils extends Logging {
     builder.toString
   }
 
-  /**
-   * Create a producer with a few pre-configured properties.
-   * If certain properties need to be overridden, they can be provided in producerProps.
-   */
-  @deprecated("This method has been deprecated and it will be removed in a future release.", "0.10.0.0")
-  def createProducer[K, V](brokerList: String,
-                           encoder: String = classOf[DefaultEncoder].getName,
-                           keyEncoder: String = classOf[DefaultEncoder].getName,
-                           partitioner: String = classOf[DefaultPartitioner].getName,
-                           producerProps: Properties = null): Producer[K, V] = {
-    val props: Properties = getProducerConfig(brokerList)
-
-    //override any explicitly specified properties
-    if (producerProps != null)
-      props ++= producerProps
-
-    props.put("serializer.class", encoder)
-    props.put("key.serializer.class", keyEncoder)
-    props.put("partitioner.class", partitioner)
-    new Producer[K, V](new kafka.producer.ProducerConfig(props))
-  }
-
   def securityConfigs(mode: Mode,
                               securityProtocol: SecurityProtocol,
                               trustStoreFile: Option[File],
@@ -672,18 +647,6 @@ object TestUtils extends Logging {
     props
   }
 
-  @deprecated("This method has been deprecated and will be removed in a future release", "0.11.0.0")
-  def getSyncProducerConfig(port: Int): Properties = {
-    val props = new Properties()
-    props.put("host", "localhost")
-    props.put("port", port.toString)
-    props.put("request.timeout.ms", "10000")
-    props.put("request.required.acks", "1")
-    props.put("serializer.class", classOf[StringEncoder].getName)
-    props
-  }
-
-
   @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def updateConsumerOffset(config : ConsumerConfig, path : String, offset : Long) = {
     val zkUtils = ZkUtils(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
@@ -731,34 +694,6 @@ object TestUtils extends Logging {
     buffer
   }
 
-  /**
-   * Create a wired format request based on simple basic information
-   */
-  @deprecated("This method has been deprecated and it will be removed in a future release", "0.10.0.0")
-  def produceRequest(topic: String,
-                     partition: Int,
-                     message: ByteBufferMessageSet,
-                     acks: Int,
-                     timeout: Int,
-                     correlationId: Int = 0,
-                     clientId: String): ProducerRequest = {
-    produceRequestWithAcks(Seq(topic), Seq(partition), message, acks, timeout, correlationId, clientId)
-  }
-
-  @deprecated("This method has been deprecated and it will be removed in a future release", "0.10.0.0")
-  def produceRequestWithAcks(topics: Seq[String],
-                             partitions: Seq[Int],
-                             message: ByteBufferMessageSet,
-                             acks: Int,
-                             timeout: Int,
-                             correlationId: Int = 0,
-                             clientId: String): ProducerRequest = {
-    val data = topics.flatMap(topic =>
-      partitions.map(partition => (TopicAndPartition(topic,  partition), message))
-    )
-    new ProducerRequest(correlationId, clientId, acks.toShort, timeout, collection.mutable.Map(data:_*))
-  }
-
   def makeLeaderForPartition(zkClient: KafkaZkClient,
                              topic: String,
                              leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int],
@@ -1040,73 +975,32 @@ object TestUtils extends Logging {
                    logDirFailureChannel = new LogDirFailureChannel(logDirs.size))
   }
 
-  @deprecated("This method has been deprecated and it will be removed in a future release.", "0.10.0.0")
-  def sendMessages(servers: Seq[KafkaServer],
-                   topic: String,
-                   numMessages: Int,
-                   partition: Int = -1,
-                   compression: CompressionCodec = NoCompressionCodec): List[String] = {
-    val header = "test-%d".format(partition)
-    val props = new Properties()
-    props.put("compression.codec", compression.codec.toString)
-    val ms = 0.until(numMessages).map(x => header + "-" + x)
-
-    // Specific Partition
-    if (partition >= 0) {
-      val producer: Producer[Int, String] =
-        createProducer(TestUtils.getBrokerListStrFromServers(servers),
-          encoder = classOf[StringEncoder].getName,
-          keyEncoder = classOf[IntEncoder].getName,
-          partitioner = classOf[FixedValuePartitioner].getName,
-          producerProps = props)
-
-      producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)): _*)
-      debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition))
-      producer.close()
-      ms.toList
-    } else {
-      // Use topic as the key to determine partition
-      val producer: Producer[String, String] = createProducer(
-        TestUtils.getBrokerListStrFromServers(servers),
-        encoder = classOf[StringEncoder].getName,
-        keyEncoder = classOf[StringEncoder].getName,
-        partitioner = classOf[DefaultPartitioner].getName,
-        producerProps = props)
-      producer.send(ms.map(m => new KeyedMessage[String, String](topic, topic, m)): _*)
-      producer.close()
-      debug("Sent %d messages for topic [%s]".format(ms.size, topic))
-      ms.toList
-    }
-  }
-
   def produceMessages(servers: Seq[KafkaServer],
-                      topic: String,
-                      numMessages: Int,
+                      records: Seq[ProducerRecord[Array[Byte], Array[Byte]]],
                       acks: Int = -1,
-                      valueBytes: Int = -1): Seq[Array[Byte]] = {
-
-    val producer = createNewProducer(
-      TestUtils.getBrokerListStrFromServers(servers),
-      retries = 5,
-      acks = acks
-    )
-    val values = try {
-      val curValues = (0 until numMessages).map(x => valueBytes match {
-        case -1 => s"test-$x".getBytes
-        case _ => new Array[Byte](valueBytes)
-      })
-
-      val futures = curValues.map { value =>
-        producer.send(new ProducerRecord(topic, value))
-      }
+                      compressionType: CompressionType = CompressionType.NONE): Unit = {
+    val props = new Properties()
+    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType.name)
+    val producer = createNewProducer(TestUtils.getBrokerListStrFromServers(servers), retries = 5, acks = acks, props = Some(props))
+    try {
+      val futures = records.map(producer.send)
       futures.foreach(_.get)
-      curValues 
     } finally {
       producer.close()
     }
 
-    debug(s"Sent ${values.size} messages for topic [$topic]")
+    val topics = records.map(_.topic).distinct
+    debug(s"Sent ${records.size} messages for topics ${topics.mkString(",")}")
+  }
 
+  def generateAndProduceMessages(servers: Seq[KafkaServer],
+                                 topic: String,
+                                 numMessages: Int,
+                                 acks: Int = -1,
+                                 compressionType: CompressionType = CompressionType.NONE): Seq[String] = {
+    val values = (0 until numMessages).map(x =>  s"test-$x")
+    val records = values.map(v => new ProducerRecord[Array[Byte], Array[Byte]](topic, v.getBytes))
+    produceMessages(servers, records, acks, compressionType)
     values
   }
 
@@ -1130,7 +1024,7 @@ object TestUtils extends Logging {
    */
   @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def getMessages(topicMessageStreams: Map[String, List[KafkaStream[String, String]]],
-                     nMessagesPerThread: Int = -1): List[String] = {
+                  nMessagesPerThread: Int = -1): List[String] = {
 
     var messages: List[String] = Nil
     val shouldGetAllMessages = nMessagesPerThread < 0
@@ -1544,19 +1438,3 @@ object TestUtils extends Logging {
   }
 
 }
-
-class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
-  override def toBytes(n: Int) = n.toString.getBytes
-}
-
-@deprecated("This class is deprecated and it will be removed in a future release.", "0.10.0.0")
-class StaticPartitioner(props: VerifiableProperties = null) extends Partitioner {
-  def partition(data: Any, numPartitions: Int): Int = {
-    data.asInstanceOf[String].length % numPartitions
-  }
-}
-
-@deprecated("This class has been deprecated and it will be removed in a future release.", "0.10.0.0")
-class FixedValuePartitioner(props: VerifiableProperties = null) extends Partitioner {
-  def partition(data: Any, numPartitions: Int): Int = data.asInstanceOf[Int]
-}
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 7ae69ce..c7b8aaa 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -74,6 +74,11 @@
         JMX monitoring tools that do not automatically aggregate. To get the total count for a specific request type, the tool needs to be
         updated to aggregate across different versions.
     </li>
+    <li>The Scala producers, which have been deprecated since 0.10.0.0, have been removed. The Java producer has been the recommended option
+        since 0.9.0.0. Note that the behaviour of the default partitioner in the Java producer differs from the default partitioner
+        in the Scala producers. Users migrating should consider configuring a custom partitioner that retains the previous behaviour.</li>
+    <li>The ConsoleProducer no longer supports the Scala producer.</li>
+    <li>The deprecated kafka.tools.ProducerPerformance has been removed; please use org.apache.kafka.tools.ProducerPerformance instead.</li>
     <li>New Kafka Streams configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from older version. </li>
     <li><a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a> changed the retention time for Kafka Streams repartition topics by setting its default value to <code>Long.MAX_VALUE</code>.</li>
     <li>Updated <code>ProcessorStateManager</code> APIs in Kafka Streams for registering state stores to the processor topology. For more details please read the Streams <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_200">Upgrade Guide</a>.</li>
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index e8b9f0f..ffae666 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -19,7 +19,6 @@
  */
 package org.apache.kafka.streams.scala
 
-import java.util.Properties
 import java.util.regex.Pattern
 
 import org.scalatest.junit.JUnitSuite
@@ -28,11 +27,9 @@ import org.junit._
 
 import org.apache.kafka.streams.scala.kstream._
 
-import org.apache.kafka.common.serialization._
-
 import ImplicitConversions._
 
-import org.apache.kafka.streams.{KafkaStreams => KafkaStreamsJ, StreamsBuilder => StreamsBuilderJ, _}
+import org.apache.kafka.streams.{StreamsBuilder => StreamsBuilderJ, _}
 import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream => KStreamJ, KGroupedStream => KGroupedStreamJ, _}
 import collection.JavaConverters._
 
@@ -53,7 +50,6 @@ class TopologyTest extends JUnitSuite {
     def getTopologyScala(): TopologyDescription = {
 
       import Serdes._
-      import collection.JavaConverters._
   
       val streamBuilder = new StreamsBuilder
       val textLines = streamBuilder.stream[String, String](inputTopic)
@@ -88,7 +84,6 @@ class TopologyTest extends JUnitSuite {
     def getTopologyScala(): TopologyDescription = {
 
       import Serdes._
-      import collection.JavaConverters._
   
       val streamBuilder = new StreamsBuilder
       val textLines = streamBuilder.stream[String, String](inputTopic)
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index 18790a7..9cd56ec 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -88,7 +88,6 @@ class ProducerPerformanceService(HttpMetricsCollector, PerformanceService):
             # tool from the development branch
             tools_jar = self.path.jar(TOOLS_JAR_NAME, DEV_BRANCH)
             tools_dependant_libs_jar = self.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH)
-            tools_dependant_libs_jar = self.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH)
 
             for jar in (tools_jar, tools_dependant_libs_jar):
                 cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % jar

-- 
To stop receiving notification emails like this one, please contact
lindong@apache.org.