Posted to commits@kafka.apache.org by ne...@apache.org on 2014/09/15 05:40:44 UTC

git commit: KAFKA-1496 Using batch message in sync producer only sends the first message if we use a Scala Stream as the argument; reviewed by Neha Narkhede

Repository: kafka
Updated Branches:
  refs/heads/trunk 3e8854c0c -> d677701b9


KAFKA-1496 Using batch message in sync producer only sends the first message if we use a Scala Stream as the argument; reviewed by Neha Narkhede
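For context on the fix below: Scala's Stream has a strict head but a lazy tail, so calling map with a side-effecting function runs that side effect for the first element only, until the resulting stream is forced. Because serialize accumulated messages into a buffer inside map, passing a Stream meant only the first message was ever serialized and sent. A minimal sketch of the behavior (names are illustrative, not taken from the Kafka code):

import scala.collection.mutable.ArrayBuffer

object StreamMapPitfall extends App {
  val collected = ArrayBuffer[String]()

  // Stream's map builds the head eagerly but defers the tail, so the side
  // effect runs for the first element only unless the result is forced.
  Stream("m1", "m2", "m3").map { m => collected += m; m }
  println(collected)   // ArrayBuffer(m1)

  collected.clear()

  // foreach is strict: the side effect runs for every element.
  Stream("m1", "m2", "m3").foreach { m => collected += m }
  println(collected)   // ArrayBuffer(m1, m2, m3)
}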


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d677701b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d677701b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d677701b

Branch: refs/heads/trunk
Commit: d677701b93b3b792fac6191588984f7a5323eecd
Parents: 3e8854c
Author: Guan Liao <gu...@mailchimp.com>
Authored: Sun Sep 14 20:40:05 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Sun Sep 14 20:40:35 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/producer/async/DefaultEventHandler.scala  | 2 +-
 .../src/test/scala/unit/kafka/producer/AsyncProducerTest.scala | 6 ++++++
 2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d677701b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index d8ac915..33470ff 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -122,7 +122,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
 
   def serialize(events: Seq[KeyedMessage[K,V]]): Seq[KeyedMessage[K,Message]] = {
     val serializedMessages = new ArrayBuffer[KeyedMessage[K,Message]](events.size)
-    events.map{e =>
+    events.foreach{e =>
       try {
         if(e.hasKey)
           serializedMessages += new KeyedMessage[K,Message](topic = e.topic, key = e.key, partKey = e.partKey, message = new Message(key = keyEncoder.toBytes(e.key), bytes = encoder.toBytes(e.message)))

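Switching from map to foreach makes the accumulation strict for any Seq passed in, including a Stream, because foreach always traverses the whole collection. A simplified sketch of the pattern (collectAll is a hypothetical stand-in for serialize, not the actual Kafka encoder logic):

import scala.collection.mutable.ArrayBuffer

object ForeachFixDemo extends App {
  // Hypothetical stand-in for serialize: accumulate elements via a side effect.
  def collectAll[A](events: Seq[A]): List[A] = {
    val out = new ArrayBuffer[A](events.size)
    events.foreach { e => out += e }   // strict for every Seq, including Stream
    out.toList
  }

  // Both calls see all three elements; with events.map{...} the Stream case
  // would have run the side effect only for the head.
  println(collectAll(List("a", "b", "c")))    // List(a, b, c)
  println(collectAll(Stream("a", "b", "c")))  // List(a, b, c)
}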
http://git-wip-us.apache.org/repos/asf/kafka/blob/d677701b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 906600c..1db6ac3 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -230,7 +230,13 @@ class AsyncProducerTest extends JUnit3Suite {
 
     val serializedData = handler.serialize(produceData)
     val deserializedData = serializedData.map(d => new KeyedMessage[String,String](d.topic, Utils.readString(d.message.payload)))
+
+    // Test that serialize handles a Seq backed by a Stream
+    val streamedSerializedData = handler.serialize(Stream(produceData:_*))
+    val deserializedStreamData = streamedSerializedData.map(d => new KeyedMessage[String,String](d.topic, Utils.readString(d.message.payload)))
+
     TestUtils.checkEquals(produceData.iterator, deserializedData.iterator)
+    TestUtils.checkEquals(produceData.iterator, deserializedStreamData.iterator)
   }
 
   @Test