You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2023/01/09 13:04:33 UTC

[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

dajac commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1064618016


##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java:
##########
@@ -208,6 +203,56 @@ public OffsetFetchResponse(int throttleTimeMs,
         this.error = null;
     }
 
+    public OffsetFetchResponse(List<OffsetFetchResponseGroup> groups, short version) {
+        super(ApiKeys.OFFSET_FETCH);
+        data = new OffsetFetchResponseData();
+
+        if (version >= 8) {
+            data.setGroups(groups);
+            error = null;
+
+            for (OffsetFetchResponseGroup group : data.groups()) {
+                this.groupLevelErrors.put(group.groupId(), Errors.forCode(group.errorCode()));
+            }
+        } else {
+            if (groups.size() != 1) {
+                throw new UnsupportedVersionException(
+                    "Version " + version + " of OffsetFetchResponse only support one group."
+                );
+            }
+
+            OffsetFetchResponseGroup group = groups.get(0);
+            data.setErrorCode(group.errorCode());
+            error = Errors.forCode(group.errorCode());
+
+            group.topics().forEach(topic -> {
+                OffsetFetchResponseTopic newTopic = new OffsetFetchResponseTopic().setName(topic.name());
+                data.topics().add(newTopic);
+
+                topic.partitions().forEach(partition -> {
+                    OffsetFetchResponsePartition newPartition;
+
+                    if (version < 2 && group.errorCode() != Errors.NONE.code()) {
+                        // Versions prior to version 2 does not support a top level error. Therefore
+                        // we put it at the partition level.
+                        newPartition = new OffsetFetchResponsePartition()
+                            .setPartitionIndex(partition.partitionIndex())
+                            .setErrorCode(group.errorCode());
+                    } else {

Review Comment:
   It is still possible to have a partition level error with version >= 2 (e.g. UNSTABLE_OFFSET_COMMIT). To answer your second point, if there is an error, the offset/metadata should be correctly set at this stage so we can just copy whatever we have got here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org