You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/09 22:49:14 UTC
[kafka] branch 2.2 updated: KAFKA-6789;
Handle retriable group errors in AdminClient API (#5578)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
new f8447a8 KAFKA-6789; Handle retriable group errors in AdminClient API (#5578)
f8447a8 is described below
commit f8447a831e8791fc7f7c9240ebfa0cf7e7e331f3
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Fri May 10 04:01:10 2019 +0530
KAFKA-6789; Handle retriable group errors in AdminClient API (#5578)
This patch adds support for retrying all AdminClient group operations after COORDINATOR_LOAD_IN_PROGRESS and COORDINATOR_NOT_AVAILABLE errors. Previously we only had logic to retry after FindCoordinator failures.
Reviewers: Yishun Guan <gy...@gmail.com>, Viktor Somogyi <vi...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
.../kafka/clients/admin/KafkaAdminClient.java | 117 ++++++++++-----------
.../kafka/common/requests/ListGroupsResponse.java | 2 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 63 +++++++++--
3 files changed, 113 insertions(+), 69 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index f983cbb..a6f2aea 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2418,7 +2418,7 @@ public class KafkaAdminClient extends AdminClient {
void handleResponse(AbstractResponse abstractResponse) {
final FindCoordinatorResponse fcResponse = (FindCoordinatorResponse) abstractResponse;
- if (handleFindCoordinatorError(fcResponse, futures.get(groupId)))
+ if (handleGroupRequestError(fcResponse.error(), futures.get(groupId)))
return;
final long nowDescribeConsumerGroups = time.milliseconds();
@@ -2436,39 +2436,36 @@ public class KafkaAdminClient extends AdminClient {
KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId);
final DescribeGroupsResponse.GroupMetadata groupMetadata = response.groups().get(groupId);
- final Errors groupError = groupMetadata.error();
- if (groupError != Errors.NONE) {
- // TODO: KAFKA-6789, we can retry based on the error code
- future.completeExceptionally(groupError.exception());
- } else {
- final String protocolType = groupMetadata.protocolType();
- if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
- final List<DescribeGroupsResponse.GroupMember> members = groupMetadata.members();
- final List<MemberDescription> memberDescriptions = new ArrayList<>(members.size());
-
- for (DescribeGroupsResponse.GroupMember groupMember : members) {
- Set<TopicPartition> partitions = Collections.emptySet();
- if (groupMember.memberAssignment().remaining() > 0) {
- final PartitionAssignor.Assignment assignment = ConsumerProtocol.
- deserializeAssignment(groupMember.memberAssignment().duplicate());
- partitions = new HashSet<>(assignment.partitions());
- }
- final MemberDescription memberDescription =
- new MemberDescription(groupMember.memberId(),
- groupMember.clientId(),
- groupMember.clientHost(),
- new MemberAssignment(partitions));
- memberDescriptions.add(memberDescription);
+ if (handleGroupRequestError(groupMetadata.error(), future))
+ return;
+
+ final String protocolType = groupMetadata.protocolType();
+ if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
+ final List<DescribeGroupsResponse.GroupMember> members = groupMetadata.members();
+ final List<MemberDescription> memberDescriptions = new ArrayList<>(members.size());
+
+ for (DescribeGroupsResponse.GroupMember groupMember : members) {
+ Set<TopicPartition> partitions = Collections.emptySet();
+ if (groupMember.memberAssignment().remaining() > 0) {
+ final PartitionAssignor.Assignment assignment = ConsumerProtocol.
+ deserializeAssignment(groupMember.memberAssignment().duplicate());
+ partitions = new HashSet<>(assignment.partitions());
}
- final ConsumerGroupDescription consumerGroupDescription =
- new ConsumerGroupDescription(groupId,
- protocolType.isEmpty(),
- memberDescriptions,
- groupMetadata.protocol(),
- ConsumerGroupState.parse(groupMetadata.state()),
- fcResponse.node());
- future.complete(consumerGroupDescription);
+ final MemberDescription memberDescription =
+ new MemberDescription(groupMember.memberId(),
+ groupMember.clientId(),
+ groupMember.clientHost(),
+ new MemberAssignment(partitions));
+ memberDescriptions.add(memberDescription);
}
+ final ConsumerGroupDescription consumerGroupDescription =
+ new ConsumerGroupDescription(groupId,
+ protocolType.isEmpty(),
+ memberDescriptions,
+ groupMetadata.protocol(),
+ ConsumerGroupState.parse(groupMetadata.state()),
+ fcResponse.node());
+ future.complete(consumerGroupDescription);
}
}
@@ -2491,11 +2488,11 @@ public class KafkaAdminClient extends AdminClient {
return new DescribeConsumerGroupsResult(new HashMap<>(futures));
}
- private boolean handleFindCoordinatorError(FindCoordinatorResponse response, KafkaFutureImpl<?> future) {
- Errors error = response.error();
- if (error.exception() instanceof RetriableException) {
+
+ private boolean handleGroupRequestError(Errors error, KafkaFutureImpl<?> future) {
+ if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE) {
throw error.exception();
- } else if (response.hasError()) {
+ } else if (error != Errors.NONE) {
future.completeExceptionally(error.exception());
return true;
}
@@ -2642,7 +2639,7 @@ public class KafkaAdminClient extends AdminClient {
void handleResponse(AbstractResponse abstractResponse) {
final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
- if (handleFindCoordinatorError(response, groupOffsetListingFuture))
+ if (handleGroupRequestError(response.error(), groupOffsetListingFuture))
return;
final long nowListConsumerGroupOffsets = time.milliseconds();
@@ -2660,26 +2657,25 @@ public class KafkaAdminClient extends AdminClient {
final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
- if (response.hasError()) {
- groupOffsetListingFuture.completeExceptionally(response.error().exception());
- } else {
- for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry :
- response.responseData().entrySet()) {
- final TopicPartition topicPartition = entry.getKey();
- OffsetFetchResponse.PartitionData partitionData = entry.getValue();
- final Errors error = partitionData.error;
-
- if (error == Errors.NONE) {
- final Long offset = partitionData.offset;
- final String metadata = partitionData.metadata;
- final Optional<Integer> leaderEpoch = partitionData.leaderEpoch;
- groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
- } else {
- log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
- }
+ if (handleGroupRequestError(response.error(), groupOffsetListingFuture))
+ return;
+
+ for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry :
+ response.responseData().entrySet()) {
+ final TopicPartition topicPartition = entry.getKey();
+ OffsetFetchResponse.PartitionData partitionData = entry.getValue();
+ final Errors error = partitionData.error;
+
+ if (error == Errors.NONE) {
+ final Long offset = partitionData.offset;
+ final String metadata = partitionData.metadata;
+ final Optional<Integer> leaderEpoch = partitionData.leaderEpoch;
+ groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
+ } else {
+ log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
}
- groupOffsetListingFuture.complete(groupOffsetsListing);
}
+ groupOffsetListingFuture.complete(groupOffsetsListing);
}
@Override
@@ -2733,7 +2729,7 @@ public class KafkaAdminClient extends AdminClient {
void handleResponse(AbstractResponse abstractResponse) {
final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
- if (handleFindCoordinatorError(response, futures.get(groupId)))
+ if (handleGroupRequestError(response.error(), futures.get(groupId)))
return;
final long nowDeleteConsumerGroups = time.milliseconds();
@@ -2754,11 +2750,10 @@ public class KafkaAdminClient extends AdminClient {
KafkaFutureImpl<Void> future = futures.get(groupId);
final Errors groupError = response.get(groupId);
- if (groupError != Errors.NONE) {
- future.completeExceptionally(groupError.exception());
- } else {
- future.complete(null);
- }
+ if (handleGroupRequestError(groupError, future))
+ return;
+
+ future.complete(null);
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index af6f721..de16998 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -62,7 +62,7 @@ public class ListGroupsResponse extends AbstractResponse {
/**
* Possible error codes:
*
- * COORDINATOR_LOADING_IN_PROGRESS (14)
+ * COORDINATOR_LOAD_IN_PROGRESS (14)
* COORDINATOR_NOT_AVAILABLE (15)
* AUTHORIZATION_FAILED (29)
*/
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 4c7c8ae..7e9f4c4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -915,7 +915,7 @@ public class KafkaAdminClientTest {
Collections.emptySet(),
Collections.emptySet(), nodes.get(0));
- try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
// Empty metadata response should be retried
@@ -945,8 +945,8 @@ public class KafkaAdminClientTest {
// handle retriable errors
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
- Errors.COORDINATOR_NOT_AVAILABLE,
- Collections.emptyList()
+ Errors.COORDINATOR_NOT_AVAILABLE,
+ Collections.emptyList()
),
node1);
env.kafkaClient().prepareResponseFrom(
@@ -1049,9 +1049,34 @@ public class KafkaAdminClientTest {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ //Retriable FindCoordinatorResponse errors should be retried
+ env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
+ env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
- final Map<String, DescribeGroupsResponse.GroupMetadata> groupMetadataMap = new HashMap<>();
+ Map<String, DescribeGroupsResponse.GroupMetadata> groupMetadataMap = new HashMap<>();
+
+ //Retriable errors should be retried
+ groupMetadataMap.put(
+ "group-0",
+ new DescribeGroupsResponse.GroupMetadata(
+ Errors.COORDINATOR_NOT_AVAILABLE,
+ "",
+ ConsumerProtocol.PROTOCOL_TYPE,
+ "",
+ Collections.emptyList()));
+ groupMetadataMap.put(
+ "group-connect-0",
+ new DescribeGroupsResponse.GroupMetadata(
+ Errors.COORDINATOR_LOAD_IN_PROGRESS,
+ "",
+ "connect",
+ "",
+ Collections.emptyList()));
+ env.kafkaClient().prepareResponse(new DescribeGroupsResponse(groupMetadataMap));
+
+ groupMetadataMap = new HashMap<>();
+
TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
@@ -1111,8 +1136,15 @@ public class KafkaAdminClientTest {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ //Retriable FindCoordinatorResponse errors should be retried
+ env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
+
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+ //Retriable errors should be retried
+ env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap()));
+ env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap()));
+
TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
@@ -1160,9 +1192,9 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
- final Map<String, Errors> response = new HashMap<>();
- response.put("group-0", Errors.NONE);
- env.kafkaClient().prepareResponse(new DeleteGroupsResponse(response));
+ final Map<String, Errors> validResponse = new HashMap<>();
+ validResponse.put("group-0", Errors.NONE);
+ env.kafkaClient().prepareResponse(new DeleteGroupsResponse(validResponse));
final DeleteConsumerGroupsResult result = env.adminClient().deleteConsumerGroups(groupIds);
@@ -1175,6 +1207,23 @@ public class KafkaAdminClientTest {
final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds);
TestUtils.assertFutureError(errorResult.deletedGroups().get("group-0"), GroupAuthorizationException.class);
+ //Retriable errors should be retried
+ env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+ final Map<String, Errors> errorResponse1 = new HashMap<>();
+ errorResponse1.put("group-0", Errors.COORDINATOR_NOT_AVAILABLE);
+ env.kafkaClient().prepareResponse(new DeleteGroupsResponse(errorResponse1));
+
+ final Map<String, Errors> errorResponse2 = new HashMap<>();
+ errorResponse2.put("group-0", Errors.COORDINATOR_LOAD_IN_PROGRESS);
+ env.kafkaClient().prepareResponse(new DeleteGroupsResponse(errorResponse2));
+
+ env.kafkaClient().prepareResponse(new DeleteGroupsResponse(validResponse));
+
+ final DeleteConsumerGroupsResult errorResult1 = env.adminClient().deleteConsumerGroups(groupIds);
+
+ final KafkaFuture<Void> errorResults = errorResult1.deletedGroups().get("group-0");
+ assertNull(errorResults.get());
}
}