You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2022/03/25 03:00:18 UTC

[kafka] branch 3.1 updated: KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new e176f4b  KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946)
e176f4b is described below

commit e176f4b2443f221d467c7985ecb9387a626751bf
Author: Randall Hauch <rh...@gmail.com>
AuthorDate: Thu Mar 24 21:40:10 2022 -0500

    KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946)
    
    The `retryEndOffsets(…)` method in `TopicAdmin` recently added (KAFKA-12879, #11797) to allow the `KafkaBasedLog.start()` method to retry any failures reading the last offsets for a topic. However, this introduce a regression when talking to older brokers (0.10.x or earlier).
    
    The `KafkaBasedLog` already had logic that expected an `UnsupportedVersionException` thrown by the admin client when a Kafka API is not available on an older broker, but the new retry logic in `TopicAdmin` did not account for this and wrapped the exception, thereby breaking the `KafkaBasedLog` logic and preventing startup.
    
    The fix is to propagate this `UnsupportedVersionException` from the `TopicAdmin.retryEndOffsets(…)` method. Added a new unit test that first replicated the problem before the fix, and verified the fix corrects the problem.
---
 .../org/apache/kafka/connect/util/TopicAdmin.java  |  4 ++++
 .../apache/kafka/connect/util/TopicAdminTest.java  | 25 +++++++++++++++++++++-
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index c1afd2c..8b2859a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -714,6 +714,7 @@ public class TopicAdmin implements AutoCloseable {
      *                          must be 0 or more
      * @return                  the map of offset for each topic partition, or an empty map if the supplied partitions
      *                          are null or empty
+     * @throws UnsupportedVersionException if the broker is too old to support the admin client API to read end offsets
      * @throws ConnectException if {@code timeoutDuration} is exhausted
      * @see TopicAdmin#endOffsets(Set)
      */
@@ -725,6 +726,9 @@ public class TopicAdmin implements AutoCloseable {
                     () -> "list offsets for topic partitions",
                     timeoutDuration,
                     retryBackoffMs);
+        } catch (UnsupportedVersionException e) {
+            // Older brokers don't support this admin method, so rethrow it without wrapping it
+            throw e;
         } catch (Exception e) {
             throw new ConnectException("Failed to list offsets for topic partitions.", e);
         }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index 32cec61..3c2f3e4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -467,8 +467,31 @@ public class TopicAdminTest {
         }
     }
 
+    /**
+     * TopicAdmin can be used to read the end offsets, but the admin client API used to do this was
+     * added to the broker in 0.11.0.0. This means that if Connect talks to older brokers,
+     * the admin client cannot be used to read end offsets, and will throw an UnsupportedVersionException.
+     */
+    @Test
+    public void retryEndOffsetsShouldRethrowUnknownVersionException() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            // Expect the admin client list offsets will throw unsupported version, simulating older brokers
+            env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            // The retryEndOffsets should catch and rethrow an unsupported version exception
+            assertThrows(UnsupportedVersionException.class, () -> admin.retryEndOffsets(tps, Duration.ofMillis(100), 1));
+        }
+    }
+
     @Test
-    public void retryEndOffsetsShouldThrowConnectException() {
+    public void retryEndOffsetsShouldWrapNonRetriableExceptionsWithConnectException() {
         String topicName = "myTopic";
         TopicPartition tp1 = new TopicPartition(topicName, 0);
         Set<TopicPartition> tps = Collections.singleton(tp1);