You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/13 19:19:39 UTC

[GitHub] [kafka] dajac commented on a change in pull request #11021: KAFKA-13062: refactor DeleteConsumerGroupsHandler and tests

dajac commented on a change in pull request #11021:
URL: https://github.com/apache/kafka/pull/11021#discussion_r668789179



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
##########
@@ -89,46 +91,76 @@ public String apiName() {
         DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
         Map<CoordinatorKey, Void> completed = new HashMap<>();
         Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
         for (DeletableGroupResult deletedGroup : response.data().results()) {
             CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(deletedGroup.groupId());
             Errors error = Errors.forCode(deletedGroup.errorCode());
             if (error != Errors.NONE) {
-                handleError(groupIdKey, error, failed, unmapped);
+                handleError(groupIdKey, error, failed, groupsToUnmap, groupsToRetry);
                 continue;
             }
 
             completed.put(groupIdKey, null);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {

Review comment:
       It seems incorrect to do this here. We were able to do so in the other because they were expecting only one group at the time. This one is different. The driver will retry if the group is not completed nor failed. It seems to me that we could keep the existing code, no?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
##########
@@ -89,46 +91,76 @@ public String apiName() {
         DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
         Map<CoordinatorKey, Void> completed = new HashMap<>();
         Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
         for (DeletableGroupResult deletedGroup : response.data().results()) {
             CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(deletedGroup.groupId());
             Errors error = Errors.forCode(deletedGroup.errorCode());
             if (error != Errors.NONE) {
-                handleError(groupIdKey, error, failed, unmapped);
+                handleError(groupIdKey, error, failed, groupsToUnmap, groupsToRetry);
                 continue;
             }
 
             completed.put(groupIdKey, null);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,
+                failed,
+                Collections.emptyList()
+            );
+        } else {
+            // retry the request, so don't send completed/failed results back
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
+        }
     }
 
     private void handleError(
         CoordinatorKey groupId,
         Errors error,
         Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in `DeleteConsumerGroups` response", groupId,
-                        error.exception());
+                log.error("Received authorization failure for group {} in `{}` response", groupId,
+                    apiName(), error.exception());
+                failed.put(groupId, error.exception());
+                break;
+            case INVALID_GROUP_ID:
+            case NON_EMPTY_GROUP:
+            case GROUP_ID_NOT_FOUND:
+                log.error("Received non retriable failure for group {} in `{}` response", groupId,

Review comment:
       I would also try to uniformize the logs and would use debug all the time except for the unexpected errors.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
##########
@@ -89,46 +91,76 @@ public String apiName() {
         DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
         Map<CoordinatorKey, Void> completed = new HashMap<>();
         Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
         for (DeletableGroupResult deletedGroup : response.data().results()) {
             CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(deletedGroup.groupId());
             Errors error = Errors.forCode(deletedGroup.errorCode());
             if (error != Errors.NONE) {
-                handleError(groupIdKey, error, failed, unmapped);
+                handleError(groupIdKey, error, failed, groupsToUnmap, groupsToRetry);
                 continue;
             }
 
             completed.put(groupIdKey, null);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,
+                failed,
+                Collections.emptyList()
+            );
+        } else {
+            // retry the request, so don't send completed/failed results back
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
+        }
     }
 
     private void handleError(
         CoordinatorKey groupId,
         Errors error,
         Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in `DeleteConsumerGroups` response", groupId,
-                        error.exception());
+                log.error("Received authorization failure for group {} in `{}` response", groupId,
+                    apiName(), error.exception());
+                failed.put(groupId, error.exception());
+                break;
+            case INVALID_GROUP_ID:
+            case NON_EMPTY_GROUP:
+            case GROUP_ID_NOT_FOUND:
+                log.error("Received non retriable failure for group {} in `{}` response", groupId,
+                    apiName(), error.exception());
                 failed.put(groupId, error.exception());
                 break;
             case COORDINATOR_LOAD_IN_PROGRESS:
-            case COORDINATOR_NOT_AVAILABLE:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("`{}` request for group {} failed because the coordinator " +
+                    "is still in the process of loading state. Will retry", apiName(), groupId);
+                groupsToRetry.add(groupId);
                 break;
+            case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
-                log.debug("DeleteConsumerGroups request for group {} returned error {}. Will retry",
-                        groupId, error);
-                unmapped.add(groupId);
+                // If the coordinator is unavailable or there was a coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`{}` request for group {} returned error {}. " +
+                    "Will attempt to find the coordinator again and retry", apiName(), groupId, error);
+                groupsToUnmap.add(groupId);
                 break;
             default:
-                log.error("Received unexpected error for group {} in `DeleteConsumerGroups` response",
-                        groupId, error.exception());
+                final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response",
+                    groupId, apiName());
+                log.error(unexpectedErrorMsg, error.exception());
                 failed.put(groupId, error.exception());
         }
     }
 
-}
+}

Review comment:
       nit: Could we revert this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org