You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/12/04 21:42:53 UTC

[kafka] branch 2.4 updated: KAFKA-9251; Describing a non consumer group with the Admin API hangs forever (#7763)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 56a27c1  KAFKA-9251; Describing a non consumer group with the Admin API hangs forever (#7763)
56a27c1 is described below

commit 56a27c142a68ef32eccf075bcb99c04b226ece93
Author: David Jacot <dj...@confluent.io>
AuthorDate: Wed Dec 4 22:33:34 2019 +0100

    KAFKA-9251; Describing a non consumer group with the Admin API hangs forever (#7763)
    
    If a non-consumer group is specified in `describeConsumerGroup`, the future will hang indefinitely because the future callback is never completed. This patch fixes the problem by completing the future exceptionally with an `IllegalArgumentException`.
    
    Reviewers: Stanislav Kozlovski <st...@outlook.com>, Jason Gustafson <ja...@confluent.io>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  4 +++
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 37 ++++++++++++++++++++++
 2 files changed, 41 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index be4dd9e..344cef8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2807,6 +2807,10 @@ public class KafkaAdminClient extends AdminClient {
                             context.getNode().get(),
                             authorizedOperations);
                     context.getFuture().complete(consumerGroupDescription);
+                } else {
+                    // Fail the future for non-consumer groups; leaving it incomplete hangs the caller forever.
+                    context.getFuture().completeExceptionally(new IllegalArgumentException(
+                        String.format("GroupId %s is not a consumer group (%s).", context.getGroupId(), protocolType)));
                 }
             }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index df28f00..a274d87 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -1475,6 +1475,43 @@ public class KafkaAdminClientTest {
     }
 
     @Test
+    public void testDescribeNonConsumerGroups() throws Exception { // regression test for KAFKA-9251
+        final HashMap<Integer, Node> nodes = new HashMap<>();
+        nodes.put(0, new Node(0, "localhost", 8121));
+        // Single-node mock cluster; node 0 is also handed to Cluster as its final argument.
+        final Cluster cluster =
+            new Cluster(
+                "mockClusterId",
+                nodes.values(),
+                Collections.<PartitionInfo>emptyList(),
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(), nodes.get(0));
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            // First queued response: FindCoordinator succeeds (Errors.NONE) and points at env.cluster().controller().
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+            // Second queued response: a DescribeGroups payload holding one error-free, member-less group.
+            DescribeGroupsResponseData data = new DescribeGroupsResponseData();
+
+            data.groups().add(DescribeGroupsResponse.groupMetadata(
+                "group-0",
+                Errors.NONE,
+                "",
+                "non-consumer", // presumably the protocol type; confirm against groupMetadata's signature
+                "",
+                asList(),
+                Collections.emptySet()));
+
+            env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
+
+            final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList("group-0"));
+            // The per-group future must fail with IllegalArgumentException rather than never complete.
+            TestUtils.assertFutureError(result.describedGroups().get("group-0"), IllegalArgumentException.class);
+        }
+    }
+
+    @Test
     public void testDescribeConsumerGroupOffsets() throws Exception {
         final HashMap<Integer, Node> nodes = new HashMap<>();
         nodes.put(0, new Node(0, "localhost", 8121));