Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/12 20:05:59 UTC

[GitHub] [kafka] showuon opened a new pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

showuon opened a new pull request #11026:
URL: https://github.com/apache/kafka/pull/11026


   Some issues found in `ListConsumerGroupOffsetsHandler`:
   
   1. If a coordinator error is returned at the partition level, we don't retry.
   2. Possible partition-level errors are not handled.
   
   For reference, this is the old `handleResponse` logic:
   ```java
   void handleResponse(AbstractResponse abstractResponse) {
         final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
         final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
   
         // If coordinator changed since we fetched it, retry
         if (ConsumerGroupOperationContext.hasCoordinatorMoved(response)) {
             Call call = getListConsumerGroupOffsetsCall(context);
             rescheduleFindCoordinatorTask(context, () -> call, this);
             return;
         }
   
         if (handleGroupRequestError(response.error(), context.future()))
             return;
   
         for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry :
             response.responseData().entrySet()) {
             final TopicPartition topicPartition = entry.getKey();
             OffsetFetchResponse.PartitionData partitionData = entry.getValue();
             final Errors error = partitionData.error;
   
             if (error == Errors.NONE) {
                 final Long offset = partitionData.offset;
                 final String metadata = partitionData.metadata;
                 final Optional<Integer> leaderEpoch = partitionData.leaderEpoch;
                 // Negative offset indicates that the group has no committed offset for this partition
                 if (offset < 0) {
                     groupOffsetsListing.put(topicPartition, null);
                 } else {
                     groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
                 }
             } else {
                 log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
             }
         }
         context.future().complete(groupOffsetsListing);
     }
   ```
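   The group-level error handling this PR moves toward (see the `handleGroupError` diff later in this thread) can be sketched in isolation. This is a minimal sketch, not the actual Kafka code: `ErrorCode`, `GroupAction`, and `GroupErrorClassifier` are hypothetical names, and `ErrorCode` only mirrors a few constant names from Kafka's `Errors` enum.

```java
// Hypothetical stand-in for a few constants of Kafka's `Errors` enum.
enum ErrorCode {
    NONE,
    GROUP_AUTHORIZATION_FAILED,
    COORDINATOR_LOAD_IN_PROGRESS,
    COORDINATOR_NOT_AVAILABLE,
    NOT_COORDINATOR,
    UNKNOWN_SERVER_ERROR
}

// What the handler does with a group after reading its group-level error.
enum GroupAction { COMPLETE, FAIL, RETRY, UNMAP }

final class GroupErrorClassifier {
    private GroupErrorClassifier() {}

    static GroupAction classify(ErrorCode error) {
        switch (error) {
            case NONE:
                return GroupAction.COMPLETE;
            case COORDINATOR_LOAD_IN_PROGRESS:
                // The coordinator is still loading its state: resend OffsetFetch to it.
                return GroupAction.RETRY;
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                // The coordinator is unavailable or has moved: unmap the key so that
                // FindCoordinator is sent again before retrying.
                return GroupAction.UNMAP;
            case GROUP_AUTHORIZATION_FAILED:
            default:
                // Authorization failures and unexpected errors fail the group.
                return GroupAction.FAIL;
        }
    }
}
```

   Keeping RETRY separate from UNMAP matters because only UNMAP triggers a new coordinator lookup; a coordinator that is merely loading is still the right broker to ask.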
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #11026: KAFKA-13064: Make ListConsumerGroupOffsetsHandler unmap for COORDINATOR_NOT_AVAILABLE error

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#issuecomment-881255593


   @showuon Thanks for the patches. Could you update the description of this PR and the others to ensure that the description reflects the changes?





[GitHub] [kafka] showuon commented on a change in pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#discussion_r670917051



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -82,14 +93,17 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
-        Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> completed = new HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
         Errors responseError = response.groupLevelError(groupId.idValue);

Review comment:
       @dajac, I also confirmed you are right. No need to apologize; I missed that too. I removed `handlePartitionError` and now just return an `ApiResult`, as the other PRs do. Thank you. :)







[GitHub] [kafka] showuon commented on pull request #11026: KAFKA-13064: Make ListConsumerGroupOffsetsHandler unmap for COORDINATOR_NOT_AVAILABLE error

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#issuecomment-881258733


   @dajac, all checked and updated. Thank you very much for patiently reviewing all these PRs! After these updates, we are more confident in these new handlers. :)





[GitHub] [kafka] dajac commented on pull request #11026: KAFKA-13064: Make ListConsumerGroupOffsetsHandler unmap for COORDINATOR_NOT_AVAILABLE error

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#issuecomment-881254117


   Failures are not related:
   ```
   Build / JDK 16 and Scala 2.13 / testCommitTransactionTimeout() – kafka.api.TransactionsTest (9s)
   Build / JDK 11 and Scala 2.13 / testCloseDuringRebalance() – kafka.api.ConsumerBounceTest (7s)
   Build / JDK 11 and Scala 2.13 / testCloseDuringRebalance() – kafka.api.ConsumerBounceTest (10s)
   Build / JDK 8 and Scala 2.12 / testCloseDuringRebalance() – kafka.api.ConsumerBounceTest (6s)
   Build / JDK 8 and Scala 2.12 / testCloseDuringRebalance() – kafka.api.ConsumerBounceTest
   ```





[GitHub] [kafka] showuon commented on pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#issuecomment-878254300


   @dajac, please take a look. Thanks.





[GitHub] [kafka] showuon commented on a change in pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#discussion_r669628826



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -110,41 +112,97 @@ public String apiName() {
                         groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
                     }
                 } else {
-                    log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
+                    handlePartitionError(groupId, topicPartition, error, groupsToUnmap, groupsToRetry);

Review comment:
       Sorry, it should be v0-v1; I'll update it.







[GitHub] [kafka] showuon commented on a change in pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#discussion_r671039369



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -110,40 +124,96 @@ public String apiName() {
                         groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
                     }
                 } else {
-                    log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
+                    // In responseData V0 and V1, there's no top level error, we have to handle errors here
+                    handlePartitionError(groupId, topicPartition, error, groupsToUnmap, groupsToRetry);
                 }
             }
             completed.put(groupId, groupOffsetsListing);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,
+                failed,
+                Collections.emptyList()
+            );
+        } else {
+            // retry the request, so don't send completed/failed results back
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
+        }
     }
 
-    private void handleError(
+    private void handleGroupError(
         CoordinatorKey groupId,
         Errors error,
-        Map<CoordinatorKey,
-        Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Map<CoordinatorKey, Throwable> failed,
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in `OffsetFetch` response", groupId,
-                        error.exception());
+                log.debug("`OffsetFetch` request for group id {} failed due to error {}", groupId.idValue, error);
                 failed.put(groupId, error.exception());
                 break;
+
             case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("`OffsetFetch` request for group id {} failed because the coordinator " +
+                    "is still in the process of loading state. Will retry", groupId.idValue);
+                groupsToRetry.add(groupId);
+                break;
             case COORDINATOR_NOT_AVAILABLE:
+            case NOT_COORDINATOR:
+                // If the coordinator is unavailable or there was a coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`OffsetFetch` request for group id {} returned error {}. " +
+                    "Will attempt to find the coordinator again and retry", groupId.idValue, error);
+                groupsToUnmap.add(groupId);
+                break;
+
+            default:
+                final String unexpectedErrorMsg =
+                    String.format("`OffsetFetch` request for group id %s failed due to error %s", groupId.idValue, error);
+                log.error(unexpectedErrorMsg);
+                failed.put(groupId, error.exception(unexpectedErrorMsg));

Review comment:
       Updated. Thanks.
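   The `ApiResult` assembly at the end of `handleResponse` in this diff can be illustrated with a simplified, self-contained sketch. `SimpleApiResult` and `ResultAssembler` are hypothetical stand-ins for the real `ApiResult` and handler. One assumption not stated in this thread: keys that end up neither completed, failed, nor unmapped are simply sent again by the caller, which is how a group in `groupsToRetry` would be retried without appearing in the result.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for the admin ApiResult: completed values,
// failed keys, and keys whose coordinator must be looked up again.
final class SimpleApiResult<K, V> {
    final Map<K, V> completed;
    final Map<K, Throwable> failed;
    final List<K> unmapped;

    SimpleApiResult(Map<K, V> completed, Map<K, Throwable> failed, List<K> unmapped) {
        this.completed = completed;
        this.failed = failed;
        this.unmapped = unmapped;
    }
}

final class ResultAssembler {
    private ResultAssembler() {}

    static <K, V> SimpleApiResult<K, V> assemble(
        Map<K, V> completed,
        Map<K, Throwable> failed,
        Set<K> groupsToUnmap,
        Set<K> groupsToRetry
    ) {
        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
            // Nothing to redo: report whatever completed or failed.
            return new SimpleApiResult<>(completed, failed, Collections.emptyList());
        }
        // Something must be redone, so report neither completed nor failed keys.
        // Keys to unmap are listed explicitly; keys that only need a retry are
        // left unresolved (assumed to be resent by the caller).
        return new SimpleApiResult<>(
            Collections.emptyMap(),
            Collections.emptyMap(),
            new ArrayList<>(groupsToUnmap)
        );
    }
}
```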







[GitHub] [kafka] showuon commented on a change in pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#discussion_r669551754



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -110,41 +112,97 @@ public String apiName() {
                         groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
                     }
                 } else {
-                    log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
+                    handlePartitionError(groupId, topicPartition, error, groupsToUnmap, groupsToRetry);

Review comment:
       Marked KAFKA-13064 as a blocker and added a comment:
   `// In responseData V0-V7, there's no group level error, we have to handle partition errors here`
   Thanks.







[GitHub] [kafka] showuon commented on a change in pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#discussion_r669553529



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -110,41 +112,97 @@ public String apiName() {
                         groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
                     }
                 } else {
-                    log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
+                    handlePartitionError(groupId, topicPartition, error, groupsToUnmap, groupsToRetry);
                 }
             }
             completed.put(groupId, groupOffsetsListing);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,

Review comment:
       No, we can't do that, because `completed` here could be an empty map. If we used `Collections.singletonMap(groupId, groupOffsetsListing)`, it would never be empty. Thanks.
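       To make the point concrete, here is a minimal sketch contrasting a `completed` map that is only populated when there is no group-level error with the suggested `Collections.singletonMap`, which always holds exactly one entry. `CompletedMapDemo` is a hypothetical class, and plain `String` keys stand in for `CoordinatorKey` and `TopicPartition`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class CompletedMapDemo {
    private CompletedMapDemo() {}

    // Hypothetical simplification of the handler: the group is only added to
    // `completed` when there is no group-level error.
    static Map<String, Map<String, Long>> completedFor(String groupId, boolean groupLevelError) {
        Map<String, Map<String, Long>> completed = new HashMap<>();
        if (!groupLevelError) {
            Map<String, Long> offsets = new HashMap<>();
            offsets.put("topic-0", 42L);
            completed.put(groupId, offsets);
        }
        return completed;
    }

    // The suggested alternative: always one entry, even when the group failed.
    static Map<String, Map<String, Long>> singletonFor(String groupId) {
        return Collections.singletonMap(groupId, Collections.emptyMap());
    }
}
```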







[GitHub] [kafka] dajac commented on pull request #11026: KAFKA-13064: Make ListConsumerGroupOffsetsHandler unmap for COORDINATOR_NOT_AVAILABLE error

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#issuecomment-881255176


   Merged to trunk and 3.0.





[GitHub] [kafka] dajac commented on a change in pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#discussion_r668864349



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -110,41 +112,97 @@ public String apiName() {
                         groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
                     }
                 } else {
-                    log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
+                    handlePartitionError(groupId, topicPartition, error, groupsToUnmap, groupsToRetry);
                 }
             }
             completed.put(groupId, groupOffsetsListing);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,

Review comment:
       We could get rid of `completed` and use `Collections.singletonMap(groupId, groupOffsetsListing)`, no?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -85,11 +86,12 @@ public String apiName() {
         final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;

Review comment:
       Since we expect a specific `groupId`, I would validate the provided `groupIds`.
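       A check along these lines could look like the sketch below. It is hypothetical: elsewhere in this thread the diff only shows a call to `validateKeys(groupIds)`, so the class name, the `String` keys (standing in for `CoordinatorKey`), and the exception type are assumptions.

```java
import java.util.Collections;
import java.util.Set;

final class SingleGroupKeyValidator {
    private final String expectedGroupId;

    SingleGroupKeyValidator(String expectedGroupId) {
        this.expectedGroupId = expectedGroupId;
    }

    // Hypothetical check: the handler was built for exactly one group, so any
    // other key set indicates a bug in the caller.
    void validateKeys(Set<String> groupIds) {
        if (!groupIds.equals(Collections.singleton(expectedGroupId))) {
            throw new IllegalArgumentException("Received unexpected group ids " + groupIds
                + " (expected only " + expectedGroupId + ")");
        }
    }
}
```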

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -85,11 +86,12 @@ public String apiName() {
         final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
         Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> completed = new HashMap<>();
         Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();

Review comment:
       nit: Let's make either all of the variables in this block final, or none of them.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -110,41 +112,97 @@ public String apiName() {
                         groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
                     }
                 } else {
-                    log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
+                    handlePartitionError(groupId, topicPartition, error, groupsToUnmap, groupsToRetry);
                 }
             }
             completed.put(groupId, groupOffsetsListing);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,
+                failed,
+                Collections.emptyList()
+            );
+        } else {
+            // retry the request, so don't send completed/failed results back
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
+        }
     }
 
-    private void handleError(
+    private void handleGroupError(
         CoordinatorKey groupId,
         Errors error,
-        Map<CoordinatorKey,
-        Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Map<CoordinatorKey, Throwable> failed,
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in `OffsetFetch` response", groupId,
-                        error.exception());
+                log.error("Received authorization failure for group {} in `{}` response", groupId,
+                    apiName(), error.exception());
                 failed.put(groupId, error.exception());
                 break;
+
             case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("`{}` request for group {} failed because the coordinator " +
+                    "is still in the process of loading state. Will retry", apiName(), groupId);
+                groupsToRetry.add(groupId);
+                break;
             case COORDINATOR_NOT_AVAILABLE:
+            case NOT_COORDINATOR:
+                // If the coordinator is unavailable or there was a coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`{}` request for group {} returned error {}. " +
+                    "Will attempt to find the coordinator again and retry", apiName(), groupId, error);
+                groupsToUnmap.add(groupId);
                 break;
+
+            default:
+                final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response",
+                    groupId, apiName());
+                log.error(unexpectedErrorMsg, error.exception());
+                failed.put(groupId, error.exception(unexpectedErrorMsg));
+        }
+    }
+
+    private void handlePartitionError(
+        CoordinatorKey groupId,
+        TopicPartition topicPartition,
+        Errors error,
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
+    ) {
+        switch (error) {
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("`{}` request for group {} failed because the coordinator " +
+                    "is still in the process of loading state. Will retry", apiName(), groupId);
+                groupsToRetry.add(groupId);
+                break;
+            case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
-                log.debug("OffsetFetch request for group {} returned error {}. Will retry",
-                        groupId, error);
-                unmapped.add(groupId);
+                // If the coordinator is unavailable or there was a coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`{}` request for group {} returned error {}. " +
+                    "Will attempt to find the coordinator again and retry", apiName(), groupId, error);
+                groupsToUnmap.add(groupId);
+                break;
+            case UNKNOWN_TOPIC_OR_PARTITION:
+            case TOPIC_AUTHORIZATION_FAILED:
+            case UNSTABLE_OFFSET_COMMIT:
+                log.warn("`{}` request for group {} returned error {} in partition {}. Skipping return offset for it.",
+                    apiName(), groupId, error, topicPartition);
                 break;
             default:
-                log.error("Received unexpected error for group {} in `OffsetFetch` response",
-                        groupId, error.exception());
-                failed.put(groupId, error.exception(
-                        "Received unexpected error for group " + groupId + " in `OffsetFetch` response"));
+                log.error("`{}` request for group {} returned unexpected error {} in partition {}. Skipping return offset for it.",
+                    apiName(), groupId, error, topicPartition);
         }
     }
 
-}
+}

Review comment:
       nit: Could we bring this back?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -110,41 +112,97 @@ public String apiName() {
                         groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
                     }
                 } else {
-                    log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
+                    handlePartitionError(groupId, topicPartition, error, groupsToUnmap, groupsToRetry);

Review comment:
       v0 and v1 do not have the top-level error, so it is indeed correct to check the partition errors here. Could you add a short comment explaining this? Could you also mark KAFKA-13064 as a blocker for 3.0? I think this is a regression.
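       For the v0/v1 case, the partition-level handling in this revision of the diff can be sketched as a simplified, hypothetical model (`PartitionError`, `PartitionErrorOutcome`, and `PartitionErrorHandler` are made-up names, and partitions are plain strings): a coordinator error on any partition escalates to the whole group, while other partition errors only drop that partition. Another comment in this thread reports that `handlePartitionError` was later removed, so this reflects an intermediate revision.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical subset of error names that can appear per partition.
enum PartitionError {
    NONE, COORDINATOR_LOAD_IN_PROGRESS, COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR,
    UNKNOWN_TOPIC_OR_PARTITION, TOPIC_AUTHORIZATION_FAILED, UNSTABLE_OFFSET_COMMIT
}

final class PartitionErrorOutcome {
    boolean retryGroup;
    boolean unmapGroup;
    final Map<String, Long> offsets = new LinkedHashMap<>();
    final Set<String> skippedPartitions = new HashSet<>();
}

final class PartitionErrorHandler {
    private PartitionErrorHandler() {}

    // errors: partition -> error; offsets: partition -> committed offset.
    static PartitionErrorOutcome handle(Map<String, PartitionError> errors, Map<String, Long> offsets) {
        PartitionErrorOutcome outcome = new PartitionErrorOutcome();
        for (Map.Entry<String, PartitionError> entry : errors.entrySet()) {
            String partition = entry.getKey();
            switch (entry.getValue()) {
                case NONE:
                    outcome.offsets.put(partition, offsets.get(partition));
                    break;
                case COORDINATOR_LOAD_IN_PROGRESS:
                    // Coordinator still loading: retry the whole group.
                    outcome.retryGroup = true;
                    break;
                case COORDINATOR_NOT_AVAILABLE:
                case NOT_COORDINATOR:
                    // Coordinator gone or moved: unmap the whole group.
                    outcome.unmapGroup = true;
                    break;
                default:
                    // Expected partition errors (unknown topic, topic authorization,
                    // unstable offset commit) and unexpected ones both skip this
                    // partition; the diff only logs them at different levels.
                    outcome.skippedPartitions.add(partition);
            }
        }
        return outcome;
    }
}
```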







[GitHub] [kafka] showuon commented on a change in pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#discussion_r670917051



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -82,14 +93,17 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
-        Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> completed = new HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
         Errors responseError = response.groupLevelError(groupId.idValue);

Review comment:
       @dajac, I also confirmed that. No need to apologize; I missed that too. I removed `handlePartitionError` and now just return an `ApiResult`, as the other PRs do. Thank you. :)







[GitHub] [kafka] showuon commented on pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#issuecomment-881249270


   Failed tests are unrelated, thanks.
   ```
       Build / JDK 16 and Scala 2.13 / kafka.api.TransactionsTest.testCommitTransactionTimeout()
       Build / JDK 11 and Scala 2.13 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
       Build / JDK 11 and Scala 2.13 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
       Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
       Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
   ```
   





[GitHub] [kafka] dajac commented on a change in pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#discussion_r670798256



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -110,40 +124,96 @@ public String apiName() {
                         groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
                     }
                 } else {
-                    log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
+                    // In responseData V0 and V1, there's no top level error, we have to handle errors here
+                    handlePartitionError(groupId, topicPartition, error, groupsToUnmap, groupsToRetry);
                 }
             }
             completed.put(groupId, groupOffsetsListing);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,
+                failed,
+                Collections.emptyList()
+            );
+        } else {
+            // retry the request, so don't send completed/failed results back
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
+        }
     }
 
-    private void handleError(
+    private void handleGroupError(
         CoordinatorKey groupId,
         Errors error,
-        Map<CoordinatorKey,
-        Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Map<CoordinatorKey, Throwable> failed,
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in `OffsetFetch` response", groupId,
-                        error.exception());
+                log.debug("`OffsetFetch` request for group id {} failed due to error {}", groupId.idValue, error);
                 failed.put(groupId, error.exception());
                 break;
+
             case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("`OffsetFetch` request for group id {} failed because the coordinator " +
+                    "is still in the process of loading state. Will retry", groupId.idValue);
+                groupsToRetry.add(groupId);
+                break;
             case COORDINATOR_NOT_AVAILABLE:
+            case NOT_COORDINATOR:
+                // If the coordinator is unavailable or there was a coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`OffsetFetch` request for group id {} returned error {}. " +
+                    "Will attempt to find the coordinator again and retry", groupId.idValue, error);
+                groupsToUnmap.add(groupId);
+                break;
+
+            default:
+                final String unexpectedErrorMsg =
+                    String.format("`OffsetFetch` request for group id %s failed due to error %s", groupId.idValue, error);
+                log.error(unexpectedErrorMsg);
+                failed.put(groupId, error.exception(unexpectedErrorMsg));

Review comment:
       Could we also drop the custom error message here, as we did for the others?
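For readers following the thread, here is a minimal sketch of the three outcomes in `handleGroupError`: fail the group, retry in place, or unmap it so `FindCoordinator` runs again. As requested above, the default branch uses the error's own exception instead of a formatted message. `GroupError`, `GroupErrorRouter` and the String group ids are hypothetical stand-ins for Kafka's `Errors` and `CoordinatorKey`, and this is not the PR's code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for Kafka's Errors enum, limited to the cases in this diff.
enum GroupError {
    GROUP_AUTHORIZATION_FAILED,
    COORDINATOR_LOAD_IN_PROGRESS,
    COORDINATOR_NOT_AVAILABLE,
    NOT_COORDINATOR,
    UNKNOWN_SERVER_ERROR;

    // Each constant builds its own exception with a default message,
    // so callers do not need to format a custom one.
    RuntimeException exception() {
        return new RuntimeException(name());
    }
}

// Hypothetical router mirroring the three outcomes of handleGroupError.
final class GroupErrorRouter {
    final Map<String, Throwable> failed = new HashMap<>();
    final Set<String> groupsToUnmap = new HashSet<>();
    final Set<String> groupsToRetry = new HashSet<>();

    void handleGroupError(String groupId, GroupError error) {
        switch (error) {
            case COORDINATOR_LOAD_IN_PROGRESS:
                // Same coordinator, still loading: retry without a new lookup.
                groupsToRetry.add(groupId);
                break;
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                // Coordinator gone or moved: unmap so FindCoordinator is sent again.
                groupsToUnmap.add(groupId);
                break;
            case GROUP_AUTHORIZATION_FAILED:
            default:
                // Fatal for this group: fail it with the error's default exception.
                failed.put(groupId, error.exception());
                break;
        }
    }
}
```

Keeping retry and unmap as separate sets reflects the distinction drawn in the diff: a loading coordinator is still the right coordinator, while a missing or moved one needs a fresh lookup.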




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#discussion_r669554178



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -85,11 +86,12 @@ public String apiName() {
         final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
         Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> completed = new HashMap<>();
         Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();

Review comment:
       Updated! I also made the same change in all the other PRs. Thanks.







[GitHub] [kafka] showuon commented on a change in pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#discussion_r670488214



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -110,40 +124,96 @@ public String apiName() {
                         groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
                     }
                 } else {
-                    log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
+                    // In responseData V0 and V1, there's no top level error, we have to handle errors here
+                    handlePartitionError(groupId, topicPartition, error, groupsToUnmap, groupsToRetry);
                 }
             }
             completed.put(groupId, groupOffsetsListing);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,
+                failed,
+                Collections.emptyList()
+            );
+        } else {
+            // retry the request, so don't send completed/failed results back
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
+        }
     }
 
-    private void handleError(
+    private void handleGroupError(
         CoordinatorKey groupId,
         Errors error,
-        Map<CoordinatorKey,
-        Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Map<CoordinatorKey, Throwable> failed,
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in `OffsetFetch` response", groupId,
-                        error.exception());
+                log.debug("`OffsetFetch` request for group id {} failed due to error {}", groupId.idValue, error);
                 failed.put(groupId, error.exception());
                 break;
+
             case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("`OffsetFetch` request for group id {} failed because the coordinator " +
+                    "is still in the process of loading state. Will retry", groupId.idValue);
+                groupsToRetry.add(groupId);
+                break;
             case COORDINATOR_NOT_AVAILABLE:
+            case NOT_COORDINATOR:
+                // If the coordinator is unavailable or there was a coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`OffsetFetch` request for group id {} returned error {}. " +
+                    "Will attempt to find the coordinator again and retry", groupId.idValue, error);
+                groupsToUnmap.add(groupId);
+                break;
+
+            default:
+                final String unexpectedErrorMsg =
+                    String.format("`OffsetFetch` request for group id %s failed due to error %s", groupId.idValue, error);
+                log.error(unexpectedErrorMsg);
+                failed.put(groupId, error.exception(unexpectedErrorMsg));
+        }
+    }
+
+    private void handlePartitionError(
+        CoordinatorKey groupId,
+        TopicPartition topicPartition,
+        Errors error,
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
+    ) {
+        switch (error) {
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("`{}` request for group {} failed because the coordinator " +

Review comment:
       Oh, sorry, I forgot the `handlePartitionError` section. Will do.







[GitHub] [kafka] dajac commented on a change in pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#discussion_r670436565



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -110,41 +112,97 @@ public String apiName() {
                         groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
                     }
                 } else {
-                    log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
+                    handlePartitionError(groupId, topicPartition, error, groupsToUnmap, groupsToRetry);
                 }
             }
             completed.put(groupId, groupOffsetsListing);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,

Review comment:
       @showuon I think there is a case that we don't handle correctly.
   
   Imagine that `GROUP_AUTHORIZATION_FAILED` is returned as a partition error. In this case, we ignore it in `handlePartitionError` and therefore don't add the group to `failed`. I think we should also handle all the group-level errors in `handlePartitionError`.
   
   The second thing is that if there is a group failure, we should not add the group to `completed` at L131. Otherwise, this will complete the group future with an empty list.
   
   Could you check this out and add a test for it?
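Here is a minimal sketch of the behavior this comment asks for. It assumes that `GROUP_AUTHORIZATION_FAILED`, `COORDINATOR_LOAD_IN_PROGRESS` and `NOT_COORDINATOR` are the group-level errors that can show up per partition. Such an error routes the whole group to failed, retry or unmap, and the group never reaches `completed`. A genuine partition error only skips that partition. All types here are hypothetical simplifications, not Kafka's classes.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for Kafka's Errors enum.
enum PartitionError {
    NONE,
    UNKNOWN_TOPIC_OR_PARTITION,
    GROUP_AUTHORIZATION_FAILED,
    COORDINATOR_LOAD_IN_PROGRESS,
    NOT_COORDINATOR;

    // Errors that describe the whole group even when reported per partition.
    boolean isGroupLevel() {
        return this == GROUP_AUTHORIZATION_FAILED
            || this == COORDINATOR_LOAD_IN_PROGRESS
            || this == NOT_COORDINATOR;
    }
}

// Hypothetical per-partition result: an offset plus an error code.
final class PartitionResult {
    final long offset;
    final PartitionError error;

    PartitionResult(long offset, PartitionError error) {
        this.offset = offset;
        this.error = error;
    }
}

final class OffsetFetchSketch {
    final Map<String, Map<String, Long>> completed = new HashMap<>();
    final Map<String, PartitionError> failed = new HashMap<>();
    final Set<String> groupsToUnmap = new HashSet<>();
    final Set<String> groupsToRetry = new HashSet<>();

    void handleGroup(String groupId, Map<String, PartitionResult> partitions) {
        Map<String, Long> listing = new HashMap<>();
        for (Map.Entry<String, PartitionResult> entry : partitions.entrySet()) {
            PartitionError error = entry.getValue().error;
            if (error == PartitionError.NONE) {
                listing.put(entry.getKey(), entry.getValue().offset);
            } else if (error.isGroupLevel()) {
                routeGroupError(groupId, error);
                return; // the group is NOT added to `completed`
            }
            // Otherwise it is a genuine partition error: skip only this partition.
        }
        completed.put(groupId, listing);
    }

    private void routeGroupError(String groupId, PartitionError error) {
        switch (error) {
            case COORDINATOR_LOAD_IN_PROGRESS:
                groupsToRetry.add(groupId);
                break;
            case NOT_COORDINATOR:
                groupsToUnmap.add(groupId);
                break;
            default:
                failed.put(groupId, error);
                break;
        }
    }
}
```

The early `return` is the key point. Without it, a group with an authorization failure would still be placed in `completed` with an empty listing, which is the second problem described above.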

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -110,40 +124,96 @@ public String apiName() {
                         groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
                     }
                 } else {
-                    log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
+                    // In responseData V0 and V1, there's no top level error, we have to handle errors here
+                    handlePartitionError(groupId, topicPartition, error, groupsToUnmap, groupsToRetry);
                 }
             }
             completed.put(groupId, groupOffsetsListing);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,
+                failed,
+                Collections.emptyList()
+            );
+        } else {
+            // retry the request, so don't send completed/failed results back
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
+        }
     }
 
-    private void handleError(
+    private void handleGroupError(
         CoordinatorKey groupId,
         Errors error,
-        Map<CoordinatorKey,
-        Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Map<CoordinatorKey, Throwable> failed,
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in `OffsetFetch` response", groupId,
-                        error.exception());
+                log.debug("`OffsetFetch` request for group id {} failed due to error {}", groupId.idValue, error);
                 failed.put(groupId, error.exception());
                 break;
+
             case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("`OffsetFetch` request for group id {} failed because the coordinator " +
+                    "is still in the process of loading state. Will retry", groupId.idValue);
+                groupsToRetry.add(groupId);
+                break;
             case COORDINATOR_NOT_AVAILABLE:
+            case NOT_COORDINATOR:
+                // If the coordinator is unavailable or there was a coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`OffsetFetch` request for group id {} returned error {}. " +
+                    "Will attempt to find the coordinator again and retry", groupId.idValue, error);
+                groupsToUnmap.add(groupId);
+                break;
+
+            default:
+                final String unexpectedErrorMsg =
+                    String.format("`OffsetFetch` request for group id %s failed due to error %s", groupId.idValue, error);
+                log.error(unexpectedErrorMsg);
+                failed.put(groupId, error.exception(unexpectedErrorMsg));
+        }
+    }
+
+    private void handlePartitionError(
+        CoordinatorKey groupId,
+        TopicPartition topicPartition,
+        Errors error,
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
+    ) {
+        switch (error) {
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("`{}` request for group {} failed because the coordinator " +

Review comment:
       Could we also update the log messages here and below to follow what you did in `handleGroupError`?







[GitHub] [kafka] dajac merged pull request #11026: KAFKA-13064: Make ListConsumerGroupOffsetsHandler unmap for COORDINATOR_NOT_AVAILABLE error

Posted by GitBox <gi...@apache.org>.
dajac merged pull request #11026:
URL: https://github.com/apache/kafka/pull/11026


   





[GitHub] [kafka] showuon commented on a change in pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#discussion_r670487440



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -110,41 +112,97 @@ public String apiName() {
                         groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
                     }
                 } else {
-                    log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
+                    handlePartitionError(groupId, topicPartition, error, groupsToUnmap, groupsToRetry);
                 }
             }
             completed.put(groupId, groupOffsetsListing);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,

Review comment:
       Good suggestion! Will do it tomorrow (my time). Thanks.







[GitHub] [kafka] dajac commented on a change in pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#discussion_r670737002



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -82,14 +93,17 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
-        Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> completed = new HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
         Errors responseError = response.groupLevelError(groupId.idValue);

Review comment:
       @showuon I just looked at the implementation of `topLevelError` and realized that it checks the partition errors as well, so it seems we don't need to check them again afterwards. Sorry about this; I was not aware of it.
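This sketch shows the idea behind deriving a top-level error from partition errors, for response versions that have no top-level error field. It assumes that only `UNKNOWN_TOPIC_OR_PARTITION` and `TOPIC_AUTHORIZATION_FAILED` count as partition-only errors. The enum and class are hypothetical, and Kafka's actual `OffsetFetchResponse` logic may differ in detail.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for Kafka's Errors enum.
enum FetchError {
    NONE,
    UNKNOWN_TOPIC_OR_PARTITION,
    TOPIC_AUTHORIZATION_FAILED,
    GROUP_AUTHORIZATION_FAILED,
    NOT_COORDINATOR
}

final class TopLevelErrorSketch {
    // Assumed set of errors that genuinely concern a single partition.
    private static final Set<FetchError> PARTITION_ONLY =
        EnumSet.of(FetchError.UNKNOWN_TOPIC_OR_PARTITION, FetchError.TOPIC_AUTHORIZATION_FAILED);

    // Returns the first group/coordinator error found among the partition errors, else NONE.
    static FetchError deriveTopLevelError(List<FetchError> partitionErrors) {
        for (FetchError error : partitionErrors) {
            if (error != FetchError.NONE && !PARTITION_ONLY.contains(error)) {
                return error;
            }
        }
        return FetchError.NONE;
    }
}
```

If the handler checks this derived error first, any group-level error reported per partition is handled before the loop over partitions. That is why a second check inside the loop would be redundant.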



