You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/10/11 21:11:39 UTC

svn commit: r1182028 - /incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala

Author: junrao
Date: Tue Oct 11 19:11:38 2011
New Revision: 1182028

URL: http://svn.apache.org/viewvc?rev=1182028&view=rev
Log:
ZK consumer may lose a chunk worth of message during rebalance; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-154

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala?rev=1182028&r1=1182027&r2=1182028&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala Tue Oct 11 19:11:38 2011
@@ -64,10 +64,10 @@ private[consumer] class PartitionTopicIn
       // update fetched offset to the compressed data chunk size, not the decompressed message set size
       if(logger.isTraceEnabled)
         logger.trace("Updating fetch offset = " + fetchedOffset.get + " with size = " + size)
+      chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
       val newOffset = fetchedOffset.addAndGet(size)
       if (logger.isDebugEnabled)
         logger.debug("updated fetch offset of ( %s ) to %d".format(this, newOffset))
-      chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
     }
     size
   }