You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/10/11 21:11:39 UTC
svn commit: r1182028 -
/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
Author: junrao
Date: Tue Oct 11 19:11:38 2011
New Revision: 1182028
URL: http://svn.apache.org/viewvc?rev=1182028&view=rev
Log:
ZK consumer may lose a chunk worth of message during rebalance; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-154
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala?rev=1182028&r1=1182027&r2=1182028&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala Tue Oct 11 19:11:38 2011
@@ -64,10 +64,10 @@ private[consumer] class PartitionTopicIn
// update fetched offset to the compressed data chunk size, not the decompressed message set size
if(logger.isTraceEnabled)
logger.trace("Updating fetch offset = " + fetchedOffset.get + " with size = " + size)
+ chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
val newOffset = fetchedOffset.addAndGet(size)
if (logger.isDebugEnabled)
logger.debug("updated fetch offset of ( %s ) to %d".format(this, newOffset))
- chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
}
size
}