Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/25 12:14:25 UTC

[GitHub] [kafka] cadonna opened a new pull request #9654: MINOR: Increase unit test coverage of ProcessorTopology#updateSourceTopics()

cadonna opened a new pull request #9654:
URL: https://github.com/apache/kafka/pull/9654


   The unit tests for the method ProcessorTopology#updateSourceTopics() did not cover all
   code paths. This PR adds tests for the paths that were not covered.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #9654: MINOR: Increase unit test coverage of ProcessorTopology#updateSourceTopics()

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9654:
URL: https://github.com/apache/kafka/pull/9654#issuecomment-733670985


   Call for review: @ableegoldman 





[GitHub] [kafka] ableegoldman merged pull request #9654: MINOR: Increase unit test coverage of ProcessorTopology#updateSourceTopics()

Posted by GitBox <gi...@apache.org>.
ableegoldman merged pull request #9654:
URL: https://github.com/apache/kafka/pull/9654


   





[GitHub] [kafka] ableegoldman commented on pull request #9654: MINOR: Increase unit test coverage of ProcessorTopology#updateSourceTopics()

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9654:
URL: https://github.com/apache/kafka/pull/9654#issuecomment-736752649


   Merged to trunk





[GitHub] [kafka] ableegoldman commented on a change in pull request #9654: MINOR: Increase unit test coverage of ProcessorTopology#updateSourceTopics()

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9654:
URL: https://github.com/apache/kafka/pull/9654#discussion_r533644775



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -151,41 +156,105 @@ public void shouldGetTerminalNodes() {
 
     @Test
     public void shouldUpdateSourceTopicsWithNewMatchingTopic() {
-        topology.addSource("source-1", "topic-1");
+        final String sourceNode = "source-1";
+        final String topic = "topic-1";
+        final String newTopic = "topic-2";
+        topology.addSource(sourceNode, topic);
         final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();
+        assertThat(processorTopology.source(newTopic), is(nullValue()));
 
-        assertNull(processorTopology.source("topic-2"));
-        processorTopology.updateSourceTopics(Collections.singletonMap("source-1", asList("topic-1", "topic-2")));
+        processorTopology.updateSourceTopics(Collections.singletonMap(sourceNode, asList(topic, newTopic)));
 
-        assertThat(processorTopology.source("topic-2").name(), equalTo("source-1"));
+        assertThat(processorTopology.source(newTopic).name(), equalTo(sourceNode));
     }
 
     @Test
     public void shouldUpdateSourceTopicsWithRemovedTopic() {
-        topology.addSource("source-1", "topic-1", "topic-2");
+        final String sourceNode = "source-1";
+        final String topic = "topic-1";
+        final String topicToRemove = "topic-2";
+        topology.addSource(sourceNode, topic, topicToRemove);
         final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();
+        assertThat(processorTopology.source(topicToRemove).name(), equalTo(sourceNode));
+
+        processorTopology.updateSourceTopics(Collections.singletonMap(sourceNode, Collections.singletonList(topic)));
 
-        assertThat(processorTopology.source("topic-2").name(), equalTo("source-1"));
+        assertThat(processorTopology.source(topicToRemove), is(nullValue()));
+    }
+
+    @Test
+    public void shouldUpdateSourceTopicsWithAllTopicsRemoved() {
+        final String sourceNode = "source-1";
+        final String topic = "topic-1";
+        topology.addSource(sourceNode, topic);
+        final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();
+        assertThat(processorTopology.source(topic).name(), equalTo(sourceNode));
 
-        processorTopology.updateSourceTopics(Collections.singletonMap("source-1", Collections.singletonList("topic-1")));
+        processorTopology.updateSourceTopics(Collections.singletonMap(sourceNode, Collections.emptyList()));
 
-        assertNull(processorTopology.source("topic-2"));
+        assertThat(processorTopology.source(topic), is(nullValue()));
     }
 
     @Test
     public void shouldUpdateSourceTopicsOnlyForSourceNodesWithinTheSubtopology() {
-        topology.addSource("source-1", "topic-1");
+        final String sourceNodeWithinSubtopology = "source-1";
+        final String sourceNodeOutsideSubtopology = "source-2";
+        final String topicWithinSubtopology = "topic-1";
+        final String topicOutsideSubtopology = "topic-2";
+        topology.addSource(sourceNodeWithinSubtopology, topicWithinSubtopology);
         final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();
 
         processorTopology.updateSourceTopics(mkMap(
-            mkEntry("source-1", Collections.singletonList("topic-1")),
-            mkEntry("source-2", Collections.singletonList("topic-2")))
+            mkEntry(sourceNodeWithinSubtopology, Collections.singletonList(topicWithinSubtopology)),
+            mkEntry(sourceNodeOutsideSubtopology, Collections.singletonList(topicOutsideSubtopology))
+            )
         );
 
-        assertNull(processorTopology.source("topic-2"));
+        assertThat(processorTopology.source(topicOutsideSubtopology), is(nullValue()));
         assertThat(processorTopology.sources().size(), equalTo(1));
     }
 
+    @Test
+    public void shouldThrowIfSourceNodeToUpdateDoesNotExist() {
+        final String existingSourceNode = "source-1";
+        final String nonExistingSourceNode = "source-2";

Review comment:
       It's not a big deal, but I found the naming here a bit confusing, since the `nonExistingSourceNode` is clearly added to the topology down on line 223, so it definitely exists. But I'm not sure what a better name would be... maybe `removedSourceNode`? Idk



