Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/08 15:02:31 UTC

[GitHub] [kafka] Danny02 opened a new pull request, #12608: Enable KStream to be merged with itself

Danny02 opened a new pull request, #12608:
URL: https://github.com/apache/kafka/pull/12608

   Why:
   It is an interesting question what the result should be when merging a KStream with itself: should the merge duplicate the messages, or should it be a no-op?
   I think the only reasonable answer is to duplicate the messages, because there are many different ways to disguise a KStream (e.g., adding a `peek` operation to it).
   It is therefore impossible to implement the no-op variant consistently: an identity check could catch `stream.merge(stream)`, but not `stream.merge(stream.peek(...))`.
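The duplication argument can be illustrated with a toy model. The `ToyStream` class below is hypothetical and is not the Kafka Streams API. It only assumes that a merge forwards every record from each of its inputs. Under that assumption, `source.merge(source)` and `source.merge(source.peek(...))` both emit each record twice, so a no-op rule based on stream identity would treat two logically equivalent topologies differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical toy model for illustration only; this is NOT the Kafka Streams API.
class ToyStream {
    private final List<Consumer<String>> children = new ArrayList<>();

    // Registers a downstream consumer that receives every record pushed to this stream.
    void subscribe(final Consumer<String> child) {
        children.add(child);
    }

    // Forwards a record to every registered child, in registration order.
    void push(final String record) {
        for (final Consumer<String> child : children) {
            child.accept(record);
        }
    }

    // Assumed merge semantics: the result receives every record from `this`
    // and every record from `other`, even if both are the same stream.
    ToyStream merge(final ToyStream other) {
        final ToyStream merged = new ToyStream();
        this.subscribe(merged::push);
        other.subscribe(merged::push);
        return merged;
    }

    // Rough analogue of peek(): runs a side effect, then forwards the record unchanged.
    ToyStream peek(final Consumer<String> action) {
        final ToyStream next = new ToyStream();
        subscribe(record -> {
            action.accept(record);
            next.push(record);
        });
        return next;
    }
}
```

For example, subscribe a list to `source.merge(source)` and push `"a"` into `source`. The list then contains `a` twice. The same holds for `source.merge(source.peek(r -> { }))`.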
   
   How does it help with resolving the issue:
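   This change makes the behavior of the merge operation consistent: merging a stream with itself now forwards every record twice, just as if the two inputs were different streams.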
   
   Side effects:
   Using a list instead of a set for the parent nodes could have side effects. The test suite did not detect any.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Enable KStream to be merged with itself [kafka]

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on PR #12608:
URL: https://github.com/apache/kafka/pull/12608#issuecomment-1760636434

   Closing this PR due to inactivity. Feel free to resume at any time -- and we should have a Jira for this.



Re: [PR] Enable KStream to be merged with itself [kafka]

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax closed pull request #12608: Enable KStream to be merged with itself
URL: https://github.com/apache/kafka/pull/12608



[GitHub] [kafka] mjsax commented on a diff in pull request #12608: Enable KStream to be merged with itself

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12608:
URL: https://github.com/apache/kafka/pull/12608#discussion_r1058650295


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -532,29 +532,36 @@ private KStream<K, V> merge(final InternalStreamsBuilder builder,
                                 final KStream<K, V> stream,
                                 final NamedInternal named) {
         final KStreamImpl<K, V> streamImpl = (KStreamImpl<K, V>) stream;
-        final boolean requireRepartitioning = streamImpl.repartitionRequired || repartitionRequired;
-        final String name = named.orElseGenerateWithPrefix(builder, MERGE_NAME);
-        final Set<String> allSubTopologySourceNodes = new HashSet<>();
-        allSubTopologySourceNodes.addAll(subTopologySourceNodes);
-        allSubTopologySourceNodes.addAll(streamImpl.subTopologySourceNodes);
-
-        final ProcessorParameters<? super K, ? super V, ?, ?> processorParameters =
-            new ProcessorParameters<>(new PassThrough<>(), name);
-        final ProcessorGraphNode<? super K, ? super V> mergeNode =
-            new ProcessorGraphNode<>(name, processorParameters);
-        mergeNode.setMergeNode(true);
-
-        builder.addGraphNode(Arrays.asList(graphNode, streamImpl.graphNode), mergeNode);
-
-        // drop the serde as we cannot safely use either one to represent both streams
-        return new KStreamImpl<>(
-            name,
-            null,
-            null,
-            allSubTopologySourceNodes,
-            requireRepartitioning,
-            mergeNode,
-            builder);
+        if (graphNode.equals(streamImpl.graphNode)) {
+            // We hide the current node through this processor node.
+            // Parent nodes are collected in a Set.
+            // The merge node would otherwise only have a single parent in this case.
+            return process(new KStreamPassThrough<>()).merge(stream, named);

Review Comment:
   I was just thinking about this fix, and it feels a little bit like a "hack" to be honest.
   
   Digging around in the code a little bit, it seems the issue is this:
   ```
   public abstract class GraphNode {
   
       private final Collection<GraphNode> childNodes = new LinkedHashSet<>();
       private final Collection<GraphNode> parentNodes = new LinkedHashSet<>();
   ```
   
   Because we use a set for the parent nodes, adding the same node twice results in a single parent instead of two. However, simply replacing the set with a list breaks two tests: `RepartitionTopicNamingTest` and `InternalStreamsBuilderTest`. I am not sure whether we could fix those tests.
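The effect of the `LinkedHashSet` can be reproduced in isolation. In the sketch below, `ToyNode` and `SelfMergeParents` are hypothetical stand-ins that model only the parent bookkeeping of a graph node. The sketch assumes that node equality is identity-based (`equals()` not overridden). A set collapses the two identical parents of `source.merge(source)` into one. Routing one side through a fresh pass-through node, as the diff does, keeps two parents. Switching to a list does the same.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;

// Hypothetical stand-in for a graph node; equals()/hashCode() are inherited from Object,
// so two references are equal only if they point to the same object (an assumption).
class ToyNode {
    final String name;

    ToyNode(final String name) {
        this.name = name;
    }
}

// Hypothetical helper, not part of Kafka Streams.
class SelfMergeParents {
    // Adds the parents a merge node would receive for `stream.merge(stream)` to the
    // given collection and returns the number of parent entries actually stored.
    // With insertPassThrough = true, the left side is first routed through a fresh
    // pass-through node, mimicking the workaround proposed in the diff.
    static int parentCount(final Collection<ToyNode> parentNodes, final boolean insertPassThrough) {
        final ToyNode source = new ToyNode("source");
        final ToyNode left = insertPassThrough ? new ToyNode("pass-through") : source;
        parentNodes.add(left);
        parentNodes.add(source);
        return parentNodes.size();
    }
}
```

Here `parentCount(new LinkedHashSet<>(), false)` returns 1. Both `parentCount(new LinkedHashSet<>(), true)` and `parentCount(new ArrayList<>(), false)` return 2.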
   
   Overall, I would prefer a "clean" solution over the proposed fix, but I am not 100% sure if it's doable.
   
   cc @bbejeck WDYT?




[GitHub] [kafka] mjsax commented on pull request #12608: Enable KStream to be merged with itself

Posted by GitBox <gi...@apache.org>.
mjsax commented on PR #12608:
URL: https://github.com/apache/kafka/pull/12608#issuecomment-1381081214

   @Danny02 -- Are you still interested in finishing this PR?



[GitHub] [kafka] mjsax commented on a diff in pull request #12608: Enable KStream to be merged with itself

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12608:
URL: https://github.com/apache/kafka/pull/12608#discussion_r1058586441


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+
+public class KStreamPassThrough<KIn, VIn> implements ProcessorSupplier<KIn, VIn, KIn, VIn> {

Review Comment:
   There is already `PassThrough.java` that we can re-use.




[GitHub] [kafka] Danny02 commented on pull request #12608: Enable KStream to be merged with itself

Posted by GitBox <gi...@apache.org>.
Danny02 commented on PR #12608:
URL: https://github.com/apache/kafka/pull/12608#issuecomment-1241762494

   I really have no idea why completely unrelated tests are failing.



[GitHub] [kafka] mjsax commented on a diff in pull request #12608: Enable KStream to be merged with itself

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12608:
URL: https://github.com/apache/kafka/pull/12608#discussion_r1058585746


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java:
##########
@@ -1694,6 +1694,28 @@ public void shouldMergeMultipleStreams() {
             processorSupplier.theCapturedProcessor().processed());
     }
 
+    @Test
+    public void shouldMergeSameStreams() {
+        final String topic1 = "topic-1";
+
+        final KStream<String, String> source = builder.stream(topic1);
+        final KStream<String, String> merged = source.merge(source);
+
+        merged.process(processorSupplier);
+
+        System.out.println(builder.build().describe());

Review Comment:
   We should not have `println` calls inside tests.




[GitHub] [kafka] mjsax commented on pull request #12608: Enable KStream to be merged with itself

Posted by GitBox <gi...@apache.org>.
mjsax commented on PR #12608:
URL: https://github.com/apache/kafka/pull/12608#issuecomment-1366911487

   Can you create a JIRA ticket for this PR? If it's not working right now, I would consider it a bug.
   
   And yes, I agree that merging a KStream with itself should result in duplicating each record.
   
   As for the failing tests, they are most likely flaky.



[GitHub] [kafka] mjsax commented on a diff in pull request #12608: Enable KStream to be merged with itself

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12608:
URL: https://github.com/apache/kafka/pull/12608#discussion_r1058650447


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java:
##########
@@ -1694,6 +1694,28 @@ public void shouldMergeMultipleStreams() {
             processorSupplier.theCapturedProcessor().processed());
     }
 
+    @Test
+    public void shouldMergeSameStreams() {
+        final String topic1 = "topic-1";
+
+        final KStream<String, String> source = builder.stream(topic1);

Review Comment:
   Instead of creating a new `KStream`, we could just reuse the prepared `KStream testStream`:
   ```
   final KStream<String, String> merged = testStream.merge(testStream);
   ```


