Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/10 04:29:31 UTC

[GitHub] [kafka] ableegoldman opened a new pull request #9582: KAFKA-6687: rewrite topology to allow creating multiple KStreams from the same topic

ableegoldman opened a new pull request #9582:
URL: https://github.com/apache/kafka/pull/9582


   Needed to fix this on the side in order to more easily set up some experiments, so here's the PR.
   
   Allows a user to create multiple KStreams from the same topic, collection of topics, or pattern. The one exception is when the KStreams are subscribed to overlapping-but-unequal collections of topics, which I left as future work (with a TODO in the comments describing a possible solution).
   
   If the offset reset policies don't match, we just throw a TopologyException.
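   
   For illustration, here's a minimal sketch of what this enables (the topic names and the surrounding class are made up, not part of this PR):
   
   ```java
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.Topology;
   import org.apache.kafka.streams.kstream.KStream;
   
   public class MultipleStreamsFromSameTopic {
       public static void main(final String[] args) {
           final StreamsBuilder builder = new StreamsBuilder();
   
           // Two independent KStreams over the same input topic; the duplicate
           // source nodes are merged into a single source node under the hood.
           final KStream<String, String> left = builder.stream("input-topic");
           final KStream<String, String> right = builder.stream("input-topic");
   
           left.filter((key, value) -> value != null).to("non-null-values");
           right.mapValues(value -> value.toUpperCase()).to("uppercased-values");
   
           // Before this PR, reading the same topic twice like this caused a
           // TopologyException when the topology was built.
           final Topology topology = builder.build();
           System.out.println(topology.describe());
       }
   }
   ```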





[GitHub] [kafka] lct45 commented on a change in pull request #9582: KAFKA-6687: rewrite topology to allow reading the same topic multiple times in the DSL

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#discussion_r523044318



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -410,18 +410,6 @@ public final void addSource(final Topology.AutoOffsetReset offsetReset,
             }
         }
 
-        for (final Pattern otherPattern : earliestResetPatterns) {
-            if (topicPattern.pattern().contains(otherPattern.pattern()) || otherPattern.pattern().contains(topicPattern.pattern())) {
-                throw new TopologyException("Pattern " + topicPattern + " will overlap with another pattern " + otherPattern + " already been registered by another source");
-            }
-        }
-
-        for (final Pattern otherPattern : latestResetPatterns) {
-            if (topicPattern.pattern().contains(otherPattern.pattern()) || otherPattern.pattern().contains(topicPattern.pattern())) {
-                throw new TopologyException("Pattern " + topicPattern + " will overlap with another pattern " + otherPattern + " already been registered by another source");
-            }
-        }
-

Review comment:
       I might _also_ be missing something, but what's the scenario where one pattern is a substring of another and they _don't_ match the same topics? If you take Bruno's example from earlier of `topic*` and `topi*`, `topi*` would be considered a substring of `topic*` and they would both match `topicA`, right? I guess the other scenario is if we have a topic `topiaA`, which would match `topi*` but not `topic*`. So I guess it seems like it isn't always true that they'll overlap, but we would want to check if they do, right?







[GitHub] [kafka] cadonna commented on pull request #9582: KAFKA-6687: rewrite topology to allow reading the same topic multiple times in the DSL

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#issuecomment-726597853


   > I do think there's some possible followup work to further improve the situation (see my comment above) but I would say that it's different enough to merit creating separate followup tickets rather than subtasks of this one. Lmk what you think
   
   Fair enough. Let's do it as you say.





[GitHub] [kafka] ableegoldman commented on a change in pull request #9582: KAFKA-6687: rewrite topology to allow creating multiple KStreams from the same topic

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#discussion_r522525068



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
##########
@@ -314,6 +317,50 @@ public void buildAndOptimizeTopology(final Properties props) {
         internalTopologyBuilder.validateCopartition();
     }
 
+    private void mergeDuplicateSourceNodes() {
+        final Map<String, StreamSourceNode<?, ?>> topicsToSourceNodes = new HashMap<>();
+
+        // We don't really care about the order here, but since Pattern does not implement equals() we can't rely on
+        // a regular HashMap and containsKey(Pattern). But for our purposes it's sufficient to compare the compiled
+        // string and flags to determine if two pattern subscriptions can be merged into a single source node
+        final Map<Pattern, StreamSourceNode<?, ?>> patternsToSourceNodes =
+            new TreeMap<>(Comparator.comparing(Pattern::pattern).thenComparing(Pattern::flags));
+
+        for (final StreamsGraphNode graphNode : root.children()) {
+            if (graphNode instanceof StreamSourceNode) {
+                final StreamSourceNode<?, ?> currentSourceNode = (StreamSourceNode<?, ?>) graphNode;

Review comment:
       Yeah I think that's a fair point but I would prefer to keep the scope of this PR as small as possible for now. Maybe @lct45 could pick this up on the side once this is merged?







[GitHub] [kafka] lct45 commented on a change in pull request #9582: KAFKA-6687: rewrite topology to allow reading the same topic multiple times in the DSL

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#discussion_r523006855



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
##########
@@ -314,6 +317,50 @@ public void buildAndOptimizeTopology(final Properties props) {
         internalTopologyBuilder.validateCopartition();
     }
 
+    private void mergeDuplicateSourceNodes() {
+        final Map<String, StreamSourceNode<?, ?>> topicsToSourceNodes = new HashMap<>();
+
+        // We don't really care about the order here, but since Pattern does not implement equals() we can't rely on
+        // a regular HashMap and containsKey(Pattern). But for our purposes it's sufficient to compare the compiled
+        // string and flags to determine if two pattern subscriptions can be merged into a single source node
+        final Map<Pattern, StreamSourceNode<?, ?>> patternsToSourceNodes =
+            new TreeMap<>(Comparator.comparing(Pattern::pattern).thenComparing(Pattern::flags));
+
+        for (final StreamsGraphNode graphNode : root.children()) {
+            if (graphNode instanceof StreamSourceNode) {
+                final StreamSourceNode<?, ?> currentSourceNode = (StreamSourceNode<?, ?>) graphNode;

Review comment:
       SGTM







[GitHub] [kafka] cadonna commented on a change in pull request #9582: KAFKA-6687: rewrite topology to allow reading the same topic multiple times in the DSL

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#discussion_r522778010



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
##########
@@ -71,6 +78,22 @@ public Pattern topicPattern() {
         return consumedInternal.valueSerde();
     }
 
+    // We "merge" source nodes into a single node under the hood if a user tries to read in a source topic multiple times

Review comment:
       I would also remove this comment.







[GitHub] [kafka] ableegoldman commented on a change in pull request #9582: KAFKA-6687: rewrite topology to allow reading the same topic multiple times in the DSL

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#discussion_r522549685



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
##########
@@ -314,6 +317,50 @@ public void buildAndOptimizeTopology(final Properties props) {
         internalTopologyBuilder.validateCopartition();
     }
 
+    private void mergeDuplicateSourceNodes() {
+        final Map<String, StreamSourceNode<?, ?>> topicsToSourceNodes = new HashMap<>();
+
+        // We don't really care about the order here, but since Pattern does not implement equals() we can't rely on
+        // a regular HashMap and containsKey(Pattern). But for our purposes it's sufficient to compare the compiled
+        // string and flags to determine if two pattern subscriptions can be merged into a single source node
+        final Map<Pattern, StreamSourceNode<?, ?>> patternsToSourceNodes =
+            new TreeMap<>(Comparator.comparing(Pattern::pattern).thenComparing(Pattern::flags));

Review comment:
       Yes to all of that: this PR improves some situations, but not all. Specifically, you would still get a TopologyException if (a) subscribing to overlapping-but-unequal collections of topics, (b) subscribing to a topic and to a pattern that matches said topic, and (c) subscribing to two (or more) patterns that match the same topic(s).
   
   Case (c) is what you described, I just wanted to list them all here for completeness. Here's my take on what we can/should reasonably try to tackle:
   
   (a) this case is easily detected, easily worked around, and easy for us to fix. It results in a "compile time" exception (meaning when the topology is compiled, not the program) which users can quickly detect and work around if need be by rewriting the topology themselves. The fix is relatively straightforward but very low priority, so I plan to just file a followup ticket for this for now
   (b) is easily detected (you get a compile time exception) and possible to work around, but difficult to solve. I think in all cases a user could find a way around this issue by some combination of topology rewriting and Pattern manipulation or topic renaming, depending on what exactly they're trying to achieve. Of course there's no way for us to detect what an arbitrary user is trying to do in this case, so I don't see any path forward to making this case possible. No plans to file a followup ticket
   (c) is difficult to detect, might be possible to work around, and probably very complicated to actually fix. Unfortunately, in this case you only get a run-time exception, since there's no way of knowing which topics will or will not be created ahead of time. And I'm thinking that determining whether two regexes could ever match the same string may be unsolvable...so, no followup ticket planned for this.
   WDYT?
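   
   For reference, case (a) looks like this in user code (a minimal sketch; topic names are made up):
   
   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import org.apache.kafka.streams.StreamsBuilder;
   
   final StreamsBuilder builder = new StreamsBuilder();
   builder.stream(Collections.singletonList("topicA"));
   builder.stream(Arrays.asList("topicA", "topicB"));
   // Source nodes are only merged on an exact match of the subscribed topics, so
   // this still throws org.apache.kafka.streams.errors.TopologyException at "compile time":
   builder.build();
   ```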







[GitHub] [kafka] ableegoldman commented on a change in pull request #9582: KAFKA-6687: rewrite topology to allow creating multiple KStreams from the same topic

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#discussion_r522384690



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
##########
@@ -71,6 +78,21 @@ public Pattern topicPattern() {
         return consumedInternal.valueSerde();
     }
 
+    // We "merge" source nodes into a single node under the hood if a user tries to read in a source topic multiple times
+    public void merge(final StreamSourceNode<?, ?> other) {
+        final AutoOffsetReset resetPolicy = consumedInternal.offsetResetPolicy();
+        if (resetPolicy != null && !resetPolicy.equals(other.consumedInternal().offsetResetPolicy())) {

Review comment:
       Ack, good catch. Added a test for this case

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
##########
@@ -71,6 +78,21 @@ public Pattern topicPattern() {
         return consumedInternal.valueSerde();
     }
 
+    // We "merge" source nodes into a single node under the hood if a user tries to read in a source topic multiple times
+    public void merge(final StreamSourceNode<?, ?> other) {
+        final AutoOffsetReset resetPolicy = consumedInternal.offsetResetPolicy();
+        if (resetPolicy != null && !resetPolicy.equals(other.consumedInternal().offsetResetPolicy())) {

Review comment:
       Ack, good catch. Added a test for this case as well







[GitHub] [kafka] ableegoldman commented on a change in pull request #9582: KAFKA-6687: rewrite topology to allow creating multiple KStreams from the same topic

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#discussion_r522381803



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
##########
@@ -71,6 +78,21 @@ public Pattern topicPattern() {
         return consumedInternal.valueSerde();
     }
 
+    // We "merge" source nodes into a single node under the hood if a user tries to read in a source topic multiple times
+    public void merge(final StreamSourceNode<?, ?> other) {
+        final AutoOffsetReset resetPolicy = consumedInternal.offsetResetPolicy();
+        if (resetPolicy != null && !resetPolicy.equals(other.consumedInternal().offsetResetPolicy())) {
+            log.error("Tried to merge source nodes {} and {} which are subscribed to the same topic/pattern, but "
+                          + "the offset reset policies do not match", this, other);
+            throw new TopologyException("Can't configure different offset reset policies on the same input topic(s)");
+        }
+        for (final StreamsGraphNode otherChild : other.children()) {
+            // Move children from other to this, these calls take care of resetting the child's parents to this

Review comment:
       I'll remove it







[GitHub] [kafka] ableegoldman commented on pull request #9582: KAFKA-6687: rewrite topology to allow reading the same topic multiple times in the DSL

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#issuecomment-727067902


   Two unrelated flaky test failures:
   `EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]`
   `DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota`





[GitHub] [kafka] ableegoldman commented on pull request #9582: KAFKA-6687: rewrite topology to allow reading the same topic multiple times in the DSL

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#issuecomment-727069085


   Merged to trunk





[GitHub] [kafka] cadonna commented on a change in pull request #9582: KAFKA-6687: rewrite topology to allow creating multiple KStreams from the same topic

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#discussion_r522027942



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
##########
@@ -314,6 +317,50 @@ public void buildAndOptimizeTopology(final Properties props) {
         internalTopologyBuilder.validateCopartition();
     }
 
+    private void mergeDuplicateSourceNodes() {
+        final Map<String, StreamSourceNode<?, ?>> topicsToSourceNodes = new HashMap<>();
+
+        // We don't really care about the order here, but since Pattern does not implement equals() we can't rely on
+        // a regular HashMap and containsKey(Pattern). But for our purposes it's sufficient to compare the compiled
+        // string and flags to determine if two pattern subscriptions can be merged into a single source node
+        final Map<Pattern, StreamSourceNode<?, ?>> patternsToSourceNodes =
+            new TreeMap<>(Comparator.comparing(Pattern::pattern).thenComparing(Pattern::flags));

Review comment:
       Just to be clear: this improves the situation but it is not a complete solution, right? Assume we have a topic `topicA`. Patterns `topic*` and `topi*` both match `topicA` but they are different when compared with this comparator. In that case a `TopologyException` would be thrown in the `InternalTopologyBuilder`, right?
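   
    A small standalone demo of the comparator's behavior (class name made up), which also shows why `topic*` and `topi*` end up as separate keys:
   
    ```java
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.regex.Pattern;
    
    public class PatternKeyDemo {
        public static void main(final String[] args) {
            final Pattern first = Pattern.compile("topic*");
            final Pattern second = Pattern.compile("topic*"); // same string and flags
            final Pattern third = Pattern.compile("topi*");   // different string
    
            // Pattern does not override equals()/hashCode(), so a plain HashMap misses:
            final Map<Pattern, String> hashMap = new HashMap<>();
            hashMap.put(first, "source-1");
            System.out.println(hashMap.containsKey(second)); // false
    
            // The TreeMap comparator collapses identical pattern string + flags into one key:
            final Map<Pattern, String> treeMap =
                new TreeMap<>(Comparator.comparing(Pattern::pattern).thenComparing(Pattern::flags));
            treeMap.put(first, "source-1");
            System.out.println(treeMap.containsKey(second)); // true  -> can be merged
            System.out.println(treeMap.containsKey(third));  // false -> stays a separate source node
        }
    }
    ```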

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
##########
@@ -71,6 +78,21 @@ public Pattern topicPattern() {
         return consumedInternal.valueSerde();
     }
 
+    // We "merge" source nodes into a single node under the hood if a user tries to read in a source topic multiple times
+    public void merge(final StreamSourceNode<?, ?> other) {
+        final AutoOffsetReset resetPolicy = consumedInternal.offsetResetPolicy();
+        if (resetPolicy != null && !resetPolicy.equals(other.consumedInternal().offsetResetPolicy())) {
+            log.error("Tried to merge source nodes {} and {} which are subscribed to the same topic/pattern, but "
+                          + "the offset reset policies do not match", this, other);
+            throw new TopologyException("Can't configure different offset reset policies on the same input topic(s)");
+        }
+        for (final StreamsGraphNode otherChild : other.children()) {
+            // Move children from other to this, these calls take care of resetting the child's parents to this

Review comment:
       Do we really need this comment and the comment on line 81? We get the same information when we navigate to the call and to the implementation of the methods, with the difference that comments can become outdated without us noticing it.

##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -895,6 +898,55 @@ public void shouldUseSpecifiedNameForAggregateOperationGivenTable() {
             STREAM_OPERATION_NAME);
     }
 
+    @Test
+    public void shouldAllowReadingFromSameTopic() {
+        builder.stream("topic");
+        builder.stream("topic");
+        builder.build();
+    }
+
+    @Test
+    public void shouldAllowSubscribingToSamePattern() {
+        builder.stream(Pattern.compile("some-regex"));
+        builder.stream(Pattern.compile("some-regex"));
+        builder.build();
+    }
+
+    @Test
+    public void shouldAllowReadingFromSameCollectionOfTopics() {
+        builder.stream(Collections.singletonList("topic"));
+        builder.stream(Collections.singletonList("topic"));

Review comment:
       Please use a collection with at least two topics to test the loop over the collections.
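   
    For example, something like this (just a sketch of the suggestion, the final test may differ):
    
    ```java
        @Test
        public void shouldAllowReadingFromSameCollectionOfTopics() {
            builder.stream(asList("topicA", "topicB"));
            builder.stream(asList("topicA", "topicB"));
            builder.build();
        }
    ```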

##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -895,6 +898,55 @@ public void shouldUseSpecifiedNameForAggregateOperationGivenTable() {
             STREAM_OPERATION_NAME);
     }
 
+    @Test
+    public void shouldAllowReadingFromSameTopic() {
+        builder.stream("topic");
+        builder.stream("topic");
+        builder.build();
+    }
+
+    @Test
+    public void shouldAllowSubscribingToSamePattern() {
+        builder.stream(Pattern.compile("some-regex"));
+        builder.stream(Pattern.compile("some-regex"));
+        builder.build();
+    }
+
+    @Test
+    public void shouldAllowReadingFromSameCollectionOfTopics() {
+        builder.stream(Collections.singletonList("topic"));
+        builder.stream(Collections.singletonList("topic"));
+        builder.build();
+    }
+
+    @Test
+    public void shouldNotAllowReadingFromOverlappingAndUnequalCollectionOfTopics() {
+        builder.stream(Collections.singletonList("topic"));
+        builder.stream(asList("topic", "anotherTopic"));
+        assertThrows(TopologyException.class, builder::build);
+    }
+
+    @Test
+    public void shouldThrowWhenSubscribedToATopicWithDifferentResetPolicies() {
+        builder.stream("topic", Consumed.with(AutoOffsetReset.EARLIEST));
+        builder.stream("topic", Consumed.with(AutoOffsetReset.LATEST));
+        assertThrows(TopologyException.class, builder::build);
+    }
+
+    @Test
+    public void shouldThrowWhenSubscribedToATopicWithSetAndUnsetResetPolicies() {
+        builder.stream("topic", Consumed.with(AutoOffsetReset.EARLIEST));
+        builder.stream("topic");
+        assertThrows(TopologyException.class, builder::build);
+    }
+
+    @Test
+    public void shouldThrowWhenSubscribedToAPatternWithDifferentResetPolicies() {
+        builder.stream(Pattern.compile("some-regex"), Consumed.with(AutoOffsetReset.EARLIEST));
+        builder.stream(Pattern.compile("some-regex"), Consumed.with(AutoOffsetReset.LATEST));
+        assertThrows(TopologyException.class, builder::build);
+    }
+

Review comment:
       Could you also add a test with two patterns with the same string, but one with a set reset policy and one with an unset reset policy, like you did for the non-pattern case? Just to make it clear that it should also throw in that case.
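   
    Something along these lines, I assume (sketch only, test name made up):
    
    ```java
        @Test
        public void shouldThrowWhenSubscribedToAPatternWithSetAndUnsetResetPolicies() {
            builder.stream(Pattern.compile("some-regex"), Consumed.with(AutoOffsetReset.EARLIEST));
            builder.stream(Pattern.compile("some-regex"));
            assertThrows(TopologyException.class, builder::build);
        }
    ```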

##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -895,6 +898,55 @@ public void shouldUseSpecifiedNameForAggregateOperationGivenTable() {
             STREAM_OPERATION_NAME);
     }
 
+    @Test
+    public void shouldAllowReadingFromSameTopic() {
+        builder.stream("topic");
+        builder.stream("topic");
+        builder.build();
+    }
+
+    @Test
+    public void shouldAllowSubscribingToSamePattern() {
+        builder.stream(Pattern.compile("some-regex"));
+        builder.stream(Pattern.compile("some-regex"));
+        builder.build();
+    }
+
+    @Test
+    public void shouldAllowReadingFromSameCollectionOfTopics() {
+        builder.stream(Collections.singletonList("topic"));
+        builder.stream(Collections.singletonList("topic"));
+        builder.build();
+    }
+
+    @Test
+    public void shouldNotAllowReadingFromOverlappingAndUnequalCollectionOfTopics() {
+        builder.stream(Collections.singletonList("topic"));
+        builder.stream(asList("topic", "anotherTopic"));
+        assertThrows(TopologyException.class, builder::build);
+    }
+
+    @Test
+    public void shouldThrowWhenSubscribedToATopicWithDifferentResetPolicies() {
+        builder.stream("topic", Consumed.with(AutoOffsetReset.EARLIEST));
+        builder.stream("topic", Consumed.with(AutoOffsetReset.LATEST));
+        assertThrows(TopologyException.class, builder::build);
+    }
+
+    @Test
+    public void shouldThrowWhenSubscribedToATopicWithSetAndUnsetResetPolicies() {
+        builder.stream("topic", Consumed.with(AutoOffsetReset.EARLIEST));
+        builder.stream("topic");
+        assertThrows(TopologyException.class, builder::build);
+    }

Review comment:
       What should happen in this case? See also my comment in `merge()`.
   ```
       @Test
       public void shouldThrowWhenSubscribedToATopicWithSetAndUnsetResetPolicies() {
           builder.stream("topic");
           builder.stream("topic", Consumed.with(AutoOffsetReset.EARLIEST));
           assertThrows(TopologyException.class, builder::build);
       }
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
##########
@@ -71,6 +78,21 @@ public Pattern topicPattern() {
         return consumedInternal.valueSerde();
     }
 
+    // We "merge" source nodes into a single node under the hood if a user tries to read in a source topic multiple times
+    public void merge(final StreamSourceNode<?, ?> other) {
+        final AutoOffsetReset resetPolicy = consumedInternal.offsetResetPolicy();
+        if (resetPolicy != null && !resetPolicy.equals(other.consumedInternal().offsetResetPolicy())) {

Review comment:
       What should happen if this reset policy is `null` and the other is not `null`? I guess we should also throw in that case, shouldn't we?
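   
    One way to make the check symmetric (purely a sketch of the idea, not necessarily how it ends up being fixed; it would need `java.util.Objects`):
    
    ```java
        final AutoOffsetReset resetPolicy = consumedInternal.offsetResetPolicy();
        final AutoOffsetReset otherResetPolicy = other.consumedInternal().offsetResetPolicy();
        // Objects.equals() treats (null, EARLIEST) as a mismatch in both directions,
        // so either ordering of the two subscriptions throws.
        if (!Objects.equals(resetPolicy, otherResetPolicy)) {
            throw new TopologyException("Can't configure different offset reset policies on the same input topic(s)");
        }
    ```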

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
##########
@@ -314,6 +317,50 @@ public void buildAndOptimizeTopology(final Properties props) {
         internalTopologyBuilder.validateCopartition();
     }
 
+    private void mergeDuplicateSourceNodes() {
+        final Map<String, StreamSourceNode<?, ?>> topicsToSourceNodes = new HashMap<>();
+
+        // We don't really care about the order here, but since Pattern does not implement equals() we can't rely on
+        // a regular HashMap and containsKey(Pattern). But for our purposes it's sufficient to compare the compiled
+        // string and flags to determine if two pattern subscriptions can be merged into a single source node
+        final Map<Pattern, StreamSourceNode<?, ?>> patternsToSourceNodes =
+            new TreeMap<>(Comparator.comparing(Pattern::pattern).thenComparing(Pattern::flags));
+
+        for (final StreamsGraphNode graphNode : root.children()) {
+            if (graphNode instanceof StreamSourceNode) {
+                final StreamSourceNode<?, ?> currentSourceNode = (StreamSourceNode<?, ?>) graphNode;

Review comment:
       We could avoid the `instanceof` and the casting if we introduce a `RootGraphNode` with a method `sourceNodes()`. Since a root can only have source nodes and state stores as children, we could make the topology code in general a bit more type safe. As far as I can see that would need some additional changes outside the scope of this PR. So, feel free to not consider this comment for this PR and we can do another PR for that.
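   
    For illustration, the loop could then read roughly like this (entirely hypothetical, `RootGraphNode#sourceNodes()` does not exist yet):
    
    ```java
        for (final StreamSourceNode<?, ?> currentSourceNode : root.sourceNodes()) {
            // ... same merge logic as today, minus the instanceof check and the cast
        }
    ```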

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -410,18 +410,6 @@ public final void addSource(final Topology.AutoOffsetReset offsetReset,
             }
         }
 
-        for (final Pattern otherPattern : earliestResetPatterns) {
-            if (topicPattern.pattern().contains(otherPattern.pattern()) || otherPattern.pattern().contains(topicPattern.pattern())) {
-                throw new TopologyException("Pattern " + topicPattern + " will overlap with another pattern " + otherPattern + " already been registered by another source");
-            }
-        }
-
-        for (final Pattern otherPattern : latestResetPatterns) {
-            if (topicPattern.pattern().contains(otherPattern.pattern()) || otherPattern.pattern().contains(topicPattern.pattern())) {
-                throw new TopologyException("Pattern " + topicPattern + " will overlap with another pattern " + otherPattern + " already been registered by another source");
-            }
-        }
-

Review comment:
       I agree on the first part.
    Regarding the second part, I had similar thoughts when I wrote my comment in `mergeDuplicateSourceNodes()`.
   But I might also be missing something here.  

##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -895,6 +898,55 @@ public void shouldUseSpecifiedNameForAggregateOperationGivenTable() {
             STREAM_OPERATION_NAME);
     }
 
+    @Test
+    public void shouldAllowReadingFromSameTopic() {
+        builder.stream("topic");
+        builder.stream("topic");
+        builder.build();
+    }

Review comment:
       Could you please add a try-catch clause to better document the test?
   For example:
   ```suggestion
       public void shouldAllowReadingFromSameTopic() {
           builder.stream("topic");
           builder.stream("topic");
           
           try {
               builder.build();
           } catch (final TopologyException topologyException) {
               fail("TopologyException not expected");
           }
       }
   ```
   This applies also to the other tests.







[GitHub] [kafka] ableegoldman commented on pull request #9582: KAFKA-6687: rewrite topology to allow reading the same topic multiple times in the DSL

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#issuecomment-726433686


   > According to the ticket similar work needs to be done for table() and globalTable(). What do you think of adding subtasks to the ticket to track what has already been done and what not?
   
   Thanks for the review @cadonna. This fix actually does work for `table()` as well as `stream()`; I've updated the title and added unit tests to reflect this. As for the global table, there is already a separate ticket for this, which should be linked from KAFKA-6687.
   
   I do think there's some possible followup work to further improve the situation (see my comment above) but I would say that it's different enough to merit creating separate followup tickets rather than subtasks of this one. Lmk what you think





[GitHub] [kafka] cadonna commented on a change in pull request #9582: KAFKA-6687: rewrite topology to allow reading the same topic multiple times in the DSL

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#discussion_r522782299



##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -895,6 +899,103 @@ public void shouldUseSpecifiedNameForAggregateOperationGivenTable() {
             STREAM_OPERATION_NAME);
     }
 
+    @Test
+    public void shouldAllowStreamsFromSameTopic() {
+        builder.stream("topic");
+        builder.stream("topic");
+        try {
+            builder.build();
+        } catch (final TopologyException topologyException) {
+            fail("TopologyException not expected");
+        }

Review comment:
       Could you please extract this part into its own method, since we also use it in a couple of other tests?
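   
    Maybe something like (helper name made up):
    
    ```java
        private void assertBuildDoesNotThrow(final StreamsBuilder builder) {
            try {
                builder.build();
            } catch (final TopologyException topologyException) {
                fail("TopologyException not expected");
            }
        }
    ```
    Then each of these tests would just end with `assertBuildDoesNotThrow(builder);`.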







[GitHub] [kafka] ableegoldman merged pull request #9582: KAFKA-6687: rewrite topology to allow reading the same topic multiple times in the DSL

Posted by GitBox <gi...@apache.org>.
ableegoldman merged pull request #9582:
URL: https://github.com/apache/kafka/pull/9582


   





[GitHub] [kafka] ableegoldman commented on a change in pull request #9582: KAFKA-6687: rewrite topology to allow creating multiple KStreams from the same topic

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#discussion_r522384690



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
##########
@@ -71,6 +78,21 @@ public Pattern topicPattern() {
         return consumedInternal.valueSerde();
     }
 
+    // We "merge" source nodes into a single node under the hood if a user tries to read in a source topic multiple times
+    public void merge(final StreamSourceNode<?, ?> other) {
+        final AutoOffsetReset resetPolicy = consumedInternal.offsetResetPolicy();
+        if (resetPolicy != null && !resetPolicy.equals(other.consumedInternal().offsetResetPolicy())) {

Review comment:
       Ack, good catch







[GitHub] [kafka] ableegoldman commented on a change in pull request #9582: KAFKA-6687: rewrite topology to allow creating multiple KStreams from the same topic

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#discussion_r522476382



##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -895,6 +898,55 @@ public void shouldUseSpecifiedNameForAggregateOperationGivenTable() {
             STREAM_OPERATION_NAME);
     }
 
+    @Test
+    public void shouldAllowReadingFromSameTopic() {
+        builder.stream("topic");
+        builder.stream("topic");
+        builder.build();
+    }
+
+    @Test
+    public void shouldAllowSubscribingToSamePattern() {
+        builder.stream(Pattern.compile("some-regex"));
+        builder.stream(Pattern.compile("some-regex"));
+        builder.build();
+    }
+
+    @Test
+    public void shouldAllowReadingFromSameCollectionOfTopics() {
+        builder.stream(Collections.singletonList("topic"));
+        builder.stream(Collections.singletonList("topic"));
+        builder.build();
+    }
+
+    @Test
+    public void shouldNotAllowReadingFromOverlappingAndUnequalCollectionOfTopics() {
+        builder.stream(Collections.singletonList("topic"));
+        builder.stream(asList("topic", "anotherTopic"));
+        assertThrows(TopologyException.class, builder::build);
+    }
+
+    @Test
+    public void shouldThrowWhenSubscribedToATopicWithDifferentResetPolicies() {
+        builder.stream("topic", Consumed.with(AutoOffsetReset.EARLIEST));
+        builder.stream("topic", Consumed.with(AutoOffsetReset.LATEST));
+        assertThrows(TopologyException.class, builder::build);
+    }
+
+    @Test
+    public void shouldThrowWhenSubscribedToATopicWithSetAndUnsetResetPolicies() {
+        builder.stream("topic", Consumed.with(AutoOffsetReset.EARLIEST));
+        builder.stream("topic");
+        assertThrows(TopologyException.class, builder::build);
+    }

Review comment:
       Yep, that was just an oversight in the condition in `merge()`. I fixed that and added another unit test for the case.







[GitHub] [kafka] ableegoldman commented on a change in pull request #9582: KAFKA-6687: rewrite topology to allow reading the same topic multiple times in the DSL

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#discussion_r523183821



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
##########
@@ -314,6 +317,50 @@ public void buildAndOptimizeTopology(final Properties props) {
         internalTopologyBuilder.validateCopartition();
     }
 
+    private void mergeDuplicateSourceNodes() {
+        final Map<String, StreamSourceNode<?, ?>> topicsToSourceNodes = new HashMap<>();
+
+        // We don't really care about the order here, but since Pattern does not implement equals() we can't rely on
+        // a regular HashMap and containsKey(Pattern). But for our purposes it's sufficient to compare the compiled
+        // string and flags to determine if two pattern subscriptions can be merged into a single source node
+        final Map<Pattern, StreamSourceNode<?, ?>> patternsToSourceNodes =
+            new TreeMap<>(Comparator.comparing(Pattern::pattern).thenComparing(Pattern::flags));
+
+        for (final StreamsGraphNode graphNode : root.children()) {
+            if (graphNode instanceof StreamSourceNode) {
+                final StreamSourceNode<?, ?> currentSourceNode = (StreamSourceNode<?, ?>) graphNode;
+
+                if (currentSourceNode.topicPattern() != null) {
+                    if (!patternsToSourceNodes.containsKey(currentSourceNode.topicPattern())) {
+                        patternsToSourceNodes.put(currentSourceNode.topicPattern(), currentSourceNode);
+                    } else {
+                        final StreamSourceNode<?, ?> mainSourceNode = patternsToSourceNodes.get(currentSourceNode.topicPattern());
+                        mainSourceNode.merge(currentSourceNode);
+                        root.removeChild(graphNode);
+                    }
+                } else {
+                    for (final String topic : currentSourceNode.topicNames()) {
+                        if (!topicsToSourceNodes.containsKey(topic)) {
+                            topicsToSourceNodes.put(topic, currentSourceNode);
+                        } else {
+                            final StreamSourceNode<?, ?> mainSourceNode = topicsToSourceNodes.get(topic);
+                            // TODO we only merge source nodes if the subscribed topic(s) are an exact match, so it's still not
+                            // possible to subscribe to topicA in one KStream and topicA + topicB in another. We could achieve
+                            // this by splitting these source nodes into one topic per node and routing to the subscribed children

Review comment:
       Filed https://issues.apache.org/jira/browse/KAFKA-10721







[GitHub] [kafka] cadonna commented on a change in pull request #9582: KAFKA-6687: rewrite topology to allow reading the same topic multiple times in the DSL

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#discussion_r522775434



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
##########
@@ -314,6 +317,50 @@ public void buildAndOptimizeTopology(final Properties props) {
         internalTopologyBuilder.validateCopartition();
     }
 
+    private void mergeDuplicateSourceNodes() {
+        final Map<String, StreamSourceNode<?, ?>> topicsToSourceNodes = new HashMap<>();
+
+        // We don't really care about the order here, but since Pattern does not implement equals() we can't rely on
+        // a regular HashMap and containsKey(Pattern). But for our purposes it's sufficient to compare the compiled
+        // string and flags to determine if two pattern subscriptions can be merged into a single source node
+        final Map<Pattern, StreamSourceNode<?, ?>> patternsToSourceNodes =
+            new TreeMap<>(Comparator.comparing(Pattern::pattern).thenComparing(Pattern::flags));

Review comment:
       Thank you for the list of issues. I agree in all points.







[GitHub] [kafka] ableegoldman commented on a change in pull request #9582: KAFKA-6687: rewrite topology to allow creating multiple KStreams from the same topic

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#discussion_r520959288



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -410,18 +410,6 @@ public final void addSource(final Topology.AutoOffsetReset offsetReset,
             }
         }
 
-        for (final Pattern otherPattern : earliestResetPatterns) {
-            if (topicPattern.pattern().contains(otherPattern.pattern()) || otherPattern.pattern().contains(topicPattern.pattern())) {
-                throw new TopologyException("Pattern " + topicPattern + " will overlap with another pattern " + otherPattern + " already been registered by another source");
-            }
-        }
-
-        for (final Pattern otherPattern : latestResetPatterns) {
-            if (topicPattern.pattern().contains(otherPattern.pattern()) || otherPattern.pattern().contains(topicPattern.pattern())) {
-                throw new TopologyException("Pattern " + topicPattern + " will overlap with another pattern " + otherPattern + " already been registered by another source");
-            }
-        }
-

Review comment:
       Saw this and at first I thought it was broken because it only considers pattern-subscribed topics that happened to explicitly configure an offset reset policy. Unless I'm missing something here, that makes no sense and we should consider _all_  source patterns and whether they overlap.
   But then I started thinking, why does it matter if they overlap? Just because one pattern is a substring of another does not mean that they'll match the same topics. So I think that we should actually just remove this restriction altogether. Am I missing anything here?







[GitHub] [kafka] ableegoldman commented on pull request #9582: KAFKA-6687: rewrite topology to allow creating multiple KStreams from the same topic

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#issuecomment-725045200


   Call for review @mjsax @cadonna @lct45 





[GitHub] [kafka] cadonna commented on a change in pull request #9582: KAFKA-6687: rewrite topology to allow reading the same topic multiple times in the DSL

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#discussion_r523061696



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -410,18 +410,6 @@ public final void addSource(final Topology.AutoOffsetReset offsetReset,
             }
         }
 
-        for (final Pattern otherPattern : earliestResetPatterns) {
-            if (topicPattern.pattern().contains(otherPattern.pattern()) || otherPattern.pattern().contains(topicPattern.pattern())) {
-                throw new TopologyException("Pattern " + topicPattern + " will overlap with another pattern " + otherPattern + " already been registered by another source");
-            }
-        }
-
-        for (final Pattern otherPattern : latestResetPatterns) {
-            if (topicPattern.pattern().contains(otherPattern.pattern()) || otherPattern.pattern().contains(topicPattern.pattern())) {
-                throw new TopologyException("Pattern " + topicPattern + " will overlap with another pattern " + otherPattern + " already been registered by another source");
-            }
-        }
-

Review comment:
       Pattern `topic*` is contained in pattern `topic*A`. However, `topic*A` matches only a subset of the topics matched by `topic*`. So, they do not match exactly the same topics. But matching exactly the same topics is a prerequisite for merging the source nodes.







[GitHub] [kafka] ableegoldman commented on a change in pull request #9582: KAFKA-6687: rewrite topology to allow reading the same topic multiple times in the DSL

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#discussion_r523175703



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -410,18 +410,6 @@ public final void addSource(final Topology.AutoOffsetReset offsetReset,
             }
         }
 
-        for (final Pattern otherPattern : earliestResetPatterns) {
-            if (topicPattern.pattern().contains(otherPattern.pattern()) || otherPattern.pattern().contains(topicPattern.pattern())) {
-                throw new TopologyException("Pattern " + topicPattern + " will overlap with another pattern " + otherPattern + " already been registered by another source");
-            }
-        }
-
-        for (final Pattern otherPattern : latestResetPatterns) {
-            if (topicPattern.pattern().contains(otherPattern.pattern()) || otherPattern.pattern().contains(topicPattern.pattern())) {
-                throw new TopologyException("Pattern " + topicPattern + " will overlap with another pattern " + otherPattern + " already been registered by another source");
-            }
-        }
-

Review comment:
       I think in this case we were checking whether the pattern's string was a literal substring of another pattern's string, not whether one regex matches a subset of what the other matches. So `topi*` would not be a substring of `topic*`, because `topi*` is not contained literally within the string `topic*`. It's not doing any smart regex matching, just a dumb literal string comparison.
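   
    Quick demo of that distinction (standalone, class name made up):
    
    ```java
    public class LiteralSubstringCheckDemo {
        public static void main(final String[] args) {
            // The removed check compared the pattern *strings* with String#contains():
            System.out.println("topic*".contains("topi*")); // false: the '*' right after 'i' breaks the literal match
            System.out.println("topic*".contains("topic")); // true: plain literal substring
        }
    }
    ```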



