You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/04/04 02:15:19 UTC

git commit: kafka-846; AbstractFetcherThread should do shallow instead of deep iteration; patched by Jun Rao; reviewed by Neha Narkhede

Updated Branches:
  refs/heads/0.8 bd262ac70 -> 5a50f7e55


kafka-846; AbstractFetcherThread should do shallow instead of deep iteration; patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5a50f7e5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5a50f7e5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5a50f7e5

Branch: refs/heads/0.8
Commit: 5a50f7e555510b8cde08cd588cf4b67b06484b16
Parents: bd262ac
Author: Jun Rao <ju...@gmail.com>
Authored: Wed Apr 3 17:15:06 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Apr 3 17:15:06 2013 -0700

----------------------------------------------------------------------
 .../scala/kafka/consumer/PartitionTopicInfo.scala  |   17 ++-------
 .../scala/kafka/server/AbstractFetcherThread.scala |   28 +++++++++------
 2 files changed, 20 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5a50f7e5/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
index 9792244..64b702b 100644
--- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
@@ -50,12 +50,12 @@ class PartitionTopicInfo(val topic: String,
   }
 
   /**
-   * Enqueue a message set for processing
+   * Enqueue a message set for processing.
    */
   def enqueue(messages: ByteBufferMessageSet) {
-    val size = messages.sizeInBytes
+    val size = messages.validBytes
     if(size > 0) {
-      val next = nextOffset(messages)
+      val next = messages.shallowIterator.toSeq.last.nextOffset
       trace("Updating fetch offset = " + fetchedOffset.get + " to " + next)
       chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
       fetchedOffset.set(next)
@@ -65,17 +65,6 @@ class PartitionTopicInfo(val topic: String,
     }
   }
   
-  /**
-   * Get the next fetch offset after this message set
-   */
-  private def nextOffset(messages: ByteBufferMessageSet): Long = {
-    var nextOffset = PartitionTopicInfo.InvalidOffset
-    val iter = messages.shallowIterator
-    while(iter.hasNext)
-      nextOffset = iter.next.nextOffset
-    nextOffset
-  }
-
   override def toString(): String = topic + ":" + partitionId.toString + ": fetched offset = " + fetchedOffset.get +
     ": consumed offset = " + consumedOffset.get
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a50f7e5/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 087979f..cfa7747 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import kafka.cluster.Broker
-import kafka.common.{ClientIdAndBroker, TopicAndPartition, ErrorMapping}
 import collection.mutable
 import kafka.message.ByteBufferMessageSet
 import kafka.message.MessageAndOffset
@@ -30,6 +29,7 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
 import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
+import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping}
 
 
 /**
@@ -118,17 +118,23 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
             if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {
               partitionData.error match {
                 case ErrorMapping.NoError =>
-                  val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
-                  val validBytes = messages.validBytes
-                  val newOffset = messages.lastOption match {
-                    case Some(m: MessageAndOffset) => m.nextOffset
-                    case None => currentOffset.get
+                  try {
+                    val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
+                    val validBytes = messages.validBytes
+                    val newOffset = messages.shallowIterator.toSeq.lastOption match {
+                      case Some(m: MessageAndOffset) => m.nextOffset
+                      case None => currentOffset.get
+                    }
+                    partitionMap.put(topicAndPartition, newOffset)
+                    fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
+                    fetcherStats.byteRate.mark(validBytes)
+                    // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
+                    processPartitionData(topicAndPartition, currentOffset.get, partitionData)
+                  } catch {
+                    case e =>
+                      throw new KafkaException("error processing data for topic %s partititon %d offset %d"
+                                               .format(topic, partitionId, currentOffset.get), e)
                   }
-                  partitionMap.put(topicAndPartition, newOffset)
-                  fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
-                  fetcherStats.byteRate.mark(validBytes)
-                  // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                  processPartitionData(topicAndPartition, currentOffset.get, partitionData)
                 case ErrorMapping.OffsetOutOfRangeCode =>
                   try {
                     val newOffset = handleOffsetOutOfRange(topicAndPartition)