You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/09 22:31:28 UTC

[kafka] branch trunk updated: KAFKA-6789; Handle retriable group errors in AdminClient API (#5578)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5a8d74e  KAFKA-6789; Handle retriable group errors in AdminClient API (#5578)
5a8d74e is described below

commit 5a8d74e151e4472d5e5b4541972f5515e4f138ff
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Fri May 10 04:01:10 2019 +0530

    KAFKA-6789; Handle retriable group errors in AdminClient API (#5578)
    
    This patch adds retry support for AdminClient group operations that fail with COORDINATOR_LOAD_IN_PROGRESS or COORDINATOR_NOT_AVAILABLE. Previously, we only had logic to retry after FindCoordinator failures.
    
    Reviewers: Yishun Guan <gy...@gmail.com>, Viktor Somogyi <vi...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 116 ++++++++++-----------
 .../kafka/common/requests/ListGroupsResponse.java  |   2 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  68 ++++++++++--
 3 files changed, 117 insertions(+), 69 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 8870519..f0e6635 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2551,7 +2551,7 @@ public class KafkaAdminClient extends AdminClient {
                 void handleResponse(AbstractResponse abstractResponse) {
                     final FindCoordinatorResponse fcResponse = (FindCoordinatorResponse) abstractResponse;
 
-                    if (handleFindCoordinatorError(fcResponse, futures.get(groupId)))
+                    if (handleGroupRequestError(fcResponse.error(), futures.get(groupId)))
                         return;
 
                     final long nowDescribeConsumerGroups = time.milliseconds();
@@ -2577,38 +2577,37 @@ public class KafkaAdminClient extends AdminClient {
                                 .findFirst().get();
 
                             final Errors groupError = Errors.forCode(describedGroup.errorCode());
-                            if (groupError != Errors.NONE) {
-                                // TODO: KAFKA-6789, we can retry based on the error code
-                                future.completeExceptionally(groupError.exception());
-                            } else {
-                                final String protocolType = describedGroup.protocolType();
-                                if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
-                                    final List<DescribedGroupMember> members = describedGroup.members();
-                                    final List<MemberDescription> memberDescriptions = new ArrayList<>(members.size());
-                                    final Set<AclOperation> authorizedOperations = validAclOperations(describedGroup.authorizedOperations());
-                                    for (DescribedGroupMember groupMember : members) {
-                                        Set<TopicPartition> partitions = Collections.emptySet();
-                                        if (groupMember.memberAssignment().length > 0) {
-                                            final PartitionAssignor.Assignment assignment = ConsumerProtocol.
-                                                deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
-                                            partitions = new HashSet<>(assignment.partitions());
-                                        }
-                                        final MemberDescription memberDescription =
-                                            new MemberDescription(groupMember.memberId(),
-                                                groupMember.clientId(),
-                                                groupMember.clientHost(),
-                                                new MemberAssignment(partitions));
-                                        memberDescriptions.add(memberDescription);
+
+                            if (handleGroupRequestError(groupError, future))
+                                return;
+
+                            final String protocolType = describedGroup.protocolType();
+                            if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
+                                final List<DescribedGroupMember> members = describedGroup.members();
+                                final List<MemberDescription> memberDescriptions = new ArrayList<>(members.size());
+                                final Set<AclOperation> authorizedOperations = validAclOperations(describedGroup.authorizedOperations());
+                                for (DescribedGroupMember groupMember : members) {
+                                    Set<TopicPartition> partitions = Collections.emptySet();
+                                    if (groupMember.memberAssignment().length > 0) {
+                                        final PartitionAssignor.Assignment assignment = ConsumerProtocol.
+                                            deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
+                                        partitions = new HashSet<>(assignment.partitions());
                                     }
-                                    final ConsumerGroupDescription consumerGroupDescription =
-                                            new ConsumerGroupDescription(groupId, protocolType.isEmpty(),
-                                                memberDescriptions,
-                                                describedGroup.protocolData(),
-                                                ConsumerGroupState.parse(describedGroup.groupState()),
-                                                fcResponse.node(),
-                                                authorizedOperations);
-                                    future.complete(consumerGroupDescription);
+                                    final MemberDescription memberDescription =
+                                        new MemberDescription(groupMember.memberId(),
+                                            groupMember.clientId(),
+                                            groupMember.clientHost(),
+                                            new MemberAssignment(partitions));
+                                    memberDescriptions.add(memberDescription);
                                 }
+                                final ConsumerGroupDescription consumerGroupDescription =
+                                    new ConsumerGroupDescription(groupId, protocolType.isEmpty(),
+                                        memberDescriptions,
+                                        describedGroup.protocolData(),
+                                        ConsumerGroupState.parse(describedGroup.groupState()),
+                                        fcResponse.node(),
+                                        authorizedOperations);
+                                future.complete(consumerGroupDescription);
                             }
                         }
 
@@ -2641,11 +2640,10 @@ public class KafkaAdminClient extends AdminClient {
             .collect(Collectors.toSet());
     }
 
-    private boolean handleFindCoordinatorError(FindCoordinatorResponse response, KafkaFutureImpl<?> future) {
-        Errors error = response.error();
-        if (error.exception() instanceof RetriableException) {
+    private boolean handleGroupRequestError(Errors error, KafkaFutureImpl<?> future) {
+        if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE) {
             throw error.exception();
-        } else if (response.hasError()) {
+        } else if (error != Errors.NONE) {
             future.completeExceptionally(error.exception());
             return true;
         }
@@ -2797,7 +2795,7 @@ public class KafkaAdminClient extends AdminClient {
             void handleResponse(AbstractResponse abstractResponse) {
                 final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
 
-                if (handleFindCoordinatorError(response, groupOffsetListingFuture))
+                if (handleGroupRequestError(response.error(), groupOffsetListingFuture))
                     return;
 
                 final long nowListConsumerGroupOffsets = time.milliseconds();
@@ -2815,26 +2813,25 @@ public class KafkaAdminClient extends AdminClient {
                         final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
                         final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
 
-                        if (response.hasError()) {
-                            groupOffsetListingFuture.completeExceptionally(response.error().exception());
-                        } else {
-                            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry :
-                                    response.responseData().entrySet()) {
-                                final TopicPartition topicPartition = entry.getKey();
-                                OffsetFetchResponse.PartitionData partitionData = entry.getValue();
-                                final Errors error = partitionData.error;
-
-                                if (error == Errors.NONE) {
-                                    final Long offset = partitionData.offset;
-                                    final String metadata = partitionData.metadata;
-                                    final Optional<Integer> leaderEpoch = partitionData.leaderEpoch;
-                                    groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
-                                } else {
-                                    log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
-                                }
+                        if (handleGroupRequestError(response.error(), groupOffsetListingFuture))
+                            return;
+
+                        for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry :
+                            response.responseData().entrySet()) {
+                            final TopicPartition topicPartition = entry.getKey();
+                            OffsetFetchResponse.PartitionData partitionData = entry.getValue();
+                            final Errors error = partitionData.error;
+
+                            if (error == Errors.NONE) {
+                                final Long offset = partitionData.offset;
+                                final String metadata = partitionData.metadata;
+                                final Optional<Integer> leaderEpoch = partitionData.leaderEpoch;
+                                groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
+                            } else {
+                                log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
                             }
-                            groupOffsetListingFuture.complete(groupOffsetsListing);
                         }
+                        groupOffsetListingFuture.complete(groupOffsetsListing);
                     }
 
                     @Override
@@ -2891,7 +2888,7 @@ public class KafkaAdminClient extends AdminClient {
                 void handleResponse(AbstractResponse abstractResponse) {
                     final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
 
-                    if (handleFindCoordinatorError(response, futures.get(groupId)))
+                    if (handleGroupRequestError(response.error(), futures.get(groupId)))
                         return;
 
                     final long nowDeleteConsumerGroups = time.milliseconds();
@@ -2912,11 +2909,10 @@ public class KafkaAdminClient extends AdminClient {
                             KafkaFutureImpl<Void> future = futures.get(groupId);
                             final Errors groupError = response.get(groupId);
 
-                            if (groupError != Errors.NONE) {
-                                future.completeExceptionally(groupError.exception());
-                            } else {
-                                future.complete(null);
-                            }
+                            if (handleGroupRequestError(groupError, future))
+                                return;
+
+                            future.complete(null);
                         }
 
                         @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index af6f721..de16998 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -62,7 +62,7 @@ public class ListGroupsResponse extends AbstractResponse {
     /**
      * Possible error codes:
      *
-     * COORDINATOR_LOADING_IN_PROGRESS (14)
+     * COORDINATOR_LOAD_IN_PROGRESS (14)
      * COORDINATOR_NOT_AVAILABLE (15)
      * AUTHORIZATION_FAILED (29)
      */
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 687dad2..40ba8b1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -942,7 +942,7 @@ public class KafkaAdminClientTest {
                 Collections.emptySet(),
                 Collections.emptySet(), nodes.get(0));
 
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             // Empty metadata response should be retried
@@ -972,8 +972,8 @@ public class KafkaAdminClientTest {
             // handle retriable errors
             env.kafkaClient().prepareResponseFrom(
                     new ListGroupsResponse(
-                            Errors.COORDINATOR_NOT_AVAILABLE,
-                            Collections.emptyList()
+                        Errors.COORDINATOR_NOT_AVAILABLE,
+                        Collections.emptyList()
                     ),
                     node1);
             env.kafkaClient().prepareResponseFrom(
@@ -1076,9 +1076,37 @@ public class KafkaAdminClientTest {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
-            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+            //Retriable FindCoordinatorResponse errors should be retried
+            env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_NOT_AVAILABLE,  Node.noNode()));
+            env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS,  Node.noNode()));
+
+            env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, env.cluster().controller()));
 
             DescribeGroupsResponseData data = new DescribeGroupsResponseData();
+
+            // Retriable DescribeGroups errors should be retried
+            data.groups().add(DescribeGroupsResponse.groupMetadata(
+                "group-0",
+                Errors.COORDINATOR_LOAD_IN_PROGRESS,
+                "",
+                "",
+                "",
+                Collections.emptyList(),
+                Collections.emptySet()));
+            env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
+
+            data = new DescribeGroupsResponseData();
+            data.groups().add(DescribeGroupsResponse.groupMetadata(
+                "group-0",
+                Errors.COORDINATOR_NOT_AVAILABLE,
+                "",
+                "",
+                "",
+                Collections.emptyList(),
+                Collections.emptySet()));
+            env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
+
+            data = new DescribeGroupsResponseData();
             TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
             TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
             TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
@@ -1143,7 +1171,14 @@ public class KafkaAdminClientTest {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
-            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+            //Retriable FindCoordinatorResponse errors should be retried
+            env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_NOT_AVAILABLE,  Node.noNode()));
+
+            env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, env.cluster().controller()));
+
+            // Retriable OffsetFetch errors should be retried
+            env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap()));
+            env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap()));
 
             TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
             TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
@@ -1192,9 +1227,9 @@ public class KafkaAdminClientTest {
 
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
-            final Map<String, Errors> response = new HashMap<>();
-            response.put("group-0", Errors.NONE);
-            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(response));
+            final Map<String, Errors> validResponse = new HashMap<>();
+            validResponse.put("group-0", Errors.NONE);
+            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(validResponse));
 
             final DeleteConsumerGroupsResult result = env.adminClient().deleteConsumerGroups(groupIds);
 
@@ -1207,6 +1242,23 @@ public class KafkaAdminClientTest {
             final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds);
             TestUtils.assertFutureError(errorResult.deletedGroups().get("group-0"), GroupAuthorizationException.class);
 
+            // Retriable DeleteGroups errors should be retried
+            env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, env.cluster().controller()));
+
+            final Map<String, Errors> errorResponse1 = new HashMap<>();
+            errorResponse1.put("group-0", Errors.COORDINATOR_NOT_AVAILABLE);
+            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(errorResponse1));
+
+            final Map<String, Errors> errorResponse2 = new HashMap<>();
+            errorResponse2.put("group-0", Errors.COORDINATOR_LOAD_IN_PROGRESS);
+            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(errorResponse2));
+
+            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(validResponse));
+
+            final DeleteConsumerGroupsResult errorResult1 = env.adminClient().deleteConsumerGroups(groupIds);
+
+            final KafkaFuture<Void> errorResults = errorResult1.deletedGroups().get("group-0");
+            assertNull(errorResults.get());
         }
     }