You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2011/08/19 02:29:10 UTC
svn commit: r1159452 - in /incubator/kafka/trunk/core/src:
main/scala/kafka/producer/async/DefaultEventHandler.scala
test/scala/unit/kafka/producer/AsyncProducerTest.scala
Author: nehanarkhede
Date: Fri Aug 19 00:29:09 2011
New Revision: 1159452
URL: http://svn.apache.org/viewvc?rev=1159452&view=rev
Log:
Bug in serialize and collate logic in the DefaultEventHandler (KAFKA-107); patched by Neha; reviewed by Jun
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1159452&r1=1159451&r2=1159452&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Fri Aug 19 00:29:09 2011
@@ -23,8 +23,9 @@ import org.apache.log4j.Logger
import kafka.api.ProducerRequest
import kafka.serializer.Encoder
import java.util.Properties
+import kafka.producer.{ProducerConfig, SyncProducer}
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
-import kafka.producer.{ProducerConfig, SyncProducerConfigShared, SyncProducerConfig, SyncProducer}
+
private[kafka] class DefaultEventHandler[T](val config: ProducerConfig,
val cbkHandler: CallbackHandler[T]) extends EventHandler[T] {
@@ -37,6 +38,7 @@ private[kafka] class DefaultEventHandler
var processedEvents = events
if(cbkHandler != null)
processedEvents = cbkHandler.beforeSendingData(events)
+
send(serialize(collate(processedEvents), serializer), syncProducer)
}
@@ -51,7 +53,6 @@ private[kafka] class DefaultEventHandler
private def serialize(eventsPerTopic: Map[(String,Int), Seq[T]],
serializer: Encoder[T]): Map[(String, Int), ByteBufferMessageSet] = {
- import scala.collection.JavaConversions._
val eventsPerTopicMap = eventsPerTopic.map(e => ((e._1._1, e._1._2) , e._2.map(l => serializer.toMessage(l))))
val topicsAndPartitions = eventsPerTopic.map(e => e._1)
/** enforce the compressed.topics config here.
@@ -60,34 +61,36 @@ private[kafka] class DefaultEventHandler
* If the list of compressed topics is empty, then enable the specified compression codec for all topics
* If the compression codec is NoCompressionCodec, compression is disabled for all topics
*/
- val messages = eventsPerTopicMap.map(e => {
- config.compressionCodec match {
- case NoCompressionCodec =>
- if(logger.isDebugEnabled)
- logger.debug("Sending %d messages with no compression".format(e._2.size))
- new ByteBufferMessageSet(NoCompressionCodec, e._2: _*)
- case _ =>
- config.compressedTopics.size match {
- case 0 =>
- if(logger.isDebugEnabled)
- logger.debug("Sending %d messages with compression %d".format(e._2.size, config.compressionCodec.codec))
- new ByteBufferMessageSet(config.compressionCodec, e._2: _*)
- case _ =>
- if(config.compressedTopics.contains(e._1._1)) {
- if(logger.isDebugEnabled)
- logger.debug("Sending %d messages with compression %d".format(e._2.size, config.compressionCodec.codec))
- new ByteBufferMessageSet(config.compressionCodec, e._2: _*)
- }
- else {
+
+ val messagesPerTopicPartition = eventsPerTopicMap.map { topicAndEvents =>
+ ((topicAndEvents._1._1, topicAndEvents._1._2),
+ config.compressionCodec match {
+ case NoCompressionCodec =>
+ if(logger.isDebugEnabled)
+ logger.debug("Sending %d messages with no compression".format(topicAndEvents._2.size))
+ new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*)
+ case _ =>
+ config.compressedTopics.size match {
+ case 0 =>
if(logger.isDebugEnabled)
- logger.debug("Sending %d messages with no compression as %s is not in compressed.topics - %s"
- .format(e._2.size, e._1._1, config.compressedTopics.toString))
- new ByteBufferMessageSet(NoCompressionCodec, e._2: _*)
- }
- }
- }
- })
- topicsAndPartitions.zip(messages)
+ logger.debug("Sending %d messages with compression %d".format(topicAndEvents._2.size, config.compressionCodec.codec))
+ new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*)
+ case _ =>
+ if(config.compressedTopics.contains(topicAndEvents._1._1)) {
+ if(logger.isDebugEnabled)
+ logger.debug("Sending %d messages with compression %d".format(topicAndEvents._2.size, config.compressionCodec.codec))
+ new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*)
+ }
+ else {
+ if(logger.isDebugEnabled)
+ logger.debug("Sending %d messages with no compression as %s is not in compressed.topics - %s"
+ .format(topicAndEvents._2.size, topicAndEvents._1._1, config.compressedTopics.toString))
+ new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*)
+ }
+ }
+ })
+ }
+ messagesPerTopicPartition
}
private def collate(events: Seq[QueueItem[T]]): Map[(String,Int), Seq[T]] = {
@@ -100,8 +103,8 @@ private[kafka] class DefaultEventHandler
val topicEvents = remainingEvents partition (e => e.getTopic.equals(topic))
remainingEvents = topicEvents._2
distinctPartitions.foreach { p =>
- val topicPartitionEvents = topicEvents._1 partition (e => (e.getPartition == p))
- collatedEvents += ( (topic, p) -> topicPartitionEvents._1.map(q => q.getData).toSeq)
+ val topicPartitionEvents = (topicEvents._1 partition (e => (e.getPartition == p)))._1
+ collatedEvents += ( (topic, p) -> topicPartitionEvents.map(q => q.getData))
}
}
collatedEvents
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1159452&r1=1159451&r2=1159452&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Fri Aug 19 00:29:09 2011
@@ -243,6 +243,48 @@ class AsyncProducerTest extends JUnitSui
}
+ @Test
+ def testCollateAndSerializeEvents() {
+ val basicProducer = EasyMock.createMock(classOf[SyncProducer])
+ basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic2, 1,
+ getMessageSetOfSize(List(message2), 5)),
+ new ProducerRequest(topic1, 0,
+ getMessageSetOfSize(List(message1), 5)),
+ new ProducerRequest(topic1, 1,
+ getMessageSetOfSize(List(message1), 5)),
+ new ProducerRequest(topic2, 0,
+ getMessageSetOfSize(List(message2), 5)))))
+
+ EasyMock.expectLastCall
+ basicProducer.close
+ EasyMock.expectLastCall
+ EasyMock.replay(basicProducer)
+
+ val props = new Properties()
+ props.put("host", "localhost")
+ props.put("port", "9092")
+ props.put("queue.size", "50")
+ props.put("serializer.class", "kafka.producer.StringSerializer")
+ props.put("batch.size", "20")
+
+ val config = new AsyncProducerConfig(props)
+
+ val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
+
+ producer.start
+ val serializer = new StringSerializer
+ for(i <- 0 until 5) {
+ producer.send(topic2, messageContent2, 0)
+ producer.send(topic2, messageContent2, 1)
+ producer.send(topic1, messageContent1, 0)
+ producer.send(topic1, messageContent1, 1)
+ }
+
+ producer.close
+ EasyMock.verify(basicProducer)
+
+ }
+
private def getMessageSetOfSize(messages: List[Message], counts: Int): ByteBufferMessageSet = {
var messageList = new Array[Message](counts)
for(message <- messages) {