You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/10/15 17:03:22 UTC
[kafka] branch 2.7 updated: KAFKA-10613: Only set leader epoch when
list-offset version >= 4 (#9438)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new eb77e66 KAFKA-10613: Only set leader epoch when list-offset version >= 4 (#9438)
eb77e66 is described below
commit eb77e665751f94fed6132a4d41014a3a976dbd78
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu Oct 15 10:01:51 2020 -0700
KAFKA-10613: Only set leader epoch when list-offset version >= 4 (#9438)
The leader epoch field is added in version 4, and the auto-generated protocol code would throw unsupported version exception if the field is set to any non-default values for version < 4. This would cause older versioned clients to never receive list-offset results.
Reviewers: Boyang Chen <bo...@confluent.io>
---
core/src/main/scala/kafka/server/KafkaApis.scala | 3 +-
.../unit/kafka/server/ListOffsetsRequestTest.scala | 118 ++++++++++++---------
2 files changed, 70 insertions(+), 51 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5077c32..ba8cd08 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1007,6 +1007,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val correlationId = request.header.correlationId
val clientId = request.header.clientId
val offsetRequest = request.body[ListOffsetRequest]
+ val version = request.header.apiVersion
def buildErrorResponse(e: Errors, partition: ListOffsetPartition): ListOffsetPartitionResponse = {
new ListOffsetPartitionResponse()
@@ -1055,7 +1056,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.setErrorCode(Errors.NONE.code)
.setTimestamp(found.timestamp)
.setOffset(found.offset)
- if (found.leaderEpoch.isPresent)
+ if (found.leaderEpoch.isPresent && version >= 4)
partitionResponse.setLeaderEpoch(found.leaderEpoch.get)
partitionResponse
case None =>
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
index 07ff804..cedbf0a 100644
--- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
@@ -30,10 +30,11 @@ import scala.jdk.CollectionConverters._
class ListOffsetsRequestTest extends BaseRequestTest {
+ val topic = "topic"
+ val partition = new TopicPartition(topic, 0)
+
@Test
def testListOffsetsErrorCodes(): Unit = {
- val topic = "topic"
- val partition = new TopicPartition(topic, 0)
val targetTimes = List(new ListOffsetTopic()
.setName(topic)
.setPartitions(List(new ListOffsetPartition()
@@ -79,6 +80,22 @@ class ListOffsetsRequestTest extends BaseRequestTest {
assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, nonReplica, debugReplicaRequest)
}
+ def assertResponseErrorForEpoch(error: Errors, brokerId: Int, currentLeaderEpoch: Optional[Integer]): Unit = {
+ val listOffsetPartition = new ListOffsetPartition()
+ .setPartitionIndex(partition.partition)
+ .setTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP)
+ if (currentLeaderEpoch.isPresent)
+ listOffsetPartition.setCurrentLeaderEpoch(currentLeaderEpoch.get)
+ val targetTimes = List(new ListOffsetTopic()
+ .setName(topic)
+ .setPartitions(List(listOffsetPartition).asJava)).asJava
+ val request = ListOffsetRequest.Builder
+ .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+ .setTargetTimes(targetTimes)
+ .build()
+ assertResponseError(error, brokerId, request)
+ }
+
@Test
def testCurrentEpochValidation(): Unit = {
val topic = "topic"
@@ -86,22 +103,6 @@ class ListOffsetsRequestTest extends BaseRequestTest {
val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
val firstLeaderId = partitionToLeader(topicPartition.partition)
- def assertResponseErrorForEpoch(error: Errors, brokerId: Int, currentLeaderEpoch: Optional[Integer]): Unit = {
- val partition = new ListOffsetPartition()
- .setPartitionIndex(topicPartition.partition)
- .setTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP)
- if (currentLeaderEpoch.isPresent)
- partition.setCurrentLeaderEpoch(currentLeaderEpoch.get)
- val targetTimes = List(new ListOffsetTopic()
- .setName(topic)
- .setPartitions(List(partition).asJava)).asJava
- val request = ListOffsetRequest.Builder
- .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
- .setTargetTimes(targetTimes)
- .build()
- assertResponseError(error, brokerId, request)
- }
-
// We need a leader change in order to check epoch fencing since the first epoch is 0 and
// -1 is treated as having no epoch at all
killBroker(firstLeaderId)
@@ -122,50 +123,68 @@ class ListOffsetsRequestTest extends BaseRequestTest {
assertResponseErrorForEpoch(Errors.FENCED_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch - 1))
}
- @Test
- def testResponseIncludesLeaderEpoch(): Unit = {
- val topic = "topic"
- val topicPartition = new TopicPartition(topic, 0)
- val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
- val firstLeaderId = partitionToLeader(topicPartition.partition)
+ // -1 indicate "latest"
+ def fetchOffsetAndEpoch(serverId: Int,
+ timestamp: Long,
+ version: Short): (Long, Int) = {
+ val targetTimes = List(new ListOffsetTopic()
+ .setName(topic)
+ .setPartitions(List(new ListOffsetPartition()
+ .setPartitionIndex(partition.partition)
+ .setTimestamp(timestamp)).asJava)).asJava
- TestUtils.generateAndProduceMessages(servers, topic, 10)
+ val builder = ListOffsetRequest.Builder
+ .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+ .setTargetTimes(targetTimes)
- def fetchOffsetAndEpoch(serverId: Int,
- timestamp: Long): (Long, Int) = {
- val targetTimes = List(new ListOffsetTopic()
- .setName(topic)
- .setPartitions(List(new ListOffsetPartition()
- .setPartitionIndex(topicPartition.partition)
- .setTimestamp(timestamp)).asJava)).asJava
+ val request = if (version == -1) builder.build() else builder.build(version)
- val request = ListOffsetRequest.Builder
- .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
- .setTargetTimes(targetTimes)
- .build()
+ val response = sendRequest(serverId, request)
+ val partitionData = response.topics.asScala.find(_.name == topic).get
+ .partitions.asScala.find(_.partitionIndex == partition.partition).get
- val response = sendRequest(serverId, request)
- val partitionData = response.topics.asScala.find(_.name == topic).get
- .partitions.asScala.find(_.partitionIndex == topicPartition.partition).get
+ (partitionData.offset, partitionData.leaderEpoch)
+ }
- (partitionData.offset, partitionData.leaderEpoch)
- }
+ @Test
+ def testResponseIncludesLeaderEpoch(): Unit = {
+ val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
+ val firstLeaderId = partitionToLeader(partition.partition)
- assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L))
- assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP))
- assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP))
+ TestUtils.generateAndProduceMessages(servers, topic, 10)
+
+ assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, -1))
+ assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP, -1))
+ assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP, -1))
// Kill the first leader so that we can verify the epoch change when fetching the latest offset
killBroker(firstLeaderId)
- val secondLeaderId = TestUtils.awaitLeaderChange(servers, topicPartition, firstLeaderId)
- val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, topicPartition, servers)
+ val secondLeaderId = TestUtils.awaitLeaderChange(servers, partition, firstLeaderId)
+ val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, partition, servers)
// No changes to written data
- assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, 0L))
- assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP))
+ assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, 0L, -1))
+ assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP, -1))
+
+ assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, 0L, -1))
+ assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP, -1))
// The latest offset reflects the updated epoch
- assertEquals((10L, secondLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, ListOffsetRequest.LATEST_TIMESTAMP))
+ assertEquals((10L, secondLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, ListOffsetRequest.LATEST_TIMESTAMP, -1))
+ }
+
+ @Test
+ def testResponseDefaultOffsetAndLeaderEpochForLowerVersions(): Unit = {
+ val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
+ val firstLeaderId = partitionToLeader(partition.partition)
+
+ TestUtils.generateAndProduceMessages(servers, topic, 10)
+
+ assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 0))
+ assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 1))
+ assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 2))
+ assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 3))
+ assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, 4))
}
private def assertResponseError(error: Errors, brokerId: Int, request: ListOffsetRequest): Unit = {
@@ -181,5 +200,4 @@ class ListOffsetsRequestTest extends BaseRequestTest {
private def sendRequest(leaderId: Int, request: ListOffsetRequest): ListOffsetResponse = {
connectAndReceive[ListOffsetResponse](request, destination = brokerSocketServer(leaderId))
}
-
}