Posted to commits@kafka.apache.org by ne...@apache.org on 2011/08/19 02:29:10 UTC

svn commit: r1159452 - in /incubator/kafka/trunk/core/src: main/scala/kafka/producer/async/DefaultEventHandler.scala test/scala/unit/kafka/producer/AsyncProducerTest.scala

Author: nehanarkhede
Date: Fri Aug 19 00:29:09 2011
New Revision: 1159452

URL: http://svn.apache.org/viewvc?rev=1159452&view=rev
Log:
KAFKA-107: Fix a bug in the serialize and collate logic in DefaultEventHandler; patched by Neha; reviewed by Jun

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
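
A note on the serialize() change below: the old code built the per-partition
message sets in one pass and the (topic, partition) keys in another, then
zipped the two collections back together. One way that pairing can go wrong
in Scala is that mapping a Map with a function that itself returns a pair
rebuilds a Map, so extracting the (topic, partition) keys this way can
collapse entries that share a topic, and the subsequent zip can then drop or
mis-pair message sets. The patch sidesteps the whole hazard by pairing key
and value inside a single map step. A minimal, self-contained sketch of the
hazard (illustrative topic names and values, not part of the patch):

    object SerializeCollapse extends App {
      val eventsPerTopic: Map[(String, Int), Seq[String]] =
        Map(("topic1", 0) -> Seq("a"), ("topic1", 1) -> Seq("b"))

      // The keys are themselves pairs, so this re-keys the Map by topic
      // alone and ("topic1", 0) is silently overwritten by ("topic1", 1).
      val topicsAndPartitions = eventsPerTopic.map(e => e._1)
      println(topicsAndPartitions.size)  // prints 1, not 2

      // Pairing key and value inside a single map step, as the patched
      // serialize() does, preserves every (topic, partition) entry.
      val direct = eventsPerTopic.map { case (k, events) => (k, events.map(_.toUpperCase)) }
      println(direct.size)  // prints 2
    }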

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1159452&r1=1159451&r2=1159452&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Fri Aug 19 00:29:09 2011
@@ -23,8 +23,9 @@ import org.apache.log4j.Logger
 import kafka.api.ProducerRequest
 import kafka.serializer.Encoder
 import java.util.Properties
+import kafka.producer.{ProducerConfig, SyncProducer}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
-import kafka.producer.{ProducerConfig, SyncProducerConfigShared, SyncProducerConfig, SyncProducer}
+
 
 private[kafka] class DefaultEventHandler[T](val config: ProducerConfig,
                                             val cbkHandler: CallbackHandler[T]) extends EventHandler[T] {
@@ -37,6 +38,7 @@ private[kafka] class DefaultEventHandler
     var processedEvents = events
     if(cbkHandler != null)
       processedEvents = cbkHandler.beforeSendingData(events)
+
     send(serialize(collate(processedEvents), serializer), syncProducer)
   }
 
@@ -51,7 +53,6 @@ private[kafka] class DefaultEventHandler
 
   private def serialize(eventsPerTopic: Map[(String,Int), Seq[T]],
                         serializer: Encoder[T]): Map[(String, Int), ByteBufferMessageSet] = {
-    import scala.collection.JavaConversions._
     val eventsPerTopicMap = eventsPerTopic.map(e => ((e._1._1, e._1._2) , e._2.map(l => serializer.toMessage(l))))
     val topicsAndPartitions = eventsPerTopic.map(e => e._1)
     /** enforce the compressed.topics config here.
@@ -60,34 +61,36 @@ private[kafka] class DefaultEventHandler
      *    If the list of compressed topics is empty, then enable the specified compression codec for all topics
      *  If the compression codec is NoCompressionCodec, compression is disabled for all topics
      */
-    val messages = eventsPerTopicMap.map(e => {
-      config.compressionCodec match {
-        case NoCompressionCodec =>
-          if(logger.isDebugEnabled)
-            logger.debug("Sending %d messages with no compression".format(e._2.size))
-          new ByteBufferMessageSet(NoCompressionCodec, e._2: _*)
-        case _ =>
-          config.compressedTopics.size match {
-            case 0 =>
-              if(logger.isDebugEnabled)
-                logger.debug("Sending %d messages with compression %d".format(e._2.size, config.compressionCodec.codec))
-              new ByteBufferMessageSet(config.compressionCodec, e._2: _*)
-            case _ =>
-              if(config.compressedTopics.contains(e._1._1)) {
-                if(logger.isDebugEnabled)
-                  logger.debug("Sending %d messages with compression %d".format(e._2.size, config.compressionCodec.codec))
-                new ByteBufferMessageSet(config.compressionCodec, e._2: _*)
-              }
-              else {
+
+    val messagesPerTopicPartition = eventsPerTopicMap.map { topicAndEvents =>
+      ((topicAndEvents._1._1, topicAndEvents._1._2),
+        config.compressionCodec match {
+          case NoCompressionCodec =>
+            if(logger.isDebugEnabled)
+              logger.debug("Sending %d messages with no compression".format(topicAndEvents._2.size))
+            new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*)
+          case _ =>
+            config.compressedTopics.size match {
+              case 0 =>
                 if(logger.isDebugEnabled)
-                  logger.debug("Sending %d messages with no compression as %s is not in compressed.topics - %s"
-                    .format(e._2.size, e._1._1, config.compressedTopics.toString))
-                new ByteBufferMessageSet(NoCompressionCodec, e._2: _*)
-              }
-          }
-      }
-    })
-    topicsAndPartitions.zip(messages)
+                  logger.debug("Sending %d messages with compression %d".format(topicAndEvents._2.size, config.compressionCodec.codec))
+                new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*)
+              case _ =>
+                if(config.compressedTopics.contains(topicAndEvents._1._1)) {
+                  if(logger.isDebugEnabled)
+                    logger.debug("Sending %d messages with compression %d".format(topicAndEvents._2.size, config.compressionCodec.codec))
+                  new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*)
+                }
+                else {
+                  if(logger.isDebugEnabled)
+                    logger.debug("Sending %d messages with no compression as %s is not in compressed.topics - %s"
+                      .format(topicAndEvents._2.size, topicAndEvents._1._1, config.compressedTopics.toString))
+                  new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*)
+                }
+            }
+        })
+    }
+    messagesPerTopicPartition
   }
 
   private def collate(events: Seq[QueueItem[T]]): Map[(String,Int), Seq[T]] = {
@@ -100,8 +103,8 @@ private[kafka] class DefaultEventHandler
       val topicEvents = remainingEvents partition (e => e.getTopic.equals(topic))
       remainingEvents = topicEvents._2
       distinctPartitions.foreach { p =>
-        val topicPartitionEvents = topicEvents._1 partition (e => (e.getPartition == p))
-        collatedEvents += ( (topic, p) -> topicPartitionEvents._1.map(q => q.getData).toSeq)
+        val topicPartitionEvents = (topicEvents._1 partition (e => (e.getPartition == p)))._1
+        collatedEvents += ( (topic, p) -> topicPartitionEvents.map(q => q.getData))
       }
     }
     collatedEvents

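For reference, the doc comment in the diff above describes how the
compressed.topics list gates the configured codec inside serialize(). The
nested match reduces to a per-topic codec choice; the following standalone
sketch restates it with hypothetical names (codecFor is not the committed
API, and GZIPCompressionCodec here merely stands in for whatever non-default
codec is configured):

    sealed trait CompressionCodec
    case object NoCompressionCodec extends CompressionCodec
    case object GZIPCompressionCodec extends CompressionCodec

    def codecFor(topic: String,
                 configuredCodec: CompressionCodec,
                 compressedTopics: Seq[String]): CompressionCodec =
      configuredCodec match {
        // Compression disabled globally: no topic is compressed.
        case NoCompressionCodec => NoCompressionCodec
        // A codec is configured and the topic list is empty: compress all topics.
        case codec if compressedTopics.isEmpty => codec
        // A codec is configured with a topic list: compress only listed topics.
        case codec if compressedTopics.contains(topic) => codec
        // Topic not in the list: fall back to no compression.
        case _ => NoCompressionCodec
      }

The collate() change in the same file is a smaller cleanup: it takes the
matching half of the partition result directly and drops a redundant .toSeq,
without changing what gets collated.
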
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1159452&r1=1159451&r2=1159452&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Fri Aug 19 00:29:09 2011
@@ -243,6 +243,48 @@ class AsyncProducerTest extends JUnitSui
 
   }
 
+  @Test
+  def testCollateAndSerializeEvents() {
+    val basicProducer = EasyMock.createMock(classOf[SyncProducer])
+    basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic2, 1,
+                                                                     getMessageSetOfSize(List(message2), 5)),
+                                                 new ProducerRequest(topic1, 0,
+                                                                     getMessageSetOfSize(List(message1), 5)),
+                                                 new ProducerRequest(topic1, 1,
+                                                                     getMessageSetOfSize(List(message1), 5)),
+                                                 new ProducerRequest(topic2, 0,
+                                                                     getMessageSetOfSize(List(message2), 5)))))
+
+    EasyMock.expectLastCall
+    basicProducer.close
+    EasyMock.expectLastCall
+    EasyMock.replay(basicProducer)
+
+    val props = new Properties()
+    props.put("host", "localhost")
+    props.put("port", "9092")
+    props.put("queue.size", "50")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("batch.size", "20")
+
+    val config = new AsyncProducerConfig(props)
+
+    val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
+
+    producer.start
+    val serializer = new StringSerializer
+    for(i <- 0 until 5) {
+      producer.send(topic2, messageContent2, 0)
+      producer.send(topic2, messageContent2, 1)
+      producer.send(topic1, messageContent1, 0)
+      producer.send(topic1, messageContent1, 1)
+    }
+
+    producer.close
+    EasyMock.verify(basicProducer)
+
+  }
+
   private def getMessageSetOfSize(messages: List[Message], counts: Int): ByteBufferMessageSet = {
     var messageList = new Array[Message](counts)
     for(message <- messages) {