You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/04/06 18:59:45 UTC
svn commit: r1310482 -
/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
Author: junrao
Date: Fri Apr 6 16:59:44 2012
New Revision: 1310482
URL: http://svn.apache.org/viewvc?rev=1310482&view=rev
Log:
CallbackHandler.afterDequeuingExistingData is not called during event queue timeout; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-326
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1310482&r1=1310481&r2=1310482&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala Fri Apr 6 16:59:44 2012
@@ -73,18 +73,21 @@ private[async] class ProducerSendThread[
// check if the queue time is reached. This happens when the poll method above returns after a timeout and
// returns a null object
val expired = currentQueueItem == null
- if(currentQueueItem != null) {
+ if(currentQueueItem != null)
trace("Dequeued item for topic %s and partition %d"
.format(currentQueueItem.getTopic, currentQueueItem.getPartition))
- // handle the dequeued current item
- if(cbkHandler != null)
- events = events ++ cbkHandler.afterDequeuingExistingData(currentQueueItem)
- else
- events += currentQueueItem
- // check if the batch size is reached
- full = events.size >= batchSize
+ // handle the dequeued current item
+ if(cbkHandler != null)
+ events = events ++ cbkHandler.afterDequeuingExistingData(currentQueueItem)
+ else {
+ if (currentQueueItem != null)
+ events += currentQueueItem
}
+
+ // check if the batch size is reached
+ full = events.size >= batchSize
+
if(full || expired) {
if(expired) debug(elapsed + " ms elapsed. Queue time reached. Sending..")
if(full) debug("Batch full. Sending..")