You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2022/07/15 08:21:50 UTC

[kafka] branch trunk updated: MINOR: Fix options for old-style Admin.listConsumerGroupOffsets (#12406)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ddbc030036 MINOR: Fix options for old-style Admin.listConsumerGroupOffsets (#12406)
ddbc030036 is described below

commit ddbc0300365dd1d9a2fb2c73faef8c4cbec0b316
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Jul 15 09:21:35 2022 +0100

    MINOR: Fix options for old-style Admin.listConsumerGroupOffsets (#12406)
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 .../java/org/apache/kafka/clients/admin/Admin.java |  7 +--
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 52 +++++++++++++++-------
 2 files changed, 40 insertions(+), 19 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 0698d29702..1d469a6643 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -921,12 +921,13 @@ public interface Admin extends AutoCloseable {
      * @return The ListGroupOffsetsResult
      */
     default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) {
-        ListConsumerGroupOffsetsOptions listOptions = new ListConsumerGroupOffsetsOptions()
-            .requireStable(options.requireStable());
         @SuppressWarnings("deprecation")
         ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec()
             .topicPartitions(options.topicPartitions());
-        return listConsumerGroupOffsets(Collections.singletonMap(groupId, groupSpec), listOptions);
+
+        // We can use the provided options with the batched API, which uses topic partitions from
+        // the group spec and ignores any topic partitions set in the options.
+        return listConsumerGroupOffsets(Collections.singletonMap(groupId, groupSpec), options);
     }
 
     /**
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 3d285a45f7..de57813679 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -131,6 +131,8 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
 import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic;
 import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection;
 import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
+import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
 import org.apache.kafka.common.message.UnregisterBrokerResponseData;
 import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -3075,7 +3077,17 @@ public class KafkaAdminClientTest {
     }
 
     @Test
-    public void testListConsumerGroupOffsetsOptions() throws Exception {
+    public void testListConsumerGroupOffsetsOptionsWithUnbatchedApi() throws Exception {
+        verifyListConsumerGroupOffsetsOptions(false);
+    }
+
+    @Test
+    public void testListConsumerGroupOffsetsOptionsWithBatchedApi() throws Exception {
+        verifyListConsumerGroupOffsetsOptions(true);
+    }
+
+    @SuppressWarnings("deprecation")
+    private void verifyListConsumerGroupOffsetsOptions(boolean batchedApi) throws Exception {
         final Cluster cluster = mockCluster(3, 0);
         final Time time = new MockTime();
 
@@ -3085,24 +3097,32 @@ public class KafkaAdminClientTest {
 
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
-            final TopicPartition tp1 = new TopicPartition("A", 0);
+            final List<TopicPartition> partitions = Collections.singletonList(new TopicPartition("A", 0));
             final ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions()
-                    .requireStable(true);
-            final ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec()
-                    .topicPartitions(Collections.singletonList(tp1));
-            env.adminClient().listConsumerGroupOffsets(Collections.singletonMap(GROUP_ID, groupSpec), options);
+                    .requireStable(true)
+                    .timeoutMs(300);
+            if (batchedApi) {
+                final ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec()
+                        .topicPartitions(partitions);
+                env.adminClient().listConsumerGroupOffsets(Collections.singletonMap(GROUP_ID, groupSpec), options);
+            } else {
+                env.adminClient().listConsumerGroupOffsets(GROUP_ID, options.topicPartitions(partitions));
+            }
 
             final MockClient mockClient = env.kafkaClient();
-            TestUtils.waitForCondition(() -> {
-                final ClientRequest clientRequest = mockClient.requests().peek();
-                if (clientRequest != null) {
-                    OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).data;
-                    return data.requireStable() &&
-                        data.groups().get(0).topics().get(0).name().equals("A") &&
-                        data.groups().get(0).topics().get(0).partitionIndexes().equals(Collections.singletonList(0));
-                }
-                return false;
-            }, "Failed awaiting ListConsumerGroupOfsets request");
+            waitForRequest(mockClient, ApiKeys.OFFSET_FETCH);
+
+            ClientRequest clientRequest = mockClient.requests().peek();
+            assertNotNull(clientRequest);
+            assertEquals(300, clientRequest.requestTimeoutMs());
+            OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).data;
+            assertTrue(data.requireStable());
+            assertEquals(Collections.singletonList(GROUP_ID),
+                    data.groups().stream().map(OffsetFetchRequestGroup::groupId).collect(Collectors.toList()));
+            assertEquals(Collections.singletonList("A"),
+                    data.groups().get(0).topics().stream().map(OffsetFetchRequestTopics::name).collect(Collectors.toList()));
+            assertEquals(Collections.singletonList(0),
+                    data.groups().get(0).topics().get(0).partitionIndexes());
         }
     }