You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2022/07/15 08:21:50 UTC
[kafka] branch trunk updated: MINOR: Fix options for old-style Admin.listConsumerGroupOffsets (#12406)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ddbc030036 MINOR: Fix options for old-style Admin.listConsumerGroupOffsets (#12406)
ddbc030036 is described below
commit ddbc0300365dd1d9a2fb2c73faef8c4cbec0b316
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Jul 15 09:21:35 2022 +0100
MINOR: Fix options for old-style Admin.listConsumerGroupOffsets (#12406)
Reviewers: David Jacot <dj...@confluent.io>
---
.../java/org/apache/kafka/clients/admin/Admin.java | 7 +--
.../kafka/clients/admin/KafkaAdminClientTest.java | 52 +++++++++++++++-------
2 files changed, 40 insertions(+), 19 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 0698d29702..1d469a6643 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -921,12 +921,13 @@ public interface Admin extends AutoCloseable {
* @return The ListGroupOffsetsResult
*/
default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) {
- ListConsumerGroupOffsetsOptions listOptions = new ListConsumerGroupOffsetsOptions()
- .requireStable(options.requireStable());
@SuppressWarnings("deprecation")
ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec()
.topicPartitions(options.topicPartitions());
- return listConsumerGroupOffsets(Collections.singletonMap(groupId, groupSpec), listOptions);
+
+ // We can use the provided options with the batched API, which uses topic partitions from
+ // the group spec and ignores any topic partitions set in the options.
+ return listConsumerGroupOffsets(Collections.singletonMap(groupId, groupSpec), options);
}
/**
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 3d285a45f7..de57813679 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -131,6 +131,8 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic;
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection;
import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
+import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
import org.apache.kafka.common.message.UnregisterBrokerResponseData;
import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -3075,7 +3077,17 @@ public class KafkaAdminClientTest {
}
@Test
- public void testListConsumerGroupOffsetsOptions() throws Exception {
+ public void testListConsumerGroupOffsetsOptionsWithUnbatchedApi() throws Exception {
+ verifyListConsumerGroupOffsetsOptions(false);
+ }
+
+ @Test
+ public void testListConsumerGroupOffsetsOptionsWithBatchedApi() throws Exception {
+ verifyListConsumerGroupOffsetsOptions(true);
+ }
+
+ @SuppressWarnings("deprecation")
+ private void verifyListConsumerGroupOffsetsOptions(boolean batchedApi) throws Exception {
final Cluster cluster = mockCluster(3, 0);
final Time time = new MockTime();
@@ -3085,24 +3097,32 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
- final TopicPartition tp1 = new TopicPartition("A", 0);
+ final List<TopicPartition> partitions = Collections.singletonList(new TopicPartition("A", 0));
final ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions()
- .requireStable(true);
- final ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec()
- .topicPartitions(Collections.singletonList(tp1));
- env.adminClient().listConsumerGroupOffsets(Collections.singletonMap(GROUP_ID, groupSpec), options);
+ .requireStable(true)
+ .timeoutMs(300);
+ if (batchedApi) {
+ final ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec()
+ .topicPartitions(partitions);
+ env.adminClient().listConsumerGroupOffsets(Collections.singletonMap(GROUP_ID, groupSpec), options);
+ } else {
+ env.adminClient().listConsumerGroupOffsets(GROUP_ID, options.topicPartitions(partitions));
+ }
final MockClient mockClient = env.kafkaClient();
- TestUtils.waitForCondition(() -> {
- final ClientRequest clientRequest = mockClient.requests().peek();
- if (clientRequest != null) {
- OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).data;
- return data.requireStable() &&
- data.groups().get(0).topics().get(0).name().equals("A") &&
- data.groups().get(0).topics().get(0).partitionIndexes().equals(Collections.singletonList(0));
- }
- return false;
- }, "Failed awaiting ListConsumerGroupOfsets request");
+ waitForRequest(mockClient, ApiKeys.OFFSET_FETCH);
+
+ ClientRequest clientRequest = mockClient.requests().peek();
+ assertNotNull(clientRequest);
+ assertEquals(300, clientRequest.requestTimeoutMs());
+ OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).data;
+ assertTrue(data.requireStable());
+ assertEquals(Collections.singletonList(GROUP_ID),
+ data.groups().stream().map(OffsetFetchRequestGroup::groupId).collect(Collectors.toList()));
+ assertEquals(Collections.singletonList("A"),
+ data.groups().get(0).topics().stream().map(OffsetFetchRequestTopics::name).collect(Collectors.toList()));
+ assertEquals(Collections.singletonList(0),
+ data.groups().get(0).topics().get(0).partitionIndexes());
}
}