You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/11 15:44:43 UTC

[kafka] branch trunk updated: KAFKA-5965: Remove Deprecated AdminClient from Streams Resetter Tool (#4968)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6eb7cf1  KAFKA-5965: Remove Deprecated AdminClient from Streams Resetter Tool (#4968)
6eb7cf1 is described below

commit 6eb7cf1300fc0c411ffab93de041654bc10918bf
Author: fedosov-alexander <al...@yandex.ru>
AuthorDate: Fri May 11 18:44:27 2018 +0300

    KAFKA-5965: Remove Deprecated AdminClient from Streams Resetter Tool (#4968)
    
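    Removed usage of the deprecated kafka.admin.AdminClient from StreamsResetter;
    the tool now uses org.apache.kafka.clients.admin.AdminClient to check for active consumers.
    No additional tests are required.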
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../main/scala/kafka/tools/StreamsResetter.java    | 27 +++++++++++-----------
 1 file changed, 14 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index b0d5276..d7c4e43 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -23,8 +23,11 @@ import joptsimple.OptionSpec;
 import joptsimple.OptionSpecBuilder;
 import kafka.utils.CommandLineUtils;
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.MemberDescription;
 import org.apache.kafka.clients.admin.DeleteTopicsResult;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -42,6 +45,7 @@ import javax.xml.datatype.Duration;
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -51,6 +55,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -120,8 +125,8 @@ public class StreamsResetter {
             }
             properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption));
 
-            validateNoActiveConsumers(groupId, properties);
             kafkaAdminClient = (KafkaAdminClient) AdminClient.create(properties);
+            validateNoActiveConsumers(groupId, kafkaAdminClient);
 
             allTopics.clear();
             allTopics.addAll(kafkaAdminClient.listTopics().names().get(60, TimeUnit.SECONDS));
@@ -149,18 +154,14 @@ public class StreamsResetter {
     }
 
     private void validateNoActiveConsumers(final String groupId,
-                                           final Properties properties) {
-        kafka.admin.AdminClient olderAdminClient = null;
-        try {
-            olderAdminClient = kafka.admin.AdminClient.create(properties);
-            if (!olderAdminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) {
-                throw new IllegalStateException("Consumer group '" + groupId + "' is still active. "
-                                                + "Make sure to stop all running application instances before running the reset tool.");
-            }
-        } finally {
-            if (olderAdminClient != null) {
-                olderAdminClient.close();
-            }
+                                           final AdminClient adminClient) throws ExecutionException, InterruptedException {
+        final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(Arrays.asList(groupId),
+                (new DescribeConsumerGroupsOptions()).timeoutMs(10 * 1000));
+        final List<MemberDescription> members = describeResult.describedGroups().get(groupId).get().members();
+        if (!members.isEmpty()) {
+            throw new IllegalStateException("Consumer group '" + groupId + "' is still active "
+                    + "and has following members: " + members + ". "
+                    + "Make sure to stop all running application instances before running the reset tool.");
         }
     }
 

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.