You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2020/11/06 08:53:35 UTC
[kafka] branch trunk updated: MINOR: Always return partitions with diverging epochs in fetch response (#9567)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8e211eb MINOR: Always return partitions with diverging epochs in fetch response (#9567)
8e211eb is described below
commit 8e211eb72f9a45897cc37fed394a38096aa47feb
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Nov 6 08:51:14 2020 +0000
MINOR: Always return partitions with diverging epochs in fetch response (#9567)
Reviewers: Jason Gustafson <ja...@confluent.io>, Chia-Ping Tsai <ch...@gmail.com>
---
.../src/main/scala/kafka/server/FetchSession.scala | 4 ++
.../scala/unit/kafka/server/FetchSessionTest.scala | 55 ++++++++++++++++++++++
2 files changed, 59 insertions(+)
diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala
index 140fba0..c6280f3 100644
--- a/core/src/main/scala/kafka/server/FetchSession.scala
+++ b/core/src/main/scala/kafka/server/FetchSession.scala
@@ -154,6 +154,10 @@ class CachedPartition(val topic: String,
highWatermark = -1
mustRespond = true
}
+ if (respData.divergingEpoch.isPresent) {
+ // Partitions with diverging epoch are always included in response to trigger truncation.
+ mustRespond = true
+ }
mustRespond
}
diff --git a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
index 5ef3105..9d99fb5 100755
--- a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
@@ -21,10 +21,12 @@ import java.util.{Collections, Optional}
import kafka.utils.MockTime
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.Records
import org.apache.kafka.common.requests.FetchMetadata.{FINAL_EPOCH, INVALID_SESSION_ID}
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata}
+import org.apache.kafka.common.utils.Utils
import org.junit.Assert._
import org.junit.rules.Timeout
import org.junit.{Rule, Test}
@@ -643,4 +645,57 @@ class FetchSessionTest {
assertTrue(resp2.responseData().isEmpty)
assertEquals(0, cache.size)
}
+
+ @Test
+ def testDivergingEpoch(): Unit = {
+ val time = new MockTime()
+ val cache = new FetchSessionCache(10, 1000)
+ val fetchManager = new FetchManager(time, cache)
+ val tp1 = new TopicPartition("foo", 1)
+ val tp2 = new TopicPartition("bar", 2)
+
+ val reqData = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
+ reqData.put(tp1, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4)))
+ reqData.put(tp2, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4)))
+
+ // Full fetch context returns all partitions in the response
+ val context1 = fetchManager.newContext(JFetchMetadata.INITIAL, reqData, EMPTY_PART_LIST, isFollower = false)
+ assertEquals(classOf[FullFetchContext], context1.getClass)
+ val respData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
+ respData.put(tp1, new FetchResponse.PartitionData(Errors.NONE,
+ 105, 105, 0, Optional.empty(), Collections.emptyList(), Optional.empty(), null))
+ val divergingEpoch = Optional.of(new FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(90))
+ respData.put(tp2, new FetchResponse.PartitionData(Errors.NONE,
+ 105, 105, 0, Optional.empty(), Collections.emptyList(), divergingEpoch, null))
+ val resp1 = context1.updateAndGenerateResponseData(respData)
+ assertEquals(Errors.NONE, resp1.error)
+ assertNotEquals(INVALID_SESSION_ID, resp1.sessionId)
+ assertEquals(Utils.mkSet(tp1, tp2), resp1.responseData.keySet)
+
+ // Incremental fetch context returns partitions with divergent epoch even if none
+ // of the other conditions for return are met.
+ val context2 = fetchManager.newContext(new JFetchMetadata(resp1.sessionId, 1), reqData, EMPTY_PART_LIST, isFollower = false)
+ assertEquals(classOf[IncrementalFetchContext], context2.getClass)
+ val resp2 = context2.updateAndGenerateResponseData(respData)
+ assertEquals(Errors.NONE, resp2.error)
+ assertEquals(resp1.sessionId, resp2.sessionId)
+ assertEquals(Collections.singleton(tp2), resp2.responseData.keySet)
+
+ // All partitions with divergent epoch should be returned.
+ respData.put(tp1, new FetchResponse.PartitionData(Errors.NONE,
+ 105, 105, 0, Optional.empty(), Collections.emptyList(), divergingEpoch, null))
+ val resp3 = context2.updateAndGenerateResponseData(respData)
+ assertEquals(Errors.NONE, resp3.error)
+ assertEquals(resp1.sessionId, resp3.sessionId)
+ assertEquals(Utils.mkSet(tp1, tp2), resp3.responseData.keySet)
+
+ // Partitions that meet other conditions should be returned regardless of whether
+ // divergingEpoch is set or not.
+ respData.put(tp1, new FetchResponse.PartitionData(Errors.NONE,
+ 110, 110, 0, Optional.empty(), Collections.emptyList(), Optional.empty(), null))
+ val resp4 = context2.updateAndGenerateResponseData(respData)
+ assertEquals(Errors.NONE, resp4.error)
+ assertEquals(resp1.sessionId, resp4.sessionId)
+ assertEquals(Utils.mkSet(tp1, tp2), resp4.responseData.keySet)
+ }
}