You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/11/17 12:48:09 UTC
(kafka) branch trunk updated: KAFKA-15849: Fix ListGroups API when runtime partition size is zero (#14785)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b1796ce6d2c KAFKA-15849: Fix ListGroups API when runtime partition size is zero (#14785)
b1796ce6d2c is described below
commit b1796ce6d2c04444a62393fbfd7c61811e001d67
Author: Dongnuo Lyu <13...@users.noreply.github.com>
AuthorDate: Fri Nov 17 07:48:02 2023 -0500
KAFKA-15849: Fix ListGroups API when runtime partition size is zero (#14785)
When the group coordinator does not host any __consumer_offsets partitions, the existing ListGroup implementation won't schedule any operation, thus a `new CompletableFuture<>()` is returned directly and never gets completed. This patch fixes the issue.
Reviewers: David Jacot <dj...@confluent.io>
---
.../coordinator/group/GroupCoordinatorService.java | 4 ++++
.../group/GroupCoordinatorServiceTest.java | 24 ++++++++++++++++++++++
2 files changed, 28 insertions(+)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index b092316bb31..b184ab9b83b 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -501,6 +501,10 @@ public class GroupCoordinatorService implements GroupCoordinator {
final Set<TopicPartition> existingPartitionSet = runtime.partitions();
final AtomicInteger cnt = new AtomicInteger(existingPartitionSet.size());
+ if (existingPartitionSet.isEmpty()) {
+ return CompletableFuture.completedFuture(new ListGroupsResponseData());
+ }
+
for (TopicPartition tp : existingPartitionSet) {
runtime.scheduleReadOperation(
"list-groups",
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index e3e13b82685..98174b07508 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -832,6 +832,30 @@ public class GroupCoordinatorServiceTest {
assertEquals(Collections.emptyList(), listGroupsResponseData.groups());
}
+ @Test
+ public void testListGroupsWithEmptyTopicPartitions() throws ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ int partitionCount = 0;
+ service.startup(() -> partitionCount);
+
+ ListGroupsRequestData request = new ListGroupsRequestData();
+
+ CompletableFuture<ListGroupsResponseData> future = service.listGroups(
+ requestContext(ApiKeys.LIST_GROUPS),
+ request
+ );
+
+ assertEquals(
+ new ListGroupsResponseData(),
+ future.get()
+ );
+ }
+
@Test
public void testListGroupsWhenNotStarted() throws ExecutionException, InterruptedException {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();