You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/09/28 02:51:51 UTC

svn commit: r1176672 - in /incubator/kafka/trunk/core/src/main/scala/kafka/producer/async: DefaultEventHandler.scala ProducerSendThread.scala

Author: junrao
Date: Wed Sep 28 00:51:51 2011
New Revision: 1176672

URL: http://svn.apache.org/viewvc?rev=1176672&view=rev
Log:
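Fix a bug in the queue timeout logic of the async producer: the poll timeout was computed as queueTime - (lastSend - now), which grows as time elapses, instead of (lastSend + queueTime) - now, which counts down to the next scheduled send. Also correct the argument order of a trace log message in DefaultEventHandler. Patched by Neha Narkhede; reviewed by Jun Rao; KAFKA-138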

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1176672&r1=1176671&r2=1176672&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Wed Sep 28 00:51:51 2011
@@ -83,8 +83,8 @@ private[kafka] class DefaultEventHandler
               case _ =>
                 if(config.compressedTopics.contains(topicAndEvents._1._1)) {
                   if(logger.isTraceEnabled)
-                    logger.trace("Sending %d messages with compression %d to topic %s on partition %d"
-                      .format(topicAndEvents._2.size, topicAndEvents._1._1, topicAndEvents._1._2, config.compressionCodec.codec))
+                    logger.trace("Sending %d messages with compression codec %d to topic %s on partition %d"
+                      .format(topicAndEvents._2.size, config.compressionCodec.codec, topicAndEvents._1._1, topicAndEvents._1._2))
                   new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*)
                 }
                 else {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1176672&r1=1176671&r2=1176672&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala Wed Sep 28 00:51:51 2011
@@ -69,7 +69,7 @@ private[async] class ProducerSendThread[
     var full: Boolean = false
 
     // drain the queue until you get a shutdown command
-    Stream.continually(queue.poll(scala.math.max(0, queueTime - (lastSend - SystemTime.milliseconds)), TimeUnit.MILLISECONDS))
+    Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
                       .takeWhile(item => if(item != null) item.getData != shutdownCommand else true).foreach {
       currentQueueItem =>
         val elapsed = (SystemTime.milliseconds - lastSend)
@@ -129,4 +129,4 @@ private[async] class ProducerSendThread[
         logger.trace(event.getData.toString)
     }
   }
-}
\ No newline at end of file
+}