You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/12 08:31:14 UTC

[GitHub] [kafka] showuon opened a new pull request #11021: KAFKA-13062: refactor DeleteConsumerGroupsHandler and tests

showuon opened a new pull request #11021:
URL: https://github.com/apache/kafka/pull/11021


   refactor `DeleteConsumerGroupsHandler` and tests:
   
   1. add all expected errors in the handleError
   2. update tests
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #11021: KAFKA-13062: Make DeleteConsumerGroupsHandler unmap for COORDINATOR_NOT_AVAILABLE error

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #11021:
URL: https://github.com/apache/kafka/pull/11021#issuecomment-880661583


   Merged to trunk and to 3.0. cc @kkonstantine 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #11021: KAFKA-13062: Make DeleteConsumerGroupsHandler unmap for COORDINATOR_NOT_AVAILABLE error

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #11021:
URL: https://github.com/apache/kafka/pull/11021#issuecomment-880660679


   Failures are not related:
   ```
   Build / JDK 16 and Scala 2.13 / shouldBeAbleToQueryFilterState – org.apache.kafka.streams.integration.QueryableStateIntegrationTest
   43s
   Build / JDK 11 and Scala 2.13 / remoteCloseWithoutBufferedReceives() – kafka.network.SocketServerTest
   <1s
   Build / JDK 11 and Scala 2.13 / shouldWorkWithUncleanShutdownWipeOutStateStore[exactly_once_v2] – org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest
   36s
   Build / JDK 8 and Scala 2.12 / shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader() – kafka.server.epoch.LeaderEpochIntegrationTest
   36s
   Build / JDK 8 and Scala 2.12 / shouldInnerJoinMultiPartitionQueryable – org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac merged pull request #11021: KAFKA-13062: Make DeleteConsumerGroupsHandler unmap for COORDINATOR_NOT_AVAILABLE error

Posted by GitBox <gi...@apache.org>.
dajac merged pull request #11021:
URL: https://github.com/apache/kafka/pull/11021


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #11021: KAFKA-13062: refactor DeleteConsumerGroupsHandler and tests

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11021:
URL: https://github.com/apache/kafka/pull/11021#discussion_r669379080



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
##########
@@ -86,47 +87,54 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
-        DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
-        Map<CoordinatorKey, Void> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
+        final Map<CoordinatorKey, Void> completed = new HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
 
         for (DeletableGroupResult deletedGroup : response.data().results()) {
             CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(deletedGroup.groupId());
             Errors error = Errors.forCode(deletedGroup.errorCode());
             if (error != Errors.NONE) {
-                handleError(groupIdKey, error, failed, unmapped);
+                handleError(groupIdKey, error, failed, groupsToUnmap);
                 continue;
             }
 
             completed.put(groupIdKey, null);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap));
     }
 
     private void handleError(
         CoordinatorKey groupId,
         Errors error,
         Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Set<CoordinatorKey> groupsToUnmap
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in `DeleteConsumerGroups` response", groupId,
-                        error.exception());
+            case INVALID_GROUP_ID:
+            case NON_EMPTY_GROUP:
+            case GROUP_ID_NOT_FOUND:
+                log.debug("`DeleteConsumerGroups` request for group id {} failed due to error {}", groupId, error);
                 failed.put(groupId, error.exception());
                 break;
             case COORDINATOR_LOAD_IN_PROGRESS:
-            case COORDINATOR_NOT_AVAILABLE:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("`DeleteConsumerGroups` request for group {} failed because the coordinator " +
+                    "is still in the process of loading state. Will retry", groupId);
                 break;
+            case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
-                log.debug("DeleteConsumerGroups request for group {} returned error {}. Will retry",
-                        groupId, error);
-                unmapped.add(groupId);
+                // If the coordinator is unavailable or there was a coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`DeleteConsumerGroups` request for group {} returned error {}. " +

Review comment:
       nit: `group` -> `group id`?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
##########
@@ -86,47 +87,54 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
-        DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
-        Map<CoordinatorKey, Void> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
+        final Map<CoordinatorKey, Void> completed = new HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
 
         for (DeletableGroupResult deletedGroup : response.data().results()) {
             CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(deletedGroup.groupId());
             Errors error = Errors.forCode(deletedGroup.errorCode());
             if (error != Errors.NONE) {
-                handleError(groupIdKey, error, failed, unmapped);
+                handleError(groupIdKey, error, failed, groupsToUnmap);
                 continue;
             }
 
             completed.put(groupIdKey, null);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap));
     }
 
     private void handleError(
         CoordinatorKey groupId,
         Errors error,
         Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Set<CoordinatorKey> groupsToUnmap
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in `DeleteConsumerGroups` response", groupId,
-                        error.exception());
+            case INVALID_GROUP_ID:
+            case NON_EMPTY_GROUP:
+            case GROUP_ID_NOT_FOUND:
+                log.debug("`DeleteConsumerGroups` request for group id {} failed due to error {}", groupId, error);
                 failed.put(groupId, error.exception());
                 break;
             case COORDINATOR_LOAD_IN_PROGRESS:
-            case COORDINATOR_NOT_AVAILABLE:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("`DeleteConsumerGroups` request for group {} failed because the coordinator " +

Review comment:
       nit: `group` -> `group id`?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
##########
@@ -86,47 +87,54 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
-        DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
-        Map<CoordinatorKey, Void> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
+        final Map<CoordinatorKey, Void> completed = new HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
 
         for (DeletableGroupResult deletedGroup : response.data().results()) {
             CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(deletedGroup.groupId());
             Errors error = Errors.forCode(deletedGroup.errorCode());
             if (error != Errors.NONE) {
-                handleError(groupIdKey, error, failed, unmapped);
+                handleError(groupIdKey, error, failed, groupsToUnmap);
                 continue;
             }
 
             completed.put(groupIdKey, null);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap));
     }
 
     private void handleError(
         CoordinatorKey groupId,
         Errors error,
         Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Set<CoordinatorKey> groupsToUnmap
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in `DeleteConsumerGroups` response", groupId,
-                        error.exception());
+            case INVALID_GROUP_ID:
+            case NON_EMPTY_GROUP:
+            case GROUP_ID_NOT_FOUND:
+                log.debug("`DeleteConsumerGroups` request for group id {} failed due to error {}", groupId, error);

Review comment:
       nit: We should use `groupId.idValue` here and in the others.

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -3179,41 +3179,47 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception {
             env.kafkaClient().prepareResponse(
                 prepareOldFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED,  Node.noNode()));
 
-            final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds);
+            DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds);
             TestUtils.assertFutureError(errorResult.deletedGroups().get("groupId"), GroupAuthorizationException.class);
 
-            //Retriable errors should be retried
+            // Retriable errors should be retried
             env.kafkaClient().prepareResponse(
                 prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
-            final DeletableGroupResultCollection errorResponse1 = new DeletableGroupResultCollection();
-            errorResponse1.add(new DeletableGroupResult()
-                                   .setGroupId("groupId")
-                                   .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
-            );
-            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(
-                new DeleteGroupsResponseData()
-                    .setResults(errorResponse1)));

Review comment:
       Why are we moving this to later?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #11021: KAFKA-13062: refactor DeleteConsumerGroupsHandler and tests

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11021:
URL: https://github.com/apache/kafka/pull/11021#discussion_r668789179



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
##########
@@ -89,46 +91,76 @@ public String apiName() {
         DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
         Map<CoordinatorKey, Void> completed = new HashMap<>();
         Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
         for (DeletableGroupResult deletedGroup : response.data().results()) {
             CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(deletedGroup.groupId());
             Errors error = Errors.forCode(deletedGroup.errorCode());
             if (error != Errors.NONE) {
-                handleError(groupIdKey, error, failed, unmapped);
+                handleError(groupIdKey, error, failed, groupsToUnmap, groupsToRetry);
                 continue;
             }
 
             completed.put(groupIdKey, null);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {

Review comment:
       It seems incorrect to do this here. We were able to do so in the other because they were expecting only one group at the time. This one is different. The driver will retry if the group is not completed nor failed. It seems to me that we could keep the existing code, no?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
##########
@@ -89,46 +91,76 @@ public String apiName() {
         DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
         Map<CoordinatorKey, Void> completed = new HashMap<>();
         Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
         for (DeletableGroupResult deletedGroup : response.data().results()) {
             CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(deletedGroup.groupId());
             Errors error = Errors.forCode(deletedGroup.errorCode());
             if (error != Errors.NONE) {
-                handleError(groupIdKey, error, failed, unmapped);
+                handleError(groupIdKey, error, failed, groupsToUnmap, groupsToRetry);
                 continue;
             }
 
             completed.put(groupIdKey, null);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,
+                failed,
+                Collections.emptyList()
+            );
+        } else {
+            // retry the request, so don't send completed/failed results back
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
+        }
     }
 
     private void handleError(
         CoordinatorKey groupId,
         Errors error,
         Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in `DeleteConsumerGroups` response", groupId,
-                        error.exception());
+                log.error("Received authorization failure for group {} in `{}` response", groupId,
+                    apiName(), error.exception());
+                failed.put(groupId, error.exception());
+                break;
+            case INVALID_GROUP_ID:
+            case NON_EMPTY_GROUP:
+            case GROUP_ID_NOT_FOUND:
+                log.error("Received non retriable failure for group {} in `{}` response", groupId,

Review comment:
       I would also try to uniformize the logs and would use debug all the time except for the unexpected errors.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
##########
@@ -89,46 +91,76 @@ public String apiName() {
         DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
         Map<CoordinatorKey, Void> completed = new HashMap<>();
         Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
         for (DeletableGroupResult deletedGroup : response.data().results()) {
             CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(deletedGroup.groupId());
             Errors error = Errors.forCode(deletedGroup.errorCode());
             if (error != Errors.NONE) {
-                handleError(groupIdKey, error, failed, unmapped);
+                handleError(groupIdKey, error, failed, groupsToUnmap, groupsToRetry);
                 continue;
             }
 
             completed.put(groupIdKey, null);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,
+                failed,
+                Collections.emptyList()
+            );
+        } else {
+            // retry the request, so don't send completed/failed results back
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
+        }
     }
 
     private void handleError(
         CoordinatorKey groupId,
         Errors error,
         Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in `DeleteConsumerGroups` response", groupId,
-                        error.exception());
+                log.error("Received authorization failure for group {} in `{}` response", groupId,
+                    apiName(), error.exception());
+                failed.put(groupId, error.exception());
+                break;
+            case INVALID_GROUP_ID:
+            case NON_EMPTY_GROUP:
+            case GROUP_ID_NOT_FOUND:
+                log.error("Received non retriable failure for group {} in `{}` response", groupId,
+                    apiName(), error.exception());
                 failed.put(groupId, error.exception());
                 break;
             case COORDINATOR_LOAD_IN_PROGRESS:
-            case COORDINATOR_NOT_AVAILABLE:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("`{}` request for group {} failed because the coordinator " +
+                    "is still in the process of loading state. Will retry", apiName(), groupId);
+                groupsToRetry.add(groupId);
                 break;
+            case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
-                log.debug("DeleteConsumerGroups request for group {} returned error {}. Will retry",
-                        groupId, error);
-                unmapped.add(groupId);
+                // If the coordinator is unavailable or there was a coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`{}` request for group {} returned error {}. " +
+                    "Will attempt to find the coordinator again and retry", apiName(), groupId, error);
+                groupsToUnmap.add(groupId);
                 break;
             default:
-                log.error("Received unexpected error for group {} in `DeleteConsumerGroups` response",
-                        groupId, error.exception());
+                final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response",
+                    groupId, apiName());
+                log.error(unexpectedErrorMsg, error.exception());
                 failed.put(groupId, error.exception());
         }
     }
 
-}
+}

Review comment:
       nit: Could we revert this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #11021: KAFKA-13062: Make DeleteConsumerGroupsHandler unmap for COORDINATOR_NOT_AVAILABLE error

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11021:
URL: https://github.com/apache/kafka/pull/11021#discussion_r669478307



##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -3179,41 +3179,47 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception {
             env.kafkaClient().prepareResponse(
                 prepareOldFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED,  Node.noNode()));
 
-            final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds);
+            DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds);
             TestUtils.assertFutureError(errorResult.deletedGroups().get("groupId"), GroupAuthorizationException.class);
 
-            //Retriable errors should be retried
+            // Retriable errors should be retried
             env.kafkaClient().prepareResponse(
                 prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
-            final DeletableGroupResultCollection errorResponse1 = new DeletableGroupResultCollection();
-            errorResponse1.add(new DeletableGroupResult()
-                                   .setGroupId("groupId")
-                                   .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
-            );
-            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(
-                new DeleteGroupsResponseData()
-                    .setResults(errorResponse1)));

Review comment:
       This section is testing "retriable" errors should be retried. Before the change, `COORDINATOR_NOT_AVAILABLE` is considered as retriable error. But after this PR, it'll considered as unmapped error, so it is moved to later, to test when receiving the error, we should re-find coordinator, and then re-send request. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #11021: KAFKA-13062: Make DeleteConsumerGroupsHandler unmap for COORDINATOR_NOT_AVAILABLE error

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11021:
URL: https://github.com/apache/kafka/pull/11021#discussion_r669479150



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
##########
@@ -89,46 +91,76 @@ public String apiName() {
         DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
         Map<CoordinatorKey, Void> completed = new HashMap<>();
         Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
         for (DeletableGroupResult deletedGroup : response.data().results()) {
             CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(deletedGroup.groupId());
             Errors error = Errors.forCode(deletedGroup.errorCode());
             if (error != Errors.NONE) {
-                handleError(groupIdKey, error, failed, unmapped);
+                handleError(groupIdKey, error, failed, groupsToUnmap, groupsToRetry);
                 continue;
             }
 
             completed.put(groupIdKey, null);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {

Review comment:
       You are right! Updated.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #11021: KAFKA-13062: refactor DeleteConsumerGroupsHandler and tests

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11021:
URL: https://github.com/apache/kafka/pull/11021#issuecomment-878082917


   @dajac , please take a look. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org