You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/04/04 02:15:19 UTC
git commit: kafka-846;
AbstractFetcherThread should do shallow instead of deep iteration;
patched by Jun Rao; reviewed by Neha Narkhede
Updated Branches:
refs/heads/0.8 bd262ac70 -> 5a50f7e55
kafka-846; AbstractFetcherThread should do shallow instead of deep iteration; patched by Jun Rao; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5a50f7e5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5a50f7e5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5a50f7e5
Branch: refs/heads/0.8
Commit: 5a50f7e555510b8cde08cd588cf4b67b06484b16
Parents: bd262ac
Author: Jun Rao <ju...@gmail.com>
Authored: Wed Apr 3 17:15:06 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Apr 3 17:15:06 2013 -0700
----------------------------------------------------------------------
.../scala/kafka/consumer/PartitionTopicInfo.scala | 17 ++-------
.../scala/kafka/server/AbstractFetcherThread.scala | 28 +++++++++------
2 files changed, 20 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5a50f7e5/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
index 9792244..64b702b 100644
--- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
@@ -50,12 +50,12 @@ class PartitionTopicInfo(val topic: String,
}
/**
- * Enqueue a message set for processing
+ * Enqueue a message set for processing.
*/
def enqueue(messages: ByteBufferMessageSet) {
- val size = messages.sizeInBytes
+ val size = messages.validBytes
if(size > 0) {
- val next = nextOffset(messages)
+ val next = messages.shallowIterator.toSeq.last.nextOffset
trace("Updating fetch offset = " + fetchedOffset.get + " to " + next)
chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
fetchedOffset.set(next)
@@ -65,17 +65,6 @@ class PartitionTopicInfo(val topic: String,
}
}
- /**
- * Get the next fetch offset after this message set
- */
- private def nextOffset(messages: ByteBufferMessageSet): Long = {
- var nextOffset = PartitionTopicInfo.InvalidOffset
- val iter = messages.shallowIterator
- while(iter.hasNext)
- nextOffset = iter.next.nextOffset
- nextOffset
- }
-
override def toString(): String = topic + ":" + partitionId.toString + ": fetched offset = " + fetchedOffset.get +
": consumed offset = " + consumedOffset.get
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5a50f7e5/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 087979f..cfa7747 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -18,7 +18,6 @@
package kafka.server
import kafka.cluster.Broker
-import kafka.common.{ClientIdAndBroker, TopicAndPartition, ErrorMapping}
import collection.mutable
import kafka.message.ByteBufferMessageSet
import kafka.message.MessageAndOffset
@@ -30,6 +29,7 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
+import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping}
/**
@@ -118,17 +118,23 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {
partitionData.error match {
case ErrorMapping.NoError =>
- val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
- val validBytes = messages.validBytes
- val newOffset = messages.lastOption match {
- case Some(m: MessageAndOffset) => m.nextOffset
- case None => currentOffset.get
+ try {
+ val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
+ val validBytes = messages.validBytes
+ val newOffset = messages.shallowIterator.toSeq.lastOption match {
+ case Some(m: MessageAndOffset) => m.nextOffset
+ case None => currentOffset.get
+ }
+ partitionMap.put(topicAndPartition, newOffset)
+ fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
+ fetcherStats.byteRate.mark(validBytes)
+ // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
+ processPartitionData(topicAndPartition, currentOffset.get, partitionData)
+ } catch {
+ case e =>
+ throw new KafkaException("error processing data for topic %s partititon %d offset %d"
+ .format(topic, partitionId, currentOffset.get), e)
}
- partitionMap.put(topicAndPartition, newOffset)
- fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
- fetcherStats.byteRate.mark(validBytes)
- // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
- processPartitionData(topicAndPartition, currentOffset.get, partitionData)
case ErrorMapping.OffsetOutOfRangeCode =>
try {
val newOffset = handleOffsetOutOfRange(topicAndPartition)