You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/09/01 01:47:10 UTC
[kafka] branch 2.0 updated: KAFKA-7369; Handle retriable errors in AdminClient list groups API (#5595)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new bc93633 KAFKA-7369; Handle retriable errors in AdminClient list groups API (#5595)
bc93633 is described below
commit bc93633031d22d9b8f4bd38fb977fdeddd6b79f0
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Aug 31 18:40:42 2018 -0700
KAFKA-7369; Handle retriable errors in AdminClient list groups API (#5595)
We should retry when possible if ListGroups fails due to a retriable error (e.g. coordinator loading).
Reviewers: Colin Patrick McCabe <co...@cmccabe.xyz>, Guozhang Wang <wa...@gmail.com>
---
.../kafka/clients/admin/KafkaAdminClient.java | 7 ++--
.../kafka/common/requests/ListGroupsResponse.java | 1 +
.../kafka/clients/admin/KafkaAdminClientTest.java | 40 +++++++++++++++++++---
3 files changed, 41 insertions(+), 7 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 7e245d1..7a1fd62 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2565,8 +2565,11 @@ public class KafkaAdminClient extends AdminClient {
void handleResponse(AbstractResponse abstractResponse) {
final ListGroupsResponse response = (ListGroupsResponse) abstractResponse;
synchronized (results) {
- if (response.error() != Errors.NONE) {
- results.addError(response.error().exception(), node);
+ Errors error = response.error();
+ if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE) {
+ throw error.exception();
+ } else if (error != Errors.NONE) {
+ results.addError(error.exception(), node);
} else {
for (ListGroupsResponse.Group group : response.groups()) {
maybeAddConsumerGroup(group);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index b108803..af6f721 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -62,6 +62,7 @@ public class ListGroupsResponse extends AbstractResponse {
/**
* Possible error codes:
*
+ * COORDINATOR_LOAD_IN_PROGRESS (14)
* COORDINATOR_NOT_AVAILABLE (15)
* AUTHORIZATION_FAILED (29)
*/
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 8363079..9fe79e2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -36,7 +36,6 @@ import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.AuthenticationException;
-import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
@@ -45,6 +44,7 @@ import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
@@ -848,9 +848,11 @@ public class KafkaAdminClientTest {
Node node0 = new Node(0, "localhost", 8121);
Node node1 = new Node(1, "localhost", 8122);
Node node2 = new Node(2, "localhost", 8123);
+ Node node3 = new Node(3, "localhost", 8124);
nodes.put(0, node0);
nodes.put(1, node1);
nodes.put(2, node2);
+ nodes.put(3, node3);
final Cluster cluster = new Cluster(
"mockClusterId",
@@ -887,13 +889,19 @@ public class KafkaAdminClientTest {
)),
node0);
+ // handle retriable errors
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
Errors.COORDINATOR_NOT_AVAILABLE,
Collections.emptyList()
),
node1);
-
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(
+ Errors.COORDINATOR_LOAD_IN_PROGRESS,
+ Collections.emptyList()
+ ),
+ node1);
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
Errors.NONE,
@@ -901,15 +909,37 @@ public class KafkaAdminClientTest {
new ListGroupsResponse.Group("group-2", ConsumerProtocol.PROTOCOL_TYPE),
new ListGroupsResponse.Group("group-connect-2", "connector")
)),
+ node1);
+
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(
+ Errors.NONE,
+ asList(
+ new ListGroupsResponse.Group("group-3", ConsumerProtocol.PROTOCOL_TYPE),
+ new ListGroupsResponse.Group("group-connect-3", "connector")
+ )),
node2);
+ // fatal error
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(
+ Errors.UNKNOWN_SERVER_ERROR,
+ Collections.emptyList()),
+ node3);
+
+
final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
- assertFutureError(result.all(), CoordinatorNotAvailableException.class);
+ assertFutureError(result.all(), UnknownServerException.class);
+
Collection<ConsumerGroupListing> listings = result.valid().get();
- assertEquals(2, listings.size());
+ assertEquals(3, listings.size());
+
+ Set<String> groupIds = new HashSet<>();
for (ConsumerGroupListing listing : listings) {
- assertTrue(listing.groupId().equals("group-1") || listing.groupId().equals("group-2"));
+ groupIds.add(listing.groupId());
}
+
+ assertEquals(Utils.mkSet("group-1", "group-2", "group-3"), groupIds);
assertEquals(1, result.errors().get().size());
}
}