You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/12/04 21:59:42 UTC
[kafka] branch 2.3 updated: KAFKA-9251;
Describing a non consumer group with the Admin API hangs forever
(#7763)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new bddc4c1 KAFKA-9251; Describing a non consumer group with the Admin API hangs forever (#7763)
bddc4c1 is described below
commit bddc4c109a94bda16265b573df71a6b2450a8bbd
Author: David Jacot <dj...@confluent.io>
AuthorDate: Wed Dec 4 22:33:34 2019 +0100
KAFKA-9251; Describing a non consumer group with the Admin API hangs forever (#7763)
If a non-consumer group is specified in `describeConsumerGroups`, the returned future hangs indefinitely because it is never completed. This patch fixes the problem by completing the future exceptionally with an `IllegalArgumentException`.
Reviewers: Stanislav Kozlovski <st...@outlook.com>, Jason Gustafson <ja...@confluent.io>
---
.../kafka/clients/admin/KafkaAdminClient.java | 4 +++
.../kafka/clients/admin/KafkaAdminClientTest.java | 37 ++++++++++++++++++++++
2 files changed, 41 insertions(+)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 45f7da0..005ed6b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2608,6 +2608,10 @@ public class KafkaAdminClient extends AdminClient {
fcResponse.node(),
authorizedOperations);
future.complete(consumerGroupDescription);
+ } else {
+ future.completeExceptionally(new IllegalArgumentException( // message built with %s: String.format does not expand "{}"
+ String.format("GroupId %s is not a consumer group (%s).",
+ groupId, protocolType)));
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 02eda60..6bbd87a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -1251,6 +1251,43 @@ public class KafkaAdminClientTest {
}
@Test
+ public void testDescribeNonConsumerGroups() throws Exception { // describing a non-consumer group must fail the future rather than leave it pending
+ final HashMap<Integer, Node> nodes = new HashMap<>();
+ nodes.put(0, new Node(0, "localhost", 8121));
+
+ final Cluster cluster =
+ new Cluster(
+ "mockClusterId",
+ nodes.values(),
+ Collections.<PartitionInfo>emptyList(),
+ Collections.<String>emptySet(),
+ Collections.<String>emptySet(), nodes.get(0));
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); // first queued response: coordinator lookup succeeds
+
+ DescribeGroupsResponseData data = new DescribeGroupsResponseData();
+
+ data.groups().add(DescribeGroupsResponse.groupMetadata(
+ "group-0",
+ Errors.NONE,
+ "",
+ "non-consumer", // presumably the protocol type argument — confirm against groupMetadata's signature
+ "",
+ asList(),
+ Collections.emptySet()));
+
+ env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); // second queued response: the describe-groups payload built above
+
+ final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList("group-0"));
+
+ TestUtils.assertFutureError(result.describedGroups().get("group-0"), IllegalArgumentException.class); // the per-group future must fail with IllegalArgumentException
+ }
+ }
+
+ @Test
public void testDescribeConsumerGroupOffsets() throws Exception {
final HashMap<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));