You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/04/06 18:59:45 UTC

svn commit: r1310482 - /incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala

Author: junrao
Date: Fri Apr  6 16:59:44 2012
New Revision: 1310482

URL: http://svn.apache.org/viewvc?rev=1310482&view=rev
Log:
CallbackHandler.afterDequeuingExistingData is not called during event queue timeout; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-326

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1310482&r1=1310481&r2=1310482&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala Fri Apr  6 16:59:44 2012
@@ -73,18 +73,21 @@ private[async] class ProducerSendThread[
         // check if the queue time is reached. This happens when the poll method above returns after a timeout and
         // returns a null object
         val expired = currentQueueItem == null
-        if(currentQueueItem != null) {
+        if(currentQueueItem != null)
           trace("Dequeued item for topic %s and partition %d"
               .format(currentQueueItem.getTopic, currentQueueItem.getPartition))
-          // handle the dequeued current item
-          if(cbkHandler != null)
-            events = events ++ cbkHandler.afterDequeuingExistingData(currentQueueItem)
-          else
-            events += currentQueueItem
 
-          // check if the batch size is reached
-          full = events.size >= batchSize
+        // handle the dequeued current item
+        if(cbkHandler != null)
+          events = events ++ cbkHandler.afterDequeuingExistingData(currentQueueItem)
+        else {
+          if (currentQueueItem != null)
+            events += currentQueueItem
         }
+
+        // check if the batch size is reached
+        full = events.size >= batchSize
+
         if(full || expired) {
           if(expired) debug(elapsed + " ms elapsed. Queue time reached. Sending..")
           if(full) debug("Batch full. Sending..")