You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jo...@apache.org on 2012/03/03 06:46:44 UTC

svn commit: r1296577 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/api/ main/scala/kafka/javaapi/ main/scala/kafka/javaapi/message/ main/scala/kafka/javaapi/producer/ main/scala/kafka/message/ main/scala/kafka/producer/ main/scala/kafka...

Author: joestein
Date: Sat Mar  3 05:46:43 2012
New Revision: 1296577

URL: http://svn.apache.org/viewvc?rev=1296577&view=rev
Log:
KAFKA-240 ProducerRequest wire format protocol update and related changes

Removed:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/MultiProducerRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MultiMessageSetSend.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Sat Mar  3 05:46:43 2012
@@ -39,6 +39,15 @@ object PartitionData {
 
 case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, initialOffset:Long = 0L, messages: MessageSet) {
   val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue()
+
+  def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, messages)
+
+  def getTranslatedPartition(topic: String, randomSelector: String => Int): Int = {
+    if (partition == ProducerRequest.RandomPartition)
+      return randomSelector(topic)
+    else 
+      return partition 
+  }
 }
 
 object TopicData {
@@ -73,6 +82,15 @@ object TopicData {
 
 case class TopicData(topic: String, partitionData: Array[PartitionData]) {
   val sizeInBytes = 2 + topic.length + partitionData.foldLeft(4)(_ + _.sizeInBytes)
+
+  override def equals(other: Any): Boolean = {
+    other match {
+      case that: TopicData =>
+        ( topic == that.topic &&
+          partitionData.toSeq == that.partitionData.toSeq )
+      case _ => false
+    }
+  }
 }
 
 object FetchResponse {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Sat Mar  3 05:46:43 2012
@@ -24,60 +24,108 @@ import kafka.utils._
 
 object ProducerRequest {
   val RandomPartition = -1
-  
+  val versionId: Short = 0
+
   def readFrom(buffer: ByteBuffer): ProducerRequest = {
-    val topic = Utils.readShortString(buffer, "UTF-8")
-    val partition = buffer.getInt
-    val messageSetSize = buffer.getInt
-    val messageSetBuffer = buffer.slice()
-    messageSetBuffer.limit(messageSetSize)
-    buffer.position(buffer.position + messageSetSize)
-    new ProducerRequest(topic, partition, new ByteBufferMessageSet(messageSetBuffer))
+    val versionId: Short = buffer.getShort
+    val correlationId: Int = buffer.getInt
+    val clientId: String = Utils.readShortString(buffer, "UTF-8")
+    val requiredAcks: Short = buffer.getShort
+    val ackTimeout: Int = buffer.getInt
+    //build the topic structure
+    val topicCount = buffer.getInt
+    val data = new Array[TopicData](topicCount)
+    for(i <- 0 until topicCount) {
+      val topic = Utils.readShortString(buffer, "UTF-8")
+      		
+      val partitionCount = buffer.getInt
+      //build the partition structure within this topic
+      val partitionData = new Array[PartitionData](partitionCount)
+      for (j <- 0 until partitionCount) {
+        val partition = buffer.getInt
+        val messageSetSize = buffer.getInt
+        val messageSetBuffer = new Array[Byte](messageSetSize)
+        buffer.get(messageSetBuffer,0,messageSetSize)
+        partitionData(j) = new PartitionData(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer)))
+      }
+      data(i) = new TopicData(topic,partitionData)
+    }
+    new ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeout, data)
   }
 }
 
-class ProducerRequest(val topic: String,
-                      val partition: Int,
-                      val messages: ByteBufferMessageSet) extends Request(RequestKeys.Produce) {
+case class ProducerRequest(val versionId: Short, val correlationId: Int,
+                      val clientId: String,
+                      val requiredAcks: Short,
+                      val ackTimeout: Int,
+                      val data: Array[TopicData]) extends Request(RequestKeys.Produce) {
+
+  def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeout: Int, data: Array[TopicData]) = this(ProducerRequest.versionId, correlationId, clientId, requiredAcks, ackTimeout, data)
 
   def writeTo(buffer: ByteBuffer) {
-    Utils.writeShortString(buffer, topic)
-    buffer.putInt(partition)
-    buffer.putInt(messages.serialized.limit)
-    buffer.put(messages.serialized)
-    messages.serialized.rewind
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    Utils.writeShortString(buffer, clientId, "UTF-8")
+    buffer.putShort(requiredAcks)
+    buffer.putInt(ackTimeout)
+    //save the topic structure
+    buffer.putInt(data.size) //the number of topics
+    data.foreach(d =>{
+      Utils.writeShortString(buffer, d.topic, "UTF-8") //write the topic
+      buffer.putInt(d.partitionData.size) //the number of partitions
+      d.partitionData.foreach(p => {
+        buffer.putInt(p.partition)
+        buffer.putInt(p.messages.getSerialized().limit)
+        buffer.put(p.messages.getSerialized())
+        p.messages.getSerialized().rewind
+      })
+    })
   }
-  
-  def sizeInBytes(): Int = 2 + topic.length + 4 + 4 + messages.sizeInBytes.asInstanceOf[Int]
 
-  def getTranslatedPartition(randomSelector: String => Int): Int = {
-    if (partition == ProducerRequest.RandomPartition)
-      return randomSelector(topic)
-    else 
-      return partition
+  def sizeInBytes(): Int = {
+    var size = 0 
+    //size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data.size
+    size = 2 + 4 + 2 + clientId.length + 2 + 4 + 4; 
+    data.foreach(d =>{
+	  size += 2 + d.topic.length + 4
+	  d.partitionData.foreach(p => {
+	    size += 4 + 4 + p.messages.sizeInBytes.asInstanceOf[Int]
+	  })
+    })
+    size
   }
 
   override def toString: String = {
     val builder = new StringBuilder()
     builder.append("ProducerRequest(")
-    builder.append(topic + ",")
-    builder.append(partition + ",")
-    builder.append(messages.sizeInBytes)
+    builder.append(versionId + ",")
+    builder.append(correlationId + ",")
+    builder.append(clientId + ",")
+    builder.append(requiredAcks + ",")
+    builder.append(ackTimeout)
+	data.foreach(d =>{
+      builder.append(":[" + d.topic)
+      d.partitionData.foreach(p => {
+        builder.append(":[")
+        builder.append(p.partition + ",")
+        builder.append(p.messages.sizeInBytes)
+        builder.append("]")
+      })
+      builder.append("]")
+    })
     builder.append(")")
     builder.toString
   }
 
   override def equals(other: Any): Boolean = {
-    other match {
+   other match {
       case that: ProducerRequest =>
-        (that canEqual this) && topic == that.topic && partition == that.partition &&
-                messages.equals(that.messages) 
+        ( correlationId == that.correlationId &&
+          clientId == that.clientId &&
+          requiredAcks == that.requiredAcks &&
+          ackTimeout == that.ackTimeout &&
+          data.toSeq == that.data.toSeq)
       case _ => false
     }
   }
-
-  def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest]
-
-  override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode
-
-}
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala Sat Mar  3 05:46:43 2012
@@ -22,7 +22,7 @@ import kafka.api.TopicData
 
 class FetchResponse( val versionId: Short,
                      val correlationId: Int,
-                     val data: Array[TopicData] ) {
+                     private val data: Array[TopicData] ) {
 
   private val underlying = new kafka.api.FetchResponse(versionId, correlationId, data)
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala Sat Mar  3 05:46:43 2012
@@ -17,36 +17,29 @@
 package kafka.javaapi
 
 import kafka.network.Request
-import kafka.api.RequestKeys
+import kafka.api.{RequestKeys, TopicData}
 import java.nio.ByteBuffer
 
-class ProducerRequest(val topic: String,
-                      val partition: Int,
-                      val messages: kafka.javaapi.message.ByteBufferMessageSet) extends Request(RequestKeys.Produce) {
+class ProducerRequest(val correlationId: Int,
+                      val clientId: String,
+                      val requiredAcks: Short,
+                      val ackTimeout: Int,
+                      val data: Array[TopicData]) extends Request(RequestKeys.Produce) {
+	
   import Implicits._
-  private val underlying = new kafka.api.ProducerRequest(topic, partition, messages)
+  val underlying = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data)
 
   def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }
 
   def sizeInBytes(): Int = underlying.sizeInBytes
 
-  def getTranslatedPartition(randomSelector: String => Int): Int =
-    underlying.getTranslatedPartition(randomSelector)
-
   override def toString: String =
     underlying.toString
 
-  override def equals(other: Any): Boolean = {
-    other match {
-      case that: ProducerRequest =>
-        (that canEqual this) && topic == that.topic && partition == that.partition &&
-                messages.equals(that.messages)
-      case _ => false
-    }
-  }
+  override def equals(other: Any): Boolean = underlying.equals(other)
 
   def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest]
 
-  override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode
+  override def hashCode: Int = underlying.hashCode
 
-}
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala Sat Mar  3 05:46:43 2012
@@ -39,7 +39,7 @@ class ByteBufferMessageSet(private val b
 
   def validBytes: Long = underlying.validBytes
 
-  def serialized():ByteBuffer = underlying.serialized
+  def serialized():ByteBuffer = underlying.getSerialized()
 
   def getInitialOffset = initialOffset
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala Sat Mar  3 05:46:43 2012
@@ -18,6 +18,8 @@ package kafka.javaapi.producer
 
 import kafka.producer.SyncProducerConfig
 import kafka.javaapi.message.ByteBufferMessageSet
+import kafka.javaapi.ProducerRequest
+import kafka.api.{PartitionData, TopicData}
 
 class SyncProducer(syncProducer: kafka.producer.SyncProducer) {
 
@@ -25,21 +27,17 @@ class SyncProducer(syncProducer: kafka.p
 
   val underlying = syncProducer
 
-  def send(topic: String, partition: Int, messages: ByteBufferMessageSet) {
-    import kafka.javaapi.Implicits._
-    underlying.send(topic, partition, messages)
+  def send(producerRequest: kafka.javaapi.ProducerRequest) {
+    underlying.send(producerRequest.underlying)	
   }
 
-  def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic,
-                                                                       kafka.api.ProducerRequest.RandomPartition,
-                                                                       messages)
-
-  def multiSend(produces: Array[kafka.javaapi.ProducerRequest]) {
-    import kafka.javaapi.Implicits._
-    val produceRequests = new Array[kafka.api.ProducerRequest](produces.length)
-    for(i <- 0 until produces.length)
-      produceRequests(i) = new kafka.api.ProducerRequest(produces(i).topic, produces(i).partition, produces(i).messages)
-    underlying.multiSend(produceRequests)
+  def send(topic: String, messages: ByteBufferMessageSet): Unit = {
+    var data = new Array[TopicData](1)
+    var partition_data = new Array[PartitionData](1)
+    partition_data(0) = new PartitionData(-1,messages.underlying)
+    data(0) = new TopicData(topic,partition_data)
+    val producerRequest = new kafka.api.ProducerRequest(-1, "", 0, 0, data)
+    underlying.send(producerRequest)      	
   }
 
   def close() {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Sat Mar  3 05:46:43 2012
@@ -53,7 +53,7 @@ class ByteBufferMessageSet(private val b
 
   def getErrorCode = errorCode
 
-  def serialized(): ByteBuffer = buffer
+  def getSerialized(): ByteBuffer = buffer
 
   def validBytes: Long = shallowValidBytes
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala Sat Mar  3 05:46:43 2012
@@ -40,6 +40,8 @@ class FileMessageSet private[kafka](priv
   private val setSize = new AtomicLong()
   private val setHighWaterMark = new AtomicLong()
   
+  def getSerialized(): ByteBuffer = throw new java.lang.UnsupportedOperationException()
+
   if(mutable) {
     if(limit < Long.MaxValue || offset > 0)
       throw new IllegalArgumentException("Attempt to open a mutable message set with a view or offset, which is not allowed.")

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala Sat Mar  3 05:46:43 2012
@@ -111,4 +111,9 @@ abstract class MessageSet extends Iterab
         throw new InvalidMessageException
   }
   
+  /**
+   * Used to allow children to have serialization on implementation
+   */
+  def getSerialized(): ByteBuffer
+
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Sat Mar  3 05:46:43 2012
@@ -51,29 +51,10 @@ class SyncProducer(val config: SyncProdu
     if (logger.isTraceEnabled) {
       trace("verifying sendbuffer of size " + buffer.limit)
       val requestTypeId = buffer.getShort()
-      if (requestTypeId == RequestKeys.MultiProduce) {
-        try {
-          val request = MultiProducerRequest.readFrom(buffer)
-          for (produce <- request.produces) {
-            try {
-              for (messageAndOffset <- produce.messages)
-                if (!messageAndOffset.message.isValid)
-                  trace("topic " + produce.topic + " is invalid")
-            }
-            catch {
-              case e: Throwable =>
-                trace("error iterating messages ", e)
-            }
-          }
-        }
-        catch {
-          case e: Throwable =>
-            trace("error verifying sendbuffer ", e)
-        }
-      }
+      val request = ProducerRequest.readFrom(buffer)
+      trace(request.toString)
     }
   }
-
   /**
    * Common functionality for the public send methods
    */
@@ -108,21 +89,15 @@ class SyncProducer(val config: SyncProdu
   /**
    * Send a message
    */
-  def send(topic: String, partition: Int, messages: ByteBufferMessageSet) {
-    verifyMessageSize(messages)
-    val setSize = messages.sizeInBytes.asInstanceOf[Int]
-    trace("Got message set with " + setSize + " bytes to send")
-    send(new BoundedByteBufferSend(new ProducerRequest(topic, partition, messages)))
-  }
- 
-  def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic, ProducerRequest.RandomPartition, messages)
-
-  def multiSend(produces: Array[ProducerRequest]) {
-    for (request <- produces)
-      verifyMessageSize(request.messages)
-    val setSize = produces.foldLeft(0L)(_ + _.messages.sizeInBytes)
-    trace("Got multi message sets with " + setSize + " bytes to send")
-    send(new BoundedByteBufferSend(new MultiProducerRequest(produces)))
+  def send(producerRequest: ProducerRequest) {
+    producerRequest.data.foreach(d => {
+      d.partitionData.foreach(p => {
+	    verifyMessageSize(new ByteBufferMessageSet(p.messages.getSerialized()))
+        val setSize = p.messages.sizeInBytes.asInstanceOf[Int]
+        trace("Got message set with " + setSize + " bytes to send")
+      })
+    })
+    send(new BoundedByteBufferSend(producerRequest))
   }
 
   def send(request: TopicMetadataRequest): Seq[TopicMetadata] = {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala Sat Mar  3 05:46:43 2012
@@ -41,4 +41,23 @@ trait SyncProducerConfigShared {
   val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000)
 
   val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000)
+
+  /* the client application sending the producer requests */
+  val correlationId = Utils.getInt(props,"producer.request.correlation_id",-1)
+
+  /* the client application sending the producer requests */
+  val clientId = Utils.getString(props,"producer.request.client_id","")
+
+  /* the required_acks of the producer requests */
+  val requiredAcks = Utils.getShort(props,"producer.request.required_acks",0)
+
+  /* the ack_timeout of the producer requests */
+  val ackTimeout = Utils.getInt(props,"producer.request.ack_timeout",1)
 }
+
+object SyncProducerConfig {
+  val DefaultCorrelationId = -1
+  val DefaultClientId = ""
+  val DefaultRequiredAcks : Short = 0
+  val DefaultAckTimeoutMs = 1
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Sat Mar  3 05:46:43 2012
@@ -17,7 +17,7 @@
 
 package kafka.producer.async
 
-import kafka.api.ProducerRequest
+import kafka.api.{ProducerRequest, TopicData, PartitionData}
 import kafka.serializer.Encoder
 import kafka.producer._
 import kafka.cluster.{Partition, Broker}
@@ -147,9 +147,22 @@ class DefaultEventHandler[K,V](config: P
 
   private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]) {
     if(messagesPerTopic.size > 0) {
-      val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray
+      val topics = new HashMap[String, ListBuffer[PartitionData]]()
+      val requests = messagesPerTopic.map(f => {
+        val topicName = f._1._1
+        val partitionId = f._1._2
+        val messagesSet= f._2
+        val topic = topics.get(topicName) // checking to see if this topics exists
+        topic match {
+          case None => topics += topicName -> new ListBuffer[PartitionData]() //create a new listbuffer for this topic
+          case Some(x) => trace("found " + topicName)
+        }
+	    topics(topicName).append(new PartitionData(partitionId, messagesSet))
+      })
+      val topicData = topics.map(kv => new TopicData(kv._1,kv._2.toArray))
+      val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks, config.ackTimeout, topicData.toArray) //new kafka.javaapi.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, topic_data.toArray)
       val syncProducer = producerPool.getProducer(brokerId)
-      syncProducer.multiSend(requests)
+      syncProducer.send(producerRequest)
       trace("kafka producer sent messages for topics %s to broker %s:%d"
         .format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port))
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Sat Mar  3 05:46:43 2012
@@ -41,7 +41,6 @@ class KafkaApis(val logManager: LogManag
     apiId match {
       case RequestKeys.Produce => handleProducerRequest(receive)
       case RequestKeys.Fetch => handleFetchRequest(receive)
-      case RequestKeys.MultiProduce => handleMultiProducerRequest(receive)
       case RequestKeys.Offsets => handleOffsetRequest(receive)
       case RequestKeys.TopicMetadata => handleTopicMetadataRequest(receive)
       case _ => throw new IllegalStateException("No mapping found for handler id " + apiId)
@@ -59,31 +58,38 @@ class KafkaApis(val logManager: LogManag
     None
   }
 
-  def handleMultiProducerRequest(receive: Receive): Option[Send] = {
-    val request = MultiProducerRequest.readFrom(receive.buffer)
-    if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Multiproducer request " + request.toString)
-    request.produces.map(handleProducerRequest(_, "MultiProducerRequest"))
-    None
-  }
-
-  private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String) = {
-    val partition = request.getTranslatedPartition(logManager.chooseRandomPartition)
-    try {
-      logManager.getOrCreateLog(request.topic, partition).append(request.messages)
-      trace(request.messages.sizeInBytes + " bytes written to logs.")
-    } catch {
-      case e =>
-        error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e)
-        e match {
-          case _: IOException => 
-            fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
-            System.exit(1)
-          case _ =>
+  private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String): Option[ProducerResponse] = {
+	val requestSize = request.data.size
+	val errors = new Array[Int](requestSize)
+	val offsets = new Array[Long](requestSize)
+	
+    request.data.foreach(d => {
+	  d.partitionData.foreach(p => {
+        val partition = p.getTranslatedPartition(d.topic, logManager.chooseRandomPartition)
+        try {
+          logManager.getOrCreateLog(d.topic, partition).append(p.messages)
+          trace(p.messages.sizeInBytes + " bytes written to logs.")
+          p.messages.foreach(m => trace("wrote message %s to disk".format(m.message.checksum)))
         }
-        throw e
-    }
-    None
+        catch {
+          case e =>
+            //TODO: handle response in ProducerResponse
+            error("Error processing " + requestHandlerName + " on " + d.topic + ":" + partition, e)
+            e match {
+              case _: IOException =>
+                fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
+                Runtime.getRuntime.halt(1)
+              case _ =>
+            }
+          //throw e
+        }
+      })
+    //None
+    })
+    if (request.requiredAcks == 0)
+      None
+    else
+      None //TODO: send when KAFKA-49 can receive this Some(new ProducerResponse(request.versionId, request.correlationId, errors, offsets))
   }
 
   def handleFetchRequest(request: Receive): Option[Send] = {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Sat Mar  3 05:46:43 2012
@@ -195,6 +195,9 @@ object Utils extends Logging {
   def getInt(props: Properties, name: String, default: Int): Int = 
     getIntInRange(props, name, default, (Int.MinValue, Int.MaxValue))
   
+  def getShort(props: Properties, name: String, default: Short): Short = 
+    getShortInRange(props, name, default, (Short.MinValue, Short.MaxValue))
+
   /**
    * Read an integer from the properties instance. Throw an exception 
    * if the value is not in the given range (inclusive)
@@ -217,6 +220,18 @@ object Utils extends Logging {
       v
   }
 
+ def getShortInRange(props: Properties, name: String, default: Short, range: (Short, Short)): Short = {
+    val v = 
+      if(props.containsKey(name))
+        props.getProperty(name).toShort
+      else
+        default
+    if(v < range._1 || v > range._2)
+      throw new IllegalArgumentException(name + " has value " + v + " which is not in the range " + range + ".")
+    else
+      v
+  }
+
   def getIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): Int = {
     val value = buffer.getInt
     if(value < range._1 || value > range._2)
@@ -777,4 +792,4 @@ class SnapshotStats(private val monitorD
 
     def durationMs: Double = (end.get - start) / (1000.0 * 1000.0)
   }
-}
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala Sat Mar  3 05:46:43 2012
@@ -33,7 +33,7 @@ class ByteBufferMessageSetTest extends B
     // create a ByteBufferMessageSet that doesn't contain a full message
     // iterating it should get an InvalidMessageSizeException
     val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("01234567890123456789".getBytes()))
-    val buffer = messages.serialized.slice
+    val buffer = messages.getSerialized().slice
     buffer.limit(10)
     val messageSetWithNoFullMessage = new ByteBufferMessageSet(buffer = buffer, initialOffset = 1000)
     try {
@@ -51,7 +51,7 @@ class ByteBufferMessageSetTest extends B
     {
       val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
       val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2)
-      buffer.put(messages.serialized)
+      buffer.put(messages.getSerialized())
       buffer.putShort(4)
       val messagesPlus = new ByteBufferMessageSet(buffer)
       assertEquals("Adding invalid bytes shouldn't change byte count", messages.validBytes, messagesPlus.validBytes)
@@ -93,7 +93,7 @@ class ByteBufferMessageSetTest extends B
       //make sure ByteBufferMessageSet is re-iterable.
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
       //make sure the last offset after iteration is correct
-      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit)
+      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.getSerialized().limit)
     }
 
     // test for compressed regular messages
@@ -103,7 +103,7 @@ class ByteBufferMessageSetTest extends B
       //make sure ByteBufferMessageSet is re-iterable.
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
       //make sure the last offset after iteration is correct
-      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit)
+      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.getSerialized().limit)
     }
 
     // test for mixed empty and non-empty messagesets uncompressed
@@ -111,16 +111,16 @@ class ByteBufferMessageSetTest extends B
       val emptyMessageList : List[Message] = Nil
       val emptyMessageSet = new ByteBufferMessageSet(NoCompressionCodec, emptyMessageList: _*)
       val regularMessgeSet = new ByteBufferMessageSet(NoCompressionCodec, messageList: _*)
-      val buffer = ByteBuffer.allocate(emptyMessageSet.serialized.limit + regularMessgeSet.serialized.limit)
-      buffer.put(emptyMessageSet.serialized)
-      buffer.put(regularMessgeSet.serialized)
+      val buffer = ByteBuffer.allocate(emptyMessageSet.getSerialized().limit + regularMessgeSet.getSerialized().limit)
+      buffer.put(emptyMessageSet.getSerialized())
+      buffer.put(regularMessgeSet.getSerialized())
       buffer.rewind
       val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0)
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
       //make sure ByteBufferMessageSet is re-iterable.
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
       //make sure the last offset after iteration is correct
-      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit)
+      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit)
     }
 
     // test for mixed empty and non-empty messagesets compressed
@@ -128,16 +128,16 @@ class ByteBufferMessageSetTest extends B
       val emptyMessageList : List[Message] = Nil
       val emptyMessageSet = new ByteBufferMessageSet(DefaultCompressionCodec, emptyMessageList: _*)
       val regularMessgeSet = new ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*)
-      val buffer = ByteBuffer.allocate(emptyMessageSet.serialized.limit + regularMessgeSet.serialized.limit)
-      buffer.put(emptyMessageSet.serialized)
-      buffer.put(regularMessgeSet.serialized)
+      val buffer = ByteBuffer.allocate(emptyMessageSet.getSerialized().limit + regularMessgeSet.getSerialized().limit)
+      buffer.put(emptyMessageSet.getSerialized())
+      buffer.put(regularMessgeSet.getSerialized())
       buffer.rewind
       val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0)
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
       //make sure ByteBufferMessageSet is re-iterable.
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
       //make sure the last offset after iteration is correct
-      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit)
+      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit)
     }
   }
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Sat Mar  3 05:46:43 2012
@@ -381,11 +381,12 @@ class AsyncProducerTest extends JUnit3Su
     val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
     mockSyncProducer.send(new TopicMetadataRequest(List(topic)))
     EasyMock.expectLastCall().andReturn(List(topic1Metadata))
-    mockSyncProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic, 0, messagesToSet(msgs.take(5))))))
+    mockSyncProducer.send(TestUtils.produceRequest(topic, 0,
+    messagesToSet(msgs.take(5))))
     EasyMock.expectLastCall
-    mockSyncProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic, 0, messagesToSet(msgs.takeRight(5))))))
-    EasyMock.expectLastCall
-    EasyMock.replay(mockSyncProducer)
+    mockSyncProducer.send(TestUtils.produceRequest(topic, 0,
+    messagesToSet(msgs.takeRight(5))))
+	EasyMock.replay(mockSyncProducer)
 
     val producerPool = EasyMock.createMock(classOf[ProducerPool])
     producerPool.getZkClient
@@ -495,10 +496,7 @@ class AsyncProducerTest extends JUnit3Su
   }
 
   class MockProducer(override val config: SyncProducerConfig) extends SyncProducer(config) {
-    override def send(topic: String, messages: ByteBufferMessageSet): Unit = {
-      Thread.sleep(1000)
-    }
-    override def multiSend(produces: Array[ProducerRequest]) {
+    override def send(produceRequest: ProducerRequest): Unit = {
       Thread.sleep(1000)
     }
   }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Sat Mar  3 05:46:43 2012
@@ -44,7 +44,7 @@ class SyncProducerTest extends JUnit3Sui
     var failed = false
     val firstStart = SystemTime.milliseconds
     try {
-      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
+      producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes))))
     }catch {
       case e: Exception => failed=true
     }
@@ -54,7 +54,7 @@ class SyncProducerTest extends JUnit3Sui
     Assert.assertTrue((firstEnd-firstStart) < 500)
     val secondStart = SystemTime.milliseconds
     try {
-      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
+      producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes))))
     }catch {
       case e: Exception => failed = true
     }
@@ -63,7 +63,7 @@ class SyncProducerTest extends JUnit3Sui
     Assert.assertTrue((secondEnd-secondStart) < 500)
 
     try {
-      producer.multiSend(Array(new ProducerRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))))
+      producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes))))
     }catch {
       case e: Exception => failed=true
     }
@@ -83,7 +83,7 @@ class SyncProducerTest extends JUnit3Sui
     val bytes = new Array[Byte](101)
     var failed = false
     try {
-      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(bytes)))
+      producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(bytes))))
     }catch {
       case e: MessageSizeTooLargeException => failed = true
     }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Sat Mar  3 05:46:43 2012
@@ -33,6 +33,7 @@ import collection.mutable.ListBuffer
 import kafka.consumer.{KafkaMessageStream, ConsumerConfig}
 import scala.collection.Map
 import kafka.serializer.Encoder
+import kafka.api.{ProducerRequest, TopicData, PartitionData}
 
 /**
  * Utility functions to help with testing
@@ -336,7 +337,47 @@ object TestUtils {
       buffer += ("msg" + i)
     buffer
   }
+  /**
+   * Create a wired format request based on simple basic information
+   */
+  def produceRequest(topic: String, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
+    produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,ProducerRequest.RandomPartition,message)
+  }
+  def produceRequest(topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
+    produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,partition,message)
+  }
+
+  def produceRequest(correlationId: Int, topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
+    val clientId = SyncProducerConfig.DefaultClientId
+    val requiredAcks: Short = SyncProducerConfig.DefaultRequiredAcks
+    val ackTimeout = SyncProducerConfig.DefaultAckTimeoutMs
+    var data = new Array[TopicData](1)
+    var partitionData = new Array[PartitionData](1)
+    partitionData(0) = new PartitionData(partition,message)
+    data(0) = new TopicData(topic,partitionData)
+    val pr = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data)  	
+    pr
+  }
 
+  def produceJavaRequest(topic: String, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
+    produceJavaRequest(-1,topic,-1,message)
+  }
+
+  def produceJavaRequest(topic: String, partition: Int, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
+    produceJavaRequest(-1,topic,partition,message)
+  }
+
+  def produceJavaRequest(correlationId: Int, topic: String, partition: Int, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
+    val clientId = "test"
+    val requiredAcks: Short = 0
+    val ackTimeout = 0
+    var data = new Array[TopicData](1)
+    var partitionData = new Array[PartitionData](1)
+    partitionData(0) = new PartitionData(partition,message.underlying)
+    data(0) = new TopicData(topic,partitionData)
+    val pr = new kafka.javaapi.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data)  	
+    pr
+  }
 }
 
 object TestZKUtils {