You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2020/11/02 18:18:37 UTC

[kafka] branch 2.7 updated: KAFKA-10669: Make CurrentLeaderEpoch field ignorable and set MaxNumOffsets field default to 1

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new edc2c4f  KAFKA-10669: Make CurrentLeaderEpoch field ignorable and set MaxNumOffsets field default to 1
edc2c4f is described below

commit edc2c4fd8d1e91237b08c8df70628e74637e4e47
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Mon Nov 2 23:39:03 2020 +0530

    KAFKA-10669: Make CurrentLeaderEpoch field ignorable and set MaxNumOffsets field default to 1
    
    Couple of failures observed after KAFKA-9627: Replace ListOffset request/response with automated protocol (https://github.com/apache/kafka/pull/8295)
    
    1. Latest consumer fails to consume from 0.10.0.1 brokers. Below system tests are failing
    kafkatest.tests.client.client_compatibility_features_test.ClientCompatibilityFeaturesTest
    kafkatest.tests.client.client_compatibility_produce_consume_test.ClientCompatibilityProduceConsumeTest
    
    Solution: Current default value for MaxNumOffsets is 0. because to this brokers are not returning offsets for v0 request. Set default value for MaxNumOffsets field to 1.  This is similar to previous [approach]
    (https://github.com/apache/kafka/blob/2.6/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java#L204)
    
    2. In some scenarios, latest consumer fails with below error when connecting to a Kafka cluster which consists of newer and older (<=2.0) Kafka brokers
    `org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default currentLeaderEpoch at version 3`
    
    Solution: After #8295, consumer can set non-default CurrentLeaderEpoch value for v3 and below requests. One solution is to make CurrentLeaderEpoch ignorable.
    
    Author: Manikumar Reddy <ma...@gmail.com>
    
    Reviewers: David Jacot <dj...@confluent.io>
    
    Closes #9540 from omkreddy/fix-listoffsets
    
    (cherry picked from commit 236d7dc890e82c9b146579a8be801c1c7f54feb9)
    Signed-off-by: Manikumar Reddy <ma...@gmail.com>
---
 .../common/message/ListOffsetRequest.json          |  4 +--
 .../kafka/common/requests/RequestResponseTest.java | 13 +++++-----
 .../unit/kafka/server/ListOffsetsRequestTest.scala | 30 +++++++++++++++++-----
 3 files changed, 32 insertions(+), 15 deletions(-)

diff --git a/clients/src/main/resources/common/message/ListOffsetRequest.json b/clients/src/main/resources/common/message/ListOffsetRequest.json
index 259d7bf..5ecc2d6 100644
--- a/clients/src/main/resources/common/message/ListOffsetRequest.json
+++ b/clients/src/main/resources/common/message/ListOffsetRequest.json
@@ -42,11 +42,11 @@
         "about": "Each partition in the request.", "fields": [
         { "name": "PartitionIndex", "type": "int32", "versions": "0+",
           "about": "The partition index." },
-        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "4+", "default": "-1",
+        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "4+", "default": "-1", "ignorable": true,
           "about": "The current leader epoch." },
         { "name": "Timestamp", "type": "int64", "versions": "0+",
           "about": "The current timestamp." },
-        { "name": "MaxNumOffsets", "type": "int32", "versions": "0",
+        { "name": "MaxNumOffsets", "type": "int32", "versions": "0", "default": "1",
           "about": "The maximum number of offsets to report." }
       ]}
     ]}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 0862e2b..71048d8 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1243,7 +1243,8 @@ public class RequestResponseTest {
                     .setPartitions(Arrays.asList(new ListOffsetPartition()
                             .setPartitionIndex(0)
                             .setTimestamp(1000000L)
-                            .setMaxNumOffsets(10)));
+                            .setMaxNumOffsets(10)
+                            .setCurrentLeaderEpoch(5)));
             return ListOffsetRequest.Builder
                     .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
                     .setTargetTimes(Collections.singletonList(topic))
@@ -1253,7 +1254,8 @@ public class RequestResponseTest {
                     .setName("test")
                     .setPartitions(Arrays.asList(new ListOffsetPartition()
                             .setPartitionIndex(0)
-                            .setTimestamp(1000000L)));
+                            .setTimestamp(1000000L)
+                            .setCurrentLeaderEpoch(5)));
             return ListOffsetRequest.Builder
                     .forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
                     .setTargetTimes(Collections.singletonList(topic))
@@ -1261,10 +1263,9 @@ public class RequestResponseTest {
         } else if (version >= 2 && version <= 5) {
             ListOffsetPartition partition = new ListOffsetPartition()
                     .setPartitionIndex(0)
-                    .setTimestamp(1000000L);
-            if (version >= 4) {
-                partition.setCurrentLeaderEpoch(5);
-            }
+                    .setTimestamp(1000000L)
+                    .setCurrentLeaderEpoch(5);
+
             ListOffsetTopic topic = new ListOffsetTopic()
                     .setName("test")
                     .setPartitions(Arrays.asList(partition));
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
index cedbf0a..ce324c7 100644
--- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
@@ -143,7 +143,13 @@ class ListOffsetsRequestTest extends BaseRequestTest {
     val partitionData = response.topics.asScala.find(_.name == topic).get
       .partitions.asScala.find(_.partitionIndex == partition.partition).get
 
-    (partitionData.offset, partitionData.leaderEpoch)
+    if (version == 0) {
+      if (partitionData.oldStyleOffsets().isEmpty)
+        (-1, partitionData.leaderEpoch)
+      else
+        (partitionData.oldStyleOffsets().asScala.head, partitionData.leaderEpoch)
+    } else
+      (partitionData.offset, partitionData.leaderEpoch)
   }
 
   @Test
@@ -174,17 +180,27 @@ class ListOffsetsRequestTest extends BaseRequestTest {
   }
 
   @Test
-  def testResponseDefaultOffsetAndLeaderEpochForLowerVersions(): Unit = {
+  def testResponseDefaultOffsetAndLeaderEpochForAllVersions(): Unit = {
     val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
     val firstLeaderId = partitionToLeader(partition.partition)
 
     TestUtils.generateAndProduceMessages(servers, topic, 10)
 
-    assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 0))
-    assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 1))
-    assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 2))
-    assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 3))
-    assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, 4))
+    for (version <- ApiKeys.LIST_OFFSETS.oldestVersion to ApiKeys.LIST_OFFSETS.latestVersion) {
+      if (version == 0) {
+        assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
+        assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP, version.toShort))
+        assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP, version.toShort))
+      } else if (version >= 1 && version <= 3) {
+        assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
+        assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP, version.toShort))
+        assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP, version.toShort))
+      } else if (version >= 4) {
+        assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
+        assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP, version.toShort))
+        assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP, version.toShort))
+      }
+    }
   }
 
   private def assertResponseError(error: Errors, brokerId: Int, request: ListOffsetRequest): Unit = {