You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/02/09 23:57:58 UTC
kafka git commit: KAFKA-3159; stale high watermark segment offset causes early fetch return
Repository: kafka
Updated Branches:
refs/heads/trunk b5e6b8671 -> 6d0dca734
KAFKA-3159; stale high watermark segment offset causes early fetch return
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
Closes #884 from hachikuji/K3159
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6d0dca73
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6d0dca73
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6d0dca73
Branch: refs/heads/trunk
Commit: 6d0dca7345d9e3c0b8924496a4632954ca1268e5
Parents: b5e6b86
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Feb 9 16:57:53 2016 -0600
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Feb 9 16:57:53 2016 -0600
----------------------------------------------------------------------
.../main/scala/kafka/cluster/Partition.scala | 2 +-
.../main/scala/kafka/server/DelayedFetch.scala | 29 ++++++++++++--------
.../scala/kafka/server/LogOffsetMetadata.scala | 19 ++-----------
3 files changed, 21 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/6d0dca73/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 1bfb144..4e79bdc 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -354,7 +354,7 @@ class Partition(val topic: String,
val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
val oldHighWatermark = leaderReplica.highWatermark
- if(oldHighWatermark.precedes(newHighWatermark)) {
+ if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) {
leaderReplica.highWatermark = newHighWatermark
debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
true
http://git-wip-us.apache.org/repos/asf/kafka/blob/6d0dca73/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index c8cb21d..a0ff00d 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -81,18 +81,23 @@ class DelayedFetch(delayMs: Long,
else
replica.logEndOffset
- if (endOffset.offsetOnOlderSegment(fetchOffset)) {
- // Case C, this can happen when the new fetch operation is on a truncated leader
- debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicAndPartition))
- return forceComplete()
- } else if (fetchOffset.offsetOnOlderSegment(endOffset)) {
- // Case C, this can happen when the fetch operation is falling behind the current segment
- // or the partition has just rolled a new segment
- debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
- return forceComplete()
- } else if (fetchOffset.precedes(endOffset)) {
- // we need take the partition fetch size as upper bound when accumulating the bytes
- accumulatedSize += math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize)
+ // Go directly to the check for Case D if the message offsets are the same. If the log segment
+ // has just rolled, then the high watermark offset will remain the same but be on the old segment,
+ // which would incorrectly be seen as an instance of Case C.
+ if (endOffset.messageOffset != fetchOffset.messageOffset) {
+ if (endOffset.onOlderSegment(fetchOffset)) {
+ // Case C, this can happen when the new fetch operation is on a truncated leader
+ debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicAndPartition))
+ return forceComplete()
+ } else if (fetchOffset.onOlderSegment(endOffset)) {
+ // Case C, this can happen when the fetch operation is falling behind the current segment
+ // or the partition has just rolled a new segment
+ debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
+ return forceComplete()
+ } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+ // we need take the partition fetch size as upper bound when accumulating the bytes
+ accumulatedSize += math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize)
+ }
}
}
} catch {
http://git-wip-us.apache.org/repos/asf/kafka/blob/6d0dca73/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
index 6add702..7067b20 100644
--- a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
+++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
@@ -43,7 +43,7 @@ case class LogOffsetMetadata(messageOffset: Long,
relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {
// check if this offset is already on an older segment compared with the given offset
- def offsetOnOlderSegment(that: LogOffsetMetadata): Boolean = {
+ def onOlderSegment(that: LogOffsetMetadata): Boolean = {
if (messageOffsetOnly())
throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")
@@ -51,26 +51,13 @@ case class LogOffsetMetadata(messageOffset: Long,
}
// check if this offset is on the same segment with the given offset
- def offsetOnSameSegment(that: LogOffsetMetadata): Boolean = {
+ def onSameSegment(that: LogOffsetMetadata): Boolean = {
if (messageOffsetOnly())
throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")
this.segmentBaseOffset == that.segmentBaseOffset
}
- // check if this offset is before the given offset. We need to compare both message offset and segment base offset.
- def precedes(that: LogOffsetMetadata): Boolean = {
- this.messageOffset < that.messageOffset || {
- if (this.messageOffsetOnly() && that.messageOffsetOnly())
- false
- else if (!this.messageOffsetOnly() && !that.messageOffsetOnly())
- this.segmentBaseOffset < that.segmentBaseOffset
- else
- throw new KafkaException(s"Cannot compare $this with $that as one has segment base offsets and the other does not.")
- }
- }
-
-
// compute the number of messages between this offset to the given offset
def offsetDiff(that: LogOffsetMetadata): Long = {
this.messageOffset - that.messageOffset
@@ -79,7 +66,7 @@ case class LogOffsetMetadata(messageOffset: Long,
// compute the number of bytes between this offset to the given offset
// if they are on the same segment and this offset precedes the given offset
def positionDiff(that: LogOffsetMetadata): Int = {
- if(!offsetOnSameSegment(that))
+ if(!onSameSegment(that))
throw new KafkaException(s"$this cannot compare its segment position with $that since they are not on the same segment")
if(messageOffsetOnly())
throw new KafkaException(s"$this cannot compare its segment position with $that since it only has message offset info")