Posted to commits@samza.apache.org by cr...@apache.org on 2015/01/28 02:24:35 UTC

samza git commit: SAMZA-227; upgrade kafka producer to new java client

Repository: samza
Updated Branches:
  refs/heads/master 8ae92c6bd -> fa2c04f58


SAMZA-227; upgrade kafka producer to new java client


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fa2c04f5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fa2c04f5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fa2c04f5

Branch: refs/heads/master
Commit: fa2c04f581d89c00ccc9fe96b4216bd804c5c403
Parents: 8ae92c6
Author: Navina Ramesh <na...@gmail.com>
Authored: Tue Jan 27 16:53:45 2015 -0800
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Jan 27 16:53:45 2015 -0800

----------------------------------------------------------------------
 build.gradle                                    |   2 +
 .../versioned/jobs/configuration-table.html     |  31 +--
 gradle/dependency-versions.gradle               |   2 +-
 .../kafka/KafkaCheckpointManager.scala          |   9 +-
 .../kafka/KafkaCheckpointManagerFactory.scala   |  20 +-
 .../org/apache/samza/config/KafkaConfig.scala   |  51 +++-
 .../samza/system/kafka/KafkaSystemFactory.scala |  31 +--
 .../system/kafka/KafkaSystemProducer.scala      | 157 +++++++-----
 .../kafka/KafkaSystemProducerMetrics.scala      |  17 +-
 .../scala/org/apache/samza/util/KafkaUtil.scala |  12 +-
 .../kafka/TestKafkaCheckpointManager.scala      |  23 +-
 .../apache/samza/config/TestKafkaConfig.scala   |   4 +-
 .../samza/system/kafka/MockKafkaProducer.java   | 226 +++++++++++++++++
 .../system/kafka/TestKafkaSystemAdmin.scala     |  22 +-
 .../system/kafka/TestKafkaSystemFactory.scala   |   2 +-
 .../system/kafka/TestKafkaSystemProducer.scala  | 243 ++++++++++---------
 .../scala/org/apache/samza/utils/TestUtils.java | 112 +++++++++
 .../src/main/config/negate-number.properties    |  11 +-
 .../test/integration/TestStatefulTask.scala     |  28 ++-
 19 files changed, 709 insertions(+), 294 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index e119db1..5e64b4a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -155,9 +155,11 @@ project(":samza-kafka_$scalaVersion") {
     compile "org.apache.zookeeper:zookeeper:$zookeeperVersion"
     compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
     compile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion"
+    compile "org.apache.kafka:kafka-clients:$kafkaVersion"
     testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
+    testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
 
     // Logging in tests is good.
     testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"

http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 4ccc0e7..008fc78 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -675,9 +675,12 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="systems-samza-producer-metadata-broker-list">systems.<span class="system">system-name</span>.<br>producer.metadata.broker.list</td>
+                    <td class="property" id="systems-samza-producer-bootstrap-servers">systems.<span class="system">system-name</span>.<br>producer.bootstrap.servers</td>
                     <td class="default"></td>
                     <td class="description">
+                        <b>Note</b>:
+                        <i>This property was previously named "producer.metadata.broker.list", which is deprecated as of this version.</i>
+                        <br />
                         A list of network endpoints where the Kafka brokers are running. This is given as
                         a comma-separated list of <code>hostname:port</code> pairs, for example
                         <code>kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092</code>.
@@ -689,34 +692,12 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="systems-samza-producer-producer-type">systems.<span class="system">system-name</span>.<br>producer.producer.type</td>
-                    <td class="default">sync</td>
-                    <td class="description">
-                        Controls whether messages emitted from a stream processor should be buffered before
-                        they are sent to Kafka. The options are:
-                        <dl>
-                            <dt><code>sync</code></dt>
-                            <dd>Any messages sent to output streams are synchronously flushed to the Kafka brokers
-                                before the next message from an input stream is processed.</dd>
-                            <dt><code>async</code></dt>
-                            <dd>Messages sent to output streams are buffered within the Samza container, and published
-                                to the Kafka brokers as a batch. This setting can increase throughput, but
-                                risks buffered messages being lost if a container abruptly fails. The maximum
-                                number of messages to buffer is controlled with
-                                systems.<span class="system">system-name</span>.producer.batch.num.messages
-                                and the maximum time (in milliseconds) to wait before flushing the buffer is set with
-                                systems.<span class="system">system-name</span>.producer.queue.buffering.max.ms.</dd>
-                        </dl>
-                    </td>
-                </tr>
-
-                <tr>
                     <td class="property" id="systems-samza-producer">systems.<span class="system">system-name</span>.<br>producer.*</td>
                     <td class="default"></td>
                     <td class="description">
-                        Any <a href="http://kafka.apache.org/documentation.html#producerconfigs">Kafka producer configuration</a>
+                        Any <a href="http://kafka.apache.org/documentation.html#newproducerconfigs">Kafka producer configuration</a>
                         can be included here. For example, to change the request timeout, you can set
-                        systems.<span class="system">system-name</span>.producer.request.timeout.ms.
+                        systems.<span class="system">system-name</span>.producer.timeout.ms.
                         (There is no need to configure <code>client.id</code> as it is automatically
                         configured by Samza.)
                     </td>
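
For jobs migrating to this version, the rename means any configuration that previously set
producer.metadata.broker.list must now set producer.bootstrap.servers. A minimal sketch in Scala,
mirroring the MapConfig style used in Samza's tests (job name and broker addresses are hypothetical):

    import org.apache.samza.config.MapConfig
    import scala.collection.JavaConversions._

    val config = new MapConfig(Map[String, String](
      "job.name" -> "my-job",
      "systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",
      // previously: "systems.kafka.producer.metadata.broker.list" (now deprecated)
      "systems.kafka.producer.bootstrap.servers" -> "kafka1.example.com:9092,kafka2.example.com:9092"))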

http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 44dd426..7bbaa41 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -26,7 +26,7 @@
   zkClientVersion = "0.3"
   zookeeperVersion = "3.3.4"
   metricsVersion = "2.2.0"
-  kafkaVersion = "0.8.1.1"
+  kafkaVersion = "0.8.2-beta"
   commonsHttpClientVersion = "3.1"
   leveldbVersion = "1.8"
   rocksdbVersion = "3.5.1"

http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 1d5627d..c96394d 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -31,8 +31,6 @@ import kafka.common.TopicExistsException
 import kafka.common.UnknownTopicOrPartitionException
 import kafka.consumer.SimpleConsumer
 import kafka.message.InvalidMessageException
-import kafka.producer.KeyedMessage
-import kafka.producer.Producer
 import kafka.utils.Utils
 import org.I0Itec.zkclient.ZkClient
 import org.apache.samza.SamzaException
@@ -45,6 +43,7 @@ import org.apache.samza.util.ExponentialSleepStrategy
 import org.apache.samza.util.TopicMetadataStore
 import scala.collection.mutable
 import java.util.Properties
+import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
 
 /**
  * Kafka checkpoint manager is used to store checkpoints in a Kafka topic.
@@ -62,7 +61,7 @@ class KafkaCheckpointManager(
   bufferSize: Int,
   fetchSize: Int,
   metadataStore: TopicMetadataStore,
-  connectProducer: () => Producer[Array[Byte], Array[Byte]],
+  connectProducer: () => Producer,
   connectZk: () => ZkClient,
   systemStreamPartitionGrouperFactoryString: String,
   retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
@@ -71,7 +70,7 @@ class KafkaCheckpointManager(
   import KafkaCheckpointManager._
 
   var taskNames = Set[TaskName]()
-  var producer: Producer[Array[Byte], Array[Byte]] = null
+  var producer: Producer = null
   var taskNamesToOffsets: Map[TaskName, Checkpoint] = null
 
   var startingOffset: Option[Long] = None // Where to start reading for each subsequent call of readCheckpoint
@@ -121,7 +120,7 @@ class KafkaCheckpointManager(
           producer = connectProducer()
         }
 
-        producer.send(new KeyedMessage(checkpointTopic, key, 0, msg))
+        producer.send(new ProducerRecord(checkpointTopic, 0, key, msg)).get()
         loop.done
       },
 

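The new client's send() is asynchronous and returns a Future[RecordMetadata]; chaining .get() onto it,
as the checkpoint write above does, restores the blocking semantics checkpointing needs. A minimal
standalone sketch of the idiom, assuming the 0.8.2-beta byte-array producer API used in this patch
(topic name, key, and message bytes are illustrative):

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    val producer = new KafkaProducer(producerProps)  // producerProps: java.util.Map[String, Object]
    // Arguments: topic, partition, key bytes, value bytes -- checkpoint writes always target partition 0.
    val record = new ProducerRecord("my-checkpoint-topic", 0, "key".getBytes, "msg".getBytes)
    producer.send(record).get()  // blocks until the broker acks; rethrows failures wrapped in ExecutionException
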
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index f2defbd..0f64c08 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -20,7 +20,6 @@
 package org.apache.samza.checkpoint.kafka
 
 import org.apache.samza.util.Logging
-import kafka.producer.Producer
 import kafka.utils.ZKStringSerializer
 import org.I0Itec.zkclient.ZkClient
 import org.apache.samza.SamzaException
@@ -33,6 +32,7 @@ import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.util.{ KafkaUtil, ClientUtilTopicMetadataStore }
 import java.util.Properties
 import scala.collection.JavaConversions._
+import org.apache.kafka.clients.producer.KafkaProducer
 
 object KafkaCheckpointManagerFactory {
   /**
@@ -41,17 +41,14 @@ object KafkaCheckpointManagerFactory {
   val CHECKPOINT_LOG_VERSION_NUMBER = 1
 
   val INJECTED_PRODUCER_PROPERTIES = Map(
-    "request.required.acks" -> "-1",
+    "acks" -> "all",
     // Forcibly disable compression because Kafka doesn't support compression
     // on log compacted topics. Details in SAMZA-393.
-    "compression.codec" -> "none",
-    "producer.type" -> "sync",
-    // Subtract one here, because DefaultEventHandler calls messageSendMaxRetries + 1.
-    "message.send.max.retries" -> (Integer.MAX_VALUE - 1).toString)
+    "compression.codec" -> "none")
 
   // Set the checkpoint topic configs to have a very small segment size and
-  // enable log compaction. This keeps job startup time small since there
-  // are fewer useless (overwritten) messages to read from the checkpoint
+  // enable log compaction. This keeps job startup time small since there 
+  // are fewer useless (overwritten) messages to read from the checkpoint 
   // topic.
   def getCheckpointTopicProperties(config: Config) = {
     val segmentBytes = config
@@ -83,7 +80,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
     val fetchSize = consumerConfig.fetchMessageMaxBytes // must be > buffer size
 
     val connectProducer = () => {
-      new Producer[Array[Byte], Array[Byte]](producerConfig)
+      new KafkaProducer(producerConfig.getProducerProperties)
     }
     val zkConnect = Option(consumerConfig.zkConnect)
       .getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
@@ -92,9 +89,8 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
     }
     val jobName = config.getName.getOrElse(throw new SamzaException("Missing job name in configs"))
     val jobId = config.getJobId.getOrElse("1")
-    val brokersListString = Option(producerConfig.brokerList)
-      .getOrElse(throw new SamzaException("No broker list defined in config for %s." format systemName))
-    val metadataStore = new ClientUtilTopicMetadataStore(brokersListString, clientId, socketTimeout)
+    val bootstrapServers = producerConfig.bootsrapServers
+    val metadataStore = new ClientUtilTopicMetadataStore(bootstrapServers, clientId, socketTimeout)
     val checkpointTopic = getTopic(jobName, jobId)
 
     // Find out the SSPGrouperFactory class so it can be included/verified in the key
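
Under the new client, acks=all is the equivalent of the old request.required.acks=-1: the leader waits
for the full in-sync replica set before acknowledging, which is what checkpoint writes require. A sketch
of how the injected properties combine with user configuration (the bootstrap address and client id are
hypothetical):

    import java.util
    import scala.collection.JavaConversions._
    import org.apache.kafka.clients.producer.KafkaProducer
    import org.apache.samza.config.KafkaProducerConfig

    val props = new util.HashMap[String, Object]()
    props.put("bootstrap.servers", "localhost:9092")                          // required by the new client
    props.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)  // acks=all, compression off (SAMZA-393)
    val producer = new KafkaProducer(new KafkaProducerConfig("kafka", "client-1", props).getProducerProperties)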

http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index e57b8ba..6d54350 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -22,14 +22,15 @@ package org.apache.samza.config
 
 import java.util.regex.Pattern
 
-import org.apache.samza.SamzaException
 import org.apache.samza.util.Util
+import org.apache.samza.util.Logging
 
 import scala.collection.JavaConversions._
 import kafka.consumer.ConsumerConfig
-import java.util.Properties
-import kafka.producer.ProducerConfig
-import java.util.UUID
+import java.util.{Properties, UUID}
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.samza.SamzaException
+import java.util
 import scala.collection.JavaConverters._
 import org.apache.samza.system.kafka.KafkaSystemFactory
 import org.apache.samza.config.SystemConfig.Config2System
@@ -153,10 +154,48 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     injectedProps: Map[String, String] = Map()) = {
 
     val subConf = config.subset("systems.%s.producer." format systemName, true)
-    val producerProps = new Properties()
+    val producerProps = new util.HashMap[String, Object]()
     producerProps.putAll(subConf)
     producerProps.put("client.id", clientId)
     producerProps.putAll(injectedProps)
-    new ProducerConfig(producerProps)
+    new KafkaProducerConfig(systemName, clientId, producerProps)
+  }
+}
+
+class KafkaProducerConfig(val systemName: String,
+                          val clientId: String = "",
+                          properties: java.util.Map[String, Object] = new util.HashMap[String, Object]()) extends Logging {
+
+  // Copied from new Kafka API - Workaround until KAFKA-1794 is resolved
+  val RECONNECT_BACKOFF_MS_DEFAULT = 10L
+
+  // Overrides specific to samza-kafka (these are defaults in Samza and can be overridden by the user)
+  val MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT: java.lang.Integer = 1.asInstanceOf[Integer]
+  val RETRIES_DEFAULT: java.lang.Integer = Integer.MAX_VALUE
+
+  def getProducerProperties = {
+    val producerProperties: java.util.Map[String, Object] = new util.HashMap[String, Object]()
+    producerProperties.putAll(properties)
+
+    // Always set max.in.flight.requests.per.connection to 1 and retries to Integer.MAX_VALUE so that the
+    // producer does not optimistically send batches asynchronously, which would break the ordering of
+    // outgoing messages. Retries is set to the max so that when all attempts fail, Samza also fails the
+    // send; we have no special handler for producer failures.
+    // DO NOT let callers override these two configs for Kafka.
+    producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT)
+    producerProperties.put(ProducerConfig.RETRIES_CONFIG, RETRIES_DEFAULT)
+    producerProperties
+  }
+
+  val reconnectIntervalMs = Option(properties.get(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG))
+          .getOrElse(RECONNECT_BACKOFF_MS_DEFAULT).asInstanceOf[Long]
+
+  val bootsrapServers = {
+    if(properties.containsKey("metadata.broker.list"))
+      warn("Kafka producer configuration contains 'metadata.broker.list'. This configuration is deprecated. Samza has been upgraded " +
+             "to use Kafka's new producer API. Please update your configurations based on the documentation at http://kafka.apache.org/documentation.html#newproducerconfigs")
+    Option(properties.get("bootstrap.servers"))
+            .getOrElse(throw new SamzaException("No bootstrap servers defined in config for %s." format systemName))
+            .asInstanceOf[String]
   }
 }
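
Because getProducerProperties applies the two overrides after copying the caller's map, user-supplied
values for them are silently replaced. A small sketch of that contract (bootstrap address and client id
are hypothetical):

    import java.util
    import org.apache.kafka.clients.producer.ProducerConfig
    import org.apache.samza.config.KafkaProducerConfig

    val userProps = new util.HashMap[String, Object]()
    userProps.put("bootstrap.servers", "localhost:9092")
    userProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5.asInstanceOf[Integer])  // user asks for 5

    val cfg = new KafkaProducerConfig("kafka", "client-1", userProps)
    // Samza forces one in-flight request (for ordering) and Integer.MAX_VALUE retries:
    assert(cfg.getProducerProperties.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == Integer.valueOf(1))
    assert(cfg.getProducerProperties.get(ProducerConfig.RETRIES_CONFIG) == Integer.valueOf(Integer.MAX_VALUE))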

http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 4506ea3..38e3815 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -19,18 +19,14 @@
 
 package org.apache.samza.system.kafka
 
-
-import java.util.Properties
-
-import kafka.utils.ZKStringSerializer
-import org.I0Itec.zkclient.ZkClient
 import org.apache.samza.util.{Logging, KafkaUtil, ExponentialSleepStrategy, ClientUtilTopicMetadataStore}
 import org.apache.samza.config.Config
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.SamzaException
-import kafka.producer.Producer
+import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.samza.system.SystemFactory
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.ZKStringSerializer
 
 
 class KafkaSystemFactory extends SystemFactory with Logging {
@@ -40,8 +36,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
 
     // Kind of goofy to need a producer config for consumers, but we need metadata.
     val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
-    val brokerListString = Option(producerConfig.brokerList)
-      .getOrElse(throw new SamzaException("No broker list defined in config for %s." format systemName))
+    val bootstrapServers = producerConfig.bootsrapServers
     val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
 
     val timeout = consumerConfig.socketTimeoutMs
@@ -53,7 +48,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
     val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName)
     val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("50000").toInt
     val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics)
-    val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout)
+    val metadataStore = new ClientUtilTopicMetadataStore(bootstrapServers, clientId, timeout)
 
     new KafkaSystemConsumer(
       systemName = systemName,
@@ -72,11 +67,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
   def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = {
     val clientId = KafkaUtil.getClientId("samza-producer", config)
     val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
-    val batchSize = Option(producerConfig.batchNumMessages)
-      .getOrElse(1000)
-    val reconnectIntervalMs = Option(producerConfig.retryBackoffMs)
-      .getOrElse(1000)
-    val getProducer = () => { new Producer[Object, Object](producerConfig) }
+    val getProducer = () => { new KafkaProducer(producerConfig.getProducerProperties) }
     val metrics = new KafkaSystemProducerMetrics(systemName, registry)
 
     // Unlike consumer, no need to use encoders here, since they come for free 
@@ -85,8 +76,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
 
     new KafkaSystemProducer(
       systemName,
-      batchSize,
-      new ExponentialSleepStrategy(initialDelayMs = reconnectIntervalMs),
+      new ExponentialSleepStrategy(initialDelayMs = producerConfig.reconnectIntervalMs),
       getProducer,
       metrics)
   }
@@ -94,8 +84,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
   def getAdmin(systemName: String, config: Config) = {
     val clientId = KafkaUtil.getClientId("samza-admin", config)
     val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
-    val brokerListString = Option(producerConfig.brokerList)
-      .getOrElse(throw new SamzaException("No broker list defined in config for %s." format systemName))
+    val bootstrapServers = producerConfig.bootsrapServers
     val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
     val timeout = consumerConfig.socketTimeoutMs
     val bufferSize = consumerConfig.socketReceiveBufferBytes
@@ -106,13 +95,13 @@ class KafkaSystemFactory extends SystemFactory with Logging {
     {
        val replicationFactor = config.getChangelogStreamReplicationFactor(storeName).getOrElse("2").toInt
        val changelogInfo = ChangelogInfo(replicationFactor, config.getChangelogKafkaProperties(storeName))
-       info("Creating topic meta information for topic: " + topicName + " with replication factor: " + replicationFactor)
+       info("Creating topic meta information for topic: %s with replication factor: %s" format (topicName, replicationFactor))
        (topicName, changelogInfo)
     }}.toMap
 
     new KafkaSystemAdmin(
       systemName,
-      brokerListString,
+      bootstrapServers,
       timeout,
       bufferSize,
       clientId,

http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index a0e1ccb..cd1fced 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -19,25 +19,32 @@
 
 package org.apache.samza.system.kafka
 
-import scala.collection.mutable.ArrayBuffer
 import org.apache.samza.util.Logging
-import kafka.producer.KeyedMessage
-import kafka.producer.Producer
+import org.apache.kafka.clients.producer.{RecordMetadata, Callback, ProducerRecord, Producer}
 import org.apache.samza.system.SystemProducer
 import org.apache.samza.system.OutgoingMessageEnvelope
 import org.apache.samza.util.ExponentialSleepStrategy
 import org.apache.samza.util.TimerUtils
-
-class KafkaSystemProducer(
-  systemName: String,
-  batchSize: Int,
-  retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
-  getProducer: () => Producer[Object, Object],
-  metrics: KafkaSystemProducerMetrics,
-  val clock: () => Long = () => System.currentTimeMillis) extends SystemProducer with Logging with TimerUtils {
-
-  var sourceBuffers = Map[String, ArrayBuffer[KeyedMessage[Object, Object]]]()
-  var producer: Producer[Object, Object] = null
+import org.apache.samza.util.KafkaUtil
+import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
+import java.util.{Map => javaMap}
+import org.apache.samza.SamzaException
+import org.apache.kafka.common.errors.RetriableException
+import org.apache.kafka.common.PartitionInfo
+import java.util
+import java.util.concurrent.Future
+
+
+class KafkaSystemProducer(systemName: String,
+                          retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
+                          getProducer: () => Producer,
+                          metrics: KafkaSystemProducerMetrics,
+                          val clock: () => Long = () => System.currentTimeMillis) extends SystemProducer with Logging with TimerUtils
+{
+  var producer: Producer = null
+  val latestFuture: javaMap[String, Future[RecordMetadata]] = new util.HashMap[String, Future[RecordMetadata]]()
+  val sendFailed: AtomicBoolean = new AtomicBoolean(false)
+  var exceptionThrown: AtomicReference[Exception] = new AtomicReference[Exception]()
 
   def start() {
   }
@@ -49,59 +56,95 @@ class KafkaSystemProducer(
   }
 
   def register(source: String) {
-    sourceBuffers += source -> ArrayBuffer()
-
-    metrics.setBufferSize(source, () => sourceBuffers(source).size)
+    if(latestFuture.containsKey(source)) {
+      throw new SamzaException("%s is already registered with the %s system producer" format (source, systemName))
+    }
+    latestFuture.put(source, null)
   }
 
   def send(source: String, envelope: OutgoingMessageEnvelope) {
     trace("Enqueueing message: %s, %s." format (source, envelope))
-
-    metrics.sends.inc
-
-    sourceBuffers(source) += new KeyedMessage[Object, Object](
-      envelope.getSystemStream.getStream,
-      envelope.getKey,
-      envelope.getPartitionKey,
-      envelope.getMessage)
-
-    if (sourceBuffers(source).size >= batchSize) {
-      flush(source)
+    if(producer == null) {
+      info("Creating a new producer for system %s." format systemName)
+      producer = getProducer()
+      debug("Created a new producer for system %s." format systemName)
     }
+    // The Java-based Kafka producer API requires an Integer partition key and does not allow overriding the
+    // partitioner, so any custom partitioning has to be done on the client side.
+    val topicName = envelope.getSystemStream.getStream
+    val partitions: java.util.List[PartitionInfo]  = producer.partitionsFor(topicName)
+    val partitionKey = if(envelope.getPartitionKey != null) KafkaUtil.getIntegerPartitionKey(envelope, partitions) else null
+    val record = new ProducerRecord(envelope.getSystemStream.getStream,
+                                    partitionKey,
+                                    envelope.getKey.asInstanceOf[Array[Byte]],
+                                    envelope.getMessage.asInstanceOf[Array[Byte]])
+
+    sendFailed.set(false)
+
+    retryBackoff.run(
+      loop => {
+        if(sendFailed.get()) {
+          throw exceptionThrown.get()
+        }
+        val futureRef: Future[RecordMetadata] =
+          producer.send(record, new Callback {
+            def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
+              if (exception == null) {
+                // Send was successful; don't retry.
+                metrics.sendSuccess.inc
+              }
+              else {
+                // An exception in the callback means the Kafka producer has exhausted its retries.
+                // Hence, fail the container!
+                exceptionThrown.compareAndSet(null, exception)
+                sendFailed.set(true)
+              }
+            }
+          })
+        latestFuture.put(source, futureRef)
+        metrics.sends.inc
+        if(!sendFailed.get())
+          loop.done
+      },
+      (exception, loop) => {
+        if(exception != null && !exception.isInstanceOf[RetriableException]) {   // Exception was thrown and is not retriable
+          debug("Exception detail : ", exception)
+          //Close producer
+          stop()
+          producer = null
+          //Mark loop as done as we are not going to retry
+          loop.done
+          metrics.sendFailed.inc
+          throw new SamzaException("Failed to send message. Exception:\n %s".format(exception))
+        } else {
+          warn("Retrying message send due to RetriableException - %s. Turn on debugging to get a full stack trace".format(exception))
+          debug("Exception detail:", exception)
+          metrics.retries.inc
+        }
+      }
+    )
   }
 
   def flush(source: String) {
     updateTimer(metrics.flushMs) {
-      val buffer = sourceBuffers(source)
-      trace("Flushing buffer with size: %s." format buffer.size)
       metrics.flushes.inc
-
-      retryBackoff.run(
-        loop => {
-          if (producer == null) {
-            info("Creating a new producer for system %s." format systemName)
-            producer = getProducer()
-            debug("Created a new producer for system %s." format systemName)
-          }
-
-          producer.send(buffer: _*)
-          loop.done
-          metrics.flushSizes.inc(buffer.size)
-        },
-
-        (exception, loop) => {
-          warn("Triggering a reconnect for %s because connection failed: %s" format (systemName, exception))
-          debug("Exception detail: ", exception)
-          metrics.reconnects.inc
-
-          if (producer != null) {
-            producer.close
-            producer = null
-          }
-        })
-
-      buffer.clear
-      trace("Flushed buffer.")
+      // If latestFuture is null, there have probably been no calls to send messages,
+      // so there is nothing to do in flush.
+      if(latestFuture.get(source) != null) {
+        while (!latestFuture.get(source).isDone && !sendFailed.get()) {
+          //do nothing
+        }
+        if (sendFailed.get()) {
+          logger.error("Unable to send message from %s to system %s" format(source, systemName))
+          //Close producer
+          stop()
+          producer = null
+          metrics.flushFailed.inc
+          throw new SamzaException("Unable to send message from %s to system %s" format(source, systemName))
+        } else {
+          trace("Flushed %s." format (source))
+        }
+      }
     }
   }
 }
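
From a caller's perspective, the net effect is that send() enqueues asynchronously while flush() blocks
until the most recent send for that source has been acked, failing the container otherwise. A usage
sketch, assuming a KafkaSystemProducer named producer and hypothetical source/stream names:

    import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStream}

    producer.register("my-task")    // each source may be registered only once
    producer.start()
    producer.send("my-task", new OutgoingMessageEnvelope(new SystemStream("kafka", "out"), "value".getBytes))
    producer.flush("my-task")       // returns once the broker has acked, or throws SamzaException
    producer.stop()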

http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
index 7e1383f..8aa73ce 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
@@ -28,15 +28,20 @@ import org.apache.samza.metrics.Gauge
 import org.apache.samza.metrics.MetricsRegistry
 
 class KafkaSystemProducerMetrics(val systemName: String = "unknown", val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
-  val reconnects = newCounter("producer-reconnects")
+  /* Tracks the number of calls made to send in KafkaSystemProducer */
   val sends = newCounter("producer-sends")
+  /* Tracks the number of calls made to flush in KafkaSystemProducer */
   val flushes = newCounter("flushes")
-  val flushSizes = newCounter("flush-sizes")
+  /* Tracks how long the flush call takes to complete */
   val flushMs = newTimer("flush-ms")
-
-  def setBufferSize(source: String, getValue: () => Int) {
-    newGauge("%s-producer-buffer-size" format source, getValue)
-  }
+  /* Tracks the number of times the system producer retries a send request (due to RetriableException) */
+  val retries = newCounter("producer-retries")
+  /* Tracks the number of times the flush operation failed */
+  val flushFailed = newCounter("flush-failed")
+  /* Tracks the number of send requests that the KafkaProducer failed due to unrecoverable errors */
+  val sendFailed = newCounter("producer-send-failed")
+  /* Tracks the number of send requests that the KafkaProducer completed successfully */
+  val sendSuccess = newCounter("producer-send-success")
 
   override def getPrefix = systemName + "-"
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index f1b7511..2482f23 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -21,7 +21,11 @@ package org.apache.samza.util
 
 import org.apache.samza.config.{Config, ConfigException}
 import org.apache.samza.config.JobConfig.Config2Job
-import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+import org.apache.samza.system.OutgoingMessageEnvelope
+import org.apache.kafka.common.utils.Utils
+import java.util.Random
+import org.apache.kafka.common.PartitionInfo
 
 object KafkaUtil {
   val counter = new AtomicLong(0)
@@ -39,4 +43,10 @@ object KafkaUtil {
         System.currentTimeMillis,
         counter.getAndIncrement)
 
+  private def abs(n: Int) = if(n == Integer.MIN_VALUE) 0 else math.abs(n)
+
+  def getIntegerPartitionKey(envelope: OutgoingMessageEnvelope, partitions: java.util.List[PartitionInfo]): Integer = {
+    val numPartitions = partitions.size
+    abs(envelope.getPartitionKey.hashCode()) % numPartitions
+  }
 }
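
The custom abs above exists because math.abs(Int.MinValue) is still Int.MinValue (negative), which would
otherwise yield a negative partition index; the guard maps that single value to partition 0. A quick
illustration of the mapping:

    // math.abs cannot negate Int.MinValue, so it must be special-cased:
    assert(math.abs(Int.MinValue) == Int.MinValue)
    // For every other hash code, the index lands in [0, numPartitions):
    val numPartitions = 4  // illustrative
    val index = math.abs("some-key".hashCode) % numPartitions
    assert(index >= 0 && index < numPartitions)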

http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 553d6b4..7d4bea8 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -23,8 +23,7 @@ import kafka.admin.AdminUtils
 import kafka.common.InvalidMessageSizeException
 import kafka.common.UnknownTopicOrPartitionException
 import kafka.message.InvalidMessageException
-import kafka.producer.Producer
-import kafka.producer.ProducerConfig
+
 import kafka.server.KafkaConfig
 import kafka.server.KafkaServer
 import kafka.utils.TestUtils
@@ -35,7 +34,7 @@ import kafka.zk.EmbeddedZookeeper
 
 import org.I0Itec.zkclient.ZkClient
 import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.config.MapConfig
+import org.apache.samza.config.{KafkaProducerConfig, MapConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
 import org.apache.samza.serializers.CheckpointSerde
@@ -47,6 +46,7 @@ import org.junit.{ AfterClass, BeforeClass, Test }
 
 import scala.collection._
 import scala.collection.JavaConversions._
+import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer}
 
 object TestKafkaCheckpointManager {
   val checkpointTopic = "checkpoint-topic"
@@ -69,13 +69,14 @@ object TestKafkaCheckpointManager {
   val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
   props1.put("controlled.shutdown.enable", "true")
 
-  val config = new java.util.Properties()
+  val config = new java.util.HashMap[String, Object]()
   val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
-  config.put("metadata.broker.list", brokers)
-  config.put("producer.type", "sync")
-  config.put("request.required.acks", "-1")
+  config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
+  config.put("acks", "all")
+  config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, new Integer(1))
+  config.put(ProducerConfig.RETRIES_CONFIG, new Integer(java.lang.Integer.MAX_VALUE-1))
   config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
-  val producerConfig = new ProducerConfig(config)
+  val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
   val partition = new Partition(0)
   val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123"))
   val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "12345"))
@@ -163,7 +164,7 @@ class TestKafkaCheckpointManager {
         fail("Expected a KafkaCheckpointException.")
       } catch {
         case e: KafkaCheckpointException => None
-      }
+        }
       kcm.stop
     }
   }
@@ -177,7 +178,7 @@ class TestKafkaCheckpointManager {
     bufferSize = 64 * 1024,
     fetchSize = 300 * 1024,
     metadataStore = metadataStore,
-    connectProducer = () => new Producer[Array[Byte], Array[Byte]](producerConfig),
+    connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties),
     connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
     systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
     checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]())))
@@ -192,7 +193,7 @@ class TestKafkaCheckpointManager {
     bufferSize = 64 * 1024,
     fetchSize = 300 * 1024,
     metadataStore = metadataStore,
-    connectProducer = () => new Producer[Array[Byte], Array[Byte]](producerConfig),
+    connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties),
     connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
     systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
     serde = new InvalideSerde(exception),

http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 0e1c38e..95f76b6 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -39,7 +39,7 @@ class TestKafkaConfig {
     val props = new Properties
     props.setProperty(" systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory")
     props.setProperty("systems.kafka.consumer.zookeeper.connect", "localhost:2181/")
-    props.setProperty("systems.kafka.producer.metadata.broker.list", "localhost:9092")
+    props.setProperty("systems.kafka.producer.bootstrap.servers", "localhost:9092")
 
     val mapConfig = new MapConfig(props.toMap[String, String])
     val kafkaConfig = new KafkaConfig(mapConfig)
@@ -82,7 +82,7 @@ class TestKafkaConfig {
   def testStreamLevelFetchSizeOverride() {
     val props = new Properties
     props.setProperty("systems.kafka.consumer.zookeeper.connect", "localhost:2181/")
-    props.setProperty("systems.kafka.producer.metadata.broker.list", "localhost:9092")
+    props.setProperty("systems.kafka.producer.bootstrap.servers", "localhost:9092")
 
     val mapConfig = new MapConfig(props.toMap[String, String])
     val kafkaConfig = new KafkaConfig(mapConfig)

http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java
new file mode 100644
index 0000000..6a2edf6
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java
@@ -0,0 +1,226 @@
+package org.apache.samza.system.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.samza.utils.TestUtils;
+
+
+public class MockKafkaProducer implements Producer {
+
+  private Cluster _cluster;
+  private List<FutureTask<RecordMetadata>> _callbacksList = new ArrayList<FutureTask<RecordMetadata>>();
+  private boolean shouldBuffer = false;
+  private boolean errorNext = false;
+  private Exception exception = null;
+  private AtomicInteger msgsSent = new AtomicInteger(0);
+
+  /*
+   * Mocks the buffered behavior seen in KafkaProducer. This MockKafkaProducer enables you to:
+   *  - Make send succeed instantly and return a successful future
+   *  - Set an error for the next message that is sent (using errorNext). In this case, the next call to send
+   *    returns a future that throws the exception.
+   *    Note that errorNext is reset to false once a message send has failed, so errorNext has to be set to
+   *    true again in the unit test before expecting a failure for another message.
+   *  - Turn on "shouldBuffer" to start buffering messages. This stores all the callbacks and executes them
+   *    later in a separate thread, which NEEDS to be triggered from the unit test itself using the
+   *    "startDelayedSendThread" method
+   *  - The offset in RecordMetadata is not guaranteed to be correct
+   */
+  public MockKafkaProducer(int numNodes, String topicName, int numPartitions) {
+    this._cluster = TestUtils.clusterWith(numNodes, topicName, numPartitions);
+  }
+
+  public void setShouldBuffer(boolean shouldBuffer) {
+    this.shouldBuffer = shouldBuffer;
+  }
+
+  public void setErrorNext(boolean errorNext, Exception exception) {
+    this.errorNext = errorNext;
+    this.exception = exception;
+  }
+
+  public int getMsgsSent() {
+    return this.msgsSent.get();
+  }
+
+  public Thread startDelayedSendThread(final int sleepTime) {
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        FutureTask[] callbackArray = new FutureTask[_callbacksList.size()];
+        AtomicReferenceArray<FutureTask> _bufferList = new AtomicReferenceArray<FutureTask>(_callbacksList.toArray(callbackArray));
+        ExecutorService executor = Executors.newFixedThreadPool(10);
+        try {
+          for(int i = 0; i < _bufferList.length(); i++) {
+            Thread.sleep(sleepTime);
+            FutureTask f = _bufferList.get(i);
+            if(!f.isDone()) {
+              executor.submit(f).get();
+            }
+          }
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        } catch (ExecutionException ee) {
+          ee.printStackTrace();
+        }
+      }
+    });
+    t.start();
+    return t;
+  }
+
+  @Override
+  public Future<RecordMetadata> send(ProducerRecord record) {
+    return send(record, null);
+  }
+
+  private RecordMetadata getRecordMetadata(ProducerRecord record) {
+    return new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, this.msgsSent.get());
+  }
+
+  @Override
+  public Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+    if (errorNext) {
+      if (shouldBuffer) {
+        FutureTask<RecordMetadata> f = new FutureTask<RecordMetadata>(new Callable<RecordMetadata>() {
+          @Override
+          public RecordMetadata call()
+              throws Exception {
+            callback.onCompletion(null, exception);
+            return getRecordMetadata(record);
+          }
+        });
+        _callbacksList.add(f);
+        this.errorNext = false;
+        return f;
+      } else {
+        callback.onCompletion(null, this.exception);
+        this.errorNext = false;
+        return new FutureFailure(this.exception);
+      }
+    } else {
+      if (shouldBuffer) {
+        FutureTask<RecordMetadata> f = new FutureTask<RecordMetadata>(new Callable<RecordMetadata>() {
+          @Override
+          public RecordMetadata call()
+              throws Exception {
+            msgsSent.incrementAndGet();
+            RecordMetadata metadata = getRecordMetadata(record);
+            callback.onCompletion(metadata, null);
+            return metadata;
+          }
+        });
+        _callbacksList.add(f);
+        return f;
+      } else {
+        int offset = msgsSent.incrementAndGet();
+        final RecordMetadata metadata = getRecordMetadata(record);
+        callback.onCompletion(metadata, null);
+        return new FutureSuccess(record, offset);
+      }
+    }
+  }
+
+  @Override
+  public List<PartitionInfo> partitionsFor(String topic) {
+    return this._cluster.partitionsForTopic(topic);
+  }
+
+  @Override
+  public Map<String, ? extends Metric> metrics() {
+    return null;
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  private static class FutureFailure implements Future<RecordMetadata> {
+
+    private final ExecutionException exception;
+
+    public FutureFailure(Exception exception) {
+      this.exception = new ExecutionException(exception);
+    }
+
+    @Override
+    public boolean cancel(boolean interrupt) {
+      return false;
+    }
+
+    @Override
+    public RecordMetadata get() throws ExecutionException {
+      throw this.exception;
+    }
+
+    @Override
+    public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
+      throw this.exception;
+    }
+
+    @Override
+    public boolean isCancelled() {
+      return false;
+    }
+
+    @Override
+    public boolean isDone() {
+      return true;
+    }
+  }
+
+  private static class FutureSuccess implements Future<RecordMetadata> {
+
+    private ProducerRecord record;
+    private final RecordMetadata _metadata;
+
+    public FutureSuccess(ProducerRecord record, int offset) {
+      this.record = record;
+      this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset);
+    }
+
+    @Override
+    public boolean cancel(boolean interrupt) {
+      return false;
+    }
+
+    @Override
+    public RecordMetadata get() throws ExecutionException {
+      return this._metadata;
+    }
+
+    @Override
+    public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
+      return this._metadata;
+    }
+
+    @Override
+    public boolean isCancelled() {
+      return false;
+    }
+
+    @Override
+    public boolean isDone() {
+      return true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index c759a7b..a055204 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -27,9 +27,6 @@ import kafka.admin.AdminUtils
 import kafka.common.ErrorMapping
 import kafka.consumer.Consumer
 import kafka.consumer.ConsumerConfig
-import kafka.producer.KeyedMessage
-import kafka.producer.Producer
-import kafka.producer.ProducerConfig
 import kafka.server.KafkaConfig
 import kafka.server.KafkaServer
 import kafka.utils.TestUtils
@@ -50,6 +47,9 @@ import org.junit.Assert._
 import org.junit.{Test, BeforeClass, AfterClass}
 
 import scala.collection.JavaConversions._
+import org.apache.samza.config.KafkaProducerConfig
+import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
+import java.util
 
 object TestKafkaSystemAdmin {
   val TOPIC = "input"
@@ -70,14 +70,13 @@ object TestKafkaSystemAdmin {
   val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
   val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
 
-  val config = new java.util.Properties()
+  val config = new util.HashMap[String, Object]()
   val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
-  config.put("metadata.broker.list", brokers)
-  config.put("producer.type", "sync")
+  config.put("bootstrap.servers", brokers)
   config.put("request.required.acks", "-1")
   config.put("serializer.class", "kafka.serializer.StringEncoder")
-  val producerConfig = new ProducerConfig(config)
-  var producer: Producer[String, String] = null
+  val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+  var producer: KafkaProducer = null
   var zookeeper: EmbeddedZookeeper = null
   var server1: KafkaServer = null
   var server2: KafkaServer = null
@@ -91,7 +90,7 @@ object TestKafkaSystemAdmin {
     server2 = TestUtils.createServer(new KafkaConfig(props2))
     server3 = TestUtils.createServer(new KafkaConfig(props3))
     zkClient = new ZkClient(zkConnect + "/", 6000, 6000, ZKStringSerializer)
-    producer = new Producer(producerConfig)
+    producer = new KafkaProducer(producerConfig.getProducerProperties)
     metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
   }
 
@@ -143,6 +142,7 @@ object TestKafkaSystemAdmin {
 
   @AfterClass
   def afterCleanLogDirs {
+    producer.close()
     server1.shutdown
     server1.awaitShutdown()
     server2.shutdown
@@ -229,7 +229,7 @@ class TestKafkaSystemAdmin {
 
     // Add a new message to one of the partitions, and verify that it works as 
     // expected.
-    producer.send(new KeyedMessage(TOPIC, "key1", "val1"))
+    producer.send(new ProducerRecord(TOPIC, 48, "key1".getBytes, "val1".getBytes)).get()
     metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC))
     assertEquals(1, metadata.size)
     val streamName = metadata.keySet.head
@@ -245,7 +245,7 @@ class TestKafkaSystemAdmin {
     assertEquals("0", sspMetadata.get(new Partition(3)).getUpcomingOffset)
 
     // Add a second message to one of the same partition.
-    producer.send(new KeyedMessage(TOPIC, "key1", "val2"))
+    producer.send(new ProducerRecord(TOPIC, 48, "key1".getBytes, "val2".getBytes)).get()
     metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC))
     assertEquals(1, metadata.size)
     assertEquals(TOPIC, streamName)

http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
index 8067cbf..5f65144 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
@@ -66,7 +66,7 @@ class TestKafkaSystemFactory {
     val producerFactory = new KafkaSystemFactory
     val config = new MapConfig(Map[String, String](
       "job.name" -> "test",
-      "systems.test.producer.metadata.broker.list" -> "",
+      "systems.test.producer.bootstrap.servers" -> "",
       "systems.test.samza.key.serde" -> "json",
       "systems.test.samza.msg.serde" -> "json",
       "serializers.registry.json.class" -> "samza.serializers.JsonSerdeFactory"))

http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
index 72b36f7..ca10ea5 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
@@ -19,141 +19,152 @@
 
 package org.apache.samza.system.kafka
 
-import java.nio.ByteBuffer
-import java.util.Properties
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.TimeUnit
-
-import kafka.producer.KeyedMessage
-import kafka.producer.Producer
-import kafka.producer.ProducerConfig
-import kafka.producer.ProducerPool
-import kafka.producer.async.DefaultEventHandler
-import kafka.serializer.Encoder
-
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.system.OutgoingMessageEnvelope
-import org.apache.samza.system.SystemStream
-import org.junit.Assert._
+import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStream}
 import org.junit.Test
 
-import scala.collection.JavaConversions._
+import org.apache.kafka.clients.producer._
+import java.util
+import org.junit.Assert._
+import org.scalatest.Assertions.intercept
+import org.apache.kafka.common.errors.RecordTooLargeException
+import org.apache.samza.SamzaException
 
-class TestKafkaSystemProducer {
 
-  val someMessage = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "test")
+class TestKafkaSystemProducer {
 
-  def getProps = {
-    val props = new Properties
-    props.put("broker.list", "")
-    props.put("metadata.broker.list", "")
-    props
-  }
+  val someMessage = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "test".getBytes)
 
   @Test
   def testKafkaProducer {
-    val props = getProps
-    @volatile var msgsSent = new CountDownLatch(1)
-
-    val producer = new KafkaSystemProducer(systemName = "test", batchSize = 1, getProducer = (() => {
-      new Producer[Object, Object](new ProducerConfig(props)) {
-        override def send(messages: KeyedMessage[Object, Object]*) {
-          msgsSent.countDown
-        }
-      }
-    }), metrics = new KafkaSystemProducerMetrics)
+    val systemProducer = new KafkaSystemProducer(systemName = "test",
+                                           getProducer = () => { new MockProducer(true) },
+                                           metrics = new KafkaSystemProducerMetrics)
+    systemProducer.register("test")
+    systemProducer.start
+    systemProducer.send("test", someMessage)
+    assertEquals(1, systemProducer.producer.asInstanceOf[MockProducer].history().size())
+    systemProducer.stop
+  }
 
-    producer.register("test")
-    producer.start
-    producer.send("test", someMessage)
-    producer.stop
-    msgsSent.await(120, TimeUnit.SECONDS)
-    assertEquals(0, msgsSent.getCount)
+  @Test
+  def testKafkaProducerUsingMockKafkaProducer {
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val systemProducer = new KafkaSystemProducer(systemName = "test",
+                                                 getProducer = () => mockProducer,
+                                                 metrics = new KafkaSystemProducerMetrics)
+    systemProducer.register("test")
+    systemProducer.start()
+    systemProducer.send("test", someMessage)
+    assertEquals(1, mockProducer.getMsgsSent)
+    systemProducer.stop()
   }
 
   @Test
-  def testKafkaProducerBatch {
-    val props = getProps
-    @volatile var msgsSent = 0
-
-    val producer = new KafkaSystemProducer(systemName = "test", batchSize = 2, getProducer = (() => {
-      new Producer[Object, Object](new ProducerConfig(props)) {
-        override def send(messages: KeyedMessage[Object, Object]*) {
-          msgsSent += 1
-        }
-      }
-    }), metrics = new KafkaSystemProducerMetrics)
-
-    // second message should trigger the count down
-    producer.register("test")
-    producer.start
-    producer.send("test", someMessage)
-    assertEquals(0, msgsSent)
-    producer.send("test", someMessage)
-    assertEquals(1, msgsSent)
-    producer.stop
+  def testKafkaProducerBufferedSend {
+    val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes)
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val systemProducer = new KafkaSystemProducer(systemName = "test",
+                                                 getProducer = () => mockProducer,
+                                                 metrics = new KafkaSystemProducerMetrics)
+    systemProducer.register("test")
+    systemProducer.start()
+    systemProducer.send("test", msg1)
+
+    mockProducer.setShouldBuffer(true)
+    systemProducer.send("test", msg2)
+    systemProducer.send("test", msg3)
+    assertEquals(1, mockProducer.getMsgsSent)
+
+    val sendThread: Thread = mockProducer.startDelayedSendThread(2000)
+    sendThread.join()
+
+    assertEquals(3, mockProducer.getMsgsSent)
+    systemProducer.stop()
   }
 
   @Test
-  def testKafkaProducerFlush {
-    val props = getProps
-    val msgs = scala.collection.mutable.ListBuffer[String]()
-    val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a")
-    val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b")
-
-    val producer = new KafkaSystemProducer(systemName = "test", batchSize = 3, getProducer = (() => {
-      new Producer[Object, Object](new ProducerConfig(props)) {
-        override def send(messages: KeyedMessage[Object, Object]*) {
-          msgs ++= messages.map(_.message.asInstanceOf[String])
-        }
-      }
-    }), metrics = new KafkaSystemProducerMetrics)
-
-    // flush should trigger the count down
-    producer.register("test")
-    producer.start
-    producer.send("test", msg1)
-    producer.send("test", msg2)
-    assertEquals(0, msgs.size)
-    producer.flush("test")
-    assertEquals(2, msgs.size)
-    assertEquals("a", msgs(0))
-    assertEquals("b", msgs(1))
-    producer.stop
+  def testKafkaProducerFlushSuccessful {
+    val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes)
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val systemProducer = new KafkaSystemProducer(systemName = "test",
+                                                 getProducer = () => mockProducer,
+                                                 metrics = new KafkaSystemProducerMetrics)
+    systemProducer.register("test")
+    systemProducer.start()
+    systemProducer.send("test", msg1)
+
+    mockProducer.setShouldBuffer(true)
+    systemProducer.send("test", msg2)
+    systemProducer.send("test", msg3)
+    assertEquals(1, mockProducer.getMsgsSent)
+    mockProducer.startDelayedSendThread(2000)
+    systemProducer.flush("test")
+    assertEquals(3, mockProducer.getMsgsSent)
+    systemProducer.stop()
   }
 
   @Test
-  def testKafkaSyncProducerExceptions {
-    var msgsSent = 0
-    val props = new Properties
-    val out = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a")
-    props.put("metadata.broker.list", "")
-    props.put("broker.list", "")
-    props.put("producer.type", "sync")
-
-    var failCount = 0
-    val producer = new KafkaSystemProducer(systemName = "test", batchSize = 1, getProducer = (() => {
-      failCount += 1
-      if (failCount <= 5) {
-        throw new RuntimeException("Pretend to fail in factory")
-      }
-      new Producer[Object, Object](new ProducerConfig(props)) {
-        override def send(messages: KeyedMessage[Object, Object]*) {
-          assertNotNull(messages)
-          assertEquals(1, messages.length)
-          assertEquals("a", messages(0).message)
-          msgsSent += 1
-          if (msgsSent <= 5) {
-            throw new RuntimeException("Pretend to fail in send")
-          }
-        }
-      }
-    }), metrics = new KafkaSystemProducerMetrics)
+  def testKafkaProducerFlushWithException {
+    val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "d".getBytes)
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val systemProducer = new KafkaSystemProducer(systemName = "test",
+                                                 getProducer = () => mockProducer,
+                                                 metrics = new KafkaSystemProducerMetrics)
+    systemProducer.register("test")
+    systemProducer.start()
+    systemProducer.send("test", msg1)
+
+    mockProducer.setShouldBuffer(true)
+    systemProducer.send("test", msg2)
+    mockProducer.setErrorNext(true, new RecordTooLargeException())
+    systemProducer.send("test", msg3)
+    systemProducer.send("test", msg4)
+
+    assertEquals(1, mockProducer.getMsgsSent)
+    mockProducer.startDelayedSendThread(2000)
+    val thrown = intercept[SamzaException] {
+      systemProducer.flush("test")
+    }
+    assertTrue(thrown.isInstanceOf[SamzaException])
+    assertEquals(2, mockProducer.getMsgsSent)
+    systemProducer.stop()
+  }
 
+  @Test
+  def testKafkaProducerExceptions {
+    val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "d".getBytes)
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val producer = new KafkaSystemProducer(systemName =  "test",
+                                           getProducer = () => mockProducer,
+                                           metrics = new KafkaSystemProducerMetrics)
     producer.register("test")
-    producer.start
-    producer.send("test", out)
-    producer.stop
-    assertEquals(6, msgsSent)
+    producer.start()
+    producer.send("test", msg1)
+    producer.send("test", msg2)
+    producer.send("test", msg3)
+    mockProducer.setErrorNext(true, new RecordTooLargeException())
+
+    val thrown = intercept[SamzaException] {
+      producer.send("test", msg4)
+    }
+    assertTrue(thrown.isInstanceOf[SamzaException])
+    assertTrue(thrown.getMessage.contains("RecordTooLargeException"))
+    assertTrue(producer.sendFailed.get())
+    assertEquals(3, mockProducer.getMsgsSent)
+    producer.stop()
   }
 }
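
The rewritten tests exercise the new java client's asynchronous error path through
MockProducer (the test double that ships with kafka-clients) and the MockKafkaProducer
added in this commit. A minimal sketch of that pattern, assuming the non-generic
kafka-clients API used throughout this commit:

    import org.apache.kafka.clients.producer.{MockProducer, ProducerRecord}
    import org.apache.kafka.common.errors.RecordTooLargeException

    // autoComplete = false: send() returns immediately, but the future stays pending
    val mock = new MockProducer(false)
    val future = mock.send(new ProducerRecord("test", "a".getBytes))
    mock.errorNext(new RecordTooLargeException())  // fail the oldest pending send
    // future.get() would now throw, which KafkaSystemProducer surfaces as a SamzaException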

http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java b/samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java
new file mode 100644
index 0000000..2fa743f
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samza.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+
+import static java.util.Arrays.asList;
+
+
+/**
+ * Copied from the kafka-clients test API as a workaround until KAFKA-1861 is resolved.
+ * Helper functions for writing unit tests
+ */
+public class TestUtils {
+
+    public static File IO_TMP_DIR = new File(System.getProperty("java.io.tmpdir"));
+
+    public static String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+    public static String DIGITS = "0123456789";
+    public static String LETTERS_AND_DIGITS = LETTERS + DIGITS;
+
+    /* A consistent random number generator to make tests repeatable */
+    public static final Random seededRandom = new Random(192348092834L);
+    public static final Random random = new Random();
+
+    public static Cluster singletonCluster(String topic, int partitions) {
+        return clusterWith(1, topic, partitions);
+    }
+
+    public static Cluster clusterWith(int nodes, String topic, int partitions) {
+        Node[] ns = new Node[nodes];
+        for (int i = 0; i < nodes; i++)
+            ns[i] = new Node(i, "localhost", 1969); // give each fake node a distinct id
+        List<PartitionInfo> parts = new ArrayList<PartitionInfo>();
+        for (int i = 0; i < partitions; i++)
+            parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
+        return new Cluster(asList(ns), parts);
+    }
+
+    /**
+     * Choose a number of random available ports
+     */
+    public static int[] choosePorts(int count) {
+        try {
+            ServerSocket[] sockets = new ServerSocket[count];
+            int[] ports = new int[count];
+            for (int i = 0; i < count; i++) {
+                sockets[i] = new ServerSocket(0);
+                ports[i] = sockets[i].getLocalPort();
+            }
+            for (int i = 0; i < count; i++)
+                sockets[i].close();
+            return ports;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Choose an available port
+     */
+    public static int choosePort() {
+        return choosePorts(1)[0];
+    }
+
+    /**
+     * Generate an array of random bytes
+     * 
+     * @param size The size of the array
+     */
+    public static byte[] randomBytes(int size) {
+        byte[] bytes = new byte[size];
+        seededRandom.nextBytes(bytes);
+        return bytes;
+    }
+
+    /**
+     * Generate a random string of letters and digits of the given length
+     * 
+     * @param len The length of the string
+     * @return The random string
+     */
+    public static String randomString(int len) {
+        StringBuilder b = new StringBuilder();
+        for (int i = 0; i < len; i++)
+            b.append(LETTERS_AND_DIGITS.charAt(seededRandom.nextInt(LETTERS_AND_DIGITS.length())));
+        return b.toString();
+    }
+
+}
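
A short usage sketch for the copied helpers (values illustrative): clusterWith builds the
fake broker/partition metadata that MockKafkaProducer is constructed with, and choosePort
finds a free port for an embedded broker:

    import org.apache.samza.utils.TestUtils

    val cluster = TestUtils.clusterWith(1, "test", 1)  // 1 node, topic "test", 1 partition
    val port = TestUtils.choosePort()                  // an available ephemeral port
    val bytes = TestUtils.randomBytes(16)              // repeatable across runs (seeded Random)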

http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-test/src/main/config/negate-number.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/negate-number.properties b/samza-test/src/main/config/negate-number.properties
index 4989b27..379fa61 100644
--- a/samza-test/src/main/config/negate-number.properties
+++ b/samza-test/src/main/config/negate-number.properties
@@ -43,13 +43,12 @@ systems.kafka.samza.msg.serde=string
 systems.kafka.samza.key.serde=string
 systems.kafka.samza.offset.default=oldest
 systems.kafka.consumer.zookeeper.connect=localhost:2181/
-systems.kafka.producer.compression.codec=gzip
-systems.kafka.producer.metadata.broker.list=localhost:9092
-systems.kafka.producer.request.required.acks=1
-systems.kafka.producer.topic.metadata.refresh.interval.ms=86400000
-systems.kafka.producer.producer.type=sync
+systems.kafka.producer.compression.type=gzip
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.producer.acks=1
+systems.kafka.producer.metadata.max.age.ms=86400000
 # Normally, we'd set this much higher, but we want things to look snappy in the demo.
-systems.kafka.producer.batch.num.messages=1
+systems.kafka.producer.buffer.memory=1000000
 
 # negate-number
 streams.samza-test-topic.consumer.reset.offset=true
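
The retired settings have no one-to-one replacements: the new client is always asynchronous,
so producer.type=sync and batch.num.messages disappear, and the demo instead bounds buffered
data with buffer.memory. A sketch of how these systems.kafka.producer.* keys reach the
client, following the KafkaProducerConfig wiring used in TestStatefulTask below (map
contents and client id illustrative):

    import java.util
    import org.apache.kafka.clients.producer.KafkaProducer
    import org.apache.samza.config.KafkaProducerConfig

    val props = new util.HashMap[String, Object]()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("compression.type", "gzip")
    props.put("acks", "1")
    val producer = new KafkaProducer(
      new KafkaProducerConfig("kafka", "client-1", props).getProducerProperties)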

http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index ca25258..6f0eb21 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -22,16 +22,12 @@ package org.apache.samza.test.integration
 import java.util.Properties
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.TimeUnit
-import java.util.logging.{Level, LogManager, Handler, Logger}
 
 import kafka.admin.AdminUtils
 import kafka.common.ErrorMapping
 import kafka.consumer.Consumer
 import kafka.consumer.ConsumerConfig
 import kafka.message.MessageAndMetadata
-import kafka.producer.KeyedMessage
-import kafka.producer.Producer
-import kafka.producer.ProducerConfig
 import kafka.server.KafkaConfig
 import kafka.server.KafkaServer
 import kafka.utils.TestUtils
@@ -52,6 +48,7 @@ import org.apache.samza.job.StreamJob
 import org.apache.samza.storage.kv.KeyValueStore
 import org.apache.samza.system.kafka.TopicMetadataCache
 import org.apache.samza.system.{SystemStreamPartition, IncomingMessageEnvelope}
+import org.apache.samza.config.KafkaProducerConfig
 import org.apache.samza.task.InitableTask
 import org.apache.samza.task.MessageCollector
 import org.apache.samza.task.StreamTask
@@ -67,6 +64,9 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.SynchronizedMap
+import org.apache.kafka.clients.producer.{ProducerConfig, Producer, ProducerRecord, KafkaProducer}
+import java.util
+
 
 object TestStatefulTask {
   val INPUT_TOPIC = "input"
@@ -93,14 +93,15 @@ object TestStatefulTask {
   props2.setProperty("auto.create.topics.enable","false")
   props3.setProperty("auto.create.topics.enable","false")
 
-  val config = new java.util.Properties()
+  val config = new util.HashMap[String, Object]()
   val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
-  config.put("metadata.broker.list", brokers)
-  config.put("producer.type", "sync")
+  config.put("bootstrap.servers", brokers)
   config.put("request.required.acks", "-1")
-  config.put("serializer.class", "kafka.serializer.StringEncoder");
-  val producerConfig = new ProducerConfig(config)
-  var producer: Producer[String, String] = null
+  config.put("serializer.class", "kafka.serializer.StringEncoder")
+  config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, new Integer(1))
+  config.put(ProducerConfig.RETRIES_CONFIG, new Integer(Integer.MAX_VALUE-1))
+  val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+  var producer: Producer = null
   val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "123"))
   val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "12345"))
   var zookeeper: EmbeddedZookeeper = null
@@ -116,7 +117,7 @@ object TestStatefulTask {
     server2 = TestUtils.createServer(new KafkaConfig(props2))
     server3 = TestUtils.createServer(new KafkaConfig(props3))
     zkClient = new ZkClient(zkConnect + "/", 6000, 6000, ZKStringSerializer)
-    producer = new Producer(producerConfig)
+    producer = new KafkaProducer(producerConfig.getProducerProperties)
     metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
 
     createTopics
@@ -163,6 +164,7 @@ object TestStatefulTask {
 
   @AfterClass
   def afterCleanLogDirs {
+    producer.close()
     server1.shutdown
     server1.awaitShutdown()
     server2.shutdown
@@ -216,7 +218,7 @@ class TestStatefulTask {
     "systems.kafka.consumer.auto.offset.reset" -> "smallest", // applies to an empty topic
     "systems.kafka.samza.msg.serde" -> "string",
     "systems.kafka.consumer.zookeeper.connect" -> zkConnect,
-    "systems.kafka.producer.metadata.broker.list" -> ("localhost:%s" format port1),
+    "systems.kafka.producer.bootstrap.servers" -> ("localhost:%s" format port1),
     // Since using state, need a checkpoint manager
     "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
     "task.checkpoint.system" -> "kafka",
@@ -353,7 +355,7 @@ class TestStatefulTask {
    * Send a message to the input topic, and validate that it gets to the test task.
    */
   def send(task: TestTask, msg: String) {
-    producer.send(new KeyedMessage(INPUT_TOPIC, msg))
+    producer.send(new ProducerRecord(INPUT_TOPIC, msg.getBytes)).get()
     task.awaitMessage
     assertEquals(msg, task.received.last)
   }
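
With no producer.type=sync in the new client, the test emulates a synchronous send by
blocking on the returned Future; the max.in.flight / retries settings above keep ordering
intact across retries. The pattern as a standalone sketch (topic name illustrative):

    import org.apache.kafka.clients.producer.{Producer, ProducerRecord}

    def sendSync(producer: Producer, topic: String, msg: String) {
      // get() blocks until the broker acks or the send fails, mimicking the old sync producer
      producer.send(new ProducerRecord(topic, msg.getBytes)).get()
    }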