You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2021/09/03 18:31:57 UTC

[kafka] branch trunk updated: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error for AlterConsumerGroupOffsetsHandler (#11086)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 37584ce  KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error for AlterConsumerGroupOffsetsHandler (#11086)
37584ce is described below

commit 37584ce4f57a9e087e9c9280803e556a856d95e8
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Sat Sep 4 02:29:44 2021 +0800

    KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error for AlterConsumerGroupOffsetsHandler (#11086)
    
    This patch makes `AlterConsumerGroupOffsetsHandler` treat the `REBALANCE_IN_PROGRESS` error as retriable, and adds tests for it.
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 .../admin/internals/AlterConsumerGroupOffsetsHandler.java      | 10 +++++-----
 .../admin/internals/ListConsumerGroupOffsetsHandler.java       |  1 -
 .../admin/internals/RemoveMembersFromConsumerGroupHandler.java |  7 +------
 .../org/apache/kafka/clients/admin/KafkaAdminClientTest.java   |  9 ++++++---
 .../admin/internals/AlterConsumerGroupOffsetsHandlerTest.java  |  2 +-
 5 files changed, 13 insertions(+), 16 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
index 7ac90b6..cb7551e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
@@ -159,24 +159,24 @@ public class AlterConsumerGroupOffsetsHandler implements AdminApiHandler<Coordin
         Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
-            // If the coordinator is in the middle of loading, then we just need to retry.
+            // If the coordinator is in the middle of loading, or rebalance is in progress, then we just need to retry.
             case COORDINATOR_LOAD_IN_PROGRESS:
-                log.debug("OffsetCommit request for group id {} failed because the coordinator" +
-                    " is still in the process of loading state. Will retry.", groupId.idValue);
+            case REBALANCE_IN_PROGRESS:
+                log.debug("OffsetCommit request for group id {} returned error {}. Will retry.",
+                    groupId.idValue, error);
                 groupsToRetry.add(groupId);
                 break;
 
             // If the coordinator is not available, then we unmap and retry.
             case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
-                log.debug("OffsetCommit request for group id {} returned error {}. Will retry.",
+                log.debug("OffsetCommit request for group id {} returned error {}. Will rediscover the coordinator and retry.",
                     groupId.idValue, error);
                 groupsToUnmap.add(groupId);
                 break;
 
             // Group level errors.
             case INVALID_GROUP_ID:
-            case REBALANCE_IN_PROGRESS:
             case INVALID_COMMIT_OFFSET_SIZE:
             case GROUP_AUTHORIZATION_FAILED:
                 log.debug("OffsetCommit request for group id {} failed due to error {}.",
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
index d5b2105..b1d2e9d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
@@ -139,7 +139,6 @@ public class ListConsumerGroupOffsetsHandler implements AdminApiHandler<Coordina
                 log.debug("`OffsetFetch` request for group id {} failed due to error {}", groupId.idValue, error);
                 failed.put(groupId, error.exception());
                 break;
-
             case COORDINATOR_LOAD_IN_PROGRESS:
                 // If the coordinator is in the middle of loading, then we just need to retry
                 log.debug("`OffsetFetch` request for group id {} failed because the coordinator " +
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
index e463911..90b3865 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
@@ -110,11 +110,7 @@ public class RemoveMembersFromConsumerGroupHandler implements AdminApiHandler<Co
                                  Errors.forCode(memberResponse.errorCode()));
             }
 
-            return new ApiResult<>(
-                Collections.singletonMap(groupId, memberErrors),
-                Collections.emptyMap(),
-                Collections.emptyList()
-            );
+            return ApiResult.completed(groupId, memberErrors);
         }
     }
 
@@ -129,7 +125,6 @@ public class RemoveMembersFromConsumerGroupHandler implements AdminApiHandler<Co
                 log.debug("`LeaveGroup` request for group id {} failed due to error {}", groupId.idValue, error);
                 failed.put(groupId, error.exception());
                 break;
-
             case COORDINATOR_LOAD_IN_PROGRESS:
                 // If the coordinator is in the middle of loading, then we just need to retry
                 log.debug("`LeaveGroup` request for group id {} failed because the coordinator " +
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 46542db..b648b2d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -3199,7 +3199,7 @@ public class KafkaAdminClientTest {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Arrays.asList(findCoordinatorV3, describeGroups)));
 
-            //Retriable FindCoordinatorResponse errors should be retried
+            // Retriable FindCoordinatorResponse errors should be retried
             env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE,  Node.noNode()));
             env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
 
@@ -3219,9 +3219,9 @@ public class KafkaAdminClientTest {
             final KafkaFuture<Void> results = result.deletedGroups().get("groupId");
             assertNull(results.get());
 
-            //should throw error for non-retriable errors
+            // should throw error for non-retriable errors
             env.kafkaClient().prepareResponse(
-                prepareOldFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED,  Node.noNode()));
+                prepareOldFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode()));
 
             DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds);
             TestUtils.assertFutureError(errorResult.deletedGroups().get("groupId"), GroupAuthorizationException.class);
@@ -4174,6 +4174,9 @@ public class KafkaAdminClientTest {
                 prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             env.kafkaClient().prepareResponse(
+                prepareOffsetCommitResponse(tp1, Errors.REBALANCE_IN_PROGRESS));
+
+            env.kafkaClient().prepareResponse(
                 prepareOffsetCommitResponse(tp1, Errors.NONE));
 
             Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java
index 8988c0f..c0ea2ba 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java
@@ -83,6 +83,7 @@ public class AlterConsumerGroupOffsetsHandlerTest {
         assertUnmappedKey(partitionErrors(Errors.NOT_COORDINATOR));
         assertUnmappedKey(partitionErrors(Errors.COORDINATOR_NOT_AVAILABLE));
         assertRetriableError(partitionErrors(Errors.COORDINATOR_LOAD_IN_PROGRESS));
+        assertRetriableError(partitionErrors(Errors.REBALANCE_IN_PROGRESS));
     }
 
     @Test
@@ -94,7 +95,6 @@ public class AlterConsumerGroupOffsetsHandlerTest {
         assertFatalError(partitionErrors(Errors.OFFSET_METADATA_TOO_LARGE));
         assertFatalError(partitionErrors(Errors.ILLEGAL_GENERATION));
         assertFatalError(partitionErrors(Errors.UNKNOWN_MEMBER_ID));
-        assertFatalError(partitionErrors(Errors.REBALANCE_IN_PROGRESS));
         assertFatalError(partitionErrors(Errors.INVALID_COMMIT_OFFSET_SIZE));
         assertFatalError(partitionErrors(Errors.UNKNOWN_SERVER_ERROR));
     }