You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/18 06:15:26 UTC
kafka git commit: KAFKA-5210: Application Reset Tool does not need to seek for internal topics
Repository: kafka
Updated Branches:
refs/heads/trunk 972b75453 -> 9fa0d52ca
KAFKA-5210: Application Reset Tool does not need to seek for internal topics
@mjsax @dguy @guozhangwang, could you please review the changes?
Author: Bharat Viswanadham <bh...@us.ibm.com>
Reviewers: Matthias J. Sax, Damian Guy, Guozhang Wang
Closes #3073 from bharatviswa504/KAFKA-5210
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9fa0d52c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9fa0d52c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9fa0d52c
Branch: refs/heads/trunk
Commit: 9fa0d52cac24c69dbc907208ccb3e603cab3503b
Parents: 972b754
Author: Bharat Viswanadham <bh...@us.ibm.com>
Authored: Wed May 17 23:15:23 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed May 17 23:15:23 2017 -0700
----------------------------------------------------------------------
.../main/scala/kafka/tools/StreamsResetter.java | 32 ++++++--------------
1 file changed, 10 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9fa0d52c/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index a218125..3a778ee 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -118,7 +118,7 @@ public class StreamsResetter {
if (dryRun) {
System.out.println("----Dry run displays the actions which will be performed when running Streams Reset Tool----");
}
- maybeResetInputAndInternalAndSeekToEndIntermediateTopicOffsets();
+ maybeResetInputAndSeekToEndIntermediateTopicOffsets();
maybeDeleteInternalTopics(zkUtils);
} catch (final Throwable e) {
@@ -173,11 +173,10 @@ public class StreamsResetter {
}
}
- private void maybeResetInputAndInternalAndSeekToEndIntermediateTopicOffsets() {
+ private void maybeResetInputAndSeekToEndIntermediateTopicOffsets() {
final List<String> inputTopics = options.valuesOf(inputTopicsOption);
final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
- final List<String> internalTopics = new ArrayList<>();
final List<String> notFoundInputTopics = new ArrayList<>();
final List<String> notFoundIntermediateTopics = new ArrayList<>();
@@ -191,7 +190,7 @@ public class StreamsResetter {
if (!dryRun) {
if (inputTopics.size() != 0) {
- System.out.println("Seek-to-beginning for input topics " + inputTopics + " and all internal topics.");
+ System.out.println("Seek-to-beginning for input topics " + inputTopics);
}
if (intermediateTopics.size() != 0) {
System.out.println("Seek-to-end for intermediate topics " + intermediateTopics);
@@ -214,12 +213,6 @@ public class StreamsResetter {
topicsToSubscribe.add(topic);
}
}
- for (final String topic : allTopics) {
- if (isInternalTopic(topic)) {
- topicsToSubscribe.add(topic);
- internalTopics.add(topic);
- }
- }
final Properties config = new Properties();
config.putAll(consumerConfig);
@@ -232,13 +225,13 @@ public class StreamsResetter {
client.poll(1);
final Set<TopicPartition> partitions = client.assignment();
- final Set<TopicPartition> inputAndInternalTopicPartitions = new HashSet<>();
+ final Set<TopicPartition> inputTopicPartitions = new HashSet<>();
final Set<TopicPartition> intermediateTopicPartitions = new HashSet<>();
for (final TopicPartition p : partitions) {
final String topic = p.topic();
- if (isInputTopic(topic) || isInternalTopic(topic)) {
- inputAndInternalTopicPartitions.add(p);
+ if (isInputTopic(topic)) {
+ inputTopicPartitions.add(p);
} else if (isIntermediateTopic(topic)) {
intermediateTopicPartitions.add(p);
} else {
@@ -246,7 +239,7 @@ public class StreamsResetter {
}
}
- maybeSeekToBeginning(client, inputAndInternalTopicPartitions, internalTopics);
+ maybeSeekToBeginning(client, inputTopicPartitions);
maybeSeekToEnd(client, intermediateTopicPartitions);
@@ -299,15 +292,14 @@ public class StreamsResetter {
}
private void maybeSeekToBeginning(final KafkaConsumer<byte[], byte[]> client,
- final Set<TopicPartition> inputAndInternalTopicPartitions,
- final List<String> internalTopics) {
+ final Set<TopicPartition> inputTopicPartitions) {
final List<String> inputTopics = options.valuesOf(inputTopicsOption);
final String groupId = options.valueOf(applicationIdOption);
- if (inputAndInternalTopicPartitions.size() > 0) {
+ if (inputTopicPartitions.size() > 0) {
if (!dryRun) {
- client.seekToBeginning(inputAndInternalTopicPartitions);
+ client.seekToBeginning(inputTopicPartitions);
} else {
System.out.println("Following input topics offsets will be reset to beginning (for consumer group " + groupId + ")");
for (final String topic : inputTopics) {
@@ -315,10 +307,6 @@ public class StreamsResetter {
System.out.println("Topic: " + topic);
}
}
- System.out.println("Following internal topics offsets will be reset to beginning (for consumer group " + groupId + ")");
- for (final String topic : internalTopics) {
- System.out.println("Topic: " + topic);
- }
}
}
}