You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/12/04 21:59:42 UTC

[kafka] branch 2.3 updated: KAFKA-9251; Describing a non consumer group with the Admin API hangs forever (#7763)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new bddc4c1  KAFKA-9251; Describing a non consumer group with the Admin API hangs forever (#7763)
bddc4c1 is described below

commit bddc4c109a94bda16265b573df71a6b2450a8bbd
Author: David Jacot <dj...@confluent.io>
AuthorDate: Wed Dec 4 22:33:34 2019 +0100

    KAFKA-9251; Describing a non consumer group with the Admin API hangs forever (#7763)
    
    If a non-consumer group is specified in `describeConsumerGroup`, the future will hang indefinitely because the future callback is never completed. This patch fixes the problem by completing the future exceptionally with an `IllegalArgumentException`.
    
    Reviewers: Stanislav Kozlovski <st...@outlook.com>, Jason Gustafson <ja...@confluent.io>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  4 +++
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 37 ++++++++++++++++++++++
 2 files changed, 41 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 45f7da0..005ed6b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2608,6 +2608,10 @@ public class KafkaAdminClient extends AdminClient {
                                         fcResponse.node(),
                                         authorizedOperations);
                                 future.complete(consumerGroupDescription);
+                            } else {
+                                future.completeExceptionally(new IllegalArgumentException(
+                                        String.format("GroupId {} is not a consumer group ({}).",
+                                                groupId, protocolType)));
                             }
                         }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 02eda60..6bbd87a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -1251,6 +1251,43 @@ public class KafkaAdminClientTest {
     }
 
     @Test
+    public void testDescribeNonConsumerGroups() throws Exception {
+        final HashMap<Integer, Node> nodes = new HashMap<>();
+        nodes.put(0, new Node(0, "localhost", 8121));
+
+        final Cluster cluster =
+            new Cluster(
+                "mockClusterId",
+                nodes.values(),
+                Collections.<PartitionInfo>emptyList(),
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(), nodes.get(0));
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+            DescribeGroupsResponseData data = new DescribeGroupsResponseData();
+
+            data.groups().add(DescribeGroupsResponse.groupMetadata(
+                "group-0",
+                Errors.NONE,
+                "",
+                "non-consumer",
+                "",
+                asList(),
+                Collections.emptySet()));
+
+            env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
+
+            final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList("group-0"));
+
+            TestUtils.assertFutureError(result.describedGroups().get("group-0"), IllegalArgumentException.class);
+        }
+    }
+
+    @Test
     public void testDescribeConsumerGroupOffsets() throws Exception {
         final HashMap<Integer, Node> nodes = new HashMap<>();
         nodes.put(0, new Node(0, "localhost", 8121));