Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/04 05:22:01 UTC

[24/37] git commit: KAFKA-736 Add an option to the 0.8 producer to mimic 0.7 producer behavior; reviewed by Jun Rao and Sriram Subramanian

KAFKA-736 Add an option to the 0.8 producer to mimic 0.7 producer behavior; reviewed by Jun Rao and Sriram Subramanian
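
In practice the new option is exercised by setting request.required.acks=0 in the
producer config, which makes the 0.8 producer fire-and-forget like the 0.7 one:
the broker writes no response back and SyncProducer does not block waiting for one.
A minimal sketch of such a producer (broker address and topic are illustrative,
not part of this commit):

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    val props = new Properties()
    props.put("broker.list", "localhost:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    // acks=0 selects the 0.7-style fire-and-forget path added by this patch
    props.put("request.required.acks", "0")
    val producer = new Producer[String, String](new ProducerConfig(props))
    producer.send(new KeyedMessage[String, String]("test-topic", "hello"))
    producer.close()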


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4d8fb1ee
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4d8fb1ee
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4d8fb1ee

Branch: refs/heads/trunk
Commit: 4d8fb1eebc043fab11c58d3309e93cc83ef24a89
Parents: 828ce83
Author: Neha Narkhede <ne...@gmail.com>
Authored: Fri Feb 22 17:48:10 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Feb 22 17:48:10 2013 -0800

----------------------------------------------------------------------
 config/log4j.properties                            |    1 +
 .../main/scala/kafka/api/ProducerResponse.scala    |    2 -
 .../main/scala/kafka/network/RequestChannel.scala  |    2 +-
 .../main/scala/kafka/network/SocketServer.scala    |   30 +++++--
 .../scala/kafka/producer/BrokerPartitionInfo.scala |    2 +-
 .../scala/kafka/producer/ConsoleProducer.scala     |   10 ++-
 .../scala/kafka/producer/KafkaLog4jAppender.scala  |   13 ++-
 core/src/main/scala/kafka/producer/Producer.scala  |    2 +-
 .../main/scala/kafka/producer/SyncProducer.scala   |   17 +++-
 .../kafka/producer/async/DefaultEventHandler.scala |   30 ++++---
 core/src/main/scala/kafka/server/KafkaApis.scala   |    9 ++-
 .../unit/kafka/integration/PrimitiveApiTest.scala  |   48 ++++++++---
 .../integration/ProducerConsumerTestHarness.scala  |   14 +---
 .../scala/unit/kafka/log/OffsetIndexTest.scala     |    2 +-
 .../unit/kafka/log4j/KafkaLog4jAppenderTest.scala  |    7 +-
 .../unit/kafka/network/SocketServerTest.scala      |   36 +++++++-
 .../unit/kafka/producer/AsyncProducerTest.scala    |   13 ++--
 .../scala/unit/kafka/producer/ProducerTest.scala   |    3 +-
 .../unit/kafka/producer/SyncProducerTest.scala     |   69 +++++++--------
 .../scala/unit/kafka/server/LogRecoveryTest.scala  |    2 +-
 .../unit/kafka/server/ServerShutdownTest.scala     |    2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   25 ++++--
 22 files changed, 215 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/config/log4j.properties
----------------------------------------------------------------------
diff --git a/config/log4j.properties b/config/log4j.properties
index e104751..5692da0 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -47,6 +47,7 @@ log4j.logger.kafka=INFO, kafkaAppender
 log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
 log4j.additivity.kafka.network.RequestChannel$=false
 
+#log4j.logger.kafka.network.Processor=TRACE, requestAppender
 #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
 #log4j.additivity.kafka.server.KafkaApis=false
 log4j.logger.kafka.request.logger=TRACE, requestAppender

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/api/ProducerResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index 743227d..5bff709 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -22,7 +22,6 @@ import scala.collection.Map
 import kafka.common.{TopicAndPartition, ErrorMapping}
 import kafka.api.ApiUtils._
 
-
 object ProducerResponse {
   def readFrom(buffer: ByteBuffer): ProducerResponse = {
     val correlationId = buffer.getInt
@@ -44,7 +43,6 @@ object ProducerResponse {
 
 case class ProducerResponseStatus(error: Short, offset: Long)
 
-
 case class ProducerResponse(correlationId: Int,
                             status: Map[TopicAndPartition, ProducerResponseStatus]) extends RequestOrResponse {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 7747ddd..931092d 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -116,7 +116,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
   }
 
   /** Get the next request or block until there is one */
-  def receiveRequest(): RequestChannel.Request = 
+  def receiveRequest(): RequestChannel.Request =
     requestQueue.take()
 
   /** Get a response for the given processor if there is one */

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index d5a24f6..648d936 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -35,7 +35,7 @@ import kafka.utils._
 class SocketServer(val brokerId: Int,
                    val host: String,
                    val port: Int,
-                   val numProcessorThreads: Int, 
+                   val numProcessorThreads: Int,
                    val maxQueuedRequests: Int,
                    val maxRequestSize: Int = Int.MaxValue) extends Logging {
   this.logIdent = "[Socket Server on Broker " + brokerId + "], "
@@ -206,7 +206,7 @@ private[kafka] class Acceptor(val host: String, val port: Int, private val proce
  * each of which has its own selectors
  */
 private[kafka] class Processor(val id: Int,
-                               val time: Time, 
+                               val time: Time,
                                val maxRequestSize: Int,
                                val requestChannel: RequestChannel) extends AbstractServerThread {
   
@@ -219,7 +219,9 @@ private[kafka] class Processor(val id: Int,
       configureNewConnections()
       // register any new responses for writing
       processNewResponses()
+      val startSelectTime = SystemTime.milliseconds
       val ready = selector.select(300)
+      trace("Processor id " + id + " selection time = " + (SystemTime.milliseconds - startSelectTime) + " ms")
       if(ready > 0) {
         val keys = selector.selectedKeys()
         val iter = keys.iterator()
@@ -259,11 +261,21 @@ private[kafka] class Processor(val id: Int,
   private def processNewResponses() {
     var curr = requestChannel.receiveResponse(id)
     while(curr != null) {
-      trace("Socket server received response to send, registering for write: " + curr)
       val key = curr.request.requestKey.asInstanceOf[SelectionKey]
       try {
-        key.interestOps(SelectionKey.OP_WRITE)
-        key.attach(curr)
+        if(curr.responseSend == null) {
+          // a null response send object indicates that there is no response to send to the client.
+          // In this case, we just want to turn the interest ops to READ to be able to read more pipelined requests
+          // that are sitting in the server's socket buffer
+          trace("Socket server received empty response to send, registering for read: " + curr)
+          key.interestOps(SelectionKey.OP_READ)
+          key.attach(null)
+          curr.request.updateRequestMetrics
+        } else {
+          trace("Socket server received response to send, registering for write: " + curr)
+          key.interestOps(SelectionKey.OP_WRITE)
+          key.attach(curr)
+        }
       } catch {
         case e: CancelledKeyException => {
           debug("Ignoring response for closed socket.")
@@ -298,7 +310,7 @@ private[kafka] class Processor(val id: Int,
   private def configureNewConnections() {
     while(newConnections.size() > 0) {
       val channel = newConnections.poll()
-      debug("Listening to new connection from " + channel.socket.getRemoteSocketAddress)
+      debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress)
       channel.register(selector, SelectionKey.OP_READ)
     }
   }
@@ -321,10 +333,12 @@ private[kafka] class Processor(val id: Int,
     } else if(receive.complete) {
       val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)
       requestChannel.sendRequest(req)
-      trace("Received request, sending for processing by handler: " + req)
       key.attach(null)
+      // explicitly reset interest ops to not READ, no need to wake up the selector just yet
+      key.interestOps(key.interestOps & (~SelectionKey.OP_READ))
     } else {
       // more reading to be done
+      trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress())
       key.interestOps(SelectionKey.OP_READ)
       wakeup()
     }
@@ -344,8 +358,10 @@ private[kafka] class Processor(val id: Int,
     if(responseSend.complete) {
       response.request.updateRequestMetrics()
       key.attach(null)
+      trace("Finished writing, registering for read on connection " + socketChannel.socket.getRemoteSocketAddress())
       key.interestOps(SelectionKey.OP_READ)
     } else {
+      trace("Did not finish writing, registering for write again on connection " + socketChannel.socket.getRemoteSocketAddress())
       key.interestOps(SelectionKey.OP_WRITE)
       wakeup()
     }
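
The pipelining logic above hinges on per-connection flow control via NIO interest
ops: after a complete request is read, the processor clears OP_READ on the key, so
further pipelined requests sit in the socket buffer until the response for the
current one has been written (or discarded, when it is null). A self-contained
sketch of that bit manipulation, using a Pipe merely as a stand-in for a client
connection:

    import java.nio.channels.{Pipe, SelectionKey, Selector}

    object InterestOpsSketch extends App {
      val selector = Selector.open()
      val pipe = Pipe.open()
      pipe.source.configureBlocking(false)
      val key = pipe.source.register(selector, SelectionKey.OP_READ)
      // after a full request is parsed, clear OP_READ: pipelined requests
      // stay queued in the socket buffer until the response is handled
      key.interestOps(key.interestOps & (~SelectionKey.OP_READ))
      assert((key.interestOps & SelectionKey.OP_READ) == 0)
      // once the response is written (or was empty, for acks=0), resume reads
      key.interestOps(key.interestOps | SelectionKey.OP_READ)
      assert((key.interestOps & SelectionKey.OP_READ) != 0)
      selector.close()
    }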

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index b209a97..a0e2b44 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -80,7 +80,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
       if(tmd.errorCode == ErrorMapping.NoError){
         topicPartitionInfo.put(tmd.topic, tmd)
       } else
-        warn("Metadata for topic [%s] is erronous: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode))
+        warn("Metadata for topic [%s] is erroneous: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode))
       tmd.partitionsMetadata.foreach(pmd =>{
         if (pmd.errorCode != ErrorMapping.NoError){
           debug("Metadata for topic partition [%s, %d] is errornous: [%s]".format(tmd.topic, pmd.partitionId, pmd), ErrorMapping.exceptionFor(pmd.errorCode))

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 8b77465..eebfda6 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -87,7 +87,12 @@ object ConsoleProducer {
                                   .describedAs("reader_class")
                                   .ofType(classOf[java.lang.String])
                                   .defaultsTo(classOf[LineMessageReader].getName)
-    val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " + 
+    val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the TCP RECV buffer.")
+                                  .withRequiredArg
+                                  .describedAs("size")
+                                  .ofType(classOf[java.lang.Integer])
+                                  .defaultsTo(1024*100)
+    val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " +
                                                  "This allows custom configuration for a user-defined message reader.")
                             .withRequiredArg
                             .describedAs("prop")
@@ -116,6 +121,7 @@ object ConsoleProducer {
     val keyEncoderClass = options.valueOf(keyEncoderOpt)
     val valueEncoderClass = options.valueOf(valueEncoderOpt)
     val readerClass = options.valueOf(messageReaderOpt)
+    val socketBuffer = options.valueOf(socketBufferSizeOpt)
     val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt))
     cmdLineProps.put("topic", topic)
 
@@ -133,7 +139,7 @@ object ConsoleProducer {
     props.put("request.timeout.ms", requestTimeoutMs.toString)
     props.put("key.serializer.class", keyEncoderClass)
     props.put("serializer.class", valueEncoderClass)
-
+    props.put("send.buffer.bytes", socketBuffer.toString)
     val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]]
     reader.init(System.in, cmdLineProps)
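
The new option surfaces on the console producer as --socket-buffer-size and feeds
send.buffer.bytes. A hypothetical invocation (launcher script name and broker
address assumed, not part of this patch):

    ./bin/kafka-console-producer.sh --broker-list localhost:9092 \
        --topic test-topic --socket-buffer-size 102400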
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
index af077e0..3d22e6d 100644
--- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
+++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
@@ -32,6 +32,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
   var compressionCodec:String = null
   var enqueueTimeout:String = null
   var queueSize:String = null
+  var requiredNumAcks: Int = Int.MaxValue
 
   private var producer: Producer[String, String] = null
 
@@ -40,22 +41,25 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
 
   def getBrokerList:String = brokerList
   def setBrokerList(brokerList: String) { this.brokerList = brokerList }
-  
+
   def getSerializerClass:String = serializerClass
   def setSerializerClass(serializerClass:String) { this.serializerClass = serializerClass }
 
   def getProducerType:String = producerType
   def setProducerType(producerType:String) { this.producerType = producerType }
-  
+
   def getCompressionCodec:String = compressionCodec
   def setCompressionCodec(compressionCodec:String) { this.compressionCodec = compressionCodec }
-  
+
   def getEnqueueTimeout:String = enqueueTimeout
   def setEnqueueTimeout(enqueueTimeout:String) { this.enqueueTimeout = enqueueTimeout }
 
   def getQueueSize:String = queueSize
   def setQueueSize(queueSize:String) { this.queueSize = queueSize }
 
+  def getRequiredNumAcks:Int = requiredNumAcks
+  def setRequiredNumAcks(requiredNumAcks:Int) { this.requiredNumAcks = requiredNumAcks }
+
   override def activateOptions() {
     // check for config parameter validity
     val props = new Properties()
@@ -75,12 +79,13 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
     if(compressionCodec != null) props.put("compression.codec", compressionCodec)
     if(enqueueTimeout != null) props.put("queue.enqueue.timeout.ms", enqueueTimeout)
     if(queueSize != null) props.put("queue.buffering.max.messages", queueSize)
+    if(requiredNumAcks != Int.MaxValue) props.put("request.required.acks", requiredNumAcks.toString)
     val config : ProducerConfig = new ProducerConfig(props)
     producer = new Producer[String, String](config)
     LogLog.debug("Kafka producer connected to " +  config.brokerList)
     LogLog.debug("Logging for topic: " + topic)
   }
-  
+
   override def append(event: LoggingEvent)  {
     val message : String = if( this.layout == null) {
       event.getRenderedMessage
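
With the requiredNumAcks setter above, the appender's ack level is tunable from
log4j configuration alone. A sketch of a log4j.properties fragment (broker address
and logger name are illustrative; the property keys mirror the bean setters):

    log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender
    log4j.appender.KAFKA.brokerList=localhost:9092
    log4j.appender.KAFKA.Topic=my-topic
    # 0 = fire-and-forget, mimicking the 0.7 producer
    log4j.appender.KAFKA.requiredNumAcks=0
    log4j.logger.my.app=INFO, KAFKA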

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index 66638f2..3ded46e 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -26,7 +26,7 @@ import kafka.common.QueueFullException
 import kafka.metrics._
 
 
-class Producer[K,V](config: ProducerConfig,
+class Producer[K,V](val config: ProducerConfig,
                     private val eventHandler: EventHandler[K,V])  // only for unit testing
   extends Logging {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 0469a39..306f200 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -62,7 +62,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
   /**
    * Common functionality for the public send methods
    */
-  private def doSend(request: RequestOrResponse): Receive = {
+  private def doSend(request: RequestOrResponse, readResponse: Boolean = true): Receive = {
     lock synchronized {
       verifyRequest(request)
       getOrMakeConnection()
@@ -70,7 +70,10 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
       var response: Receive = null
       try {
         blockingChannel.send(request)
-        response = blockingChannel.receive()
+        if(readResponse)
+          response = blockingChannel.receive()
+        else
+          trace("Skipping reading response")
       } catch {
         case e: java.io.IOException =>
           // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry
@@ -83,7 +86,8 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
   }
 
   /**
-   * Send a message
+   * Send a message. If the producerRequest had request.required.acks=0, then the
+   * returned response object is null
    */
   def send(producerRequest: ProducerRequest): ProducerResponse = {
     val requestSize = producerRequest.sizeInBytes
@@ -95,10 +99,13 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
     val aggregateTimer = producerRequestStats.getProducerRequestAllBrokersStats.requestTimer
     aggregateTimer.time {
       specificTimer.time {
-        response = doSend(producerRequest)
+        response = doSend(producerRequest, if(producerRequest.requiredAcks == 0) false else true)
       }
     }
-    ProducerResponse.readFrom(response.buffer)
+    if(producerRequest.requiredAcks != 0)
+      ProducerResponse.readFrom(response.buffer)
+    else
+      null
   }
 
   def send(request: TopicMetadataRequest): TopicMetadataResponse = {
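
The contract change here is that send(producerRequest) returns null whenever the
request was sent with acks=0, so callers must branch before touching the response.
A hedged caller-side sketch (the helper name is mine, not part of the patch):

    import kafka.api.{ProducerRequest, ProducerResponse}
    import kafka.producer.SyncProducer

    // None for fire-and-forget sends, per the null-response contract above
    def trySend(producer: SyncProducer, request: ProducerRequest): Option[ProducerResponse] =
      Option(producer.send(request))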

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 5569cc2..ebab1da 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -243,20 +243,22 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
         val response = syncProducer.send(producerRequest)
         debug("Producer sent messages with correlation id %d for topics %s to broker %d on %s:%d"
           .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))
-        if (response.status.size != producerRequest.data.size)
-          throw new KafkaException("Incomplete response (%s) for producer request (%s)"
-            .format(response, producerRequest))
-        if (logger.isTraceEnabled) {
-          val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError)
-          successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
-            trace("Successfully sent message: %s".format(Utils.readString(message.message.payload)))))
-        }
-        failedTopicPartitions = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
-                                    .map(partitionStatus => partitionStatus._1)
-        if(failedTopicPartitions.size > 0)
-          error("Produce request with correlation id %d failed due to response %s. List of failed topic partitions is %s"
-            .format(currentCorrelationId, response.toString, failedTopicPartitions.mkString(",")))
-        failedTopicPartitions
+        if(response != null) {
+          if (response.status.size != producerRequest.data.size)
+            throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest))
+          if (logger.isTraceEnabled) {
+            val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError)
+            successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
+              trace("Successfully sent message: %s".format(Utils.readString(message.message.payload)))))
+          }
+          failedTopicPartitions = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
+            .map(partitionStatus => partitionStatus._1)
+          if(failedTopicPartitions.size > 0)
+            error("Produce request with correlation id %d failed due to response %s. List of failed topic partitions is %s"
+              .format(currentCorrelationId, response.toString, failedTopicPartitions.mkString(",")))
+          failedTopicPartitions
+        } else
+          Seq.empty[TopicAndPartition]
       } catch {
         case t: Throwable =>
           warn("Failed to send producer request with correlation id %d to broker %d with data for partitions %s"

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6df077b..ece1b46 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -23,13 +23,13 @@ import kafka.message._
 import kafka.network._
 import org.apache.log4j.Logger
 import scala.collection._
-import kafka.network.RequestChannel.Response
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic._
 import kafka.metrics.KafkaMetricsGroup
 import org.I0Itec.zkclient.ZkClient
 import kafka.common._
 import kafka.utils.{ZkUtils, Pool, SystemTime, Logging}
+import kafka.network.RequestChannel.Response
 
 
 /**
@@ -127,8 +127,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     val allPartitionHaveReplicationFactorOne =
       !produceRequest.data.keySet.exists(
         m => replicaManager.getReplicationFactorForPartition(m.topic, m.partition) != 1)
-    if (produceRequest.requiredAcks == 0 ||
-        produceRequest.requiredAcks == 1 ||
+    if(produceRequest.requiredAcks == 0) {
+      // send a fake producer response if producer request.required.acks = 0. This mimics the behavior of a 0.7 producer
+      // and is tuned for very high throughput
+      requestChannel.sendResponse(new RequestChannel.Response(request.processor, request, null))
+    } else if (produceRequest.requiredAcks == 1 ||
         produceRequest.numPartitions <= 0 ||
         allPartitionHaveReplicationFactorOne ||
         numPartitionsInError == produceRequest.numPartitions) {
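
The "fake" response is a RequestChannel.Response carrying a null send; the
Processor in SocketServer (see above) treats that as "nothing to write, flip the
key back to OP_READ". Condensed, the network-thread dispatch amounts to this
paraphrase of the patch (not new behavior):

    // inside Processor.processNewResponses(), per response dequeued:
    if (curr.responseSend == null) {
      key.interestOps(SelectionKey.OP_READ)   // acks=0: read the next pipelined request
      key.attach(null)
    } else {
      key.interestOps(SelectionKey.OP_WRITE)  // normal case: write the response first
      key.attach(curr)
    }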

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 402fced..007e85d 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -21,11 +21,7 @@ import java.nio.ByteBuffer
 import junit.framework.Assert._
 import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder}
 import kafka.server.{KafkaRequestHandler, KafkaConfig}
-import java.util.Properties
-import kafka.utils.Utils
 import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
-import kafka.serializer._
-import kafka.utils.TestUtils
 import org.apache.log4j.{Level, Logger}
 import org.I0Itec.zkclient.ZkClient
 import kafka.zk.ZooKeeperTestHarness
@@ -33,6 +29,7 @@ import org.scalatest.junit.JUnit3Suite
 import scala.collection._
 import kafka.admin.{AdminUtils, CreateTopicCommand}
 import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
+import kafka.utils.{TestUtils, Utils}
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -83,9 +80,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
 
   def testDefaultEncoderProducerAndFetch() {
     val topic = "test-topic"
-    val props = new Properties()
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+    val props = producer.config.props.props
     val config = new ProducerConfig(props)
 
     val stringProducer1 = new Producer[String, String](config)
@@ -111,9 +106,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
 
   def testDefaultEncoderProducerAndFetchWithCompression() {
     val topic = "test-topic"
-    val props = new Properties()
-    props.put("serializer.class", classOf[StringEncoder].getName.toString)
-    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+    val props = producer.config.props.props
     props.put("compression", "true")
     val config = new ProducerConfig(props)
 
@@ -272,7 +265,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     }
     producer.send(produceList: _*)
 
-    // wait a bit for produced message to be available
     val request = builder.build()
     val response = consumer.fetch(request)
     for( (topic, partition) <- topics) {
@@ -315,6 +307,40 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
   }
 
+  def testPipelinedProduceRequests() {
+    createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
+    val props = producer.config.props.props
+    props.put("request.required.acks", "0")
+    val pipelinedProducer: Producer[String, String] = new Producer(new ProducerConfig(props))
+
+    // send some messages
+    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
+    val messages = new mutable.HashMap[String, Seq[String]]
+    val builder = new FetchRequestBuilder()
+    var produceList: List[KeyedMessage[String, String]] = Nil
+    for( (topic, partition) <- topics) {
+      val messageList = List("a_" + topic, "b_" + topic)
+      val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
+      messages += topic -> messageList
+      pipelinedProducer.send(producerData:_*)
+      builder.addFetch(topic, partition, 0, 10000)
+    }
+
+    // wait until the messages are published
+    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test1", 0).get.logEndOffset == 2 }, 1000)
+    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test2", 0).get.logEndOffset == 2 }, 1000)
+    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test3", 0).get.logEndOffset == 2 }, 1000)
+    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test4", 0).get.logEndOffset == 2 }, 1000)
+
+    // test if the consumer received the messages in the correct order when producer has enabled request pipelining
+    val request = builder.build()
+    val response = consumer.fetch(request)
+    for( (topic, partition) <- topics) {
+      val fetched = response.messageSet(topic, partition)
+      assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
+    }
+  }
+
   /**
    * For testing purposes, just create these topics each with one partition and one replica for
   * which the provided broker should be the leader.  Create and wait for broker to lead.  Simple.

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
index 0fde254..731ee59 100644
--- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
@@ -19,11 +19,8 @@ package kafka.integration
 
 import kafka.consumer.SimpleConsumer
 import org.scalatest.junit.JUnit3Suite
-import java.util.Properties
 import kafka.producer.{ProducerConfig, Producer}
 import kafka.utils.TestUtils
-import kafka.serializer._
-
 trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
     val port: Int
     val host = "localhost"
@@ -32,16 +29,7 @@ trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarnes
 
   override def setUp() {
       super.setUp
-      val props = new Properties()
-      props.put("partitioner.class", "kafka.utils.StaticPartitioner")
-      props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
-      props.put("send.buffer.bytes", "65536")
-      props.put("connect.timeout.ms", "100000")
-      props.put("reconnect.interval", "10000")
-      props.put("retry.backoff.ms", "1000")
-      props.put("message.send.max.retries", "3")
-      props.put("request.required.acks", "-1")
-      props.put("serializer.class", classOf[StringEncoder].getName.toString)
+      val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), "kafka.utils.StaticPartitioner")
       producer = new Producer(new ProducerConfig(props))
       consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index 051ebe3..3b2c069 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -24,7 +24,7 @@ import org.junit._
 import org.scalatest.junit.JUnitSuite
 import scala.collection._
 import scala.util.Random
-import kafka.utils._
+import kafka.utils.TestUtils
 
 class OffsetIndexTest extends JUnitSuite {
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
index c25255f..67497dd 100644
--- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
+++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
@@ -36,7 +36,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
 
   var logDirZk: File = null
   var config: KafkaConfig = null
-  var serverZk: KafkaServer = null
+  var server: KafkaServer = null
 
   var simpleConsumerZk: SimpleConsumer = null
 
@@ -55,14 +55,14 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     val logDirZkPath = propsZk.getProperty("log.dir")
     logDirZk = new File(logDirZkPath)
     config = new KafkaConfig(propsZk)
-    serverZk = TestUtils.createServer(config);
+    server = TestUtils.createServer(config);
     simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024, "")
   }
 
   @After
   override def tearDown() {
     simpleConsumerZk.close
-    serverZk.shutdown
+    server.shutdown
     Utils.rm(logDirZk)
     super.tearDown()
   }
@@ -164,6 +164,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
     props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
+    props.put("log4j.appender.KAFKA.requiredNumAcks", "1")
     props
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 7395cbc..9322b2c 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -21,7 +21,6 @@ import java.net._
 import java.io._
 import org.junit._
 import org.scalatest.junit.JUnitSuite
-import kafka.utils.TestUtils
 import java.util.Random
 import junit.framework.Assert._
 import kafka.producer.SyncProducerConfig
@@ -29,13 +28,14 @@ import kafka.api.ProducerRequest
 import java.nio.ByteBuffer
 import kafka.common.TopicAndPartition
 import kafka.message.ByteBufferMessageSet
+import java.nio.channels.SelectionKey
 
 
 class SocketServerTest extends JUnitSuite {
 
   val server: SocketServer = new SocketServer(0,
                                               host = null,
-                                              port = TestUtils.choosePort,
+                                              port = kafka.utils.TestUtils.choosePort,
                                               numProcessorThreads = 1,
                                               maxQueuedRequests = 50,
                                               maxRequestSize = 50)
@@ -102,4 +102,36 @@ class SocketServerTest extends JUnitSuite {
     receiveResponse(socket)
   }
 
+  @Test
+  def testPipelinedRequestOrdering() {
+    val socket = connect()
+    val correlationId = -1
+    val clientId = SyncProducerConfig.DefaultClientId
+    val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
+    val ack: Short = 0
+    val emptyRequest =
+      new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]())
+
+    val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes)
+    emptyRequest.writeTo(byteBuffer)
+    byteBuffer.rewind()
+    val serializedBytes = new Array[Byte](byteBuffer.remaining)
+    byteBuffer.get(serializedBytes)
+
+    sendRequest(socket, 0, serializedBytes)
+    sendRequest(socket, 0, serializedBytes)
+
+    // here the socket server should've read only the first request completely and since the response is not sent yet
+    // the selection key should not be readable
+    val request = server.requestChannel.receiveRequest
+    Assert.assertFalse((request.requestKey.asInstanceOf[SelectionKey].interestOps & SelectionKey.OP_READ) == SelectionKey.OP_READ)
+
+    server.requestChannel.sendResponse(new RequestChannel.Response(0, request, null))
+
+    // if everything is working correctly, until you send a response for the first request,
+    // the 2nd request will not be read by the socket server
+    val request2 = server.requestChannel.receiveRequest
+    server.requestChannel.sendResponse(new RequestChannel.Response(0, request2, null))
+    Assert.assertFalse((request.requestKey.asInstanceOf[SelectionKey].interestOps & SelectionKey.OP_READ) == SelectionKey.OP_READ)
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index fb0666f..922a200 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -356,11 +356,9 @@ class AsyncProducerTest extends JUnit3Suite {
   @Test
   def testBrokerListAndAsync() {
     return
-    val props = new Properties()
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs))
     props.put("producer.type", "async")
     props.put("batch.num.messages", "5")
-    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
 
     val config = new ProducerConfig(props)
 
@@ -394,9 +392,10 @@ class AsyncProducerTest extends JUnit3Suite {
   @Test
   def testFailedSendRetryLogic() {
     val props = new Properties()
+    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+    props.put("request.required.acks", "1")
     props.put("serializer.class", classOf[StringEncoder].getName.toString)
     props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString)
-    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
 
     val config = new ProducerConfig(props)
 
@@ -410,12 +409,12 @@ class AsyncProducerTest extends JUnit3Suite {
     // produce request for topic1 and partitions 0 and 1.  Let the first request fail
     // entirely.  The second request will succeed for partition 1 but fail for partition 0.
     // On the third try for partition 0, let it succeed.
-    val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 11)
-    val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 17)
+    val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, correlationId = 11)
+    val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, correlationId = 17)
     val response1 = ProducerResponse(0,
       Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NotLeaderForPartitionCode.toShort, 0L)),
           (TopicAndPartition("topic1", 1), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
-    val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), correlationId = 21)
+    val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), acks = 1, correlationId = 21)
     val response2 = ProducerResponse(0,
       Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
     val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 792919b..04acef5 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -199,7 +199,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     props.put("serializer.class", "kafka.serializer.StringEncoder")
     props.put("partitioner.class", "kafka.utils.StaticPartitioner")
     props.put("request.timeout.ms", "2000")
-//    props.put("request.required.acks", "-1")
+    props.put("request.required.acks", "1")
     props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
 
     // create topic
@@ -258,6 +258,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     props.put("partitioner.class", "kafka.utils.StaticPartitioner")
     props.put("request.timeout.ms", String.valueOf(timeoutMs))
     props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    props.put("request.required.acks", "1")
 
     val config = new ProducerConfig(props)
     val producer = new Producer[String, String](config)

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 89ba944..81b2736 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -18,7 +18,6 @@
 package kafka.producer
 
 import java.net.SocketTimeoutException
-import java.util.Properties
 import junit.framework.Assert
 import kafka.admin.CreateTopicCommand
 import kafka.integration.KafkaServerTestHarness
@@ -38,16 +37,13 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
   @Test
   def testReachableServer() {
     val server = servers.head
-    val props = new Properties()
-    props.put("host", "localhost")
-    props.put("port", server.socketServer.port.toString)
-    props.put("send.buffer.bytes", "102400")
-    props.put("connect.timeout.ms", "500")
-    props.put("reconnect.interval", "1000")
+    val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val firstStart = SystemTime.milliseconds
     try {
-      val response = producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes))))
+      val response = producer.send(TestUtils.produceRequest("test", 0,
+        new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
       Assert.assertNotNull(response)
     } catch {
       case e: Exception => Assert.fail("Unexpected failure sending message to broker. " + e.getMessage)
@@ -56,7 +52,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
     Assert.assertTrue((firstEnd-firstStart) < 500)
     val secondStart = SystemTime.milliseconds
     try {
-      val response = producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes))))
+      val response = producer.send(TestUtils.produceRequest("test", 0,
+        new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
       Assert.assertNotNull(response)
     } catch {
       case e: Exception => Assert.fail("Unexpected failure sending message to broker. " + e.getMessage)
@@ -64,7 +61,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
     val secondEnd = SystemTime.milliseconds
     Assert.assertTrue((secondEnd-secondStart) < 500)
     try {
-      val response = producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes))))
+      val response = producer.send(TestUtils.produceRequest("test", 0,
+        new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
       Assert.assertNotNull(response)
     } catch {
       case e: Exception => Assert.fail("Unexpected failure sending message to broker. " + e.getMessage)
@@ -74,36 +72,31 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
   @Test
   def testEmptyProduceRequest() {
     val server = servers.head
-    val props = new Properties()
-    props.put("host", "localhost")
-    props.put("port", server.socketServer.port.toString)
-    props.put("send.buffer.bytes", "102400")
-    props.put("connect.timeout.ms", "300")
-    props.put("reconnect.interval", "500")
+    val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+
     val correlationId = 0
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
-    val ack = SyncProducerConfig.DefaultRequiredAcks
+    val ack: Short = 1
     val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]())
-
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val response = producer.send(emptyRequest)
+    Assert.assertTrue(response != null)
     Assert.assertTrue(!response.hasError && response.status.size == 0)
   }
 
   @Test
   def testMessageSizeTooLarge() {
     val server = servers.head
-    val props = new Properties()
-    props.put("host", "localhost")
-    props.put("port", server.socketServer.port.toString)
+    val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+
     val producer = new SyncProducer(new SyncProducerConfig(props))
     CreateTopicCommand.createTopic(zkClient, "test", 1, 1)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500)
 
     val message1 = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))
     val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1)
-    val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1))
+    val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1, acks = 1))
 
     Assert.assertEquals(1, response1.status.count(_._2.error != ErrorMapping.NoError))
     Assert.assertEquals(ErrorMapping.MessageSizeTooLargeCode, response1.status(TopicAndPartition("test", 0)).error)
@@ -112,7 +105,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
     val safeSize = configs(0).messageMaxBytes - Message.MessageOverhead - MessageSet.LogOverhead - 1
     val message2 = new Message(new Array[Byte](safeSize))
     val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2)
-    val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2))
+    val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2, acks = 1))
 
     Assert.assertEquals(1, response1.status.count(_._2.error != ErrorMapping.NoError))
     Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("test", 0)).error)
@@ -122,12 +115,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
   @Test
   def testProduceCorrectlyReceivesResponse() {
     val server = servers.head
-    val props = new Properties()
-    props.put("host", "localhost")
-    props.put("port", server.socketServer.port.toString)
-    props.put("send.buffer.bytes", "102400")
-    props.put("connect.timeout.ms", "300")
-    props.put("reconnect.interval", "500")
+    val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
@@ -173,15 +161,11 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
     val timeoutMs = 500
 
     val server = servers.head
-    val props = new Properties()
-    props.put("host", "localhost")
-    props.put("port", server.socketServer.port.toString)
-    props.put("send.buffer.bytes", "102400")
-    props.put("request.timeout.ms", String.valueOf(timeoutMs))
+    val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
     val producer = new SyncProducer(new SyncProducerConfig(props))
 
     val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
-    val request = TestUtils.produceRequest("topic1", 0, messages)
+    val request = TestUtils.produceRequest("topic1", 0, messages, acks = 1)
 
     // stop IO threads and request handling, but leave networking operational
     // any requests should be accepted and queue up, but not handled
@@ -196,8 +180,21 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
       case e => Assert.fail("Unexpected exception when expecting timeout: " + e)
     }
     val t2 = SystemTime.milliseconds
-
     // make sure we don't wait fewer than timeoutMs for a response
     Assert.assertTrue((t2-t1) >= timeoutMs)
   }
+
+  @Test
+  def testProduceRequestWithNoResponse() {
+    val server = servers.head
+    val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+    val correlationId = 0
+    val clientId = SyncProducerConfig.DefaultClientId
+    val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
+    val ack: Short = 0
+    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]())
+    val producer = new SyncProducer(new SyncProducerConfig(props))
+    val response = producer.send(emptyRequest)
+    Assert.assertTrue(response == null)
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index cd724a3..db46247 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -48,7 +48,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
   var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDirs(0))
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
   
-  val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
+  val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs))
   producerProps.put("key.serializer.class", classOf[IntEncoder].getName.toString)
   producerProps.put("request.required.acks", "-1")
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 7afbe54..3728f8c 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -44,7 +44,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
   def testCleanShutdown() {
     var server = new KafkaServer(config)
     server.startup()
-    val producerConfig = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)
+    val producerConfig = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)))
     producerConfig.put("key.serializer.class", classOf[IntEncoder].getName.toString)
     var producer = new Producer[Int, String](new ProducerConfig(producerConfig))
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 9400328..217ff7a 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -301,16 +301,25 @@ object TestUtils extends Logging {
     new Producer[K, V](new ProducerConfig(props))
   }
 
-  def getProducerConfig(brokerList: String, bufferSize: Int, connectTimeout: Int,
-                        reconnectInterval: Int): Properties = {
+  def getProducerConfig(brokerList: String, partitioner: String = "kafka.producer.DefaultPartitioner"): Properties = {
     val props = new Properties()
-    props.put("producer.type", "sync")
     props.put("broker.list", brokerList)
-    props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
-    props.put("send.buffer.bytes", bufferSize.toString)
-    props.put("connect.timeout.ms", connectTimeout.toString)
-    props.put("reconnect.interval", reconnectInterval.toString)
-    props.put("request.timeout.ms", 30000.toString)
+    props.put("partitioner.class", partitioner)
+    props.put("message.send.max.retries", "3")
+    props.put("retry.backoff.ms", "1000")
+    props.put("request.timeout.ms", "500")
+    props.put("request.required.acks", "-1")
+    props.put("serializer.class", classOf[StringEncoder].getName.toString)
+
+    props
+  }
+
+  def getSyncProducerConfig(port: Int): Properties = {
+    val props = new Properties()
+    props.put("host", "localhost")
+    props.put("port", port.toString)
+    props.put("request.timeout.ms", "500")
+    props.put("request.required.acks", "1")
     props.put("serializer.class", classOf[StringEncoder].getName.toString)
     props
   }
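
For reference, tests now build producers from these helpers instead of hand-rolled
Properties; a hedged example of opting a test producer into the new acks=0 mode
(mirrors the harness usage above):

    val props = TestUtils.getProducerConfig(
      TestUtils.getBrokerListStrFromConfigs(configs), "kafka.utils.StaticPartitioner")
    props.put("request.required.acks", "0")  // override the helper's default of -1
    val producer = new kafka.producer.Producer[String, String](
      new kafka.producer.ProducerConfig(props))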