You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/18 06:15:26 UTC

kafka git commit: KAFKA-5210: Application Reset Tool does not need to seek for internal topics

Repository: kafka
Updated Branches:
  refs/heads/trunk 972b75453 -> 9fa0d52ca


KAFKA-5210: Application Reset Tool does not need to seek for internal topics

mjsax dguy guozhangwang Could you please review the changes.

Author: Bharat Viswanadham <bh...@us.ibm.com>

Reviewers: Matthias J. Sax, Damian Guy, Guozhang Wang

Closes #3073 from bharatviswa504/KAFKA-5210


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9fa0d52c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9fa0d52c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9fa0d52c

Branch: refs/heads/trunk
Commit: 9fa0d52cac24c69dbc907208ccb3e603cab3503b
Parents: 972b754
Author: Bharat Viswanadham <bh...@us.ibm.com>
Authored: Wed May 17 23:15:23 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed May 17 23:15:23 2017 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/StreamsResetter.java | 32 ++++++--------------
 1 file changed, 10 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9fa0d52c/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index a218125..3a778ee 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -118,7 +118,7 @@ public class StreamsResetter {
             if (dryRun) {
                 System.out.println("----Dry run displays the actions which will be performed when running Streams Reset Tool----");
             }
-            maybeResetInputAndInternalAndSeekToEndIntermediateTopicOffsets();
+            maybeResetInputAndSeekToEndIntermediateTopicOffsets();
             maybeDeleteInternalTopics(zkUtils);
 
         } catch (final Throwable e) {
@@ -173,11 +173,10 @@ public class StreamsResetter {
         }
     }
 
-    private void maybeResetInputAndInternalAndSeekToEndIntermediateTopicOffsets() {
+    private void maybeResetInputAndSeekToEndIntermediateTopicOffsets() {
         final List<String> inputTopics = options.valuesOf(inputTopicsOption);
         final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
 
-        final List<String> internalTopics = new ArrayList<>();
 
         final List<String> notFoundInputTopics = new ArrayList<>();
         final List<String> notFoundIntermediateTopics = new ArrayList<>();
@@ -191,7 +190,7 @@ public class StreamsResetter {
 
         if (!dryRun) {
             if (inputTopics.size() != 0) {
-                System.out.println("Seek-to-beginning for input topics " + inputTopics + " and all internal topics.");
+                System.out.println("Seek-to-beginning for input topics " + inputTopics);
             }
             if (intermediateTopics.size() != 0) {
                 System.out.println("Seek-to-end for intermediate topics " + intermediateTopics);
@@ -214,12 +213,6 @@ public class StreamsResetter {
                 topicsToSubscribe.add(topic);
             }
         }
-        for (final String topic : allTopics) {
-            if (isInternalTopic(topic)) {
-                topicsToSubscribe.add(topic);
-                internalTopics.add(topic);
-            }
-        }
 
         final Properties config = new Properties();
         config.putAll(consumerConfig);
@@ -232,13 +225,13 @@ public class StreamsResetter {
             client.poll(1);
 
             final Set<TopicPartition> partitions = client.assignment();
-            final Set<TopicPartition> inputAndInternalTopicPartitions = new HashSet<>();
+            final Set<TopicPartition> inputTopicPartitions = new HashSet<>();
             final Set<TopicPartition> intermediateTopicPartitions = new HashSet<>();
 
             for (final TopicPartition p : partitions) {
                 final String topic = p.topic();
-                if (isInputTopic(topic) || isInternalTopic(topic)) {
-                    inputAndInternalTopicPartitions.add(p);
+                if (isInputTopic(topic)) {
+                    inputTopicPartitions.add(p);
                 } else if (isIntermediateTopic(topic)) {
                     intermediateTopicPartitions.add(p);
                 } else {
@@ -246,7 +239,7 @@ public class StreamsResetter {
                 }
             }
 
-            maybeSeekToBeginning(client, inputAndInternalTopicPartitions, internalTopics);
+            maybeSeekToBeginning(client, inputTopicPartitions);
 
             maybeSeekToEnd(client, intermediateTopicPartitions);
 
@@ -299,15 +292,14 @@ public class StreamsResetter {
     }
 
     private void maybeSeekToBeginning(final KafkaConsumer<byte[], byte[]> client,
-                                      final Set<TopicPartition> inputAndInternalTopicPartitions,
-                                      final List<String> internalTopics) {
+                                      final Set<TopicPartition> inputTopicPartitions) {
 
         final List<String> inputTopics = options.valuesOf(inputTopicsOption);
         final String groupId = options.valueOf(applicationIdOption);
 
-        if (inputAndInternalTopicPartitions.size() > 0) {
+        if (inputTopicPartitions.size() > 0) {
             if (!dryRun) {
-                client.seekToBeginning(inputAndInternalTopicPartitions);
+                client.seekToBeginning(inputTopicPartitions);
             } else {
                 System.out.println("Following input topics offsets will be reset to beginning (for consumer group " + groupId + ")");
                 for (final String topic : inputTopics) {
@@ -315,10 +307,6 @@ public class StreamsResetter {
                         System.out.println("Topic: " + topic);
                     }
                 }
-                System.out.println("Following internal topics offsets will be reset to beginning (for consumer group " + groupId + ")");
-                for (final String topic : internalTopics) {
-                    System.out.println("Topic: " + topic);
-                }
             }
         }
     }