You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/10/15 17:03:22 UTC

[kafka] branch 2.7 updated: KAFKA-10613: Only set leader epoch when list-offset version >= 4 (#9438)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new eb77e66  KAFKA-10613: Only set leader epoch when list-offset version >= 4 (#9438)
eb77e66 is described below

commit eb77e665751f94fed6132a4d41014a3a976dbd78
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu Oct 15 10:01:51 2020 -0700

    KAFKA-10613: Only set leader epoch when list-offset version >= 4 (#9438)
    
    The leader epoch field is added in version 4, and the auto-generated protocol code would throw unsupported version exception if the field is set to any non-default values for version < 4. This would cause older versioned clients to never receive list-offset results.
    
    Reviewers: Boyang Chen <bo...@confluent.io>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   |   3 +-
 .../unit/kafka/server/ListOffsetsRequestTest.scala | 118 ++++++++++++---------
 2 files changed, 70 insertions(+), 51 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5077c32..ba8cd08 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1007,6 +1007,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body[ListOffsetRequest]
+    val version = request.header.apiVersion
 
     def buildErrorResponse(e: Errors, partition: ListOffsetPartition): ListOffsetPartitionResponse = {
       new ListOffsetPartitionResponse()
@@ -1055,7 +1056,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                   .setErrorCode(Errors.NONE.code)
                   .setTimestamp(found.timestamp)
                   .setOffset(found.offset)
-                if (found.leaderEpoch.isPresent)
+                if (found.leaderEpoch.isPresent && version >= 4)
                   partitionResponse.setLeaderEpoch(found.leaderEpoch.get)
                 partitionResponse
               case None =>
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
index 07ff804..cedbf0a 100644
--- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
@@ -30,10 +30,11 @@ import scala.jdk.CollectionConverters._
 
 class ListOffsetsRequestTest extends BaseRequestTest {
 
+  val topic = "topic"
+  val partition = new TopicPartition(topic, 0)
+
   @Test
   def testListOffsetsErrorCodes(): Unit = {
-    val topic = "topic"
-    val partition = new TopicPartition(topic, 0)
     val targetTimes = List(new ListOffsetTopic()
       .setName(topic)
       .setPartitions(List(new ListOffsetPartition()
@@ -79,6 +80,22 @@ class ListOffsetsRequestTest extends BaseRequestTest {
     assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, nonReplica, debugReplicaRequest)
   }
 
+  def assertResponseErrorForEpoch(error: Errors, brokerId: Int, currentLeaderEpoch: Optional[Integer]): Unit = {
+    val listOffsetPartition = new ListOffsetPartition()
+      .setPartitionIndex(partition.partition)
+      .setTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP)
+    if (currentLeaderEpoch.isPresent)
+      listOffsetPartition.setCurrentLeaderEpoch(currentLeaderEpoch.get)
+    val targetTimes = List(new ListOffsetTopic()
+      .setName(topic)
+      .setPartitions(List(listOffsetPartition).asJava)).asJava
+    val request = ListOffsetRequest.Builder
+      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+      .setTargetTimes(targetTimes)
+      .build()
+    assertResponseError(error, brokerId, request)
+  }
+
   @Test
   def testCurrentEpochValidation(): Unit = {
     val topic = "topic"
@@ -86,22 +103,6 @@ class ListOffsetsRequestTest extends BaseRequestTest {
     val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
     val firstLeaderId = partitionToLeader(topicPartition.partition)
 
-    def assertResponseErrorForEpoch(error: Errors, brokerId: Int, currentLeaderEpoch: Optional[Integer]): Unit = {
-      val partition = new ListOffsetPartition()
-          .setPartitionIndex(topicPartition.partition)
-          .setTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP)
-      if (currentLeaderEpoch.isPresent)
-          partition.setCurrentLeaderEpoch(currentLeaderEpoch.get)
-      val targetTimes = List(new ListOffsetTopic()
-        .setName(topic)
-        .setPartitions(List(partition).asJava)).asJava
-      val request = ListOffsetRequest.Builder
-        .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
-        .setTargetTimes(targetTimes)
-        .build()
-      assertResponseError(error, brokerId, request)
-    }
-
     // We need a leader change in order to check epoch fencing since the first epoch is 0 and
     // -1 is treated as having no epoch at all
     killBroker(firstLeaderId)
@@ -122,50 +123,68 @@ class ListOffsetsRequestTest extends BaseRequestTest {
     assertResponseErrorForEpoch(Errors.FENCED_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch - 1))
   }
 
-  @Test
-  def testResponseIncludesLeaderEpoch(): Unit = {
-    val topic = "topic"
-    val topicPartition = new TopicPartition(topic, 0)
-    val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
-    val firstLeaderId = partitionToLeader(topicPartition.partition)
+  // -1 indicate "latest"
+  def fetchOffsetAndEpoch(serverId: Int,
+                          timestamp: Long,
+                          version: Short): (Long, Int) = {
+    val targetTimes = List(new ListOffsetTopic()
+      .setName(topic)
+      .setPartitions(List(new ListOffsetPartition()
+        .setPartitionIndex(partition.partition)
+        .setTimestamp(timestamp)).asJava)).asJava
 
-    TestUtils.generateAndProduceMessages(servers, topic, 10)
+    val builder = ListOffsetRequest.Builder
+      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+      .setTargetTimes(targetTimes)
 
-    def fetchOffsetAndEpoch(serverId: Int,
-                            timestamp: Long): (Long, Int) = {
-      val targetTimes = List(new ListOffsetTopic()
-        .setName(topic)
-        .setPartitions(List(new ListOffsetPartition()
-          .setPartitionIndex(topicPartition.partition)
-          .setTimestamp(timestamp)).asJava)).asJava
+    val request = if (version == -1) builder.build() else builder.build(version)
 
-      val request = ListOffsetRequest.Builder
-        .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
-        .setTargetTimes(targetTimes)
-        .build()
+    val response = sendRequest(serverId, request)
+    val partitionData = response.topics.asScala.find(_.name == topic).get
+      .partitions.asScala.find(_.partitionIndex == partition.partition).get
 
-      val response = sendRequest(serverId, request)
-      val partitionData = response.topics.asScala.find(_.name == topic).get
-        .partitions.asScala.find(_.partitionIndex == topicPartition.partition).get
+    (partitionData.offset, partitionData.leaderEpoch)
+  }
 
-      (partitionData.offset, partitionData.leaderEpoch)
-    }
+  @Test
+  def testResponseIncludesLeaderEpoch(): Unit = {
+    val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
+    val firstLeaderId = partitionToLeader(partition.partition)
 
-    assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L))
-    assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP))
-    assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP))
+    TestUtils.generateAndProduceMessages(servers, topic, 10)
+
+    assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, -1))
+    assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP, -1))
+    assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP, -1))
 
     // Kill the first leader so that we can verify the epoch change when fetching the latest offset
     killBroker(firstLeaderId)
-    val secondLeaderId = TestUtils.awaitLeaderChange(servers, topicPartition, firstLeaderId)
-    val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, topicPartition, servers)
+    val secondLeaderId = TestUtils.awaitLeaderChange(servers, partition, firstLeaderId)
+    val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, partition, servers)
 
     // No changes to written data
-    assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, 0L))
-    assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP))
+    assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, 0L, -1))
+    assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP, -1))
+
+    assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, 0L, -1))
+    assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP, -1))
 
     // The latest offset reflects the updated epoch
-    assertEquals((10L, secondLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, ListOffsetRequest.LATEST_TIMESTAMP))
+    assertEquals((10L, secondLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, ListOffsetRequest.LATEST_TIMESTAMP, -1))
+  }
+
+  @Test
+  def testResponseDefaultOffsetAndLeaderEpochForLowerVersions(): Unit = {
+    val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
+    val firstLeaderId = partitionToLeader(partition.partition)
+
+    TestUtils.generateAndProduceMessages(servers, topic, 10)
+
+    assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 0))
+    assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 1))
+    assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 2))
+    assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 3))
+    assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, 4))
   }
 
   private def assertResponseError(error: Errors, brokerId: Int, request: ListOffsetRequest): Unit = {
@@ -181,5 +200,4 @@ class ListOffsetsRequestTest extends BaseRequestTest {
   private def sendRequest(leaderId: Int, request: ListOffsetRequest): ListOffsetResponse = {
     connectAndReceive[ListOffsetResponse](request, destination = brokerSocketServer(leaderId))
   }
-
 }