You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/23 15:28:38 UTC

[3/5] kafka git commit: KAFKA-4331: Kafka Streams resetter is slow because it joins the same group for each topic

KAFKA-4331: Kafka Streams resetter is slow because it joins the same group for each topic

- reworked to use a single KafkaConsumer and subscribe only once

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #2049 from mjsax/improveResetTool


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2dedd8d9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2dedd8d9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2dedd8d9

Branch: refs/heads/0.10.1
Commit: 2dedd8d95aa56214f82822c54ff50e0ef9c4f2c8
Parents: 0a24d3a
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Mon Oct 24 13:44:27 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 23 07:26:30 2016 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/tools/StreamsResetter.java | 155 +++++++++----------
 1 file changed, 74 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2dedd8d9/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 7153790..1bb63f7 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -31,7 +31,7 @@ import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 
 import java.io.IOException;
-import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
@@ -78,8 +78,8 @@ public class StreamsResetter {
     }
 
     public int run(final String[] args, final Properties config) {
-        this.consumerConfig.clear();
-        this.consumerConfig.putAll(config);
+        consumerConfig.clear();
+        consumerConfig.putAll(config);
 
         int exitCode = EXIT_CODE_SUCCESS;
 
@@ -95,16 +95,15 @@ public class StreamsResetter {
                     "Make sure to stop all running application instances before running the reset tool.");
             }
 
-            zkUtils = ZkUtils.apply(this.options.valueOf(zookeeperOption),
+            zkUtils = ZkUtils.apply(options.valueOf(zookeeperOption),
                 30000,
                 30000,
                 JaasUtils.isZkSecurityEnabled());
 
-            this.allTopics.clear();
-            this.allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+            allTopics.clear();
+            allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
 
-            resetInputAndInternalTopicOffsets();
-            seekToEndIntermediateTopics();
+            resetInputAndInternalAndSeekToEndIntermediateTopicOffsets();
             deleteInternalTopics(zkUtils);
         } catch (final Throwable e) {
             exitCode = EXIT_CODE_ERROR;
@@ -150,111 +149,105 @@ public class StreamsResetter {
             .describedAs("list");
 
         try {
-            this.options = optionParser.parse(args);
+            options = optionParser.parse(args);
         } catch (final OptionException e) {
             optionParser.printHelpOn(System.err);
             throw e;
         }
     }
 
-    private void resetInputAndInternalTopicOffsets() {
-        final List<String> inputTopics = this.options.valuesOf(inputTopicsOption);
+    private void resetInputAndInternalAndSeekToEndIntermediateTopicOffsets() {
+        final List<String> inputTopics = options.valuesOf(inputTopicsOption);
+        final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
 
-        if (inputTopics.size() == 0) {
-            System.out.println("No input topics specified.");
+        if (inputTopics.size() == 0 && intermediateTopics.size() == 0) {
+            System.out.println("No input or intermediate topics specified. Skipping seek.");
+            return;
         } else {
-            System.out.println("Resetting offsets to zero for input topics " + inputTopics + " and all internal topics.");
+            if (inputTopics.size() != 0) {
+                System.out.println("Resetting offsets to zero for input topics " + inputTopics + " and all internal topics.");
+            }
+            if (intermediateTopics.size() != 0) {
+                System.out.println("Seek-to-end for intermediate topics " + intermediateTopics);
+            }
         }
 
         final Properties config = new Properties();
-        config.putAll(this.consumerConfig);
-        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.options.valueOf(bootstrapServerOption));
-        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, this.options.valueOf(applicationIdOption));
+        config.putAll(consumerConfig);
+        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption));
+        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(applicationIdOption));
         config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
-        for (final String inTopic : inputTopics) {
-            if (!this.allTopics.contains(inTopic)) {
-                System.out.println("Input topic " + inTopic + " not found. Skipping.");
+        final Set<String> topicsToSubscribe = new HashSet<>(inputTopics.size() + intermediateTopics.size());
+        for (final String topic : inputTopics) {
+            if (!allTopics.contains(topic)) {
+                System.err.println("Input topic " + topic + " not found. Skipping.");
+            } else {
+                topicsToSubscribe.add(topic);
+            }
+        }
+        for (final String topic : intermediateTopics) {
+            if (!allTopics.contains(topic)) {
+                System.err.println("Intermediate topic " + topic + " not found. Skipping.");
+            } else {
+                topicsToSubscribe.add(topic);
+            }
+        }
+        for (final String topic : allTopics) {
+            if (isInternalTopic(topic)) {
+                topicsToSubscribe.add(topic);
             }
         }
 
-        for (final String topic : this.allTopics) {
-            if (isInputTopic(topic) || isInternalTopic(topic)) {
-                System.out.println("Topic: " + topic);
+        try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+            client.subscribe(topicsToSubscribe);
+            client.poll(1);
+
+            final Set<TopicPartition> partitions = client.assignment();
+            final Set<TopicPartition> inputAndInternalTopicPartitions = new HashSet<>();
+            final Set<TopicPartition> intermediateTopicPartitions = new HashSet<>();
+
+            for (final TopicPartition p : partitions) {
+                final String topic = p.topic();
+                if (isInputTopic(topic) || isInternalTopic(topic)) {
+                    inputAndInternalTopicPartitions.add(p);
+                } else if (isIntermediateTopic(topic)) {
+                    intermediateTopicPartitions.add(p);
+                } else {
+                    System.err.println("Skipping invalid partition: " + p);
+                }
+            }
 
-                try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
-                    client.subscribe(Collections.singleton(topic));
-                    client.poll(1);
+            client.seekToBeginning(inputAndInternalTopicPartitions);
+            client.seekToEnd(intermediateTopicPartitions);
 
-                    final Set<TopicPartition> partitions = client.assignment();
-                    client.seekToBeginning(partitions);
-                    for (final TopicPartition p : partitions) {
-                        client.position(p);
-                    }
-                    client.commitSync();
-                } catch (final RuntimeException e) {
-                    System.err.println("ERROR: Resetting offsets for topic " + topic + " failed.");
-                    throw e;
-                }
+            for (final TopicPartition p : partitions) {
+                client.position(p);
             }
+            client.commitSync();
+        } catch (final RuntimeException e) {
+            System.err.println("ERROR: Resetting offsets failed.");
+            throw e;
         }
 
         System.out.println("Done.");
     }
 
     private boolean isInputTopic(final String topic) {
-        return this.options.valuesOf(inputTopicsOption).contains(topic);
+        return options.valuesOf(inputTopicsOption).contains(topic);
     }
 
-    private void seekToEndIntermediateTopics() {
-        final List<String> intermediateTopics = this.options.valuesOf(intermediateTopicsOption);
-
-        if (intermediateTopics.size() == 0) {
-            System.out.println("No intermediate user topics specified, skipping seek-to-end for user topic offsets.");
-            return;
-        }
-
-        System.out.println("Seek-to-end for intermediate user topics " + intermediateTopics);
-
-        final Properties config = new Properties();
-        config.putAll(this.consumerConfig);
-        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.options.valueOf(bootstrapServerOption));
-        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, this.options.valueOf(applicationIdOption));
-        config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
-        for (final String topic : intermediateTopics) {
-            if (this.allTopics.contains(topic)) {
-                System.out.println("Topic: " + topic);
-
-                try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
-                    client.subscribe(Collections.singleton(topic));
-                    client.poll(1);
-
-                    final Set<TopicPartition> partitions = client.assignment();
-                    client.seekToEnd(partitions);
-                    for (final TopicPartition p : partitions) {
-                        client.position(p);
-                    }
-                    client.commitSync();
-                } catch (final RuntimeException e) {
-                    System.err.println("ERROR: Seek-to-end for topic " + topic + " failed.");
-                    throw e;
-                }
-            } else {
-                System.out.println("Topic " + topic + " not found. Skipping.");
-            }
-        }
-
-        System.out.println("Done.");
+    private boolean isIntermediateTopic(final String topic) {
+        return options.valuesOf(intermediateTopicsOption).contains(topic);
     }
 
     private void deleteInternalTopics(final ZkUtils zkUtils) {
-        System.out.println("Deleting all internal/auto-created topics for application " + this.options.valueOf(applicationIdOption));
+        System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption));
 
-        for (final String topic : this.allTopics) {
+        for (final String topic : allTopics) {
             if (isInternalTopic(topic)) {
                 final TopicCommand.TopicCommandOptions commandOptions = new TopicCommand.TopicCommandOptions(new String[]{
-                    "--zookeeper", this.options.valueOf(zookeeperOption),
+                    "--zookeeper", options.valueOf(zookeeperOption),
                     "--delete", "--topic", topic});
                 try {
                     TopicCommand.deleteTopic(zkUtils, commandOptions);
@@ -269,7 +262,7 @@ public class StreamsResetter {
     }
 
     private boolean isInternalTopic(final String topicName) {
-        return topicName.startsWith(this.options.valueOf(applicationIdOption) + "-")
+        return topicName.startsWith(options.valueOf(applicationIdOption) + "-")
             && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));
     }