Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/03 11:56:49 UTC

[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r663350976



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -73,7 +73,10 @@ public String apiName() {
     public OffsetFetchRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> keys) {
         // Set the flag to false as for admin client request,
         // we don't need to wait for any pending offset state to clear.
-        return new OffsetFetchRequest.Builder(groupId.idValue, false, partitions, false);

Review comment:
       Given that a single groupId is a common pattern, we could just retain the old constructor as well.
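
       For illustration, a minimal sketch of what a retained single-group convenience constructor could look like if it simply delegated to the batched builder (the delegation and the singletonMap shape are assumptions, not the actual PR code; assumes java.util.Collections is imported):

           public Builder(String groupId,
                          boolean requireStable,
                          List<TopicPartition> partitions,
                          boolean throwOnFetchStableOffsetsUnsupported) {
               // Delegate to the batched constructor with a one-entry map; a null
               // partition list keeps the existing "all topic partitions" semantics.
               this(Collections.singletonMap(groupId, partitions), requireStable,
                   throwOnFetchStableOffsetsUnsupported);
           }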

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##########
@@ -174,6 +319,10 @@ public boolean isAllPartitions() {
         return data.topics() == ALL_TOPIC_PARTITIONS;
     }
 
+    public List<OffsetFetchRequestTopics> isAllPartitionsForGroup() {

Review comment:
       The name of the method suggests we are returning a boolean, but we are returning null. Why do we need a public method that always returns null?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
##########
@@ -65,6 +69,8 @@
 
     private final OffsetFetchResponseData data;
     private final Errors error;
+    private final Map<String, Errors> groupLevelErrors = new HashMap<>();
+    private final Map<String, Map<TopicPartition, PartitionData>> groupToPartitionData = new HashMap<>();

Review comment:
       Why are we caching these when we have them in `data`?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1308,29 +1308,41 @@ private OffsetFetchResponseHandler() {
 
         @Override
         public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
-            if (response.hasError()) {
-                Errors error = response.error();
-                log.debug("Offset fetch failed: {}", error.message());
+            Errors responseError = response.error();
+            // check if error is null, if it is we are dealing with v8 response
+            if (responseError == null) {

Review comment:
       We can move this to the response object; otherwise it is duplicated from above.
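
       For illustration only, a sketch of how the per-group error accessor on the response could absorb this check so callers never test for a null top-level error themselves (the exact shape is an assumption; it builds on the `error` and `groupLevelErrors` fields shown elsewhere in this review):

           // Sketch of a version-agnostic accessor: a non-null top-level error means a
           // pre-v8 response, otherwise fall back to the per-group error map.
           public Errors groupLevelError(String groupId) {
               if (error != null) {
                   return error;
               }
               return groupLevelErrors.getOrDefault(groupId, Errors.NONE);
           }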

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##########
@@ -68,36 +74,117 @@ public Builder(String groupId,
             }
 
             this.data = new OffsetFetchRequestData()
-                            .setGroupId(groupId)
-                            .setRequireStable(requireStable)
-                            .setTopics(topics);
+                .setGroupId(groupId)
+                .setRequireStable(requireStable)
+                .setTopics(topics);

Review comment:
       nit: unnecessary indentation change

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1308,29 +1308,41 @@ private OffsetFetchResponseHandler() {
 
         @Override
         public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
-            if (response.hasError()) {
-                Errors error = response.error();
-                log.debug("Offset fetch failed: {}", error.message());
+            Errors responseError = response.error();
+            // check if error is null, if it is we are dealing with v8 response
+            if (responseError == null) {
+                if (response.groupHasError(rebalanceConfig.groupId)) {
+                    responseError = response.groupLevelError(rebalanceConfig.groupId);
+                } else {
+                    responseError = Errors.NONE;
+                }
+            }
+            if (responseError != Errors.NONE) {
+                log.debug("Offset fetch failed: {}", responseError.message());
 
-                if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+                if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                     // just retry
-                    future.raise(error);
-                } else if (error == Errors.NOT_COORDINATOR) {
+                    future.raise(responseError);
+                } else if (responseError == Errors.NOT_COORDINATOR) {
                     // re-discover the coordinator and retry
-                    markCoordinatorUnknown(error);
-                    future.raise(error);
-                } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+                    markCoordinatorUnknown(responseError);
+                    future.raise(responseError);
+                } else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
                     future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
                 } else {
-                    future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
+                    future.raise(new KafkaException("Unexpected error in fetch offset response: " + responseError.message()));
                 }
                 return;
             }
 
             Set<String> unauthorizedTopics = null;
-            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());
+            // if map entry is null, we know we are handling a response less than V8
+            boolean useV8 = response.responseData(rebalanceConfig.groupId) != null;
+            Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = useV8 ?
+                response.responseData(rebalanceConfig.groupId) : response.oldResponseData();

Review comment:
       As before, it will be good if we don't have repeated logic like this in every client.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##########
@@ -46,9 +52,9 @@
         private final boolean throwOnFetchStableOffsetsUnsupported;
 
         public Builder(String groupId,
-                       boolean requireStable,
-                       List<TopicPartition> partitions,
-                       boolean throwOnFetchStableOffsetsUnsupported) {
+            boolean requireStable,
+            List<TopicPartition> partitions,
+            boolean throwOnFetchStableOffsetsUnsupported) {

Review comment:
       nit: unnecessary indentation change

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##########
@@ -68,36 +74,117 @@ public Builder(String groupId,
             }
 
             this.data = new OffsetFetchRequestData()
-                            .setGroupId(groupId)
-                            .setRequireStable(requireStable)
-                            .setTopics(topics);
+                .setGroupId(groupId)
+                .setRequireStable(requireStable)
+                .setTopics(topics);
             this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported;
         }
 
         boolean isAllTopicPartitions() {
             return this.data.topics() == ALL_TOPIC_PARTITIONS;
         }
 
-        @Override
-        public OffsetFetchRequest build(short version) {
-            if (isAllTopicPartitions() && version < 2) {
-                throw new UnsupportedVersionException("The broker only supports OffsetFetchRequest " +
-                    "v" + version + ", but we need v2 or newer to request all topic partitions.");
-            }
+        public Builder(Map<String, List<TopicPartition>> groupIdToTopicPartitionMap,
+            boolean requireStable,
+            boolean throwOnFetchStableOffsetsUnsupported) {
+            super(ApiKeys.OFFSET_FETCH);
 
-            if (data.requireStable() && version < 7) {
-                if (throwOnFetchStableOffsetsUnsupported) {
-                    throw new UnsupportedVersionException("Broker unexpectedly " +
-                        "doesn't support requireStable flag on version " + version);
+            List<OffsetFetchRequestGroup> groups = new ArrayList<>();
+            for (Entry<String, List<TopicPartition>> entry : groupIdToTopicPartitionMap.entrySet()) {
+                final List<OffsetFetchRequestTopics> topics;
+                if (groupIdToTopicPartitionMap.get(entry.getKey()) != null) {

Review comment:
       Use `entry.getValue()`?
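
       As a sketch, the loop body with `entry.getValue()` held in a local might look like the following (the accessors are the ones already used in this hunk; `computeIfAbsent` is just one idiomatic way to build the per-topic entries):

           for (Entry<String, List<TopicPartition>> entry : groupIdToTopicPartitionMap.entrySet()) {
               List<TopicPartition> partitions = entry.getValue();   // no repeated map lookups
               final List<OffsetFetchRequestTopics> topics;
               if (partitions != null) {
                   Map<String, OffsetFetchRequestTopics> topicMap = new HashMap<>();
                   for (TopicPartition tp : partitions) {
                       topicMap.computeIfAbsent(tp.topic(),
                               name -> new OffsetFetchRequestTopics().setName(name))
                           .partitionIndexes().add(tp.partition());
                   }
                   topics = new ArrayList<>(topicMap.values());
               } else {
                   topics = ALL_TOPIC_PARTITIONS_BATCH;
               }
               groups.add(new OffsetFetchRequestGroup()
                   .setGroupId(entry.getKey())
                   .setTopics(topics));
           }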

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##########
@@ -68,36 +74,117 @@ public Builder(String groupId,
             }
 
             this.data = new OffsetFetchRequestData()
-                            .setGroupId(groupId)
-                            .setRequireStable(requireStable)
-                            .setTopics(topics);
+                .setGroupId(groupId)
+                .setRequireStable(requireStable)
+                .setTopics(topics);
             this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported;
         }
 
         boolean isAllTopicPartitions() {
             return this.data.topics() == ALL_TOPIC_PARTITIONS;
         }
 
-        @Override
-        public OffsetFetchRequest build(short version) {
-            if (isAllTopicPartitions() && version < 2) {
-                throw new UnsupportedVersionException("The broker only supports OffsetFetchRequest " +
-                    "v" + version + ", but we need v2 or newer to request all topic partitions.");
-            }
+        public Builder(Map<String, List<TopicPartition>> groupIdToTopicPartitionMap,
+            boolean requireStable,
+            boolean throwOnFetchStableOffsetsUnsupported) {
+            super(ApiKeys.OFFSET_FETCH);
 
-            if (data.requireStable() && version < 7) {
-                if (throwOnFetchStableOffsetsUnsupported) {
-                    throw new UnsupportedVersionException("Broker unexpectedly " +
-                        "doesn't support requireStable flag on version " + version);
+            List<OffsetFetchRequestGroup> groups = new ArrayList<>();
+            for (Entry<String, List<TopicPartition>> entry : groupIdToTopicPartitionMap.entrySet()) {
+                final List<OffsetFetchRequestTopics> topics;
+                if (groupIdToTopicPartitionMap.get(entry.getKey()) != null) {
+                    Map<String, OffsetFetchRequestTopics> offsetFetchRequestTopicMap =
+                        new HashMap<>();
+                    for (TopicPartition topicPartition : groupIdToTopicPartitionMap.get(entry.getKey())) {
+                        String topicName = topicPartition.topic();
+                        OffsetFetchRequestTopics topic = offsetFetchRequestTopicMap.getOrDefault(
+                            topicName, new OffsetFetchRequestTopics().setName(topicName));
+                        topic.partitionIndexes().add(topicPartition.partition());
+                        offsetFetchRequestTopicMap.put(topicName, topic);
+                    }
+                    topics = new ArrayList<>(offsetFetchRequestTopicMap.values());
                 } else {
-                    log.trace("Fallback the requireStable flag to false as broker " +
-                                  "only supports OffsetFetchRequest version {}. Need " +
-                                  "v7 or newer to enable this feature", version);
-
-                    return new OffsetFetchRequest(data.setRequireStable(false), version);
+                    topics = ALL_TOPIC_PARTITIONS_BATCH;
                 }
+                groups.add(new OffsetFetchRequestGroup()
+                    .setGroupId(entry.getKey())
+                    .setTopics(topics));
             }
+            this.data = new OffsetFetchRequestData()
+                .setGroupIds(groups)
+                .setRequireStable(requireStable);
+            this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported;
+        }
+
+        @Override
+        public OffsetFetchRequest build(short version) {
+            if (version < 8) {
+                if (data.groupIds().size() > 1) {
+                    throw new NoBatchedOffsetFetchRequestException("Broker does not support"
+                        + " batching groups for fetch offset request on version " + version);
+                }
+                OffsetFetchRequestData oldDataFormat = null;
+                if (!data.groupIds().isEmpty()) {
+                    OffsetFetchRequestGroup group = data.groupIds().get(0);
+                    String groupName = group.groupId();
+                    List<OffsetFetchRequestTopics> topics = group.topics();
+                    List<OffsetFetchRequestTopic> oldFormatTopics = null;
+                    if (topics != null) {
+                        oldFormatTopics = topics
+                            .stream()
+                            .map(t ->
+                                new OffsetFetchRequestTopic()
+                                    .setName(t.name())
+                                    .setPartitionIndexes(t.partitionIndexes()))
+                            .collect(Collectors.toList());
+                    }
+                    oldDataFormat = new OffsetFetchRequestData()
+                        .setGroupId(groupName)
+                        .setTopics(oldFormatTopics)
+                        .setRequireStable(data.requireStable());
+                }
+                if (isAllTopicPartitions() && version < 2) {
+                    throw new UnsupportedVersionException("The broker only supports OffsetFetchRequest " +
+                        "v" + version + ", but we need v2 or newer to request all topic partitions.");
+                }
+                if (data.requireStable() && version < 7) {

Review comment:
       Again, better to move this to the top as well so that we have a smaller sequence of if statements rather than nested ifs.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##########
@@ -152,13 +283,27 @@ public OffsetFetchResponse getErrorResponse(int throttleTimeMs, Errors error) {
                         new TopicPartition(topic.name(), partitionIndex), partitionError);
                 }
             }
+            return new OffsetFetchResponse(error, responsePartitions);
         }
-
-        if (version() >= 3) {
-            return new OffsetFetchResponse(throttleTimeMs, error, responsePartitions);
-        } else {
+        if (version() == 2) {
             return new OffsetFetchResponse(error, responsePartitions);
         }
+        if (version() >= 3 && version() < 8) {
+            return new OffsetFetchResponse(throttleTimeMs, error, responsePartitions);
+        }
+        List<String> groupIds =

Review comment:
       We have a method above that does this.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -87,12 +90,26 @@ public String apiName() {
         Map<CoordinatorKey, Throwable> failed = new HashMap<>();
         List<CoordinatorKey> unmapped = new ArrayList<>();
 
-        if (response.error() != Errors.NONE) {
-            handleError(groupId, response.error(), failed, unmapped);
+        Errors responseError = response.error();
+        // check if error is null, if it is we are dealing with v8 response

Review comment:
       Can we move this logic to the response class?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##########
@@ -68,36 +74,117 @@ public Builder(String groupId,
             }
 
             this.data = new OffsetFetchRequestData()
-                            .setGroupId(groupId)
-                            .setRequireStable(requireStable)
-                            .setTopics(topics);
+                .setGroupId(groupId)
+                .setRequireStable(requireStable)
+                .setTopics(topics);
             this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported;
         }
 
         boolean isAllTopicPartitions() {
             return this.data.topics() == ALL_TOPIC_PARTITIONS;
         }
 
-        @Override
-        public OffsetFetchRequest build(short version) {
-            if (isAllTopicPartitions() && version < 2) {
-                throw new UnsupportedVersionException("The broker only supports OffsetFetchRequest " +
-                    "v" + version + ", but we need v2 or newer to request all topic partitions.");
-            }
+        public Builder(Map<String, List<TopicPartition>> groupIdToTopicPartitionMap,
+            boolean requireStable,
+            boolean throwOnFetchStableOffsetsUnsupported) {
+            super(ApiKeys.OFFSET_FETCH);
 
-            if (data.requireStable() && version < 7) {
-                if (throwOnFetchStableOffsetsUnsupported) {
-                    throw new UnsupportedVersionException("Broker unexpectedly " +
-                        "doesn't support requireStable flag on version " + version);
+            List<OffsetFetchRequestGroup> groups = new ArrayList<>();
+            for (Entry<String, List<TopicPartition>> entry : groupIdToTopicPartitionMap.entrySet()) {
+                final List<OffsetFetchRequestTopics> topics;
+                if (groupIdToTopicPartitionMap.get(entry.getKey()) != null) {
+                    Map<String, OffsetFetchRequestTopics> offsetFetchRequestTopicMap =
+                        new HashMap<>();
+                    for (TopicPartition topicPartition : groupIdToTopicPartitionMap.get(entry.getKey())) {

Review comment:
       As before, use `entry.getValue()`; we should just store that in a local variable.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -87,12 +90,26 @@ public String apiName() {
         Map<CoordinatorKey, Throwable> failed = new HashMap<>();
         List<CoordinatorKey> unmapped = new ArrayList<>();
 
-        if (response.error() != Errors.NONE) {
-            handleError(groupId, response.error(), failed, unmapped);
+        Errors responseError = response.error();
+        // check if error is null, if it is we are dealing with v8 response
+        if (responseError == null) {
+            if (response.groupHasError(groupId.idValue)) {
+                responseError = response.groupLevelError(groupId.idValue);
+            } else {
+                responseError = Errors.NONE;
+            }
+        }
+
+        if (responseError != Errors.NONE) {
+            handleError(groupId, responseError, failed, unmapped);
         } else {
             final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
-            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry :
-                response.responseData().entrySet()) {
+            // if entry for group level response data is null, we are getting back an older version
+            // of the response
+            Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData =
+                response.responseData(groupId.idValue) == null ? response.oldResponseData() :
+                    response.responseData(groupId.idValue);
+            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : responseData.entrySet()) {

Review comment:
       As before, it will be good if we can move the version logic to the response. 

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##########
@@ -68,36 +74,117 @@ public Builder(String groupId,
             }
 
             this.data = new OffsetFetchRequestData()
-                            .setGroupId(groupId)
-                            .setRequireStable(requireStable)
-                            .setTopics(topics);
+                .setGroupId(groupId)
+                .setRequireStable(requireStable)
+                .setTopics(topics);
             this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported;
         }
 
         boolean isAllTopicPartitions() {
             return this.data.topics() == ALL_TOPIC_PARTITIONS;
         }
 
-        @Override
-        public OffsetFetchRequest build(short version) {
-            if (isAllTopicPartitions() && version < 2) {
-                throw new UnsupportedVersionException("The broker only supports OffsetFetchRequest " +
-                    "v" + version + ", but we need v2 or newer to request all topic partitions.");
-            }
+        public Builder(Map<String, List<TopicPartition>> groupIdToTopicPartitionMap,
+            boolean requireStable,
+            boolean throwOnFetchStableOffsetsUnsupported) {
+            super(ApiKeys.OFFSET_FETCH);
 
-            if (data.requireStable() && version < 7) {
-                if (throwOnFetchStableOffsetsUnsupported) {
-                    throw new UnsupportedVersionException("Broker unexpectedly " +
-                        "doesn't support requireStable flag on version " + version);
+            List<OffsetFetchRequestGroup> groups = new ArrayList<>();
+            for (Entry<String, List<TopicPartition>> entry : groupIdToTopicPartitionMap.entrySet()) {
+                final List<OffsetFetchRequestTopics> topics;
+                if (groupIdToTopicPartitionMap.get(entry.getKey()) != null) {
+                    Map<String, OffsetFetchRequestTopics> offsetFetchRequestTopicMap =
+                        new HashMap<>();
+                    for (TopicPartition topicPartition : groupIdToTopicPartitionMap.get(entry.getKey())) {
+                        String topicName = topicPartition.topic();
+                        OffsetFetchRequestTopics topic = offsetFetchRequestTopicMap.getOrDefault(
+                            topicName, new OffsetFetchRequestTopics().setName(topicName));
+                        topic.partitionIndexes().add(topicPartition.partition());
+                        offsetFetchRequestTopicMap.put(topicName, topic);
+                    }
+                    topics = new ArrayList<>(offsetFetchRequestTopicMap.values());
                 } else {
-                    log.trace("Fallback the requireStable flag to false as broker " +
-                                  "only supports OffsetFetchRequest version {}. Need " +
-                                  "v7 or newer to enable this feature", version);
-
-                    return new OffsetFetchRequest(data.setRequireStable(false), version);
+                    topics = ALL_TOPIC_PARTITIONS_BATCH;
                 }
+                groups.add(new OffsetFetchRequestGroup()
+                    .setGroupId(entry.getKey())
+                    .setTopics(topics));
             }
+            this.data = new OffsetFetchRequestData()
+                .setGroupIds(groups)
+                .setRequireStable(requireStable);
+            this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported;
+        }
+
+        @Override
+        public OffsetFetchRequest build(short version) {
+            if (version < 8) {
+                if (data.groupIds().size() > 1) {
+                    throw new NoBatchedOffsetFetchRequestException("Broker does not support"
+                        + " batching groups for fetch offset request on version " + version);
+                }
+                OffsetFetchRequestData oldDataFormat = null;
+                if (!data.groupIds().isEmpty()) {
+                    OffsetFetchRequestGroup group = data.groupIds().get(0);
+                    String groupName = group.groupId();
+                    List<OffsetFetchRequestTopics> topics = group.topics();
+                    List<OffsetFetchRequestTopic> oldFormatTopics = null;
+                    if (topics != null) {
+                        oldFormatTopics = topics
+                            .stream()
+                            .map(t ->
+                                new OffsetFetchRequestTopic()
+                                    .setName(t.name())
+                                    .setPartitionIndexes(t.partitionIndexes()))
+                            .collect(Collectors.toList());
+                    }
+                    oldDataFormat = new OffsetFetchRequestData()
+                        .setGroupId(groupName)
+                        .setTopics(oldFormatTopics)
+                        .setRequireStable(data.requireStable());
+                }
+                if (isAllTopicPartitions() && version < 2) {

Review comment:
       This could be right at the top, as it was before, to avoid this big `if` block.
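
       For illustration, a sketch of the flattened guard clauses at the top of `build(short version)` (the ordering and the `toSingleGroupData()` helper are assumptions, not the reviewer's exact proposal):

           @Override
           public OffsetFetchRequest build(short version) {
               // Flat guard clauses instead of nested ifs.
               if (isAllTopicPartitions() && version < 2) {
                   throw new UnsupportedVersionException("The broker only supports OffsetFetchRequest " +
                       "v" + version + ", but we need v2 or newer to request all topic partitions.");
               }
               if (version < 8 && data.groupIds().size() > 1) {
                   throw new NoBatchedOffsetFetchRequestException("Broker does not support"
                       + " batching groups for fetch offset request on version " + version);
               }
               if (data.requireStable() && version < 7) {
                   if (throwOnFetchStableOffsetsUnsupported) {
                       throw new UnsupportedVersionException("Broker unexpectedly " +
                           "doesn't support requireStable flag on version " + version);
                   }
                   log.trace("Falling back requireStable to false; version {} is below v7", version);
                   data.setRequireStable(false);
               }
               if (version < 8) {
                   // toSingleGroupData() is a hypothetical helper returning the pre-v8
                   // single-group layout (converting from the batched form when needed),
                   // as in the hunk above.
                   return new OffsetFetchRequest(toSingleGroupData(), version);
               }
               return new OffsetFetchRequest(data, version);
           }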

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
##########
@@ -154,14 +166,88 @@ public OffsetFetchResponse(int throttleTimeMs, Errors error, Map<TopicPartition,
         this.error = error;
     }
 
+    /**
+     * Constructor without throttle time for version 8 and above.
+     * @param errors Error code on a per group level basis
+     * @param responseData Fetched offset information grouped group id
+     */
+    public OffsetFetchResponse(Map<String, Errors> errors, Map<String, Map<TopicPartition, PartitionData>> responseData) {

Review comment:
       Why do we have this constructor?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##########
@@ -128,6 +227,38 @@ public boolean requireStable() {
         return data.requireStable();
     }
 
+    public Map<String, List<TopicPartition>> groupIdsToPartitions() {
+        Map<String, List<TopicPartition>> groupIdsToPartitions = new HashMap<>();
+        for (OffsetFetchRequestGroup group : data.groupIds()) {
+            List<TopicPartition> tpList = null;
+            if (group.topics() != ALL_TOPIC_PARTITIONS_BATCH) {
+                tpList = new ArrayList<>();
+                for (OffsetFetchRequestTopics topic : group.topics()) {
+                    for (Integer partitionIndex : topic.partitionIndexes()) {
+                        tpList.add(new TopicPartition(topic.name(), partitionIndex));
+                    }
+                }
+            }
+            groupIdsToPartitions.put(group.groupId(), tpList);
+        }
+        return groupIdsToPartitions;
+    }
+
+    public Map<String, List<OffsetFetchRequestTopics>> groupIdsToTopics() {
+        Map<String, List<OffsetFetchRequestTopics>> groupIdsToTopics = new HashMap<>();
+        for (OffsetFetchRequestGroup group : data.groupIds()) {

Review comment:
       Could use stream()?
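
       For example, a stream-based version might look like this (a sketch; it relies only on the generated accessors already used in this hunk and assumes `java.util.stream.Collectors` is imported):

           public Map<String, List<OffsetFetchRequestTopics>> groupIdsToTopics() {
               // topics() is either a concrete list or the ALL_TOPIC_PARTITIONS_BATCH
               // sentinel, so toMap never sees a null value.
               return data.groupIds().stream()
                   .collect(Collectors.toMap(
                       OffsetFetchRequestGroup::groupId,
                       OffsetFetchRequestGroup::topics));
           }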

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
##########
@@ -154,14 +166,88 @@ public OffsetFetchResponse(int throttleTimeMs, Errors error, Map<TopicPartition,
         this.error = error;
     }
 
+    /**
+     * Constructor without throttle time for version 8 and above.
+     * @param errors Error code on a per group level basis
+     * @param responseData Fetched offset information grouped group id
+     */
+    public OffsetFetchResponse(Map<String, Errors> errors, Map<String, Map<TopicPartition, PartitionData>> responseData) {
+        this(DEFAULT_THROTTLE_TIME, errors, responseData);
+    }
+
+    /**
+     * Constructor with throttle time for version 8 and above.
+     * @param throttleTimeMs The time in milliseconds that this response was throttled
+     * @param errors Potential coordinator or group level error code (for api version 2 and later)

Review comment:
       Why API version 2 and later? This constructor is for version 8 and above?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
##########
@@ -154,14 +166,88 @@ public OffsetFetchResponse(int throttleTimeMs, Errors error, Map<TopicPartition,
         this.error = error;
     }
 
+    /**
+     * Constructor without throttle time for version 8 and above.
+     * @param errors Error code on a per group level basis
+     * @param responseData Fetched offset information grouped group id
+     */
+    public OffsetFetchResponse(Map<String, Errors> errors, Map<String, Map<TopicPartition, PartitionData>> responseData) {
+        this(DEFAULT_THROTTLE_TIME, errors, responseData);
+    }
+
+    /**
+     * Constructor with throttle time for version 8 and above.
+     * @param throttleTimeMs The time in milliseconds that this response was throttled
+     * @param errors Potential coordinator or group level error code (for api version 2 and later)
+     * @param responseData Fetched offset information grouped by topic-partition and by group
+     */
+    public OffsetFetchResponse(int throttleTimeMs, Map<String, Errors> errors, Map<String,
+        Map<TopicPartition, PartitionData>> responseData) {
+        super(ApiKeys.OFFSET_FETCH);
+        List<OffsetFetchResponseGroup> groupList = new ArrayList<>();
+        for (Entry<String, Map<TopicPartition, PartitionData>> entry : responseData.entrySet()) {
+            Map<String, OffsetFetchResponseTopics> offsetFetchResponseTopicsMap = new HashMap<>();
+            for (Entry<TopicPartition, PartitionData> partitionEntry :
+                responseData.get(entry.getKey()).entrySet()) {

Review comment:
       `entry.getValue()`

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
##########
@@ -214,6 +321,10 @@ public Errors error() {
         return responseData;
     }
 
+    public Map<TopicPartition, PartitionData> responseData(String groupId) {

Review comment:
       We should probably have a `responseData` method that takes a version, or ensure we have the data cached regardless of version.
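
       One possible shape, purely as a sketch (the version parameter and the dispatch are assumptions; it reuses the `groupToPartitionData` map and the pre-v8 accessor from this PR):

           // Keep the old/new dispatch inside the response instead of in every client.
           public Map<TopicPartition, PartitionData> responseData(String groupId, short version) {
               if (version < 8) {
                   return oldResponseData();                // pre-v8: single, unnamed group
               }
               return groupToPartitionData.get(groupId);    // v8+: per-group map
           }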

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
##########
@@ -185,21 +271,42 @@ public boolean hasError() {
         return error != Errors.NONE;
     }
 
+    public boolean groupHasError(String groupId) {
+        return groupLevelErrors.get(groupId) != Errors.NONE;
+    }
+
     public Errors error() {
         return error;
     }
 
+    public Errors groupLevelError(String groupId) {
+        return groupLevelErrors.get(groupId);
+    }
+
     @Override
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> counts = new HashMap<>();
-        updateErrorCounts(counts, error);
-        data.topics().forEach(topic ->
-                topic.partitions().forEach(partition ->
+        if (!groupLevelErrors.isEmpty()) {
+            // built response with v8 or above
+            for (Map.Entry<String, Errors> entry : groupLevelErrors.entrySet()) {
+                updateErrorCounts(counts, entry.getValue());
+            }
+            for (OffsetFetchResponseGroup group : data.groupIds()) {
+                group.topics().forEach(topic ->
+                    topic.partitions().forEach(partition ->
                         updateErrorCounts(counts, Errors.forCode(partition.errorCode()))));
+            }
+        } else {
+            // built response with v0-v7
+            updateErrorCounts(counts, error);
+            data.topics().forEach(topic ->
+                topic.partitions().forEach(partition ->
+                    updateErrorCounts(counts, Errors.forCode(partition.errorCode()))));
+        }
         return counts;
     }
 
-    public Map<TopicPartition, PartitionData> responseData() {
+    public Map<TopicPartition, PartitionData> oldResponseData() {

Review comment:
       It is an odd method name for a public method that we would use elsewhere. What happens when we add other versions - old/older/oldest?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org