You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/02/09 23:57:58 UTC

kafka git commit: KAFKA-3159; stale high watermark segment offset causes early fetch return

Repository: kafka
Updated Branches:
  refs/heads/trunk b5e6b8671 -> 6d0dca734


KAFKA-3159; stale high watermark segment offset causes early fetch return

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #884 from hachikuji/K3159


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6d0dca73
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6d0dca73
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6d0dca73

Branch: refs/heads/trunk
Commit: 6d0dca7345d9e3c0b8924496a4632954ca1268e5
Parents: b5e6b86
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Feb 9 16:57:53 2016 -0600
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Feb 9 16:57:53 2016 -0600

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    |  2 +-
 .../main/scala/kafka/server/DelayedFetch.scala  | 29 ++++++++++++--------
 .../scala/kafka/server/LogOffsetMetadata.scala  | 19 ++-----------
 3 files changed, 21 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6d0dca73/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 1bfb144..4e79bdc 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -354,7 +354,7 @@ class Partition(val topic: String,
     val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
     val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
     val oldHighWatermark = leaderReplica.highWatermark
-    if(oldHighWatermark.precedes(newHighWatermark)) {
+    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) {
       leaderReplica.highWatermark = newHighWatermark
       debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
       true

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d0dca73/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index c8cb21d..a0ff00d 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -81,18 +81,23 @@ class DelayedFetch(delayMs: Long,
               else
                 replica.logEndOffset
 
-            if (endOffset.offsetOnOlderSegment(fetchOffset)) {
-              // Case C, this can happen when the new fetch operation is on a truncated leader
-              debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicAndPartition))
-              return forceComplete()
-            } else if (fetchOffset.offsetOnOlderSegment(endOffset)) {
-              // Case C, this can happen when the fetch operation is falling behind the current segment
-              // or the partition has just rolled a new segment
-              debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
-              return forceComplete()
-            } else if (fetchOffset.precedes(endOffset)) {
-              // we need take the partition fetch size as upper bound when accumulating the bytes
-              accumulatedSize += math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize)
+            // Go directly to the check for Case D if the message offsets are the same. If the log segment
+            // has just rolled, then the high watermark offset will remain the same but be on the old segment,
+            // which would incorrectly be seen as an instance of Case C.
+            if (endOffset.messageOffset != fetchOffset.messageOffset) {
+              if (endOffset.onOlderSegment(fetchOffset)) {
+                // Case C, this can happen when the new fetch operation is on a truncated leader
+                debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicAndPartition))
+                return forceComplete()
+              } else if (fetchOffset.onOlderSegment(endOffset)) {
+                // Case C, this can happen when the fetch operation is falling behind the current segment
+                // or the partition has just rolled a new segment
+                debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
+                return forceComplete()
+              } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+                // we need take the partition fetch size as upper bound when accumulating the bytes
+                accumulatedSize += math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize)
+              }
             }
           }
         } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d0dca73/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
index 6add702..7067b20 100644
--- a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
+++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
@@ -43,7 +43,7 @@ case class LogOffsetMetadata(messageOffset: Long,
                              relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {
 
   // check if this offset is already on an older segment compared with the given offset
-  def offsetOnOlderSegment(that: LogOffsetMetadata): Boolean = {
+  def onOlderSegment(that: LogOffsetMetadata): Boolean = {
     if (messageOffsetOnly())
       throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")
 
@@ -51,26 +51,13 @@ case class LogOffsetMetadata(messageOffset: Long,
   }
 
   // check if this offset is on the same segment with the given offset
-  def offsetOnSameSegment(that: LogOffsetMetadata): Boolean = {
+  def onSameSegment(that: LogOffsetMetadata): Boolean = {
     if (messageOffsetOnly())
       throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")
 
     this.segmentBaseOffset == that.segmentBaseOffset
   }
 
-  // check if this offset is before the given offset. We need to compare both message offset and segment base offset.
-  def precedes(that: LogOffsetMetadata): Boolean = {
-    this.messageOffset < that.messageOffset || {
-      if (this.messageOffsetOnly() && that.messageOffsetOnly())
-        false
-      else if (!this.messageOffsetOnly() && !that.messageOffsetOnly())
-        this.segmentBaseOffset < that.segmentBaseOffset
-      else
-        throw new KafkaException(s"Cannot compare $this with $that as one has segment base offsets and the other does not.")
-    }
-  }
-
-
   // compute the number of messages between this offset to the given offset
   def offsetDiff(that: LogOffsetMetadata): Long = {
     this.messageOffset - that.messageOffset
@@ -79,7 +66,7 @@ case class LogOffsetMetadata(messageOffset: Long,
   // compute the number of bytes between this offset to the given offset
   // if they are on the same segment and this offset precedes the given offset
   def positionDiff(that: LogOffsetMetadata): Int = {
-    if(!offsetOnSameSegment(that))
+    if(!onSameSegment(that))
       throw new KafkaException(s"$this cannot compare its segment position with $that since they are not on the same segment")
     if(messageOffsetOnly())
       throw new KafkaException(s"$this cannot compare its segment position with $that since it only has message offset info")