Posted to commits@kafka.apache.org by gu...@apache.org on 2018/11/15 21:23:53 UTC

[kafka] branch trunk updated: MINOR: Code cleanup in StreamsResetter (#5891)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9346660  MINOR: Code cleanup in StreamsResetter (#5891)
9346660 is described below

commit 93466602d80c0c8dba7544964f48feb49589b0b5
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Thu Nov 15 13:23:39 2018 -0800

    MINOR: Code cleanup in StreamsResetter (#5891)
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, John Roesler <jo...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../main/scala/kafka/tools/StreamsResetter.java    | 49 +++++++++++++---------
 1 file changed, 29 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 09d3b9e..967901c 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -23,11 +23,11 @@ import joptsimple.OptionSpec;
 import joptsimple.OptionSpecBuilder;
 import kafka.utils.CommandLineUtils;
 import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.MemberDescription;
 import org.apache.kafka.clients.admin.DeleteTopicsResult;
-import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
 import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.MemberDescription;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -39,15 +39,14 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
 
-import javax.xml.datatype.DatatypeConfigurationException;
 import javax.xml.datatype.DatatypeFactory;
 import javax.xml.datatype.Duration;
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -111,7 +110,7 @@ public class StreamsResetter {
 
     public int run(final String[] args,
                    final Properties config) {
-        int exitCode = EXIT_CODE_SUCCESS;
+        int exitCode;
 
         KafkaAdminClient kafkaAdminClient = null;
 
@@ -157,10 +156,10 @@ public class StreamsResetter {
 
     private void validateNoActiveConsumers(final String groupId,
                                            final AdminClient adminClient) throws ExecutionException, InterruptedException {
-        final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(Arrays.asList(groupId),
+        final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(Collections.singleton(groupId),
                 (new DescribeConsumerGroupsOptions()).timeoutMs(10 * 1000));
         final List<MemberDescription> members =
-            new ArrayList<MemberDescription>(describeResult.describedGroups().get(groupId).get().members());
+            new ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
         if (!members.isEmpty()) {
             throw new IllegalStateException("Consumer group '" + groupId + "' is still active "
                     + "and has following members: " + members + ". "
@@ -216,7 +215,7 @@ public class StreamsResetter {
         executeOption = optionParser.accepts("execute", "Execute the command.");
         dryRunOption = optionParser.accepts("dry-run", "Display the actions that would be performed without executing the reset commands.");
 
-        // TODO: deprecated in 1.0; can be removed eventually
+        // TODO: deprecated in 1.0; can be removed eventually: https://issues.apache.org/jira/browse/KAFKA-7606
         optionParser.accepts("zookeeper", "Zookeeper option is deprecated by bootstrap.servers, as the reset tool would no longer access Zookeeper directly.");
 
         try {
@@ -230,7 +229,7 @@ public class StreamsResetter {
             CommandLineUtils.printUsageAndDie(optionParser, "Only one of --dry-run and --execute can be specified");
         }
 
-        scala.collection.immutable.HashSet<OptionSpec<?>> allScenarioOptions = new scala.collection.immutable.HashSet<>();
+        final scala.collection.immutable.HashSet<OptionSpec<?>> allScenarioOptions = new scala.collection.immutable.HashSet<>();
         allScenarioOptions.$plus(toOffsetOption);
         allScenarioOptions.$plus(toDatetimeOption);
         allScenarioOptions.$plus(byDurationOption);
@@ -315,7 +314,7 @@ public class StreamsResetter {
         config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
         try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
-            Collection<TopicPartition> partitions = topicsToSubscribe.stream().map(client::partitionsFor)
+            final Collection<TopicPartition> partitions = topicsToSubscribe.stream().map(client::partitionsFor)
                     .flatMap(Collection::stream)
                     .map(info -> new TopicPartition(info.topic(), info.partition()))
                     .collect(Collectors.toList());
@@ -407,7 +406,9 @@ public class StreamsResetter {
     }
 
     // visible for testing
-    public void resetOffsetsFromResetPlan(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Map<TopicPartition, Long> topicPartitionsAndOffset) {
+    public void resetOffsetsFromResetPlan(final Consumer<byte[], byte[]> client,
+                                          final Set<TopicPartition> inputTopicPartitions,
+                                          final Map<TopicPartition, Long> topicPartitionsAndOffset) {
         final Map<TopicPartition, Long> endOffsets = client.endOffsets(inputTopicPartitions);
         final Map<TopicPartition, Long> beginningOffsets = client.beginningOffsets(inputTopicPartitions);
 
@@ -419,12 +420,14 @@ public class StreamsResetter {
         }
     }
 
-    private Map<TopicPartition, Long> getTopicPartitionOffsetFromResetPlan(String resetPlanPath) throws IOException, ParseException {
+    private Map<TopicPartition, Long> getTopicPartitionOffsetFromResetPlan(final String resetPlanPath) throws IOException, ParseException {
         final String resetPlanCsv = Utils.readFileAsString(resetPlanPath);
         return parseResetPlan(resetPlanCsv);
     }
 
-    private void resetByDuration(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Duration duration) throws DatatypeConfigurationException {
+    private void resetByDuration(final Consumer<byte[], byte[]> client,
+                                 final Set<TopicPartition> inputTopicPartitions,
+                                 final Duration duration) {
         final Date now = new Date();
         duration.negate().addTo(now);
         final long timestamp = now.getTime();
@@ -441,7 +444,9 @@ public class StreamsResetter {
         }
     }
 
-    private void resetToDatetime(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Long timestamp) {
+    private void resetToDatetime(final Consumer<byte[], byte[]> client,
+                                 final Set<TopicPartition> inputTopicPartitions,
+                                 final Long timestamp) {
         final Map<TopicPartition, Long> topicPartitionsAndTimes = new HashMap<>(inputTopicPartitions.size());
         for (final TopicPartition topicPartition : inputTopicPartitions) {
             topicPartitionsAndTimes.put(topicPartition, timestamp);
@@ -455,7 +460,9 @@ public class StreamsResetter {
     }
 
     // visible for testing
-    public void shiftOffsetsBy(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, long shiftBy) {
+    public void shiftOffsetsBy(final Consumer<byte[], byte[]> client,
+                               final Set<TopicPartition> inputTopicPartitions,
+                               final long shiftBy) {
         final Map<TopicPartition, Long> endOffsets = client.endOffsets(inputTopicPartitions);
         final Map<TopicPartition, Long> beginningOffsets = client.beginningOffsets(inputTopicPartitions);
 
@@ -475,7 +482,9 @@ public class StreamsResetter {
     }
 
     // visible for testing
-    public void resetOffsetsTo(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Long offset) {
+    public void resetOffsetsTo(final Consumer<byte[], byte[]> client,
+                               final Set<TopicPartition> inputTopicPartitions,
+                               final Long offset) {
         final Map<TopicPartition, Long> endOffsets = client.endOffsets(inputTopicPartitions);
         final Map<TopicPartition, Long> beginningOffsets = client.beginningOffsets(inputTopicPartitions);
 
@@ -511,7 +520,7 @@ public class StreamsResetter {
         try {
             final Date date = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse(timestamp);
             return date.getTime();
-        } catch (ParseException e) {
+        } catch (final ParseException e) {
             final Date date = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse(timestamp);
             return date.getTime();
         }
@@ -574,7 +583,7 @@ public class StreamsResetter {
     private void maybeDeleteInternalTopics(final KafkaAdminClient adminClient, final boolean dryRun) {
 
         System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption));
-        List<String> topicsToDelete = new ArrayList<>();
+        final List<String> topicsToDelete = new ArrayList<>();
         for (final String listing : allTopics) {
             if (isInternalTopic(listing)) {
                 if (!dryRun) {
@@ -600,7 +609,7 @@ public class StreamsResetter {
         for (final Map.Entry<String, KafkaFuture<Void>> entry : results.entrySet()) {
             try {
                 entry.getValue().get(30, TimeUnit.SECONDS);
-            } catch (Exception e) {
+            } catch (final Exception e) {
                 System.err.println("ERROR: deleting topic " + entry.getKey());
                 e.printStackTrace(System.err);
                 hasDeleteErrors = true;
@@ -617,7 +626,7 @@ public class StreamsResetter {
             && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));
     }
 
-    private void printHelp(OptionParser parser) throws IOException {
+    private void printHelp(final OptionParser parser) throws IOException {
         System.err.println("The Streams Reset Tool allows you to quickly reset an application in order to reprocess "
                 + "its data from scratch.\n"
                 + "* This tool resets offsets of input topics to the earliest available offset and it skips to the end of "