Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/16 21:24:33 UTC

kafka git commit: KAFKA-5166: Add option "dry run" to Streams application reset tool

Repository: kafka
Updated Branches:
  refs/heads/trunk 73703a15c -> d1d71aa29


KAFKA-5166: Add option "dry run" to Streams application reset tool

Addressed the review comment below from PR #2998 by mjsax:

I am wondering if it would be better to "embed" the dry run into the actual code and branch in each place. Otherwise, if things get changed, we could easily introduce bugs (i.e., the dry run shows something different from what the actual reset code does).

We could introduce methods like maybeSeekToBeginning() that either do the seek or only print to stdout. This would ensure that the main logic is used to "feed" the dry run, and we don't have code duplication.

WDYT?
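
For illustration only (this is not the committed code, which follows in the diff below): a minimal sketch of the pattern the comment describes, where a single maybe* helper branches on a dryRun flag so the dry run walks exactly the same code path as a real reset. The helper name matches the diff; the printed message and the partitions parameter here are simplified placeholders.

    // Sketch of the "maybe" pattern, assuming it lives inside StreamsResetter
    // next to the dryRun field and imports shown in the diff below.
    private void maybeSeekToBeginning(final KafkaConsumer<byte[], byte[]> client,
                                      final Set<TopicPartition> partitions) {
        if (partitions.isEmpty()) {
            return;
        }
        if (dryRun) {
            // Dry run: only report what a real run would do.
            System.out.println("Would seek to beginning for: " + partitions);
        } else {
            // Real run: perform the seek; offsets are committed later via commitSync().
            client.seekToBeginning(partitions);
        }
    }

The committed version follows the same shape but lists the affected input and internal topics by name; see maybeSeekToBeginning(), maybeSeekToEnd(), and maybeDeleteInternalTopics() in the diff.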

Author: Bharat Viswanadham <bh...@us.ibm.com>

Reviewers: Matthias J. Sax, Damian Guy, Eno Thereska, Guozhang Wang

Closes #3005 from bharatviswa504/KAFKA-5166
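
For reference, with this change a dry run of the reset tool could be invoked roughly like:

    bin/kafka-streams-application-reset.sh --application-id my-app \
        --bootstrap-servers localhost:9092 --zookeeper localhost:2181 \
        --input-topics my-input --dry-run

(The script name and all flags other than --dry-run are assumptions based on the tool's existing options, which are not part of this diff; my-app, my-input, and the host:port values are placeholders.)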


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d1d71aa2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d1d71aa2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d1d71aa2

Branch: refs/heads/trunk
Commit: d1d71aa29afd4aa959864225d2fec50c71513481
Parents: 73703a1
Author: Bharat Viswanadham <bh...@us.ibm.com>
Authored: Tue May 16 14:24:31 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue May 16 14:24:31 2017 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/StreamsResetter.java | 151 +++++++++++++++----
 1 file changed, 120 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d1d71aa2/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 83166cd..a218125 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -16,13 +16,17 @@
  */
 package kafka.tools;
 
+
 import joptsimple.OptionException;
 import joptsimple.OptionParser;
 import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+
 import kafka.admin.AdminClient;
 import kafka.admin.TopicCommand;
 import kafka.utils.ZkUtils;
+
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
@@ -32,6 +36,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.Exit;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -69,10 +74,12 @@ public class StreamsResetter {
     private static OptionSpec<String> applicationIdOption;
     private static OptionSpec<String> inputTopicsOption;
     private static OptionSpec<String> intermediateTopicsOption;
+    private static OptionSpecBuilder dryRunOption;
 
     private OptionSet options = null;
     private final Properties consumerConfig = new Properties();
     private final List<String> allTopics = new LinkedList<>();
+    private boolean dryRun = false;
 
     public int run(final String[] args) {
         return run(args, new Properties());
@@ -88,13 +95,11 @@ public class StreamsResetter {
         ZkUtils zkUtils = null;
         try {
             parseArguments(args);
+            dryRun = options.has(dryRunOption);
 
             adminClient = AdminClient.createSimplePlaintext(options.valueOf(bootstrapServerOption));
             final String groupId = options.valueOf(applicationIdOption);
-            if (!adminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) {
-                throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " +
-                    "Make sure to stop all running application instances before running the reset tool.");
-            }
+
 
             zkUtils = ZkUtils.apply(options.valueOf(zookeeperOption),
                 30000,
@@ -104,8 +109,18 @@ public class StreamsResetter {
             allTopics.clear();
             allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
 
-            resetInputAndInternalAndSeekToEndIntermediateTopicOffsets();
-            deleteInternalTopics(zkUtils);
+
+            if (!adminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) {
+                throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " +
+                            "Make sure to stop all running application instances before running the reset tool.");
+            }
+
+            if (dryRun) {
+                System.out.println("----Dry run displays the actions which will be performed when running Streams Reset Tool----");
+            }
+            maybeResetInputAndInternalAndSeekToEndIntermediateTopicOffsets();
+            maybeDeleteInternalTopics(zkUtils);
+
         } catch (final Throwable e) {
             exitCode = EXIT_CODE_ERROR;
             System.err.println("ERROR: " + e.getMessage());
@@ -148,6 +163,7 @@ public class StreamsResetter {
             .ofType(String.class)
             .withValuesSeparatedBy(',')
             .describedAs("list");
+        dryRunOption = optionParser.accepts("dry-run", "Option to indicate to run streams reset tool to display actions it will perform");
 
         try {
             options = optionParser.parse(args);
@@ -157,39 +173,43 @@ public class StreamsResetter {
         }
     }
 
-    private void resetInputAndInternalAndSeekToEndIntermediateTopicOffsets() {
+    private void maybeResetInputAndInternalAndSeekToEndIntermediateTopicOffsets() {
         final List<String> inputTopics = options.valuesOf(inputTopicsOption);
         final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
 
+        final List<String> internalTopics = new ArrayList<>();
+
+        final List<String> notFoundInputTopics = new ArrayList<>();
+        final List<String> notFoundIntermediateTopics = new ArrayList<>();
+
+        String groupId = options.valueOf(applicationIdOption);
+
         if (inputTopics.size() == 0 && intermediateTopics.size() == 0) {
             System.out.println("No input or intermediate topics specified. Skipping seek.");
             return;
-        } else {
+        }
+
+        if (!dryRun) {
             if (inputTopics.size() != 0) {
-                System.out.println("Resetting offsets to zero for input topics " + inputTopics + " and all internal topics.");
+                System.out.println("Seek-to-beginning for input topics " + inputTopics + " and all internal topics.");
             }
             if (intermediateTopics.size() != 0) {
                 System.out.println("Seek-to-end for intermediate topics " + intermediateTopics);
             }
         }
 
-        final Properties config = new Properties();
-        config.putAll(consumerConfig);
-        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption));
-        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(applicationIdOption));
-        config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
         final Set<String> topicsToSubscribe = new HashSet<>(inputTopics.size() + intermediateTopics.size());
+
         for (final String topic : inputTopics) {
             if (!allTopics.contains(topic)) {
-                System.err.println("Input topic " + topic + " not found. Skipping.");
+                notFoundInputTopics.add(topic);
             } else {
                 topicsToSubscribe.add(topic);
             }
         }
         for (final String topic : intermediateTopics) {
             if (!allTopics.contains(topic)) {
-                System.err.println("Intermediate topic " + topic + " not found. Skipping.");
+                notFoundIntermediateTopics.add(topic);
             } else {
                 topicsToSubscribe.add(topic);
             }
@@ -197,9 +217,16 @@ public class StreamsResetter {
         for (final String topic : allTopics) {
             if (isInternalTopic(topic)) {
                 topicsToSubscribe.add(topic);
+                internalTopics.add(topic);
             }
         }
 
+        final Properties config = new Properties();
+        config.putAll(consumerConfig);
+        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption));
+        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
         try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
             client.subscribe(topicsToSubscribe);
             client.poll(1);
@@ -219,25 +246,83 @@ public class StreamsResetter {
                 }
             }
 
-            if (inputAndInternalTopicPartitions.size() > 0) {
-                client.seekToBeginning(inputAndInternalTopicPartitions);
+            maybeSeekToBeginning(client, inputAndInternalTopicPartitions, internalTopics);
+
+            maybeSeekToEnd(client, intermediateTopicPartitions);
+
+            if (!dryRun) {
+                for (final TopicPartition p : partitions) {
+                    client.position(p);
+                }
+                client.commitSync();
             }
-            if (intermediateTopicPartitions.size() > 0) {
-                client.seekToEnd(intermediateTopicPartitions);
+
+            if (notFoundInputTopics.size() > 0) {
+                System.out.println("Following input topics are not found, skipping them");
+                for (final String topic : notFoundInputTopics) {
+                    System.out.println("Topic: " + topic);
+                }
             }
 
-            for (final TopicPartition p : partitions) {
-                client.position(p);
+            if (notFoundIntermediateTopics.size() > 0) {
+                System.out.println("Following intermediate topics are not found, skipping them");
+                for (final String topic : notFoundIntermediateTopics) {
+                    System.out.println("Topic:" + topic);
+                }
             }
-            client.commitSync();
+
         } catch (final RuntimeException e) {
             System.err.println("ERROR: Resetting offsets failed.");
             throw e;
         }
-
         System.out.println("Done.");
     }
 
+    private void maybeSeekToEnd(final KafkaConsumer<byte[], byte[]> client, final Set<TopicPartition> intermediateTopicPartitions) {
+
+        final String groupId = options.valueOf(applicationIdOption);
+        final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
+
+        if (intermediateTopicPartitions.size() > 0) {
+            if (!dryRun) {
+                client.seekToEnd(intermediateTopicPartitions);
+            } else {
+                System.out.println("Following intermediate topics offsets will be reset to end (for consumer group " + groupId + ")");
+                for (final String topic : intermediateTopics) {
+                    if (allTopics.contains(topic)) {
+                        System.out.println("Topic: " + topic);
+                    }
+                }
+            }
+        }
+
+    }
+
+    private void maybeSeekToBeginning(final KafkaConsumer<byte[], byte[]> client,
+                                      final Set<TopicPartition> inputAndInternalTopicPartitions,
+                                      final List<String> internalTopics) {
+
+        final List<String> inputTopics = options.valuesOf(inputTopicsOption);
+        final String groupId = options.valueOf(applicationIdOption);
+
+        if (inputAndInternalTopicPartitions.size() > 0) {
+            if (!dryRun) {
+                client.seekToBeginning(inputAndInternalTopicPartitions);
+            } else {
+                System.out.println("Following input topics offsets will be reset to beginning (for consumer group " + groupId + ")");
+                for (final String topic : inputTopics) {
+                    if (allTopics.contains(topic)) {
+                        System.out.println("Topic: " + topic);
+                    }
+                }
+                System.out.println("Following internal topics offsets will be reset to beginning (for consumer group " + groupId + ")");
+                for (final String topic : internalTopics) {
+                    System.out.println("Topic: " + topic);
+                }
+            }
+        }
+    }
+
     private boolean isInputTopic(final String topic) {
         return options.valuesOf(inputTopicsOption).contains(topic);
     }
@@ -246,23 +331,27 @@ public class StreamsResetter {
         return options.valuesOf(intermediateTopicsOption).contains(topic);
     }
 
-    private void deleteInternalTopics(final ZkUtils zkUtils) {
+    private void maybeDeleteInternalTopics(final ZkUtils zkUtils) {
+
         System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption));
 
         for (final String topic : allTopics) {
             if (isInternalTopic(topic)) {
-                final TopicCommand.TopicCommandOptions commandOptions = new TopicCommand.TopicCommandOptions(new String[]{
-                    "--zookeeper", options.valueOf(zookeeperOption),
-                    "--delete", "--topic", topic});
                 try {
-                    TopicCommand.deleteTopic(zkUtils, commandOptions);
+                    if (!dryRun) {
+                        final TopicCommand.TopicCommandOptions commandOptions = new TopicCommand.TopicCommandOptions(new String[]{
+                            "--zookeeper", options.valueOf(zookeeperOption),
+                            "--delete", "--topic", topic});
+                        TopicCommand.deleteTopic(zkUtils, commandOptions);
+                    } else {
+                        System.out.println("Topic: " + topic);
+                    }
                 } catch (final RuntimeException e) {
                     System.err.println("ERROR: Deleting topic " + topic + " failed.");
                     throw e;
                 }
             }
         }
-
         System.out.println("Done.");
     }