You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2022/07/14 13:21:15 UTC
[kafka] branch 3.3 updated: KAFKA-13043: Implement Admin APIs for offsetFetch batching (#10964)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 66000787c1 KAFKA-13043: Implement Admin APIs for offsetFetch batching (#10964)
66000787c1 is described below
commit 66000787c1146c6d08d88b9c564f5a000608f013
Author: Sanjana Kaundinya <sk...@gmail.com>
AuthorDate: Thu Jul 14 05:47:34 2022 -0700
KAFKA-13043: Implement Admin APIs for offsetFetch batching (#10964)
This implements the AdminAPI portion of KIP-709: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=173084258. The request/response protocol changes were implemented in 3.0.0. A new batched API has been introduced to list consumer offsets for different groups. For brokers older than 3.0.0, separate requests are sent for each group.
Co-authored-by: Rajini Sivaram <ra...@googlemail.com>
Co-authored-by: David Jacot <dj...@confluent.io>
Reviewers: David Jacot <dj...@confluent.io>, Rajini Sivaram <ra...@googlemail.com>
---
.../java/org/apache/kafka/clients/admin/Admin.java | 36 ++-
.../kafka/clients/admin/KafkaAdminClient.java | 11 +-
.../admin/ListConsumerGroupOffsetsOptions.java | 14 +-
.../admin/ListConsumerGroupOffsetsResult.java | 56 +++-
.../admin/ListConsumerGroupOffsetsSpec.java | 79 ++++++
.../clients/admin/internals/AdminApiDriver.java | 3 +-
.../admin/internals/CoordinatorStrategy.java | 4 +
.../internals/ListConsumerGroupOffsetsHandler.java | 128 +++++----
.../kafka/common/requests/OffsetFetchResponse.java | 10 +-
.../kafka/clients/admin/AdminClientTestUtils.java | 12 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 206 ++++++++++++--
.../kafka/clients/admin/MockAdminClient.java | 16 +-
.../ListConsumerGroupOffsetsHandlerTest.java | 308 +++++++++++++++++++--
.../kafka/clients/consumer/KafkaConsumerTest.java | 6 +-
.../internals/ConsumerCoordinatorTest.java | 26 +-
.../scala/kafka/admin/ConsumerGroupCommand.scala | 8 +-
.../kafka/admin/ConsumerGroupServiceTest.scala | 22 +-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 2 +-
.../processor/internals/StoreChangelogReader.java | 12 +-
.../internals/StoreChangelogReaderTest.java | 11 +-
20 files changed, 813 insertions(+), 157 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index fdacc09db8..0698d29702 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.requests.LeaveGroupResponse;
import java.time.Duration;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -919,12 +920,20 @@ public interface Admin extends AutoCloseable {
* @param options The options to use when listing the consumer group offsets.
* @return The ListGroupOffsetsResult
*/
- ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options);
+ default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) {
+ ListConsumerGroupOffsetsOptions listOptions = new ListConsumerGroupOffsetsOptions()
+ .requireStable(options.requireStable());
+ @SuppressWarnings("deprecation")
+ ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec()
+ .topicPartitions(options.topicPartitions());
+ return listConsumerGroupOffsets(Collections.singletonMap(groupId, groupSpec), listOptions);
+ }
/**
* List the consumer group offsets available in the cluster with the default options.
* <p>
- * This is a convenience method for {@link #listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} with default options.
+ * This is a convenience method for {@link #listConsumerGroupOffsets(Map, ListConsumerGroupOffsetsOptions)}
+ * to list offsets of all partitions of one group with default options.
*
* @return The ListGroupOffsetsResult.
*/
@@ -932,6 +941,29 @@ public interface Admin extends AutoCloseable {
return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
}
+ /**
+ * List the consumer group offsets available in the cluster for the specified consumer groups.
+ *
+ * @param groupSpecs Map of consumer group ids to a spec that specifies the topic partitions of the group to list offsets for.
+ *
+ * @param options The options to use when listing the consumer group offsets.
+ * @return The ListConsumerGroupOffsetsResult
+ */
+ ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, ListConsumerGroupOffsetsOptions options);
+
+ /**
+ * List the consumer group offsets available in the cluster for the specified groups with the default options.
+ * <p>
+ * This is a convenience method for
+ * {@link #listConsumerGroupOffsets(Map, ListConsumerGroupOffsetsOptions)} with default options.
+ *
+ * @param groupSpecs Map of consumer group ids to a spec that specifies the topic partitions of the group to list offsets for.
+ * @return The ListConsumerGroupOffsetsResult.
+ */
+ default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs) {
+ return listConsumerGroupOffsets(groupSpecs, new ListConsumerGroupOffsetsOptions());
+ }
+
/**
* Delete consumer groups from the cluster.
*
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 2b2642e351..41eb27a1dd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3401,13 +3401,14 @@ public class KafkaAdminClient extends AdminClient {
}
@Override
- public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId,
- final ListConsumerGroupOffsetsOptions options) {
+ public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs,
+ ListConsumerGroupOffsetsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> future =
- ListConsumerGroupOffsetsHandler.newFuture(groupId);
- ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, options.topicPartitions(), options.requireStable(), logContext);
+ ListConsumerGroupOffsetsHandler.newFuture(groupSpecs.keySet());
+ ListConsumerGroupOffsetsHandler handler =
+ new ListConsumerGroupOffsetsHandler(groupSpecs, options.requireStable(), logContext);
invokeDriver(handler, future, options.timeoutMs);
- return new ListConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)));
+ return new ListConsumerGroupOffsetsResult(future.all());
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
index 292a47ef39..44d3a40732 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
@@ -23,23 +23,28 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.List;
/**
- * Options for {@link Admin#listConsumerGroupOffsets(String)}.
+ * Options for {@link Admin#listConsumerGroupOffsets(java.util.Map)} and {@link Admin#listConsumerGroupOffsets(String)}.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsumerGroupOffsetsOptions> {
- private List<TopicPartition> topicPartitions = null;
+ private List<TopicPartition> topicPartitions;
private boolean requireStable = false;
/**
* Set the topic partitions to list as part of the result.
* {@code null} includes all topic partitions.
+ * <p>
+ * @deprecated Since 3.3.
+ * Use {@link Admin#listConsumerGroupOffsets(java.util.Map, ListConsumerGroupOffsetsOptions)}
+ * to specify topic partitions.
*
* @param topicPartitions List of topic partitions to include
* @return This ListGroupOffsetsOptions
*/
+ @Deprecated
public ListConsumerGroupOffsetsOptions topicPartitions(List<TopicPartition> topicPartitions) {
this.topicPartitions = topicPartitions;
return this;
@@ -55,7 +60,12 @@ public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsume
/**
* Returns a list of topic partitions to add as part of the result.
+ * <p>
+ * @deprecated Since 3.3.
+ * Use {@link Admin#listConsumerGroupOffsets(java.util.Map, ListConsumerGroupOffsetsOptions)}
+ * to specify topic partitions.
*/
+ @Deprecated
public List<TopicPartition> topicPartitions() {
return topicPartitions;
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
index 48f4531418..2136e33a40 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
@@ -17,25 +17,32 @@
package org.apache.kafka.clients.admin;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
-import java.util.Map;
-
/**
- * The result of the {@link Admin#listConsumerGroupOffsets(String)} call.
+ * The result of the {@link Admin#listConsumerGroupOffsets(Map)} and
+ * {@link Admin#listConsumerGroupOffsets(String)} calls.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ListConsumerGroupOffsetsResult {
- final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+ final Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures;
- ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
- this.future = future;
+ ListConsumerGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
+ this.futures = futures.entrySet().stream()
+ .collect(Collectors.toMap(e -> e.getKey().idValue, Entry::getValue));
}
/**
@@ -43,7 +50,42 @@ public class ListConsumerGroupOffsetsResult {
* If the group does not have a committed offset for this partition, the corresponding value in the returned map will be null.
*/
public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata() {
- return future;
+ if (futures.size() != 1) {
+ throw new IllegalStateException("Offsets from multiple consumer groups were requested. " +
+ "Use partitionsToOffsetAndMetadata(groupId) instead to get future for a specific group.");
+ }
+ return futures.values().iterator().next();
}
+ /**
+ * Return a future which yields a map of topic partitions to OffsetAndMetadata objects for
+ * the specified group. If the group doesn't have a committed offset for a specific
+ * partition, the corresponding value in the returned map will be null.
+ */
+ public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata(String groupId) {
+ if (!futures.containsKey(groupId))
+ throw new IllegalArgumentException("Offsets for consumer group '" + groupId + "' were not requested.");
+ return futures.get(groupId);
+ }
+
+ /**
+ * Return a future which yields all Map<String, Map<TopicPartition, OffsetAndMetadata>> objects,
+ * if requests for all the groups succeed.
+ */
+ public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> all() {
+ return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply(
+ nil -> {
+ Map<String, Map<TopicPartition, OffsetAndMetadata>> listedConsumerGroupOffsets = new HashMap<>(futures.size());
+ futures.forEach((key, future) -> {
+ try {
+ listedConsumerGroupOffsets.put(key, future.get());
+ } catch (InterruptedException | ExecutionException e) {
+ // This should be unreachable, since the KafkaFuture#allOf already ensured
+ // that all of the futures completed successfully.
+ throw new RuntimeException(e);
+ }
+ });
+ return listedConsumerGroupOffsets;
+ });
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsSpec.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsSpec.java
new file mode 100644
index 0000000000..83858e49c8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsSpec.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * Specification of consumer group offsets to list using {@link Admin#listConsumerGroupOffsets(java.util.Map)}.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class ListConsumerGroupOffsetsSpec {
+
+ private Collection<TopicPartition> topicPartitions;
+
+ /**
+ * Set the topic partitions whose offsets are to be listed for a consumer group.
+ * {@code null} includes all topic partitions.
+ *
+ * @param topicPartitions List of topic partitions to include
+ * @return This ListConsumerGroupOffsetsSpec
+ */
+ public ListConsumerGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions) {
+ this.topicPartitions = topicPartitions;
+ return this;
+ }
+
+ /**
+ * Returns the topic partitions whose offsets are to be listed for a consumer group.
+ * {@code null} indicates that offsets of all partitions of the group are to be listed.
+ */
+ public Collection<TopicPartition> topicPartitions() {
+ return topicPartitions;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ListConsumerGroupOffsetsSpec)) {
+ return false;
+ }
+ ListConsumerGroupOffsetsSpec that = (ListConsumerGroupOffsetsSpec) o;
+ return Objects.equals(topicPartitions, that.topicPartitions);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topicPartitions);
+ }
+
+ @Override
+ public String toString() {
+ return "ListConsumerGroupOffsetsSpec(" +
+ "topicPartitions=" + topicPartitions +
+ ')';
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
index d00db4b18c..0e1b03d964 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest.NoBatchedFindCoordinatorsException;
+import org.apache.kafka.common.requests.OffsetFetchRequest.NoBatchedOffsetFetchRequestException;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
@@ -253,7 +254,7 @@ public class AdminApiDriver<K, V> {
.collect(Collectors.toSet());
retryLookup(keysToUnmap);
- } else if (t instanceof NoBatchedFindCoordinatorsException) {
+ } else if (t instanceof NoBatchedFindCoordinatorsException || t instanceof NoBatchedOffsetFetchRequestException) {
((CoordinatorStrategy) handler.lookupStrategy()).disableBatch();
Set<K> keysToUnmap = spec.keys.stream()
.filter(future.lookupKeys()::contains)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
index e6fc0d624a..02b68527c3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
@@ -120,6 +120,10 @@ public class CoordinatorStrategy implements AdminApiLookupStrategy<CoordinatorKe
batch = false;
}
+ public boolean batch() {
+ return batch;
+ }
+
private CoordinatorKey requireSingletonAndType(Set<CoordinatorKey> keys) {
if (keys.size() != 1) {
throw new IllegalArgumentException("Unexpected size of key set: expected 1, but got " + keys.size());
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
index 08648821f7..21c7d8d488 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
@@ -17,14 +17,16 @@
package org.apache.kafka.clients.admin.internals;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@@ -36,39 +38,26 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
-public class ListConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> {
+public class ListConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> {
- private final CoordinatorKey groupId;
- private final List<TopicPartition> partitions;
private final boolean requireStable;
+ private final Map<String, ListConsumerGroupOffsetsSpec> groupSpecs;
private final Logger log;
- private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+ private final CoordinatorStrategy lookupStrategy;
public ListConsumerGroupOffsetsHandler(
- String groupId,
- List<TopicPartition> partitions,
- LogContext logContext
- ) {
- this(groupId, partitions, false, logContext);
- }
-
- public ListConsumerGroupOffsetsHandler(
- String groupId,
- List<TopicPartition> partitions,
+ Map<String, ListConsumerGroupOffsetsSpec> groupSpecs,
boolean requireStable,
LogContext logContext
) {
- this.groupId = CoordinatorKey.byGroupId(groupId);
- this.partitions = partitions;
- this.requireStable = requireStable;
this.log = logContext.logger(ListConsumerGroupOffsetsHandler.class);
this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
+ this.groupSpecs = groupSpecs;
+ this.requireStable = requireStable;
}
- public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> newFuture(
- String groupId
- ) {
- return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
+ public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> newFuture(Collection<String> groupIds) {
+ return AdminApiFuture.forKeys(coordinatorKeys(groupIds));
}
@Override
@@ -82,16 +71,45 @@ public class ListConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<Coo
}
private void validateKeys(Set<CoordinatorKey> groupIds) {
- if (!groupIds.equals(Collections.singleton(groupId))) {
+ Set<CoordinatorKey> keys = coordinatorKeys(groupSpecs.keySet());
+ if (!keys.containsAll(groupIds)) {
throw new IllegalArgumentException("Received unexpected group ids " + groupIds +
- " (expected only " + Collections.singleton(groupId) + ")");
+ " (expected one of " + keys + ")");
}
}
+ private static Set<CoordinatorKey> coordinatorKeys(Collection<String> groupIds) {
+ return groupIds.stream()
+ .map(CoordinatorKey::byGroupId)
+ .collect(Collectors.toSet());
+ }
+
+ public OffsetFetchRequest.Builder buildBatchedRequest(Set<CoordinatorKey> groupIds) {
+ // Create a map that only contains the consumer groups owned by the coordinator.
+ Map<String, List<TopicPartition>> coordinatorGroupIdToTopicPartitions = new HashMap<>(groupIds.size());
+ groupIds.forEach(g -> {
+ ListConsumerGroupOffsetsSpec spec = groupSpecs.get(g.idValue);
+ List<TopicPartition> partitions = spec.topicPartitions() != null ? new ArrayList<>(spec.topicPartitions()) : null;
+ coordinatorGroupIdToTopicPartitions.put(g.idValue, partitions);
+ });
+
+ return new OffsetFetchRequest.Builder(coordinatorGroupIdToTopicPartitions, requireStable, false);
+ }
+
@Override
- public OffsetFetchRequest.Builder buildBatchedRequest(int coordinatorId, Set<CoordinatorKey> groupIds) {
+ public Collection<RequestAndKeys<CoordinatorKey>> buildRequest(int brokerId, Set<CoordinatorKey> groupIds) {
validateKeys(groupIds);
- return new OffsetFetchRequest.Builder(groupId.idValue, requireStable, partitions, false);
+
+ // When the OffsetFetchRequest fails with NoBatchedOffsetFetchRequestException, we completely disable
+ // the batching end-to-end, including the FindCoordinatorRequest.
+ if (lookupStrategy.batch()) {
+ return Collections.singletonList(new RequestAndKeys<>(buildBatchedRequest(groupIds), groupIds));
+ } else {
+ return groupIds.stream().map(groupId -> {
+ Set<CoordinatorKey> keys = Collections.singleton(groupId);
+ return new RequestAndKeys<>(buildBatchedRequest(keys), keys);
+ }).collect(Collectors.toList());
+ }
}
@Override
@@ -104,44 +122,46 @@ public class ListConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<Coo
final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
- // the groupError will contain the group level error for v0-v8 OffsetFetchResponse
- Errors groupError = response.groupLevelError(groupId.idValue);
- if (groupError != Errors.NONE) {
- final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
- final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
-
- handleGroupError(groupId, groupError, failed, groupsToUnmap);
-
- return new ApiResult<>(Collections.emptyMap(), failed, new ArrayList<>(groupsToUnmap));
- } else {
- final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
-
- response.partitionDataMap(groupId.idValue).forEach((topicPartition, partitionData) -> {
- final Errors error = partitionData.error;
- if (error == Errors.NONE) {
- final long offset = partitionData.offset;
- final String metadata = partitionData.metadata;
- final Optional<Integer> leaderEpoch = partitionData.leaderEpoch;
- // Negative offset indicates that the group has no committed offset for this partition
- if (offset < 0) {
- groupOffsetsListing.put(topicPartition, null);
+ Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> completed = new HashMap<>();
+ Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+ List<CoordinatorKey> unmapped = new ArrayList<>();
+ for (CoordinatorKey coordinatorKey : groupIds) {
+ String group = coordinatorKey.idValue;
+ if (response.groupHasError(group)) {
+ handleGroupError(CoordinatorKey.byGroupId(group), response.groupLevelError(group), failed, unmapped);
+ } else {
+ final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
+ Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = response.partitionDataMap(group);
+ for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> partitionEntry : responseData.entrySet()) {
+ final TopicPartition topicPartition = partitionEntry.getKey();
+ OffsetFetchResponse.PartitionData partitionData = partitionEntry.getValue();
+ final Errors error = partitionData.error;
+
+ if (error == Errors.NONE) {
+ final long offset = partitionData.offset;
+ final String metadata = partitionData.metadata;
+ final Optional<Integer> leaderEpoch = partitionData.leaderEpoch;
+ // Negative offset indicates that the group has no committed offset for this partition
+ if (offset < 0) {
+ groupOffsetsListing.put(topicPartition, null);
+ } else {
+ groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
+ }
} else {
- groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
+ log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
}
- } else {
- log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
}
- });
-
- return ApiResult.completed(groupId, groupOffsetsListing);
+ completed.put(CoordinatorKey.byGroupId(group), groupOffsetsListing);
+ }
}
+ return new ApiResult<>(completed, failed, unmapped);
}
private void handleGroupError(
CoordinatorKey groupId,
Errors error,
Map<CoordinatorKey, Throwable> failed,
- Set<CoordinatorKey> groupsToUnmap
+ List<CoordinatorKey> groupsToUnmap
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 213182ec8c..4e25984668 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -173,8 +173,8 @@ public class OffsetFetchResponse extends AbstractResponse {
* @param responseData Fetched offset information grouped by topic-partition and by group
*/
public OffsetFetchResponse(int throttleTimeMs,
- Map<String, Errors> errors, Map<String,
- Map<TopicPartition, PartitionData>> responseData) {
+ Map<String, Errors> errors,
+ Map<String, Map<TopicPartition, PartitionData>> responseData) {
super(ApiKeys.OFFSET_FETCH);
List<OffsetFetchResponseGroup> groupList = new ArrayList<>();
for (Entry<String, Map<TopicPartition, PartitionData>> entry : responseData.entrySet()) {
@@ -250,7 +250,11 @@ public class OffsetFetchResponse extends AbstractResponse {
}
public boolean groupHasError(String groupId) {
- return groupLevelErrors.get(groupId) != Errors.NONE;
+ Errors error = groupLevelErrors.get(groupId);
+ if (error == null) {
+ return this.error != null && this.error != Errors.NONE;
+ }
+ return error != Errors.NONE;
}
public Errors error() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
index 6f98a166b1..d8b9f427d6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
@@ -24,6 +24,7 @@ import java.util.stream.Collectors;
import org.apache.kafka.clients.HostResolver;
import org.apache.kafka.clients.admin.CreateTopicsResult.TopicMetadataAndConfig;
import org.apache.kafka.clients.admin.internals.MetadataOperationContext;
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
@@ -104,14 +105,17 @@ public class AdminClientTestUtils {
.collect(Collectors.toMap(Map.Entry::getKey, e -> KafkaFuture.completedFuture(e.getValue()))));
}
- public static ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult(Map<TopicPartition, OffsetAndMetadata> offsets) {
- return new ListConsumerGroupOffsetsResult(KafkaFuture.completedFuture(offsets));
+ public static ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult(Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets) {
+ Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> resultMap = offsets.entrySet().stream()
+ .collect(Collectors.toMap(e -> CoordinatorKey.byGroupId(e.getKey()),
+ e -> KafkaFutureImpl.completedFuture(e.getValue())));
+ return new ListConsumerGroupOffsetsResult(resultMap);
}
- public static ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult(KafkaException exception) {
+ public static ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult(String group, KafkaException exception) {
final KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> future = new KafkaFutureImpl<>();
future.completeExceptionally(exception);
- return new ListConsumerGroupOffsetsResult(future);
+ return new ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group), future));
}
/**
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 61a2aaa00b..3d285a45f7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -108,6 +108,7 @@ import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.CredentialInfo;
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
@@ -192,6 +193,7 @@ import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetDeleteResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.UnregisterBrokerResponse;
import org.apache.kafka.common.requests.UpdateFeaturesRequest;
@@ -224,6 +226,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
@@ -266,6 +269,7 @@ import static org.junit.jupiter.api.Assertions.fail;
public class KafkaAdminClientTest {
private static final Logger log = LoggerFactory.getLogger(KafkaAdminClientTest.class);
private static final String GROUP_ID = "group-0";
+ private static final int THROTTLE = 10;
@Test
public void testDefaultApiTimeoutAndRequestTimeoutConflicts() {
@@ -501,6 +505,21 @@ public class KafkaAdminClientTest {
return FindCoordinatorResponse.prepareOldResponse(error, node);
}
+ private static FindCoordinatorResponse prepareBatchedFindCoordinatorResponse(Errors error, Node node, Collection<String> groups) {
+ FindCoordinatorResponseData data = new FindCoordinatorResponseData();
+ List<FindCoordinatorResponseData.Coordinator> coordinators = groups.stream()
+ .map(group -> new FindCoordinatorResponseData.Coordinator()
+ .setErrorCode(error.code())
+ .setErrorMessage(error.message())
+ .setKey(group)
+ .setHost(node.host())
+ .setPort(node.port())
+ .setNodeId(node.id()))
+ .collect(Collectors.toList());
+ data.setCoordinators(coordinators);
+ return new FindCoordinatorResponse(data);
+ }
+
private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) {
List<MetadataResponseTopic> metadata = new ArrayList<>();
for (String topic : cluster.topics()) {
@@ -3067,9 +3086,11 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
final TopicPartition tp1 = new TopicPartition("A", 0);
- final ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions();
- options.topicPartitions(Collections.singletonList(tp1)).requireStable(true);
- final ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(GROUP_ID, options);
+ final ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions()
+ .requireStable(true);
+ final ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec()
+ .topicPartitions(Collections.singletonList(tp1));
+ env.adminClient().listConsumerGroupOffsets(Collections.singletonMap(GROUP_ID, groupSpec), options);
final MockClient mockClient = env.kafkaClient();
TestUtils.waitForCondition(() -> {
@@ -3077,11 +3098,11 @@ public class KafkaAdminClientTest {
if (clientRequest != null) {
OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).data;
return data.requireStable() &&
- data.topics().get(0).name().equals("A") &&
- data.topics().get(0).partitionIndexes().equals(Collections.singletonList(0));
+ data.groups().get(0).topics().get(0).name().equals("A") &&
+ data.groups().get(0).topics().get(0).partitionIndexes().equals(Collections.singletonList(0));
}
return false;
- }, "Failed awaiting ListConsumerGroupOffsets request");
+ }, "Failed awaiting ListConsumerGroupOfsets request");
}
}
@@ -3095,12 +3116,11 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
- env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
+ env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
final ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(GROUP_ID);
-
TestUtils.assertFutureError(result.partitionsToOffsetAndMetadata(), TimeoutException.class);
}
}
@@ -3124,16 +3144,16 @@ public class KafkaAdminClientTest {
mockClient.prepareResponse(body -> {
firstAttemptTime.set(time.milliseconds());
return true;
- }, new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
+ }, offsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
mockClient.prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
mockClient.prepareResponse(body -> {
secondAttemptTime.set(time.milliseconds());
return true;
- }, new OffsetFetchResponse(Errors.NONE, Collections.emptyMap()));
+ }, offsetFetchResponse(Errors.NONE, Collections.emptyMap()));
- final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future = env.adminClient().listConsumerGroupOffsets("group-0").partitionsToOffsetAndMetadata();
+ final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future = env.adminClient().listConsumerGroupOffsets(GROUP_ID).partitionsToOffsetAndMetadata();
TestUtils.waitForCondition(() -> mockClient.numAwaitingResponses() == 1, "Failed awaiting ListConsumerGroupOffsets first request failure");
TestUtils.waitForCondition(() -> ((KafkaAdminClient) env.adminClient()).numPendingCalls() == 1, "Failed to add retry ListConsumerGroupOffsets call on first failure");
@@ -3157,7 +3177,8 @@ public class KafkaAdminClientTest {
prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
env.kafkaClient().prepareResponse(
- new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap()));
+ offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap()));
+
/*
* We need to return two responses here, one for NOT_COORDINATOR call when calling list consumer offsets
* api using coordinator that has moved. This will retry whole operation. So we need to again respond with a
@@ -3166,19 +3187,19 @@ public class KafkaAdminClientTest {
* And the same reason for the following COORDINATOR_NOT_AVAILABLE error response
*/
env.kafkaClient().prepareResponse(
- new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
+ offsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
env.kafkaClient().prepareResponse(
prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
env.kafkaClient().prepareResponse(
- new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap()));
+ offsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap()));
env.kafkaClient().prepareResponse(
prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
env.kafkaClient().prepareResponse(
- new OffsetFetchResponse(Errors.NONE, Collections.emptyMap()));
+ offsetFetchResponse(Errors.NONE, Collections.emptyMap()));
final ListConsumerGroupOffsetsResult errorResult1 = env.adminClient().listConsumerGroupOffsets(GROUP_ID);
@@ -3199,8 +3220,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(
prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
- env.kafkaClient().prepareResponse(
- new OffsetFetchResponse(error, Collections.emptyMap()));
+ env.kafkaClient().prepareResponse(offsetFetchResponse(error, Collections.emptyMap()));
ListConsumerGroupOffsetsResult errorResult = env.adminClient().listConsumerGroupOffsets(GROUP_ID);
@@ -3220,7 +3240,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
// Retriable errors should be retried
- env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap()));
+ env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap()));
/*
* We need to return two responses here, one for NOT_COORDINATOR error when calling list consumer group offsets
@@ -3229,10 +3249,10 @@ public class KafkaAdminClientTest {
*
* And the same reason for the following COORDINATOR_NOT_AVAILABLE error response
*/
- env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
+ env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
- env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap()));
+ env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
@@ -3249,7 +3269,7 @@ public class KafkaAdminClientTest {
Optional.empty(), "", Errors.NONE));
responseData.put(myTopicPartition3, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
Optional.empty(), "", Errors.NONE));
- env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NONE, responseData));
+ env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.NONE, responseData));
final ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(GROUP_ID);
final Map<TopicPartition, OffsetAndMetadata> partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata().get();
@@ -3263,6 +3283,144 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void testBatchedListConsumerGroupOffsets() throws Exception {
+ Cluster cluster = mockCluster(1, 0);
+ Time time = new MockTime();
+ Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = batchedListConsumerGroupOffsetsSpec();
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, AdminClientConfig.RETRIES_CONFIG, "0")) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ env.kafkaClient().prepareResponse(prepareBatchedFindCoordinatorResponse(Errors.NONE, env.cluster().controller(), groupSpecs.keySet()));
+
+ ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(groupSpecs, new ListConsumerGroupOffsetsOptions());
+ sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, true, Errors.NONE);
+
+ verifyListOffsetsForMultipleGroups(groupSpecs, result);
+ }
+ }
+
+ @Test
+ public void testBatchedListConsumerGroupOffsetsWithNoFindCoordinatorBatching() throws Exception {
+ Cluster cluster = mockCluster(1, 0);
+ Time time = new MockTime();
+ Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = batchedListConsumerGroupOffsetsSpec();
+
+ ApiVersion findCoordinatorV3 = new ApiVersion()
+ .setApiKey(ApiKeys.FIND_COORDINATOR.id)
+ .setMinVersion((short) 0)
+ .setMaxVersion((short) 3);
+ ApiVersion offsetFetchV7 = new ApiVersion()
+ .setApiKey(ApiKeys.OFFSET_FETCH.id)
+ .setMinVersion((short) 0)
+ .setMaxVersion((short) 7);
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0")) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Arrays.asList(findCoordinatorV3, offsetFetchV7)));
+ env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
+ env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+ env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+ ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(groupSpecs);
+
+ // Fail the first request in order to ensure that the group is not batched when retried.
+ sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false, Errors.COORDINATOR_LOAD_IN_PROGRESS);
+
+ sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false, Errors.NONE);
+ sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false, Errors.NONE);
+
+ verifyListOffsetsForMultipleGroups(groupSpecs, result);
+ }
+ }
+
+ @Test
+ public void testBatchedListConsumerGroupOffsetsWithNoOffsetFetchBatching() throws Exception {
+ Cluster cluster = mockCluster(1, 0);
+ Time time = new MockTime();
+ Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = batchedListConsumerGroupOffsetsSpec();
+
+ ApiVersion offsetFetchV7 = new ApiVersion()
+ .setApiKey(ApiKeys.OFFSET_FETCH.id)
+ .setMinVersion((short) 0)
+ .setMaxVersion((short) 7);
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0")) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singleton(offsetFetchV7)));
+ env.kafkaClient().prepareResponse(prepareBatchedFindCoordinatorResponse(Errors.NONE, env.cluster().controller(), groupSpecs.keySet()));
+ // Prepare a response to force client to attempt batched request creation that throws
+ // NoBatchedOffsetFetchRequestException. This triggers creation of non-batched requests.
+ env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap()));
+
+ ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(groupSpecs);
+
+ // The request handler attempts both FindCoordinator and OffsetFetch requests. This seems
+ // ok since we expect this scenario only during upgrades from versions < 3.0.0 where
+ // some upgraded brokers could handle batched FindCoordinator while non-upgraded coordinators
+ // rejected batched OffsetFetch requests.
+ sendFindCoordinatorResponse(env.kafkaClient(), env.cluster().controller());
+ sendFindCoordinatorResponse(env.kafkaClient(), env.cluster().controller());
+ sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false, Errors.NONE);
+ sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false, Errors.NONE);
+
+ verifyListOffsetsForMultipleGroups(groupSpecs, result);
+ }
+ }
+
+ private Map<String, ListConsumerGroupOffsetsSpec> batchedListConsumerGroupOffsetsSpec() {
+ Set<TopicPartition> groupAPartitions = Collections.singleton(new TopicPartition("A", 1));
+ Set<TopicPartition> groupBPartitions = Collections.singleton(new TopicPartition("B", 2));
+
+ ListConsumerGroupOffsetsSpec groupASpec = new ListConsumerGroupOffsetsSpec().topicPartitions(groupAPartitions);
+ ListConsumerGroupOffsetsSpec groupBSpec = new ListConsumerGroupOffsetsSpec().topicPartitions(groupBPartitions);
+ return Utils.mkMap(Utils.mkEntry("groupA", groupASpec), Utils.mkEntry("groupB", groupBSpec));
+ }
+
+ private void waitForRequest(MockClient mockClient, ApiKeys apiKeys) throws Exception {
+ TestUtils.waitForCondition(() -> {
+ ClientRequest clientRequest = mockClient.requests().peek();
+ return clientRequest != null && clientRequest.apiKey() == apiKeys;
+ }, "Failed awaiting " + apiKeys + " request");
+ }
+
+ private void sendFindCoordinatorResponse(MockClient mockClient, Node coordinator) throws Exception {
+ waitForRequest(mockClient, ApiKeys.FIND_COORDINATOR);
+
+ ClientRequest clientRequest = mockClient.requests().peek();
+ FindCoordinatorRequestData data = ((FindCoordinatorRequest.Builder) clientRequest.requestBuilder()).data();
+ mockClient.respond(prepareFindCoordinatorResponse(Errors.NONE, data.key(), coordinator));
+ }
+
+ private void sendOffsetFetchResponse(MockClient mockClient, Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, boolean batched, Errors error) throws Exception {
+ waitForRequest(mockClient, ApiKeys.OFFSET_FETCH);
+
+ ClientRequest clientRequest = mockClient.requests().peek();
+ OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).data;
+ Map<String, Map<TopicPartition, PartitionData>> results = new HashMap<>();
+ Map<String, Errors> errors = new HashMap<>();
+ data.groups().forEach(group -> {
+ Map<TopicPartition, PartitionData> partitionResults = new HashMap<>();
+ for (TopicPartition tp : groupSpecs.get(group.groupId()).topicPartitions()) {
+ partitionResults.put(tp, new PartitionData(10, Optional.empty(), "", Errors.NONE));
+ }
+ results.put(group.groupId(), partitionResults);
+ errors.put(group.groupId(), error);
+ });
+ if (!batched) {
+ assertEquals(1, data.groups().size());
+ mockClient.respond(new OffsetFetchResponse(THROTTLE, error, results.values().iterator().next()));
+ } else
+ mockClient.respond(new OffsetFetchResponse(THROTTLE, errors, results));
+ }
+
+ private void verifyListOffsetsForMultipleGroups(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs,
+ ListConsumerGroupOffsetsResult result) throws Exception {
+ assertEquals(groupSpecs.size(), result.all().get(10, TimeUnit.SECONDS).size());
+ for (Map.Entry<String, ListConsumerGroupOffsetsSpec> entry : groupSpecs.entrySet()) {
+ assertEquals(entry.getValue().topicPartitions(),
+ result.partitionsToOffsetAndMetadata(entry.getKey()).get().keySet());
+ }
+ }
+
@Test
public void testDeleteConsumerGroupsNumRetries() throws Exception {
final Cluster cluster = mockCluster(3, 0);
@@ -6544,6 +6702,12 @@ public class KafkaAdminClientTest {
.setLogDir(logDir))));
}
+ private OffsetFetchResponse offsetFetchResponse(Errors error, Map<TopicPartition, PartitionData> responseData) {
+ return new OffsetFetchResponse(THROTTLE,
+ Collections.singletonMap(GROUP_ID, error),
+ Collections.singletonMap(GROUP_ID, responseData));
+ }
+
private static MemberDescription convertToMemberDescriptions(DescribedGroupMember member,
MemberAssignment assignment) {
return new MemberDescription(member.memberId(),
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index ef858c5003..8c31c7cf69 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.KafkaException;
@@ -583,24 +584,29 @@ public class MockAdminClient extends AdminClient {
}
@Override
- synchronized public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) {
- // ignoring the groupId and assume one test would only work on one group only
+ synchronized public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, ListConsumerGroupOffsetsOptions options) {
+ // ignoring the groups and assuming each test works on only one group
+ if (groupSpecs.size() != 1)
+ throw new UnsupportedOperationException("Not implemented yet");
+
+ String group = groupSpecs.keySet().iterator().next();
+ Collection<TopicPartition> topicPartitions = groupSpecs.get(group).topicPartitions();
final KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> future = new KafkaFutureImpl<>();
if (listConsumerGroupOffsetsException != null) {
future.completeExceptionally(listConsumerGroupOffsetsException);
} else {
- if (options.topicPartitions().isEmpty()) {
+ if (topicPartitions.isEmpty()) {
future.complete(committedOffsets.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> new OffsetAndMetadata(entry.getValue()))));
} else {
future.complete(committedOffsets.entrySet().stream()
- .filter(entry -> options.topicPartitions().contains(entry.getKey()))
+ .filter(entry -> topicPartitions.contains(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, entry -> new OffsetAndMetadata(entry.getValue()))));
}
}
- return new ListConsumerGroupOffsetsResult(future);
+ return new ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group), future));
}
@Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
index 27597ce035..95fabb3fc2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
@@ -24,52 +24,140 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
+import org.apache.kafka.clients.admin.internals.AdminApiHandler.RequestAndKeys;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
+import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;
public class ListConsumerGroupOffsetsHandlerTest {
private final LogContext logContext = new LogContext();
- private final String groupId = "group-id";
+ private final int throttleMs = 10;
+ private final String groupZero = "group0";
+ private final String groupOne = "group1";
+ private final String groupTwo = "group2";
+ private final List<String> groups = Arrays.asList(groupZero, groupOne, groupTwo);
private final TopicPartition t0p0 = new TopicPartition("t0", 0);
private final TopicPartition t0p1 = new TopicPartition("t0", 1);
private final TopicPartition t1p0 = new TopicPartition("t1", 0);
private final TopicPartition t1p1 = new TopicPartition("t1", 1);
- private final List<TopicPartition> tps = Arrays.asList(t0p0, t0p1, t1p0, t1p1);
+ private final TopicPartition t2p0 = new TopicPartition("t2", 0);
+ private final TopicPartition t2p1 = new TopicPartition("t2", 1);
+ private final TopicPartition t2p2 = new TopicPartition("t2", 2);
+ private final Map<String, ListConsumerGroupOffsetsSpec> singleRequestMap = Collections.singletonMap(groupZero,
+ new ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t0p1, t1p0, t1p1)));
+ private final Map<String, ListConsumerGroupOffsetsSpec> batchedRequestMap =
+ new HashMap<String, ListConsumerGroupOffsetsSpec>() {{
+ put(groupZero, new ListConsumerGroupOffsetsSpec().topicPartitions(singletonList(t0p0)));
+ put(groupOne, new ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t1p0, t1p1)));
+ put(groupTwo, new ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t1p0, t1p1, t2p0, t2p1, t2p2)));
+ }};
@Test
public void testBuildRequest() {
- ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, tps, logContext);
- OffsetFetchRequest request = handler.buildBatchedRequest(1, singleton(CoordinatorKey.byGroupId(groupId))).build();
- assertEquals(groupId, request.data().groups().get(0).groupId());
+ ListConsumerGroupOffsetsHandler handler =
+ new ListConsumerGroupOffsetsHandler(singleRequestMap, false, logContext);
+ OffsetFetchRequest request = handler.buildBatchedRequest(coordinatorKeys(groupZero)).build();
+ assertEquals(groupZero, request.data().groups().get(0).groupId());
assertEquals(2, request.data().groups().get(0).topics().size());
assertEquals(2, request.data().groups().get(0).topics().get(0).partitionIndexes().size());
assertEquals(2, request.data().groups().get(0).topics().get(1).partitionIndexes().size());
}
+ @Test
+ public void testBuildRequestWithMultipleGroups() {
+ Map<String, ListConsumerGroupOffsetsSpec> requestMap = new HashMap<>(this.batchedRequestMap);
+ String groupThree = "group3";
+ requestMap.put(groupThree, new ListConsumerGroupOffsetsSpec()
+ .topicPartitions(Arrays.asList(new TopicPartition("t3", 0), new TopicPartition("t3", 1))));
+
+ ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(requestMap, false, logContext);
+ OffsetFetchRequest request1 = handler.buildBatchedRequest(coordinatorKeys(groupZero, groupOne, groupTwo)).build();
+ assertEquals(Utils.mkSet(groupZero, groupOne, groupTwo), requestGroups(request1));
+
+ OffsetFetchRequest request2 = handler.buildBatchedRequest(coordinatorKeys(groupThree)).build();
+ assertEquals(Utils.mkSet(groupThree), requestGroups(request2));
+
+ Map<String, ListConsumerGroupOffsetsSpec> builtRequests = new HashMap<>();
+ request1.groupIdsToPartitions().forEach((group, partitions) ->
+ builtRequests.put(group, new ListConsumerGroupOffsetsSpec().topicPartitions(partitions)));
+ request2.groupIdsToPartitions().forEach((group, partitions) ->
+ builtRequests.put(group, new ListConsumerGroupOffsetsSpec().topicPartitions(partitions)));
+
+ assertEquals(requestMap, builtRequests);
+ Map<String, List<OffsetFetchRequestTopics>> groupIdsToTopics = request1.groupIdsToTopics();
+
+ assertEquals(3, groupIdsToTopics.size());
+ assertEquals(1, groupIdsToTopics.get(groupZero).size());
+ assertEquals(2, groupIdsToTopics.get(groupOne).size());
+ assertEquals(3, groupIdsToTopics.get(groupTwo).size());
+
+ assertEquals(1, groupIdsToTopics.get(groupZero).get(0).partitionIndexes().size());
+ assertEquals(1, groupIdsToTopics.get(groupOne).get(0).partitionIndexes().size());
+ assertEquals(2, groupIdsToTopics.get(groupOne).get(1).partitionIndexes().size());
+ assertEquals(1, groupIdsToTopics.get(groupTwo).get(0).partitionIndexes().size());
+ assertEquals(2, groupIdsToTopics.get(groupTwo).get(1).partitionIndexes().size());
+ assertEquals(3, groupIdsToTopics.get(groupTwo).get(2).partitionIndexes().size());
+
+ groupIdsToTopics = request2.groupIdsToTopics();
+ assertEquals(1, groupIdsToTopics.size());
+ assertEquals(1, groupIdsToTopics.get(groupThree).size());
+ assertEquals(2, groupIdsToTopics.get(groupThree).get(0).partitionIndexes().size());
+ }
+
+ @Test
+ public void testBuildRequestBatchGroups() {
+ ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(batchedRequestMap, false, logContext);
+ Collection<RequestAndKeys<CoordinatorKey>> requests = handler.buildRequest(1, coordinatorKeys(groupZero, groupOne, groupTwo));
+ assertEquals(1, requests.size());
+ assertEquals(Utils.mkSet(groupZero, groupOne, groupTwo), requestGroups((OffsetFetchRequest) requests.iterator().next().request.build()));
+ }
+
+ @Test
+ public void testBuildRequestDoesNotBatchGroup() {
+ ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(batchedRequestMap, false, logContext);
+ // Disable batching.
+ ((CoordinatorStrategy) handler.lookupStrategy()).disableBatch();
+ Collection<RequestAndKeys<CoordinatorKey>> requests = handler.buildRequest(1, coordinatorKeys(groupZero, groupOne, groupTwo));
+ assertEquals(3, requests.size());
+ assertEquals(
+ Utils.mkSet(Utils.mkSet(groupZero), Utils.mkSet(groupOne), Utils.mkSet(groupTwo)),
+ requests.stream().map(requestAndKey -> requestGroups((OffsetFetchRequest) requestAndKey.request.build())).collect(Collectors.toSet())
+ );
+ }
+
@Test
public void testSuccessfulHandleResponse() {
Map<TopicPartition, OffsetAndMetadata> expected = new HashMap<>();
assertCompleted(handleWithError(Errors.NONE), expected);
}
-
@Test
public void testSuccessfulHandleResponseWithOnePartitionError() {
Map<TopicPartition, OffsetAndMetadata> expectedResult = Collections.singletonMap(t0p0, new OffsetAndMetadata(10L));
@@ -80,17 +168,62 @@ public class ListConsumerGroupOffsetsHandlerTest {
assertCompleted(handleWithPartitionError(Errors.UNSTABLE_OFFSET_COMMIT), expectedResult);
}
+ @Test
+ public void testSuccessfulHandleResponseWithOnePartitionErrorWithMultipleGroups() {
+ Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMapZero =
+ Collections.singletonMap(t0p0, new OffsetAndMetadata(10L));
+ Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMapOne =
+ Collections.singletonMap(t1p1, new OffsetAndMetadata(10L));
+ Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMapTwo =
+ Collections.singletonMap(t2p2, new OffsetAndMetadata(10L));
+ Map<String, Map<TopicPartition, OffsetAndMetadata>> expectedResult =
+ new HashMap<String, Map<TopicPartition, OffsetAndMetadata>>() {{
+ put(groupZero, offsetAndMetadataMapZero);
+ put(groupOne, offsetAndMetadataMapOne);
+ put(groupTwo, offsetAndMetadataMapTwo);
+ }};
+
+ assertCompletedForMultipleGroups(
+ handleWithPartitionErrorMultipleGroups(Errors.UNKNOWN_TOPIC_OR_PARTITION), expectedResult);
+ assertCompletedForMultipleGroups(
+ handleWithPartitionErrorMultipleGroups(Errors.TOPIC_AUTHORIZATION_FAILED), expectedResult);
+ assertCompletedForMultipleGroups(
+ handleWithPartitionErrorMultipleGroups(Errors.UNSTABLE_OFFSET_COMMIT), expectedResult);
+ }
+
+ @Test
+ public void testSuccessfulHandleResponseWithMultipleGroups() {
+ Map<String, Map<TopicPartition, OffsetAndMetadata>> expected = new HashMap<>();
+ Map<String, Errors> errorMap = errorMap(groups, Errors.NONE);
+ assertCompletedForMultipleGroups(handleWithErrorWithMultipleGroups(errorMap, batchedRequestMap), expected);
+ }
+
@Test
public void testUnmappedHandleResponse() {
assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE));
assertUnmapped(handleWithError(Errors.NOT_COORDINATOR));
}
+ @Test
+ public void testUnmappedHandleResponseWithMultipleGroups() {
+ Map<String, Errors> errorMap = new HashMap<>();
+ errorMap.put(groupZero, Errors.NOT_COORDINATOR);
+ errorMap.put(groupOne, Errors.COORDINATOR_NOT_AVAILABLE);
+ errorMap.put(groupTwo, Errors.NOT_COORDINATOR);
+ assertUnmappedWithMultipleGroups(handleWithErrorWithMultipleGroups(errorMap, batchedRequestMap));
+ }
+
@Test
public void testRetriableHandleResponse() {
assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
}
+ @Test
+ public void testRetriableHandleResponseWithMultipleGroups() {
+ Map<String, Errors> errorMap = errorMap(groups, Errors.COORDINATOR_LOAD_IN_PROGRESS);
+ assertRetriable(handleWithErrorWithMultipleGroups(errorMap, batchedRequestMap));
+ }
+
@Test
public void testFailedHandleResponse() {
assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED));
@@ -98,10 +231,50 @@ public class ListConsumerGroupOffsetsHandlerTest {
assertFailed(InvalidGroupIdException.class, handleWithError(Errors.INVALID_GROUP_ID));
}
+ @Test
+ public void testFailedHandleResponseWithMultipleGroups() {
+ Map<String, Errors> errorMap = new HashMap<>();
+ errorMap.put(groupZero, Errors.GROUP_AUTHORIZATION_FAILED);
+ errorMap.put(groupOne, Errors.GROUP_ID_NOT_FOUND);
+ errorMap.put(groupTwo, Errors.INVALID_GROUP_ID);
+ Map<String, Class<? extends Throwable>> groupToExceptionMap = new HashMap<>();
+ groupToExceptionMap.put(groupZero, GroupAuthorizationException.class);
+ groupToExceptionMap.put(groupOne, GroupIdNotFoundException.class);
+ groupToExceptionMap.put(groupTwo, InvalidGroupIdException.class);
+ assertFailedForMultipleGroups(groupToExceptionMap,
+ handleWithErrorWithMultipleGroups(errorMap, batchedRequestMap));
+ }
+
private OffsetFetchResponse buildResponse(Errors error) {
- Map<TopicPartition, PartitionData> responseData = new HashMap<>();
- OffsetFetchResponse response = new OffsetFetchResponse(error, responseData);
- return response;
+ return new OffsetFetchResponse(
+ throttleMs,
+ Collections.singletonMap(groupZero, error),
+ Collections.singletonMap(groupZero, new HashMap<>()));
+ }
+
+ private OffsetFetchResponse buildResponseWithMultipleGroups(
+ Map<String, Errors> errorMap,
+ Map<String, Map<TopicPartition, PartitionData>> responseData
+ ) {
+ return new OffsetFetchResponse(throttleMs, errorMap, responseData);
+ }
+
+ private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleWithErrorWithMultipleGroups(
+ Map<String, Errors> errorMap,
+ Map<String, ListConsumerGroupOffsetsSpec> groupSpecs
+ ) {
+ ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupSpecs, false, logContext);
+ Map<String, Map<TopicPartition, PartitionData>> responseData = new HashMap<>();
+ for (String group : errorMap.keySet()) {
+ responseData.put(group, new HashMap<>());
+ }
+ OffsetFetchResponse response = buildResponseWithMultipleGroups(errorMap, responseData);
+ return handler.handleResponse(new Node(1, "host", 1234),
+ errorMap.keySet()
+ .stream()
+ .map(CoordinatorKey::byGroupId)
+ .collect(Collectors.toSet()),
+ response);
}
private OffsetFetchResponse buildResponseWithPartitionError(Errors error) {
@@ -110,24 +283,68 @@ public class ListConsumerGroupOffsetsHandlerTest {
responseData.put(t0p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", Errors.NONE));
responseData.put(t0p1, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
- OffsetFetchResponse response = new OffsetFetchResponse(Errors.NONE, responseData);
- return response;
+ return new OffsetFetchResponse(Errors.NONE, responseData);
+ }
+
+ private OffsetFetchResponse buildResponseWithPartitionErrorWithMultipleGroups(Errors error) {
+ Map<TopicPartition, PartitionData> responseDataZero = new HashMap<>();
+ responseDataZero.put(t0p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", Errors.NONE));
+
+ Map<TopicPartition, PartitionData> responseDataOne = new HashMap<>();
+ responseDataOne.put(t0p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
+ responseDataOne.put(t1p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
+ responseDataOne.put(t1p1, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", Errors.NONE));
+
+ Map<TopicPartition, PartitionData> responseDataTwo = new HashMap<>();
+ responseDataTwo.put(t0p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
+ responseDataTwo.put(t1p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
+ responseDataTwo.put(t1p1, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
+ responseDataTwo.put(t2p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
+ responseDataTwo.put(t2p1, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
+ responseDataTwo.put(t2p2, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", Errors.NONE));
+
+ Map<String, Map<TopicPartition, PartitionData>> responseData =
+ new HashMap<String, Map<TopicPartition, PartitionData>>() {{
+ put(groupZero, responseDataZero);
+ put(groupOne, responseDataOne);
+ put(groupTwo, responseDataTwo);
+ }};
+
+ Map<String, Errors> errorMap = errorMap(groups, Errors.NONE);
+ return new OffsetFetchResponse(0, errorMap, responseData);
}
private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleWithPartitionError(
Errors error
) {
- ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, tps, logContext);
+ ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(singleRequestMap,
+ false, logContext);
OffsetFetchResponse response = buildResponseWithPartitionError(error);
- return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response);
+ return handler.handleResponse(new Node(1, "host", 1234),
+ singleton(CoordinatorKey.byGroupId(groupZero)), response);
+ }
+
+ private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleWithPartitionErrorMultipleGroups(
+ Errors error
+ ) {
+ ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(
+ batchedRequestMap, false, logContext);
+ OffsetFetchResponse response = buildResponseWithPartitionErrorWithMultipleGroups(error);
+ return handler.handleResponse(
+ new Node(1, "host", 1234),
+ coordinatorKeys(groupZero, groupOne, groupTwo),
+ response);
}
private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleWithError(
Errors error
) {
- ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, tps, logContext);
+ ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(
+ singleRequestMap, false, logContext);
OffsetFetchResponse response = buildResponse(error);
- return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response);
+ return handler.handleResponse(new Node(1, "host", 1234),
+ singleton(CoordinatorKey.byGroupId(groupZero)),
+ response);
}
private void assertUnmapped(
@@ -135,11 +352,19 @@ public class ListConsumerGroupOffsetsHandlerTest {
) {
assertEquals(emptySet(), result.completedKeys.keySet());
assertEquals(emptySet(), result.failedKeys.keySet());
- assertEquals(singletonList(CoordinatorKey.byGroupId(groupId)), result.unmappedKeys);
+ assertEquals(singletonList(CoordinatorKey.byGroupId(groupZero)), result.unmappedKeys);
+ }
+
+ private void assertUnmappedWithMultipleGroups(
+ AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result
+ ) {
+ assertEquals(emptySet(), result.completedKeys.keySet());
+ assertEquals(emptySet(), result.failedKeys.keySet());
+ assertEquals(coordinatorKeys(groupZero, groupOne, groupTwo), new HashSet<>(result.unmappedKeys));
}
private void assertRetriable(
- AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result
+ AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result
) {
assertEquals(emptySet(), result.completedKeys.keySet());
assertEquals(emptySet(), result.failedKeys.keySet());
@@ -150,21 +375,64 @@ public class ListConsumerGroupOffsetsHandlerTest {
AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result,
Map<TopicPartition, OffsetAndMetadata> expected
) {
- CoordinatorKey key = CoordinatorKey.byGroupId(groupId);
+ CoordinatorKey key = CoordinatorKey.byGroupId(groupZero);
assertEquals(emptySet(), result.failedKeys.keySet());
assertEquals(emptyList(), result.unmappedKeys);
assertEquals(singleton(key), result.completedKeys.keySet());
- assertEquals(expected, result.completedKeys.get(CoordinatorKey.byGroupId(groupId)));
+ assertEquals(expected, result.completedKeys.get(key));
+ }
+
+ private void assertCompletedForMultipleGroups(
+ AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result,
+ Map<String, Map<TopicPartition, OffsetAndMetadata>> expected
+ ) {
+ assertEquals(emptySet(), result.failedKeys.keySet());
+ assertEquals(emptyList(), result.unmappedKeys);
+ for (String g : expected.keySet()) {
+ CoordinatorKey key = CoordinatorKey.byGroupId(g);
+ assertTrue(result.completedKeys.containsKey(key));
+ assertEquals(expected.get(g), result.completedKeys.get(key));
+ }
}
private void assertFailed(
Class<? extends Throwable> expectedExceptionType,
AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result
) {
- CoordinatorKey key = CoordinatorKey.byGroupId(groupId);
+ CoordinatorKey key = CoordinatorKey.byGroupId(groupZero);
assertEquals(emptySet(), result.completedKeys.keySet());
assertEquals(emptyList(), result.unmappedKeys);
assertEquals(singleton(key), result.failedKeys.keySet());
assertTrue(expectedExceptionType.isInstance(result.failedKeys.get(key)));
}
+
+ private void assertFailedForMultipleGroups(
+ Map<String, Class<? extends Throwable>> groupToExceptionMap,
+ AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result
+ ) {
+ assertEquals(emptySet(), result.completedKeys.keySet());
+ assertEquals(emptyList(), result.unmappedKeys);
+ for (String g : groupToExceptionMap.keySet()) {
+ CoordinatorKey key = CoordinatorKey.byGroupId(g);
+ assertTrue(result.failedKeys.containsKey(key));
+ assertTrue(groupToExceptionMap.get(g).isInstance(result.failedKeys.get(key)));
+ }
+ }
+
+ private Set<CoordinatorKey> coordinatorKeys(String... groups) {
+ return Stream.of(groups)
+ .map(CoordinatorKey::byGroupId)
+ .collect(Collectors.toSet());
+ }
+
+ private Set<String> requestGroups(OffsetFetchRequest request) {
+ return request.data().groups()
+ .stream()
+ .map(OffsetFetchRequestGroup::groupId)
+ .collect(Collectors.toSet());
+ }
+
+ private Map<String, Errors> errorMap(Collection<String> groups, Errors error) {
+ return groups.stream().collect(Collectors.toMap(Function.identity(), unused -> error));
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index da3acf4983..e7f25345c6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -172,6 +172,7 @@ public class KafkaConsumerTest {
// Set auto commit interval lower than heartbeat so we don't need to deal with
// a concurrent heartbeat request
private final int autoCommitIntervalMs = 500;
+ private final int throttleMs = 10;
private final String groupId = "mock-group";
private final String memberId = "memberId";
@@ -2434,7 +2435,10 @@ public class KafkaConsumerTest {
partitionData.put(entry.getKey(), new OffsetFetchResponse.PartitionData(entry.getValue(),
Optional.empty(), "", error));
}
- return new OffsetFetchResponse(Errors.NONE, partitionData);
+ return new OffsetFetchResponse(
+ throttleMs,
+ Collections.singletonMap(groupId, Errors.NONE),
+ Collections.singletonMap(groupId, partitionData));
}
private ListOffsetsResponse listOffsetsResponse(Map<TopicPartition, Long> offsets) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index c65d33176f..db483c6c0f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -71,6 +71,7 @@ import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
@@ -140,6 +141,7 @@ public abstract class ConsumerCoordinatorTest {
private final long retryBackoffMs = 100;
private final int autoCommitIntervalMs = 2000;
private final int requestTimeoutMs = 30000;
+ private final int throttleMs = 10;
private final MockTime time = new MockTime();
private GroupRebalanceConfig rebalanceConfig;
@@ -2872,7 +2874,7 @@ public abstract class ConsumerCoordinatorTest {
OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch,
metadata, Errors.NONE);
- client.prepareResponse(new OffsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+ client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = coordinator.fetchCommittedOffsets(singleton(t1p),
time.timer(Long.MAX_VALUE));
@@ -2888,7 +2890,7 @@ public abstract class ConsumerCoordinatorTest {
OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(-1, Optional.empty(),
"", Errors.TOPIC_AUTHORIZATION_FAILED);
- client.prepareResponse(new OffsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+ client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
TopicAuthorizationException exception = assertThrows(TopicAuthorizationException.class, () ->
coordinator.fetchCommittedOffsets(singleton(t1p), time.timer(Long.MAX_VALUE)));
@@ -2901,7 +2903,7 @@ public abstract class ConsumerCoordinatorTest {
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
subscriptions.assignFromUser(singleton(t1p));
- client.prepareResponse(offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));
+ client.prepareResponse(offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap()));
client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
@@ -2916,7 +2918,7 @@ public abstract class ConsumerCoordinatorTest {
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
subscriptions.assignFromUser(singleton(t1p));
- client.prepareResponse(offsetFetchResponse(Errors.GROUP_AUTHORIZATION_FAILED));
+ client.prepareResponse(offsetFetchResponse(Errors.GROUP_AUTHORIZATION_FAILED, Collections.emptyMap()));
try {
coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
fail("Expected group authorization error");
@@ -2959,7 +2961,7 @@ public abstract class ConsumerCoordinatorTest {
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
subscriptions.assignFromUser(singleton(t1p));
- client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR));
+ client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
@@ -3435,7 +3437,11 @@ public abstract class ConsumerCoordinatorTest {
OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch,
metadata, Errors.NONE);
- client.prepareResponse(new OffsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+ if (upperVersion < 8) {
+ client.prepareResponse(new OffsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+ } else {
+ client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+ }
if (expectThrows) {
assertThrows(UnsupportedVersionException.class,
() -> coordinator.fetchCommittedOffsets(singleton(t1p), time.timer(Long.MAX_VALUE)));
@@ -3690,8 +3696,10 @@ public abstract class ConsumerCoordinatorTest {
return new OffsetCommitResponse(responseData);
}
- private OffsetFetchResponse offsetFetchResponse(Errors topLevelError) {
- return new OffsetFetchResponse(topLevelError, Collections.emptyMap());
+ private OffsetFetchResponse offsetFetchResponse(Errors error, Map<TopicPartition, PartitionData> responseData) {
+ return new OffsetFetchResponse(throttleMs,
+ singletonMap(groupId, error),
+ singletonMap(groupId, responseData));
}
private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Errors partitionLevelError, String metadata, long offset) {
@@ -3701,7 +3709,7 @@ public abstract class ConsumerCoordinatorTest {
private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Errors partitionLevelError, String metadata, long offset, Optional<Integer> epoch) {
OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset,
epoch, metadata, partitionLevelError);
- return new OffsetFetchResponse(Errors.NONE, singletonMap(tp, data));
+ return offsetFetchResponse(Errors.NONE, singletonMap(tp, data));
}
private OffsetCommitCallback callback(final AtomicBoolean success) {
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 47c1d173b3..d5aee881c9 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -18,7 +18,7 @@
package kafka.admin
import java.time.{Duration, Instant}
-import java.util.Properties
+import java.util.{Collections, Properties}
import com.fasterxml.jackson.dataformat.csv.CsvMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import kafka.utils._
@@ -753,9 +753,9 @@ object ConsumerGroupCommand extends Logging {
private def getCommittedOffsets(groupId: String): Map[TopicPartition, OffsetAndMetadata] = {
adminClient.listConsumerGroupOffsets(
- groupId,
- withTimeoutMs(new ListConsumerGroupOffsetsOptions)
- ).partitionsToOffsetAndMetadata.get.asScala
+ Collections.singletonMap(groupId, new ListConsumerGroupOffsetsSpec),
+ withTimeoutMs(new ListConsumerGroupOffsetsOptions())
+ ).partitionsToOffsetAndMetadata(groupId).get().asScala
}
type GroupMetadata = immutable.Map[String, immutable.Map[TopicPartition, OffsetAndMetadata]]
diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
index 76a3855a87..44b241a7ed 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
@@ -49,8 +49,8 @@ class ConsumerGroupServiceTest {
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
.thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
- when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
- .thenReturn(listGroupOffsetsResult)
+ when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec), any()))
+ .thenReturn(listGroupOffsetsResult(group))
when(admin.listOffsets(offsetsArgMatcher, any()))
.thenReturn(listOffsetsResult)
@@ -60,7 +60,7 @@ class ConsumerGroupServiceTest {
assertEquals(topicPartitions.size, assignments.get.size)
verify(admin, times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any())
- verify(admin, times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())
+ verify(admin, times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec), any())
verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
}
@@ -112,8 +112,10 @@ class ConsumerGroupServiceTest {
future.complete(consumerGroupDescription)
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
.thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, future)))
- when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
- .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+ when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec), any()))
+ .thenReturn(
+ AdminClientTestUtils.listConsumerGroupOffsetsResult(
+ Collections.singletonMap(group, commitedOffsets)))
when(admin.listOffsets(
ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
any()
@@ -142,7 +144,7 @@ class ConsumerGroupServiceTest {
assertEquals(expectedOffsets, returnedOffsets)
verify(admin, times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any())
- verify(admin, times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())
+ verify(admin, times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec), any())
verify(admin, times(1)).listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)), any())
verify(admin, times(1)).listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)), any())
}
@@ -192,9 +194,9 @@ class ConsumerGroupServiceTest {
new DescribeConsumerGroupsResult(Collections.singletonMap(group, future))
}
- private def listGroupOffsetsResult: ListConsumerGroupOffsetsResult = {
+ private def listGroupOffsetsResult(groupId: String): ListConsumerGroupOffsetsResult = {
val offsets = topicPartitions.map(_ -> new OffsetAndMetadata(100)).toMap.asJava
- AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets)
+ AdminClientTestUtils.listConsumerGroupOffsetsResult(Map(groupId -> offsets).asJava)
}
private def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = {
@@ -217,4 +219,8 @@ class ConsumerGroupServiceTest {
}.toMap
AdminClientTestUtils.describeTopicsResult(topicDescriptions.asJava)
}
+
+ private def listConsumerGroupOffsetsSpec: util.Map[String, ListConsumerGroupOffsetsSpec] = {
+ Collections.singletonMap(group, new ListConsumerGroupOffsetsSpec())
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 6d17e93782..82c19949e3 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -320,7 +320,7 @@ class RequestQuotaTest extends BaseRequestTest {
)
)
case ApiKeys.OFFSET_FETCH =>
- new OffsetFetchRequest.Builder("test-group", false, List(tp).asJava, false)
+ new OffsetFetchRequest.Builder(Map("test-group"-> List(tp).asJava).asJava, false, false)
case ApiKeys.FIND_COORDINATOR =>
new FindCoordinatorRequest.Builder(
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 02cfb0b49c..5240534ce7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.Consumer;
@@ -695,11 +696,12 @@ public class StoreChangelogReader implements ChangelogReader {
try {
// those which do not have a committed offset would default to 0
- final ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions();
- options.topicPartitions(new ArrayList<>(partitions));
- options.requireStable(true);
- final Map<TopicPartition, Long> committedOffsets = adminClient.listConsumerGroupOffsets(groupId, options)
- .partitionsToOffsetAndMetadata().get().entrySet()
+ final ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions()
+ .requireStable(true);
+ final ListConsumerGroupOffsetsSpec spec = new ListConsumerGroupOffsetsSpec()
+ .topicPartitions(new ArrayList<>(partitions));
+ final Map<TopicPartition, Long> committedOffsets = adminClient.listConsumerGroupOffsets(Collections.singletonMap(groupId, spec))
+ .partitionsToOffsetAndMetadata(groupId).get().entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : e.getValue().offset()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 1961736620..fbc8d42326 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
@@ -648,12 +649,12 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
final AtomicBoolean functionCalled = new AtomicBoolean(false);
final MockAdminClient adminClient = new MockAdminClient() {
@Override
- public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, final ListConsumerGroupOffsetsOptions options) {
+ public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, final ListConsumerGroupOffsetsOptions options) {
if (functionCalled.get()) {
- return super.listConsumerGroupOffsets(groupId, options);
+ return super.listConsumerGroupOffsets(groupSpecs, options);
} else {
functionCalled.set(true);
- return AdminClientTestUtils.listConsumerGroupOffsetsResult(new TimeoutException("KABOOM!"));
+ return AdminClientTestUtils.listConsumerGroupOffsetsResult(groupSpecs.keySet().iterator().next(), new TimeoutException("KABOOM!"));
}
}
};
@@ -708,7 +709,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
final MockAdminClient adminClient = new MockAdminClient() {
@Override
- public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, final ListConsumerGroupOffsetsOptions options) {
+ public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, final ListConsumerGroupOffsetsOptions options) {
throw kaboom;
}
};
@@ -790,7 +791,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
final MockAdminClient adminClient = new MockAdminClient() {
@Override
- public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, final ListConsumerGroupOffsetsOptions options) {
+ public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, final ListConsumerGroupOffsetsOptions options) {
throw new AssertionError("Should not try to fetch committed offsets");
}
};