You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2021/02/20 01:03:21 UTC

[kafka] branch 2.7 updated: KAFKA-12339: Add retry to admin client's listOffsets (#10152)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 4695745  KAFKA-12339: Add retry to admin client's listOffsets (#10152)
4695745 is described below

commit 4695745749cfba8528c38eed5ce00a9e90f85f03
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Sat Feb 20 07:02:09 2021 +0800

    KAFKA-12339: Add retry to admin client's listOffsets (#10152)
    
    `KafkaAdmin.listOffsets` did not handle topic-level errors, hence the UnknownTopicOrPartitionException on topic-level can obstruct a Connect worker from running when the new internal topic is NOT synced to all brokers. The method did handle partition-level retriable errors by retrying, so this changes to handle topic-level retriable errors in the same way.
    
    This allows a Connect worker to start up and have the admin client retry when the worker is trying to read to the end of the newly-created internal topics until the internal topic metadata is synced to all brokers.
    
    Author: Chia-Ping Tsai <ch...@gmail.com>
    Reviewers: Randall Hauch <rh...@gmail.com>, Konstantine Karantasis <ko...@confluent.io>
---
 .../admin/internals/MetadataOperationContext.java  |  1 +
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 54 +++++++++++++++++++---
 2 files changed, 48 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
index c05e5cf..e7f2c07 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
@@ -82,6 +82,7 @@ public final class MetadataOperationContext<T, O extends AbstractOptions<O>> {
 
     public static void handleMetadataErrors(MetadataResponse response) {
         for (TopicMetadata tm : response.topicMetadata()) {
+            if (shouldRefreshMetadata(tm.error())) throw tm.error().exception();
             for (PartitionMetadata pm : tm.partitionMetadata()) {
                 if (shouldRefreshMetadata(pm.error)) {
                     throw pm.error.exception();
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 33e80ed..eec6a05 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -156,6 +156,7 @@ import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetDeleteResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
 import org.apache.kafka.common.requests.UpdateFeaturesRequest;
 import org.apache.kafka.common.requests.UpdateFeaturesResponse;
 import org.apache.kafka.common.resource.PatternType;
@@ -434,12 +435,16 @@ public class KafkaAdminClientTest {
     }
 
     private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) {
+        return prepareMetadataResponse(cluster, error, error);
+    }
+
+    private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors topicError, Errors partitionError) {
         List<MetadataResponseTopic> metadata = new ArrayList<>();
         for (String topic : cluster.topics()) {
             List<MetadataResponsePartition> pms = new ArrayList<>();
             for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) {
                 MetadataResponsePartition pm  = new MetadataResponsePartition()
-                    .setErrorCode(error.code())
+                    .setErrorCode(partitionError.code())
                     .setPartitionIndex(pInfo.partition())
                     .setLeaderId(pInfo.leader().id())
                     .setLeaderEpoch(234)
@@ -449,7 +454,7 @@ public class KafkaAdminClientTest {
                 pms.add(pm);
             }
             MetadataResponseTopic tm = new MetadataResponseTopic()
-                .setErrorCode(error.code())
+                .setErrorCode(topicError.code())
                 .setName(topic)
                 .setIsInternal(false)
                 .setPartitions(pms);
@@ -2720,14 +2725,14 @@ public class KafkaAdminClientTest {
             TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
             TopicPartition myTopicPartition3 = new TopicPartition("my_topic", 3);
 
-            final Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<>();
-            responseData.put(myTopicPartition0, new OffsetFetchResponse.PartitionData(10,
+            final Map<TopicPartition, PartitionData> responseData = new HashMap<>();
+            responseData.put(myTopicPartition0, new PartitionData(10,
                     Optional.empty(), "", Errors.NONE));
-            responseData.put(myTopicPartition1, new OffsetFetchResponse.PartitionData(0,
+            responseData.put(myTopicPartition1, new PartitionData(0,
                     Optional.empty(), "", Errors.NONE));
-            responseData.put(myTopicPartition2, new OffsetFetchResponse.PartitionData(20,
+            responseData.put(myTopicPartition2, new PartitionData(20,
                     Optional.empty(), "", Errors.NONE));
-            responseData.put(myTopicPartition3, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+            responseData.put(myTopicPartition3, new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                     Optional.empty(), "", Errors.NONE));
             env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NONE, responseData));
 
@@ -3834,6 +3839,41 @@ public class KafkaAdminClientTest {
     }
 
     @Test
+    public void testListOffsetsRetriableErrorOnMetadata() throws Exception {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE));
+            // metadata refresh because of UNKNOWN_TOPIC_OR_PARTITION
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            // listoffsets response from broker 0
+            ListOffsetTopicResponse t0 = ListOffsetResponse.singletonListOffsetTopicResponse(tp0, Errors.NONE, -1L, 123L, 321);
+            ListOffsetResponseData responseData = new ListOffsetResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(Arrays.asList(t0));
+            env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData));
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.latest()));
+
+            Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
+            assertEquals(1, offsets.size());
+            assertEquals(123L, offsets.get(tp0).offset());
+            assertEquals(321, offsets.get(tp0).leaderEpoch().get().intValue());
+            assertEquals(-1L, offsets.get(tp0).timestamp());
+        }
+    }
+
+    @Test
     public void testListOffsetsRetriableErrors() throws Exception {
 
         Node node0 = new Node(0, "localhost", 8120);