You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2022/07/14 12:47:57 UTC

[kafka] branch trunk updated: KAFKA-13043: Implement Admin APIs for offsetFetch batching (#10964)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new beac86f049 KAFKA-13043: Implement Admin APIs for offsetFetch batching (#10964)
beac86f049 is described below

commit beac86f049385932309158c1cb49c8657e53f45f
Author: Sanjana Kaundinya <sk...@gmail.com>
AuthorDate: Thu Jul 14 05:47:34 2022 -0700

    KAFKA-13043: Implement Admin APIs for offsetFetch batching (#10964)
    
    This implements the AdminAPI portion of KIP-709: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=173084258. The request/response protocol changes were implemented in 3.0.0. A new batched API has been introduced to list consumer offsets for different groups. For brokers older than 3.0.0, separate requests are sent for each group.
    
    Co-authored-by: Rajini Sivaram <ra...@googlemail.com>
    Co-authored-by: David Jacot <dj...@confluent.io>
    
    Reviewers: David Jacot <dj...@confluent.io>,  Rajini Sivaram <ra...@googlemail.com>
---
 .../java/org/apache/kafka/clients/admin/Admin.java |  36 ++-
 .../kafka/clients/admin/KafkaAdminClient.java      |  11 +-
 .../admin/ListConsumerGroupOffsetsOptions.java     |  14 +-
 .../admin/ListConsumerGroupOffsetsResult.java      |  56 +++-
 .../admin/ListConsumerGroupOffsetsSpec.java        |  79 ++++++
 .../clients/admin/internals/AdminApiDriver.java    |   3 +-
 .../admin/internals/CoordinatorStrategy.java       |   4 +
 .../internals/ListConsumerGroupOffsetsHandler.java | 128 +++++----
 .../kafka/common/requests/OffsetFetchResponse.java |  10 +-
 .../kafka/clients/admin/AdminClientTestUtils.java  |  12 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 206 ++++++++++++--
 .../kafka/clients/admin/MockAdminClient.java       |  16 +-
 .../ListConsumerGroupOffsetsHandlerTest.java       | 308 +++++++++++++++++++--
 .../kafka/clients/consumer/KafkaConsumerTest.java  |   6 +-
 .../internals/ConsumerCoordinatorTest.java         |  26 +-
 .../scala/kafka/admin/ConsumerGroupCommand.scala   |   8 +-
 .../kafka/admin/ConsumerGroupServiceTest.scala     |  22 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   2 +-
 .../processor/internals/StoreChangelogReader.java  |  12 +-
 .../internals/StoreChangelogReaderTest.java        |  11 +-
 20 files changed, 813 insertions(+), 157 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index fdacc09db8..0698d29702 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.requests.LeaveGroupResponse;
 
 import java.time.Duration;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -919,12 +920,20 @@ public interface Admin extends AutoCloseable {
      * @param options The options to use when listing the consumer group offsets.
      * @return The ListGroupOffsetsResult
      */
-    ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options);
+    default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) {
+        ListConsumerGroupOffsetsOptions listOptions = new ListConsumerGroupOffsetsOptions()
+            .requireStable(options.requireStable());
+        @SuppressWarnings("deprecation")
+        ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec()
+            .topicPartitions(options.topicPartitions());
+        return listConsumerGroupOffsets(Collections.singletonMap(groupId, groupSpec), listOptions);
+    }
 
     /**
      * List the consumer group offsets available in the cluster with the default options.
      * <p>
-     * This is a convenience method for {@link #listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} with default options.
+     * This is a convenience method for {@link #listConsumerGroupOffsets(Map, ListConsumerGroupOffsetsOptions)}
+     * to list offsets of all partitions of one group with default options.
      *
      * @return The ListGroupOffsetsResult.
      */
@@ -932,6 +941,29 @@ public interface Admin extends AutoCloseable {
         return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
     }
 
+    /**
+     * List the consumer group offsets available in the cluster for the specified consumer groups.
+     *
+     * @param groupSpecs Map of consumer group ids to a spec that specifies the topic partitions of the group to list offsets for.
+     *
+     * @param options The options to use when listing the consumer group offsets.
+     * @return The ListConsumerGroupOffsetsResult
+     */
+    ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, ListConsumerGroupOffsetsOptions options);
+
+    /**
+     * List the consumer group offsets available in the cluster for the specified groups with the default options.
+     * <p>
+     * This is a convenience method for
+     * {@link #listConsumerGroupOffsets(Map, ListConsumerGroupOffsetsOptions)} with default options.
+     *
+     * @param groupSpecs Map of consumer group ids to a spec that specifies the topic partitions of the group to list offsets for.
+     * @return The ListConsumerGroupOffsetsResult.
+     */
+    default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs) {
+        return listConsumerGroupOffsets(groupSpecs, new ListConsumerGroupOffsetsOptions());
+    }
+
     /**
      * Delete consumer groups from the cluster.
      *
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 2b2642e351..41eb27a1dd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3401,13 +3401,14 @@ public class KafkaAdminClient extends AdminClient {
     }
 
     @Override
-    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId,
-                                                                   final ListConsumerGroupOffsetsOptions options) {
+    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs,
+                                                                   ListConsumerGroupOffsetsOptions options) {
         SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> future =
-                ListConsumerGroupOffsetsHandler.newFuture(groupId);
-        ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, options.topicPartitions(), options.requireStable(), logContext);
+                ListConsumerGroupOffsetsHandler.newFuture(groupSpecs.keySet());
+        ListConsumerGroupOffsetsHandler handler =
+            new ListConsumerGroupOffsetsHandler(groupSpecs, options.requireStable(), logContext);
         invokeDriver(handler, future, options.timeoutMs);
-        return new ListConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)));
+        return new ListConsumerGroupOffsetsResult(future.all());
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
index 292a47ef39..44d3a40732 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
@@ -23,23 +23,28 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.List;
 
 /**
- * Options for {@link Admin#listConsumerGroupOffsets(String)}.
+ * Options for {@link Admin#listConsumerGroupOffsets(java.util.Map)} and {@link Admin#listConsumerGroupOffsets(String)}.
  * <p>
  * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsumerGroupOffsetsOptions> {
 
-    private List<TopicPartition> topicPartitions = null;
+    private List<TopicPartition> topicPartitions;
     private boolean requireStable = false;
 
     /**
      * Set the topic partitions to list as part of the result.
      * {@code null} includes all topic partitions.
+     * <p>
+     * @deprecated Since 3.3.
+     * Use {@link Admin#listConsumerGroupOffsets(java.util.Map, ListConsumerGroupOffsetsOptions)}
+     * to specify topic partitions.
      *
      * @param topicPartitions List of topic partitions to include
      * @return This ListGroupOffsetsOptions
      */
+    @Deprecated
     public ListConsumerGroupOffsetsOptions topicPartitions(List<TopicPartition> topicPartitions) {
         this.topicPartitions = topicPartitions;
         return this;
@@ -55,7 +60,12 @@ public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsume
 
     /**
      * Returns a list of topic partitions to add as part of the result.
+     * <p>
+     * @deprecated Since 3.3.
+     * Use {@link Admin#listConsumerGroupOffsets(java.util.Map, ListConsumerGroupOffsetsOptions)}
+     * to specify topic partitions.
      */
+    @Deprecated
     public List<TopicPartition> topicPartitions() {
         return topicPartitions;
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
index 48f4531418..2136e33a40 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
@@ -17,25 +17,32 @@
 
 package org.apache.kafka.clients.admin;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
-import java.util.Map;
-
 /**
- * The result of the {@link Admin#listConsumerGroupOffsets(String)} call.
+ * The result of the {@link Admin#listConsumerGroupOffsets(Map)} and
+ * {@link Admin#listConsumerGroupOffsets(String)} call.
  * <p>
  * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupOffsetsResult {
 
-    final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+    final Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures;
 
-    ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
-        this.future = future;
+    ListConsumerGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
+        this.futures = futures.entrySet().stream()
+                .collect(Collectors.toMap(e -> e.getKey().idValue, Entry::getValue));
     }
 
     /**
@@ -43,7 +50,42 @@ public class ListConsumerGroupOffsetsResult {
      * If the group does not have a committed offset for this partition, the corresponding value in the returned map will be null.
      */
     public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata() {
-        return future;
+        if (futures.size() != 1) {
+            throw new IllegalStateException("Offsets from multiple consumer groups were requested. " +
+                    "Use partitionsToOffsetAndMetadata(groupId) instead to get future for a specific group.");
+        }
+        return futures.values().iterator().next();
     }
 
+    /**
+     * Return a future which yields a map of topic partitions to OffsetAndMetadata objects for
+     * the specified group. If the group doesn't have a committed offset for a specific
+     * partition, the corresponding value in the returned map will be null.
+     */
+    public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata(String groupId) {
+        if (!futures.containsKey(groupId))
+            throw new IllegalArgumentException("Offsets for consumer group '" + groupId + "' were not requested.");
+        return futures.get(groupId);
+    }
+
+    /**
+     * Return a future which yields a {@code Map<String, Map<TopicPartition, OffsetAndMetadata>>} of offsets keyed by group id,
+     * if requests for all the groups succeed.
+     */
+    public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply(
+            nil -> {
+                Map<String, Map<TopicPartition, OffsetAndMetadata>> listedConsumerGroupOffsets = new HashMap<>(futures.size());
+                futures.forEach((key, future) -> {
+                    try {
+                        listedConsumerGroupOffsets.put(key, future.get());
+                    } catch (InterruptedException | ExecutionException e) {
+                        // This should be unreachable, since the KafkaFuture#allOf already ensured
+                        // that all of the futures completed successfully.
+                        throw new RuntimeException(e);
+                    }
+                });
+                return listedConsumerGroupOffsets;
+            });
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsSpec.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsSpec.java
new file mode 100644
index 0000000000..83858e49c8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsSpec.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * Specification of consumer group offsets to list using {@link Admin#listConsumerGroupOffsets(java.util.Map)}.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class ListConsumerGroupOffsetsSpec {
+
+    private Collection<TopicPartition> topicPartitions;
+
+    /**
+     * Set the topic partitions whose offsets are to be listed for a consumer group.
+     * {@code null} includes all topic partitions.
+     *
+     * @param topicPartitions Collection of topic partitions to include
+     * @return This ListConsumerGroupOffsetsSpec
+     */
+    public ListConsumerGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions) {
+        this.topicPartitions = topicPartitions;
+        return this;
+    }
+
+    /**
+     * Returns the topic partitions whose offsets are to be listed for a consumer group.
+     * {@code null} indicates that offsets of all partitions of the group are to be listed.
+     */
+    public Collection<TopicPartition> topicPartitions() {
+        return topicPartitions;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof ListConsumerGroupOffsetsSpec)) {
+            return false;
+        }
+        ListConsumerGroupOffsetsSpec that = (ListConsumerGroupOffsetsSpec) o;
+        return Objects.equals(topicPartitions, that.topicPartitions);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(topicPartitions);
+    }
+
+    @Override
+    public String toString() {
+        return "ListConsumerGroupOffsetsSpec(" +
+                "topicPartitions=" + topicPartitions +
+                ')';
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
index d00db4b18c..0e1b03d964 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest.NoBatchedFindCoordinatorsException;
+import org.apache.kafka.common.requests.OffsetFetchRequest.NoBatchedOffsetFetchRequestException;
 import org.apache.kafka.common.utils.LogContext;
 import org.slf4j.Logger;
 
@@ -253,7 +254,7 @@ public class AdminApiDriver<K, V> {
                 .collect(Collectors.toSet());
             retryLookup(keysToUnmap);
 
-        } else if (t instanceof NoBatchedFindCoordinatorsException) {
+        } else if (t instanceof NoBatchedFindCoordinatorsException || t instanceof NoBatchedOffsetFetchRequestException) {
             ((CoordinatorStrategy) handler.lookupStrategy()).disableBatch();
             Set<K> keysToUnmap = spec.keys.stream()
                 .filter(future.lookupKeys()::contains)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
index e6fc0d624a..02b68527c3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
@@ -120,6 +120,10 @@ public class CoordinatorStrategy implements AdminApiLookupStrategy<CoordinatorKe
         batch = false;
     }
 
+    public boolean batch() {
+        return batch;
+    }
+
     private CoordinatorKey requireSingletonAndType(Set<CoordinatorKey> keys) {
         if (keys.size() != 1) {
             throw new IllegalArgumentException("Unexpected size of key set: expected 1, but got " + keys.size());
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
index 08648821f7..21c7d8d488 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
@@ -17,14 +17,16 @@
 package org.apache.kafka.clients.admin.internals;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -36,39 +38,26 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.utils.LogContext;
 import org.slf4j.Logger;
 
-public class ListConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> {
+public class ListConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> {
 
-    private final CoordinatorKey groupId;
-    private final List<TopicPartition> partitions;
     private final boolean requireStable;
+    private final Map<String, ListConsumerGroupOffsetsSpec> groupSpecs;
     private final Logger log;
-    private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+    private final CoordinatorStrategy lookupStrategy;
 
     public ListConsumerGroupOffsetsHandler(
-        String groupId,
-        List<TopicPartition> partitions,
-        LogContext logContext
-    ) {
-        this(groupId, partitions, false, logContext);
-    }
-
-    public ListConsumerGroupOffsetsHandler(
-        String groupId,
-        List<TopicPartition> partitions,
+        Map<String, ListConsumerGroupOffsetsSpec> groupSpecs,
         boolean requireStable,
         LogContext logContext
     ) {
-        this.groupId = CoordinatorKey.byGroupId(groupId);
-        this.partitions = partitions;
-        this.requireStable = requireStable;
         this.log = logContext.logger(ListConsumerGroupOffsetsHandler.class);
         this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
+        this.groupSpecs = groupSpecs;
+        this.requireStable = requireStable;
     }
 
-    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> newFuture(
-        String groupId
-    ) {
-        return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> newFuture(Collection<String> groupIds) {
+        return AdminApiFuture.forKeys(coordinatorKeys(groupIds));
     }
 
     @Override
@@ -82,16 +71,45 @@ public class ListConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<Coo
     }
 
     private void validateKeys(Set<CoordinatorKey> groupIds) {
-        if (!groupIds.equals(Collections.singleton(groupId))) {
+        Set<CoordinatorKey> keys = coordinatorKeys(groupSpecs.keySet());
+        if (!keys.containsAll(groupIds)) {
             throw new IllegalArgumentException("Received unexpected group ids " + groupIds +
-                " (expected only " + Collections.singleton(groupId) + ")");
+                    " (expected one of " + keys + ")");
         }
     }
 
+    private static Set<CoordinatorKey> coordinatorKeys(Collection<String> groupIds) {
+        return groupIds.stream()
+           .map(CoordinatorKey::byGroupId)
+           .collect(Collectors.toSet());
+    }
+
+    public OffsetFetchRequest.Builder buildBatchedRequest(Set<CoordinatorKey> groupIds) {
+        // Create a map that only contains the consumer groups owned by the coordinator.
+        Map<String, List<TopicPartition>> coordinatorGroupIdToTopicPartitions = new HashMap<>(groupIds.size());
+        groupIds.forEach(g -> {
+            ListConsumerGroupOffsetsSpec spec = groupSpecs.get(g.idValue);
+            List<TopicPartition> partitions = spec.topicPartitions() != null ? new ArrayList<>(spec.topicPartitions()) : null;
+            coordinatorGroupIdToTopicPartitions.put(g.idValue, partitions);
+        });
+
+        return new OffsetFetchRequest.Builder(coordinatorGroupIdToTopicPartitions, requireStable, false);
+    }
+
     @Override
-    public OffsetFetchRequest.Builder buildBatchedRequest(int coordinatorId, Set<CoordinatorKey> groupIds) {
+    public Collection<RequestAndKeys<CoordinatorKey>> buildRequest(int brokerId, Set<CoordinatorKey> groupIds) {
         validateKeys(groupIds);
-        return new OffsetFetchRequest.Builder(groupId.idValue, requireStable, partitions, false);
+
+        // When the OffsetFetchRequest fails with NoBatchedOffsetFetchRequestException, we completely disable
+        // the batching end-to-end, including the FindCoordinatorRequest.
+        if (lookupStrategy.batch()) {
+            return Collections.singletonList(new RequestAndKeys<>(buildBatchedRequest(groupIds), groupIds));
+        } else {
+            return groupIds.stream().map(groupId -> {
+                Set<CoordinatorKey> keys = Collections.singleton(groupId);
+                return new RequestAndKeys<>(buildBatchedRequest(keys), keys);
+            }).collect(Collectors.toList());
+        }
     }
 
     @Override
@@ -104,44 +122,46 @@ public class ListConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<Coo
 
         final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
 
-        // the groupError will contain the group level error for v0-v8 OffsetFetchResponse
-        Errors groupError = response.groupLevelError(groupId.idValue);
-        if (groupError != Errors.NONE) {
-            final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-            final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
-
-            handleGroupError(groupId, groupError, failed, groupsToUnmap);
-
-            return new ApiResult<>(Collections.emptyMap(), failed, new ArrayList<>(groupsToUnmap));
-        } else {
-            final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
-
-            response.partitionDataMap(groupId.idValue).forEach((topicPartition, partitionData) -> {
-                final Errors error = partitionData.error;
-                if (error == Errors.NONE) {
-                    final long offset = partitionData.offset;
-                    final String metadata = partitionData.metadata;
-                    final Optional<Integer> leaderEpoch = partitionData.leaderEpoch;
-                    // Negative offset indicates that the group has no committed offset for this partition
-                    if (offset < 0) {
-                        groupOffsetsListing.put(topicPartition, null);
+        Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> completed = new HashMap<>();
+        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        List<CoordinatorKey> unmapped = new ArrayList<>();
+        for (CoordinatorKey coordinatorKey : groupIds) {
+            String group = coordinatorKey.idValue;
+            if (response.groupHasError(group)) {
+                handleGroupError(CoordinatorKey.byGroupId(group), response.groupLevelError(group), failed, unmapped);
+            } else {
+                final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
+                Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = response.partitionDataMap(group);
+                for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> partitionEntry : responseData.entrySet()) {
+                    final TopicPartition topicPartition = partitionEntry.getKey();
+                    OffsetFetchResponse.PartitionData partitionData = partitionEntry.getValue();
+                    final Errors error = partitionData.error;
+
+                    if (error == Errors.NONE) {
+                        final long offset = partitionData.offset;
+                        final String metadata = partitionData.metadata;
+                        final Optional<Integer> leaderEpoch = partitionData.leaderEpoch;
+                        // Negative offset indicates that the group has no committed offset for this partition
+                        if (offset < 0) {
+                            groupOffsetsListing.put(topicPartition, null);
+                        } else {
+                            groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
+                        }
                     } else {
-                        groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
+                        log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
                     }
-                } else {
-                    log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
                 }
-            });
-
-            return ApiResult.completed(groupId, groupOffsetsListing);
+                completed.put(CoordinatorKey.byGroupId(group), groupOffsetsListing);
+            }
         }
+        return new ApiResult<>(completed, failed, unmapped);
     }
 
     private void handleGroupError(
         CoordinatorKey groupId,
         Errors error,
         Map<CoordinatorKey, Throwable> failed,
-        Set<CoordinatorKey> groupsToUnmap
+        List<CoordinatorKey> groupsToUnmap
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 213182ec8c..4e25984668 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -173,8 +173,8 @@ public class OffsetFetchResponse extends AbstractResponse {
      * @param responseData Fetched offset information grouped by topic-partition and by group
      */
     public OffsetFetchResponse(int throttleTimeMs,
-                               Map<String, Errors> errors, Map<String,
-                               Map<TopicPartition, PartitionData>> responseData) {
+                               Map<String, Errors> errors,
+                               Map<String, Map<TopicPartition, PartitionData>> responseData) {
         super(ApiKeys.OFFSET_FETCH);
         List<OffsetFetchResponseGroup> groupList = new ArrayList<>();
         for (Entry<String, Map<TopicPartition, PartitionData>> entry : responseData.entrySet()) {
@@ -250,7 +250,11 @@ public class OffsetFetchResponse extends AbstractResponse {
     }
 
     public boolean groupHasError(String groupId) {
-        return groupLevelErrors.get(groupId) != Errors.NONE;
+        Errors error = groupLevelErrors.get(groupId);
+        if (error == null) {
+            return this.error != null && this.error != Errors.NONE;
+        }
+        return error != Errors.NONE;
     }
 
     public Errors error() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
index 6f98a166b1..d8b9f427d6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
@@ -24,6 +24,7 @@ import java.util.stream.Collectors;
 import org.apache.kafka.clients.HostResolver;
 import org.apache.kafka.clients.admin.CreateTopicsResult.TopicMetadataAndConfig;
 import org.apache.kafka.clients.admin.internals.MetadataOperationContext;
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
@@ -104,14 +105,17 @@ public class AdminClientTestUtils {
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> KafkaFuture.completedFuture(e.getValue()))));
     }
 
-    public static ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult(Map<TopicPartition, OffsetAndMetadata> offsets) {
-        return new ListConsumerGroupOffsetsResult(KafkaFuture.completedFuture(offsets));
+    public static ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult(Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets) {
+        Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> resultMap = offsets.entrySet().stream()
+            .collect(Collectors.toMap(e -> CoordinatorKey.byGroupId(e.getKey()),
+                                      e -> KafkaFutureImpl.completedFuture(e.getValue())));
+        return new ListConsumerGroupOffsetsResult(resultMap);
     }
 
-    public static ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult(KafkaException exception) {
+    public static ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult(String group, KafkaException exception) {
         final KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> future = new KafkaFutureImpl<>();
         future.completeExceptionally(exception);
-        return new ListConsumerGroupOffsetsResult(future);
+        return new ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group), future));
     }
 
     /**
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 61a2aaa00b..3d285a45f7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -108,6 +108,7 @@ import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
 import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.CredentialInfo;
 import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
 import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.FindCoordinatorResponseData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
@@ -192,6 +193,7 @@ import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetDeleteResponse;
 import org.apache.kafka.common.requests.OffsetFetchRequest;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
 import org.apache.kafka.common.requests.RequestTestUtils;
 import org.apache.kafka.common.requests.UnregisterBrokerResponse;
 import org.apache.kafka.common.requests.UpdateFeaturesRequest;
@@ -224,6 +226,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
@@ -266,6 +269,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 public class KafkaAdminClientTest {
     private static final Logger log = LoggerFactory.getLogger(KafkaAdminClientTest.class);
     private static final String GROUP_ID = "group-0";
+    private static final int THROTTLE = 10;
 
     @Test
     public void testDefaultApiTimeoutAndRequestTimeoutConflicts() {
@@ -501,6 +505,21 @@ public class KafkaAdminClientTest {
         return FindCoordinatorResponse.prepareOldResponse(error, node);
     }
 
+    private static FindCoordinatorResponse prepareBatchedFindCoordinatorResponse(Errors error, Node node, Collection<String> groups) {
+        FindCoordinatorResponseData data = new FindCoordinatorResponseData();
+        List<FindCoordinatorResponseData.Coordinator> coordinators = groups.stream()
+                .map(group -> new FindCoordinatorResponseData.Coordinator()
+                        .setErrorCode(error.code())
+                        .setErrorMessage(error.message())
+                        .setKey(group)
+                        .setHost(node.host())
+                        .setPort(node.port())
+                        .setNodeId(node.id()))
+                .collect(Collectors.toList());
+        data.setCoordinators(coordinators);
+        return new FindCoordinatorResponse(data);
+    }
+
     private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) {
         List<MetadataResponseTopic> metadata = new ArrayList<>();
         for (String topic : cluster.topics()) {
@@ -3067,9 +3086,11 @@ public class KafkaAdminClientTest {
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             final TopicPartition tp1 = new TopicPartition("A", 0);
-            final ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions();
-            options.topicPartitions(Collections.singletonList(tp1)).requireStable(true);
-            final ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(GROUP_ID, options);
+            final ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions()
+                    .requireStable(true);
+            final ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec()
+                    .topicPartitions(Collections.singletonList(tp1));
+            env.adminClient().listConsumerGroupOffsets(Collections.singletonMap(GROUP_ID, groupSpec), options);
 
             final MockClient mockClient = env.kafkaClient();
             TestUtils.waitForCondition(() -> {
@@ -3077,11 +3098,11 @@ public class KafkaAdminClientTest {
                 if (clientRequest != null) {
                     OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).data;
                     return data.requireStable() &&
-                        data.topics().get(0).name().equals("A") &&
-                        data.topics().get(0).partitionIndexes().equals(Collections.singletonList(0));
+                        data.groups().get(0).topics().get(0).name().equals("A") &&
+                        data.groups().get(0).topics().get(0).partitionIndexes().equals(Collections.singletonList(0));
                 }
                 return false;
-            }, "Failed awaiting ListConsumerGroupOffsets request");
+            }, "Failed awaiting ListConsumerGroupOffsets request");
         }
     }
 
@@ -3095,12 +3116,11 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
-            env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
+            env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             final ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(GROUP_ID);
 
-
             TestUtils.assertFutureError(result.partitionsToOffsetAndMetadata(), TimeoutException.class);
         }
     }
@@ -3124,16 +3144,16 @@ public class KafkaAdminClientTest {
             mockClient.prepareResponse(body -> {
                 firstAttemptTime.set(time.milliseconds());
                 return true;
-            }, new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
+            }, offsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
 
             mockClient.prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             mockClient.prepareResponse(body -> {
                 secondAttemptTime.set(time.milliseconds());
                 return true;
-            }, new OffsetFetchResponse(Errors.NONE, Collections.emptyMap()));
+            }, offsetFetchResponse(Errors.NONE, Collections.emptyMap()));
 
-            final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future = env.adminClient().listConsumerGroupOffsets("group-0").partitionsToOffsetAndMetadata();
+            final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future = env.adminClient().listConsumerGroupOffsets(GROUP_ID).partitionsToOffsetAndMetadata();
 
             TestUtils.waitForCondition(() -> mockClient.numAwaitingResponses() == 1, "Failed awaiting ListConsumerGroupOffsets first request failure");
             TestUtils.waitForCondition(() -> ((KafkaAdminClient) env.adminClient()).numPendingCalls() == 1, "Failed to add retry ListConsumerGroupOffsets call on first failure");
@@ -3157,7 +3177,8 @@ public class KafkaAdminClientTest {
                 prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             env.kafkaClient().prepareResponse(
-                new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap()));
+                offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap()));
+
             /*
              * We need to return two responses here, one for NOT_COORDINATOR call when calling list consumer offsets
              * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a
@@ -3166,19 +3187,19 @@ public class KafkaAdminClientTest {
              * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response
              */
             env.kafkaClient().prepareResponse(
-                new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
+                offsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
 
             env.kafkaClient().prepareResponse(
                 prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             env.kafkaClient().prepareResponse(
-                new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap()));
+                offsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap()));
 
             env.kafkaClient().prepareResponse(
                 prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             env.kafkaClient().prepareResponse(
-                new OffsetFetchResponse(Errors.NONE, Collections.emptyMap()));
+                offsetFetchResponse(Errors.NONE, Collections.emptyMap()));
 
             final ListConsumerGroupOffsetsResult errorResult1 = env.adminClient().listConsumerGroupOffsets(GROUP_ID);
 
@@ -3199,8 +3220,7 @@ public class KafkaAdminClientTest {
                 env.kafkaClient().prepareResponse(
                     prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
-                env.kafkaClient().prepareResponse(
-                    new OffsetFetchResponse(error, Collections.emptyMap()));
+                env.kafkaClient().prepareResponse(offsetFetchResponse(error, Collections.emptyMap()));
 
                 ListConsumerGroupOffsetsResult errorResult = env.adminClient().listConsumerGroupOffsets(GROUP_ID);
 
@@ -3220,7 +3240,7 @@ public class KafkaAdminClientTest {
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             // Retriable errors should be retried
-            env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap()));
+            env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap()));
 
             /*
              * We need to return two responses here, one for NOT_COORDINATOR error when calling list consumer group offsets
@@ -3229,10 +3249,10 @@ public class KafkaAdminClientTest {
              *
              * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response
              */
-            env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
+            env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
-            env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap()));
+            env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap()));
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
@@ -3249,7 +3269,7 @@ public class KafkaAdminClientTest {
                     Optional.empty(), "", Errors.NONE));
             responseData.put(myTopicPartition3, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                     Optional.empty(), "", Errors.NONE));
-            env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NONE, responseData));
+            env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.NONE, responseData));
 
             final ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(GROUP_ID);
             final Map<TopicPartition, OffsetAndMetadata> partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata().get();
@@ -3263,6 +3283,144 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testBatchedListConsumerGroupOffsets() throws Exception {
+        Cluster cluster = mockCluster(1, 0);
+        Time time = new MockTime();
+        Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = batchedListConsumerGroupOffsetsSpec();
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, AdminClientConfig.RETRIES_CONFIG, "0")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareBatchedFindCoordinatorResponse(Errors.NONE, env.cluster().controller(), groupSpecs.keySet()));
+
+            ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(groupSpecs, new ListConsumerGroupOffsetsOptions());
+            sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, true, Errors.NONE);
+
+            verifyListOffsetsForMultipleGroups(groupSpecs, result);
+        }
+    }
+
+    @Test
+    public void testBatchedListConsumerGroupOffsetsWithNoFindCoordinatorBatching() throws Exception {
+        Cluster cluster = mockCluster(1, 0);
+        Time time = new MockTime();
+        Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = batchedListConsumerGroupOffsetsSpec();
+
+        ApiVersion findCoordinatorV3 = new ApiVersion()
+                .setApiKey(ApiKeys.FIND_COORDINATOR.id)
+                .setMinVersion((short) 0)
+                .setMaxVersion((short) 3);
+        ApiVersion offsetFetchV7 = new ApiVersion()
+                .setApiKey(ApiKeys.OFFSET_FETCH.id)
+                .setMinVersion((short) 0)
+                .setMaxVersion((short) 7);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Arrays.asList(findCoordinatorV3, offsetFetchV7)));
+            env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
+            env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+            env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+            ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(groupSpecs);
+
+            // Fail the first request in order to ensure that the group is not batched when retried.
+            sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false, Errors.COORDINATOR_LOAD_IN_PROGRESS);
+
+            sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false, Errors.NONE);
+            sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false, Errors.NONE);
+
+            verifyListOffsetsForMultipleGroups(groupSpecs, result);
+        }
+    }
+
+    @Test
+    public void testBatchedListConsumerGroupOffsetsWithNoOffsetFetchBatching() throws Exception {
+        Cluster cluster = mockCluster(1, 0);
+        Time time = new MockTime();
+        Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = batchedListConsumerGroupOffsetsSpec();
+
+        ApiVersion offsetFetchV7 = new ApiVersion()
+                .setApiKey(ApiKeys.OFFSET_FETCH.id)
+                .setMinVersion((short) 0)
+                .setMaxVersion((short) 7);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singleton(offsetFetchV7)));
+            env.kafkaClient().prepareResponse(prepareBatchedFindCoordinatorResponse(Errors.NONE, env.cluster().controller(), groupSpecs.keySet()));
+            // Prepare a response to force client to attempt batched request creation that throws
+            // NoBatchedOffsetFetchRequestException. This triggers creation of non-batched requests.
+            env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap()));
+
+            ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(groupSpecs);
+
+            // The request handler attempts both FindCoordinator and OffsetFetch requests. This seems
+            // ok since we expect this scenario only during upgrades from versions < 3.0.0 where
+            // some upgraded brokers could handle batched FindCoordinator while non-upgraded coordinators
+            // rejected batched OffsetFetch requests.
+            sendFindCoordinatorResponse(env.kafkaClient(), env.cluster().controller());
+            sendFindCoordinatorResponse(env.kafkaClient(), env.cluster().controller());
+            sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false, Errors.NONE);
+            sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false, Errors.NONE);
+
+            verifyListOffsetsForMultipleGroups(groupSpecs, result);
+        }
+    }
+
+    private Map<String, ListConsumerGroupOffsetsSpec> batchedListConsumerGroupOffsetsSpec() {
+        Set<TopicPartition> groupAPartitions = Collections.singleton(new TopicPartition("A", 1));
+        Set<TopicPartition> groupBPartitions =  Collections.singleton(new TopicPartition("B", 2));
+
+        ListConsumerGroupOffsetsSpec groupASpec = new ListConsumerGroupOffsetsSpec().topicPartitions(groupAPartitions);
+        ListConsumerGroupOffsetsSpec groupBSpec = new ListConsumerGroupOffsetsSpec().topicPartitions(groupBPartitions);
+        return Utils.mkMap(Utils.mkEntry("groupA", groupASpec), Utils.mkEntry("groupB", groupBSpec));
+    }
+
+    private void waitForRequest(MockClient mockClient, ApiKeys apiKeys) throws Exception {
+        TestUtils.waitForCondition(() -> {
+            ClientRequest clientRequest = mockClient.requests().peek();
+            return clientRequest != null && clientRequest.apiKey() == apiKeys;
+        }, "Failed awaiting " + apiKeys + " request");
+    }
+
+    private void sendFindCoordinatorResponse(MockClient mockClient, Node coordinator) throws Exception {
+        waitForRequest(mockClient, ApiKeys.FIND_COORDINATOR);
+
+        ClientRequest clientRequest = mockClient.requests().peek();
+        FindCoordinatorRequestData data = ((FindCoordinatorRequest.Builder) clientRequest.requestBuilder()).data();
+        mockClient.respond(prepareFindCoordinatorResponse(Errors.NONE, data.key(), coordinator));
+    }
+
+    private void sendOffsetFetchResponse(MockClient mockClient, Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, boolean batched, Errors error) throws Exception {
+        waitForRequest(mockClient, ApiKeys.OFFSET_FETCH);
+
+        ClientRequest clientRequest = mockClient.requests().peek();
+        OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).data;
+        Map<String, Map<TopicPartition, PartitionData>> results = new HashMap<>();
+        Map<String, Errors> errors = new HashMap<>();
+        data.groups().forEach(group -> {
+            Map<TopicPartition, PartitionData> partitionResults = new HashMap<>();
+            for (TopicPartition tp : groupSpecs.get(group.groupId()).topicPartitions()) {
+                partitionResults.put(tp, new PartitionData(10, Optional.empty(), "", Errors.NONE));
+            }
+            results.put(group.groupId(), partitionResults);
+            errors.put(group.groupId(), error);
+        });
+        if (!batched) {
+            assertEquals(1, data.groups().size());
+            mockClient.respond(new OffsetFetchResponse(THROTTLE, error, results.values().iterator().next()));
+        } else
+            mockClient.respond(new OffsetFetchResponse(THROTTLE, errors, results));
+    }
+
+    private void verifyListOffsetsForMultipleGroups(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs,
+                                                    ListConsumerGroupOffsetsResult result) throws Exception {
+        assertEquals(groupSpecs.size(), result.all().get(10, TimeUnit.SECONDS).size());
+        for (Map.Entry<String, ListConsumerGroupOffsetsSpec> entry : groupSpecs.entrySet()) {
+            assertEquals(entry.getValue().topicPartitions(),
+                    result.partitionsToOffsetAndMetadata(entry.getKey()).get().keySet());
+        }
+    }
+
     @Test
     public void testDeleteConsumerGroupsNumRetries() throws Exception {
         final Cluster cluster = mockCluster(3, 0);
@@ -6544,6 +6702,12 @@ public class KafkaAdminClientTest {
                     .setLogDir(logDir))));
     }
 
+    private OffsetFetchResponse offsetFetchResponse(Errors error, Map<TopicPartition, PartitionData> responseData) {
+        return new OffsetFetchResponse(THROTTLE,
+                                       Collections.singletonMap(GROUP_ID, error),
+                                       Collections.singletonMap(GROUP_ID, responseData));
+    }
+
     private static MemberDescription convertToMemberDescriptions(DescribedGroupMember member,
                                                                  MemberAssignment assignment) {
         return new MemberDescription(member.memberId(),
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index ef858c5003..8c31c7cf69 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.KafkaException;
@@ -583,24 +584,29 @@ public class MockAdminClient extends AdminClient {
     }
 
     @Override
-    synchronized public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) {
-        // ignoring the groupId and assume one test would only work on one group only
+    synchronized public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, ListConsumerGroupOffsetsOptions options) {
+        // Only a single group is supported: each test is assumed to work on one group only
+        if (groupSpecs.size() != 1)
+            throw new UnsupportedOperationException("Not implemented yet");
+
+        String group = groupSpecs.keySet().iterator().next();
+        Collection<TopicPartition> topicPartitions = groupSpecs.get(group).topicPartitions();
         final KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> future = new KafkaFutureImpl<>();
 
         if (listConsumerGroupOffsetsException != null) {
             future.completeExceptionally(listConsumerGroupOffsetsException);
         } else {
-            if (options.topicPartitions().isEmpty()) {
+            if (topicPartitions.isEmpty()) {
                 future.complete(committedOffsets.entrySet().stream()
                         .collect(Collectors.toMap(Map.Entry::getKey, entry -> new OffsetAndMetadata(entry.getValue()))));
             } else {
                 future.complete(committedOffsets.entrySet().stream()
-                        .filter(entry -> options.topicPartitions().contains(entry.getKey()))
+                        .filter(entry -> topicPartitions.contains(entry.getKey()))
                         .collect(Collectors.toMap(Map.Entry::getKey, entry -> new OffsetAndMetadata(entry.getValue()))));
             }
         }
 
-        return new ListConsumerGroupOffsetsResult(future);
+        return new ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group), future));
     }
 
     @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
index 27597ce035..95fabb3fc2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
@@ -24,52 +24,140 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
+import org.apache.kafka.clients.admin.internals.AdminApiHandler.RequestAndKeys;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
+import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.OffsetFetchRequest;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
 import org.junit.jupiter.api.Test;
 
 public class ListConsumerGroupOffsetsHandlerTest {
 
     private final LogContext logContext = new LogContext();
-    private final String groupId = "group-id";
+    private final int throttleMs = 10;
+    private final String groupZero = "group0";
+    private final String groupOne = "group1";
+    private final String groupTwo = "group2";
+    private final List<String> groups = Arrays.asList(groupZero, groupOne, groupTwo);
     private final TopicPartition t0p0 = new TopicPartition("t0", 0);
     private final TopicPartition t0p1 = new TopicPartition("t0", 1);
     private final TopicPartition t1p0 = new TopicPartition("t1", 0);
     private final TopicPartition t1p1 = new TopicPartition("t1", 1);
-    private final List<TopicPartition> tps = Arrays.asList(t0p0, t0p1, t1p0, t1p1);
+    private final TopicPartition t2p0 = new TopicPartition("t2", 0);
+    private final TopicPartition t2p1 = new TopicPartition("t2", 1);
+    private final TopicPartition t2p2 = new TopicPartition("t2", 2);
+    private final Map<String, ListConsumerGroupOffsetsSpec> singleRequestMap = Collections.singletonMap(groupZero,
+            new ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t0p1, t1p0, t1p1)));
+    private final Map<String, ListConsumerGroupOffsetsSpec> batchedRequestMap =
+            new HashMap<String, ListConsumerGroupOffsetsSpec>() {{
+                put(groupZero, new ListConsumerGroupOffsetsSpec().topicPartitions(singletonList(t0p0)));
+                put(groupOne, new ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t1p0, t1p1)));
+                put(groupTwo, new ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t1p0, t1p1, t2p0, t2p1, t2p2)));
+            }};
 
     @Test
     public void testBuildRequest() {
-        ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, tps, logContext);
-        OffsetFetchRequest request = handler.buildBatchedRequest(1, singleton(CoordinatorKey.byGroupId(groupId))).build();
-        assertEquals(groupId, request.data().groups().get(0).groupId());
+        ListConsumerGroupOffsetsHandler handler =
+            new ListConsumerGroupOffsetsHandler(singleRequestMap, false, logContext);
+        OffsetFetchRequest request = handler.buildBatchedRequest(coordinatorKeys(groupZero)).build();
+        assertEquals(groupZero, request.data().groups().get(0).groupId());
         assertEquals(2, request.data().groups().get(0).topics().size());
         assertEquals(2, request.data().groups().get(0).topics().get(0).partitionIndexes().size());
         assertEquals(2, request.data().groups().get(0).topics().get(1).partitionIndexes().size());
     }
 
+    @Test
+    public void testBuildRequestWithMultipleGroups() {
+        Map<String, ListConsumerGroupOffsetsSpec> requestMap = new HashMap<>(this.batchedRequestMap);
+        String groupThree = "group3";
+        requestMap.put(groupThree, new ListConsumerGroupOffsetsSpec()
+                .topicPartitions(Arrays.asList(new TopicPartition("t3", 0), new TopicPartition("t3", 1))));
+
+        ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(requestMap, false, logContext);
+        OffsetFetchRequest request1 = handler.buildBatchedRequest(coordinatorKeys(groupZero, groupOne, groupTwo)).build();
+        assertEquals(Utils.mkSet(groupZero, groupOne, groupTwo), requestGroups(request1));
+
+        OffsetFetchRequest request2 = handler.buildBatchedRequest(coordinatorKeys(groupThree)).build();
+        assertEquals(Utils.mkSet(groupThree), requestGroups(request2));
+
+        Map<String, ListConsumerGroupOffsetsSpec> builtRequests = new HashMap<>();
+        request1.groupIdsToPartitions().forEach((group, partitions) ->
+                builtRequests.put(group, new ListConsumerGroupOffsetsSpec().topicPartitions(partitions)));
+        request2.groupIdsToPartitions().forEach((group, partitions) ->
+                builtRequests.put(group, new ListConsumerGroupOffsetsSpec().topicPartitions(partitions)));
+
+        assertEquals(requestMap, builtRequests);
+        Map<String, List<OffsetFetchRequestTopics>> groupIdsToTopics = request1.groupIdsToTopics();
+
+        assertEquals(3, groupIdsToTopics.size());
+        assertEquals(1, groupIdsToTopics.get(groupZero).size());
+        assertEquals(2, groupIdsToTopics.get(groupOne).size());
+        assertEquals(3, groupIdsToTopics.get(groupTwo).size());
+
+        assertEquals(1, groupIdsToTopics.get(groupZero).get(0).partitionIndexes().size());
+        assertEquals(1, groupIdsToTopics.get(groupOne).get(0).partitionIndexes().size());
+        assertEquals(2, groupIdsToTopics.get(groupOne).get(1).partitionIndexes().size());
+        assertEquals(1, groupIdsToTopics.get(groupTwo).get(0).partitionIndexes().size());
+        assertEquals(2, groupIdsToTopics.get(groupTwo).get(1).partitionIndexes().size());
+        assertEquals(3, groupIdsToTopics.get(groupTwo).get(2).partitionIndexes().size());
+
+        groupIdsToTopics = request2.groupIdsToTopics();
+        assertEquals(1, groupIdsToTopics.size());
+        assertEquals(1, groupIdsToTopics.get(groupThree).size());
+        assertEquals(2, groupIdsToTopics.get(groupThree).get(0).partitionIndexes().size());
+    }
+
+    @Test
+    public void testBuildRequestBatchGroups() {
+        ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(batchedRequestMap, false, logContext);
+        Collection<RequestAndKeys<CoordinatorKey>> requests = handler.buildRequest(1, coordinatorKeys(groupZero, groupOne, groupTwo));
+        assertEquals(1, requests.size());
+        assertEquals(Utils.mkSet(groupZero, groupOne, groupTwo), requestGroups((OffsetFetchRequest) requests.iterator().next().request.build()));
+    }
+
+    @Test
+    public void testBuildRequestDoesNotBatchGroup() {
+        ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(batchedRequestMap, false, logContext);
+        // Disable batching.
+        ((CoordinatorStrategy) handler.lookupStrategy()).disableBatch();
+        Collection<RequestAndKeys<CoordinatorKey>> requests = handler.buildRequest(1, coordinatorKeys(groupZero, groupOne, groupTwo));
+        assertEquals(3, requests.size());
+        assertEquals(
+            Utils.mkSet(Utils.mkSet(groupZero), Utils.mkSet(groupOne), Utils.mkSet(groupTwo)),
+            requests.stream().map(requestAndKey -> requestGroups((OffsetFetchRequest) requestAndKey.request.build())).collect(Collectors.toSet())
+        );
+    }
+
     @Test
     public void testSuccessfulHandleResponse() {
         Map<TopicPartition, OffsetAndMetadata> expected = new HashMap<>();
         assertCompleted(handleWithError(Errors.NONE), expected);
     }
 
-
     @Test
     public void testSuccessfulHandleResponseWithOnePartitionError() {
         Map<TopicPartition, OffsetAndMetadata> expectedResult = Collections.singletonMap(t0p0, new OffsetAndMetadata(10L));
@@ -80,17 +168,62 @@ public class ListConsumerGroupOffsetsHandlerTest {
         assertCompleted(handleWithPartitionError(Errors.UNSTABLE_OFFSET_COMMIT), expectedResult);
     }
 
+    @Test
+    public void testSuccessfulHandleResponseWithOnePartitionErrorWithMultipleGroups() {
+        // Expected completed offsets per group. Only one partition is listed for each group;
+        // the map comparison in assertCompletedForMultipleGroups therefore also requires that
+        // no other partition shows up in the handler's result for that group.
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> expectedResult = new HashMap<>();
+        expectedResult.put(groupZero,
+            Collections.singletonMap(t0p0, new OffsetAndMetadata(10L)));
+        expectedResult.put(groupOne,
+            Collections.singletonMap(t1p1, new OffsetAndMetadata(10L)));
+        expectedResult.put(groupTwo,
+            Collections.singletonMap(t2p2, new OffsetAndMetadata(10L)));
+
+        // The same expectation is checked once for each of these partition-level errors;
+        // a fresh handler and response are built for every call.
+        assertCompletedForMultipleGroups(
+            handleWithPartitionErrorMultipleGroups(Errors.UNKNOWN_TOPIC_OR_PARTITION), expectedResult);
+        assertCompletedForMultipleGroups(
+            handleWithPartitionErrorMultipleGroups(Errors.TOPIC_AUTHORIZATION_FAILED), expectedResult);
+        assertCompletedForMultipleGroups(
+            handleWithPartitionErrorMultipleGroups(Errors.UNSTABLE_OFFSET_COMMIT), expectedResult);
+    }
+
+    @Test
+    public void testSuccessfulHandleResponseWithMultipleGroups() {
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> expected = new HashMap<>();
+        groups.forEach(group -> expected.put(group, Collections.emptyMap())); // an empty expectation would leave completedKeys unchecked
+        assertCompletedForMultipleGroups(handleWithErrorWithMultipleGroups(errorMap(groups, Errors.NONE), batchedRequestMap), expected);
+    }
+
     @Test
     public void testUnmappedHandleResponse() {
         assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE));
         assertUnmapped(handleWithError(Errors.NOT_COORDINATOR));
     }
 
+    @Test
+    public void testUnmappedHandleResponseWithMultipleGroups() {
+        Map<String, Errors> errorsByGroup = new HashMap<>();
+        errorsByGroup.put(groupZero, Errors.NOT_COORDINATOR);
+        errorsByGroup.put(groupOne, Errors.COORDINATOR_NOT_AVAILABLE);
+        errorsByGroup.put(groupTwo, Errors.NOT_COORDINATOR);
+        assertUnmappedWithMultipleGroups(handleWithErrorWithMultipleGroups(errorsByGroup, batchedRequestMap)); // all three groups expected in unmappedKeys
+    }
+
     @Test
     public void testRetriableHandleResponse() {
         assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
     }
 
+    @Test
+    public void testRetriableHandleResponseWithMultipleGroups() {
+        // Every group reports COORDINATOR_LOAD_IN_PROGRESS; the result is checked with assertRetriable.
+        assertRetriable(handleWithErrorWithMultipleGroups(errorMap(groups, Errors.COORDINATOR_LOAD_IN_PROGRESS), batchedRequestMap));
+    }
+
     @Test
     public void testFailedHandleResponse() {
         assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED));
@@ -98,10 +231,50 @@ public class ListConsumerGroupOffsetsHandlerTest {
         assertFailed(InvalidGroupIdException.class, handleWithError(Errors.INVALID_GROUP_ID));
     }
 
+    @Test
+    public void testFailedHandleResponseWithMultipleGroups() {
+        // Pair each group's injected error with the exception type expected in failedKeys.
+        Map<String, Errors> errorsByGroup = new HashMap<>();
+        Map<String, Class<? extends Throwable>> exceptionsByGroup = new HashMap<>();
+        errorsByGroup.put(groupZero, Errors.GROUP_AUTHORIZATION_FAILED);
+        exceptionsByGroup.put(groupZero, GroupAuthorizationException.class);
+        errorsByGroup.put(groupOne, Errors.GROUP_ID_NOT_FOUND);
+        exceptionsByGroup.put(groupOne, GroupIdNotFoundException.class);
+        errorsByGroup.put(groupTwo, Errors.INVALID_GROUP_ID);
+        exceptionsByGroup.put(groupTwo, InvalidGroupIdException.class);
+        assertFailedForMultipleGroups(exceptionsByGroup, handleWithErrorWithMultipleGroups(errorsByGroup, batchedRequestMap));
+    }
+
     private OffsetFetchResponse buildResponse(Errors error) {
-        Map<TopicPartition, PartitionData> responseData = new HashMap<>();
-        OffsetFetchResponse response = new OffsetFetchResponse(error, responseData);
-        return response;
+        return new OffsetFetchResponse(
+            throttleMs,
+            Collections.singletonMap(groupZero, error),
+            Collections.singletonMap(groupZero, new HashMap<>()));
+    }
+
+    // Wraps the given per-group errors and per-group partition data in an OffsetFetchResponse,
+    // using this test's throttleMs as the first constructor argument.
+    private OffsetFetchResponse buildResponseWithMultipleGroups(Map<String, Errors> errorMap,
+                                                                Map<String, Map<TopicPartition, PartitionData>> responseData) {
+        return new OffsetFetchResponse(throttleMs, errorMap, responseData);
+    }
+
+    private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleWithErrorWithMultipleGroups(
+        Map<String, Errors> errorMap,
+        Map<String, ListConsumerGroupOffsetsSpec> groupSpecs
+    ) {
+        // Every group in errorMap gets an empty partition map, so the response carries
+        // nothing but the group-level errors supplied by the caller.
+        Map<String, Map<TopicPartition, PartitionData>> emptyDataByGroup = new HashMap<>();
+        errorMap.keySet().forEach(group -> emptyDataByGroup.put(group, new HashMap<>()));
+        OffsetFetchResponse response = buildResponseWithMultipleGroups(errorMap, emptyDataByGroup);
+
+        // The keys handed to the handler are exactly the groups named in errorMap.
+        Set<CoordinatorKey> keys = errorMap.keySet().stream()
+            .map(CoordinatorKey::byGroupId)
+            .collect(Collectors.toSet());
+        ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupSpecs, false, logContext);
+        return handler.handleResponse(new Node(1, "host", 1234), keys, response);
     }
 
     private OffsetFetchResponse buildResponseWithPartitionError(Errors error) {
@@ -110,24 +283,68 @@ public class ListConsumerGroupOffsetsHandlerTest {
         responseData.put(t0p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", Errors.NONE));
         responseData.put(t0p1, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
 
-        OffsetFetchResponse response = new OffsetFetchResponse(Errors.NONE, responseData);
-        return response;
+        return new OffsetFetchResponse(Errors.NONE, responseData);
+    }
+
+    private OffsetFetchResponse buildResponseWithPartitionErrorWithMultipleGroups(Errors error) {
+        // Builds a three-group response with no group-level error. Group zero: t0p0 only, error-free.
+        Map<TopicPartition, PartitionData> responseDataZero = new HashMap<>();
+        responseDataZero.put(t0p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", Errors.NONE));
+
+        // Group one: t1p1 is the only partition without the injected error.
+        Map<TopicPartition, PartitionData> responseDataOne = new HashMap<>();
+        responseDataOne.put(t0p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
+        responseDataOne.put(t1p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
+        responseDataOne.put(t1p1, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", Errors.NONE));
+
+        // Group two: t2p2 is the only partition without the injected error.
+        Map<TopicPartition, PartitionData> responseDataTwo = new HashMap<>();
+        responseDataTwo.put(t0p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
+        responseDataTwo.put(t1p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
+        responseDataTwo.put(t1p1, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
+        responseDataTwo.put(t2p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
+        responseDataTwo.put(t2p1, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
+        responseDataTwo.put(t2p2, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", Errors.NONE));
+
+        Map<String, Map<TopicPartition, PartitionData>> responseData = new HashMap<>();
+        responseData.put(groupZero, responseDataZero);
+        responseData.put(groupOne, responseDataOne);
+        responseData.put(groupTwo, responseDataTwo);
+
+        return new OffsetFetchResponse(0, errorMap(groups, Errors.NONE), responseData);
     }
 
     private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleWithPartitionError(
         Errors error
     ) {
-        ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, tps, logContext);
+        ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(singleRequestMap,
+            false, logContext);
         OffsetFetchResponse response = buildResponseWithPartitionError(error);
-        return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response);
+        return handler.handleResponse(new Node(1, "host", 1234),
+            singleton(CoordinatorKey.byGroupId(groupZero)), response);
+    }
+
+    private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleWithPartitionErrorMultipleGroups(
+        Errors error
+    ) {
+        // Feeds the three-group partition-error response to a handler built from
+        // batchedRequestMap and returns whatever the handler reports.
+        Node coordinator = new Node(1, "host", 1234);
+        Set<CoordinatorKey> keys = coordinatorKeys(groupZero, groupOne, groupTwo);
+        OffsetFetchResponse response = buildResponseWithPartitionErrorWithMultipleGroups(error);
+        return new ListConsumerGroupOffsetsHandler(batchedRequestMap, false, logContext)
+            .handleResponse(coordinator, keys, response);
     }
 
     private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleWithError(
         Errors error
     ) {
-        ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, tps, logContext);
+        ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(
+            singleRequestMap, false, logContext);
         OffsetFetchResponse response = buildResponse(error);
-        return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response);
+        return handler.handleResponse(new Node(1, "host", 1234),
+            singleton(CoordinatorKey.byGroupId(groupZero)),
+            response);
     }
 
     private void assertUnmapped(
@@ -135,11 +352,19 @@ public class ListConsumerGroupOffsetsHandlerTest {
     ) {
         assertEquals(emptySet(), result.completedKeys.keySet());
         assertEquals(emptySet(), result.failedKeys.keySet());
-        assertEquals(singletonList(CoordinatorKey.byGroupId(groupId)), result.unmappedKeys);
+        assertEquals(singletonList(CoordinatorKey.byGroupId(groupZero)), result.unmappedKeys);
+    }
+
+    private void assertUnmappedWithMultipleGroups(AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result) {
+        assertEquals(emptySet(), result.completedKeys.keySet());
+        assertEquals(emptySet(), result.failedKeys.keySet());
+        // unmappedKeys is compared as a set, so the order in which groups are reported is not checked.
+        Set<CoordinatorKey> unmapped = new HashSet<>(result.unmappedKeys);
+        assertEquals(coordinatorKeys(groupZero, groupOne, groupTwo), unmapped);
     }
 
     private void assertRetriable(
-        AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result
+            AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result
     ) {
         assertEquals(emptySet(), result.completedKeys.keySet());
         assertEquals(emptySet(), result.failedKeys.keySet());
@@ -150,21 +375,64 @@ public class ListConsumerGroupOffsetsHandlerTest {
         AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result,
         Map<TopicPartition, OffsetAndMetadata> expected
     ) {
-        CoordinatorKey key = CoordinatorKey.byGroupId(groupId);
+        CoordinatorKey key = CoordinatorKey.byGroupId(groupZero);
         assertEquals(emptySet(), result.failedKeys.keySet());
         assertEquals(emptyList(), result.unmappedKeys);
         assertEquals(singleton(key), result.completedKeys.keySet());
-        assertEquals(expected, result.completedKeys.get(CoordinatorKey.byGroupId(groupId)));
+        assertEquals(expected, result.completedKeys.get(key));
+    }
+
+    private void assertCompletedForMultipleGroups(
+        AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result,
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> expected
+    ) {
+        assertEquals(emptySet(), result.failedKeys.keySet());
+        assertEquals(emptyList(), result.unmappedKeys);
+        expected.forEach((group, expectedOffsets) -> { // only groups present in `expected` are verified
+            CoordinatorKey key = CoordinatorKey.byGroupId(group);
+            assertTrue(result.completedKeys.containsKey(key));
+            assertEquals(expectedOffsets, result.completedKeys.get(key));
+        });
+    }
 
     private void assertFailed(
         Class<? extends Throwable> expectedExceptionType,
         AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result
     ) {
-        CoordinatorKey key = CoordinatorKey.byGroupId(groupId);
+        CoordinatorKey key = CoordinatorKey.byGroupId(groupZero);
         assertEquals(emptySet(), result.completedKeys.keySet());
         assertEquals(emptyList(), result.unmappedKeys);
         assertEquals(singleton(key), result.failedKeys.keySet());
         assertTrue(expectedExceptionType.isInstance(result.failedKeys.get(key)));
     }
+
+    private void assertFailedForMultipleGroups(
+        Map<String, Class<? extends Throwable>> groupToExceptionMap,
+        AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result
+    ) {
+        assertEquals(emptySet(), result.completedKeys.keySet());
+        assertEquals(emptyList(), result.unmappedKeys);
+        groupToExceptionMap.forEach((group, exceptionType) -> { // each listed group must have failed with its mapped type
+            CoordinatorKey key = CoordinatorKey.byGroupId(group);
+            assertTrue(result.failedKeys.containsKey(key));
+            assertTrue(exceptionType.isInstance(result.failedKeys.get(key)));
+        });
+    }
+
+    // Builds the set of CoordinatorKeys (via CoordinatorKey.byGroupId) for the given group ids.
+    private Set<CoordinatorKey> coordinatorKeys(String... groups) {
+        Stream<CoordinatorKey> keys = Stream.of(groups).map(CoordinatorKey::byGroupId);
+        return keys.collect(Collectors.toSet());
+    }
+
+    // Collects the group ids listed in the given request's data.
+    private Set<String> requestGroups(OffsetFetchRequest request) {
+        Stream<String> groupIds = request.data().groups().stream()
+            .map(OffsetFetchRequestGroup::groupId);
+        return groupIds.collect(Collectors.toSet());
+    }
+
+    private Map<String, Errors> errorMap(Collection<String> groups, Errors error) {
+        return groups.stream().collect(Collectors.toMap(Function.identity(), unused -> error)); // every group id maps to the same error
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index da3acf4983..e7f25345c6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -172,6 +172,7 @@ public class KafkaConsumerTest {
     // Set auto commit interval lower than heartbeat so we don't need to deal with
     // a concurrent heartbeat request
     private final int autoCommitIntervalMs = 500;
+    private final int throttleMs = 10;
 
     private final String groupId = "mock-group";
     private final String memberId = "memberId";
@@ -2434,7 +2435,10 @@ public class KafkaConsumerTest {
             partitionData.put(entry.getKey(), new OffsetFetchResponse.PartitionData(entry.getValue(),
                     Optional.empty(), "", error));
         }
-        return new OffsetFetchResponse(Errors.NONE, partitionData);
+        return new OffsetFetchResponse(
+            throttleMs,
+            Collections.singletonMap(groupId, Errors.NONE),
+            Collections.singletonMap(groupId, partitionData));
     }
 
     private ListOffsetsResponse listOffsetsResponse(Map<TopicPartition, Long> offsets) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index c65d33176f..db483c6c0f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -71,6 +71,7 @@ import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
 import org.apache.kafka.common.requests.RequestTestUtils;
 import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
@@ -140,6 +141,7 @@ public abstract class ConsumerCoordinatorTest {
     private final long retryBackoffMs = 100;
     private final int autoCommitIntervalMs = 2000;
     private final int requestTimeoutMs = 30000;
+    private final int throttleMs = 10;
     private final MockTime time = new MockTime();
     private GroupRebalanceConfig rebalanceConfig;
 
@@ -2872,7 +2874,7 @@ public abstract class ConsumerCoordinatorTest {
         OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch,
                 metadata, Errors.NONE);
 
-        client.prepareResponse(new OffsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
         Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = coordinator.fetchCommittedOffsets(singleton(t1p),
                 time.timer(Long.MAX_VALUE));
 
@@ -2888,7 +2890,7 @@ public abstract class ConsumerCoordinatorTest {
         OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(-1, Optional.empty(),
                 "", Errors.TOPIC_AUTHORIZATION_FAILED);
 
-        client.prepareResponse(new OffsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
         TopicAuthorizationException exception = assertThrows(TopicAuthorizationException.class, () ->
                 coordinator.fetchCommittedOffsets(singleton(t1p), time.timer(Long.MAX_VALUE)));
 
@@ -2901,7 +2903,7 @@ public abstract class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
         subscriptions.assignFromUser(singleton(t1p));
-        client.prepareResponse(offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));
+        client.prepareResponse(offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap()));
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
 
@@ -2916,7 +2918,7 @@ public abstract class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
         subscriptions.assignFromUser(singleton(t1p));
-        client.prepareResponse(offsetFetchResponse(Errors.GROUP_AUTHORIZATION_FAILED));
+        client.prepareResponse(offsetFetchResponse(Errors.GROUP_AUTHORIZATION_FAILED, Collections.emptyMap()));
         try {
             coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
             fail("Expected group authorization error");
@@ -2959,7 +2961,7 @@ public abstract class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
         subscriptions.assignFromUser(singleton(t1p));
-        client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR));
+        client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
@@ -3435,7 +3437,11 @@ public abstract class ConsumerCoordinatorTest {
         OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch,
             metadata, Errors.NONE);
 
-        client.prepareResponse(new OffsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+        if (upperVersion < 8) {
+            client.prepareResponse(new OffsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+        } else {
+            client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+        }
         if (expectThrows) {
             assertThrows(UnsupportedVersionException.class,
                 () -> coordinator.fetchCommittedOffsets(singleton(t1p), time.timer(Long.MAX_VALUE)));
@@ -3690,8 +3696,10 @@ public abstract class ConsumerCoordinatorTest {
         return new OffsetCommitResponse(responseData);
     }
 
-    private OffsetFetchResponse offsetFetchResponse(Errors topLevelError) {
-        return new OffsetFetchResponse(topLevelError, Collections.emptyMap());
+    private OffsetFetchResponse offsetFetchResponse(Errors error, Map<TopicPartition, PartitionData> responseData) {
+        // Group-keyed response: both the error and the partition data are registered under this test's groupId.
+        Map<String, Errors> errorByGroup = singletonMap(groupId, error);
+        return new OffsetFetchResponse(throttleMs, errorByGroup, singletonMap(groupId, responseData));
     }
 
     private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Errors partitionLevelError, String metadata, long offset) {
@@ -3701,7 +3709,7 @@ public abstract class ConsumerCoordinatorTest {
     private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Errors partitionLevelError, String metadata, long offset, Optional<Integer> epoch) {
         OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset,
                 epoch, metadata, partitionLevelError);
-        return new OffsetFetchResponse(Errors.NONE, singletonMap(tp, data));
+        return offsetFetchResponse(Errors.NONE, singletonMap(tp, data));
     }
 
     private OffsetCommitCallback callback(final AtomicBoolean success) {
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 47c1d173b3..d5aee881c9 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -18,7 +18,7 @@
 package kafka.admin
 
 import java.time.{Duration, Instant}
-import java.util.Properties
+import java.util.{Collections, Properties}
 import com.fasterxml.jackson.dataformat.csv.CsvMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import kafka.utils._
@@ -753,9 +753,9 @@ object ConsumerGroupCommand extends Logging {
 
     private def getCommittedOffsets(groupId: String): Map[TopicPartition, OffsetAndMetadata] = {
       adminClient.listConsumerGroupOffsets(
-        groupId,
-        withTimeoutMs(new ListConsumerGroupOffsetsOptions)
-      ).partitionsToOffsetAndMetadata.get.asScala
+        Collections.singletonMap(groupId, new ListConsumerGroupOffsetsSpec),
+        withTimeoutMs(new ListConsumerGroupOffsetsOptions())
+      ).partitionsToOffsetAndMetadata(groupId).get().asScala
     }
 
     type GroupMetadata = immutable.Map[String, immutable.Map[TopicPartition, OffsetAndMetadata]]
diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
index 76a3855a87..44b241a7ed 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
@@ -49,8 +49,8 @@ class ConsumerGroupServiceTest {
 
     when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
       .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
-    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
-      .thenReturn(listGroupOffsetsResult)
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec), any()))
+      .thenReturn(listGroupOffsetsResult(group))
     when(admin.listOffsets(offsetsArgMatcher, any()))
       .thenReturn(listOffsetsResult)
 
@@ -60,7 +60,7 @@ class ConsumerGroupServiceTest {
     assertEquals(topicPartitions.size, assignments.get.size)
 
     verify(admin, times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any())
-    verify(admin, times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())
+    verify(admin, times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec), any())
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
@@ -112,8 +112,10 @@ class ConsumerGroupServiceTest {
     future.complete(consumerGroupDescription)
     when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
       .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, future)))
-    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
-      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec), any()))
+      .thenReturn(
+        AdminClientTestUtils.listConsumerGroupOffsetsResult(
+          Collections.singletonMap(group, commitedOffsets)))
     when(admin.listOffsets(
       ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
       any()
@@ -142,7 +144,7 @@ class ConsumerGroupServiceTest {
     assertEquals(expectedOffsets, returnedOffsets)
 
     verify(admin, times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any())
-    verify(admin, times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())
+    verify(admin, times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec), any())
     verify(admin, times(1)).listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)), any())
     verify(admin, times(1)).listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)), any())
   }
@@ -192,9 +194,9 @@ class ConsumerGroupServiceTest {
     new DescribeConsumerGroupsResult(Collections.singletonMap(group, future))
   }
 
-  private def listGroupOffsetsResult: ListConsumerGroupOffsetsResult = {
+  private def listGroupOffsetsResult(groupId: String): ListConsumerGroupOffsetsResult = {
     val offsets = topicPartitions.map(_ -> new OffsetAndMetadata(100)).toMap.asJava
-    AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets)
+    AdminClientTestUtils.listConsumerGroupOffsetsResult(Map(groupId -> offsets).asJava)
   }
 
   private def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = {
@@ -217,4 +219,8 @@ class ConsumerGroupServiceTest {
     }.toMap
     AdminClientTestUtils.describeTopicsResult(topicDescriptions.asJava)
   }
+
+  private def listConsumerGroupOffsetsSpec: util.Map[String, ListConsumerGroupOffsetsSpec] = {
+    Collections.singletonMap(group, new ListConsumerGroupOffsetsSpec()) // a def, not a val: a new map and spec are built on every call
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 6d17e93782..82c19949e3 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -320,7 +320,7 @@ class RequestQuotaTest extends BaseRequestTest {
               )
           )
         case ApiKeys.OFFSET_FETCH =>
-          new OffsetFetchRequest.Builder("test-group", false, List(tp).asJava, false)
+          new OffsetFetchRequest.Builder(Map("test-group"-> List(tp).asJava).asJava, false, false)
 
         case ApiKeys.FIND_COORDINATOR =>
           new FindCoordinatorRequest.Builder(
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 02cfb0b49c..5240534ce7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.ListOffsetsOptions;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -695,11 +696,12 @@ public class StoreChangelogReader implements ChangelogReader {
 
         try {
             // those which do not have a committed offset would default to 0
-            final ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions();
-            options.topicPartitions(new ArrayList<>(partitions));
-            options.requireStable(true);
-            final Map<TopicPartition, Long> committedOffsets = adminClient.listConsumerGroupOffsets(groupId, options)
-                    .partitionsToOffsetAndMetadata().get().entrySet()
+            // requireStable only takes effect if the options reach the call, so pass them explicitly with the per-group spec
+            final ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions().requireStable(true);
+            final ListConsumerGroupOffsetsSpec spec = new ListConsumerGroupOffsetsSpec()
+                    .topicPartitions(new ArrayList<>(partitions));
+            final Map<TopicPartition, Long> committedOffsets = adminClient.listConsumerGroupOffsets(Collections.singletonMap(groupId, spec), options)
+                    .partitionsToOffsetAndMetadata(groupId).get().entrySet()
                     .stream()
                     .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : e.getValue().offset()));
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 1961736620..fbc8d42326 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.admin.AdminClientTestUtils;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.ListOffsetsOptions;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.MockAdminClient;
@@ -648,12 +649,12 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
         final AtomicBoolean functionCalled = new AtomicBoolean(false);
         final MockAdminClient adminClient = new MockAdminClient() {
             @Override
-            public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, final ListConsumerGroupOffsetsOptions options) {
+            public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, final ListConsumerGroupOffsetsOptions options) {
                 if (functionCalled.get()) {
-                    return super.listConsumerGroupOffsets(groupId, options);
+                    return super.listConsumerGroupOffsets(groupSpecs, options);
                 } else {
                     functionCalled.set(true);
-                    return AdminClientTestUtils.listConsumerGroupOffsetsResult(new TimeoutException("KABOOM!"));
+                    return AdminClientTestUtils.listConsumerGroupOffsetsResult(groupSpecs.keySet().iterator().next(), new TimeoutException("KABOOM!"));
                 }
             }
         };
@@ -708,7 +709,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
 
         final MockAdminClient adminClient = new MockAdminClient() {
             @Override
-            public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, final ListConsumerGroupOffsetsOptions options) {
+            public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, final ListConsumerGroupOffsetsOptions options) {
                 throw kaboom;
             }
         };
@@ -790,7 +791,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
 
         final MockAdminClient adminClient = new MockAdminClient() {
             @Override
-            public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, final ListConsumerGroupOffsetsOptions options) {
+            public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, final ListConsumerGroupOffsetsOptions options) {
                 throw new AssertionError("Should not try to fetch committed offsets");
             }
         };