You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/11/17 12:48:09 UTC

(kafka) branch trunk updated: KAFKA-15849: Fix ListGroups API when runtime partition size is zero (#14785)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b1796ce6d2c KAFKA-15849: Fix ListGroups API when runtime partition size is zero (#14785)
b1796ce6d2c is described below

commit b1796ce6d2c04444a62393fbfd7c61811e001d67
Author: Dongnuo Lyu <13...@users.noreply.github.com>
AuthorDate: Fri Nov 17 07:48:02 2023 -0500

    KAFKA-15849: Fix ListGroups API when runtime partition size is zero (#14785)
    
    When the group coordinator does not host any __consumer_offsets partitions, the existing ListGroups implementation does not schedule any read operation, so the `new CompletableFuture<>()` it returns is never completed. This patch fixes the issue by returning an already-completed future holding an empty `ListGroupsResponseData` when the runtime has no partitions.
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 .../coordinator/group/GroupCoordinatorService.java |  4 ++++
 .../group/GroupCoordinatorServiceTest.java         | 24 ++++++++++++++++++++++
 2 files changed, 28 insertions(+)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index b092316bb31..b184ab9b83b 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -501,6 +501,10 @@ public class GroupCoordinatorService implements GroupCoordinator {
         final Set<TopicPartition> existingPartitionSet = runtime.partitions();
         final AtomicInteger cnt = new AtomicInteger(existingPartitionSet.size());
 
+        if (existingPartitionSet.isEmpty()) {
+            return CompletableFuture.completedFuture(new ListGroupsResponseData());
+        }
+
         for (TopicPartition tp : existingPartitionSet) {
             runtime.scheduleReadOperation(
                 "list-groups",
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index e3e13b82685..98174b07508 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -832,6 +832,30 @@ public class GroupCoordinatorServiceTest {
         assertEquals(Collections.emptyList(), listGroupsResponseData.groups());
     }
 
+    @Test
+    public void testListGroupsWithEmptyTopicPartitions() throws ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        int partitionCount = 0;
+        service.startup(() -> partitionCount);
+
+        ListGroupsRequestData request = new ListGroupsRequestData();
+
+        CompletableFuture<ListGroupsResponseData> future = service.listGroups(
+            requestContext(ApiKeys.LIST_GROUPS),
+            request
+        );
+
+        assertEquals(
+            new ListGroupsResponseData(),
+            future.get()
+        );
+    }
+
     @Test
     public void testListGroupsWhenNotStarted() throws ExecutionException, InterruptedException {
         CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();