Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/24 01:20:12 UTC

[GitHub] [kafka] ableegoldman opened a new pull request #9648: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics

ableegoldman opened a new pull request #9648:
URL: https://github.com/apache/kafka/pull/9648


   The problem is that we compare two incompatible sets in `ProcessorTopology#updateSourceTopics`: the local `sourceNodesByName` map only contains nodes that belong to a particular subtopology, whereas the passed-in `nodeToSourceTopics` ultimately comes from the InternalTopologyBuilder's map, which contains nodes for the entire topology. So we would end up hitting the IllegalStateException thrown in `#updateSourceTopics` any time we tried to update an application with more than one subtopology.
   
   The fix is simple: we just need to ignore any source nodes that aren't part of the ProcessorTopology's subtopology.
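
To make the mismatch concrete, here is a minimal, self-contained sketch of the removed check. It is not the Kafka Streams code: the class `SourceNodeMismatchSketch`, the helper `oldCheckWouldThrow`, and the node and topic names are all hypothetical, and plain strings stand in for the real `SourceNode` objects. It only illustrates why comparing a subtopology's node names against the topology-wide key set fails as soon as there is a second subtopology.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class SourceNodeMismatchSketch {

    // Hypothetical stand-in for the pre-fix check: returns true in exactly the
    // situation where the old code would have thrown an IllegalStateException,
    // i.e. when the two key sets are not equal.
    static boolean oldCheckWouldThrow(final Set<String> localSourceNodeNames,
                                      final Map<String, List<String>> allSourceTopicsByNodeName) {
        return !allSourceTopicsByNodeName.keySet().equals(localSourceNodeNames);
    }

    public static void main(final String[] args) {
        // Topology-wide view: one entry per source node in the whole application
        // (node and topic names are made up for illustration).
        final Map<String, List<String>> topologyWide = new HashMap<>();
        topologyWide.put("source-0", Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2"));
        topologyWide.put("source-1", Arrays.asList("app-repartition"));

        // A single subtopology only knows about its own source node.
        final Set<String> subtopology0 = Collections.singleton("source-0");
        System.out.println(oldCheckWouldThrow(subtopology0, topologyWide)); // prints: true

        // With only one subtopology the two key sets coincide, so the check passes.
        final Map<String, List<String>> singleSubtopology =
            Collections.singletonMap("source-0", Arrays.asList("TEST-TOPIC-1"));
        System.out.println(oldCheckWouldThrow(subtopology0, singleSubtopology)); // prints: false
    }
}
```

The second call shows why a test with a single subtopology could not expose the problem.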


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9648: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9648:
URL: https://github.com/apache/kafka/pull/9648#discussion_r529101499



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -154,9 +154,9 @@ final void transitionTo(final Task.State newState) {
     }
 
     @Override
-    public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {

Review comment:
       Since the root cause of this bug was basically just confusion over what exactly this set contains, a renaming feels in order







[GitHub] [kafka] ableegoldman commented on a change in pull request #9648: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9648:
URL: https://github.com/apache/kafka/pull/9648#discussion_r529932449



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##########
@@ -198,9 +200,16 @@ public void testRegexRecordsAreProcessedAfterReassignment() throws Exception {
 
             final StreamsBuilder builder = new StreamsBuilder();
             final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
-            pattern1Stream.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+            final KStream<String, String> otherStream = builder.stream(Pattern.compile("not-a-match"));
+
+            pattern1Stream
+                .selectKey((k, v) -> k)
+                .groupByKey()
+                .aggregate(() -> "", (k, v, a) -> v)
+                .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

Review comment:
       There are two ways to have more than one source node: either reading from a different input topic/pattern, or via a repartition. I agree that we should test both of these cases, but I'd prefer to do so in a single integration test rather than in two separate integration tests, to avoid making the test suite even longer







[GitHub] [kafka] ableegoldman commented on pull request #9648: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9648:
URL: https://github.com/apache/kafka/pull/9648#issuecomment-733420127


   Cherrypicked to 2.7 cc @bbejeck 





[GitHub] [kafka] ableegoldman commented on pull request #9648: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9648:
URL: https://github.com/apache/kafka/pull/9648#issuecomment-733428965


   Cherrypicked to 2.6 cc/ @mimaison 





[GitHub] [kafka] ableegoldman commented on a change in pull request #9648: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9648:
URL: https://github.com/apache/kafka/pull/9648#discussion_r529798216



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##########
@@ -198,9 +200,16 @@ public void testRegexRecordsAreProcessedAfterReassignment() throws Exception {
 
             final StreamsBuilder builder = new StreamsBuilder();
             final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
-            pattern1Stream.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+            final KStream<String, String> otherStream = builder.stream(Pattern.compile("not-a-match"));
+
+            pattern1Stream
+                .selectKey((k, v) -> k)
+                .groupByKey()
+                .aggregate(() -> "", (k, v, a) -> v)
+                .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
             streams = new KafkaStreams(builder.build(), streamsConfiguration);

Review comment:
       Ack good call







[GitHub] [kafka] ableegoldman commented on a change in pull request #9648: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9648:
URL: https://github.com/apache/kafka/pull/9648#discussion_r529109355



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##########
@@ -198,9 +200,16 @@ public void testRegexRecordsAreProcessedAfterReassignment() throws Exception {
 
             final StreamsBuilder builder = new StreamsBuilder();
             final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
-            pattern1Stream.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+            final KStream<String, String> otherStream = builder.stream(Pattern.compile("not-a-match"));

Review comment:
       We didn't catch the bug in this test for two reasons: it has only one subtopology, and it didn't wait for Streams to get to RUNNING before it created the new topic. So we weren't even covering the "update source topics" code path since all topics existed by the first assignment







[GitHub] [kafka] cadonna commented on a change in pull request #9648: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9648:
URL: https://github.com/apache/kafka/pull/9648#discussion_r529846543



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
##########
@@ -149,24 +149,30 @@ public boolean hasPersistentGlobalStore() {
         return false;
     }
 
-    public void updateSourceTopics(final Map<String, List<String>> sourceTopicsByName) {
-        if (!sourceTopicsByName.keySet().equals(sourceNodesByName.keySet())) {
-            log.error("Set of source nodes do not match: \n" +
-                "sourceNodesByName = {}\n" +
-                "sourceTopicsByName = {}",
-                sourceNodesByName.keySet(), sourceTopicsByName.keySet());
-            throw new IllegalStateException("Tried to update source topics but source nodes did not match");
-        }
+    public void updateSourceTopics(final Map<String, List<String>> allSourceTopicsByNodeName) {

Review comment:
       The scope of an integration test and of a unit test are quite different. The integration test specifically tests the pattern subscription whereas the unit test just tests the update of the source topics. Probably at the moment the update of the source topics is only called when pattern subscription is used, but that assumption seems rather brittle to me because it may change in the future. I think that neither of the tests is useless. If one were useless, I would keep the unit test for test execution performance reasons.







[GitHub] [kafka] cadonna commented on a change in pull request #9648: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9648:
URL: https://github.com/apache/kafka/pull/9648#discussion_r529280072



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
##########
@@ -149,24 +149,30 @@ public boolean hasPersistentGlobalStore() {
         return false;
     }
 
-    public void updateSourceTopics(final Map<String, List<String>> sourceTopicsByName) {
-        if (!sourceTopicsByName.keySet().equals(sourceNodesByName.keySet())) {
-            log.error("Set of source nodes do not match: \n" +
-                "sourceNodesByName = {}\n" +
-                "sourceTopicsByName = {}",
-                sourceNodesByName.keySet(), sourceTopicsByName.keySet());
-            throw new IllegalStateException("Tried to update source topics but source nodes did not match");
-        }
+    public void updateSourceTopics(final Map<String, List<String>> allSourceTopicsByNodeName) {

Review comment:
       Could you also add a unit test that verifies that the issue is gone?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##########
@@ -198,9 +200,16 @@ public void testRegexRecordsAreProcessedAfterReassignment() throws Exception {
 
             final StreamsBuilder builder = new StreamsBuilder();
             final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
-            pattern1Stream.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+            final KStream<String, String> otherStream = builder.stream(Pattern.compile("not-a-match"));
+
+            pattern1Stream
+                .selectKey((k, v) -> k)
+                .groupByKey()
+                .aggregate(() -> "", (k, v, a) -> v)
+                .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
             streams = new KafkaStreams(builder.build(), streamsConfiguration);

Review comment:
       I would add the following assert here to ensure we have more than one sub-topology:
   
   ```
               final Topology topology = builder.build();
               assertThat(topology.describe().subtopologies().size(), greaterThan(1));
               streams = new KafkaStreams(topology, streamsConfiguration);
   ```
   
   Just to make it clear that we want to test with multiple sub-topologies.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##########
@@ -198,9 +200,16 @@ public void testRegexRecordsAreProcessedAfterReassignment() throws Exception {
 
             final StreamsBuilder builder = new StreamsBuilder();
             final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
-            pattern1Stream.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+            final KStream<String, String> otherStream = builder.stream(Pattern.compile("not-a-match"));
+
+            pattern1Stream
+                .selectKey((k, v) -> k)
+                .groupByKey()
+                .aggregate(() -> "", (k, v, a) -> v)
+                .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

Review comment:
       I would even add `otherStream` like:
   ```suggestion
                   .toStream()
                   .merge(otherStream)
                   .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
   ```







[GitHub] [kafka] ableegoldman commented on a change in pull request #9648: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9648:
URL: https://github.com/apache/kafka/pull/9648#discussion_r529798140



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
##########
@@ -149,24 +149,30 @@ public boolean hasPersistentGlobalStore() {
         return false;
     }
 
-    public void updateSourceTopics(final Map<String, List<String>> sourceTopicsByName) {
-        if (!sourceTopicsByName.keySet().equals(sourceNodesByName.keySet())) {
-            log.error("Set of source nodes do not match: \n" +
-                "sourceNodesByName = {}\n" +
-                "sourceTopicsByName = {}",
-                sourceNodesByName.keySet(), sourceTopicsByName.keySet());
-            throw new IllegalStateException("Tried to update source topics but source nodes did not match");
-        }
+    public void updateSourceTopics(final Map<String, List<String>> allSourceTopicsByNodeName) {

Review comment:
       I modified the existing test rather than add a new one, and verified that the modified test does fail without the changes. Since the original test wasn't actually testing anything prior to the modifications I thought it made sense to fix the test rather than add a new one and retain a useless test in addition







[GitHub] [kafka] cadonna commented on a change in pull request #9648: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9648:
URL: https://github.com/apache/kafka/pull/9648#discussion_r529840078



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##########
@@ -198,9 +200,16 @@ public void testRegexRecordsAreProcessedAfterReassignment() throws Exception {
 
             final StreamsBuilder builder = new StreamsBuilder();
             final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
-            pattern1Stream.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+            final KStream<String, String> otherStream = builder.stream(Pattern.compile("not-a-match"));
+
+            pattern1Stream
+                .selectKey((k, v) -> k)
+                .groupByKey()
+                .aggregate(() -> "", (k, v, a) -> v)
+                .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

Review comment:
       You would have a source node in one sub-topology and a source node in the other sub-topology. I thought that was the pattern in the bug report, but I now realize that the bug report uses a pattern similar to the one you specified. Wouldn't it make sense to test both?







[GitHub] [kafka] ableegoldman commented on a change in pull request #9648: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9648:
URL: https://github.com/apache/kafka/pull/9648#discussion_r529104011



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
##########
@@ -149,24 +149,30 @@ public boolean hasPersistentGlobalStore() {
         return false;
     }
 
-    public void updateSourceTopics(final Map<String, List<String>> sourceTopicsByName) {
-        if (!sourceTopicsByName.keySet().equals(sourceNodesByName.keySet())) {
-            log.error("Set of source nodes do not match: \n" +
-                "sourceNodesByName = {}\n" +
-                "sourceTopicsByName = {}",
-                sourceNodesByName.keySet(), sourceTopicsByName.keySet());
-            throw new IllegalStateException("Tried to update source topics but source nodes did not match");
-        }
+    public void updateSourceTopics(final Map<String, List<String>> allSourceTopicsByNodeName) {
         sourceNodesByTopic.clear();
-        for (final Map.Entry<String, List<String>> sourceEntry : sourceTopicsByName.entrySet()) {
-            final String nodeName = sourceEntry.getKey();
-            for (final String topic : sourceEntry.getValue()) {
+        for (final Map.Entry<String, SourceNode<?, ?, ?, ?>> sourceNodeEntry : sourceNodesByName.entrySet()) {

Review comment:
       In addition to removing the faulty check, I slightly refactored this loop so that we only loop over the source nodes in this particular subtopology. Previously we would have added entries for all source nodes across the entire topology to our `sourceNodesByTopic` map.
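
As a rough illustration of the refactored loop, the sketch below iterates over the subtopology's own source nodes and looks each one up in the topology-wide map, rather than iterating over the topology-wide map itself. This is a simplified model, not the actual `ProcessorTopology` code: the class `UpdateSourceTopicsSketch` is hypothetical, node names stand in for `SourceNode` objects, and throwing when a local node is missing from the full map is an assumption about how that case should be handled.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class UpdateSourceTopicsSketch {

    // Names of the source nodes that belong to this subtopology only.
    private final Set<String> sourceNodeNames;

    // topic -> name of the local source node that reads it
    // (the real map holds SourceNode objects; strings are a simplification).
    private final Map<String, String> sourceNodeByTopic = new HashMap<>();

    UpdateSourceTopicsSketch(final Set<String> sourceNodeNames) {
        this.sourceNodeNames = new HashSet<>(sourceNodeNames);
    }

    void updateSourceTopics(final Map<String, List<String>> allSourceTopicsByNodeName) {
        sourceNodeByTopic.clear();
        // Loop over our own nodes, so entries for other subtopologies are never added.
        for (final String nodeName : sourceNodeNames) {
            final List<String> topics = allSourceTopicsByNodeName.get(nodeName);
            if (topics == null) {
                // Assumption: a local node missing from the full topology is a real error.
                throw new IllegalStateException("Node " + nodeName + " not found in full topology");
            }
            for (final String topic : topics) {
                sourceNodeByTopic.put(topic, nodeName);
            }
        }
    }

    Map<String, String> sourceNodeByTopic() {
        return Collections.unmodifiableMap(sourceNodeByTopic);
    }

    public static void main(final String[] args) {
        final Map<String, List<String>> topologyWide = new HashMap<>();
        topologyWide.put("source-0", Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2"));
        topologyWide.put("source-1", Arrays.asList("app-repartition"));

        final UpdateSourceTopicsSketch subtopology0 =
            new UpdateSourceTopicsSketch(Collections.singleton("source-0"));
        subtopology0.updateSourceTopics(topologyWide);

        // Only the two topics of source-0 are registered; "app-repartition" is ignored.
        System.out.println(subtopology0.sourceNodeByTopic().keySet());
    }
}
```

Iterating from the local side also means the size of the topology-wide map no longer matters to a given subtopology, only whether its own nodes can be found in it.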







[GitHub] [kafka] cadonna commented on a change in pull request #9648: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9648:
URL: https://github.com/apache/kafka/pull/9648#discussion_r529840078



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##########
@@ -198,9 +200,16 @@ public void testRegexRecordsAreProcessedAfterReassignment() throws Exception {
 
             final StreamsBuilder builder = new StreamsBuilder();
             final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
-            pattern1Stream.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+            final KStream<String, String> otherStream = builder.stream(Pattern.compile("not-a-match"));
+
+            pattern1Stream
+                .selectKey((k, v) -> k)
+                .groupByKey()
+                .aggregate(() -> "", (k, v, a) -> v)
+                .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

Review comment:
       You would have a source node in one sub-topology and a source node in the other sub-topology. I thought that was the pattern in the bug report, but I now realize that the bug report uses a pattern similar to the one you specified. Wouldn't it make sense to test both?







[GitHub] [kafka] ableegoldman merged pull request #9648: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics

Posted by GitBox <gi...@apache.org>.
ableegoldman merged pull request #9648:
URL: https://github.com/apache/kafka/pull/9648


   





[GitHub] [kafka] ableegoldman commented on pull request #9648: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9648:
URL: https://github.com/apache/kafka/pull/9648#issuecomment-733412594


   The Java 15 tests passed. The Java 8 build failed with the flaky `TransactionsTest.testBumpTransactionalEpoch`, and the Java 11 build failed with `StreamTableJoinTopologyOptimizationIntegrationTest.shouldDoStreamTableJoinWithDifferentNumberOfPartitions` (which is known to be flaky; I confirmed that the failure was unrelated and due to `DirectoryNotEmptyException`).
   
   Will merge & cherrypick to older branches to unblock the 2.6.1 and 2.7.0 releases





[GitHub] [kafka] ableegoldman commented on a change in pull request #9648: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9648:
URL: https://github.com/apache/kafka/pull/9648#discussion_r529109803



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##########
@@ -198,9 +200,16 @@ public void testRegexRecordsAreProcessedAfterReassignment() throws Exception {
 
             final StreamsBuilder builder = new StreamsBuilder();
             final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
-            pattern1Stream.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+            final KStream<String, String> otherStream = builder.stream(Pattern.compile("not-a-match"));
+
+            pattern1Stream

Review comment:
       Technically it's sufficient to just add the second KStream above for a multi-subtopology application, but I felt the test coverage could only stand to benefit with (slightly) more complicated examples







[GitHub] [kafka] ableegoldman commented on a change in pull request #9648: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9648:
URL: https://github.com/apache/kafka/pull/9648#discussion_r529796474



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##########
@@ -198,9 +200,16 @@ public void testRegexRecordsAreProcessedAfterReassignment() throws Exception {
 
             final StreamsBuilder builder = new StreamsBuilder();
             final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
-            pattern1Stream.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+            final KStream<String, String> otherStream = builder.stream(Pattern.compile("not-a-match"));
+
+            pattern1Stream
+                .selectKey((k, v) -> k)
+                .groupByKey()
+                .aggregate(() -> "", (k, v, a) -> v)
+                .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

Review comment:
       But if we merge then won't that merge the subtopologies as well? (We would still have two subtopologies due to the upstream key-changing operation/repartition, but I wanted the test to cover different "kinds" of subtopologies like this)







[GitHub] [kafka] ableegoldman commented on a change in pull request #9648: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9648:
URL: https://github.com/apache/kafka/pull/9648#discussion_r529928566



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
##########
@@ -149,24 +149,30 @@ public boolean hasPersistentGlobalStore() {
         return false;
     }
 
-    public void updateSourceTopics(final Map<String, List<String>> sourceTopicsByName) {
-        if (!sourceTopicsByName.keySet().equals(sourceNodesByName.keySet())) {
-            log.error("Set of source nodes do not match: \n" +
-                "sourceNodesByName = {}\n" +
-                "sourceTopicsByName = {}",
-                sourceNodesByName.keySet(), sourceTopicsByName.keySet());
-            throw new IllegalStateException("Tried to update source topics but source nodes did not match");
-        }
+    public void updateSourceTopics(final Map<String, List<String>> allSourceTopicsByNodeName) {

Review comment:
       Added a unit test



