Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/24 20:04:44 UTC

[GitHub] [kafka] JoelWee opened a new pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

JoelWee opened a new pull request #8923:
URL: https://github.com/apache/kafka/pull/8923


   This allows users to specify which internal topics the tool will attempt to delete, instead of having the tool infer them.
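   As a rough sketch of how the new option would be supplied, the argument array below mirrors the one built in the integration test later in this thread (`--application-id`, `--bootstrap-servers`, `--internal-topics`, `--execute`). The `ResetArgs` class and its `build` method are hypothetical helpers for illustration; `--dry-run` is assumed to be the preview counterpart of `--execute`, as the docs change in this PR suggests.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: assembles a StreamsResetter argument array that uses
// the --internal-topics option proposed in this PR.
final class ResetArgs {
    static String[] build(final String appId,
                          final String bootstrapServers,
                          final List<String> internalTopics,
                          final boolean dryRun) {
        final List<String> args = new ArrayList<>();
        args.add("--application-id");
        args.add(appId);
        args.add("--bootstrap-servers");
        args.add(bootstrapServers);
        if (!internalTopics.isEmpty()) {
            // The option takes a single comma-separated value.
            args.add("--internal-topics");
            args.add(String.join(",", internalTopics));
        }
        // Preview with --dry-run first; pass --execute only once the output looks right.
        args.add(dryRun ? "--dry-run" : "--execute");
        return args.toArray(new String[0]);
    }
}
```

   The resulting array would then be handed to `new StreamsResetter().run(args, config)`, as `shouldNotAllowToResetWhenSpecifiedInternalTopicAbsent` does.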
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] JoelWee commented on pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

Posted by GitBox <gi...@apache.org>.
JoelWee commented on pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#issuecomment-701619467


   Hi @abbccdda, would you mind taking another look? :) 





[GitHub] [kafka] JoelWee commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

Posted by GitBox <gi...@apache.org>.
JoelWee commented on a change in pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#discussion_r621537903



##########
File path: docs/streams/developer-guide/app-reset-tool.html
##########
@@ -78,6 +78,9 @@
             <h2>Step 1: Run the application reset tool<a class="headerlink" href="#step-1-run-the-application-reset-tool" title="Permalink to this headline"></a></h2>
             <p>Invoke the application reset tool from the command line</p>
             <div class="highlight-bash"><div class="highlight"><pre><span></span><code>&lt;path-to-kafka&gt;/bin/kafka-streams-application-reset</code></pre></div>
+            <p>Warning! This tool makes irreversible changes to your application. It is strongly recommended that you run this once with <code class="docutils literal"><span class="pre">--dry-run</span></code> to preview your changes before making them.</p>
+            <div class="highlight-bash"><div class="highlight"><pre><span></span>&lt;path-to-kafka&gt;/bin/kafka-streams-application-reset

Review comment:
       Thanks! Have updated this now to look like:
   
   Screenshot 2021-04-27 at 7 54 48 PM: https://user-images.githubusercontent.com/32009741/116301476-fa873f80-a797-11eb-9a19-de59d6771ac5.png
   







[GitHub] [kafka] JoelWee commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

Posted by GitBox <gi...@apache.org>.
JoelWee commented on a change in pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#discussion_r469722613



##########
File path: docs/streams/developer-guide/app-reset-tool.html
##########
@@ -77,6 +77,7 @@
         <div class="section" id="step-1-run-the-application-reset-tool">
             <h2>Step 1: Run the application reset tool<a class="headerlink" href="#step-1-run-the-application-reset-tool" title="Permalink to this headline"></a></h2>
             <p>Invoke the application reset tool from the command line</p>
+            <p>Warning! This tool makes irreversible changes to your application. It is strongly recommended that you run this once with --dry-run to preview your changes before making them.</p>

Review comment:
       👍  here's a screenshot for this portion:
   
   Screenshot 2020-08-13 at 2 19 01 PM: https://user-images.githubusercontent.com/32009741/90100918-40b68300-dd70-11ea-8ae2-a08c385156da.png
   

##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -167,7 +171,7 @@ public int run(final String[] args,
             final HashMap<Object, Object> consumerConfig = new HashMap<>(config);
             consumerConfig.putAll(properties);
             exitCode = maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun);
-            maybeDeleteInternalTopics(adminClient, dryRun);
+            exitCode |= maybeDeleteInternalTopics(adminClient, dryRun);

Review comment:
       Yep. Currently it returns 1 if either exitCode is 1, and 0 otherwise. Or should we do something else?
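   A quick way to see why `|=` works here, assuming (as the diff implies) that the exit codes are only ever 0 for success and 1 for error: the bitwise OR of two such values is 1 exactly when at least one of them is 1. The constants below are stand-ins for `EXIT_CODE_SUCCESS`/`EXIT_CODE_ERROR`, and `ExitCodes` is a hypothetical class. If other non-zero codes were ever introduced, OR could produce surprising combinations (1 | 2 == 3), so keeping the first failure would be the safer rule.

```java
// Minimal sketch, assuming exit codes are restricted to 0 (success) and 1 (error),
// mirroring EXIT_CODE_SUCCESS / EXIT_CODE_ERROR in StreamsResetter.
final class ExitCodes {
    static final int SUCCESS = 0;
    static final int ERROR = 1;

    // What `exitCode |= maybeDeleteInternalTopics(...)` computes.
    static int combineWithOr(final int first, final int second) {
        return first | second;
    }

    // Alternative that stays meaningful for arbitrary codes:
    // keep the first failure, otherwise report the second result.
    static int firstFailure(final int first, final int second) {
        return first != SUCCESS ? first : second;
    }
}
```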

##########
File path: docs/streams/developer-guide/app-reset-tool.html
##########
@@ -106,6 +107,11 @@ <h2>Step 1: Run the application reset tool<a class="headerlink" href="#step-1-ru
                                         topics <span class="o">(</span>topics used in the through<span class="o">()</span>
                                         method<span class="o">)</span>. For these topics, the tool
                                         will skip to the end.
+--internal-topics &lt;String: list&gt;      Comma-separated list of internal topics
+                                        to delete. Must be a subset of the
+                                        internal topics marked for deletion by
+                                        the default behaviour (do a dry-run without
+                                        this option to view these topics).

Review comment:
       screenshot of this portion:
   
   Screenshot 2020-08-13 at 2 19 35 PM: https://user-images.githubusercontent.com/32009741/90100950-52982600-dd70-11ea-9ae5-2b84296188cc.png
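   The documented constraint (the specified list must be a subset of what a plain dry-run would delete) boils down to a set-difference check. A minimal sketch, where `inferred` stands for the topics the default behaviour would pick; `InternalTopicsSubset` and its methods are hypothetical, not part of `StreamsResetter`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical check for the documented --internal-topics rule:
// every requested topic must be one the default (inferred) behaviour would delete.
final class InternalTopicsSubset {
    // Returns the requested topics that fall outside the inferred set, in input order.
    static List<String> outsideInferred(final List<String> requested, final Set<String> inferred) {
        final List<String> offending = new ArrayList<>();
        for (final String topic : requested) {
            if (!inferred.contains(topic)) {
                offending.add(topic);
            }
        }
        return offending;
    }

    static boolean isValidSelection(final List<String> requested, final Set<String> inferred) {
        return inferred.containsAll(requested);
    }
}
```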
   







[GitHub] [kafka] JoelWee commented on pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

Posted by GitBox <gi...@apache.org>.
JoelWee commented on pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#issuecomment-828616051


   Thanks @ableegoldman! :) 





[GitHub] [kafka] ableegoldman commented on pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#issuecomment-819179530


   Hey @abbccdda, can you take another look at this? @JoelWee are you still interested in getting this KIP done? 
   
   If so, maybe one of @cadonna @mjsax @lct45 @wcarlson5 can help with the review.





[GitHub] [kafka] JoelWee commented on pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

Posted by GitBox <gi...@apache.org>.
JoelWee commented on pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#issuecomment-827122994


   Hi @ableegoldman, I've rebased now :). The failing tests are RaftClusterTests, which don't seem related to the changes here?





[GitHub] [kafka] wcarlson5 commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#discussion_r614166802



##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -248,7 +257,7 @@ private void parseArguments(final String[] args) {
             .ofType(String.class)
             .describedAs("file name");
         forceOption = optionParser.accepts("force", "Force the removal of members of the consumer group (intended to remove stopped members if a long session timeout was used). " +
-                "Make sure to shut down all stream applications when this option is specified to avoid unexpected rebalances.");

Review comment:
       unnecessary whitespace change







[GitHub] [kafka] ableegoldman merged pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

Posted by GitBox <gi...@apache.org>.
ableegoldman merged pull request #8923:
URL: https://github.com/apache/kafka/pull/8923


   





[GitHub] [kafka] JoelWee commented on pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

Posted by GitBox <gi...@apache.org>.
JoelWee commented on pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#issuecomment-820560188


   Thanks @ableegoldman and @wcarlson5! Will try to get this done over the next week or so and ping you when ready :) 





[GitHub] [kafka] JoelWee commented on pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

Posted by GitBox <gi...@apache.org>.
JoelWee commented on pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#issuecomment-657181993


   @abbccdda rebased :) 





[GitHub] [kafka] ableegoldman commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#discussion_r621665639



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
##########
@@ -151,6 +151,22 @@ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() {
         Assert.assertEquals(1, exitCode);
     }
 
+    @Test
+    public void shouldNotAllowToResetWhenSpecifiedInternalTopicAbsent() {

Review comment:
       Hah, maybe I should have asked why all of those tests aren't also in `AbstractResetIntegrationTest`. It seems like almost everything that applies here would also be good to test in the SSL version of the test (which AFAICT is the only other one to extend the AbstractResetIntegrationTest).
   
   But I'm ok with leaving it as is, and maybe we can look into this as follow-up work, unless there is a good reason for them to be where they are (which I can't think of, but my imagination is not endless).
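   For context on the suggestion: checks defined once in an abstract base class are inherited, and therefore run, by every concrete subclass, so moving them into `AbstractResetIntegrationTest` would also exercise them against the SSL variant. A JUnit-free sketch of that pattern; all class names here are hypothetical stand-ins for the plaintext and SSL test classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for AbstractResetIntegrationTest: shared checks live here once.
abstract class AbstractResetChecks {
    // Each concrete subclass supplies its own cluster/security setup.
    abstract String securityProtocol();

    // Shared check: every subclass inherits it and runs it against its own setup.
    String shouldNotAllowToResetWhenSpecifiedInternalTopicAbsent() {
        return securityProtocol() + ": reset rejected for missing internal topic";
    }

    List<String> runAll() {
        final List<String> results = new ArrayList<>();
        results.add(shouldNotAllowToResetWhenSpecifiedInternalTopicAbsent());
        return results;
    }
}

final class PlaintextResetChecks extends AbstractResetChecks {
    @Override
    String securityProtocol() {
        return "PLAINTEXT";
    }
}

final class SslResetChecks extends AbstractResetChecks {
    @Override
    String securityProtocol() {
        return "SSL";
    }
}
```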







[GitHub] [kafka] JoelWee commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

Posted by GitBox <gi...@apache.org>.
JoelWee commented on a change in pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#discussion_r621542393



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
##########
@@ -151,6 +151,22 @@ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() {
         Assert.assertEquals(1, exitCode);
     }
 
+    @Test
+    public void shouldNotAllowToResetWhenSpecifiedInternalTopicAbsent() {

Review comment:
       It seems more natural to group it together with the other `shouldNotAllowToResetWhen...` tests. E.g. shouldNotAllowToResetWhenIntermediateTopicAbsent, shouldNotAllowToResetWhenInputTopicAbsent, etc.
   
   Happy to shift it over to AbstractResetIntegrationTest.java







[GitHub] [kafka] ableegoldman commented on pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#issuecomment-827995691


   Oof, there are a LOT of flaky test failures in this build. They're all unrelated to this PR, mostly in Connect and the RaftClusterTest, so I'll go ahead and merge, but yikes. 





[GitHub] [kafka] JoelWee commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

Posted by GitBox <gi...@apache.org>.
JoelWee commented on a change in pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#discussion_r621538528



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -205,6 +206,34 @@ private void add10InputElements() {
         }
     }
 
+    @Test
+    public void testResetWhenInternalTopicsAreSpecified() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithIntermediateTopic(true, OUTPUT_TOPIC_2), streamsConfig);
+        streams.start();
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+        streams.close();
+        waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
+
+        // RESET
+        streams.cleanUp();
+
+        final List<String> internalTopics = cluster.getAllTopicsInCluster().stream()
+                .filter(topic -> topic.startsWith(appID + "-"))

Review comment:
       done now :) 







[GitHub] [kafka] ableegoldman commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#discussion_r620645865



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
##########
@@ -151,6 +151,22 @@ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() {
         Assert.assertEquals(1, exitCode);
     }
 
+    @Test
+    public void shouldNotAllowToResetWhenSpecifiedInternalTopicAbsent() {

Review comment:
       Just wondering, why put this test here instead of in `AbstractResetIntegrationTest`?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
##########
@@ -151,6 +151,22 @@ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() {
         Assert.assertEquals(1, exitCode);
     }
 
+    @Test
+    public void shouldNotAllowToResetWhenSpecifiedInternalTopicAbsent() {

Review comment:
       Also, can we add a test like this, but for the case where the topic does exist and just isn't one of the inferred internal topics?
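   The requested test covers a different rejection reason from the existing one: the topic exists (for example, the application's output topic) but is not among the inferred internal topics. A sketch that keeps the two reasons apart, which is what lets each be asserted separately; `TopicCheck` and its `Outcome` enum are hypothetical, and `allTopics`/`inferred` stand in for the cluster listing and the inferred internal set.

```java
import java.util.Set;

// Hypothetical classification of a single --internal-topics entry.
final class TopicCheck {
    enum Outcome { DELETE, NOT_FOUND, NOT_INTERNAL }

    static Outcome classify(final String topic, final Set<String> allTopics, final Set<String> inferred) {
        if (!allTopics.contains(topic)) {
            // Case already covered by shouldNotAllowToResetWhenSpecifiedInternalTopicAbsent.
            return Outcome.NOT_FOUND;
        }
        if (!inferred.contains(topic)) {
            // Case the new test would cover, e.g. an existing output topic.
            return Outcome.NOT_INTERNAL;
        }
        return Outcome.DELETE;
    }
}
```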

##########
File path: docs/streams/developer-guide/app-reset-tool.html
##########
@@ -78,6 +78,9 @@
             <h2>Step 1: Run the application reset tool<a class="headerlink" href="#step-1-run-the-application-reset-tool" title="Permalink to this headline"></a></h2>
             <p>Invoke the application reset tool from the command line</p>
             <div class="highlight-bash"><div class="highlight"><pre><span></span><code>&lt;path-to-kafka&gt;/bin/kafka-streams-application-reset</code></pre></div>
+            <p>Warning! This tool makes irreversible changes to your application. It is strongly recommended that you run this once with <code class="docutils literal"><span class="pre">--dry-run</span></code> to preview your changes before making them.</p>
+            <div class="highlight-bash"><div class="highlight"><pre><span></span>&lt;path-to-kafka&gt;/bin/kafka-streams-application-reset

Review comment:
       You don't need to repeat this line, it's just printing an example of running the app reset tool for the line above (`Invoke the application reset tool from the command line`)

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -205,6 +206,34 @@ private void add10InputElements() {
         }
     }
 
+    @Test
+    public void testResetWhenInternalTopicsAreSpecified() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithIntermediateTopic(true, OUTPUT_TOPIC_2), streamsConfig);
+        streams.start();
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+        streams.close();
+        waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
+
+        // RESET
+        streams.cleanUp();
+
+        final List<String> internalTopics = cluster.getAllTopicsInCluster().stream()
+                .filter(topic -> topic.startsWith(appID + "-"))

Review comment:
       nit: we should also filter for internal topics specifically, like what's done in `StreamsResetter#matchesInternalTopicFormat`. Actually you can probably just invoke that method directly for the filter here (it can be made static if necessary)
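   A simplified sketch of the suggested filter. It assumes internal topics are named `<application.id>-...-changelog` or `<application.id>-...-repartition`; the real `StreamsResetter#matchesInternalTopicFormat` may recognise additional patterns, so this helper is a hypothetical approximation, not a copy of that method.

```java
import java.util.List;
import java.util.stream.Collectors;

final class InternalTopicFilter {
    // Simplified, hypothetical stand-in for StreamsResetter#matchesInternalTopicFormat:
    // only the changelog and repartition suffixes are recognised here.
    static boolean matchesInternalTopicFormat(final String topic) {
        return topic.endsWith("-changelog") || topic.endsWith("-repartition");
    }

    // Keep topics that both carry the application-id prefix and look internal,
    // instead of filtering on the prefix alone.
    static List<String> internalTopicsFor(final String appId, final List<String> allTopics) {
        return allTopics.stream()
                .filter(topic -> topic.startsWith(appId + "-"))
                .filter(InternalTopicFilter::matchesInternalTopicFormat)
                .collect(Collectors.toList());
    }
}
```

   An application-prefixed topic such as `app-output` passes the prefix check on its own but is dropped by the format check.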







[GitHub] [kafka] abbccdda commented on pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#issuecomment-656951683


   @JoelWee Could we rebase this work?





[GitHub] [kafka] abbccdda commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#discussion_r466127053



##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -646,22 +655,68 @@ private boolean isIntermediateTopic(final String topic) {
         return options.valuesOf(intermediateTopicsOption).contains(topic);
     }
 
-    private void maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) {
-        System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption));
+    private int maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) {
+        if (!options.valuesOf(internalTopicsOption).isEmpty()) {
+            return maybeDeleteSpecifiedInternalTopics(adminClient, dryRun);
+        } else {
+            return maybeDeleteInferredInternalTopics(adminClient, dryRun);
+        }
+    }
+
+    private int maybeDeleteSpecifiedInternalTopics(final Admin adminClient, final boolean dryRun) {
+        final List<String> internalTopics = options.valuesOf(internalTopicsOption);
+        int topicNotFound = EXIT_CODE_SUCCESS;
+
+        final List<String> topicsToDelete = new ArrayList<>();
+        final List<String> notFoundInternalTopics = new ArrayList<>();
+
+        System.out.println("Deleting specified internal/auto-created topics " + internalTopics);
+        for (final String topic : internalTopics) {
+            if (allTopics.contains(topic) && isInferredInternalTopic(topic)) {
+                topicsToDelete.add(topic);
+            } else {
+                notFoundInternalTopics.add(topic);
+            }
+        }
+
+        if (!notFoundInternalTopics.isEmpty()) {
+            System.out.println("Following topics were not detected as internal, skipping them");
+            for (final String topic : notFoundInternalTopics) {
+                System.out.println("Topic: " + topic);
+            }
+            topicNotFound = EXIT_CODE_ERROR;
+        }
+
+        System.out.println("Following internal topics will be deleted for application " + options.valueOf(applicationIdOption));
+        for (final String topic : topicsToDelete) {
+            System.out.println("Topic: " + topic);
+        }
+
+        if (!dryRun) {
+            doDelete(topicsToDelete, adminClient);
+        }
+        System.out.println("Done.");
+        return topicNotFound;
+    }
+
+    private int maybeDeleteInferredInternalTopics(final Admin adminClient, final boolean dryRun) {
         final List<String> topicsToDelete = new ArrayList<>();
         for (final String listing : allTopics) {
-            if (isInternalTopic(listing)) {
-                if (!dryRun) {
-                    topicsToDelete.add(listing);
-                } else {
-                    System.out.println("Topic: " + listing);
-                }
+            if (isInferredInternalTopic(listing)) {
+                topicsToDelete.add(listing);
             }
         }
+
+        System.out.println("Following inferred internal/auto-created topics will be deleted for application " + options.valueOf(applicationIdOption));
+        for (final String topic : topicsToDelete) {
+            System.out.println("Topic: " + topic);
+        }
+
         if (!dryRun) {
             doDelete(topicsToDelete, adminClient);
         }
         System.out.println("Done.");
+        return EXIT_CODE_SUCCESS;

Review comment:
       We could refactor out a helper function here.
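   One possible shape for that helper, pulling the shared tail (print the topic list, delete unless dry-run, print "Done.") out of both methods. The `deleter` callback stands in for `doDelete(topicsToDelete, adminClient)` so the sketch runs without an `Admin` client; `DeletionHelper` and `printAndMaybeDelete` are hypothetical names.

```java
import java.util.List;
import java.util.function.Consumer;

final class DeletionHelper {
    // Hypothetical shared tail of maybeDeleteSpecifiedInternalTopics and
    // maybeDeleteInferredInternalTopics: list the topics, delete unless dry-run.
    static void printAndMaybeDelete(final String header,
                                    final List<String> topicsToDelete,
                                    final boolean dryRun,
                                    final Consumer<List<String>> deleter) {
        System.out.println(header);
        for (final String topic : topicsToDelete) {
            System.out.println("Topic: " + topic);
        }
        if (!dryRun) {
            deleter.accept(topicsToDelete);
        }
        System.out.println("Done.");
    }
}
```

   Each caller would then only build its own header string and return its own exit code.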

##########
File path: docs/streams/developer-guide/app-reset-tool.html
##########
@@ -77,6 +77,7 @@
         <div class="section" id="step-1-run-the-application-reset-tool">
             <h2>Step 1: Run the application reset tool<a class="headerlink" href="#step-1-run-the-application-reset-tool" title="Permalink to this headline"></a></h2>
             <p>Invoke the application reset tool from the command line</p>
+            <p>Warning! This tool makes irreversible changes to your application. It is strongly recommended that you run this once with --dry-run to preview your changes before making them.</p>

Review comment:
       For HTML changes, it is recommended to include a screenshot of the built website. Please refer to https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -262,6 +263,52 @@ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception
         Assert.assertEquals(1, exitCode);
     }
 
+    void shouldNotAllowToResetWhenSpecifiedInternalTopicAbsent() throws Exception {
+        appID = testId + "-not-reset-without-intermediate-topic";
+        final String[] parameters = new String[]{
+            "--application-id", appID,
+            "--bootstrap-servers", cluster.bootstrapServers(),
+            "--internal-topics", NON_EXISTING_TOPIC,
+            "--execute"
+        };
+        final Properties cleanUpConfig = new Properties();
+        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
+
+        final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
+        Assert.assertEquals(1, exitCode);
+    }
+
+    void testResetWhenInternalTopicsAreSpecified() throws Exception {
+        final boolean useRepartitioned = true;
+
+        appID = testId + "-with-internal-topics-option";
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithIntermediateTopic(useRepartitioned, OUTPUT_TOPIC_2), streamsConfig);
+        streams.start();
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+        streams.close();
+        waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
+
+        // RESET
+        streams.cleanUp();
+
+        final List<String> internalTopics = cluster.getAllTopicsInCluster().stream()
+                .filter(topic -> topic.startsWith(appID + "-"))
+                .collect(Collectors.toList());
+        final boolean cleanResult = tryCleanGlobal(!useRepartitioned,

Review comment:
       Could just use `false`

##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -167,7 +171,7 @@ public int run(final String[] args,
             final HashMap<Object, Object> consumerConfig = new HashMap<>(config);
             consumerConfig.putAll(properties);
             exitCode = maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun);
-            maybeDeleteInternalTopics(adminClient, dryRun);
+            exitCode |= maybeDeleteInternalTopics(adminClient, dryRun);

Review comment:
       Should we check both exit codes and decide whether to return 0 or 1?

##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -646,22 +655,68 @@ private boolean isIntermediateTopic(final String topic) {
         return options.valuesOf(intermediateTopicsOption).contains(topic);
     }
 
-    private void maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) {
-        System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption));
+    private int maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) {
+        if (!options.valuesOf(internalTopicsOption).isEmpty()) {
+            return maybeDeleteSpecifiedInternalTopics(adminClient, dryRun);
+        } else {
+            return maybeDeleteInferredInternalTopics(adminClient, dryRun);
+        }
+    }
+
+    private int maybeDeleteSpecifiedInternalTopics(final Admin adminClient, final boolean dryRun) {
+        final List<String> internalTopics = options.valuesOf(internalTopicsOption);
+        int topicNotFound = EXIT_CODE_SUCCESS;
+
+        final List<String> topicsToDelete = new ArrayList<>();
+        final List<String> notFoundInternalTopics = new ArrayList<>();
+
+        System.out.println("Deleting specified internal/auto-created topics " + internalTopics);
+        for (final String topic : internalTopics) {
+            if (allTopics.contains(topic) && isInferredInternalTopic(topic)) {
+                topicsToDelete.add(topic);
+            } else {
+                notFoundInternalTopics.add(topic);
+            }
+        }
+
+        if (!notFoundInternalTopics.isEmpty()) {

Review comment:
       Should we throw exception here to stop the run or just skip the not found topics?
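   The two options differ only in what happens once some requested topics are rejected. A sketch of both, assuming the rejected topics have already been separated out; `RejectedTopicPolicy`, its `Policy` enum and `handleRejected` are hypothetical. The lenient branch corresponds to what the diff does (report the topics, carry on with the rest, return an error code), while failing fast avoids a partial reset at the cost of forcing a rerun.

```java
import java.util.List;

// Hypothetical sketch of the two ways to react to requested topics that are
// missing or not recognised as internal.
final class RejectedTopicPolicy {
    enum Policy { FAIL_FAST, SKIP_AND_REPORT }

    static final int EXIT_CODE_SUCCESS = 0;
    static final int EXIT_CODE_ERROR = 1;

    // FAIL_FAST aborts before anything is deleted; SKIP_AND_REPORT lets the caller
    // go on deleting the valid topics but signals failure through the exit code.
    static int handleRejected(final List<String> rejected, final Policy policy) {
        if (rejected.isEmpty()) {
            return EXIT_CODE_SUCCESS;
        }
        if (policy == Policy.FAIL_FAST) {
            throw new IllegalArgumentException("Not valid internal topics: " + rejected);
        }
        System.out.println("Following topics were not detected as internal, skipping them");
        for (final String topic : rejected) {
            System.out.println("Topic: " + topic);
        }
        return EXIT_CODE_ERROR;
    }
}
```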

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -262,6 +263,52 @@ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception
         Assert.assertEquals(1, exitCode);
     }
 
+    void shouldNotAllowToResetWhenSpecifiedInternalTopicAbsent() throws Exception {
+        appID = testId + "-not-reset-without-intermediate-topic";
+        final String[] parameters = new String[]{
+            "--application-id", appID,
+            "--bootstrap-servers", cluster.bootstrapServers(),
+            "--internal-topics", NON_EXISTING_TOPIC,
+            "--execute"
+        };
+        final Properties cleanUpConfig = new Properties();
+        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
+
+        final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
+        Assert.assertEquals(1, exitCode);
+    }
+
+    void testResetWhenInternalTopicsAreSpecified() throws Exception {
+        final boolean useRepartitioned = true;
+
+        appID = testId + "-with-internal-topics-option";
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithIntermediateTopic(useRepartitioned, OUTPUT_TOPIC_2), streamsConfig);
+        streams.start();
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+        streams.close();
+        waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
+
+        // RESET
+        streams.cleanUp();
+
+        final List<String> internalTopics = cluster.getAllTopicsInCluster().stream()
+                .filter(topic -> topic.startsWith(appID + "-"))
+                .collect(Collectors.toList());
+        final boolean cleanResult = tryCleanGlobal(!useRepartitioned,
+                "--internal-topics",
+                String.join(",", internalTopics.subList(1, internalTopics.size())) + "," + OUTPUT_TOPIC);
+        Assert.assertEquals(false, cleanResult); // Reset will give error code since output topic is not a valid internal topic

Review comment:
       could use `assertFalse`
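The `--internal-topics` value that the test above passes to the resetter can be reproduced in isolation. The sketch below mirrors the test logic: it keeps the topics prefixed with `appID + "-"`, drops the first one, and appends a topic that is not internal, so the reset is expected to fail (`cleanResult == false`). In the test itself, JUnit 4's `assertFalse(cleanResult)` states that expectation more directly than `assertEquals(false, cleanResult)`. The class name, helper name and topic names are hypothetical. If the real topic collection has no defined iteration order, which topic `subList(1, ...)` drops is arbitrary, and `subList` throws when no prefixed topic exists; the sketch uses a fixed-order `List`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class InternalTopicsArgSketch {

    // Hypothetical helper mirroring the test: keep topics prefixed with "<appId>-",
    // drop the first match, then append one topic that is NOT internal to the app.
    static String buildInternalTopicsArg(final List<String> allTopics,
                                         final String appId,
                                         final String extraTopic) {
        final List<String> internalTopics = allTopics.stream()
                .filter(topic -> topic.startsWith(appId + "-"))
                .collect(Collectors.toList());
        return String.join(",", internalTopics.subList(1, internalTopics.size())) + "," + extraTopic;
    }

    public static void main(final String[] args) {
        // Illustrative topic names only.
        final List<String> topics = Arrays.asList(
                "input", "app-KSTREAM-A-repartition", "app-KSTREAM-B-changelog", "outputTopic2");
        // Prints "app-KSTREAM-B-changelog,outputTopic2"; the resetter should reject
        // this value because outputTopic2 is not an internal topic.
        System.out.println(buildInternalTopicsArg(topics, "app", "outputTopic2"));
    }
}
```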

##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -646,22 +655,68 @@ private boolean isIntermediateTopic(final String topic) {
         return options.valuesOf(intermediateTopicsOption).contains(topic);
     }
 
-    private void maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) {
-        System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption));
+    private int maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) {
+        if (!options.valuesOf(internalTopicsOption).isEmpty()) {
+            return maybeDeleteSpecifiedInternalTopics(adminClient, dryRun);
+        } else {
+            return maybeDeleteInferredInternalTopics(adminClient, dryRun);
+        }
+    }
+
+    private int maybeDeleteSpecifiedInternalTopics(final Admin adminClient, final boolean dryRun) {
+        final List<String> internalTopics = options.valuesOf(internalTopicsOption);
+        int topicNotFound = EXIT_CODE_SUCCESS;
+
+        final List<String> topicsToDelete = new ArrayList<>();
+        final List<String> notFoundInternalTopics = new ArrayList<>();
+
+        System.out.println("Deleting specified internal/auto-created topics " + internalTopics);
+        for (final String topic : internalTopics) {
+            if (allTopics.contains(topic) && isInferredInternalTopic(topic)) {
+                topicsToDelete.add(topic);
+            } else {
+                notFoundInternalTopics.add(topic);
+            }
+        }
+
+        if (!notFoundInternalTopics.isEmpty()) {
+            System.out.println("Following topics were not detected as internal, skipping them");
+            for (final String topic : notFoundInternalTopics) {
+                System.out.println("Topic: " + topic);
+            }
+            topicNotFound = EXIT_CODE_ERROR;
+        }
+
+        System.out.println("Following internal topics will be deleted for application " + options.valueOf(applicationIdOption));
+        for (final String topic : topicsToDelete) {
+            System.out.println("Topic: " + topic);
+        }
+
+        if (!dryRun) {
+            doDelete(topicsToDelete, adminClient);
+        }
+        System.out.println("Done.");
+        return topicNotFound;
+    }
+
+    private int maybeDeleteInferredInternalTopics(final Admin adminClient, final boolean dryRun) {
         final List<String> topicsToDelete = new ArrayList<>();
         for (final String listing : allTopics) {
-            if (isInternalTopic(listing)) {
-                if (!dryRun) {
-                    topicsToDelete.add(listing);
-                } else {
-                    System.out.println("Topic: " + listing);
-                }
+            if (isInferredInternalTopic(listing)) {
+                topicsToDelete.add(listing);
             }
         }
+
+        System.out.println("Following inferred internal/auto-created topics will be deleted for application " + options.valueOf(applicationIdOption));
+        for (final String topic : topicsToDelete) {

Review comment:
       Could we print all the topics on a single line instead of one per line?
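One way to address this suggestion is to join the topic names before printing, rather than looping with one `"Topic: " + topic` line per entry. The sketch below is an assumption about how that could look, not the change that was merged; the helper name `formatTopics` and the example topic names are hypothetical. Concatenating the list directly (as the diff already does with `"Deleting specified internal/auto-created topics " + internalTopics`) would also print on one line, in `List.toString()`'s `[a, b]` form.

```java
import java.util.Arrays;
import java.util.List;

public class OneLineTopicListSketch {

    // Hypothetical formatter: renders every topic on a single line, comma-separated,
    // instead of printing one "Topic: ..." line per entry.
    static String formatTopics(final String header, final List<String> topics) {
        return header + ": " + String.join(", ", topics);
    }

    public static void main(final String[] args) {
        final List<String> topicsToDelete = Arrays.asList(
                "my-app-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog",
                "my-app-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition");
        System.out.println(formatTopics(
                "Following inferred internal/auto-created topics will be deleted for application my-app",
                topicsToDelete));
    }
}
```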







[GitHub] [kafka] wcarlson5 commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#discussion_r614166802



##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -248,7 +257,7 @@ private void parseArguments(final String[] args) {
             .ofType(String.class)
             .describedAs("file name");
         forceOption = optionParser.accepts("force", "Force the removal of members of the consumer group (intended to remove stopped members if a long session timeout was used). " +
-                "Make sure to shut down all stream applications when this option is specified to avoid unexpected rebalances.");

Review comment:
       This change is unnecessary.

##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -225,6 +229,11 @@ private void parseArguments(final String[] args) {
             .ofType(String.class)
             .withValuesSeparatedBy(',')
             .describedAs("list");
+        internalTopicsOption = optionParser.accepts("internal-topics", "Comma-separated list of internal topics to delete. Must be a subset of the internal topics marked for deletion by the default behaviour (do a dry-run without this option to view these topics).")

Review comment:
       line is too long

##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -642,22 +651,38 @@ private boolean isIntermediateTopic(final String topic) {
         return options.valuesOf(intermediateTopicsOption).contains(topic);
     }
 
-    private void maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) {
-        System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption));
-        final List<String> topicsToDelete = new ArrayList<>();
-        for (final String listing : allTopics) {
-            if (isInternalTopic(listing)) {
-                if (!dryRun) {
-                    topicsToDelete.add(listing);
-                } else {
-                    System.out.println("Topic: " + listing);
-                }
+    private int maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) {
+        final List<String> inferredInternalTopics = allTopics.stream()
+                .filter(this::isInferredInternalTopic)
+                .collect(Collectors.toList());
+        final List<String> specifiedInternalTopics = options.valuesOf(internalTopicsOption);
+        final List<String> topicsToDelete;
+
+        if (!specifiedInternalTopics.isEmpty()) {
+            final List<String> notFoundInternalTopics = specifiedInternalTopics.stream()
+                    .filter(topic -> !inferredInternalTopics.contains(topic))
+                    .collect(Collectors.toList());
+            if (!notFoundInternalTopics.isEmpty()) {

Review comment:
       something like `inferredInternalTopics.containsAll(specifiedInternalTopics)` might be easier to understand here
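The reviewer's `containsAll` suggestion separates the yes/no check from building the list of offending topics, which is then only needed for the error message. A minimal sketch, assuming both inputs are plain `List<String>` values as in the diff; the class and helper names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ContainsAllCheckSketch {

    // True when every user-specified topic is among the inferred internal topics.
    static boolean allSpecifiedAreInternal(final List<String> inferred, final List<String> specified) {
        return inferred.containsAll(specified);
    }

    // Only needed to report the problem: specified topics that were not inferred.
    static List<String> notFound(final List<String> inferred, final List<String> specified) {
        return specified.stream()
                .filter(topic -> !inferred.contains(topic))
                .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        final List<String> inferred = Arrays.asList("app-store-changelog", "app-agg-repartition");
        final List<String> specified = Arrays.asList("app-store-changelog", "typo-topic");
        if (!allSpecifiedAreInternal(inferred, specified)) {
            // Prints "Not internal topics of this application: [typo-topic]"
            System.out.println("Not internal topics of this application: " + notFound(inferred, specified));
        }
    }
}
```

`containsAll` on an `ArrayList` performs a linear `contains` for each element; for clusters with many topics, copying the inferred topics into a `HashSet` first makes each lookup constant-time on average.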







[GitHub] [kafka] JoelWee commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

Posted by GitBox <gi...@apache.org>.
JoelWee commented on a change in pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#discussion_r469725796



##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -646,22 +655,68 @@ private boolean isIntermediateTopic(final String topic) {
         return options.valuesOf(intermediateTopicsOption).contains(topic);
     }
 
-    private void maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) {
-        System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption));
+    private int maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) {
+        if (!options.valuesOf(internalTopicsOption).isEmpty()) {
+            return maybeDeleteSpecifiedInternalTopics(adminClient, dryRun);
+        } else {
+            return maybeDeleteInferredInternalTopics(adminClient, dryRun);
+        }
+    }
+
+    private int maybeDeleteSpecifiedInternalTopics(final Admin adminClient, final boolean dryRun) {
+        final List<String> internalTopics = options.valuesOf(internalTopicsOption);
+        int topicNotFound = EXIT_CODE_SUCCESS;
+
+        final List<String> topicsToDelete = new ArrayList<>();
+        final List<String> notFoundInternalTopics = new ArrayList<>();
+
+        System.out.println("Deleting specified internal/auto-created topics " + internalTopics);
+        for (final String topic : internalTopics) {
+            if (allTopics.contains(topic) && isInferredInternalTopic(topic)) {
+                topicsToDelete.add(topic);
+            } else {
+                notFoundInternalTopics.add(topic);
+            }
+        }
+
+        if (!notFoundInternalTopics.isEmpty()) {

Review comment:
       I would prefer to throw an exception, since it's likely that the user has made a mistake. I've implemented this now.
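The fail-fast behaviour described here (reject the whole run when any specified topic is not an inferred internal topic, rather than skipping it) can be sketched as a validation step whose exception the caller turns into a non-zero exit code. This matches the test earlier in the thread, which expects exit code `1` for a non-existent topic. The sketch is an assumption about the shape of the change, not the merged code: the class and method names are hypothetical, `IllegalArgumentException` is an arbitrary choice, and the exit-code values `0` and `1` are assumed for `EXIT_CODE_SUCCESS` and `EXIT_CODE_ERROR`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FailOnUnknownTopicsSketch {

    // Assumed values; the real tool defines its own constants.
    static final int EXIT_CODE_SUCCESS = 0;
    static final int EXIT_CODE_ERROR = 1;

    // Throws if any specified topic is not an inferred internal topic,
    // instead of silently skipping it.
    static List<String> validateSpecifiedTopics(final List<String> inferred, final List<String> specified) {
        final List<String> notFound = specified.stream()
                .filter(topic -> !inferred.contains(topic))
                .collect(Collectors.toList());
        if (!notFound.isEmpty()) {
            throw new IllegalArgumentException("Topics " + notFound
                    + " are not internal topics of this application; nothing was deleted.");
        }
        return specified;
    }

    // Caller-side sketch: translate the validation failure into an exit code.
    static int run(final List<String> inferred, final List<String> specified) {
        try {
            final List<String> toDelete = validateSpecifiedTopics(inferred, specified);
            System.out.println("Would delete: " + toDelete);
            return EXIT_CODE_SUCCESS;
        } catch (final IllegalArgumentException e) {
            System.err.println("ERROR: " + e.getMessage());
            return EXIT_CODE_ERROR;
        }
    }

    public static void main(final String[] args) {
        final List<String> inferred = Arrays.asList("app-store-changelog", "app-agg-repartition");
        System.out.println(run(inferred, Arrays.asList("app-store-changelog")));          // exit code 0
        System.out.println(run(inferred, Arrays.asList("app-store-changelog", "oops")));  // exit code 1
    }
}
```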



