You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2021/02/22 16:15:22 UTC
[kafka] 02/02: KAFKA-12339: Add retry to admin client's listOffsets (#10152)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 09bba2755914e03ee4217811975732ac6564d649
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Sat Feb 20 07:02:09 2021 +0800
KAFKA-12339: Add retry to admin client's listOffsets (#10152)
`KafkaAdmin.listOffsets` did not handle topic-level errors, so a topic-level UnknownTopicOrPartitionException could prevent a Connect worker from running when a newly created internal topic had NOT yet been synced to all brokers. The method already handled partition-level retriable errors by retrying, so this change handles topic-level retriable errors in the same way.
This allows a Connect worker to start up: while the worker is trying to read to the end of its newly created internal topics, the admin client keeps retrying until the internal topic metadata has been synced to all brokers.
Author: Chia-Ping Tsai <ch...@gmail.com>
Reviewers: Randall Hauch <rh...@gmail.com>, Konstantine Karantasis <ko...@confluent.io>
---
.../admin/internals/MetadataOperationContext.java | 1 +
.../kafka/clients/admin/KafkaAdminClientTest.java | 54 ++++++++++++++++++----
2 files changed, 47 insertions(+), 8 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
index c05e5cf..e7f2c07 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
@@ -82,6 +82,7 @@ public final class MetadataOperationContext<T, O extends AbstractOptions<O>> {
public static void handleMetadataErrors(MetadataResponse response) {
for (TopicMetadata tm : response.topicMetadata()) {
+ if (shouldRefreshMetadata(tm.error())) throw tm.error().exception();
for (PartitionMetadata pm : tm.partitionMetadata()) {
if (shouldRefreshMetadata(pm.error)) {
throw pm.error.exception();
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index ce8f056..6eb972b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -445,12 +445,16 @@ public class KafkaAdminClientTest {
}
private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) {
+ return prepareMetadataResponse(cluster, error, error);
+ }
+
+ private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors topicError, Errors partitionError) {
List<MetadataResponseTopic> metadata = new ArrayList<>();
for (String topic : cluster.topics()) {
List<MetadataResponsePartition> pms = new ArrayList<>();
for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) {
MetadataResponsePartition pm = new MetadataResponsePartition()
- .setErrorCode(error.code())
+ .setErrorCode(partitionError.code())
.setPartitionIndex(pInfo.partition())
.setLeaderId(pInfo.leader().id())
.setLeaderEpoch(234)
@@ -460,19 +464,19 @@ public class KafkaAdminClientTest {
pms.add(pm);
}
MetadataResponseTopic tm = new MetadataResponseTopic()
- .setErrorCode(error.code())
+ .setErrorCode(topicError.code())
.setName(topic)
.setIsInternal(false)
.setPartitions(pms);
metadata.add(tm);
}
return MetadataResponse.prepareResponse(true,
- 0,
- cluster.nodes(),
- cluster.clusterResource().clusterId(),
- cluster.controller().id(),
- metadata,
- MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
+ 0,
+ cluster.nodes(),
+ cluster.clusterResource().clusterId(),
+ cluster.controller().id(),
+ metadata,
+ MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
}
private static DescribeGroupsResponseData prepareDescribeGroupsResponseData(String groupId,
@@ -3909,6 +3913,40 @@ public class KafkaAdminClientTest {
}
@Test
+ public void testListOffsetsRetriableErrorOnMetadata() throws Exception {
+ Node node = new Node(0, "localhost", 8120);
+ List<Node> nodes = Collections.singletonList(node);
+ final Cluster cluster = new Cluster(
+ "mockClusterId",
+ nodes,
+ Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+ Collections.emptySet(),
+ Collections.emptySet(),
+ node);
+ final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE));
+ // metadata refresh because of UNKNOWN_TOPIC_OR_PARTITION
+ env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+ // listoffsets response from broker 0
+ ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+ .setThrottleTimeMs(0)
+ .setTopics(Collections.singletonList(ListOffsetsResponse.singletonListOffsetsTopicResponse(tp0, Errors.NONE, -1L, 123L, 321)));
+ env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node);
+
+ ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.latest()));
+
+ Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
+ assertEquals(1, offsets.size());
+ assertEquals(123L, offsets.get(tp0).offset());
+ assertEquals(321, offsets.get(tp0).leaderEpoch().get().intValue());
+ assertEquals(-1L, offsets.get(tp0).timestamp());
+ }
+ }
+
+ @Test
public void testListOffsetsRetriableErrors() throws Exception {
Node node0 = new Node(0, "localhost", 8120);