You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2012/12/18 18:44:12 UTC

[28/30] git commit: ProducerRequest should take ByteBufferMessageSet instead of MessageSet; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-632

ProducerRequest should take ByteBufferMessageSet instead of MessageSet; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-632

git-svn-id: https://svn.apache.org/repos/asf/kafka/branches/0.8@1414917 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bdb04bba
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bdb04bba
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bdb04bba

Branch: refs/heads/trunk
Commit: bdb04bba67326bdf1451df3ec4a855ed3c109869
Parents: 961affa
Author: Jun Rao <ju...@apache.org>
Authored: Wed Nov 28 21:05:17 2012 +0000
Committer: Jun Rao <ju...@apache.org>
Committed: Wed Nov 28 21:05:17 2012 +0000

----------------------------------------------------------------------
 .../src/main/scala/kafka/api/ProducerRequest.scala |    6 +++---
 .../kafka/producer/async/DefaultEventHandler.scala |    6 +-----
 .../unit/kafka/producer/SyncProducerTest.scala     |    2 +-
 3 files changed, 5 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bdb04bba/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 1713dee..87700a0 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -57,7 +57,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
                            clientId: String,
                            requiredAcks: Short,
                            ackTimeoutMs: Int,
-                           data: Map[TopicAndPartition, MessageSet])
+                           data: Map[TopicAndPartition, ByteBufferMessageSet])
     extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
 
   /**
@@ -69,7 +69,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
            clientId: String,
            requiredAcks: Short,
            ackTimeoutMs: Int,
-           data: Map[TopicAndPartition, MessageSet]) =
+           data: Map[TopicAndPartition, ByteBufferMessageSet]) =
     this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
 
   def writeTo(buffer: ByteBuffer) {
@@ -88,7 +88,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
         topicAndPartitionData.foreach(partitionAndData => {
           val partition = partitionAndData._1.partition
           val partitionMessageData = partitionAndData._2
-          val bytes = partitionMessageData.asInstanceOf[ByteBufferMessageSet].buffer
+          val bytes = partitionMessageData.buffer
           buffer.putInt(partition)
           buffer.putInt(bytes.limit)
           buffer.put(bytes)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdb04bba/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index a84be2d..9be87d0 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -210,12 +210,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       warn("Failed to send to broker %d with data %s".format(brokerId, messagesPerTopic))
       messagesPerTopic.keys.toSeq
     } else if(messagesPerTopic.size > 0) {
-      val topicPartitionDataPairs = messagesPerTopic.toSeq.map {
-        case (topicAndPartition, messages) =>
-          (topicAndPartition, messages)
-      }
       val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks,
-        config.requestTimeoutMs, Map(topicPartitionDataPairs:_*))
+        config.requestTimeoutMs, messagesPerTopic)
       try {
         val syncProducer = producerPool.getProducer(brokerId)
         val response = syncProducer.send(producerRequest)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdb04bba/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index ae34315..744554c 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -85,7 +85,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack = SyncProducerConfig.DefaultRequiredAcks
-    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, MessageSet]())
+    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]())
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val response = producer.send(emptyRequest)