You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/23 15:28:38 UTC
[3/5] kafka git commit: KAFKA-4331: Kafka Streams resetter is slow because it joins the same group for each topic
KAFKA-4331: Kafka Streams resetter is slow because it joins the same group for each topic
- reworked to use a single KafkaConsumer and subscribe only once
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #2049 from mjsax/improveResetTool
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2dedd8d9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2dedd8d9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2dedd8d9
Branch: refs/heads/0.10.1
Commit: 2dedd8d95aa56214f82822c54ff50e0ef9c4f2c8
Parents: 0a24d3a
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Mon Oct 24 13:44:27 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 23 07:26:30 2016 -0800
----------------------------------------------------------------------
.../main/scala/kafka/tools/StreamsResetter.java | 155 +++++++++----------
1 file changed, 74 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2dedd8d9/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 7153790..1bb63f7 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -31,7 +31,7 @@ import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import java.io.IOException;
-import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
@@ -78,8 +78,8 @@ public class StreamsResetter {
}
public int run(final String[] args, final Properties config) {
- this.consumerConfig.clear();
- this.consumerConfig.putAll(config);
+ consumerConfig.clear();
+ consumerConfig.putAll(config);
int exitCode = EXIT_CODE_SUCCESS;
@@ -95,16 +95,15 @@ public class StreamsResetter {
"Make sure to stop all running application instances before running the reset tool.");
}
- zkUtils = ZkUtils.apply(this.options.valueOf(zookeeperOption),
+ zkUtils = ZkUtils.apply(options.valueOf(zookeeperOption),
30000,
30000,
JaasUtils.isZkSecurityEnabled());
- this.allTopics.clear();
- this.allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+ allTopics.clear();
+ allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
- resetInputAndInternalTopicOffsets();
- seekToEndIntermediateTopics();
+ resetInputAndInternalAndSeekToEndIntermediateTopicOffsets();
deleteInternalTopics(zkUtils);
} catch (final Throwable e) {
exitCode = EXIT_CODE_ERROR;
@@ -150,111 +149,105 @@ public class StreamsResetter {
.describedAs("list");
try {
- this.options = optionParser.parse(args);
+ options = optionParser.parse(args);
} catch (final OptionException e) {
optionParser.printHelpOn(System.err);
throw e;
}
}
- private void resetInputAndInternalTopicOffsets() {
- final List<String> inputTopics = this.options.valuesOf(inputTopicsOption);
+ private void resetInputAndInternalAndSeekToEndIntermediateTopicOffsets() {
+ final List<String> inputTopics = options.valuesOf(inputTopicsOption);
+ final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
- if (inputTopics.size() == 0) {
- System.out.println("No input topics specified.");
+ if (inputTopics.size() == 0 && intermediateTopics.size() == 0) {
+ System.out.println("No input or intermediate topics specified. Skipping seek.");
+ return;
} else {
- System.out.println("Resetting offsets to zero for input topics " + inputTopics + " and all internal topics.");
+ if (inputTopics.size() != 0) {
+ System.out.println("Resetting offsets to zero for input topics " + inputTopics + " and all internal topics.");
+ }
+ if (intermediateTopics.size() != 0) {
+ System.out.println("Seek-to-end for intermediate topics " + intermediateTopics);
+ }
}
final Properties config = new Properties();
- config.putAll(this.consumerConfig);
- config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.options.valueOf(bootstrapServerOption));
- config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, this.options.valueOf(applicationIdOption));
+ config.putAll(consumerConfig);
+ config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption));
+ config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(applicationIdOption));
config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
- for (final String inTopic : inputTopics) {
- if (!this.allTopics.contains(inTopic)) {
- System.out.println("Input topic " + inTopic + " not found. Skipping.");
+ final Set<String> topicsToSubscribe = new HashSet<>(inputTopics.size() + intermediateTopics.size());
+ for (final String topic : inputTopics) {
+ if (!allTopics.contains(topic)) {
+ System.err.println("Input topic " + topic + " not found. Skipping.");
+ } else {
+ topicsToSubscribe.add(topic);
+ }
+ }
+ for (final String topic : intermediateTopics) {
+ if (!allTopics.contains(topic)) {
+ System.err.println("Intermediate topic " + topic + " not found. Skipping.");
+ } else {
+ topicsToSubscribe.add(topic);
+ }
+ }
+ for (final String topic : allTopics) {
+ if (isInternalTopic(topic)) {
+ topicsToSubscribe.add(topic);
}
}
- for (final String topic : this.allTopics) {
- if (isInputTopic(topic) || isInternalTopic(topic)) {
- System.out.println("Topic: " + topic);
+ try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+ client.subscribe(topicsToSubscribe);
+ client.poll(1);
+
+ final Set<TopicPartition> partitions = client.assignment();
+ final Set<TopicPartition> inputAndInternalTopicPartitions = new HashSet<>();
+ final Set<TopicPartition> intermediateTopicPartitions = new HashSet<>();
+
+ for (final TopicPartition p : partitions) {
+ final String topic = p.topic();
+ if (isInputTopic(topic) || isInternalTopic(topic)) {
+ inputAndInternalTopicPartitions.add(p);
+ } else if (isIntermediateTopic(topic)) {
+ intermediateTopicPartitions.add(p);
+ } else {
+ System.err.println("Skipping invalid partition: " + p);
+ }
+ }
- try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
- client.subscribe(Collections.singleton(topic));
- client.poll(1);
+ client.seekToBeginning(inputAndInternalTopicPartitions);
+ client.seekToEnd(intermediateTopicPartitions);
- final Set<TopicPartition> partitions = client.assignment();
- client.seekToBeginning(partitions);
- for (final TopicPartition p : partitions) {
- client.position(p);
- }
- client.commitSync();
- } catch (final RuntimeException e) {
- System.err.println("ERROR: Resetting offsets for topic " + topic + " failed.");
- throw e;
- }
+ for (final TopicPartition p : partitions) {
+ client.position(p);
}
+ client.commitSync();
+ } catch (final RuntimeException e) {
+ System.err.println("ERROR: Resetting offsets failed.");
+ throw e;
}
System.out.println("Done.");
}
private boolean isInputTopic(final String topic) {
- return this.options.valuesOf(inputTopicsOption).contains(topic);
+ return options.valuesOf(inputTopicsOption).contains(topic);
}
- private void seekToEndIntermediateTopics() {
- final List<String> intermediateTopics = this.options.valuesOf(intermediateTopicsOption);
-
- if (intermediateTopics.size() == 0) {
- System.out.println("No intermediate user topics specified, skipping seek-to-end for user topic offsets.");
- return;
- }
-
- System.out.println("Seek-to-end for intermediate user topics " + intermediateTopics);
-
- final Properties config = new Properties();
- config.putAll(this.consumerConfig);
- config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.options.valueOf(bootstrapServerOption));
- config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, this.options.valueOf(applicationIdOption));
- config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
- for (final String topic : intermediateTopics) {
- if (this.allTopics.contains(topic)) {
- System.out.println("Topic: " + topic);
-
- try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
- client.subscribe(Collections.singleton(topic));
- client.poll(1);
-
- final Set<TopicPartition> partitions = client.assignment();
- client.seekToEnd(partitions);
- for (final TopicPartition p : partitions) {
- client.position(p);
- }
- client.commitSync();
- } catch (final RuntimeException e) {
- System.err.println("ERROR: Seek-to-end for topic " + topic + " failed.");
- throw e;
- }
- } else {
- System.out.println("Topic " + topic + " not found. Skipping.");
- }
- }
-
- System.out.println("Done.");
+ private boolean isIntermediateTopic(final String topic) {
+ return options.valuesOf(intermediateTopicsOption).contains(topic);
}
private void deleteInternalTopics(final ZkUtils zkUtils) {
- System.out.println("Deleting all internal/auto-created topics for application " + this.options.valueOf(applicationIdOption));
+ System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption));
- for (final String topic : this.allTopics) {
+ for (final String topic : allTopics) {
if (isInternalTopic(topic)) {
final TopicCommand.TopicCommandOptions commandOptions = new TopicCommand.TopicCommandOptions(new String[]{
- "--zookeeper", this.options.valueOf(zookeeperOption),
+ "--zookeeper", options.valueOf(zookeeperOption),
"--delete", "--topic", topic});
try {
TopicCommand.deleteTopic(zkUtils, commandOptions);
@@ -269,7 +262,7 @@ public class StreamsResetter {
}
private boolean isInternalTopic(final String topicName) {
- return topicName.startsWith(this.options.valueOf(applicationIdOption) + "-")
+ return topicName.startsWith(options.valueOf(applicationIdOption) + "-")
&& (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));
}