You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/26 21:05:05 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

ableegoldman commented on a change in pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#discussion_r620645865



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
##########
@@ -151,6 +151,22 @@ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() {
         Assert.assertEquals(1, exitCode);
     }
 
+    @Test
+    public void shouldNotAllowToResetWhenSpecifiedInternalTopicAbsent() {

Review comment:
       Just wondering, why put this test here instead of in `AbstractResetIntegrationTest`?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
##########
@@ -151,6 +151,22 @@ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() {
         Assert.assertEquals(1, exitCode);
     }
 
+    @Test
+    public void shouldNotAllowToResetWhenSpecifiedInternalTopicAbsent() {

Review comment:
       Also, can we add a test like this but for the case where the topic does exist but just isn't a subset of inferred internal topics?

##########
File path: docs/streams/developer-guide/app-reset-tool.html
##########
@@ -78,6 +78,9 @@
             <h2>Step 1: Run the application reset tool<a class="headerlink" href="#step-1-run-the-application-reset-tool" title="Permalink to this headline"></a></h2>
             <p>Invoke the application reset tool from the command line</p>
             <div class="highlight-bash"><div class="highlight"><pre><span></span><code>&lt;path-to-kafka&gt;/bin/kafka-streams-application-reset</code></pre></div>
+            <p>Warning! This tool makes irreversible changes to your application. It is strongly recommended that you run this once with <code class="docutils literal"><span class="pre">--dry-run</span></code> to preview your changes before making them.</p>
+            <div class="highlight-bash"><div class="highlight"><pre><span></span>&lt;path-to-kafka&gt;/bin/kafka-streams-application-reset

Review comment:
       You don't need to repeat this line, it's just printing an example of running the app reset tool for the line above (`Invoke the application reset tool from the command line`)

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -205,6 +206,34 @@ private void add10InputElements() {
         }
     }
 
+    @Test
+    public void testResetWhenInternalTopicsAreSpecified() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithIntermediateTopic(true, OUTPUT_TOPIC_2), streamsConfig);
+        streams.start();
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+        streams.close();
+        waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
+
+        // RESET
+        streams.cleanUp();
+
+        final List<String> internalTopics = cluster.getAllTopicsInCluster().stream()
+                .filter(topic -> topic.startsWith(appID + "-"))

Review comment:
       nit: we should also filter for internal topics specifically, like what's done in `StreamsResetter#matchesInternalTopicFormat`. Actually you can probably just invoke that method directly for the filter here (it can be made static if necessary)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org