You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/09/28 02:51:51 UTC
svn commit: r1176672 - in /incubator/kafka/trunk/core/src/main/scala/kafka/producer/async: DefaultEventHandler.scala ProducerSendThread.scala
Author: junrao
Date: Wed Sep 28 00:51:51 2011
New Revision: 1176672
URL: http://svn.apache.org/viewvc?rev=1176672&view=rev
Log:
Bug in the queue timeout logic of the async producer; patched by Neha Narkhede; reviewed by Jun Rao; KAFKA-138
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1176672&r1=1176671&r2=1176672&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Wed Sep 28 00:51:51 2011
@@ -83,8 +83,8 @@ private[kafka] class DefaultEventHandler
case _ =>
if(config.compressedTopics.contains(topicAndEvents._1._1)) {
if(logger.isTraceEnabled)
- logger.trace("Sending %d messages with compression %d to topic %s on partition %d"
- .format(topicAndEvents._2.size, topicAndEvents._1._1, topicAndEvents._1._2, config.compressionCodec.codec))
+ logger.trace("Sending %d messages with compression codec %d to topic %s on partition %d"
+ .format(topicAndEvents._2.size, config.compressionCodec.codec, topicAndEvents._1._1, topicAndEvents._1._2))
new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*)
}
else {
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1176672&r1=1176671&r2=1176672&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala Wed Sep 28 00:51:51 2011
@@ -69,7 +69,7 @@ private[async] class ProducerSendThread[
var full: Boolean = false
// drain the queue until you get a shutdown command
- Stream.continually(queue.poll(scala.math.max(0, queueTime - (lastSend - SystemTime.milliseconds)), TimeUnit.MILLISECONDS))
+ Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
.takeWhile(item => if(item != null) item.getData != shutdownCommand else true).foreach {
currentQueueItem =>
val elapsed = (SystemTime.milliseconds - lastSend)
@@ -129,4 +129,4 @@ private[async] class ProducerSendThread[
logger.trace(event.getData.toString)
}
}
-}
\ No newline at end of file
+}