Posted to commits@kafka.apache.org by gw...@apache.org on 2016/03/18 02:12:46 UTC

kafka git commit: KAFKA-2982; Mark the old Scala producer and related classes as deprecated

Repository: kafka
Updated Branches:
  refs/heads/trunk dea0719e9 -> e89a9ce1a


KAFKA-2982; Mark the old Scala producer and related classes as deprecated

Also update server tests to always use the new producer.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Gwen Shapira

Closes #1092 from ijuma/kafka-2982-deprecate-old-producers


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e89a9ce1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e89a9ce1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e89a9ce1

Branch: refs/heads/trunk
Commit: e89a9ce1a4383af32435c7f4ee04361b1b65797d
Parents: dea0719
Author: Ismael Juma <is...@juma.me.uk>
Authored: Thu Mar 17 18:12:40 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Thu Mar 17 18:12:40 2016 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/client/ClientUtils.scala   |  1 +
 .../scala/kafka/javaapi/producer/Producer.scala |  2 +
 .../scala/kafka/metrics/KafkaMetricsGroup.scala |  1 +
 .../scala/kafka/producer/BaseProducer.scala     |  6 ++
 .../kafka/producer/BrokerPartitionInfo.scala    |  3 +-
 .../kafka/producer/ByteArrayPartitioner.scala   |  2 +
 .../kafka/producer/DefaultPartitioner.scala     |  2 +
 .../scala/kafka/producer/KeyedMessage.scala     |  2 +
 .../main/scala/kafka/producer/Partitioner.scala |  2 +
 .../main/scala/kafka/producer/Producer.scala    |  3 +-
 .../producer/ProducerClosedException.scala      |  1 +
 .../scala/kafka/producer/ProducerConfig.scala   |  4 +
 .../scala/kafka/producer/ProducerPool.scala     |  3 +-
 .../kafka/producer/ProducerRequestStats.scala   |  3 +
 .../scala/kafka/producer/ProducerStats.scala    |  2 +
 .../kafka/producer/ProducerTopicStats.scala     |  4 +-
 .../scala/kafka/producer/SyncProducer.scala     |  4 +
 .../kafka/producer/SyncProducerConfig.scala     |  6 ++
 .../producer/async/AsyncProducerConfig.scala    |  2 +
 .../producer/async/DefaultEventHandler.scala    |  1 +
 .../kafka/producer/async/EventHandler.scala     |  1 +
 .../async/IllegalQueueStateException.scala      |  1 +
 .../producer/async/MissingConfigException.scala |  1 +
 .../producer/async/ProducerSendThread.scala     |  1 +
 .../scala/kafka/tools/KafkaMigrationTool.java   |  2 +-
 .../kafka/api/BaseProducerSendTest.scala        | 22 +----
 .../kafka/api/PlaintextProducerSendTest.scala   | 24 ++++-
 .../kafka/api/ProducerFailureHandlingTest.scala |  9 +-
 .../scala/kafka/tools/ConsoleProducerTest.scala |  3 +-
 .../scala/unit/kafka/common/ConfigTest.scala    |  1 +
 .../ZookeeperConsumerConnectorTest.scala        |  1 +
 .../kafka/integration/AutoOffsetResetTest.scala |  1 +
 .../unit/kafka/integration/FetcherTest.scala    |  4 +-
 .../kafka/integration/PrimitiveApiTest.scala    |  1 +
 .../ProducerConsumerTestHarness.scala           |  1 +
 .../integration/UncleanLeaderElectionTest.scala | 23 +++--
 .../ZookeeperConsumerConnectorTest.scala        |  2 +-
 .../scala/unit/kafka/metrics/MetricsTest.scala  |  6 +-
 .../unit/kafka/network/SocketServerTest.scala   | 14 ++-
 .../unit/kafka/producer/AsyncProducerTest.scala | 21 +++--
 .../unit/kafka/producer/ProducerTest.scala      |  1 +
 .../unit/kafka/producer/SyncProducerTest.scala  | 34 ++++---
 .../kafka/server/BaseReplicaFetchTest.scala     | 22 +++--
 .../unit/kafka/server/LogRecoveryTest.scala     | 25 ++---
 .../unit/kafka/server/ServerShutdownTest.scala  | 36 +++----
 .../test/scala/unit/kafka/utils/TestUtils.scala | 99 ++++++++++++--------
 46 files changed, 263 insertions(+), 147 deletions(-)
----------------------------------------------------------------------
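
For readers following the diff, the deprecation messages added throughout consistently point at the Java client replacements: org.apache.kafka.clients.producer.KafkaProducer in place of kafka.producer.Producer, and ProducerRecord in place of KeyedMessage. A minimal migration sketch (topic name, broker address and other values are illustrative, not taken from this commit):

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.kafka.common.serialization.StringSerializer

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")  // illustrative address

    // Old (deprecated): new kafka.producer.Producer[String, String](new ProducerConfig(props))
    //                   producer.send(new KeyedMessage("my-topic", "key", "value"))
    val producer = new KafkaProducer[String, String](props, new StringSerializer, new StringSerializer)
    try {
      // send() is asynchronous and returns a java.util.concurrent.Future[RecordMetadata]
      producer.send(new ProducerRecord[String, String]("my-topic", "key", "value"))
    } finally {
      producer.close()
    }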


http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 2093749..fd1fc26 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -42,6 +42,7 @@ object ClientUtils extends Logging{
    * @param producerConfig The producer's config
    * @return topic metadata response
    */
+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.10.0.0")
   def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndPoint], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
     var fetchMetaDataSucceeded: Boolean = false
     var i: Int = 0

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/javaapi/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/producer/Producer.scala b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
index c465da5..44f9245 100644
--- a/core/src/main/scala/kafka/javaapi/producer/Producer.scala
+++ b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
@@ -21,6 +21,8 @@ import kafka.producer.ProducerConfig
 import kafka.producer.KeyedMessage
 import scala.collection.mutable
 
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
 class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for testing only
 {
   def this(config: ProducerConfig) = this(new kafka.producer.Producer[K,V](config))

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 72ecae1..12dfeb1 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -192,6 +192,7 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
     removeAllMetricsInList(KafkaMetricsGroup.consumerMetricNameList, clientId)
   }
 
+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.10.0.0")
   def removeAllProducerMetrics(clientId: String) {
     ProducerRequestStatsRegistry.removeProducerRequestStats(clientId)
     ProducerTopicStatsRegistry.removeProducerTopicStats(clientId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/BaseProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BaseProducer.scala b/core/src/main/scala/kafka/producer/BaseProducer.scala
index 9d0976f..83d9aa7 100644
--- a/core/src/main/scala/kafka/producer/BaseProducer.scala
+++ b/core/src/main/scala/kafka/producer/BaseProducer.scala
@@ -21,11 +21,15 @@ import java.util.Properties
 
 // A base producer used whenever we need to have options for both old and new producers;
 // this class will be removed once we fully rolled out 0.9
+@deprecated("This trait has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
 trait BaseProducer {
   def send(topic: String, key: Array[Byte], value: Array[Byte])
   def close()
 }
 
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
 class NewShinyProducer(producerProps: Properties) extends BaseProducer {
   import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
   import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
@@ -50,6 +54,8 @@ class NewShinyProducer(producerProps: Properties) extends BaseProducer {
   }
 }
 
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
 class OldProducer(producerProps: Properties) extends BaseProducer {
 
   // default to byte array partitioner

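NewShinyProducer above wraps the Java client and, judging by its imports, supports both a blocking and a callback-based send path. A sketch of the two styles against the Java client directly (the helper names are illustrative):

    import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

    def sendSync(producer: KafkaProducer[Array[Byte], Array[Byte]],
                 topic: String, key: Array[Byte], value: Array[Byte]): RecordMetadata =
      // Blocking: wait for the broker acknowledgement before returning
      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, key, value)).get()

    def sendAsync(producer: KafkaProducer[Array[Byte], Array[Byte]],
                  topic: String, key: Array[Byte], value: Array[Byte]): Unit =
      // Non-blocking: delivery errors are reported to the callback
      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, key, value), new Callback {
        def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
          if (exception != null) exception.printStackTrace()
      })
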
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index 6fa00dd..4616c7e 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -24,7 +24,7 @@ import kafka.common.KafkaException
 import kafka.utils.Logging
 import kafka.client.ClientUtils
 
-
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
 class BrokerPartitionInfo(producerConfig: ProducerConfig,
                           producerPool: ProducerPool,
                           topicPartitionInfo: HashMap[String, TopicMetadata])
@@ -101,4 +101,5 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
 
 }
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
 case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int])

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
index e6b100e..7848456 100755
--- a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
+++ b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
@@ -21,6 +21,8 @@ package kafka.producer
 import kafka.utils._
 import org.apache.kafka.common.utils.Utils
 
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+            "It has been replaced by org.apache.kafka.clients.producer.internals.DefaultPartitioner.", "0.10.0.0")
 class ByteArrayPartitioner(props: VerifiableProperties = null) extends Partitioner {
   def partition(key: Any, numPartitions: Int): Int = {
     Utils.abs(java.util.Arrays.hashCode(key.asInstanceOf[Array[Byte]])) % numPartitions

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
index 1141ed1..6b10e51 100755
--- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
+++ b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
@@ -21,6 +21,8 @@ package kafka.producer
 import kafka.utils._
 import org.apache.kafka.common.utils.Utils
 
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+            "It has been replaced by org.apache.kafka.clients.producer.internals.DefaultPartitioner.", "0.10.0.0")
 class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
   private val random = new java.util.Random
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/KeyedMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/KeyedMessage.scala b/core/src/main/scala/kafka/producer/KeyedMessage.scala
index dbcf295..84ea232 100644
--- a/core/src/main/scala/kafka/producer/KeyedMessage.scala
+++ b/core/src/main/scala/kafka/producer/KeyedMessage.scala
@@ -21,6 +21,8 @@ package kafka.producer
  * A topic, key, and value.
  * If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.
  */
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.producer.ProducerRecord instead.", "0.10.0.0")
 case class KeyedMessage[K, V](topic: String, key: K, partKey: Any, message: V) {
   if(topic == null)
     throw new IllegalArgumentException("Topic cannot be null.")

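KeyedMessage carries an optional partKey that routes the message without being stored with it; ProducerRecord has no direct equivalent, but an explicit partition number can be passed instead. A sketch of that translation (the partition-selection logic here is illustrative):

    import org.apache.kafka.clients.producer.ProducerRecord

    // Old: KeyedMessage("my-topic", "stored-key", partKey, "value")
    //      -- partKey routed the message but was not stored with it.
    // New: compute the partition yourself and pass it explicitly.
    val partKey = "routing-key"   // illustrative
    val numPartitions = 4         // illustrative
    val partition = (partKey.hashCode & 0x7fffffff) % numPartitions  // non-negative modulo
    val record = new ProducerRecord[String, String]("my-topic", Int.box(partition), "stored-key", "value")
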
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Partitioner.scala b/core/src/main/scala/kafka/producer/Partitioner.scala
index efe6d6d..5d24692 100644
--- a/core/src/main/scala/kafka/producer/Partitioner.scala
+++ b/core/src/main/scala/kafka/producer/Partitioner.scala
@@ -23,6 +23,8 @@ package kafka.producer
  * Implementations will be constructed via reflection and are required to have a constructor that takes a single 
  * VerifiableProperties instance--this allows passing configuration properties into the partitioner implementation.
  */
+@deprecated("This trait has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.producer.Partitioner instead.", "0.10.0.0")
 trait Partitioner {
   /**
    * Uses the key to calculate a partition bucket id for routing

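The replacement interface named in the message above, org.apache.kafka.clients.producer.Partitioner, receives both the deserialized key and its serialized bytes along with cluster metadata, and is typically wired in via the producer's partitioner.class setting. A sketch of a custom implementation (the hashing scheme is illustrative and assumes a non-null key):

    import java.util.{Map => JMap}
    import org.apache.kafka.clients.producer.Partitioner
    import org.apache.kafka.common.Cluster

    class KeyHashPartitioner extends Partitioner {
      override def configure(configs: JMap[String, _]): Unit = ()

      override def partition(topic: String, key: AnyRef, keyBytes: Array[Byte],
                             value: AnyRef, valueBytes: Array[Byte], cluster: Cluster): Int = {
        val numPartitions = cluster.partitionsForTopic(topic).size
        // Hash the serialized key bytes, masked to stay non-negative
        (java.util.Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions
      }

      override def close(): Unit = ()
    }
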
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index 4be06c8..c11ad21 100755
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -25,7 +25,8 @@ import kafka.producer.async.{DefaultEventHandler, EventHandler, ProducerSendThre
 import kafka.serializer.Encoder
 import kafka.utils._
 
-
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
 class Producer[K,V](val config: ProducerConfig,
                     private val eventHandler: EventHandler[K,V])  // only for unit testing
   extends Logging {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/ProducerClosedException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerClosedException.scala b/core/src/main/scala/kafka/producer/ProducerClosedException.scala
index 27a5293..4f2f731 100644
--- a/core/src/main/scala/kafka/producer/ProducerClosedException.scala
+++ b/core/src/main/scala/kafka/producer/ProducerClosedException.scala
@@ -17,5 +17,6 @@
 
 package kafka.producer
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
 class ProducerClosedException() extends RuntimeException("producer already closed") {
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/ProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala
index 08a4e51..c2715d0 100755
--- a/core/src/main/scala/kafka/producer/ProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala
@@ -23,6 +23,8 @@ import kafka.utils.{CoreUtils, VerifiableProperties}
 import kafka.message.NoCompressionCodec
 import kafka.common.{InvalidConfigException, Config}
 
+@deprecated("This object has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
 object ProducerConfig extends Config {
   def validate(config: ProducerConfig) {
     validateClientId(config.clientId)
@@ -48,6 +50,8 @@ object ProducerConfig extends Config {
   }
 }
 
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
 class ProducerConfig private (val props: VerifiableProperties)
         extends AsyncProducerConfig with SyncProducerConfigShared {
   import ProducerConfig._

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/ProducerPool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerPool.scala b/core/src/main/scala/kafka/producer/ProducerPool.scala
index 5ad6812..60cef63 100644
--- a/core/src/main/scala/kafka/producer/ProducerPool.scala
+++ b/core/src/main/scala/kafka/producer/ProducerPool.scala
@@ -26,7 +26,7 @@ import kafka.utils.Logging
 
 import scala.collection.mutable.HashMap
 
-
+@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0")
 object ProducerPool {
   /**
    * Used in ProducerPool to initiate a SyncProducer connection with a broker.
@@ -40,6 +40,7 @@ object ProducerPool {
   }
 }
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
 class ProducerPool(val config: ProducerConfig) extends Logging {
   private val syncProducers = new HashMap[Int, SyncProducer]
   private val lock = new Object()

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
index b453f63..8ab948a 100644
--- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
 import kafka.utils.Pool
 import kafka.common.{ClientIdAllBrokers, ClientIdBroker, ClientIdAndBroker}
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
 class ProducerRequestMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup {
   val tags = metricId match {
     case ClientIdAndBroker(clientId, brokerHost, brokerPort) => Map("clientId" -> clientId, "brokerHost" -> brokerHost, "brokerPort" -> brokerPort.toString)
@@ -36,6 +37,7 @@ class ProducerRequestMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup
  * Tracks metrics of requests made by a given producer client to all brokers.
  * @param clientId ClientId of the given producer
  */
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
 class ProducerRequestStats(clientId: String) {
   private val valueFactory = (k: ClientIdBroker) => new ProducerRequestMetrics(k)
   private val stats = new Pool[ClientIdBroker, ProducerRequestMetrics](Some(valueFactory))
@@ -51,6 +53,7 @@ class ProducerRequestStats(clientId: String) {
 /**
  * Stores the request stats information of each producer client in a (clientId -> ProducerRequestStats) map.
  */
+@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0")
 object ProducerRequestStatsRegistry {
   private val valueFactory = (k: String) => new ProducerRequestStats(k)
   private val globalStats = new Pool[String, ProducerRequestStats](Some(valueFactory))

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/ProducerStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerStats.scala b/core/src/main/scala/kafka/producer/ProducerStats.scala
index 1d0fa88..9466f26 100644
--- a/core/src/main/scala/kafka/producer/ProducerStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerStats.scala
@@ -20,6 +20,7 @@ import kafka.metrics.KafkaMetricsGroup
 import java.util.concurrent.TimeUnit
 import kafka.utils.Pool
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
 class ProducerStats(clientId: String) extends KafkaMetricsGroup {
   val tags: Map[String, String] = Map("clientId" -> clientId)
   val serializationErrorRate = newMeter("SerializationErrorsPerSec", "errors", TimeUnit.SECONDS, tags)
@@ -30,6 +31,7 @@ class ProducerStats(clientId: String) extends KafkaMetricsGroup {
 /**
  * Stores metrics of serialization and message sending activity of each producer client in a (clientId -> ProducerStats) map.
  */
+@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0")
 object ProducerStatsRegistry {
   private val valueFactory = (k: String) => new ProducerStats(k)
   private val statsRegistry = new Pool[String, ProducerStats](Some(valueFactory))

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
index 97594c8..7bb9610 100644
--- a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
@@ -21,7 +21,7 @@ import kafka.common.{ClientIdTopic, ClientIdAllTopics, ClientIdAndTopic}
 import kafka.utils.{Pool, threadsafe}
 import java.util.concurrent.TimeUnit
 
-
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
 @threadsafe
 class ProducerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
   val tags = metricId match {
@@ -38,6 +38,7 @@ class ProducerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
  * Tracks metrics for each topic the given producer client has produced data to.
  * @param clientId The clientId of the given producer client.
  */
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
 class ProducerTopicStats(clientId: String) {
   private val valueFactory = (k: ClientIdTopic) => new ProducerTopicMetrics(k)
   private val stats = new Pool[ClientIdTopic, ProducerTopicMetrics](Some(valueFactory))
@@ -53,6 +54,7 @@ class ProducerTopicStats(clientId: String) {
 /**
  * Stores the topic stats information of each producer client in a (clientId -> ProducerTopicStats) map.
  */
+@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0")
 object ProducerTopicStatsRegistry {
   private val valueFactory = (k: String) => new ProducerTopicStats(k)
   private val globalStats = new Pool[String, ProducerTopicStats](Some(valueFactory))

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index ec3c4ab..de4f4ad 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -27,6 +27,8 @@ import org.apache.kafka.common.network.NetworkReceive
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.utils.Utils._
 
+@deprecated("This object has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
 object SyncProducer {
   val RequestKey: Short = 0
   val randomGenerator = new Random
@@ -36,6 +38,8 @@ object SyncProducer {
  * Send a message set.
  */
 @threadsafe
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
 class SyncProducer(val config: SyncProducerConfig) extends Logging {
 
   private val lock = new Object()

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
index a08ce00..207779c 100644
--- a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
@@ -20,6 +20,8 @@ package kafka.producer
 import java.util.Properties
 import kafka.utils.VerifiableProperties
 
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
 class SyncProducerConfig private (val props: VerifiableProperties) extends SyncProducerConfigShared {
   def this(originalProps: Properties) {
     this(new VerifiableProperties(originalProps))
@@ -33,6 +35,8 @@ class SyncProducerConfig private (val props: VerifiableProperties) extends SyncP
   val port = props.getInt("port")
 }
 
+@deprecated("This trait has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
 trait SyncProducerConfigShared {
   val props: VerifiableProperties
   
@@ -59,6 +63,8 @@ trait SyncProducerConfigShared {
                                              (1, Integer.MAX_VALUE))
 }
 
+@deprecated("This object has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
 object SyncProducerConfig {
   val DefaultClientId = ""
   val DefaultRequiredAcks : Short = 0

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
index dd39de5..cc3a79d 100644
--- a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
@@ -18,6 +18,8 @@ package kafka.producer.async
 
 import kafka.utils.VerifiableProperties
 
+@deprecated("This trait has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
 trait AsyncProducerConfig {
   val props: VerifiableProperties
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 7abe48a..b79e64b 100755
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic._
 import kafka.api.{TopicMetadata, ProducerRequest}
 import org.apache.kafka.common.utils.Utils
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
 class DefaultEventHandler[K,V](config: ProducerConfig,
                                private val partitioner: Partitioner,
                                private val encoder: Encoder[V],

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/async/EventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/EventHandler.scala b/core/src/main/scala/kafka/producer/async/EventHandler.scala
index e724000..3a17bfb 100644
--- a/core/src/main/scala/kafka/producer/async/EventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/EventHandler.scala
@@ -21,6 +21,7 @@ import kafka.producer.KeyedMessage
 /**
  * Handler that dispatches the batched data from the queue.
  */
+@deprecated("This trait has been deprecated and will be removed in a future release.", "0.10.0.0")
 trait EventHandler[K,V] {
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala b/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala
index 9ecdf76..7779715 100644
--- a/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala
+++ b/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala
@@ -20,6 +20,7 @@ package kafka.producer.async
 /**
  * Indicates that the given config parameter has invalid value
  */
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
 class IllegalQueueStateException(message: String) extends RuntimeException(message) {
   def this() = this(null)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/async/MissingConfigException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/MissingConfigException.scala b/core/src/main/scala/kafka/producer/async/MissingConfigException.scala
index 304e0b2..a42678b 100644
--- a/core/src/main/scala/kafka/producer/async/MissingConfigException.scala
+++ b/core/src/main/scala/kafka/producer/async/MissingConfigException.scala
@@ -18,6 +18,7 @@
 package kafka.producer.async
 
 /* Indicates any missing configuration parameter */
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
 class MissingConfigException(message: String) extends RuntimeException(message) {
   def this() = this(null)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 8a903f3..d423757 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -24,6 +24,7 @@ import kafka.producer.KeyedMessage
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
 class ProducerSendThread[K,V](val threadName: String,
                               val queue: BlockingQueue[KeyedMessage[K,V]],
                               val handler: EventHandler[K,V],

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index b1ab649..0b94902 100755
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -60,7 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * The user need to provide the configuration file for 0.7 consumer and 0.8 producer. For 0.8 producer,
  * the "serializer.class" config is set to "kafka.serializer.DefaultEncoder" by the code.
  */
-@SuppressWarnings({"unchecked", "rawtypes"})
+@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
 public class KafkaMigrationTool {
     private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName());
     private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer";

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 2d89bf8..49ce748 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -48,7 +48,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   private var consumer2: SimpleConsumer = null
   private val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
 
-  private val topic = "topic"
+  protected val topic = "topic"
   private val numRecords = 100
 
   @Before
@@ -227,26 +227,6 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     }
   }
 
-  @Test
-  def testWrongSerializer() {
-    // send a record with a wrong type should receive a serialization exception
-    try {
-      val producer = createProducerWithWrongSerializer(brokerList)
-      val record5 = new ProducerRecord[Array[Byte], Array[Byte]](topic, new Integer(0), "key".getBytes, "value".getBytes)
-      producer.send(record5)
-      fail("Should have gotten a SerializationException")
-    } catch {
-      case se: SerializationException => // this is ok
-    }
-  }
-
-  private def createProducerWithWrongSerializer(brokerList: String): KafkaProducer[Array[Byte], Array[Byte]] = {
-    val producerProps = new Properties()
-    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
-    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
-    createProducer(brokerList, props = Some(producerProps))
-  }
-
   /**
    * testClose checks the closing behavior
    *

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index d017d13..111bc15 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -19,8 +19,9 @@ package kafka.api
 
 import java.util.Properties
 
-import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.errors.SerializationException
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.junit.Test
 
@@ -51,4 +52,25 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     return new KafkaProducer[Array[Byte],Array[Byte]](producerProps, new ByteArraySerializer, new ByteArraySerializer)
   }
 
+  @Test
+  def testWrongSerializer() {
+    // send a record with a wrong type should receive a serialization exception
+    try {
+      val producer = createProducerWithWrongSerializer(brokerList)
+      val record5 = new ProducerRecord[Array[Byte], Array[Byte]](topic, new Integer(0), "key".getBytes, "value".getBytes)
+      producer.send(record5)
+      fail("Should have gotten a SerializationException")
+    } catch {
+      case se: SerializationException => // this is ok
+    }
+  }
+
+  private def createProducerWithWrongSerializer(brokerList: String): KafkaProducer[Array[Byte], Array[Byte]] = {
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+    return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 2bb203d..7a22c73 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -63,9 +63,12 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   override def setUp() {
     super.setUp()
 
-    producer1 = TestUtils.createNewProducer(brokerList, acks = 0, maxBlockMs = 10000L, bufferSize = producerBufferSize)
-    producer2 = TestUtils.createNewProducer(brokerList, acks = 1, maxBlockMs = 10000L, bufferSize = producerBufferSize)
-    producer3 = TestUtils.createNewProducer(brokerList, acks = -1, maxBlockMs = 10000L, bufferSize = producerBufferSize)
+    producer1 = TestUtils.createNewProducer(brokerList, acks = 0, requestTimeoutMs = 30000L, maxBlockMs = 10000L,
+      bufferSize = producerBufferSize)
+    producer2 = TestUtils.createNewProducer(brokerList, acks = 1, requestTimeoutMs = 30000L, maxBlockMs = 10000L,
+      bufferSize = producerBufferSize)
+    producer3 = TestUtils.createNewProducer(brokerList, acks = -1, requestTimeoutMs = 30000L, maxBlockMs = 10000L,
+      bufferSize = producerBufferSize)
   }
 
   @After

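The requestTimeoutMs argument added to these createNewProducer calls presumably corresponds to the Java client's request.timeout.ms setting. Configured directly on the client, the equivalent would look roughly like this (values illustrative):

    import java.util.Properties
    import org.apache.kafka.clients.producer.ProducerConfig

    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // illustrative address
    props.put(ProducerConfig.ACKS_CONFIG, "1")
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000")  // how long to wait for a broker response
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")        // how long send()/metadata fetches may block
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "65536")       // memory for buffering unsent records
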
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala
index 7e211b7..4ddc7fe 100644
--- a/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala
+++ b/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala
@@ -51,9 +51,10 @@ class ConsoleProducerTest {
   }
 
   @Test
+  @deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0")
   def testValidConfigsOldProducer() {
     val config = new ConsoleProducer.ProducerConfig(validArgs)
-    new producer.ProducerConfig(ConsoleProducer.getOldProducerProps(config));
+    new producer.ProducerConfig(ConsoleProducer.getOldProducerProps(config))
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/common/ConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/ConfigTest.scala b/core/src/test/scala/unit/kafka/common/ConfigTest.scala
index a42836c..26154f2 100644
--- a/core/src/test/scala/unit/kafka/common/ConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ConfigTest.scala
@@ -26,6 +26,7 @@ import kafka.consumer.ConsumerConfig
 class ConfigTest {
 
   @Test
+  @deprecated("This test is deprecated and it will be removed in a future release.", "0.10.0.0")
   def testInvalidClientIds() {
     val invalidClientIds = new ArrayBuffer[String]()
     val badChars = Array('/', '\\', ',', '\u0000', ':', "\"", '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '=')

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 28b1dd5..a69fba1 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -34,6 +34,7 @@ import org.junit.{Test, After, Before}
 
 import scala.collection._
 
+@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
 class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging {
 
   val RebalanceBackoffMs = 5000

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index a71ddf1..4515b94 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -28,6 +28,7 @@ import org.junit.{After, Before, Test}
 import org.apache.log4j.{Level, Logger}
 import org.junit.Assert._
 
+@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
 class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
 
   def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 5af5d1a..3dd0454 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -68,11 +68,11 @@ class FetcherTest extends KafkaServerTestHarness {
   @Test
   def testFetcher() {
     val perNode = 2
-    var count = TestUtils.sendMessages(servers, topic, perNode).size
+    var count = TestUtils.produceMessages(servers, topic, perNode).size
 
     fetch(count)
     assertQueueEmpty()
-    count = TestUtils.sendMessages(servers, topic, perNode).size
+    count = TestUtils.produceMessages(servers, topic, perNode).size
     fetch(count)
     assertQueueEmpty()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index df752db..beb5d0e 100755
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -34,6 +34,7 @@ import java.util.Properties
 /**
  * End to end tests of the primitive apis against a local server
  */
+@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
 class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHarness {
   val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
index cc5954d..2fdfc48 100644
--- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
@@ -23,6 +23,7 @@ import kafka.producer.Producer
 import kafka.utils.{StaticPartitioner, TestUtils}
 import kafka.serializer.StringEncoder
 
+@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
 trait ProducerConsumerTestHarness extends KafkaServerTestHarness {
   val host = "localhost"
   var producer: Producer[String, String] = null

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 8e72ad3..b725d8b 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -23,6 +23,8 @@ import org.junit.{Test, After, Before}
 import scala.util.Random
 import org.apache.log4j.{Level, Logger}
 import java.util.Properties
+import java.util.concurrent.ExecutionException
+
 import kafka.admin.AdminUtils
 import kafka.common.FailedToSendMessageException
 import kafka.consumer.{Consumer, ConsumerConfig}
@@ -31,6 +33,7 @@ import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.CoreUtils
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.errors.TimeoutException
 import org.junit.Assert._
 
 class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
@@ -180,14 +183,14 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
     debug("Follower for " + topic  + " is: %s".format(followerId))
 
-    sendMessage(servers, topic, "first")
+    produceMessage(servers, topic, "first")
     waitUntilMetadataIsPropagated(servers, topic, partitionId)
     assertEquals(List("first"), consumeAllMessages(topic))
 
     // shutdown follower server
     servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
 
-    sendMessage(servers, topic, "second")
+    produceMessage(servers, topic, "second")
     assertEquals(List("first", "second"), consumeAllMessages(topic))
 
     // shutdown leader and then restart follower
@@ -197,7 +200,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     // wait until new leader is (uncleanly) elected
     waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(followerId))
 
-    sendMessage(servers, topic, "third")
+    produceMessage(servers, topic, "third")
 
     // second message was lost due to unclean election
     assertEquals(List("first", "third"), consumeAllMessages(topic))
@@ -215,14 +218,14 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
     debug("Follower for " + topic  + " is: %s".format(followerId))
 
-    sendMessage(servers, topic, "first")
+    produceMessage(servers, topic, "first")
     waitUntilMetadataIsPropagated(servers, topic, partitionId)
     assertEquals(List("first"), consumeAllMessages(topic))
 
     // shutdown follower server
     servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
 
-    sendMessage(servers, topic, "second")
+    produceMessage(servers, topic, "second")
     assertEquals(List("first", "second"), consumeAllMessages(topic))
 
     // shutdown leader and then restart follower
@@ -233,16 +236,20 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(-1))
 
     // message production and consumption should both fail while leader is down
-    intercept[FailedToSendMessageException] {
-      sendMessage(servers, topic, "third")
+    try {
+      produceMessage(servers, topic, "third")
+      fail("Message produced while leader is down should fail, but it succeeded")
+    } catch {
+      case e: ExecutionException if e.getCause.isInstanceOf[TimeoutException] => // expected
     }
+
     assertEquals(List.empty[String], consumeAllMessages(topic))
 
     // restart leader temporarily to send a successfully replicated message
     servers.filter(server => server.config.brokerId == leaderId).map(server => server.startup())
     waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(leaderId))
 
-    sendMessage(servers, topic, "third")
+    produceMessage(servers, topic, "third")
     waitUntilMetadataIsPropagated(servers, topic, partitionId)
     servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
 

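Worth noting for the change above: the old producer signalled delivery failure with FailedToSendMessageException, while the new client reports it when the future returned by send() is resolved, as an ExecutionException wrapping the real cause (here a TimeoutException). A sketch of that pattern (the helper name is illustrative):

    import java.util.concurrent.ExecutionException
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.kafka.common.errors.TimeoutException

    def expectSendTimeout(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String): Unit =
      try {
        // Block on the future so a delivery failure is raised here
        producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "value".getBytes)).get()
        sys.error("expected the send to time out while the leader is down")
      } catch {
        case e: ExecutionException if e.getCause.isInstanceOf[TimeoutException] => // expected
      }
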
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 88d95e8..e4c4697 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -36,7 +36,7 @@ import scala.collection.JavaConversions
 import org.apache.log4j.{Level, Logger}
 import org.junit.Assert._
 
-
+@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
 class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
   val numNodes = 2
   val numParts = 2

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index ee41fd7..3707deb 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -52,6 +52,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   }
 
   @Test
+  @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
   def testMetricsLeak() {
     // create topic topic1 with 1 partition on broker 0
     createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
@@ -78,13 +79,14 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     assertFalse("Topic metrics exists after deleteTopic", checkTopicMetricsExists(topic))
   }
 
+  @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
   def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = {
-    val sentMessages1 = sendMessages(servers, topic, nMessages)
+    sendMessages(servers, topic, nMessages)
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumerId))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
-    val receivedMessages1 = getMessages(topicMessageStreams1, nMessages)
+    getMessages(topicMessageStreams1, nMessages)
 
     zkConsumerConnector1.shutdown()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index d94c314..5d28894 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -17,7 +17,6 @@
 
 package kafka.network;
 
-
 import java.net._
 import javax.net.ssl._
 import java.io._
@@ -33,7 +32,6 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.requests.{ProduceRequest, RequestHeader}
 import org.apache.kafka.common.utils.SystemTime
 
-import kafka.producer.SyncProducerConfig
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 
@@ -103,9 +101,9 @@ class SocketServerTest extends JUnitSuite {
   private def producerRequestBytes: Array[Byte] = {
     val apiKey: Short = 0
     val correlationId = -1
-    val clientId = SyncProducerConfig.DefaultClientId
-    val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
-    val ack = SyncProducerConfig.DefaultRequiredAcks
+    val clientId = ""
+    val ackTimeoutMs = 10000
+    val ack = 0: Short
 
     val emptyHeader = new RequestHeader(apiKey, clientId, correlationId)
     val emptyRequest = new ProduceRequest(ack, ackTimeoutMs, new HashMap[TopicPartition, ByteBuffer]())
@@ -249,9 +247,9 @@ class SocketServerTest extends JUnitSuite {
 
       val apiKey = ApiKeys.PRODUCE.id
       val correlationId = -1
-      val clientId = SyncProducerConfig.DefaultClientId
-      val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
-      val ack = SyncProducerConfig.DefaultRequiredAcks
+      val clientId = ""
+      val ackTimeoutMs = 10000
+      val ack = 0: Short
       val emptyHeader = new RequestHeader(apiKey, clientId, correlationId)
       val emptyRequest = new ProduceRequest(ack, ackTimeoutMs, new HashMap[TopicPartition, ByteBuffer]())
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index f711ca4..3088199 100755
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -35,7 +35,13 @@ import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import kafka.utils._
 
+@deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0")
 class AsyncProducerTest {
+
+  class NegativePartitioner(props: VerifiableProperties = null) extends Partitioner {
+    def partition(data: Any, numPartitions: Int): Int = -1
+  }
+
   // One of the few cases we can just set a fixed port because the producer is mocked out here since this uses mocks
   val props = Seq(createBrokerConfig(1, "127.0.0.1:1", port=65534))
   val configs = props.map(KafkaConfig.fromProps)
@@ -373,15 +379,20 @@ class AsyncProducerTest {
 
     val msgs = TestUtils.getMsgStrings(2)
 
+    import SyncProducerConfig.{DefaultAckTimeoutMs, DefaultClientId}
+
     // produce request for topic1 and partitions 0 and 1.  Let the first request fail
     // entirely.  The second request will succeed for partition 1 but fail for partition 0.
     // On the third try for partition 0, let it succeed.
-    val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, correlationId = 11)
-    val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, correlationId = 17)
+    val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1,
+      correlationId = 11, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
+    val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1,
+      correlationId = 17, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
     val response1 = ProducerResponse(0,
       Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(Errors.NOT_LEADER_FOR_PARTITION.code, 0L)),
           (TopicAndPartition("topic1", 1), ProducerResponseStatus(Errors.NONE.code, 0L))))
-    val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), acks = 1, correlationId = 21)
+    val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), acks = 1, correlationId = 21,
+      timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
     val response2 = ProducerResponse(0,
       Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(Errors.NONE.code, 0L))))
     val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
@@ -480,7 +491,3 @@ class AsyncProducerTest {
       messages.map(m => new Message(key = key, bytes = m, timestamp = 0L, magicValue = Message.MagicValue_V1)): _*)
   }
 }
-
-class NegativePartitioner(props: VerifiableProperties = null) extends Partitioner {
-  def partition(data: Any, numPartitions: Int): Int = -1
-}
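
Moving NegativePartitioner inside the test class is deliberate: scalac only
suppresses deprecation warnings for uses of deprecated APIs that occur inside
definitions which are themselves marked @deprecated, so a top-level
implementation of the (now deprecated) Partitioner trait would warn on every
build. A minimal sketch of the pattern, with an illustrative class name:

    import kafka.producer.Partitioner
    import kafka.utils.VerifiableProperties

    @deprecated("Exercises the deprecated Scala producer.", "0.10.0.0")
    class OldProducerSpec { // hypothetical name, for illustration only
      // No -deprecation warning here: the enclosing class is deprecated too.
      class NegativePartitioner(props: VerifiableProperties = null) extends Partitioner {
        def partition(data: Any, numPartitions: Int): Int = -1
      }
    }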

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index de19f6f..4a1ad5a 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -37,6 +37,7 @@ import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import org.scalatest.exceptions.TestFailedException
 
+@deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0")
 class ProducerTest extends ZooKeeperTestHarness with Logging {
   private val brokerId1 = 0
   private val brokerId2 = 1

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index c1034fe..8e234d2 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -21,7 +21,7 @@ import java.net.SocketTimeoutException
 import java.util.Properties
 
 import kafka.admin.AdminUtils
-import kafka.api.ProducerResponseStatus
+import kafka.api.{ProducerRequest, ProducerResponseStatus}
 import kafka.common.TopicAndPartition
 import kafka.integration.KafkaServerTestHarness
 import kafka.message._
@@ -31,11 +31,22 @@ import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 import org.junit.Test
 import org.junit.Assert._
 
+@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
 class SyncProducerTest extends KafkaServerTestHarness {
   private val messageBytes =  new Array[Byte](2)
   // turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down the request handler pool.
   def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, zkConnect, false).head))
 
+  private def produceRequest(topic: String,
+    partition: Int,
+    message: ByteBufferMessageSet,
+    acks: Int,
+    timeout: Int = SyncProducerConfig.DefaultAckTimeoutMs,
+    correlationId: Int = 0,
+    clientId: String = SyncProducerConfig.DefaultClientId): ProducerRequest = {
+    TestUtils.produceRequest(topic, partition, message, acks, timeout, correlationId, clientId)
+  }
+
   @Test
   def testReachableServer() {
     val server = servers.head
@@ -46,7 +57,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val firstStart = SystemTime.milliseconds
     try {
-      val response = producer.send(TestUtils.produceRequest("test", 0,
+      val response = producer.send(produceRequest("test", 0,
         new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
       assertNotNull(response)
     } catch {
@@ -56,7 +67,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
     assertTrue((firstEnd-firstStart) < 500)
     val secondStart = SystemTime.milliseconds
     try {
-      val response = producer.send(TestUtils.produceRequest("test", 0,
+      val response = producer.send(produceRequest("test", 0,
         new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
       assertNotNull(response)
     } catch {
@@ -65,7 +76,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
     val secondEnd = SystemTime.milliseconds
     assertTrue((secondEnd-secondStart) < 500)
     try {
-      val response = producer.send(TestUtils.produceRequest("test", 0,
+      val response = producer.send(produceRequest("test", 0,
         new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
       assertNotNull(response)
     } catch {
@@ -101,7 +112,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
 
     val message1 = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))
     val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1)
-    val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1, acks = 1))
+    val response1 = producer.send(produceRequest("test", 0, messageSet1, acks = 1))
 
     assertEquals(1, response1.status.count(_._2.error != Errors.NONE.code))
     assertEquals(Errors.MESSAGE_TOO_LARGE.code, response1.status(TopicAndPartition("test", 0)).error)
@@ -110,7 +121,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
     val safeSize = configs(0).messageMaxBytes - Message.MinMessageOverhead - Message.TimestampLength - MessageSet.LogOverhead - 1
     val message2 = new Message(new Array[Byte](safeSize))
     val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2)
-    val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2, acks = 1))
+    val response2 = producer.send(produceRequest("test", 0, messageSet2, acks = 1))
 
     assertEquals(0, response2.status.count(_._2.error != Errors.NONE.code))
     assertEquals(Errors.NONE.code, response2.status(TopicAndPartition("test", 0)).error)
@@ -130,14 +141,14 @@ class SyncProducerTest extends KafkaServerTestHarness {
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "test", 0)
 
     // This message will be dropped silently since the message size is too large.
-    producer.send(TestUtils.produceRequest("test", 0,
+    producer.send(produceRequest("test", 0,
       new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0))
 
     // Send another message whose size is large enough to exceed the buffer size so
     // the socket buffer will be flushed immediately;
     // this send should fail since the socket has been closed
     try {
-      producer.send(TestUtils.produceRequest("test", 0,
+      producer.send(produceRequest("test", 0,
         new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0))
     } catch {
       case e : java.io.IOException => // success
@@ -154,7 +165,8 @@ class SyncProducerTest extends KafkaServerTestHarness {
     val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
 
     // #1 - test that the response contains an error when the partition does not belong to the broker
-    val request = TestUtils.produceRequestWithAcks(Array("topic1", "topic2", "topic3"), Array(0), messages, 1)
+    val request = TestUtils.produceRequestWithAcks(Array("topic1", "topic2", "topic3"), Array(0), messages, 1,
+      timeout = SyncProducerConfig.DefaultAckTimeoutMs, clientId = SyncProducerConfig.DefaultClientId)
     val response = producer.send(request)
 
     assertNotNull(response)
@@ -199,7 +211,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
     val producer = new SyncProducer(new SyncProducerConfig(props))
 
     val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
-    val request = TestUtils.produceRequest("topic1", 0, messages, acks = 1)
+    val request = produceRequest("topic1", 0, messages, acks = 1)
 
     // stop IO threads and request handling, but leave networking operational
     // any requests should be accepted and queue up, but not handled
@@ -248,7 +260,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
     AdminUtils.createTopic(zkUtils, topicName, 1, 1, topicProps)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topicName, 0)
 
-    val response = producer.send(TestUtils.produceRequest(topicName, 0,
+    val response = producer.send(produceRequest(topicName, 0,
       new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = -1))
 
     assertEquals(Errors.NOT_ENOUGH_REPLICAS.code, response.status(TopicAndPartition(topicName, 0)).error)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
index 5ecc2c0..c5b61de 100644
--- a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
@@ -20,13 +20,13 @@ package kafka.server
 import java.io.File
 
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.junit.{Test, After, Before}
+import org.junit.{After, Before, Test}
 import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.TestUtils._
-import kafka.producer.KeyedMessage
-import kafka.serializer.StringEncoder
-import kafka.utils.{TestUtils}
+import kafka.utils.TestUtils
+import TestUtils._
 import kafka.common._
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.serialization.StringSerializer
 
 abstract class BaseReplicaFetchTest extends ZooKeeperTestHarness  {
   var brokers: Seq[KafkaServer] = null
@@ -63,11 +63,13 @@ abstract class BaseReplicaFetchTest extends ZooKeeperTestHarness  {
     }
 
     // send test messages to leader
-    val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromServers(brokers),
-                                                            encoder = classOf[StringEncoder].getName,
-                                                            keyEncoder = classOf[StringEncoder].getName)
-    val messages = testMessageList1.map(m => new KeyedMessage(topic1, m, m)) ++ testMessageList2.map(m => new KeyedMessage(topic2, m, m))
-    producer.send(messages:_*)
+    val producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(brokers),
+                                               retries = 5,
+                                               keySerializer = new StringSerializer,
+                                               valueSerializer = new StringSerializer)
+    val records = testMessageList1.map(m => new ProducerRecord(topic1, m, m)) ++
+      testMessageList2.map(m => new ProducerRecord(topic2, m, m))
+    records.map(producer.send).foreach(_.get)
     producer.close()
 
     def logsMatch(): Boolean = {
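
The new producer's send() is asynchronous and returns a
java.util.concurrent.Future, so the pattern above (map over send first, then
call get on every future) enqueues all records before blocking, letting the
client batch and pipeline them; calling get inside the loop would allow only
one record in flight at a time. A standalone sketch of the same pattern (the
helper name is an assumption):

    import java.util.concurrent.Future
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

    def sendAllThenWait(producer: KafkaProducer[String, String],
                        topic: String,
                        values: Seq[String]): Seq[RecordMetadata] = {
      // Enqueue every record first so the producer can batch them...
      val futures: Seq[Future[RecordMetadata]] =
        values.map(v => producer.send(new ProducerRecord(topic, v, v)))
      // ...then block until the broker has acknowledged each one.
      futures.map(_.get())
    }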

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index d11c40f..e13bfd9 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -19,15 +19,14 @@ package kafka.server
 import java.util.Properties
 
 import kafka.utils.TestUtils._
-import kafka.utils.{IntEncoder, CoreUtils, TestUtils}
+import kafka.utils.{CoreUtils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.common._
-import kafka.producer.{KeyedMessage, Producer}
-import kafka.serializer.StringEncoder
-
 import java.io.File
 
-import org.junit.{Test, After, Before}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
+import org.junit.{After, Before, Test}
 import org.junit.Assert._
 
 class LogRecoveryTest extends ZooKeeperTestHarness {
@@ -54,7 +53,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
 
   val message = "hello"
 
-  var producer: Producer[Int, String] = null
+  var producer: KafkaProducer[Integer, String] = null
   def hwFile1 = new OffsetCheckpoint(new File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename))
   def hwFile2 = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename))
   var servers = Seq.empty[KafkaServer]
@@ -64,16 +63,19 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
   def updateProducer() = {
     if (producer != null)
       producer.close()
-    producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromServers(servers),
-      encoder = classOf[StringEncoder].getName,
-      keyEncoder = classOf[IntEncoder].getName)
+    producer = TestUtils.createNewProducer(
+      TestUtils.getBrokerListStrFromServers(servers),
+      retries = 5,
+      keySerializer = new IntegerSerializer,
+      valueSerializer = new StringSerializer
+    )
   }
 
   @Before
   override def setUp() {
     super.setUp()
 
-    configs = TestUtils.createBrokerConfigs(2, zkConnect, false).map(KafkaConfig.fromProps(_, overridingProps))
+    configs = TestUtils.createBrokerConfigs(2, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
 
     // start both servers
     server1 = TestUtils.createServer(configProps1)
@@ -230,7 +232,6 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
   }
 
   private def sendMessages(n: Int = 1) {
-    for(i <- 0 until n)
-      producer.send(new KeyedMessage[Int, String](topic, 0, message))
+    (0 until n).map(_ => producer.send(new ProducerRecord(topic, 0, message))).foreach(_.get)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 8f081b9..67f62d9 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -18,15 +18,14 @@ package kafka.server
 
 import kafka.zk.ZooKeeperTestHarness
 import kafka.consumer.SimpleConsumer
-import kafka.producer._
-import kafka.utils.{IntEncoder, TestUtils, CoreUtils}
+import kafka.utils.{CoreUtils, TestUtils}
 import kafka.utils.TestUtils._
 import kafka.api.FetchRequestBuilder
 import kafka.message.ByteBufferMessageSet
-import kafka.serializer.StringEncoder
-
 import java.io.File
 
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
 import org.junit.{Before, Test}
 import org.junit.Assert._
 
@@ -46,27 +45,34 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
 
   @Test
   def testCleanShutdown() {
+
+    def createProducer(server: KafkaServer): KafkaProducer[Integer, String] =
+      TestUtils.createNewProducer(
+        TestUtils.getBrokerListStrFromServers(Seq(server)),
+        retries = 5,
+        keySerializer = new IntegerSerializer,
+        valueSerializer = new StringSerializer
+      )
+
     var server = new KafkaServer(config, threadNamePrefix = Option(this.getClass.getName))
     server.startup()
-    var producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromServers(Seq(server)),
-      encoder = classOf[StringEncoder].getName,
-      keyEncoder = classOf[IntEncoder].getName)
+    var producer = createProducer(server)
 
     // create topic
     createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
 
     // send some messages
-    producer.send(sent1.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*)
+    sent1.map(value => producer.send(new ProducerRecord(topic, 0, value))).foreach(_.get)
 
     // do a clean shutdown and check that offset checkpoint file exists
     server.shutdown()
-    for(logDir <- config.logDirs) {
+    for (logDir <- config.logDirs) {
       val OffsetCheckpointFile = new File(logDir, server.logManager.RecoveryPointCheckpointFile)
       assertTrue(OffsetCheckpointFile.exists)
       assertTrue(OffsetCheckpointFile.length() > 0)
     }
     producer.close()
-    
+
     /* now restart the server and check that the written data is still readable and everything still works */
     server = new KafkaServer(config)
     server.startup()
@@ -74,13 +80,11 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
     // wait for the broker to receive the update metadata request after startup
     TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0)
 
-    producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromServers(Seq(server)),
-      encoder = classOf[StringEncoder].getName,
-      keyEncoder = classOf[IntEncoder].getName)
+    producer = createProducer(server)
     val consumer = new SimpleConsumer(host, server.boundPort(), 1000000, 64*1024, "")
 
     var fetchedMessage: ByteBufferMessageSet = null
-    while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
+    while (fetchedMessage == null || fetchedMessage.validBytes == 0) {
       val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).maxWait(0).build())
       fetchedMessage = fetched.messageSet(topic, 0)
     }
@@ -88,10 +92,10 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
     val newOffset = fetchedMessage.last.nextOffset
 
     // send some more messages
-    producer.send(sent2.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*)
+    sent2.map(value => producer.send(new ProducerRecord(topic, 0, value))).foreach(_.get)
 
     fetchedMessage = null
-    while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
+    while (fetchedMessage == null || fetchedMessage.validBytes == 0) {
       val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build())
       fetchedMessage = fetched.messageSet(topic, 0)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 49fb85f..7b3e955 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -42,14 +42,15 @@ import kafka.consumer.{ConsumerConfig, ConsumerTimeoutException, KafkaStream}
 import kafka.serializer.{DefaultEncoder, Encoder, StringEncoder}
 import kafka.common.TopicAndPartition
 import kafka.admin.AdminUtils
-import kafka.producer.ProducerConfig
 import kafka.log._
 import kafka.utils.ZkUtils._
 import org.junit.Assert._
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer, RangeAssignor}
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.network.Mode
+import org.apache.kafka.common.record.CompressionType
+import org.apache.kafka.common.serialization.{ByteArraySerializer, Serializer}
 
 import scala.collection.Map
 import scala.collection.JavaConversions._
@@ -342,7 +343,7 @@ object TestUtils extends Logging {
 
     // check if the actual iterator was longer
     if (actual.hasNext) {
-      var length2 = length;
+      var length2 = length
       while (actual.hasNext) {
         actual.next
         length2 += 1
@@ -419,6 +420,7 @@ object TestUtils extends Logging {
    * Create a producer with a few pre-configured properties.
    * If certain properties need to be overridden, they can be provided in producerProps.
    */
+  @deprecated("This method has been deprecated and it will be removed in a future release.", "0.10.0.0")
   def createProducer[K, V](brokerList: String,
                            encoder: String = classOf[DefaultEncoder].getName,
                            keyEncoder: String = classOf[DefaultEncoder].getName,
@@ -433,7 +435,7 @@ object TestUtils extends Logging {
     props.put("serializer.class", encoder)
     props.put("key.serializer.class", keyEncoder)
     props.put("partitioner.class", partitioner)
-    new Producer[K, V](new ProducerConfig(props))
+    new Producer[K, V](new kafka.producer.ProducerConfig(props))
   }
 
   private def securityConfigs(mode: Mode,
@@ -453,16 +455,18 @@ object TestUtils extends Logging {
   /**
    * Create a (new) producer with a few pre-configured properties.
    */
-  def createNewProducer(brokerList: String,
+  def createNewProducer[K, V](brokerList: String,
                         acks: Int = -1,
                         maxBlockMs: Long = 60 * 1000L,
                         bufferSize: Long = 1024L * 1024L,
                         retries: Int = 0,
                         lingerMs: Long = 0,
+                        requestTimeoutMs: Long = 10 * 1024L,
                         securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT,
                         trustStoreFile: Option[File] = None,
-                        props: Option[Properties] = None): KafkaProducer[Array[Byte], Array[Byte]] = {
-    import org.apache.kafka.clients.producer.ProducerConfig
+                        keySerializer: Serializer[K] = new ByteArraySerializer,
+                        valueSerializer: Serializer[V] = new ByteArraySerializer,
+                        props: Option[Properties] = None): KafkaProducer[K, V] = {
 
     val producerProps = props.getOrElse(new Properties)
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
@@ -470,15 +474,15 @@ object TestUtils extends Logging {
     producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs.toString)
     producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
     producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
+    producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs.toString)
 
     /* Only use these if not already set */
     val defaultProps = Map(
       ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> "100",
       ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG -> "200",
-      ProducerConfig.LINGER_MS_CONFIG -> lingerMs.toString,
-      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.ByteArraySerializer",
-      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.ByteArraySerializer"
+      ProducerConfig.LINGER_MS_CONFIG -> lingerMs.toString
     )
+
     defaultProps.foreach { case (key, value) =>
       if (!producerProps.containsKey(key)) producerProps.put(key, value)
     }
@@ -489,10 +493,10 @@ object TestUtils extends Logging {
      * invoke it before this call in IntegrationTestHarness, otherwise the
      * SSL client auth fails.
      */
-    if(!producerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG))
+    if (!producerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG))
       producerProps.putAll(producerSecurityConfigs(securityProtocol, trustStoreFile))
 
-    new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
+    new KafkaProducer[K, V](producerProps, keySerializer, valueSerializer)
   }
 
   private def usesSslTransportLayer(securityProtocol: SecurityProtocol): Boolean = securityProtocol match {
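
Making createNewProducer generic works because KafkaProducer accepts
Serializer instances directly in its constructor; when they are supplied that
way the producer does not instantiate serializers from
KEY/VALUE_SERIALIZER_CLASS_CONFIG, which is why those entries were dropped
from defaultProps above. Typed call sites then read like the ones in
LogRecoveryTest and ServerShutdownTest:

    import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}

    // brokerList as returned by TestUtils.getBrokerListStrFromServers(servers)
    val producer = TestUtils.createNewProducer(
      brokerList,
      retries = 5,
      keySerializer = new IntegerSerializer,
      valueSerializer = new StringSerializer
    ) // inferred as KafkaProducer[Integer, String]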
@@ -558,8 +562,6 @@ object TestUtils extends Logging {
     props.put("request.timeout.ms", "2000")
     props.put("request.required.acks", "-1")
     props.put("send.buffer.bytes", "65536")
-    props.put("connect.timeout.ms", "100000")
-    props.put("reconnect.interval", "10000")
 
     props
   }
@@ -620,23 +622,25 @@ object TestUtils extends Logging {
   /**
   * Create a wire-format request from basic information
    */
+  @deprecated("This method has been deprecated and it will be removed in a future release", "0.10.0.0")
   def produceRequest(topic: String,
                      partition: Int,
                      message: ByteBufferMessageSet,
-                     acks: Int = SyncProducerConfig.DefaultRequiredAcks,
-                     timeout: Int = SyncProducerConfig.DefaultAckTimeoutMs,
+                     acks: Int,
+                     timeout: Int,
                      correlationId: Int = 0,
-                     clientId: String = SyncProducerConfig.DefaultClientId): ProducerRequest = {
+                     clientId: String): ProducerRequest = {
     produceRequestWithAcks(Seq(topic), Seq(partition), message, acks, timeout, correlationId, clientId)
   }
 
+  @deprecated("This method has been deprecated and it will be removed in a future release", "0.10.0.0")
   def produceRequestWithAcks(topics: Seq[String],
                              partitions: Seq[Int],
                              message: ByteBufferMessageSet,
-                             acks: Int = SyncProducerConfig.DefaultRequiredAcks,
-                             timeout: Int = SyncProducerConfig.DefaultAckTimeoutMs,
+                             acks: Int,
+                             timeout: Int,
                              correlationId: Int = 0,
-                             clientId: String = SyncProducerConfig.DefaultClientId): ProducerRequest = {
+                             clientId: String): ProducerRequest = {
     val data = topics.flatMap(topic =>
       partitions.map(partition => (TopicAndPartition(topic,  partition), message))
     )
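
With the default arguments removed, any remaining caller of these deprecated
helpers must spell out the timeout and client id, keeping the deprecated
constants visible at the call site, as SyncProducerTest does above. For
example:

    import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}
    import kafka.producer.SyncProducerConfig
    import kafka.utils.TestUtils

    val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hi".getBytes))
    val request = TestUtils.produceRequestWithAcks(Seq("topic1"), Seq(0), messages, acks = 1,
      timeout = SyncProducerConfig.DefaultAckTimeoutMs,
      clientId = SyncProducerConfig.DefaultClientId)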
@@ -889,6 +893,8 @@ object TestUtils extends Logging {
                    time = time,
                    brokerState = new BrokerState())
   }
+
+  @deprecated("This method has been deprecated and it will be removed in a future release.", "0.10.0.0")
   def sendMessages(servers: Seq[KafkaServer],
                    topic: String,
                    numMessages: Int,
@@ -908,7 +914,7 @@ object TestUtils extends Logging {
           partitioner = classOf[FixedValuePartitioner].getName,
           producerProps = props)
 
-      producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
+      producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)): _*)
       debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition))
       producer.close()
       ms.toList
@@ -920,24 +926,43 @@ object TestUtils extends Logging {
         keyEncoder = classOf[StringEncoder].getName,
         partitioner = classOf[DefaultPartitioner].getName,
         producerProps = props)
-      producer.send(ms.map(m => new KeyedMessage[String, String](topic, topic, m)):_*)
+      producer.send(ms.map(m => new KeyedMessage[String, String](topic, topic, m)): _*)
       producer.close()
       debug("Sent %d messages for topic [%s]".format(ms.size, topic))
       ms.toList
     }
-
   }
 
-  def sendMessage(servers: Seq[KafkaServer],
-                  topic: String,
-                  message: String) = {
+  def produceMessages(servers: Seq[KafkaServer],
+                      topic: String,
+                      numMessages: Int): Seq[String] = {
+
+    val producer = createNewProducer(
+      TestUtils.getBrokerListStrFromServers(servers),
+      retries = 5,
+      requestTimeoutMs = 2000
+    )
+
+    val values = (0 until numMessages).map(x => s"test-$x")
+
+    val futures = values.map { value =>
+      producer.send(new ProducerRecord(topic, null, null, value.getBytes))
+    }
+    futures.foreach(_.get)
+    producer.close()
+
+    debug(s"Sent ${values.size} messages for topic [$topic]")
 
-    val producer: Producer[String, String] =
-      createProducer(TestUtils.getBrokerListStrFromServers(servers),
-        encoder = classOf[StringEncoder].getName(),
-        keyEncoder = classOf[StringEncoder].getName())
+    values
+  }
 
-    producer.send(new KeyedMessage[String, String](topic, topic, message))
+  def produceMessage(servers: Seq[KafkaServer], topic: String, message: String) {
+    val producer = createNewProducer(
+      TestUtils.getBrokerListStrFromServers(servers),
+      retries = 5,
+      requestTimeoutMs = 2000
+    )
+    producer.send(new ProducerRecord(topic, topic.getBytes, message.getBytes)).get
     producer.close()
   }
 
@@ -1056,18 +1081,14 @@ class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
   override def toBytes(n: Int) = n.toString.getBytes
 }
 
-class StaticPartitioner(props: VerifiableProperties = null) extends Partitioner{
+@deprecated("This class is deprecated and it will be removed in a future release.", "0.10.0.0")
+class StaticPartitioner(props: VerifiableProperties = null) extends Partitioner {
   def partition(data: Any, numPartitions: Int): Int = {
     (data.asInstanceOf[String].length % numPartitions)
   }
 }
 
-class HashPartitioner(props: VerifiableProperties = null) extends Partitioner {
-  def partition(data: Any, numPartitions: Int): Int = {
-    (data.hashCode % numPartitions)
-  }
-}
-
+@deprecated("This class has been deprecated and it will be removed in a future release.", "0.10.0.0")
 class FixedValuePartitioner(props: VerifiableProperties = null) extends Partitioner {
   def partition(data: Any, numPartitions: Int): Int = data.asInstanceOf[Int]
 }