You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2021/06/29 17:32:58 UTC

[kafka] branch 2.8 updated: KAFKA-12996; Return OFFSET_OUT_OF_RANGE for fetchOffset < startOffset even for diverging epochs (#10930)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new fc0a10f  KAFKA-12996; Return OFFSET_OUT_OF_RANGE for fetchOffset < startOffset even for diverging epochs (#10930)
fc0a10f is described below

commit fc0a10f1a8cc510f23c43851fe73bb31a5b7f491
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Tue Jun 29 16:49:36 2021 +0100

    KAFKA-12996; Return OFFSET_OUT_OF_RANGE for fetchOffset < startOffset even for diverging epochs (#10930)
    
    If fetchOffset < startOffset, we currently throw OffsetOutOfRangeException when attempting to read from the log in the regular case. But for diverging epochs, we return Errors.NONE with the new leader start offset, hwm, etc. ReplicaFetcherThread throws OffsetOutOfRangeException when processing responses with Errors.NONE if the leader's offsets in the response are out of range, and this moves the partition to the failed state. The PR adds a check for this case when processing fetch requests [...]
    
    Reviewers: Luke Chen <sh...@gmail.com>, Nikhil Bhatia <ri...@gmail.com>, Guozhang Wang <wa...@gmail.com>
---
 core/src/main/scala/kafka/cluster/Partition.scala          | 6 ++++++
 core/src/test/scala/unit/kafka/cluster/PartitionTest.scala | 5 +++++
 2 files changed, 11 insertions(+)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 917ca50..9b35a00 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -1109,6 +1109,12 @@ class Partition(val topicPartition: TopicPartition,
           s"$lastFetchedEpoch from the request")
       }
 
+      // If fetch offset is less than log start, fail with OffsetOutOfRangeException, regardless of whether epochs are diverging
+      if (fetchOffset < initialLogStartOffset) {
+        throw new OffsetOutOfRangeException(s"Received request for offset $fetchOffset for partition $topicPartition, " +
+          s"but we only have log segments in the range $initialLogStartOffset to $initialLogEndOffset.")
+      }
+
       if (epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetchOffset) {
         val emptyFetchData = FetchDataInfo(
           fetchOffsetMetadata = LogOffsetMetadata(fetchOffset),
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index d569c89..b3369df 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -124,6 +124,11 @@ class PartitionTest extends AbstractPartitionTest {
     assertNoDivergence(read(lastFetchedEpoch = 3, fetchOffset = 5))
 
     assertThrows(classOf[OffsetOutOfRangeException], () => read(lastFetchedEpoch = 0, fetchOffset = 0))
+
+    // Fetch offset lower than start offset should throw OffsetOutOfRangeException
+    log.maybeIncrementLogStartOffset(newLogStartOffset = 10, ClientRecordDeletion)
+    assertThrows(classOf[OffsetOutOfRangeException], () => read(lastFetchedEpoch = 5, fetchOffset = 6)) // diverging
+    assertThrows(classOf[OffsetOutOfRangeException], () => read(lastFetchedEpoch = 3, fetchOffset = 6)) // not diverging
   }
 
   @Test