Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/24 12:25:20 UTC

[GitHub] [kafka] vpapavas opened a new pull request, #12555: Optimize self-join

vpapavas opened a new pull request, #12555:
URL: https://github.com/apache/kafka/pull/12555



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960845326


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);

Review Comment:
   With chained (N-way) joins, only the first join gets optimized. However, a topology can also contain more than one self-join as independent pairs rather than as an N-way join, and all of those can and should be optimized. I added a test case for this in `InternalStreamsBuilderTest:shouldMarkAllStreamStreamJoinsAsSelfJoin`.
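
A minimal sketch of the point above, under loud assumptions: `ToyNode`, `markSelfJoins`, and the "join with exactly one parent" rule are hypothetical simplifications for illustration, not Kafka's `GraphNode` or the PR's actual `isSelfJoin` check. It only shows that a traversal which keeps walking after the first match marks every independent self-join.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

final class MultiSelfJoinSketch {

    // Hypothetical toy node; a simplified stand-in, not Kafka's GraphNode.
    static final class ToyNode {
        final String name;
        final boolean isJoin;
        boolean selfJoin = false;
        final List<ToyNode> parents = new ArrayList<>();
        final List<ToyNode> children = new ArrayList<>();

        ToyNode(final String name, final boolean isJoin) {
            this.name = name;
            this.isJoin = isJoin;
        }

        void addChild(final ToyNode child) {
            children.add(child);
            child.parents.add(this);
        }
    }

    // Identity-based visited set, so nodes are tracked by reference.
    static Set<ToyNode> newVisitedSet() {
        return Collections.newSetFromMap(new IdentityHashMap<ToyNode, Boolean>());
    }

    // Depth-first walk that does not stop at the first match.
    static void markSelfJoins(final ToyNode node, final Set<ToyNode> visited) {
        visited.add(node);
        // Assumption of this sketch: a join with exactly one parent is a self-join.
        if (node.isJoin && node.parents.size() == 1) {
            node.selfJoin = true;
        }
        for (final ToyNode child : node.children) {
            if (!visited.contains(child)) {
                markSelfJoins(child, visited);
            }
        }
    }

    public static void main(final String[] args) {
        final ToyNode root = new ToyNode("root", false);
        final ToyNode sourceA = new ToyNode("sourceA", false);
        final ToyNode sourceB = new ToyNode("sourceB", false);
        final ToyNode sourceC = new ToyNode("sourceC", false);
        root.addChild(sourceA);
        root.addChild(sourceB);
        root.addChild(sourceC);

        final ToyNode joinAA = new ToyNode("joinAA", true); // A joined with A
        sourceA.addChild(joinAA);
        final ToyNode joinBB = new ToyNode("joinBB", true); // B joined with B
        sourceB.addChild(joinBB);
        final ToyNode joinBC = new ToyNode("joinBC", true); // B joined with C
        sourceB.addChild(joinBC);
        sourceC.addChild(joinBC);

        markSelfJoins(root, newVisitedSet());
        // prints "true true false"
        System.out.println(joinAA.selfJoin + " " + joinBB.selfJoin + " " + joinBC.selfJoin);
    }
}
```

In this toy graph both independent self-joins are marked, while the join over two different sources is left alone.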





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969349691


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {

Review Comment:
   @vvcephei, yes, that's what the code is checking. I updated the comment and added an extra test with two joins under the same source. I also updated the code to make the checks more robust.
   
   @jnh5y, there can be at most two `isStreamJoinWindowNode` nodes _per_ join. With N-way joins there will be only one `isStreamJoinWindowNode` node. In that case, only the first join will be optimized as a self-join.
   
   All the tests are in `InternalStreamsBuilderTest`. I tried to cover several join topologies, but I am sure there are more that I haven't thought of. If you feel any are missing, please let me know and I will add them.





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969351766


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -270,17 +277,12 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(null);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {

Review Comment:
   No, it's in the `internals` package, so it's not public API. Right, @vvcephei?





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r958320960


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(false, false);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(
+        final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
         mergeDuplicateSourceNodes();
         if (optimizeTopology) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
             optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
+            if (optimizeSelfJoin) {

Review Comment:
   Hey @guozhangwang! What do you mean by the "two-pass" mechanism? Do you mean that one pass builds the plan and a second pass optimizes it? What is wrong with doing that?
   
   The complications I came across actually don't come from this, but from the fact that the logical plan is not so logical: it already has a lot of physical-plan information baked into it. I don't think it is a good idea to optimize the plan while building it (in one pass), because some optimizations cannot be identified until the full plan is built. Moreover, optimizing while building restricts an optimization rule to "local" information (whatever the plan contains up to that point) instead of the whole plan, so optimizations that would otherwise apply get excluded simply because the optimizer doesn't have all the information yet. It also creates "race" conditions between optimizations: applying one optimization might rule out another that would have been more beneficial.
   
   Regarding the optimization in this PR, it is not possible to tell whether a join is a self-join until we have seen the entire plan. We need to make sure that both arguments of the join come from the same source and that no transformation is applied to one argument but not the other. I am afraid the checks I do to decide whether a self-join is applicable are unavoidable, whether we optimize on the fly or afterward.
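
The two conditions named above (same source on both sides, and no transformation applied to only one side) can be illustrated with a small sketch. Everything here is an assumption made for illustration: `PlanNode`, `sameSource`, and `isSelfJoin` are hypothetical names, and the real check in the PR works on Kafka's graph nodes, not on this toy chain.

```java
final class SelfJoinCheckSketch {

    // Hypothetical plan node: a source has no parent, a transformation has exactly one.
    static final class PlanNode {
        final String name;
        final PlanNode parent;

        PlanNode(final String name, final PlanNode parent) {
            this.name = name;
            this.parent = parent;
        }

        // Walk up the chain to the originating source node.
        PlanNode source() {
            PlanNode node = this;
            while (node.parent != null) {
                node = node.parent;
            }
            return node;
        }
    }

    // Necessary but not sufficient: both arguments originate from the same source.
    static boolean sameSource(final PlanNode left, final PlanNode right) {
        return left.source() == right.source();
    }

    // In this sketch a join is a self-join only if both arguments are the very same
    // node, which implies the same source and the same transformations on both sides.
    static boolean isSelfJoin(final PlanNode left, final PlanNode right) {
        return left == right;
    }

    public static void main(final String[] args) {
        final PlanNode source = new PlanNode("source", null);
        final PlanNode filtered = new PlanNode("filter", source);

        System.out.println(sameSource(source, filtered)); // prints "true"
        System.out.println(isSelfJoin(source, filtered)); // prints "false"
        System.out.println(isSelfJoin(filtered, filtered)); // prints "true"
    }
}
```

The middle case is the one the comment warns about: the two arguments share a source, but a transformation was applied to only one of them, so the rewrite must not fire.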





[GitHub] [kafka] jnh5y commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r968982353


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -305,6 +307,36 @@ public void buildAndOptimizeTopology(final boolean optimizeTopology) {
         internalTopologyBuilder.validateCopartition();
     }
 
+    /**
+     * A user can provide either the config OPTIMIZE which means all optimizations rules will be
+     * applied or they can provide a list of optimization rules.
+     */
+    private void optimizeTopology(final Properties props) {
+        final List<String> optimizationConfigs;
+        if (props == null || !props.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) {
+            optimizationConfigs = new ArrayList<>();
+            optimizationConfigs.add(StreamsConfig.NO_OPTIMIZATION);
+        } else {
+            optimizationConfigs = StreamsConfig.verifyTopologyOptimizationConfigs(
+                (String) props.get(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG));
+        }
+        if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE)
+            || optimizationConfigs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS)) {
+            LOG.debug("Optimizing the Kafka Streams graph for ktable source nodes");
+            optimizeKTableSourceTopics();
+        }
+        if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE)
+            || optimizationConfigs.contains(StreamsConfig.MERGE_REPARTITION_TOPICS)) {
+            LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
+            maybeOptimizeRepartitionOperations();
+        }
+        if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE)
+            || optimizationConfigs.contains(StreamsConfig.SELF_JOIN)) {
+            LOG.debug("Optimizing the Kafka Streams graph for self-joins");
+            rewriteSelfJoin(root, new IdentityHashMap<>());

Review Comment:
   In this method, could order matter?
   





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r970805661


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
+    private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+    private final String windowName;
+    private final long joinThisBeforeMs;
+    private final long joinThisAfterMs;
+    private final long joinOtherBeforeMs;
+    private final long joinOtherAfterMs;
+    private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis;
+
+    private final TimeTracker sharedTimeTracker;
+
+    KStreamKStreamSelfJoin(
+        final String windowName,
+        final JoinWindowsInternal windows,
+        final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis,
+        final TimeTracker sharedTimeTracker) {
+
+        this.windowName = windowName;
+        this.joinThisBeforeMs = windows.beforeMs;
+        this.joinThisAfterMs = windows.afterMs;
+        this.joinOtherBeforeMs = windows.afterMs;
+        this.joinOtherAfterMs = windows.beforeMs;
+        this.joinerThis = joinerThis;
+        this.sharedTimeTracker = sharedTimeTracker;
+    }
+
+    @Override
+    public Processor<K, V1, K, VOut> get() {
+        return new KStreamKStreamSelfJoinProcessor();
+    }
+
+    private class KStreamKStreamSelfJoinProcessor extends ContextualProcessor<K, V1, K, VOut> {
+        private WindowStore<K, V2> windowStore;
+        private Sensor droppedRecordsSensor;
+
+        @Override
+        public void init(final ProcessorContext<K, VOut> context) {
+            super.init(context);
+
+            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
+            droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
+            windowStore = context.getStateStore(windowName);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void process(final Record<K, V1> record) {
+            if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
+                return;
+            }
+
+            final long inputRecordTimestamp = record.timestamp();
+            long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs);
+            long timeTo = Math.max(0L, inputRecordTimestamp + joinThisAfterMs);
+            boolean emittedJoinWithSelf = false;
+            final Record selfRecord = record
+                .withValue(joinerThis.apply(record.key(), record.value(), (V2) record.value()))
+                .withTimestamp(inputRecordTimestamp);
+            sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+
+            // Join current record with other
+            try (final WindowStoreIterator<V2> iter = windowStore.fetch(
+                record.key(), timeFrom, timeTo)) {
+                while (iter.hasNext()) {
+                    final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Join this with other
+                    context().forward(
+                        record.withValue(joinerThis.apply(
+                                record.key(), record.value(), otherRecord.value))
+                            .withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+                }
+            }
+
+            // Needs to be in a different loop to ensure correct ordering of records where
+            // correct ordering means it matches the output of an inner join.
+            timeFrom = Math.max(0L, inputRecordTimestamp - joinOtherBeforeMs);

Review Comment:
   I need to loop over the records twice to ensure that the results match the output of the inner join. The first loop simulates the join triggered by the left side; the second loop simulates the join triggered by the right side. I could store the results in a data structure and iterate over that instead of doing a second fetch, but we don't know how many of these records there might be, and I didn't want to cause memory issues. So I decided to sacrifice CPU for memory.
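
The two-fetch idea can be sketched in isolation. This is a simplified illustration, not the PR's processor: it assumes a single key, string values, non-negative timestamps, an in-memory `TreeMap` in place of a `WindowStore`, and it emits the record's join with itself last, which is an assumption of this sketch rather than something the quoted diff confirms. The class name `TwoPassSelfJoinSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

final class TwoPassSelfJoinSketch {
    private final long beforeMs;
    private final long afterMs;
    // Stand-in for a window store of one key: timestamp -> values seen at that time.
    private final NavigableMap<Long, List<String>> store = new TreeMap<>();

    TwoPassSelfJoinSketch(final long beforeMs, final long afterMs) {
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
    }

    List<String> process(final long timestamp, final String value) {
        final List<String> out = new ArrayList<>();

        // Pass 1: the current record acts as the left argument.
        for (final List<String> others : fetch(timestamp - beforeMs, timestamp + afterMs)) {
            for (final String other : others) {
                out.add(value + "+" + other);
            }
        }

        // Pass 2: the current record acts as the right argument, so the window
        // bounds are swapped. The store is fetched again instead of buffering
        // pass 1, trading CPU for memory.
        for (final List<String> others : fetch(timestamp - afterMs, timestamp + beforeMs)) {
            for (final String other : others) {
                out.add(other + "+" + value);
            }
        }

        // Assumption of this sketch: the join of the record with itself comes last.
        out.add(value + "+" + value);
        store.computeIfAbsent(timestamp, ts -> new ArrayList<>()).add(value);
        return out;
    }

    // Inclusive range fetch, clamped at zero like the timeFrom/timeTo computation in the diff.
    private Collection<List<String>> fetch(final long from, final long to) {
        return store.subMap(Math.max(0L, from), true, Math.max(0L, to), true).values();
    }

    public static void main(final String[] args) {
        final TwoPassSelfJoinSketch join = new TwoPassSelfJoinSketch(10L, 10L);
        System.out.println(join.process(0L, "a")); // prints "[a+a]"
        System.out.println(join.process(5L, "b")); // prints "[b+a, a+b, b+b]"
    }
}
```

Buffering the results of the first pass would avoid the second fetch, but the buffer would grow with the number of records in the window, which is the memory risk the comment describes.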





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r970140299


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -305,6 +305,32 @@ public void buildAndOptimizeTopology(final boolean optimizeTopology) {
         internalTopologyBuilder.validateCopartition();
     }
 
+    /**
+     * A user can provide either the config OPTIMIZE which means all optimizations rules will be
+     * applied or they can provide a list of optimization rules.
+     */
+    private void optimizeTopology(final Properties props) {
+        final Set<String> optimizationConfigs;
+        if (props == null || !props.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) {
+            optimizationConfigs = Collections.emptySet();
+        } else {
+            optimizationConfigs = StreamsConfig.verifyTopologyOptimizationConfigs(
+                (String) props.get(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG));
+        }
+        if (optimizationConfigs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS)) {
+            LOG.debug("Optimizing the Kafka Streams graph for ktable source nodes");
+            optimizeKTableSourceTopics();
+        }
+        if (optimizationConfigs.contains(StreamsConfig.MERGE_REPARTITION_TOPICS)) {
+            LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
+            maybeOptimizeRepartitionOperations();

Review Comment:
   nit: I think we can remove the `maybe` prefix from the function name now. Could we just follow the config values and call them `reuseKTableSourceTopics()` and `mergeRepartitionTopics()` instead?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {

Review Comment:
   I have a meta question about whether we have to introduce a new SelfJoin processor, or whether we could reuse the existing Join processor by adding this window name: by comparing it with the other window name we can tell whether it is a self-join. But since I have not fully understood the logic of this self-join processor (e.g. why we loop twice), I will hold back on that question until I fully comprehend this class.



##########
gradle.properties:
##########
@@ -20,7 +20,7 @@ group=org.apache.kafka
 #  - tests/kafkatest/__init__.py
 #  - tests/kafkatest/version.py (variable DEV_VERSION)
 #  - kafka-merge-pr.py
-version=3.3.0-SNAPSHOT

Review Comment:
   Hmm, why do we have this in the diff? The current trunk should already be pointing to 3.4.0-SNAPSHOT. Maybe some rebasing cleanup is needed?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +382,54 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * The self-join rewriting can be applied if the StreamStreamJoinNode has a single parent.
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     * right argument of the join (the "other"). The join node may have multiple siblings but for
+     * this rewriting we only care about the ThisKStreamJoinWindow and the OtherKStreamJoinWindow.
+     * We iterate over all the siblings to identify these two nodes so that we can remove the
+     * latter.
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(
+        final GraphNode currentNode, final Map<GraphNode, Boolean> visited) {
+        visited.put(currentNode, true);
+        if (currentNode instanceof StreamStreamJoinNode && currentNode.parentNodes().size() == 1) {
+            final StreamStreamJoinNode joinNode = (StreamStreamJoinNode) currentNode;
+            joinNode.setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = joinNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)
+                    && child.buildPriority() < joinNode.buildPriority()) {
+                    if (child.nodeName().equals(joinNode.getThisWindowedStreamProcessorParameters().processorName())) {
+                        left = child;
+                    } else if (child.nodeName().equals(joinNode.getOtherWindowedStreamProcessorParameters().processorName())) {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < right.buildPriority()) {

Review Comment:
   If this `if` check fails, should that be considered a bug (hence we should throw)?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +382,54 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * The self-join rewriting can be applied if the StreamStreamJoinNode has a single parent.
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     * right argument of the join (the "other"). The join node may have multiple siblings but for
+     * this rewriting we only care about the ThisKStreamJoinWindow and the OtherKStreamJoinWindow.
+     * We iterate over all the siblings to identify these two nodes so that we can remove the
+     * latter.
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(
+        final GraphNode currentNode, final Map<GraphNode, Boolean> visited) {
+        visited.put(currentNode, true);
+        if (currentNode instanceof StreamStreamJoinNode && currentNode.parentNodes().size() == 1) {
+            final StreamStreamJoinNode joinNode = (StreamStreamJoinNode) currentNode;
+            joinNode.setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = joinNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode

Review Comment:
   nit: extra space.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +382,54 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * The self-join rewriting can be applied if the StreamStreamJoinNode has a single parent.
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     * right argument of the join (the "other"). The join node may have multiple siblings but for
+     * this rewriting we only care about the ThisKStreamJoinWindow and the OtherKStreamJoinWindow.
+     * We iterate over all the siblings to identify these two nodes so that we can remove the
+     * latter.
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(
+        final GraphNode currentNode, final Map<GraphNode, Boolean> visited) {
+        visited.put(currentNode, true);
+        if (currentNode instanceof StreamStreamJoinNode && currentNode.parentNodes().size() == 1) {
+            final StreamStreamJoinNode joinNode = (StreamStreamJoinNode) currentNode;
+            joinNode.setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = joinNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)
+                    && child.buildPriority() < joinNode.buildPriority()) {
+                    if (child.nodeName().equals(joinNode.getThisWindowedStreamProcessorParameters().processorName())) {
+                        left = child;
+                    } else if (child.nodeName().equals(joinNode.getOtherWindowedStreamProcessorParameters().processorName())) {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.containsKey(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    private boolean isStreamJoinWindowNode(final ProcessorGraphNode node) {

Review Comment:
   I think it's better to add a new node type, e.g. `KStreamJoinWindowNode`, extending `StatefulProcessorNode`, that encodes the window store name along with the processor. Then we can reduce this check to `node instanceof KStreamJoinWindowNode`.
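
A rough sketch of what that suggestion could look like. All classes below are hypothetical stubs invented for illustration (`GraphNodeStub`, `StatefulProcessorNodeStub`, `KStreamJoinWindowNodeStub`); they do not reproduce the constructors or fields of Kafka's real `GraphNode` or `StatefulProcessorNode`.

```java
final class JoinWindowNodeSketch {

    // Hypothetical stand-in for a generic graph node.
    static class GraphNodeStub {
        final String nodeName;

        GraphNodeStub(final String nodeName) {
            this.nodeName = nodeName;
        }
    }

    // Hypothetical stand-in for a node that has a state store attached.
    static class StatefulProcessorNodeStub extends GraphNodeStub {
        final String storeName;

        StatefulProcessorNodeStub(final String nodeName, final String storeName) {
            super(nodeName);
            this.storeName = storeName;
        }
    }

    // Dedicated node type: the type itself says "join window", and it carries
    // the window store name along with it.
    static final class KStreamJoinWindowNodeStub extends StatefulProcessorNodeStub {
        KStreamJoinWindowNodeStub(final String nodeName, final String windowStoreName) {
            super(nodeName, windowStoreName);
        }

        String windowStoreName() {
            return storeName;
        }
    }

    // With a dedicated type, the helper reduces to a type test.
    static boolean isStreamJoinWindowNode(final GraphNodeStub node) {
        return node instanceof KStreamJoinWindowNodeStub;
    }

    public static void main(final String[] args) {
        final GraphNodeStub window = new KStreamJoinWindowNodeStub("this-windowed", "this-join-store");
        final GraphNodeStub filter = new GraphNodeStub("filter");
        System.out.println(isStreamJoinWindowNode(window)); // prints "true"
        System.out.println(isStreamJoinWindowNode(filter)); // prints "false"
    }
}
```

The design trade-off is one more node class in exchange for a check that no longer has to inspect the processor held by a generic node.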



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
+    private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+    private final String windowName;
+    private final long joinThisBeforeMs;
+    private final long joinThisAfterMs;
+    private final long joinOtherBeforeMs;
+    private final long joinOtherAfterMs;
+    private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis;
+
+    private final TimeTracker sharedTimeTracker;
+
+    KStreamKStreamSelfJoin(
+        final String windowName,
+        final JoinWindowsInternal windows,
+        final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis,
+        final TimeTracker sharedTimeTracker) {
+
+        this.windowName = windowName;
+        this.joinThisBeforeMs = windows.beforeMs;
+        this.joinThisAfterMs = windows.afterMs;
+        this.joinOtherBeforeMs = windows.afterMs;
+        this.joinOtherAfterMs = windows.beforeMs;
+        this.joinerThis = joinerThis;
+        this.sharedTimeTracker = sharedTimeTracker;
+    }
+
+    @Override
+    public Processor<K, V1, K, VOut> get() {
+        return new KStreamKStreamSelfJoinProcessor();
+    }
+
+    private class KStreamKStreamSelfJoinProcessor extends ContextualProcessor<K, V1, K, VOut> {
+        private WindowStore<K, V2> windowStore;
+        private Sensor droppedRecordsSensor;
+
+        @Override
+        public void init(final ProcessorContext<K, VOut> context) {
+            super.init(context);
+
+            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
+            droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
+            windowStore = context.getStateStore(windowName);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void process(final Record<K, V1> record) {
+            if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
+                return;
+            }
+
+            final long inputRecordTimestamp = record.timestamp();
+            long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs);

Review Comment:
   The timeFrom/timeTo are exactly the same here and in line 114/115, is that right?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -270,17 +277,12 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(null);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {

Review Comment:
   Yes that's right :)



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +382,54 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * The self-join rewriting can be applied if the StreamStreamJoinNode has a single parent.
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     * right argument of the join (the "other"). The join node may have multiple siblings but for
+     * this rewriting we only care about the ThisKStreamJoinWindow and the OtherKStreamJoinWindow.
+     * We iterate over all the siblings to identify these two nodes so that we can remove the
+     * latter.
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(
+        final GraphNode currentNode, final Map<GraphNode, Boolean> visited) {
+        visited.put(currentNode, true);
+        if (currentNode instanceof StreamStreamJoinNode && currentNode.parentNodes().size() == 1) {
+            final StreamStreamJoinNode joinNode = (StreamStreamJoinNode) currentNode;
+            joinNode.setSelfJoin();

Review Comment:
   Could we defer this until after the sanity check has passed?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
+    private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+    private final String windowName;
+    private final long joinThisBeforeMs;
+    private final long joinThisAfterMs;
+    private final long joinOtherBeforeMs;
+    private final long joinOtherAfterMs;
+    private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis;
+
+    private final TimeTracker sharedTimeTracker;
+
+    KStreamKStreamSelfJoin(
+        final String windowName,
+        final JoinWindowsInternal windows,
+        final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis,
+        final TimeTracker sharedTimeTracker) {
+
+        this.windowName = windowName;
+        this.joinThisBeforeMs = windows.beforeMs;
+        this.joinThisAfterMs = windows.afterMs;
+        this.joinOtherBeforeMs = windows.afterMs;
+        this.joinOtherAfterMs = windows.beforeMs;
+        this.joinerThis = joinerThis;
+        this.sharedTimeTracker = sharedTimeTracker;
+    }
+
+    @Override
+    public Processor<K, V1, K, VOut> get() {
+        return new KStreamKStreamSelfJoinProcessor();
+    }
+
+    private class KStreamKStreamSelfJoinProcessor extends ContextualProcessor<K, V1, K, VOut> {
+        private WindowStore<K, V2> windowStore;
+        private Sensor droppedRecordsSensor;
+
+        @Override
+        public void init(final ProcessorContext<K, VOut> context) {
+            super.init(context);
+
+            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
+            droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
+            windowStore = context.getStateStore(windowName);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void process(final Record<K, V1> record) {
+            if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
+                return;
+            }
+
+            final long inputRecordTimestamp = record.timestamp();
+            long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs);
+            long timeTo = Math.max(0L, inputRecordTimestamp + joinThisAfterMs);
+            boolean emittedJoinWithSelf = false;
+            final Record selfRecord = record
+                .withValue(joinerThis.apply(record.key(), record.value(), (V2) record.value()))
+                .withTimestamp(inputRecordTimestamp);
+            sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+
+            // Join current record with other
+            try (final WindowStoreIterator<V2> iter = windowStore.fetch(
+                record.key(), timeFrom, timeTo)) {
+                while (iter.hasNext()) {
+                    final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Join this with other
+                    context().forward(

Review Comment:
   Won't we emit the self-join record twice, since we would first put this record into the window store and then trigger this processor? I.e. we would get the same record from the fetch iterator as well?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
+    private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+    private final String windowName;
+    private final long joinThisBeforeMs;
+    private final long joinThisAfterMs;
+    private final long joinOtherBeforeMs;
+    private final long joinOtherAfterMs;
+    private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis;
+
+    private final TimeTracker sharedTimeTracker;
+
+    KStreamKStreamSelfJoin(
+        final String windowName,
+        final JoinWindowsInternal windows,
+        final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis,
+        final TimeTracker sharedTimeTracker) {
+
+        this.windowName = windowName;
+        this.joinThisBeforeMs = windows.beforeMs;
+        this.joinThisAfterMs = windows.afterMs;
+        this.joinOtherBeforeMs = windows.afterMs;
+        this.joinOtherAfterMs = windows.beforeMs;
+        this.joinerThis = joinerThis;
+        this.sharedTimeTracker = sharedTimeTracker;
+    }
+
+    @Override
+    public Processor<K, V1, K, VOut> get() {
+        return new KStreamKStreamSelfJoinProcessor();
+    }
+
+    private class KStreamKStreamSelfJoinProcessor extends ContextualProcessor<K, V1, K, VOut> {
+        private WindowStore<K, V2> windowStore;
+        private Sensor droppedRecordsSensor;
+
+        @Override
+        public void init(final ProcessorContext<K, VOut> context) {
+            super.init(context);
+
+            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
+            droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
+            windowStore = context.getStateStore(windowName);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void process(final Record<K, V1> record) {
+            if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
+                return;
+            }
+
+            final long inputRecordTimestamp = record.timestamp();
+            long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs);
+            long timeTo = Math.max(0L, inputRecordTimestamp + joinThisAfterMs);
+            boolean emittedJoinWithSelf = false;
+            final Record selfRecord = record
+                .withValue(joinerThis.apply(record.key(), record.value(), (V2) record.value()))
+                .withTimestamp(inputRecordTimestamp);
+            sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+
+            // Join current record with other
+            try (final WindowStoreIterator<V2> iter = windowStore.fetch(
+                record.key(), timeFrom, timeTo)) {
+                while (iter.hasNext()) {
+                    final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Join this with other
+                    context().forward(
+                        record.withValue(joinerThis.apply(
+                                record.key(), record.value(), otherRecord.value))
+                            .withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+                }
+            }
+
+            // Needs to be in a different loop to ensure correct ordering of records where
+            // correct ordering means it matches the output of an inner join.
+            timeFrom = Math.max(0L, inputRecordTimestamp - joinOtherBeforeMs);
+            timeTo = Math.max(0L, inputRecordTimestamp + joinOtherAfterMs);
+            try (final WindowStoreIterator<V2> iter2 = windowStore.fetch(
+                record.key(), timeFrom, timeTo)) {
+                while (iter2.hasNext()) {
+                    final KeyValue<Long, V2> otherRecord = iter2.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+                    final long maxRecordTimestamp = Math.max(inputRecordTimestamp, otherRecordTimestamp);
+
+                    // This is needed so that output records follow timestamp order
+                    // Join this with self
+                    if (inputRecordTimestamp < maxRecordTimestamp && !emittedJoinWithSelf) {

Review Comment:
   Won't we emit the self-join record twice, since we would first put this record into the window store and then trigger this processor? I.e., we would get the same record from the fetch iterator as well, so we would emit the result of joining with the fetched record once and then emit the selfRecord again here.
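
To make the concern concrete, here is a toy model of it. It is only a sketch under stated assumptions: the `TreeMap` stands in for one key's window store, the record is assumed to be stored before the join logic runs, and the class and method names are hypothetical. Whether the real processor double-emits depends on the guards in the rest of the method, which this hunk does not show.

```java
import java.util.TreeMap;

class SelfJoinEmitSketch {
    // Counts how often the (record, record) pair would be emitted for one input record.
    // Assumption: the record is already in the store when the join logic runs.
    static int emitsOfSelfPair(final long ts, final long windowMs, final boolean alsoEmitSelfRecord) {
        final TreeMap<Long, String> store = new TreeMap<>(); // timestamp -> value, single key
        store.put(ts, "v");

        final long from = Math.max(0L, ts - windowMs);
        final long to = Math.max(0L, ts + windowMs);

        int selfPairs = 0;
        // The range fetch is inclusive on both ends, so it returns the record itself.
        for (final Long otherTs : store.subMap(from, true, to, true).keySet()) {
            if (otherTs == ts) {
                selfPairs++;
            }
        }
        // An additional explicit emission of the precomputed self record.
        if (alsoEmitSelfRecord) {
            selfPairs++;
        }
        return selfPairs;
    }
}
```

In this model the fetch alone already yields the self pair once, so an unconditional extra emission would produce it twice.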



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
+    private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+    private final String windowName;
+    private final long joinThisBeforeMs;
+    private final long joinThisAfterMs;
+    private final long joinOtherBeforeMs;
+    private final long joinOtherAfterMs;
+    private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis;
+
+    private final TimeTracker sharedTimeTracker;
+
+    KStreamKStreamSelfJoin(
+        final String windowName,
+        final JoinWindowsInternal windows,
+        final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis,
+        final TimeTracker sharedTimeTracker) {
+
+        this.windowName = windowName;
+        this.joinThisBeforeMs = windows.beforeMs;
+        this.joinThisAfterMs = windows.afterMs;
+        this.joinOtherBeforeMs = windows.afterMs;
+        this.joinOtherAfterMs = windows.beforeMs;
+        this.joinerThis = joinerThis;
+        this.sharedTimeTracker = sharedTimeTracker;
+    }
+
+    @Override
+    public Processor<K, V1, K, VOut> get() {
+        return new KStreamKStreamSelfJoinProcessor();
+    }
+
+    private class KStreamKStreamSelfJoinProcessor extends ContextualProcessor<K, V1, K, VOut> {
+        private WindowStore<K, V2> windowStore;
+        private Sensor droppedRecordsSensor;
+
+        @Override
+        public void init(final ProcessorContext<K, VOut> context) {
+            super.init(context);
+
+            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
+            droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
+            windowStore = context.getStateStore(windowName);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void process(final Record<K, V1> record) {
+            if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
+                return;
+            }
+
+            final long inputRecordTimestamp = record.timestamp();
+            long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs);
+            long timeTo = Math.max(0L, inputRecordTimestamp + joinThisAfterMs);
+            boolean emittedJoinWithSelf = false;
+            final Record selfRecord = record
+                .withValue(joinerThis.apply(record.key(), record.value(), (V2) record.value()))
+                .withTimestamp(inputRecordTimestamp);
+            sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+
+            // Join current record with other
+            try (final WindowStoreIterator<V2> iter = windowStore.fetch(
+                record.key(), timeFrom, timeTo)) {
+                while (iter.hasNext()) {
+                    final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Join this with other
+                    context().forward(
+                        record.withValue(joinerThis.apply(
+                                record.key(), record.value(), otherRecord.value))
+                            .withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+                }
+            }
+
+            // Needs to be in a different loop to ensure correct ordering of records where
+            // correct ordering means it matches the output of an inner join.
+            timeFrom = Math.max(0L, inputRecordTimestamp - joinOtherBeforeMs);

Review Comment:
   It's not clear to me why we loop over the same store with the same fetch params twice.
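
For reference, the two fetch ranges in the hunk above come from the same arithmetic with `beforeMs` and `afterMs` swapped in the constructor. A minimal sketch of that arithmetic (the class and method names are hypothetical, not part of the PR) suggests the ranges coincide exactly when the join window is symmetric:

```java
class FetchBoundsSketch {
    // Mirrors the arithmetic in the diff: [max(0, ts - before), max(0, ts + after)].
    static long[] bounds(final long ts, final long beforeMs, final long afterMs) {
        return new long[] {Math.max(0L, ts - beforeMs), Math.max(0L, ts + afterMs)};
    }

    // The "this" side uses (beforeMs, afterMs); the "other" side swaps them,
    // as the constructor in the diff does for joinOtherBeforeMs/joinOtherAfterMs.
    static boolean sameRange(final long ts, final long beforeMs, final long afterMs) {
        final long[] thisSide = bounds(ts, beforeMs, afterMs);
        final long[] otherSide = bounds(ts, afterMs, beforeMs);
        return thisSide[0] == otherSide[0] && thisSide[1] == otherSide[1];
    }
}
```

With an asymmetric window (for example 10 ms before and 20 ms after) the two fetches would cover different ranges, which may be why the code computes them separately.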





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r962781264


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow  that are siblings of the
+     * StreamStreamJoinNode and have smaller build priority.
+     */
+    private boolean isSelfJoin(final GraphNode streamJoinNode) {
+        final AtomicInteger count = new AtomicInteger();
+        countSourceNodes(count, streamJoinNode, new HashSet<>());
+        if (count.get() > 1) {
+            return false;
+        }
+        if (streamJoinNode.parentNodes().size() > 1) {
+            return false;
+        }
+        for (final GraphNode parent: streamJoinNode.parentNodes()) {
+            for (final GraphNode sibling : parent.children()) {
+                if (sibling instanceof ProcessorGraphNode) {
+                    if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) {
+                        continue;
+                    }
+                }
+                if (sibling != streamJoinNode
+                    && sibling.buildPriority() < streamJoinNode.buildPriority()) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    private void countSourceNodes(
+        final AtomicInteger count,
+        final GraphNode currentNode,
+        final Set<GraphNode> visited) {
+
+        if (currentNode instanceof StreamSourceNode) {
+            count.incrementAndGet();
+        }
+
+        for (final GraphNode parent: currentNode.parentNodes()) {
+            if (!visited.contains(parent)) {

Review Comment:
   Actually, tests fail with `equals`, so I will use an `IdentityHashMap`, since the assumption seems to be that `GraphNode`s are compared by reference.
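
A minimal sketch of the difference, assuming a node type whose `equals()` is value-based. The `Node` class below is a hypothetical stand-in, not the real `GraphNode`; only `IdentityHashMap` and `Collections.newSetFromMap` are actual JDK APIs.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Set;

class IdentityVisitedSketch {
    // Hypothetical stand-in for a graph node: equals() compares by name only.
    static final class Node {
        final String name;

        Node(final String name) {
            this.name = name;
        }

        @Override
        public boolean equals(final Object o) {
            return o instanceof Node && ((Node) o).name.equals(name);
        }

        @Override
        public int hashCode() {
            return name.hashCode();
        }
    }

    // Adds two distinct-but-equal nodes and reports how many the visited set keeps.
    static int visitedSize(final boolean byIdentity) {
        final Set<Node> visited;
        if (byIdentity) {
            // Membership is decided by reference (==), not by equals().
            visited = Collections.newSetFromMap(new IdentityHashMap<Node, Boolean>());
        } else {
            visited = new HashSet<Node>();
        }
        visited.add(new Node("join"));
        visited.add(new Node("join"));
        return visited.size();
    }
}
```

With the identity-based set, two separate node instances are both tracked as visited even if they compare equal, which matches a traversal that treats each graph node object as its own vertex.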





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960474042


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -270,17 +276,36 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(null);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(final Properties props) {
+        // Vicky: Do we need to verify props?
+        final List<String> optimizationConfigs;
+        if (props == null) {
+            optimizationConfigs = new ArrayList<>();
+            optimizationConfigs.add(StreamsConfig.NO_OPTIMIZATION);
+        } else {
+            final StreamsConfig config = new StreamsConfig(props);
+            optimizationConfigs = config.getList(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG);
+        }
 
         mergeDuplicateSourceNodes();
-        if (optimizeTopology) {
-            LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
+        if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE)

Review Comment:
   A user can enable optimizations either by providing the config `all` (`StreamsConfig.OPTIMIZE`), which applies all optimization rules, or by specifying a list of individual optimization rules. With your suggestion, an optimization would be applied only if `contains(OPTIMIZE)` is true, which is not correct. WDYT?
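
The check described here can be sketched as follows. This is an illustration only: the rule name `example.self.join.rule` and the helper class are hypothetical, and the parsing is simplified compared to what `StreamsConfig.getList` does.

```java
import java.util.Arrays;
import java.util.List;

class OptimizationConfigSketch {
    // Mirrors the `all` value mentioned above.
    static final String OPTIMIZE = "all";
    // Hypothetical rule name, used only for this illustration.
    static final String SELF_JOIN_RULE = "example.self.join.rule";

    // A rule applies if the user asked for everything or listed that rule explicitly.
    static boolean ruleEnabled(final String configValue, final String rule) {
        final List<String> configs = Arrays.asList(configValue.trim().split("\\s*,\\s*"));
        return configs.contains(OPTIMIZE) || configs.contains(rule);
    }
}
```

Checking only `contains(OPTIMIZE)` would skip the second branch, so a user who lists individual rules would get no optimization at all.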





[GitHub] [kafka] jnh5y commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r968993197


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
+    private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+    private final String windowName;
+    private final long joinThisBeforeMs;
+    private final long joinThisAfterMs;
+    private final long joinOtherBeforeMs;
+    private final long joinOtherAfterMs;
+    private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis;
+    private final ValueJoinerWithKey<? super K, ? super V2, ? super V1, ? extends VOut> joinerOther;
+
+    private final TimeTracker sharedTimeTracker;
+
+    KStreamKStreamSelfJoin(
+        final String windowName,
+        final JoinWindowsInternal windows,
+        final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis,
+        final ValueJoinerWithKey<? super K, ? super V2, ? super V1, ? extends VOut> joinerOther,
+        final TimeTracker sharedTimeTracker) {
+
+        this.windowName = windowName;
+        this.joinThisBeforeMs = windows.beforeMs;
+        this.joinThisAfterMs = windows.afterMs;
+        this.joinOtherBeforeMs = windows.afterMs;
+        this.joinOtherAfterMs = windows.beforeMs;
+        this.joinerThis = joinerThis;
+        this.joinerOther = joinerOther;
+        this.sharedTimeTracker = sharedTimeTracker;
+    }
+
+    @Override
+    public Processor<K, V1, K, VOut> get() {
+        return new KStreamKStreamSelfJoinProcessor();
+    }
+
+    private class KStreamKStreamSelfJoinProcessor extends StreamStreamJoinProcessor<K, V1, K, VOut> {
+        private WindowStore<K, V2> windowStore;
+        private Sensor droppedRecordsSensor;
+
+        @Override
+        public void init(final ProcessorContext<K, VOut> context) {
+            super.init(context);
+
+            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
+            droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
+            windowStore = context.getStateStore(windowName);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void process(final Record<K, V1> record) {
+            System.out.println("---> Processing record: " + record);
+            if (skipRecord(record, LOG, droppedRecordsSensor)) {
+                return;
+            }
+
+            final long inputRecordTimestamp = record.timestamp();
+            long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs);
+            long timeTo = Math.max(0L, inputRecordTimestamp + joinThisAfterMs);
+            boolean emittedJoinWithSelf = false;
+
+            sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+            System.out.println("----> Window store fetch, timeFrom=" + timeFrom + " timeTo=" + timeTo);
+
+            // Join current record with other
+            System.out.println("----> Window store fetch, timeFrom=" + timeFrom + " timeTo=" + timeTo);

Review Comment:
   Should these `System.out.println` calls be removed or replaced with `LOG.trace`/`LOG.debug`?





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r971010379


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
+    private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+    private final String windowName;
+    private final long joinThisBeforeMs;
+    private final long joinThisAfterMs;
+    private final long joinOtherBeforeMs;
+    private final long joinOtherAfterMs;
+    private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis;
+
+    private final TimeTracker sharedTimeTracker;
+
+    KStreamKStreamSelfJoin(
+        final String windowName,
+        final JoinWindowsInternal windows,
+        final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis,
+        final TimeTracker sharedTimeTracker) {
+
+        this.windowName = windowName;
+        this.joinThisBeforeMs = windows.beforeMs;
+        this.joinThisAfterMs = windows.afterMs;
+        this.joinOtherBeforeMs = windows.afterMs;
+        this.joinOtherAfterMs = windows.beforeMs;
+        this.joinerThis = joinerThis;
+        this.sharedTimeTracker = sharedTimeTracker;
+    }
+
+    @Override
+    public Processor<K, V1, K, VOut> get() {
+        return new KStreamKStreamSelfJoinProcessor();
+    }
+
+    private class KStreamKStreamSelfJoinProcessor extends ContextualProcessor<K, V1, K, VOut> {
+        private WindowStore<K, V2> windowStore;
+        private Sensor droppedRecordsSensor;
+
+        @Override
+        public void init(final ProcessorContext<K, VOut> context) {
+            super.init(context);
+
+            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
+            droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
+            windowStore = context.getStateStore(windowName);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void process(final Record<K, V1> record) {
+            if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
+                return;
+            }
+
+            final long inputRecordTimestamp = record.timestamp();
+            long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs);

Review Comment:
   I see. Is that the same as the following?
   
   1) Loop once over the larger of the two windows (this window, other window).
   2) For each record, check whether it falls in both windows: if it does, emit it twice; otherwise emit it once.
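
   The two steps above can be sketched roughly as follows. This is a minimal illustration, not the PR's implementation: the class `SinglePassSelfJoinSketch`, the `TreeMap` standing in for a per-key window store, and string concatenation standing in for the value joiner are all assumptions made for the example. It also makes no claim about matching the output order of the two-loop version, which is the open question in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: one fetch over the union of both join windows.
class SinglePassSelfJoinSketch {

    // store: stand-in for a per-key window store (timestamp -> value).
    static List<String> join(final TreeMap<Long, String> store,
                             final long ts,
                             final String value,
                             final long beforeMs,
                             final long afterMs) {
        // Step 1: fetch once using the larger of the two window sizes.
        final long span = Math.max(beforeMs, afterMs);
        final long from = Math.max(0L, ts - span);
        final long to = ts + span;

        final List<String> out = new ArrayList<>();
        for (final Map.Entry<Long, String> e : store.subMap(from, true, to, true).entrySet()) {
            final long otherTs = e.getKey();
            // Step 2: check membership in each window separately.
            final boolean inThisWindow = otherTs >= ts - beforeMs && otherTs <= ts + afterMs;
            final boolean inOtherWindow = otherTs >= ts - afterMs && otherTs <= ts + beforeMs;
            if (inThisWindow) {
                out.add(value + e.getValue());   // "this" joined with "other"
            }
            if (inOtherWindow) {
                out.add(e.getValue() + value);   // "other" joined with "this"
            }
        }
        return out;
    }
}
```

   A stored record that falls in both windows produces two outputs, and one that falls in only one window produces a single output.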



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
+    private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+    private final String windowName;
+    private final long joinThisBeforeMs;
+    private final long joinThisAfterMs;
+    private final long joinOtherBeforeMs;
+    private final long joinOtherAfterMs;
+    private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis;
+
+    private final TimeTracker sharedTimeTracker;
+
+    KStreamKStreamSelfJoin(
+        final String windowName,
+        final JoinWindowsInternal windows,
+        final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis,
+        final TimeTracker sharedTimeTracker) {
+
+        this.windowName = windowName;
+        this.joinThisBeforeMs = windows.beforeMs;
+        this.joinThisAfterMs = windows.afterMs;
+        this.joinOtherBeforeMs = windows.afterMs;
+        this.joinOtherAfterMs = windows.beforeMs;
+        this.joinerThis = joinerThis;
+        this.sharedTimeTracker = sharedTimeTracker;
+    }
+
+    @Override
+    public Processor<K, V1, K, VOut> get() {
+        return new KStreamKStreamSelfJoinProcessor();
+    }
+
+    private class KStreamKStreamSelfJoinProcessor extends ContextualProcessor<K, V1, K, VOut> {
+        private WindowStore<K, V2> windowStore;
+        private Sensor droppedRecordsSensor;
+
+        @Override
+        public void init(final ProcessorContext<K, VOut> context) {
+            super.init(context);
+
+            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
+            droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
+            windowStore = context.getStateStore(windowName);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void process(final Record<K, V1> record) {
+            if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
+                return;
+            }
+
+            final long inputRecordTimestamp = record.timestamp();
+            long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs);
+            long timeTo = Math.max(0L, inputRecordTimestamp + joinThisAfterMs);
+            boolean emittedJoinWithSelf = false;
+            final Record selfRecord = record
+                .withValue(joinerThis.apply(record.key(), record.value(), (V2) record.value()))
+                .withTimestamp(inputRecordTimestamp);
+            sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+
+            // Join current record with other
+            try (final WindowStoreIterator<V2> iter = windowStore.fetch(
+                record.key(), timeFrom, timeTo)) {
+                while (iter.hasNext()) {
+                    final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Join this with other
+                    context().forward(
+                        record.withValue(joinerThis.apply(
+                                record.key(), record.value(), otherRecord.value))
+                            .withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+                }
+            }
+
+            // Needs to be in a different loop to ensure correct ordering of records where
+            // correct ordering means it matches the output of an inner join.
+            timeFrom = Math.max(0L, inputRecordTimestamp - joinOtherBeforeMs);
+            timeTo = Math.max(0L, inputRecordTimestamp + joinOtherAfterMs);
+            try (final WindowStoreIterator<V2> iter2 = windowStore.fetch(
+                record.key(), timeFrom, timeTo)) {
+                while (iter2.hasNext()) {
+                    final KeyValue<Long, V2> otherRecord = iter2.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+                    final long maxRecordTimestamp = Math.max(inputRecordTimestamp, otherRecordTimestamp);
+
+                    // This is needed so that output records follow timestamp order
+                    // Join this with self
+                    if (inputRecordTimestamp < maxRecordTimestamp && !emittedJoinWithSelf) {

Review Comment:
   Got it, thanks! :)
   
   I did some digging in the history, and I think we do not have a strong case for updating the state store later: https://github.com/apache/kafka/pull/737/files#r49016643
   
   I think we can reorder the operations in the JoinWindowProcessor to avoid this logic here.
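
   To illustrate the reordering idea, here is a minimal sketch under stated assumptions: the class `StoreBeforeJoinSketch` is hypothetical, a `TreeMap` stands in for the window store (so timestamps are assumed unique per key), and only the this-side join is shown. If the record is written to the store before the fetch, the join with itself shows up in timestamp order without a separate flag such as `emittedJoinWithSelf`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: write to the store first, then run the join fetch.
class StoreBeforeJoinSketch {
    // Stand-in for a per-key window store (timestamp -> value).
    private final TreeMap<Long, String> store = new TreeMap<>();
    private final long windowMs;

    StoreBeforeJoinSketch(final long windowMs) {
        this.windowMs = windowMs;
    }

    List<String> process(final long ts, final String value) {
        // Update the store before joining, so the fetch also returns this record.
        store.put(ts, value);

        final long from = Math.max(0L, ts - windowMs);
        final long to = ts + windowMs;
        final List<String> out = new ArrayList<>();
        // Iteration is in timestamp order, so the self pair lands in its natural position.
        for (final Map.Entry<Long, String> e : store.subMap(from, true, to, true).entrySet()) {
            out.add(value + e.getValue());
        }
        return out;
    }
}
```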





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r958991975


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(false, false);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(
+        final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
         mergeDuplicateSourceNodes();
         if (optimizeTopology) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
             optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
+            if (optimizeSelfJoin) {

Review Comment:
   The original reason for not using a "two-pass" mechanism was that some optimizations, such as renaming the internal topics, are only doable with the configs in place, which was not the case before. But after thinking about your arguments, I agree that optimizations such as self-joins are better done after the original logical plan is built, so we should keep the second pass for this and future optimization rules.
   
   Following that, my next question is whether there is an easier condition for checking if this rule can be applied. The current logic, as you stated in the comments, needs to check three conditions, and I'm wondering if we can just check condition #2, i.e. that the StreamStreamJoinNode has only one parent. Could you elaborate a bit on why we need the other two conditions as well?





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960457146


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(false, false);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(
+        final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
         mergeDuplicateSourceNodes();
         if (optimizeTopology) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
             optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
+            if (optimizeSelfJoin) {

Review Comment:
   Hey @guozhangwang!
   Cases 1 and 2 are optimizable and will be optimized by the algorithm. I have tests for these cases in `InternalStreamsBuilderTest`.
   Case 3 is not optimizable and won't be recognized. The reason is that processors like `mapValues`, `filter`, or `transform` are black boxes: there is no way to know how they change the contents of a stream, hence no way to determine whether the two sides of the join are still the same.
   Case 4 could be optimizable, but I did not consider it. Initially I only had cases like 1 and 2 in scope. I can support it by adding a special rule to the rewriter: if the parent of the join is a merge, then multiple source nodes are fine as long as they are ancestors of the merge node.
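
   As a rough illustration of this kind of check, here is a sketch under loud assumptions: `SelfJoinEligibilitySketch` and its `Node` class are hypothetical and much simpler than Kafka's `GraphNode`, and the thread does not restate all three conditions, so only two inferred ones are modelled (the join has a single distinct parent, and exactly one source node is reachable above it). It is not the rewriter from the PR.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of a structural self-join check on a simplified graph.
class SelfJoinEligibilitySketch {

    // Simplified stand-in for a topology graph node (identity-based equality).
    static final class Node {
        final String name;
        final boolean isSource;
        final List<Node> parents = new ArrayList<>();

        Node(final String name, final boolean isSource, final Node... parents) {
            this.name = name;
            this.isSource = isSource;
            for (final Node parent : parents) {
                this.parents.add(parent);
            }
        }
    }

    // Walks parent links upward and collects every source node that is reachable.
    static void collectSources(final Node node, final Set<Node> visited, final Set<Node> sources) {
        if (!visited.add(node)) {
            return;
        }
        if (node.isSource) {
            sources.add(node);
        }
        for (final Node parent : node.parents) {
            collectSources(parent, visited, sources);
        }
    }

    static boolean looksLikeSelfJoin(final Node join) {
        // Both sides of the join must be the very same node.
        final Set<Node> distinctParents = new HashSet<>(join.parents);
        if (distinctParents.size() != 1) {
            return false;
        }
        // Exactly one source may feed the join.
        final Set<Node> sources = new HashSet<>();
        collectSources(join, new HashSet<>(), sources);
        return sources.size() == 1;
    }
}
```

   With this sketch, two separate `mapValues`-style nodes over the same source are rejected because the join has two distinct parents, and a join over a merge of two sources is rejected as well, which mirrors case 4 not being handled yet.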





[GitHub] [kafka] jnh5y commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r968974464


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -270,17 +277,12 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(null);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {

Review Comment:
   Is this part of the public API?





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969353780


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
+    private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+    private final String windowName;
+    private final long joinThisBeforeMs;
+    private final long joinThisAfterMs;
+    private final long joinOtherBeforeMs;
+    private final long joinOtherAfterMs;
+    private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis;
+    private final ValueJoinerWithKey<? super K, ? super V2, ? super V1, ? extends VOut> joinerOther;
+
+    private final TimeTracker sharedTimeTracker;
+
+    KStreamKStreamSelfJoin(
+        final String windowName,
+        final JoinWindowsInternal windows,
+        final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis,
+        final ValueJoinerWithKey<? super K, ? super V2, ? super V1, ? extends VOut> joinerOther,
+        final TimeTracker sharedTimeTracker) {
+
+        this.windowName = windowName;
+        this.joinThisBeforeMs = windows.beforeMs;
+        this.joinThisAfterMs = windows.afterMs;
+        this.joinOtherBeforeMs = windows.afterMs;
+        this.joinOtherAfterMs = windows.beforeMs;
+        this.joinerThis = joinerThis;
+        this.joinerOther = joinerOther;
+        this.sharedTimeTracker = sharedTimeTracker;
+    }
+
+    @Override
+    public Processor<K, V1, K, VOut> get() {
+        return new KStreamKStreamSelfJoinProcessor();
+    }
+
+    private class KStreamKStreamSelfJoinProcessor extends StreamStreamJoinProcessor<K, V1, K, VOut> {
+        private WindowStore<K, V2> windowStore;
+        private Sensor droppedRecordsSensor;
+
+        @Override
+        public void init(final ProcessorContext<K, VOut> context) {
+            super.init(context);
+
+            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
+            droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
+            windowStore = context.getStateStore(windowName);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void process(final Record<K, V1> record) {
+            System.out.println("---> Processing record: " + record);
+            if (skipRecord(record, LOG, droppedRecordsSensor)) {
+                return;
+            }
+
+            final long inputRecordTimestamp = record.timestamp();
+            long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs);
+            long timeTo = Math.max(0L, inputRecordTimestamp + joinThisAfterMs);
+            boolean emittedJoinWithSelf = false;
+
+            sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+            System.out.println("----> Window store fetch, timeFrom=" + timeFrom + " timeTo=" + timeTo);
+
+            // Join current record with other
+            System.out.println("----> Window store fetch, timeFrom=" + timeFrom + " timeTo=" + timeTo);

Review Comment:
   These will all be removed once I am done with the reviews.





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969520183


##########
streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java:
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import static java.time.Duration.ofMinutes;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IntegrationTest.class})
+public class SelfJoinUpgradeIntegrationTest {
+    public static final String INPUT_TOPIC = "selfjoin-input";
+    public static final String OUTPUT_TOPIC = "selfjoin-output";
+    private String inputTopic;
+    private String outputTopic;
+
+    private KafkaStreams kafkaStreams;
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @Before
+    public void createTopics() throws Exception {
+        inputTopic = INPUT_TOPIC + safeUniqueTestName(getClass(), testName);
+        outputTopic = OUTPUT_TOPIC + safeUniqueTestName(getClass(), testName);
+        CLUSTER.createTopic(inputTopic);
+        CLUSTER.createTopic(outputTopic);
+    }
+
+    private Properties props() {
+        final Properties streamsConfiguration = new Properties();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
+                                 Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
+                                 Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return streamsConfiguration;
+    }
+
+    @After
+    public void shutdown() {
+        if (kafkaStreams != null) {
+            kafkaStreams.close(Duration.ofSeconds(30L));
+            kafkaStreams.cleanUp();
+        }
+    }
+
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUpgradeWithTopologyOptimizationOff() throws Exception {
+
+        final StreamsBuilder streamsBuilderOld = new StreamsBuilder();
+        final KStream<String, String> leftOld = streamsBuilderOld.stream(
+            inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+        final ValueJoiner<String, String, String> valueJoiner = (v, v2) -> v + v2;
+        final KStream<String, String> joinedOld = leftOld.join(
+            leftOld,
+            valueJoiner,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMinutes(100))
+        );
+        joinedOld.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
+        kafkaStreams.start();
+
+        final long currentTime = CLUSTER.time.milliseconds();
+        processKeyValueAndVerifyCount("1", "A", currentTime + 42L, asList(
+            new KeyValueTimestamp<String, String>("1", "AA", currentTime + 42L)));
+
+        processKeyValueAndVerifyCount("1", "B", currentTime + 43L, asList(
+            new KeyValueTimestamp("1", "BA", currentTime + 43L),
+            new KeyValueTimestamp("1", "AB", currentTime + 43L),
+            new KeyValueTimestamp("1", "BB", currentTime + 43L)));
+
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+        final StreamsBuilder streamsBuilderNew = new StreamsBuilder();
+
+        final KStream<String, String> leftNew = streamsBuilderNew.stream(
+            inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> joinedNew = leftNew.join(
+            leftNew,
+            valueJoiner,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMinutes(100))
+        );
+        joinedNew.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+        kafkaStreams = new KafkaStreams(streamsBuilderNew.build(), props);
+        kafkaStreams.start();
+
+        final long currentTimeNew = CLUSTER.time.milliseconds();
+
+        processKeyValueAndVerifyCount("1", "C", currentTimeNew + 44L, asList(
+            new KeyValueTimestamp("1", "CA", currentTimeNew + 44L),
+            new KeyValueTimestamp("1", "CB", currentTimeNew + 44L),
+            new KeyValueTimestamp("1", "AC", currentTimeNew + 44L),
+            new KeyValueTimestamp("1", "BC", currentTimeNew + 44L),
+            new KeyValueTimestamp("1", "CC", currentTimeNew + 44L)));
+
+
+        kafkaStreams.close();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUpgradeWithTopologyOptimizationOn() throws Exception {

Review Comment:
   An upgrade falls into one of two scenarios: (1) topology optimization was on before the upgrade and stays on afterwards, which turns on the self-join rewriting; or (2) topology optimization was off and the user turns it on after the upgrade, which also enables the self-join rewriting.
   Are these tests not sufficient to check that the rewriting is backwards compatible?





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r958780670


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(false, false);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(
+        final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
         mergeDuplicateSourceNodes();
         if (optimizeTopology) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
             optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
+            if (optimizeSelfJoin) {

Review Comment:
   Yeah, I totally agree with you that today's logical plan is not so "logical" and has a lot of physical plan info baked in.





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960708055


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow  that are siblings of the
+     * StreamStreamJoinNode and have smaller build priority.

Review Comment:
   This is the order in which the nodes in the graph are visited and added to the topology. It matters here because the join node can have siblings with a smaller build priority; those siblings come before it in the topological order of the topology (they are applied before the join). The only acceptable siblings of that kind are the JoinWindow nodes. If there are others, then it is not a self-join.
   
   This check covers cases like:
   ```
   stream1 = builder.stream("topic1");
   stream1.mapValues(v -> v);
   stream2 = builder.stream("topic1"); // same topic
   stream1.join(stream2);
   ```
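
To make the sibling check concrete, here is a minimal sketch on a toy graph. It mirrors only the third condition as it appears in the quoted diff. The `Node` class, its fields, and the priorities assigned below are hypothetical stand-ins for illustration, not the Kafka Streams `GraphNode` API.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for the graph nodes discussed above; NOT the Kafka Streams GraphNode class.
final class SiblingCheckSketch {

    static final class Node {
        final String name;
        final int buildPriority;   // smaller value = added to the topology earlier
        final boolean joinWindow;  // true for KStreamJoinWindow-style nodes
        final List<Node> children = new ArrayList<>();

        Node(final String name, final int buildPriority, final boolean joinWindow) {
            this.name = name;
            this.buildPriority = buildPriority;
            this.joinWindow = joinWindow;
        }
    }

    // Condition 3: apart from join-window nodes, no sibling may precede the join node.
    static boolean onlyJoinWindowsPrecede(final Node parent, final Node join) {
        for (final Node sibling : parent.children) {
            if (sibling == join || sibling.joinWindow) {
                continue;
            }
            if (sibling.buildPriority < join.buildPriority) {
                return false;
            }
        }
        return true;
    }

    // source -> {window, window, join}: only join windows were built before the join.
    static boolean plainSelfJoin() {
        final Node source = new Node("source", 0, false);
        final Node join = new Node("join", 3, false);
        source.children.add(new Node("this-window", 1, true));
        source.children.add(new Node("other-window", 2, true));
        source.children.add(join);
        return onlyJoinWindowsPrecede(source, join);
    }

    // source -> {mapValues, window, window, join}: mapValues was built before the join.
    static boolean selfJoinAfterMapValues() {
        final Node source = new Node("source", 0, false);
        final Node join = new Node("join", 4, false);
        source.children.add(new Node("mapValues", 1, false));
        source.children.add(new Node("this-window", 2, true));
        source.children.add(new Node("other-window", 3, true));
        source.children.add(join);
        return onlyJoinWindowsPrecede(source, join);
    }
}
```

In this toy model `plainSelfJoin()` passes the check, while `selfJoinAfterMapValues()` is rejected because a non-window sibling has a smaller build priority than the join node.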





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960956896


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow  that are siblings of the
+     * StreamStreamJoinNode and have smaller build priority.
+     */
+    private boolean isSelfJoin(final GraphNode streamJoinNode) {
+        final AtomicInteger count = new AtomicInteger();
+        countSourceNodes(count, streamJoinNode, new HashSet<>());
+        if (count.get() > 1) {
+            return false;
+        }
+        if (streamJoinNode.parentNodes().size() > 1) {
+            return false;
+        }
+        for (final GraphNode parent: streamJoinNode.parentNodes()) {
+            for (final GraphNode sibling : parent.children()) {
+                if (sibling instanceof ProcessorGraphNode) {
+                    if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) {
+                        continue;
+                    }
+                }
+                if (sibling != streamJoinNode
+                    && sibling.buildPriority() < streamJoinNode.buildPriority()) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    private void countSourceNodes(
+        final AtomicInteger count,
+        final GraphNode currentNode,
+        final Set<GraphNode> visited) {
+
+        if (currentNode instanceof StreamSourceNode) {
+            count.incrementAndGet();
+        }
+
+        for (final GraphNode parent: currentNode.parentNodes()) {
+            if (!visited.contains(parent)) {

Review Comment:
   Great catch, thank you! I am not sure about the latter, so I implemented `equals` and `hashCode`.





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r970800187


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
+    private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+    private final String windowName;
+    private final long joinThisBeforeMs;
+    private final long joinThisAfterMs;
+    private final long joinOtherBeforeMs;
+    private final long joinOtherAfterMs;
+    private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis;
+
+    private final TimeTracker sharedTimeTracker;
+
+    KStreamKStreamSelfJoin(
+        final String windowName,
+        final JoinWindowsInternal windows,
+        final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis,
+        final TimeTracker sharedTimeTracker) {
+
+        this.windowName = windowName;
+        this.joinThisBeforeMs = windows.beforeMs;
+        this.joinThisAfterMs = windows.afterMs;
+        this.joinOtherBeforeMs = windows.afterMs;
+        this.joinOtherAfterMs = windows.beforeMs;
+        this.joinerThis = joinerThis;
+        this.sharedTimeTracker = sharedTimeTracker;
+    }
+
+    @Override
+    public Processor<K, V1, K, VOut> get() {
+        return new KStreamKStreamSelfJoinProcessor();
+    }
+
+    private class KStreamKStreamSelfJoinProcessor extends ContextualProcessor<K, V1, K, VOut> {
+        private WindowStore<K, V2> windowStore;
+        private Sensor droppedRecordsSensor;
+
+        @Override
+        public void init(final ProcessorContext<K, VOut> context) {
+            super.init(context);
+
+            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
+            droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
+            windowStore = context.getStateStore(windowName);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void process(final Record<K, V1> record) {
+            if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
+                return;
+            }
+
+            final long inputRecordTimestamp = record.timestamp();
+            long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs);

Review Comment:
   They are not the same: the first uses `joinThisBeforeMs` and the second uses `joinOtherBeforeMs`. This is needed because the inner join uses different intervals when fetching rows from the window store, depending on whether the record arrives on the left-hand or the right-hand side. Since we want the self-join to match the output of the inner join, I followed the same logic.
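
As an illustration of the two intervals, here is a minimal sketch that follows the constructor in the quoted diff, where the "other" side swaps `beforeMs` and `afterMs`. The class and method names are hypothetical, and the upper bound (`timestamp + afterMs`, clamped at zero) is an assumption made by analogy with the `timeFrom` line shown above; the diff excerpt does not show it.

```java
// Sketch only: computes the window-store fetch bounds for each side of the join.
final class SelfJoinIntervalsSketch {

    // Returns {timeFrom, timeTo} for a record timestamp and a before/after pair.
    static long[] fetchInterval(final long timestamp, final long beforeMs, final long afterMs) {
        final long timeFrom = Math.max(0L, timestamp - beforeMs);
        final long timeTo = Math.max(0L, timestamp + afterMs); // assumed, see lead-in
        return new long[] {timeFrom, timeTo};
    }

    // "This" side: joinThisBeforeMs = windows.beforeMs, joinThisAfterMs = windows.afterMs.
    static long[] thisSide(final long timestamp, final long windowBeforeMs, final long windowAfterMs) {
        return fetchInterval(timestamp, windowBeforeMs, windowAfterMs);
    }

    // "Other" side: joinOtherBeforeMs = windows.afterMs, joinOtherAfterMs = windows.beforeMs.
    static long[] otherSide(final long timestamp, final long windowBeforeMs, final long windowAfterMs) {
        return fetchInterval(timestamp, windowAfterMs, windowBeforeMs);
    }
}
```

With a symmetric window such as `JoinWindows.ofTimeDifferenceWithNoGrace(...)`, where before and after are equal, both sides compute the same interval; the two only differ once `before(...)` or `after(...)` makes the window asymmetric.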





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969371482


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java:
##########
@@ -181,13 +181,22 @@ public <K, V1, V2, VOut> KStream<K, VOut> join(final KStream<K, V1> lhs,
             sharedTimeTracker
         );
 
+        final KStreamKStreamSelfJoin<K, V1, V2, VOut> selfJoin = new KStreamKStreamSelfJoin<>(
+            thisWindowStore.name(),
+            internalWindows,
+            joiner,
+            AbstractStream.reverseJoinerWithKey(joiner),
+            sharedTimeTracker
+        );

Review Comment:
   Yes, unfortunately this is the only place where I can get the information needed to create this node. 





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969484570


##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1263,6 +1263,37 @@ public void testInvalidSecurityProtocol() {
         assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
 
+    @Test
+    public void shouldThrowExceptionWhenTopologyOptimizationOnAndOff() {

Review Comment:
   I am not sure I follow. What do you mean?





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969358545


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1644,6 +1668,29 @@ private Map<String, Object> getClientCustomProps() {
         return props;
     }
 
+    public static List<String> verifyTopologyOptimizationConfigs(final String config) {
+        final List<String> acceptableConfigs = Arrays.asList(

Review Comment:
   Yes, I have been conflicted about this. I wanted to add it as a static variable as well, but I am worried that if someone else adds a new rewriting, they might miss it, whereas in the method they will see it for sure. WDYT?
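
For comparison, the static-variable alternative could look roughly like the sketch below. Everything here is hypothetical: the class name, the `rule.a`/`rule.b` placeholders, and the rule that `all` or `none` cannot be combined with other entries (inferred only from the test name `shouldThrowExceptionWhenTopologyOptimizationOnAndOff`). It is not the code in the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch of validating a comma-separated optimization config against a static list.
final class OptimizationConfigSketch {

    // Hypothetical list; a new rewriting would have to be registered here.
    static final List<String> ACCEPTABLE_CONFIGS =
        Collections.unmodifiableList(Arrays.asList("all", "none", "rule.a", "rule.b"));

    static List<String> verify(final String config) {
        final List<String> parsed = new ArrayList<>();
        for (final String part : config.split(",")) {
            final String rule = part.trim();
            if (!ACCEPTABLE_CONFIGS.contains(rule)) {
                throw new IllegalArgumentException("Unrecognized optimization config: " + rule);
            }
            parsed.add(rule);
        }
        // Assumed rule: "all" and "none" only make sense on their own.
        if (parsed.size() > 1 && (parsed.contains("all") || parsed.contains("none"))) {
            throw new IllegalArgumentException("'all' or 'none' cannot be combined with other values");
        }
        return parsed;
    }

    // Convenience wrapper so callers can probe a value without catching exceptions.
    static boolean isValid(final String config) {
        try {
            verify(config);
            return true;
        } catch (final IllegalArgumentException e) {
            return false;
        }
    }
}
```

The trade-off raised above still applies: a static list is easier to reuse in tests and documentation, but it sits further away from the method where a contributor adding a new rewriting would be looking.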





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r970800187


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
+    private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+    private final String windowName;
+    private final long joinThisBeforeMs;
+    private final long joinThisAfterMs;
+    private final long joinOtherBeforeMs;
+    private final long joinOtherAfterMs;
+    private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis;
+
+    private final TimeTracker sharedTimeTracker;
+
+    KStreamKStreamSelfJoin(
+        final String windowName,
+        final JoinWindowsInternal windows,
+        final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis,
+        final TimeTracker sharedTimeTracker) {
+
+        this.windowName = windowName;
+        this.joinThisBeforeMs = windows.beforeMs;
+        this.joinThisAfterMs = windows.afterMs;
+        this.joinOtherBeforeMs = windows.afterMs;
+        this.joinOtherAfterMs = windows.beforeMs;
+        this.joinerThis = joinerThis;
+        this.sharedTimeTracker = sharedTimeTracker;
+    }
+
+    @Override
+    public Processor<K, V1, K, VOut> get() {
+        return new KStreamKStreamSelfJoinProcessor();
+    }
+
+    private class KStreamKStreamSelfJoinProcessor extends ContextualProcessor<K, V1, K, VOut> {
+        private WindowStore<K, V2> windowStore;
+        private Sensor droppedRecordsSensor;
+
+        @Override
+        public void init(final ProcessorContext<K, VOut> context) {
+            super.init(context);
+
+            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
+            droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
+            windowStore = context.getStateStore(windowName);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void process(final Record<K, V1> record) {
+            if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
+                return;
+            }
+
+            final long inputRecordTimestamp = record.timestamp();
+            long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs);

Review Comment:
   They are not the same: the first uses `joinThisBeforeMs` and the second uses `joinOtherBeforeMs`. This is needed because the inner join (for a reason I have not understood, cc @vvcephei) uses different intervals when fetching rows from the window store, depending on whether the record arrives on the left-hand or the right-hand side. Since we want the self-join to match the output of the inner join, I followed the same logic.





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r962965501


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +362,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow  that are siblings of the
+     * StreamStreamJoinNode and have smaller build priority.
+     */
+    private boolean isSelfJoin(final GraphNode streamJoinNode) {
+        final AtomicInteger count = new AtomicInteger();
+        countSourceNodes(count, streamJoinNode, new HashSet<>());
+        if (count.get() > 1) {
+            return false;
+        }
+        if (streamJoinNode.parentNodes().size() > 1) {
+            return false;
+        }
+        for (final GraphNode parent: streamJoinNode.parentNodes()) {
+            for (final GraphNode sibling : parent.children()) {
+                if (sibling instanceof ProcessorGraphNode) {
+                    if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) {
+                        continue;
+                    }
+                }
+                if (sibling != streamJoinNode
+                    && sibling.buildPriority() < streamJoinNode.buildPriority()) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    private void countSourceNodes(
+        final AtomicInteger count,
+        final GraphNode currentNode,
+        final Set<GraphNode> visited) {
+
+        if (currentNode instanceof StreamSourceNode) {
+            count.incrementAndGet();
+        }
+
+        for (final GraphNode parent: currentNode.parentNodes()) {
+            if (!visited.contains(parent)) {
+                visited.add(parent);
+                countSourceNodes(count, parent, visited);
+            }
+        }
+    }
+
+    private boolean isStreamJoinWindowNode(final ProcessorGraphNode node) {
+        return node.processorParameters() != null

Review Comment:
   We can apply the self-join optimization in the above example.





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960845326


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);

Review Comment:
   We can have n-way self-joins, but only the first join will get optimized. However, a topology can also contain more than one self-join as independent pairs rather than as an n-way join, and all of those can and should be optimized. I added a test case in `InternalStreamsBuilderTest:shouldMarkAllStreamStreamJoinsAsSelfJoin`.
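
The difference between independent pairs and an n-way join can be sketched on a toy graph using the first two conditions from the javadoc (a single source node on the path to the root, and a single parent). The `Node` class and the graph shapes below are assumptions for illustration only, not the actual `GraphNode` structure built by the DSL.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model: each node knows its parents; duplicate parents collapse into one.
final class MultiSelfJoinSketch {

    static final class Node {
        final String name;
        final boolean source;
        final List<Node> parents = new ArrayList<>();

        Node(final String name, final boolean source, final Node... parents) {
            this.name = name;
            this.source = source;
            for (final Node parent : parents) {
                if (!this.parents.contains(parent)) {
                    this.parents.add(parent);
                }
            }
        }
    }

    // Walks upwards and counts distinct source nodes, visiting each node once.
    static int countSources(final Node node, final Set<Node> visited) {
        if (!visited.add(node)) {
            return 0;
        }
        int count = node.source ? 1 : 0;
        for (final Node parent : node.parents) {
            count += countSources(parent, visited);
        }
        return count;
    }

    static boolean looksLikeSelfJoin(final Node join) {
        return join.parents.size() == 1 && countSources(join, new HashSet<>()) == 1;
    }

    // Two unrelated self-joins: a.join(a) and b.join(b).
    static boolean[] independentPairs() {
        final Node a = new Node("topic-a", true);
        final Node b = new Node("topic-b", true);
        final Node joinA = new Node("a-join-a", false, a, a);
        final Node joinB = new Node("b-join-b", false, b, b);
        return new boolean[] {looksLikeSelfJoin(joinA), looksLikeSelfJoin(joinB)};
    }

    // Three-way self-join: a.join(a).join(a).
    static boolean[] threeWay() {
        final Node a = new Node("topic-a", true);
        final Node first = new Node("a-join-a", false, a, a);
        final Node second = new Node("(a-join-a)-join-a", false, first, a);
        return new boolean[] {looksLikeSelfJoin(first), looksLikeSelfJoin(second)};
    }
}
```

In this model both independent joins qualify, while in the three-way case only the first join does: the second join has two distinct parents (the first join and the source), so the single-parent condition fails.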





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r961032133


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(false, false);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(
+        final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
         mergeDuplicateSourceNodes();
         if (optimizeTopology) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
             optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
+            if (optimizeSelfJoin) {

Review Comment:
   Discussed offline with @guozhangwang.
   
   If the `map` or other transformation is a sibling of the join node but the join node has a single parent, then the transformation is a no-op and hence the optimization can be applied. For the transformation not to be a no-op, the join node must have two or more parents, in which case the optimization is not applicable.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r961002924


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow  that are siblings of the
+     * StreamStreamJoinNode and have smaller build priority.

Review Comment:
   Just curious: in the above case, since we are not reassigning the map-valued result stream back to stream1 (as in `stream1 = stream1.mapValues()`), this step is logically a no-op and should not affect stream1, right?





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r958321621


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -663,6 +663,12 @@ public class StreamsConfig extends AbstractConfig {
     public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
     private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default";
 
+    public static final String SELF_JOIN_OPTIMIZATION_CONFIG = "self.join.optimization";

Review Comment:
   I agree with your suggestion in the KIP and will make the changes.





[GitHub] [kafka] vpapavas closed pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas closed pull request #12555: Optimize self-join
URL: https://github.com/apache/kafka/pull/12555




[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969371482


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java:
##########
@@ -181,13 +181,22 @@ public <K, V1, V2, VOut> KStream<K, VOut> join(final KStream<K, V1> lhs,
             sharedTimeTracker
         );
 
+        final KStreamKStreamSelfJoin<K, V1, V2, VOut> selfJoin = new KStreamKStreamSelfJoin<>(
+            thisWindowStore.name(),
+            internalWindows,
+            joiner,
+            AbstractStream.reverseJoinerWithKey(joiner),
+            sharedTimeTracker
+        );

Review Comment:
   Yes, unfortunately. This is done before the logical plan is created and optimized, and it is the only place where I can get the information needed to create this node. I tried to do it when building the topology after the rewriting, but that was impossible without major refactoring because there is currently no clear separation between the logical and physical plans.





[GitHub] [kafka] jnh5y commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r968974016


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1644,6 +1668,29 @@ private Map<String, Object> getClientCustomProps() {
         return props;
     }
 
+    public static List<String> verifyTopologyOptimizationConfigs(final String config) {
+        final List<String> acceptableConfigs = Arrays.asList(

Review Comment:
   I wonder if this list of optimization flags should be a static variable somewhere?  
   
   Maybe it is fine if it is only used here.





[GitHub] [kafka] jnh5y commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r968986084


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {

Review Comment:
   If there are more than 2 children, is it the case that only two satisfy `isStreamJoinWindowNode`?
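
One way to make the loop robust to this question is to collect every matching child and only rewrite when exactly two are found. The sketch below is a variation on the diff, not the PR's code: `ChildNode`, `JoinWindowPicker` and `windowToRemove` are hypothetical names, and `isJoinWindow` stands in for the `isStreamJoinWindowNode` check. Whether more than two matching children can actually occur is not settled in this thread.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical stand-in for a child of the join's parent; not a Kafka Streams class.
final class ChildNode {
    final String name;
    final int buildPriority;
    final boolean isJoinWindow;

    ChildNode(final String name, final int buildPriority, final boolean isJoinWindow) {
        this.name = name;
        this.buildPriority = buildPriority;
        this.isJoinWindow = isJoinWindow;
    }
}

final class JoinWindowPicker {
    // Returns the join-window child to drop, or empty unless exactly two are present.
    static Optional<ChildNode> windowToRemove(final List<ChildNode> children) {
        final List<ChildNode> windows = children.stream()
            .filter(child -> child.isJoinWindow)
            .sorted(Comparator.comparingInt((ChildNode child) -> child.buildPriority))
            .collect(Collectors.toList());
        if (windows.size() != 2) {
            // Zero, one, or three and more matches: leave the graph untouched.
            return Optional.empty();
        }
        // The later-built window node plays the role of `right` in the diff.
        return Optional.of(windows.get(1));
    }
}
```

Unlike the `left`/`right` variables in the diff, this version never silently overwrites an earlier match when a third window node shows up; it simply declines to rewrite.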





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r959328394


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(false, false);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(
+        final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
         mergeDuplicateSourceNodes();
         if (optimizeTopology) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
             optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
+            if (optimizeSelfJoin) {

Review Comment:
   The main goal of the conditions is to ensure that the join arguments are the same and are not altered somewhere along the path from the root to the join in a unilateral (only one side) way.
   
   The first condition is needed to make sure that the join has a single source ancestor. We don't want to apply the optimization if there is another source somewhere along the path from the root to the join node. This path might be long and might go through multiple joins etc. so just looking for a single parent is not enough. 
   
   The second condition is needed to ensure that no transformation is applied to one side of the join and not the other.
   
   The third condition is needed for the same reason because in my tests, I have seen topologies where the `ProcessorNodes` are added as siblings to the join and not as parents of it. For instance, in this example:
   ```
           final KStream<String, String> stream1 = builder.stream(Collections.singleton("t1"), consumed);
           stream1.mapValues(v -> v);
           final KStream<String, String> stream2 = builder.stream(Collections.singleton("t1"), consumed);
           stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, 
                                JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)));
   ```
   the node for the `map` is a sibling of the join node and not a parent.
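
The three conditions can be tried out on a toy graph. This is a minimal sketch under stated assumptions, not the PR's implementation: `ToyNode`, its `Kind` enum and `SelfJoinCheck` are hypothetical stand-ins for `GraphNode`, the node subclasses and the `isSelfJoin` method in the diff, and the single-parent test is written as `!= 1`, which is slightly stricter than the `> 1` check in the diff.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical miniature of a graph node; not a Kafka Streams class.
final class ToyNode {
    enum Kind { SOURCE, JOIN_WINDOW, PROCESSOR, JOIN }

    final String name;
    final Kind kind;
    final int buildPriority;
    final List<ToyNode> parents = new ArrayList<>();
    final List<ToyNode> children = new ArrayList<>();

    ToyNode(final String name, final Kind kind, final int buildPriority) {
        this.name = name;
        this.kind = kind;
        this.buildPriority = buildPriority;
    }

    // Links child under this node and returns the child for chaining.
    ToyNode addChild(final ToyNode child) {
        children.add(child);
        child.parents.add(this);
        return child;
    }
}

final class SelfJoinCheck {
    static boolean isSelfJoin(final ToyNode join) {
        // Condition 1: a single source node among all ancestors of the join.
        if (countSources(join, new HashSet<>()) > 1) {
            return false;
        }
        // Condition 2: the join has a single parent.
        if (join.parents.size() != 1) {
            return false;
        }
        // Condition 3: no sibling other than a join-window node was built before the join.
        for (final ToyNode sibling : join.parents.get(0).children) {
            if (sibling == join || sibling.kind == ToyNode.Kind.JOIN_WINDOW) {
                continue;
            }
            if (sibling.buildPriority < join.buildPriority) {
                return false;
            }
        }
        return true;
    }

    // Walks parent links upward and counts distinct source nodes.
    private static int countSources(final ToyNode node, final Set<ToyNode> visited) {
        if (!visited.add(node)) {
            return 0;
        }
        int count = node.kind == ToyNode.Kind.SOURCE ? 1 : 0;
        for (final ToyNode parent : node.parents) {
            count += countSources(parent, visited);
        }
        return count;
    }
}
```

With one source whose children are two join-window nodes and the join, `isSelfJoin` returns true. Adding a `PROCESSOR` child with a smaller build priority than the join, as the `mapValues` call in the example above would, makes it return false under condition 3 as it is described in this comment.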





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r959392484


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +362,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow  that are siblings of the
+     * StreamStreamJoinNode and have smaller build priority.
+     */
+    private boolean isSelfJoin(final GraphNode streamJoinNode) {
+        final AtomicInteger count = new AtomicInteger();
+        countSourceNodes(count, streamJoinNode, new HashSet<>());
+        if (count.get() > 1) {
+            return false;
+        }
+        if (streamJoinNode.parentNodes().size() > 1) {
+            return false;
+        }
+        for (final GraphNode parent: streamJoinNode.parentNodes()) {
+            for (final GraphNode sibling : parent.children()) {
+                if (sibling instanceof ProcessorGraphNode) {
+                    if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) {
+                        continue;
+                    }
+                }
+                if (sibling != streamJoinNode
+                    && sibling.buildPriority() < streamJoinNode.buildPriority()) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    private void countSourceNodes(
+        final AtomicInteger count,
+        final GraphNode currentNode,
+        final Set<GraphNode> visited) {
+
+        if (currentNode instanceof StreamSourceNode) {
+            count.incrementAndGet();
+        }
+
+        for (final GraphNode parent: currentNode.parentNodes()) {
+            if (!visited.contains(parent)) {
+                visited.add(parent);
+                countSourceNodes(count, parent, visited);
+            }
+        }
+    }
+
+    private boolean isStreamJoinWindowNode(final ProcessorGraphNode node) {
+        return node.processorParameters() != null

Review Comment:
   In this code example 
   ```
           final KStream<String, String> stream1 = builder.stream(Collections.singleton("t1"), consumed);
           stream1.mapValues(v -> v);
           final KStream<String, String> stream2 = builder.stream(Collections.singleton("t1"), consumed);
           stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, 
                                JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)));
   ```
   the `node.processorParameters().processorSupplier()` is null. I added the first check just to cover my bases. 





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960457146


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(false, false);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(
+        final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
         mergeDuplicateSourceNodes();
         if (optimizeTopology) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
             optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
+            if (optimizeSelfJoin) {

Review Comment:
   Hey @guozhangwang!
   Cases 1 and 2 are optimizable and will be optimized by the algorithm. I have tests for these cases in `InternalStreamsBuilderTest`.
   Case 3 is not optimizable and won't be recognized. The reason is that processors like `mapValues`, `filter`, or `transform` are black boxes. There is no way to know how they change the contents of a stream, hence there is no way to figure out whether the two sides of the join are still the same.
   Case 4 could be optimizable, but I did not consider it. Initially I only had cases like 1 and 2 in scope. I can support it by adding a special rule to the rewriter: if the parent of the join is a merge, then it's OK to have multiple source nodes as long as they are ancestors of the merge node.





[GitHub] [kafka] vvcephei commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vvcephei commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969070840


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -243,6 +244,26 @@ public class StreamsConfig extends AbstractConfig {
      */
     public static final String OPTIMIZE = "all";
 
+    /**
+     * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"}
+     * for enabling the specific optimization that reuses source topic as changelog topic
+     * for KTables.
+     */
+    public static final String REUSE_KTABLE_SOURCE_TOPICS = "reuse.ktable.source.topics";
+
+    /**
+     * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"}
+     * for enabling the specific optimization that merges duplicated repartition topics.
+     */
+    public static final String MERGE_REPARTITION_TOPICS = "merge.repartition.topics";
+
+    /**
+     * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"}
+     * for enabling the optimization that optimizes inner stream-stream joins into self-joins when
+     * both arguments are the same stream.
+     */
+    public static final String SELF_JOIN = "self.join";

Review Comment:
   Just a reminder to update this to match the KIP.





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969353364


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -305,6 +307,36 @@ public void buildAndOptimizeTopology(final boolean optimizeTopology) {
         internalTopologyBuilder.validateCopartition();
     }
 
+    /**
+     * A user can provide either the config OPTIMIZE which means all optimizations rules will be
+     * applied or they can provide a list of optimization rules.
+     */
+    private void optimizeTopology(final Properties props) {
+        final List<String> optimizationConfigs;
+        if (props == null || !props.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) {
+            optimizationConfigs = new ArrayList<>();
+            optimizationConfigs.add(StreamsConfig.NO_OPTIMIZATION);
+        } else {
+            optimizationConfigs = StreamsConfig.verifyTopologyOptimizationConfigs(
+                (String) props.get(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG));
+        }
+        if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE)
+            || optimizationConfigs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS)) {
+            LOG.debug("Optimizing the Kafka Streams graph for ktable source nodes");
+            optimizeKTableSourceTopics();
+        }
+        if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE)
+            || optimizationConfigs.contains(StreamsConfig.MERGE_REPARTITION_TOPICS)) {
+            LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
+            maybeOptimizeRepartitionOperations();
+        }
+        if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE)
+            || optimizationConfigs.contains(StreamsConfig.SELF_JOIN)) {
+            LOG.debug("Optimizing the Kafka Streams graph for self-joins");
+            rewriteSelfJoin(root, new IdentityHashMap<>());

Review Comment:
   You mean the order in which we check and apply the rewritings? I followed the same order as before; that is, I added the self-join rewriting last. Technically, the order shouldn't matter, at least for the self-join.





[GitHub] [kafka] vvcephei commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vvcephei commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r956220721


##########
gradle.properties:
##########
@@ -20,7 +20,7 @@ group=org.apache.kafka
 #  - tests/kafkatest/__init__.py
 #  - tests/kafkatest/version.py (variable DEV_VERSION)
 #  - kafka-merge-pr.py
-version=3.3.0-SNAPSHOT
+version=3.3.0-VICKY2

Review Comment:
   TODO: we need to remove this from the PR.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -305,6 +307,36 @@ public void buildAndOptimizeTopology(final boolean optimizeTopology) {
         internalTopologyBuilder.validateCopartition();
     }
 
+    /**
+     * A user can provide either the config OPTIMIZE which means all optimizations rules will be
+     * applied or they can provide a list of optimization rules.
+     */
+    private void optimizeTopology(final Properties props) {
+        final List<String> optimizationConfigs;
+        if (props == null || !props.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) {
+            optimizationConfigs = new ArrayList<>();
+            optimizationConfigs.add(StreamsConfig.NO_OPTIMIZATION);

Review Comment:
   could be `optimizationConfigs = Collections.singletonList`



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -663,6 +663,12 @@ public class StreamsConfig extends AbstractConfig {
     public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
     private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default";
 
+    public static final String SELF_JOIN_OPTIMIZATION_CONFIG = "self.join.optimization";

Review Comment:
   Thanks for adding a separate config. I strongly feel this is the right approach for optimization flags.
   
   Can we make a config namespace convention to keep these things organized, like `topology.optimization.self.join`?



##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1263,6 +1263,37 @@ public void testInvalidSecurityProtocol() {
         assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
 
+    @Test
+    public void shouldThrowExceptionWhenTopologyOptimizationOnAndOff() {

Review Comment:
   A more general prohibition would be to disallow OPTIMIZE and NO_OPTIMIZATION from appearing in a comma-separated list at all.



##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1263,6 +1263,37 @@ public void testInvalidSecurityProtocol() {
         assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
 
+    @Test
+    public void shouldThrowExceptionWhenTopologyOptimizationOnAndOff() {
+        final String value = String.join(",", StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION);
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+        final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+        assertTrue(exception.getMessage().contains("A topology can either not be optimized with"));
+    }
+
+    @Test
+    public void shouldEnableSelfJoin() {
+        final String value = StreamsConfig.SELF_JOIN;
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+        final StreamsConfig config = new StreamsConfig(props);
+        assertEquals(config.getString(TOPOLOGY_OPTIMIZATION_CONFIG), StreamsConfig.SELF_JOIN);
+    }
+
+    @Test
+    public void shouldMultipleOptimizations() {

Review Comment:
   I get what you mean, but "should multiple optimizations" isn't exactly a sensible statement :)
   
   By the way, we might want to add at least one more test that we get the right error if you try to include some extra garbage flag in the list.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -305,6 +307,36 @@ public void buildAndOptimizeTopology(final boolean optimizeTopology) {
         internalTopologyBuilder.validateCopartition();
     }
 
+    /**
+     * A user can provide either the config OPTIMIZE which means all optimizations rules will be
+     * applied or they can provide a list of optimization rules.
+     */
+    private void optimizeTopology(final Properties props) {
+        final List<String> optimizationConfigs;
+        if (props == null || !props.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) {
+            optimizationConfigs = new ArrayList<>();
+            optimizationConfigs.add(StreamsConfig.NO_OPTIMIZATION);

Review Comment:
   Also, maybe we can just pack this logic into the `StreamsConfig.verifyTopologyOptimizationConfigs` method:
   1. if NO_OPTIMIZATION, return empty set
   2. else if OPTIMIZE, add all the optimization flags to the set
   3. else split on comma and add each configured flag to the set
   
   Then, the later logic can do a simple `contains` check.
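
The three steps above can be sketched roughly as follows. This is an illustration only, not the method in the PR: `OptimizationFlags` and `parse` are hypothetical names, the flag strings are copied from the diff as it stood at review time (they may change to match the KIP), and `IllegalArgumentException` stands in for Kafka's `ConfigException` to keep the snippet self-contained.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: normalizes a "topology.optimization" value into the
// set of individual optimization flags to apply.
final class OptimizationFlags {
    static final String NO_OPTIMIZATION = "none";
    static final String OPTIMIZE = "all";
    static final String REUSE_KTABLE_SOURCE_TOPICS = "reuse.ktable.source.topics";
    static final String MERGE_REPARTITION_TOPICS = "merge.repartition.topics";
    static final String SELF_JOIN = "self.join";

    // Single shared list of the individual flags.
    static final List<String> ALL_FLAGS = Collections.unmodifiableList(Arrays.asList(
        REUSE_KTABLE_SOURCE_TOPICS, MERGE_REPARTITION_TOPICS, SELF_JOIN));

    static Set<String> parse(final String config) {
        // A missing value is treated like NO_OPTIMIZATION (an assumption of this sketch).
        if (config == null || config.trim().equals(NO_OPTIMIZATION)) {
            return Collections.emptySet();
        }
        if (config.trim().equals(OPTIMIZE)) {
            return new LinkedHashSet<>(ALL_FLAGS);
        }
        final Set<String> flags = new LinkedHashSet<>();
        for (final String raw : config.split(",")) {
            final String flag = raw.trim();
            // Rejects unknown flags, and also "all" or "none" inside a list.
            if (!ALL_FLAGS.contains(flag)) {
                throw new IllegalArgumentException(
                    "Unrecognized or misplaced optimization flag: '" + flag + "'");
            }
            flags.add(flag);
        }
        return flags;
    }
}
```

A caller could then write `if (flags.contains(OptimizationFlags.SELF_JOIN)) { ... }` without repeating the `OPTIMIZE ||` check for every rule. Because `all` and `none` are not members of `ALL_FLAGS`, a value such as `all,none` is rejected by the same branch that rejects an unknown flag.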



##########
streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java:
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import static java.time.Duration.ofMinutes;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IntegrationTest.class})
+public class SelfJoinUpgradeIntegrationTest {
+    public static final String INPUT_TOPIC = "selfjoin-input";
+    public static final String OUTPUT_TOPIC = "selfjoin-output";
+    private String inputTopic;
+    private String outputTopic;
+
+    private KafkaStreams kafkaStreams;
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @Before
+    public void createTopics() throws Exception {
+        inputTopic = INPUT_TOPIC + safeUniqueTestName(getClass(), testName);
+        outputTopic = OUTPUT_TOPIC + safeUniqueTestName(getClass(), testName);
+        CLUSTER.createTopic(inputTopic);
+        CLUSTER.createTopic(outputTopic);
+    }
+
+    private Properties props() {
+        final Properties streamsConfiguration = new Properties();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
+                                 Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
+                                 Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return streamsConfiguration;
+    }
+
+    @After
+    public void shutdown() {
+        if (kafkaStreams != null) {
+            kafkaStreams.close(Duration.ofSeconds(30L));
+            kafkaStreams.cleanUp();
+        }
+    }
+
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUpgradeWithTopologyOptimizationOff() throws Exception {
+
+        final StreamsBuilder streamsBuilderOld = new StreamsBuilder();
+        final KStream<String, String> leftOld = streamsBuilderOld.stream(
+            inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+        final ValueJoiner<String, String, String> valueJoiner = (v, v2) -> v + v2;
+        final KStream<String, String> joinedOld = leftOld.join(
+            leftOld,
+            valueJoiner,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMinutes(100))
+        );
+        joinedOld.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
+        kafkaStreams.start();
+
+        final long currentTime = CLUSTER.time.milliseconds();
+        processKeyValueAndVerifyCount("1", "A", currentTime + 42L, asList(
+            new KeyValueTimestamp<String, String>("1", "AA", currentTime + 42L)));
+
+        processKeyValueAndVerifyCount("1", "B", currentTime + 43L, asList(
+            new KeyValueTimestamp("1", "BA", currentTime + 43L),
+            new KeyValueTimestamp("1", "AB", currentTime + 43L),
+            new KeyValueTimestamp("1", "BB", currentTime + 43L)));
+
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+        final StreamsBuilder streamsBuilderNew = new StreamsBuilder();
+
+        final KStream<String, String> leftNew = streamsBuilderNew.stream(
+            inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> joinedNew = leftNew.join(
+            leftNew,
+            valueJoiner,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMinutes(100))
+        );
+        joinedNew.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

Review Comment:
   This is the same as the first StreamsBuilder, right? I don't think you need a new one for the new instance.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java:
##########
@@ -181,13 +181,22 @@ public <K, V1, V2, VOut> KStream<K, VOut> join(final KStream<K, V1> lhs,
             sharedTimeTracker
         );
 
+        final KStreamKStreamSelfJoin<K, V1, V2, VOut> selfJoin = new KStreamKStreamSelfJoin<>(
+            thisWindowStore.name(),
+            internalWindows,
+            joiner,
+            AbstractStream.reverseJoinerWithKey(joiner),
+            sharedTimeTracker
+        );

Review Comment:
   Interesting... So, we're unconditionally building this node in case it turns out to be an optimizable self-join later?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(false, false);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(
+        final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
         mergeDuplicateSourceNodes();
         if (optimizeTopology) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
             optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
+            if (optimizeSelfJoin) {

Review Comment:
   Do you want to couple this to the top-level optimization flag? I'm ok with it, but I think it would be less confusing if they were just independent. Otherwise, you can't enable this optimization without enabling the other ones as well.
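
   A minimal sketch of what "independent" could look like, assuming the two booleans stay as they are in this diff. `OptimizationPlan` and `steps` are hypothetical names used only for illustration; the strings are the method names from `InternalStreamsBuilder` quoted in this thread, recorded in a list instead of being invoked.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: records which optimization steps would run for a given pair of flags.
final class OptimizationPlan {
    private OptimizationPlan() { }

    static List<String> steps(final boolean optimizeTopology, final boolean optimizeSelfJoin) {
        final List<String> steps = new ArrayList<>();
        // Always runs, as in the quoted diff.
        steps.add("mergeDuplicateSourceNodes");
        if (optimizeTopology) {
            steps.add("optimizeKTableSourceTopics");
            steps.add("maybeOptimizeRepartitionOperations");
        }
        // Checked on its own, not nested under optimizeTopology.
        if (optimizeSelfJoin) {
            steps.add("rewriteSelfJoin");
        }
        return steps;
    }
}
```

   With this shape, `steps(false, true)` still includes the self-join rewrite, which the nested version in the diff would not allow.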



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinProcessor.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.slf4j.Logger;
+
+public abstract class StreamStreamJoinProcessor<KIn, VIn, KOut, VOut> extends ContextualProcessor<KIn, VIn, KOut, VOut> {
+
+    protected boolean skipRecord(

Review Comment:
   We don't need an inheritance hierarchy just to add a utility method. Can we instead make this an uninstantiable util class with a static version of this method?
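
   Roughly the shape I have in mind, as a sketch only. `JoinRecordChecks` is a hypothetical name, and the `Runnable` stands in for the dropped-records sensor so the example compiles without Kafka on the classpath; the real method would take the `Record`, the logger, the `Sensor`, and the processor context.

```java
// Hypothetical utility class: static helper only, no instances and no subclasses.
final class JoinRecordChecks {
    // Private constructor makes the class uninstantiable.
    private JoinRecordChecks() {
        throw new AssertionError("no instances");
    }

    // Returns true when the record cannot take part in a join (null key or null value).
    // droppedRecordsCounter is a stand-in for droppedRecordsSensor.record().
    static <K, V> boolean skipRecord(final K key, final V value, final Runnable droppedRecordsCounter) {
        if (key == null || value == null) {
            droppedRecordsCounter.run();
            return true;
        }
        return false;
    }
}
```

   Each join processor can then call the static method directly and keep extending `ContextualProcessor`.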



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -122,26 +120,8 @@ public void init(final ProcessorContext<K, VOut> context) {
         @SuppressWarnings("unchecked")
         @Override
         public void process(final Record<K, V1> record) {
-            // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-            //
-            // we also ignore the record if value is null, because in a key-value data model a null-value indicates
-            // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
-            // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
-            // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-            if (record.key() == null || record.value() == null) {
-                if (context().recordMetadata().isPresent()) {
-                    final RecordMetadata recordMetadata = context().recordMetadata().get();
-                    LOG.warn(
-                        "Skipping record due to null key or value. "
-                            + "topic=[{}] partition=[{}] offset=[{}]",
-                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
-                    );
-                } else {
-                    LOG.warn(
-                        "Skipping record due to null key or value. Topic, partition, and offset not known."
-                    );
-                }
-                droppedRecordsSensor.record();
+            System.out.println("---> IsLeft: " + isLeftSide + ".Processing record: " + record);

Review Comment:
   Obviously, we'll need to make a pass to remove stuff like this before merging.



##########
streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java:
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import static java.time.Duration.ofMinutes;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IntegrationTest.class})
+public class SelfJoinUpgradeIntegrationTest {
+    public static final String INPUT_TOPIC = "selfjoin-input";
+    public static final String OUTPUT_TOPIC = "selfjoin-output";
+    private String inputTopic;
+    private String outputTopic;
+
+    private KafkaStreams kafkaStreams;
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @Before
+    public void createTopics() throws Exception {
+        inputTopic = INPUT_TOPIC + safeUniqueTestName(getClass(), testName);
+        outputTopic = OUTPUT_TOPIC + safeUniqueTestName(getClass(), testName);
+        CLUSTER.createTopic(inputTopic);
+        CLUSTER.createTopic(outputTopic);
+    }
+
+    private Properties props() {
+        final Properties streamsConfiguration = new Properties();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
+                                 Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
+                                 Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return streamsConfiguration;
+    }
+
+    @After
+    public void shutdown() {
+        if (kafkaStreams != null) {
+            kafkaStreams.close(Duration.ofSeconds(30L));
+            kafkaStreams.cleanUp();
+        }
+    }
+
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUpgradeWithTopologyOptimizationOff() throws Exception {
+
+        final StreamsBuilder streamsBuilderOld = new StreamsBuilder();
+        final KStream<String, String> leftOld = streamsBuilderOld.stream(
+            inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+        final ValueJoiner<String, String, String> valueJoiner = (v, v2) -> v + v2;
+        final KStream<String, String> joinedOld = leftOld.join(
+            leftOld,
+            valueJoiner,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMinutes(100))
+        );
+        joinedOld.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
+        kafkaStreams.start();
+
+        final long currentTime = CLUSTER.time.milliseconds();
+        processKeyValueAndVerifyCount("1", "A", currentTime + 42L, asList(
+            new KeyValueTimestamp<String, String>("1", "AA", currentTime + 42L)));
+
+        processKeyValueAndVerifyCount("1", "B", currentTime + 43L, asList(
+            new KeyValueTimestamp("1", "BA", currentTime + 43L),
+            new KeyValueTimestamp("1", "AB", currentTime + 43L),
+            new KeyValueTimestamp("1", "BB", currentTime + 43L)));
+
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+        final StreamsBuilder streamsBuilderNew = new StreamsBuilder();
+
+        final KStream<String, String> leftNew = streamsBuilderNew.stream(
+            inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> joinedNew = leftNew.join(
+            leftNew,
+            valueJoiner,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMinutes(100))
+        );
+        joinedNew.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+        kafkaStreams = new KafkaStreams(streamsBuilderNew.build(), props);
+        kafkaStreams.start();
+
+        final long currentTimeNew = CLUSTER.time.milliseconds();
+
+        processKeyValueAndVerifyCount("1", "C", currentTimeNew + 44L, asList(
+            new KeyValueTimestamp("1", "CA", currentTimeNew + 44L),
+            new KeyValueTimestamp("1", "CB", currentTimeNew + 44L),
+            new KeyValueTimestamp("1", "AC", currentTimeNew + 44L),
+            new KeyValueTimestamp("1", "BC", currentTimeNew + 44L),
+            new KeyValueTimestamp("1", "CC", currentTimeNew + 44L)));
+
+
+        kafkaStreams.close();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUpgradeWithTopologyOptimizationOn() throws Exception {

Review Comment:
   What's the upgrade here? Is it just a restart test?



##########
streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java:
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import static java.time.Duration.ofMinutes;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IntegrationTest.class})
+public class SelfJoinUpgradeIntegrationTest {
+    public static final String INPUT_TOPIC = "selfjoin-input";
+    public static final String OUTPUT_TOPIC = "selfjoin-output";
+    private String inputTopic;
+    private String outputTopic;
+
+    private KafkaStreams kafkaStreams;
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @Before
+    public void createTopics() throws Exception {
+        inputTopic = INPUT_TOPIC + safeUniqueTestName(getClass(), testName);
+        outputTopic = OUTPUT_TOPIC + safeUniqueTestName(getClass(), testName);
+        CLUSTER.createTopic(inputTopic);
+        CLUSTER.createTopic(outputTopic);
+    }
+
+    private Properties props() {
+        final Properties streamsConfiguration = new Properties();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
+                                 Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
+                                 Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return streamsConfiguration;
+    }
+
+    @After
+    public void shutdown() {
+        if (kafkaStreams != null) {
+            kafkaStreams.close(Duration.ofSeconds(30L));
+            kafkaStreams.cleanUp();
+        }
+    }
+
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUpgradeWithTopologyOptimizationOff() throws Exception {
+
+        final StreamsBuilder streamsBuilderOld = new StreamsBuilder();
+        final KStream<String, String> leftOld = streamsBuilderOld.stream(
+            inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+        final ValueJoiner<String, String, String> valueJoiner = (v, v2) -> v + v2;
+        final KStream<String, String> joinedOld = leftOld.join(
+            leftOld,
+            valueJoiner,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMinutes(100))
+        );
+        joinedOld.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);

Review Comment:
   It would be clearer if we explicitly set NO_OPTIMIZATION here instead of leaving the config unset.
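
   Something along these lines, as a sketch. It uses plain string literals so it runs without Kafka: `"topology.optimization"` is the key quoted elsewhere in this thread, and `"none"` is assumed to be the value of `StreamsConfig.NO_OPTIMIZATION`. `ExplicitOptimizationConfig` is a hypothetical helper name; in the test itself this would simply be a `props.put(...)` using the `StreamsConfig` constants.

```java
import java.util.Properties;

// Hypothetical helper: returns a copy of the base config with optimization explicitly disabled.
final class ExplicitOptimizationConfig {
    private ExplicitOptimizationConfig() { }

    static Properties withOptimizationOff(final Properties base) {
        final Properties copy = new Properties();
        copy.putAll(base);
        // Assumed literal for StreamsConfig.NO_OPTIMIZATION.
        copy.put("topology.optimization", "none");
        return copy;
    }
}
```

   Copying the properties also keeps the "old" and "new" configurations separate, so the second half of the test can set its own value without mutating the first.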



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -661,7 +682,9 @@ public class StreamsConfig extends AbstractConfig {
 
     /** {@code topology.optimization} */
     public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
-    private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default";
+    private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka "
+        + "Streams if it should optimize the topology and what optimizations to apply in a comma "
+        + "separated list. Disabled by default";

Review Comment:
   We should enumerate the options here, like `"Acceptable values are: "+NO_OPTIMIZATION+", "+OPTIMIZE+", or a comma separated list of specific optimizations ("+REUSE_KTABLE_SOURCE_TOPICS+", "+MERGE_REPARTITION_TOPICS+", or "+SELF_JOIN+")"`.
   
   Or something like that.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {

Review Comment:
   I'm also confused. Maybe you can expand the comment to explain more.
   
   I think what's happening here is that the parent might have other children unrelated to the join, but with respect to the join, there are exactly two graph nodes we care about: the JoinThis and JoinOther. Then, the purpose of this loop is to iterate over the children and look for those two nodes.
   
   Did I get that right?
   
   Are we going to get confused if the source is _also_ involved in another join (independent of the one we're optimizing)? If so, we might need some extra metadata to track the "identity" of the join nodes.



##########
streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java:
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import static java.time.Duration.ofMinutes;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IntegrationTest.class})
+public class SelfJoinUpgradeIntegrationTest {
+    public static final String INPUT_TOPIC = "selfjoin-input";
+    public static final String OUTPUT_TOPIC = "selfjoin-output";
+    private String inputTopic;
+    private String outputTopic;
+
+    private KafkaStreams kafkaStreams;
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @Before
+    public void createTopics() throws Exception {
+        inputTopic = INPUT_TOPIC + safeUniqueTestName(getClass(), testName);
+        outputTopic = OUTPUT_TOPIC + safeUniqueTestName(getClass(), testName);

Review Comment:
   Probably better to call this method just once and reuse the result.
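
   A sketch of the idea, with the name lookup passed in as a `Supplier` so the example is self-contained. `TopicNames` and `forTest` are hypothetical; in the test the supplier would be the `safeUniqueTestName(getClass(), testName)` call, and the prefixes are the constants from the diff.

```java
import java.util.function.Supplier;

// Hypothetical helper: derives both topic names from a single unique-name lookup.
final class TopicNames {
    private TopicNames() { }

    static String[] forTest(final Supplier<String> uniqueName) {
        // Called exactly once; both topics share the same suffix.
        final String suffix = uniqueName.get();
        return new String[] {"selfjoin-input" + suffix, "selfjoin-output" + suffix};
    }
}
```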





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r970792491


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
+    private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+    private final String windowName;
+    private final long joinThisBeforeMs;
+    private final long joinThisAfterMs;
+    private final long joinOtherBeforeMs;
+    private final long joinOtherAfterMs;
+    private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis;
+
+    private final TimeTracker sharedTimeTracker;
+
+    KStreamKStreamSelfJoin(
+        final String windowName,
+        final JoinWindowsInternal windows,
+        final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis,
+        final TimeTracker sharedTimeTracker) {
+
+        this.windowName = windowName;
+        this.joinThisBeforeMs = windows.beforeMs;
+        this.joinThisAfterMs = windows.afterMs;
+        this.joinOtherBeforeMs = windows.afterMs;
+        this.joinOtherAfterMs = windows.beforeMs;
+        this.joinerThis = joinerThis;
+        this.sharedTimeTracker = sharedTimeTracker;
+    }
+
+    @Override
+    public Processor<K, V1, K, VOut> get() {
+        return new KStreamKStreamSelfJoinProcessor();
+    }
+
+    private class KStreamKStreamSelfJoinProcessor extends ContextualProcessor<K, V1, K, VOut> {
+        private WindowStore<K, V2> windowStore;
+        private Sensor droppedRecordsSensor;
+
+        @Override
+        public void init(final ProcessorContext<K, VOut> context) {
+            super.init(context);
+
+            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
+            droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
+            windowStore = context.getStateStore(windowName);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void process(final Record<K, V1> record) {
+            if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
+                return;
+            }
+
+            final long inputRecordTimestamp = record.timestamp();
+            long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs);
+            long timeTo = Math.max(0L, inputRecordTimestamp + joinThisAfterMs);
+            boolean emittedJoinWithSelf = false;
+            final Record selfRecord = record
+                .withValue(joinerThis.apply(record.key(), record.value(), (V2) record.value()))
+                .withTimestamp(inputRecordTimestamp);
+            sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+
+            // Join current record with other
+            try (final WindowStoreIterator<V2> iter = windowStore.fetch(
+                record.key(), timeFrom, timeTo)) {
+                while (iter.hasNext()) {
+                    final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Join this with other
+                    context().forward(
+                        record.withValue(joinerThis.apply(
+                                record.key(), record.value(), otherRecord.value))
+                            .withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+                }
+            }
+
+            // Needs to be in a different loop to ensure correct ordering of records where
+            // correct ordering means it matches the output of an inner join.
+            timeFrom = Math.max(0L, inputRecordTimestamp - joinOtherBeforeMs);
+            timeTo = Math.max(0L, inputRecordTimestamp + joinOtherAfterMs);
+            try (final WindowStoreIterator<V2> iter2 = windowStore.fetch(
+                record.key(), timeFrom, timeTo)) {
+                while (iter2.hasNext()) {
+                    final KeyValue<Long, V2> otherRecord = iter2.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+                    final long maxRecordTimestamp = Math.max(inputRecordTimestamp, otherRecordTimestamp);
+
+                    // This is needed so that output records follow timestamp order
+                    // Join this with self
+                    if (inputRecordTimestamp < maxRecordTimestamp && !emittedJoinWithSelf) {

Review Comment:
   We don't emit the self-self record twice because the record gets added to the state store AFTER the children have been triggered. The logic is in the `KStreamJoinWindow` and looks like: 
   ```
           public void process(final Record<K, V> record) {
               // if the key is null, we do not need to put the record into window store
               // since it will never be considered for join operations
               if (record.key() != null) {
                   context().forward(record);
                   // Every record basically starts a new window. We're using a window store mostly for the retention.
                   window.put(record.key(), record.value(), record.timestamp());
               }
           }
   ```
   So, first the record gets forwarded to the children and then it's added to the state store. 
   
   I got tripped up by this as well :)
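
   A toy sketch of that ordering, in plain Java rather than Kafka Streams code. The class `ForwardBeforePutSketch`, the list standing in for the window store, and the `joinChild` method are all hypothetical; the only thing taken from the snippet above is that the record is forwarded to the children before it is put into the store.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Toy model, not Kafka Streams code: the "window store" is a plain list and
   // the "join child" is a method that reads the store when a record is forwarded.
   class ForwardBeforePutSketch {
       private final List<String> store = new ArrayList<>();
       final List<String> joined = new ArrayList<>();

       // Stand-in for the downstream join processor: pairs the incoming value
       // with everything currently in the store.
       private void joinChild(final String value) {
           for (final String other : store) {
               joined.add(value + "-" + other);
           }
       }

       // Same order as in the snippet above: forward first, then put.
       void process(final String value) {
           joinChild(value);
           store.add(value);
       }

       public static void main(final String[] args) {
           final ForwardBeforePutSketch sketch = new ForwardBeforePutSketch();
           sketch.process("A");
           sketch.process("B");
           System.out.println(sketch.joined); // prints [B-A]
       }
   }
   ```

   In this model "A" is never in the store while "A" itself is being joined, so a processor reading the store cannot pick up the self-self pair from it and has to emit that pair explicitly.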





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r956662529


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +362,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow  that are siblings of the
+     * StreamStreamJoinNode and have smaller build priority.
+     */
+    private boolean isSelfJoin(final GraphNode streamJoinNode) {
+        final AtomicInteger count = new AtomicInteger();
+        countSourceNodes(count, streamJoinNode, new HashSet<>());
+        if (count.get() > 1) {
+            return false;
+        }
+        if (streamJoinNode.parentNodes().size() > 1) {
+            return false;
+        }
+        for (final GraphNode parent: streamJoinNode.parentNodes()) {
+            for (final GraphNode sibling : parent.children()) {
+                if (sibling instanceof ProcessorGraphNode) {
+                    if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) {
+                        continue;
+                    }
+                }
+                if (sibling != streamJoinNode
+                    && sibling.buildPriority() < streamJoinNode.buildPriority()) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    private void countSourceNodes(
+        final AtomicInteger count,
+        final GraphNode currentNode,
+        final Set<GraphNode> visited) {
+
+        if (currentNode instanceof StreamSourceNode) {
+            count.incrementAndGet();
+        }
+
+        for (final GraphNode parent: currentNode.parentNodes()) {
+            if (!visited.contains(parent)) {
+                visited.add(parent);
+                countSourceNodes(count, parent, visited);
+            }
+        }
+    }
+
+    private boolean isStreamJoinWindowNode(final ProcessorGraphNode node) {
+        return node.processorParameters() != null

Review Comment:
   Why do we need the checks other than the last one? Since this is called after the whole pass, the logical plan should be complete and hence all of the checked fields should be non-null (otherwise it would be a bug)?



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -663,6 +663,12 @@ public class StreamsConfig extends AbstractConfig {
     public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
     private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default";
 
+    public static final String SELF_JOIN_OPTIMIZATION_CONFIG = "self.join.optimization";

Review Comment:
   I still have some concerns about the extra-configs pattern here, but I will leave that for open discussion in the KIP thread.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(false, false);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(
+        final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
         mergeDuplicateSourceNodes();
         if (optimizeTopology) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
             optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
+            if (optimizeSelfJoin) {

Review Comment:
   This is a meta comment: we have used a "two-pass" mechanism to apply optimizations because originally we did not pass in the `Configs` as part of the StreamsBuilder.build() process. That two-pass mechanism has introduced a lot of issues and complications (I think you've felt some of them this time :P ), so the current plan is to get rid of it and instead optimize the topology in the first pass, as we build the logical plan. Could you consider an approach other than rewriting self-joins, if possible? That way, we potentially would not need to consider awkward applicability conditions like the ones you have in https://github.com/apache/kafka/pull/12555/files#diff-ac1bf2b23b80784dec20b00fdc42f2df7e5a5133d6c68978fa44aea11e950c3aR398-R404 below.





[GitHub] [kafka] agavra commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r959892207


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow  that are siblings of the
+     * StreamStreamJoinNode and have smaller build priority.
+     */
+    private boolean isSelfJoin(final GraphNode streamJoinNode) {
+        final AtomicInteger count = new AtomicInteger();
+        countSourceNodes(count, streamJoinNode, new HashSet<>());
+        if (count.get() > 1) {
+            return false;
+        }
+        if (streamJoinNode.parentNodes().size() > 1) {
+            return false;
+        }
+        for (final GraphNode parent: streamJoinNode.parentNodes()) {
+            for (final GraphNode sibling : parent.children()) {
+                if (sibling instanceof ProcessorGraphNode) {
+                    if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) {
+                        continue;
+                    }
+                }
+                if (sibling != streamJoinNode
+                    && sibling.buildPriority() < streamJoinNode.buildPriority()) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    private void countSourceNodes(
+        final AtomicInteger count,
+        final GraphNode currentNode,
+        final Set<GraphNode> visited) {
+
+        if (currentNode instanceof StreamSourceNode) {
+            count.incrementAndGet();
+        }
+
+        for (final GraphNode parent: currentNode.parentNodes()) {
+            if (!visited.contains(parent)) {

Review Comment:
   `GraphNode` doesn't implement `equals` and `hashCode` which makes this operation somewhat risky. Either we should implement those methods or explicitly use `IdentityHashMap` if we're certain that the nodes will always be the same instance
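
   A minimal sketch of the second option, assuming nodes are always meant to be compared by instance. The `Node` class here is hypothetical and deliberately defines value-based `equals`/`hashCode` to show the failure mode; `Collections.newSetFromMap` and `IdentityHashMap` are standard JDK classes.

   ```java
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.IdentityHashMap;
   import java.util.Set;

   class IdentityVisitedSetSketch {
       // Hypothetical node type that compares by name, to show what could go
       // wrong if a node class ever gained value-based equals/hashCode.
       static final class Node {
           final String name;

           Node(final String name) {
               this.name = name;
           }

           @Override
           public boolean equals(final Object o) {
               return o instanceof Node && ((Node) o).name.equals(name);
           }

           @Override
           public int hashCode() {
               return name.hashCode();
           }
       }

       // A Set view over IdentityHashMap compares elements with ==, never equals().
       static <T> Set<T> newIdentitySet() {
           return Collections.newSetFromMap(new IdentityHashMap<>());
       }

       public static void main(final String[] args) {
           final Node first = new Node("join-window");
           final Node second = new Node("join-window"); // distinct instance, equal by value

           final Set<Node> byEquals = new HashSet<>();
           byEquals.add(first);
           byEquals.add(second);

           final Set<Node> byIdentity = newIdentitySet();
           byIdentity.add(first);
           byIdentity.add(second);

           System.out.println(byEquals.size() + " vs " + byIdentity.size()); // prints "1 vs 2"
       }
   }
   ```

   With an identity-based visited set, a traversal keeps treating two distinct node instances as distinct even if value-based equality is added to the node class later.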



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow  that are siblings of the
+     * StreamStreamJoinNode and have smaller build priority.
+     */
+    private boolean isSelfJoin(final GraphNode streamJoinNode) {
+        final AtomicInteger count = new AtomicInteger();
+        countSourceNodes(count, streamJoinNode, new HashSet<>());
+        if (count.get() > 1) {
+            return false;
+        }
+        if (streamJoinNode.parentNodes().size() > 1) {
+            return false;
+        }
+        for (final GraphNode parent: streamJoinNode.parentNodes()) {
+            for (final GraphNode sibling : parent.children()) {
+                if (sibling instanceof ProcessorGraphNode) {
+                    if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) {
+                        continue;
+                    }
+                }
+                if (sibling != streamJoinNode
+                    && sibling.buildPriority() < streamJoinNode.buildPriority()) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    private void countSourceNodes(

Review Comment:
   instead of incrementing `count` can we have this method return `int`? (you can then sum up recursive call results) - generally better to avoid side-effects as it makes debugging easier
   
   alternatively, we can return a `boolean` and change this to `hasExactlyOneSourceNode`, which would return false if more than one of the recursive calls returned `true` (also allowing us to short-circuit the logic, not that the performance boost matters)
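
   A rough sketch of the first variant, where the count is returned instead of accumulated in an `AtomicInteger`. The `Node` class and its `isSource` flag are hypothetical stand-ins for `GraphNode` and the `instanceof StreamSourceNode` check; this is only one way the method could look, not the code in the PR.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.IdentityHashMap;
   import java.util.List;
   import java.util.Set;

   class CountSourceNodesSketch {
       // Hypothetical, minimal stand-in for a graph node with parent links.
       static final class Node {
           final boolean isSource;
           final List<Node> parents = new ArrayList<>();

           Node(final boolean isSource) {
               this.isSource = isSource;
           }

           Node withParents(final Node... nodes) {
               Collections.addAll(parents, nodes);
               return this;
           }
       }

       // Returns the number of distinct source nodes reachable through parent links.
       static int countSourceNodes(final Node current, final Set<Node> visited) {
           if (!visited.add(current)) {
               return 0; // already counted via another path
           }
           int count = current.isSource ? 1 : 0;
           for (final Node parent : current.parents) {
               count += countSourceNodes(parent, visited);
           }
           return count;
       }

       static int countSourceNodes(final Node start) {
           return countSourceNodes(start, Collections.newSetFromMap(new IdentityHashMap<>()));
       }

       public static void main(final String[] args) {
           final Node source = new Node(true);
           final Node left = new Node(false).withParents(source);
           final Node right = new Node(false).withParents(source);
           final Node join = new Node(false).withParents(left, right);
           System.out.println(countSourceNodes(join)); // prints 1: both paths lead to the same source
       }
   }
   ```

   The visited set is still mutated during the traversal, but the result itself comes back as a return value, so the caller can write `if (countSourceNodes(node) > 1)` directly.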



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow  that are siblings of the
+     * StreamStreamJoinNode and have smaller build priority.
+     */
+    private boolean isSelfJoin(final GraphNode streamJoinNode) {
+        final AtomicInteger count = new AtomicInteger();
+        countSourceNodes(count, streamJoinNode, new HashSet<>());
+        if (count.get() > 1) {
+            return false;
+        }
+        if (streamJoinNode.parentNodes().size() > 1) {
+            return false;
+        }
+        for (final GraphNode parent: streamJoinNode.parentNodes()) {

Review Comment:
   instead of making this a for loop, can we use `Iterables.getOnlyElement()`? that should make this a bit easier to reason about
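
   `Iterables.getOnlyElement()` is a Guava utility. Whether Guava is available to this module is not stated in this thread, so the sketch below uses a small local helper with a similar fail-fast intent. The name `onlyElement` and the choice of `IllegalStateException` are assumptions made for illustration.

   ```java
   import java.util.Arrays;
   import java.util.Collection;
   import java.util.Collections;

   class OnlyElementSketch {
       // Hypothetical helper: returns the single element or fails loudly,
       // so the "exactly one parent" assumption is checked where it is used.
       static <T> T onlyElement(final Collection<T> items) {
           if (items.size() != 1) {
               throw new IllegalStateException(
                   "expected exactly one element but found " + items.size());
           }
           return items.iterator().next();
       }

       public static void main(final String[] args) {
           System.out.println(onlyElement(Collections.singletonList("parent"))); // prints parent
           try {
               onlyElement(Arrays.asList("parent-1", "parent-2"));
           } catch (final IllegalStateException e) {
               System.out.println(e.getMessage());
           }
       }
   }
   ```

   Compared with a `for` loop over a collection that is expected to hold one element, a helper like this makes the single-parent assumption explicit and fails immediately if it is ever violated.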



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow  that are siblings of the
+     * StreamStreamJoinNode and have smaller build priority.

Review Comment:
   this question is because I'm not familiar with the codebase, but can you explain what build priority is and why it matters in this algorithm? 



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow  that are siblings of the
+     * StreamStreamJoinNode and have smaller build priority.
+     */
+    private boolean isSelfJoin(final GraphNode streamJoinNode) {
+        final AtomicInteger count = new AtomicInteger();

Review Comment:
   nit: rename this to numSourceNodes for readability (or if you take my suggestion below, just inline the call in the `if` branch)



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
+    private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+    private final String windowName;
+    private final long joinBeforeMs;
+    private final long joinAfterMs;
+    private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis;
+    private final ValueJoinerWithKey<? super K, ? super V2, ? super V1, ? extends VOut> joinerOther;
+
+    private final TimeTracker sharedTimeTracker;
+
+    KStreamKStreamSelfJoin(
+        final String windowName,
+        final JoinWindowsInternal windows,
+        final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis,
+        final ValueJoinerWithKey<? super K, ? super V2, ? super V1, ? extends VOut> joinerOther,
+        final TimeTracker sharedTimeTracker) {
+
+        this.windowName = windowName;
+        this.joinBeforeMs = windows.beforeMs;
+        this.joinAfterMs = windows.afterMs;
+        this.joinerThis = joinerThis;
+        this.joinerOther = joinerOther;
+        this.sharedTimeTracker = sharedTimeTracker;
+    }
+
+    @Override
+    public Processor<K, V1, K, VOut> get() {
+        return new KStreamKStreamSelfJoinProcessor();
+    }
+
+    private class KStreamKStreamSelfJoinProcessor extends ContextualProcessor<K, V1, K, VOut> {
+        private WindowStore<K, V2> windowStore;
+        private Sensor droppedRecordsSensor;
+
+        @Override
+        public void init(final ProcessorContext<K, VOut> context) {
+            super.init(context);
+
+            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
+            droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
+            windowStore = context.getStateStore(windowName);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void process(final Record<K, V1> record) {
+            // Copied from inner join:

Review Comment:
   let's try to avoid duplicated code - without having looked at the code in inner join, is there anything that we can do to put this in a parent class? 



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the

Review Comment:
   nit: incomplete doc?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -270,17 +276,36 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(null);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(final Properties props) {
+        // Vicky: Do we need to verify props?
+        final List<String> optimizationConfigs;
+        if (props == null) {
+            optimizationConfigs = new ArrayList<>();
+            optimizationConfigs.add(StreamsConfig.NO_OPTIMIZATION);
+        } else {
+            final StreamsConfig config = new StreamsConfig(props);
+            optimizationConfigs = config.getList(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG);
+        }
 
         mergeDuplicateSourceNodes();
-        if (optimizeTopology) {
-            LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
+        if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE)

Review Comment:
   nit: probably easier to read if we just have one top level branch for `contains(OPTIMIZE)`:
   ```
   if (contains(OPTIMIZE)) {
     applyOptimizations()
   }
   
   void applyOptimizations() {
     if (contains(REUSE)) reuse
     if (contains(MERGE)) merge...
   }
   ```



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {

Review Comment:
   are we sure that there are at most two nodes in `parent.children()`? if so we should assert that here and not use a `for` loop but instead get indexes explicitly



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow  that are siblings of the
+     * StreamStreamJoinNode and have smaller build priority.
+     */
+    private boolean isSelfJoin(final GraphNode streamJoinNode) {
+        final AtomicInteger count = new AtomicInteger();
+        countSourceNodes(count, streamJoinNode, new HashSet<>());
+        if (count.get() > 1) {
+            return false;
+        }
+        if (streamJoinNode.parentNodes().size() > 1) {

Review Comment:
   Should this check be `!= 1`? There should never be a case here where `< 1` is true, right?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);

Review Comment:
   For my understanding: is it possible to have multiple self-joins in a single topology? From offline discussions I thought this wasn't possible, in which case should we propagate the result up and exit the recursion once we've identified a single self-join?
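
A minimal sketch of what exiting the recursion early could look like, assuming at most one self-join per topology (which is still an open question in this thread). `Node` and `findFirstSelfJoin` are hypothetical stand-ins for illustration, not the `GraphNode` API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy model of the traversal; Node and findFirstSelfJoin are hypothetical names.
final class FirstSelfJoinSketch {

    static final class Node {
        final String name;
        final boolean selfJoin;
        final List<Node> children = new ArrayList<>();

        Node(final String name, final boolean selfJoin) {
            this.name = name;
            this.selfJoin = selfJoin;
        }
    }

    // Depth-first search that returns the first self-join node found, or null if none is reachable.
    static Node findFirstSelfJoin(final Node current, final Set<Node> visited) {
        if (!visited.add(current)) {
            return null; // already explored through another parent
        }
        if (current.selfJoin) {
            return current;
        }
        for (final Node child : current.children) {
            final Node found = findFirstSelfJoin(child, visited);
            if (found != null) {
                return found; // propagate upwards without visiting the remaining children
            }
        }
        return null;
    }
}
```

Called with a fresh `new HashSet<>()`, the search leaves any subtree after the first match unvisited. `Node` does not override `equals`, so the visited set compares nodes by identity here.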



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {

Review Comment:
   same comment about `GraphNode#equals` as below





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960647123


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow  that are siblings of the
+     * StreamStreamJoinNode and have smaller build priority.
+     */
+    private boolean isSelfJoin(final GraphNode streamJoinNode) {
+        final AtomicInteger count = new AtomicInteger();
+        countSourceNodes(count, streamJoinNode, new HashSet<>());
+        if (count.get() > 1) {
+            return false;
+        }
+        if (streamJoinNode.parentNodes().size() > 1) {
+            return false;
+        }
+        for (final GraphNode parent: streamJoinNode.parentNodes()) {

Review Comment:
   No Guava in Streams :P





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960654203


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {

Review Comment:
   No, there can be more than two nodes. That's why I use a `for` loop and check each child with `isStreamJoinWindowNode` to find the nodes I need to remove.
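
One way to reconcile both points, sketched under assumptions: filter the children down to the join-window nodes first, then require exactly two of them and pick them by index. `Node`, its `joinWindow` flag, and `windowToRemove` are hypothetical stand-ins, and returning `null` for any other shape is a design choice of this sketch rather than what the PR does:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model; the real code works on GraphNode and isStreamJoinWindowNode.
final class JoinWindowChildrenSketch {

    static final class Node {
        final String name;
        final boolean joinWindow;
        final int buildPriority;

        Node(final String name, final boolean joinWindow, final int buildPriority) {
            this.name = name;
            this.joinWindow = joinWindow;
            this.buildPriority = buildPriority;
        }
    }

    // Keep only the join-window children, preserving the parent's child order.
    static List<Node> joinWindowChildren(final List<Node> children) {
        final List<Node> windows = new ArrayList<>();
        for (final Node child : children) {
            if (child.joinWindow) {
                windows.add(child);
            }
        }
        return windows;
    }

    // Returns the window node to remove, or null if the shape is not the expected one.
    static Node windowToRemove(final List<Node> children) {
        final List<Node> windows = joinWindowChildren(children);
        if (windows.size() != 2) {
            return null;
        }
        final Node left = windows.get(0);
        final Node right = windows.get(1);
        return left.buildPriority < right.buildPriority ? right : null;
    }
}
```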





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r959834484


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +362,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow  that are siblings of the
+     * StreamStreamJoinNode and have smaller build priority.
+     */
+    private boolean isSelfJoin(final GraphNode streamJoinNode) {
+        final AtomicInteger count = new AtomicInteger();
+        countSourceNodes(count, streamJoinNode, new HashSet<>());
+        if (count.get() > 1) {
+            return false;
+        }
+        if (streamJoinNode.parentNodes().size() > 1) {
+            return false;
+        }
+        for (final GraphNode parent: streamJoinNode.parentNodes()) {
+            for (final GraphNode sibling : parent.children()) {
+                if (sibling instanceof ProcessorGraphNode) {
+                    if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) {
+                        continue;
+                    }
+                }
+                if (sibling != streamJoinNode
+                    && sibling.buildPriority() < streamJoinNode.buildPriority()) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    private void countSourceNodes(
+        final AtomicInteger count,
+        final GraphNode currentNode,
+        final Set<GraphNode> visited) {
+
+        if (currentNode instanceof StreamSourceNode) {
+            count.incrementAndGet();
+        }
+
+        for (final GraphNode parent: currentNode.parentNodes()) {
+            if (!visited.contains(parent)) {
+                visited.add(parent);
+                countSourceNodes(count, parent, visited);
+            }
+        }
+    }
+
+    private boolean isStreamJoinWindowNode(final ProcessorGraphNode node) {
+        return node.processorParameters() != null

Review Comment:
   Hmm, does this mean that for the above example, even if it's `stream2.join(stream2..)`, we cannot apply the self-join optimization rule?





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r959844687


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(false, false);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(
+        final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
         mergeDuplicateSourceNodes();
         if (optimizeTopology) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
             optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
+            if (optimizeSelfJoin) {

Review Comment:
   Thanks @vpapavas, I think I'm starting to understand some of your motivations here, but just to make sure I do, could we go over the list of examples below:
   
   1.
   ```
   stream1 = builder.stream("topic1");
   stream1.join(stream1)
   ```
   This case is optimizable, and condition #2 alone should be sufficient to validate that the optimization can be applied.
   
   2.
   ```
   stream1 = builder.stream("topic1");
   stream2 = builder.stream("topic1"); // same topic
   stream1.join(stream2)
   ```
   This case is optimizable, but since the logical plan still has two graph nodes, we cannot validate the optimization.
   
   3.
   ```
   stream1 = builder.stream("topic1");
   stream1.mapValues(v -> v);
   stream2 = builder.stream("topic1"); // same topic
   stream1.join(stream2)
   ```
   This case is optimizable, but it seems our validation would exclude it?
   
   4.
   ```
   stream1 = builder.stream("topic1");
   stream2 = builder.stream("topic2");
   stream3 = stream1.merge(stream2);
   stream3.join(stream3);
   ```
   This case should be optimizable, but it seems our validation would exclude it?
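
To make the source-count condition concrete for cases 1 and 4, here is a toy sketch that walks from the join node up through its parents and counts source nodes. `Node` and `countSources` are hypothetical names, and the graph shapes in the usage note are my reading of the examples, not output from the actual builder:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of the upward walk; the PR does this recursively in countSourceNodes.
final class SourceCountSketch {

    static final class Node {
        final String name;
        final boolean source;
        final List<Node> parents = new ArrayList<>();

        Node(final String name, final boolean source) {
            this.name = name;
            this.source = source;
        }
    }

    // Counts the distinct source nodes among the start node and all of its ancestors.
    static int countSources(final Node start) {
        final Set<Node> visited = new HashSet<>();
        final Deque<Node> stack = new ArrayDeque<>();
        visited.add(start);
        stack.push(start);
        int count = 0;
        while (!stack.isEmpty()) {
            final Node node = stack.pop();
            if (node.source) {
                count++;
            }
            for (final Node parent : node.parents) {
                if (visited.add(parent)) {
                    stack.push(parent);
                }
            }
        }
        return count;
    }
}
```

With a chain `root -> source1 -> stream-join` (case 1) the count is 1. With `source1` and `source2` both feeding a merge node that feeds the join (case 4) the count is 2, so a "single source node" rule would reject it even though both join inputs are the same merged stream.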





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960983275


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(false, false);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(
+        final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
         mergeDuplicateSourceNodes();
         if (optimizeTopology) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
             optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
+            if (optimizeSelfJoin) {

Review Comment:
   Hmm.. for case 2), the logical plan would be like:
   
   ```
   root -> source1
   root -> source2
   source1 -> join-window1
   source2 -> join-window2
   source1,source2 -> stream-join
   ```
   
   Is that right?
   
   For case 3), since we are not re-assigning the result of `mapValues` to `stream1`, it should just be a no-op, right? The logical plan would be like:
   
   ```
   root -> source1
   root -> source2
   source1 -> join-window1
   source1 -> map-values1
   source2 -> join-window2
   source1,source2 -> stream-join
   ```
   
   Should that be considered optimizable as well?





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r961012010


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(false, false);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(
+        final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
         mergeDuplicateSourceNodes();
         if (optimizeTopology) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
             optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
+            if (optimizeSelfJoin) {

Review Comment:
   The logical plan will have only one source: the two source nodes get merged because they refer to the same topic. Hence, all the nodes will be children of the source node and siblings of the join node.
   For example,
   Case 2:
   ```
   root -> source
   source -> join-window1, join-window2, stream-join
   ```
   
   and
   Case 3:
   ```
   root -> source
   source -> join-window1, map-values1, join-window2, stream-join
   ```
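
A toy sketch of how the sibling rule (condition 3 in the `isSelfJoin` Javadoc) would treat these two merged plans. `Node` and `siblingsAllowSelfJoin` are hypothetical names, and the build priorities in the usage note are made up for illustration:

```java
import java.util.List;

// Toy model of condition 3: only join windows may be built before the join among its siblings.
final class SiblingCheckSketch {

    static final class Node {
        final String name;
        final boolean joinWindow;
        final int buildPriority;

        Node(final String name, final boolean joinWindow, final int buildPriority) {
            this.name = name;
            this.joinWindow = joinWindow;
            this.buildPriority = buildPriority;
        }
    }

    // Returns false as soon as a sibling that is not a join window was built before the join.
    static boolean siblingsAllowSelfJoin(final List<Node> siblings, final Node join) {
        for (final Node sibling : siblings) {
            if (sibling.joinWindow) {
                continue;
            }
            if (sibling != join && sibling.buildPriority < join.buildPriority) {
                return false;
            }
        }
        return true;
    }
}
```

For case 2 the siblings are only the two join windows and the join itself, so the check passes. For case 3, assuming `map-values1` was built before `stream-join`, the check fails and the rewrite is skipped.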





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969391230


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -663,6 +663,12 @@ public class StreamsConfig extends AbstractConfig {
     public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
     private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default";
 
+    public static final String SELF_JOIN_OPTIMIZATION_CONFIG = "self.join.optimization";

Review Comment:
   I agree with the idea of keeping them organized and I added the namespace, but the configs seem quite long now, so I'm not sure it's better. For example: `topology.optimization.reuse.ktable.source.topics`?





[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

Posted by GitBox <gi...@apache.org>.
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969358545


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1644,6 +1668,29 @@ private Map<String, Object> getClientCustomProps() {
         return props;
     }
 
+    public static List<String> verifyTopologyOptimizationConfigs(final String config) {
+        final List<String> acceptableConfigs = Arrays.asList(

Review Comment:
   Yes, I have been conflicted about this. I wanted to add it as a static variable as well, but I am worried that if someone else adds a new rewriting, they might miss it, whereas in the method they will see it for sure. WDYT?
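
For comparison, a rough sketch of the static-variable variant. Everything here is an assumption for illustration: the class name, the accepted values other than `all` and `none`, and the use of `IllegalArgumentException` in place of whatever exception the real `verifyTopologyOptimizationConfigs` throws:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the validation in StreamsConfig.
final class TopologyOptimizationConfigSketch {

    // Accepted values kept in one static set; the rewrite names are placeholders.
    static final Set<String> ACCEPTED = Collections.unmodifiableSet(new LinkedHashSet<>(Arrays.asList(
        "all",
        "none",
        "reuse.ktable.source.topics",
        "merge.repartition.topics",
        "self.join")));

    // Splits a comma-separated config value and rejects any entry that is not in ACCEPTED.
    static List<String> verify(final String config) {
        final List<String> result = new ArrayList<>();
        for (final String raw : config.split(",")) {
            final String value = raw.trim();
            if (!ACCEPTED.contains(value)) {
                throw new IllegalArgumentException("Unrecognized topology optimization: " + value);
            }
            result.add(value);
        }
        return result;
    }
}
```

With the set declared next to the other constants, a unit test that feeds every declared rewrite name through `verify` could catch a newly added rewrite that was left out of the set.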


