You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/11 15:44:43 UTC
[kafka] branch trunk updated: KAFKA-5965: Remove Deprecated
AdminClient from Streams Resetter Tool (#4968)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6eb7cf1 KAFKA-5965: Remove Deprecated AdminClient from Streams Resetter Tool (#4968)
6eb7cf1 is described below
commit 6eb7cf1300fc0c411ffab93de041654bc10918bf
Author: fedosov-alexander <al...@yandex.ru>
AuthorDate: Fri May 11 18:44:27 2018 +0300
KAFKA-5965: Remove Deprecated AdminClient from Streams Resetter Tool (#4968)
Removed usage of deprecated AdminClient from StreamsResetter
No additional tests are required.
Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../main/scala/kafka/tools/StreamsResetter.java | 27 +++++++++++-----------
1 file changed, 14 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index b0d5276..d7c4e43 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -23,8 +23,11 @@ import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import kafka.utils.CommandLineUtils;
import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -42,6 +45,7 @@ import javax.xml.datatype.Duration;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.util.Arrays;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@@ -51,6 +55,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
@@ -120,8 +125,8 @@ public class StreamsResetter {
}
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption));
- validateNoActiveConsumers(groupId, properties);
kafkaAdminClient = (KafkaAdminClient) AdminClient.create(properties);
+ validateNoActiveConsumers(groupId, kafkaAdminClient);
allTopics.clear();
allTopics.addAll(kafkaAdminClient.listTopics().names().get(60, TimeUnit.SECONDS));
@@ -149,18 +154,14 @@ public class StreamsResetter {
}
private void validateNoActiveConsumers(final String groupId,
- final Properties properties) {
- kafka.admin.AdminClient olderAdminClient = null;
- try {
- olderAdminClient = kafka.admin.AdminClient.create(properties);
- if (!olderAdminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) {
- throw new IllegalStateException("Consumer group '" + groupId + "' is still active. "
- + "Make sure to stop all running application instances before running the reset tool.");
- }
- } finally {
- if (olderAdminClient != null) {
- olderAdminClient.close();
- }
+ final AdminClient adminClient) throws ExecutionException, InterruptedException {
+ final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(Arrays.asList(groupId),
+ (new DescribeConsumerGroupsOptions()).timeoutMs(10 * 1000));
+ final List<MemberDescription> members = describeResult.describedGroups().get(groupId).get().members();
+ if (!members.isEmpty()) {
+ throw new IllegalStateException("Consumer group '" + groupId + "' is still active "
+ + "and has following members: " + members + ". "
+ + "Make sure to stop all running application instances before running the reset tool.");
}
}
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.