Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/19 10:50:57 UTC

[GitHub] [kafka] rajinisivaram commented on a change in pull request #11016: KAFKA-13058; AlterConsumerGroupOffsetsHandler does not handle partition errors correctly.

rajinisivaram commented on a change in pull request #11016:
URL: https://github.com/apache/kafka/pull/11016#discussion_r672176498



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##########
@@ -73,29 +75,40 @@ public String apiName() {
         return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
     }
 
-    @Override
-    public OffsetCommitRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> keys) {
-        List<OffsetCommitRequestTopic> topics = new ArrayList<>();
-        Map<String, List<OffsetCommitRequestPartition>> offsetData = new HashMap<>();
-        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
-            String topic = entry.getKey().topic();
-            OffsetAndMetadata oam = entry.getValue();
-            OffsetCommitRequestPartition partition = new OffsetCommitRequestPartition()
-                    .setCommittedOffset(oam.offset())
-                    .setCommittedLeaderEpoch(oam.leaderEpoch().orElse(-1))
-                    .setCommittedMetadata(oam.metadata())
-                    .setPartitionIndex(entry.getKey().partition());
-            offsetData.computeIfAbsent(topic, key -> new ArrayList<>()).add(partition);
-        }
-        for (Map.Entry<String, List<OffsetCommitRequestPartition>> entry : offsetData.entrySet()) {
-            OffsetCommitRequestTopic topic = new OffsetCommitRequestTopic()
-                    .setName(entry.getKey())
-                    .setPartitions(entry.getValue());
-            topics.add(topic);
+    private void validateKeys(
+        Set<CoordinatorKey> groupIds
+    ) {

Review comment:
       I think we put args on the same line unless the list is too long.
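
       For illustration, a minimal sketch of the single-line form this suggests (only the
       signature formatting is at issue; the method body is elided):

           private void validateKeys(Set<CoordinatorKey> groupIds) {
               // validation of the coordinator keys, unchanged
           }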

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##########
@@ -105,53 +118,96 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final OffsetCommitResponse response = (OffsetCommitResponse) abstractResponse;
-        Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
+        final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
 
-        Map<TopicPartition, Errors> partitions = new HashMap<>();
         for (OffsetCommitResponseTopic topic : response.data().topics()) {
             for (OffsetCommitResponsePartition partition : topic.partitions()) {
-                TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
+                TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex());
                 Errors error = Errors.forCode(partition.errorCode());
+
                 if (error != Errors.NONE) {
-                    handleError(groupId, error, failed, unmapped);
+                    handleError(
+                        groupId,
+                        topicPartition,
+                        error,
+                        partitionResults,
+                        groupsToUnmap,
+                        groupsToRetry
+                    );
                 } else {
-                    partitions.put(tp, error);
+                    partitionResults.put(topicPartition, error);
                 }
             }
         }
-        if (failed.isEmpty() && unmapped.isEmpty())
-            completed.put(groupId, partitions);
 
-        return new ApiResult<>(completed, failed, unmapped);
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                Collections.singletonMap(groupId, partitionResults),
+                Collections.emptyMap(),
+                Collections.emptyList()
+            );
+        } else {
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
+        }
     }
 
     private void handleError(
         CoordinatorKey groupId,
+        TopicPartition topicPartition,
         Errors error,
-        Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Map<TopicPartition, Errors> partitionResults,
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
-            case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in `OffsetCommit` response", groupId,
-                        error.exception());
-                failed.put(groupId, error.exception());
-                break;
+            // If the coordinator is in the middle of loading, then we just need to retry.
             case COORDINATOR_LOAD_IN_PROGRESS:
+                log.debug("OffsetCommit request for group id {} failed because the coordinator" +
+                    " is still in the process of loading state. Will retry.", groupId.idValue);
+                groupsToRetry.add(groupId);
+                break;
+
+            // If the coordinator is not available, then we unmap and retry.
             case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
-                log.debug("OffsetCommit request for group {} returned error {}. Will retry", groupId, error);
-                unmapped.add(groupId);
+                log.debug("OffsetCommit request for group id {} returned error {}. Will retry.",
+                    groupId.idValue, error);
+                groupsToUnmap.add(groupId);
                 break;
+
+            // Group level errors.
+            case INVALID_GROUP_ID:
+            case REBALANCE_IN_PROGRESS:
+            case INVALID_COMMIT_OFFSET_SIZE:
+            case GROUP_AUTHORIZATION_FAILED:
+                log.debug("OffsetCommit request for group id {} failed due to error {}.",
+                    groupId.idValue, error);
+                partitionResults.put(topicPartition, error);

Review comment:
       Should REBALANCE_IN_PROGRESS and GROUP_AUTHORIZATION_FAILED be added to `groupsToRetry`?
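
       For illustration only, a hedged sketch of what routing REBALANCE_IN_PROGRESS into the
       retriable bucket might look like; whether these errors should be retried is exactly what
       the comment asks, so this is not the merged behaviour:

           // Hypothetical: treat a rebalance as retriable for the whole group
           // instead of recording it as a per-partition result.
           case REBALANCE_IN_PROGRESS:
               log.debug("OffsetCommit request for group id {} failed because the group" +
                   " is rebalancing. Will retry.", groupId.idValue);
               groupsToRetry.add(groupId);
               break;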

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##########
@@ -105,53 +118,96 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final OffsetCommitResponse response = (OffsetCommitResponse) abstractResponse;
-        Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
+        final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
 
-        Map<TopicPartition, Errors> partitions = new HashMap<>();
         for (OffsetCommitResponseTopic topic : response.data().topics()) {
             for (OffsetCommitResponsePartition partition : topic.partitions()) {
-                TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
+                TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex());
                 Errors error = Errors.forCode(partition.errorCode());
+
                 if (error != Errors.NONE) {
-                    handleError(groupId, error, failed, unmapped);
+                    handleError(
+                        groupId,
+                        topicPartition,
+                        error,
+                        partitionResults,
+                        groupsToUnmap,
+                        groupsToRetry
+                    );
                 } else {
-                    partitions.put(tp, error);
+                    partitionResults.put(topicPartition, error);
                 }
             }
         }
-        if (failed.isEmpty() && unmapped.isEmpty())
-            completed.put(groupId, partitions);
 
-        return new ApiResult<>(completed, failed, unmapped);
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(

Review comment:
       We can use `ApiResult.completed()`
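
       Assuming ApiResult exposes a completed(key, value) factory as the comment implies, the
       success branch could collapse to a sketch like:

           // Hypothetical simplification: build the single-key success result via the
           // factory instead of assembling the three collections by hand.
           if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
               return ApiResult.completed(groupId, partitionResults);
           }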

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##########
@@ -105,53 +118,96 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final OffsetCommitResponse response = (OffsetCommitResponse) abstractResponse;
-        Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
+        final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
 
-        Map<TopicPartition, Errors> partitions = new HashMap<>();
         for (OffsetCommitResponseTopic topic : response.data().topics()) {
             for (OffsetCommitResponsePartition partition : topic.partitions()) {
-                TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
+                TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex());
                 Errors error = Errors.forCode(partition.errorCode());
+
                 if (error != Errors.NONE) {
-                    handleError(groupId, error, failed, unmapped);
+                    handleError(
+                        groupId,
+                        topicPartition,
+                        error,
+                        partitionResults,
+                        groupsToUnmap,
+                        groupsToRetry
+                    );
                 } else {
-                    partitions.put(tp, error);
+                    partitionResults.put(topicPartition, error);
                 }
             }
         }
-        if (failed.isEmpty() && unmapped.isEmpty())
-            completed.put(groupId, partitions);
 
-        return new ApiResult<>(completed, failed, unmapped);
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                Collections.singletonMap(groupId, partitionResults),
+                Collections.emptyMap(),
+                Collections.emptyList()
+            );
+        } else {
+            return new ApiResult<>(

Review comment:
       We can use `ApiResult.unmapped()`
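
       Likewise, assuming an ApiResult.unmapped(keys) factory as the comment implies, the
       unmap-and-retry branch could be reduced to a sketch like:

           // Hypothetical simplification: return only the keys whose coordinator needs to
           // be rediscovered before the request is retried.
           return ApiResult.unmapped(new ArrayList<>(groupsToUnmap));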

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##########
@@ -105,53 +118,96 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final OffsetCommitResponse response = (OffsetCommitResponse) abstractResponse;
-        Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
+        final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
 
-        Map<TopicPartition, Errors> partitions = new HashMap<>();
         for (OffsetCommitResponseTopic topic : response.data().topics()) {
             for (OffsetCommitResponsePartition partition : topic.partitions()) {
-                TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
+                TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex());
                 Errors error = Errors.forCode(partition.errorCode());
+
                 if (error != Errors.NONE) {
-                    handleError(groupId, error, failed, unmapped);
+                    handleError(
+                        groupId,
+                        topicPartition,
+                        error,
+                        partitionResults,
+                        groupsToUnmap,
+                        groupsToRetry
+                    );
                 } else {
-                    partitions.put(tp, error);
+                    partitionResults.put(topicPartition, error);
                 }
             }
         }
-        if (failed.isEmpty() && unmapped.isEmpty())
-            completed.put(groupId, partitions);
 
-        return new ApiResult<>(completed, failed, unmapped);
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                Collections.singletonMap(groupId, partitionResults),
+                Collections.emptyMap(),
+                Collections.emptyList()
+            );
+        } else {
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
+        }
     }
 
     private void handleError(
         CoordinatorKey groupId,
+        TopicPartition topicPartition,
         Errors error,
-        Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Map<TopicPartition, Errors> partitionResults,
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
-            case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in `OffsetCommit` response", groupId,
-                        error.exception());
-                failed.put(groupId, error.exception());
-                break;
+            // If the coordinator is in the middle of loading, then we just need to retry.
             case COORDINATOR_LOAD_IN_PROGRESS:
+                log.debug("OffsetCommit request for group id {} failed because the coordinator" +
+                    " is still in the process of loading state. Will retry.", groupId.idValue);
+                groupsToRetry.add(groupId);
+                break;
+
+            // If the coordinator is not available, then we unmap and retry.
             case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
-                log.debug("OffsetCommit request for group {} returned error {}. Will retry", groupId, error);
-                unmapped.add(groupId);
+                log.debug("OffsetCommit request for group id {} returned error {}. Will retry.",
+                    groupId.idValue, error);
+                groupsToUnmap.add(groupId);
                 break;
+
+            // Group level errors.
+            case INVALID_GROUP_ID:
+            case REBALANCE_IN_PROGRESS:
+            case INVALID_COMMIT_OFFSET_SIZE:

Review comment:
       Is this a group-level error?
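
       A hedged sketch of the alternative this question points at: if INVALID_COMMIT_OFFSET_SIZE
       were treated as partition-level rather than group-level, it could sit under its own label
       while still being recorded per partition:

           // Hypothetical regrouping; the classification is what the question is about.
           // Partition level errors.
           case INVALID_COMMIT_OFFSET_SIZE:
               partitionResults.put(topicPartition, error);
               break;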




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org