You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/09/01 01:47:10 UTC

[kafka] branch 2.0 updated: KAFKA-7369; Handle retriable errors in AdminClient list groups API (#5595)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new bc93633  KAFKA-7369; Handle retriable errors in AdminClient list groups API (#5595)
bc93633 is described below

commit bc93633031d22d9b8f4bd38fb977fdeddd6b79f0
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Aug 31 18:40:42 2018 -0700

    KAFKA-7369; Handle retriable errors in AdminClient list groups API (#5595)
    
    We should retry when possible if ListGroups fails due to a retriable error (e.g. coordinator loading).
    
    Reviewers: Colin Patrick McCabe <co...@cmccabe.xyz>,  Guozhang Wang <wa...@gmail.com>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  7 ++--
 .../kafka/common/requests/ListGroupsResponse.java  |  1 +
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 40 +++++++++++++++++++---
 3 files changed, 41 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 7e245d1..7a1fd62 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2565,8 +2565,11 @@ public class KafkaAdminClient extends AdminClient {
                         void handleResponse(AbstractResponse abstractResponse) {
                             final ListGroupsResponse response = (ListGroupsResponse) abstractResponse;
                             synchronized (results) {
-                                if (response.error() != Errors.NONE) {
-                                    results.addError(response.error().exception(), node);
+                                Errors error = response.error();
+                                if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE) {
+                                    throw error.exception();
+                                } else if (error != Errors.NONE) {
+                                    results.addError(error.exception(), node);
                                 } else {
                                     for (ListGroupsResponse.Group group : response.groups()) {
                                         maybeAddConsumerGroup(group);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index b108803..af6f721 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -62,6 +62,7 @@ public class ListGroupsResponse extends AbstractResponse {
     /**
      * Possible error codes:
      *
+     * COORDINATOR_LOADING_IN_PROGRESS (14)
      * COORDINATOR_NOT_AVAILABLE (15)
      * AUTHORIZATION_FAILED (29)
      */
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 8363079..9fe79e2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -36,7 +36,6 @@ import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.AuthenticationException;
-import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
@@ -45,6 +44,7 @@ import org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
@@ -848,9 +848,11 @@ public class KafkaAdminClientTest {
         Node node0 = new Node(0, "localhost", 8121);
         Node node1 = new Node(1, "localhost", 8122);
         Node node2 = new Node(2, "localhost", 8123);
+        Node node3 = new Node(3, "localhost", 8124);
         nodes.put(0, node0);
         nodes.put(1, node1);
         nodes.put(2, node2);
+        nodes.put(3, node3);
 
         final Cluster cluster = new Cluster(
                 "mockClusterId",
@@ -887,13 +889,19 @@ public class KafkaAdminClientTest {
                             )),
                     node0);
 
+            // handle retriable errors
             env.kafkaClient().prepareResponseFrom(
                     new ListGroupsResponse(
                             Errors.COORDINATOR_NOT_AVAILABLE,
                             Collections.emptyList()
                     ),
                     node1);
-
+            env.kafkaClient().prepareResponseFrom(
+                    new ListGroupsResponse(
+                            Errors.COORDINATOR_LOAD_IN_PROGRESS,
+                            Collections.emptyList()
+                    ),
+                    node1);
             env.kafkaClient().prepareResponseFrom(
                     new ListGroupsResponse(
                             Errors.NONE,
@@ -901,15 +909,37 @@ public class KafkaAdminClientTest {
                                     new ListGroupsResponse.Group("group-2", ConsumerProtocol.PROTOCOL_TYPE),
                                     new ListGroupsResponse.Group("group-connect-2", "connector")
                             )),
+                    node1);
+
+            env.kafkaClient().prepareResponseFrom(
+                    new ListGroupsResponse(
+                            Errors.NONE,
+                            asList(
+                                    new ListGroupsResponse.Group("group-3", ConsumerProtocol.PROTOCOL_TYPE),
+                                    new ListGroupsResponse.Group("group-connect-3", "connector")
+                            )),
                     node2);
 
+            // fatal error
+            env.kafkaClient().prepareResponseFrom(
+                    new ListGroupsResponse(
+                            Errors.UNKNOWN_SERVER_ERROR,
+                            Collections.emptyList()),
+                    node3);
+
+
             final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
-            assertFutureError(result.all(), CoordinatorNotAvailableException.class);
+            assertFutureError(result.all(), UnknownServerException.class);
+
             Collection<ConsumerGroupListing> listings = result.valid().get();
-            assertEquals(2, listings.size());
+            assertEquals(3, listings.size());
+
+            Set<String> groupIds = new HashSet<>();
             for (ConsumerGroupListing listing : listings) {
-                assertTrue(listing.groupId().equals("group-1") || listing.groupId().equals("group-2"));
+                groupIds.add(listing.groupId());
             }
+
+            assertEquals(Utils.mkSet("group-1", "group-2", "group-3"), groupIds);
             assertEquals(1, result.errors().get().size());
         }
     }