You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2021/09/03 18:31:57 UTC
[kafka] branch trunk updated: KAFKA-13103: add
REBALANCE_IN_PROGRESS error as retriable error for
AlterConsumerGroupOffsetsHandler (#11086)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 37584ce KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error for AlterConsumerGroupOffsetsHandler (#11086)
37584ce is described below
commit 37584ce4f57a9e087e9c9280803e556a856d95e8
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Sat Sep 4 02:29:44 2021 +0800
KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error for AlterConsumerGroupOffsetsHandler (#11086)
This patch makes `AlterConsumerGroupOffsetsHandler` treat the `REBALANCE_IN_PROGRESS` error as retriable, and adds tests covering that behavior.
Reviewers: David Jacot <dj...@confluent.io>
---
.../admin/internals/AlterConsumerGroupOffsetsHandler.java | 10 +++++-----
.../admin/internals/ListConsumerGroupOffsetsHandler.java | 1 -
.../admin/internals/RemoveMembersFromConsumerGroupHandler.java | 7 +------
.../org/apache/kafka/clients/admin/KafkaAdminClientTest.java | 9 ++++++---
.../admin/internals/AlterConsumerGroupOffsetsHandlerTest.java | 2 +-
5 files changed, 13 insertions(+), 16 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
index 7ac90b6..cb7551e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
@@ -159,24 +159,24 @@ public class AlterConsumerGroupOffsetsHandler implements AdminApiHandler<Coordin
Set<CoordinatorKey> groupsToRetry
) {
switch (error) {
- // If the coordinator is in the middle of loading, then we just need to retry.
+ // If the coordinator is in the middle of loading, or rebalance is in progress, then we just need to retry.
case COORDINATOR_LOAD_IN_PROGRESS:
- log.debug("OffsetCommit request for group id {} failed because the coordinator" +
- " is still in the process of loading state. Will retry.", groupId.idValue);
+ case REBALANCE_IN_PROGRESS:
+ log.debug("OffsetCommit request for group id {} returned error {}. Will retry.",
+ groupId.idValue, error);
groupsToRetry.add(groupId);
break;
// If the coordinator is not available, then we unmap and retry.
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
- log.debug("OffsetCommit request for group id {} returned error {}. Will retry.",
+ log.debug("OffsetCommit request for group id {} returned error {}. Will rediscover the coordinator and retry.",
groupId.idValue, error);
groupsToUnmap.add(groupId);
break;
// Group level errors.
case INVALID_GROUP_ID:
- case REBALANCE_IN_PROGRESS:
case INVALID_COMMIT_OFFSET_SIZE:
case GROUP_AUTHORIZATION_FAILED:
log.debug("OffsetCommit request for group id {} failed due to error {}.",
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
index d5b2105..b1d2e9d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
@@ -139,7 +139,6 @@ public class ListConsumerGroupOffsetsHandler implements AdminApiHandler<Coordina
log.debug("`OffsetFetch` request for group id {} failed due to error {}", groupId.idValue, error);
failed.put(groupId, error.exception());
break;
-
case COORDINATOR_LOAD_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("`OffsetFetch` request for group id {} failed because the coordinator " +
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
index e463911..90b3865 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
@@ -110,11 +110,7 @@ public class RemoveMembersFromConsumerGroupHandler implements AdminApiHandler<Co
Errors.forCode(memberResponse.errorCode()));
}
- return new ApiResult<>(
- Collections.singletonMap(groupId, memberErrors),
- Collections.emptyMap(),
- Collections.emptyList()
- );
+ return ApiResult.completed(groupId, memberErrors);
}
}
@@ -129,7 +125,6 @@ public class RemoveMembersFromConsumerGroupHandler implements AdminApiHandler<Co
log.debug("`LeaveGroup` request for group id {} failed due to error {}", groupId.idValue, error);
failed.put(groupId, error.exception());
break;
-
case COORDINATOR_LOAD_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("`LeaveGroup` request for group id {} failed because the coordinator " +
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 46542db..b648b2d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -3199,7 +3199,7 @@ public class KafkaAdminClientTest {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Arrays.asList(findCoordinatorV3, describeGroups)));
- //Retriable FindCoordinatorResponse errors should be retried
+ // Retriable FindCoordinatorResponse errors should be retried
env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
@@ -3219,9 +3219,9 @@ public class KafkaAdminClientTest {
final KafkaFuture<Void> results = result.deletedGroups().get("groupId");
assertNull(results.get());
- //should throw error for non-retriable errors
+ // should throw error for non-retriable errors
env.kafkaClient().prepareResponse(
- prepareOldFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode()));
+ prepareOldFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode()));
DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds);
TestUtils.assertFutureError(errorResult.deletedGroups().get("groupId"), GroupAuthorizationException.class);
@@ -4174,6 +4174,9 @@ public class KafkaAdminClientTest {
prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
env.kafkaClient().prepareResponse(
+ prepareOffsetCommitResponse(tp1, Errors.REBALANCE_IN_PROGRESS));
+
+ env.kafkaClient().prepareResponse(
prepareOffsetCommitResponse(tp1, Errors.NONE));
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java
index 8988c0f..c0ea2ba 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java
@@ -83,6 +83,7 @@ public class AlterConsumerGroupOffsetsHandlerTest {
assertUnmappedKey(partitionErrors(Errors.NOT_COORDINATOR));
assertUnmappedKey(partitionErrors(Errors.COORDINATOR_NOT_AVAILABLE));
assertRetriableError(partitionErrors(Errors.COORDINATOR_LOAD_IN_PROGRESS));
+ assertRetriableError(partitionErrors(Errors.REBALANCE_IN_PROGRESS));
}
@Test
@@ -94,7 +95,6 @@ public class AlterConsumerGroupOffsetsHandlerTest {
assertFatalError(partitionErrors(Errors.OFFSET_METADATA_TOO_LARGE));
assertFatalError(partitionErrors(Errors.ILLEGAL_GENERATION));
assertFatalError(partitionErrors(Errors.UNKNOWN_MEMBER_ID));
- assertFatalError(partitionErrors(Errors.REBALANCE_IN_PROGRESS));
assertFatalError(partitionErrors(Errors.INVALID_COMMIT_OFFSET_SIZE));
assertFatalError(partitionErrors(Errors.UNKNOWN_SERVER_ERROR));
}