You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/13 01:57:02 UTC

[GitHub] [kafka] vvcephei commented on a diff in pull request #12555: Optimize self-join

vvcephei commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r956220721


##########
gradle.properties:
##########
@@ -20,7 +20,7 @@ group=org.apache.kafka
 #  - tests/kafkatest/__init__.py
 #  - tests/kafkatest/version.py (variable DEV_VERSION)
 #  - kafka-merge-pr.py
-version=3.3.0-SNAPSHOT
+version=3.3.0-VICKY2

Review Comment:
   TODO: we need to remove this from the PR.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -305,6 +307,36 @@ public void buildAndOptimizeTopology(final boolean optimizeTopology) {
         internalTopologyBuilder.validateCopartition();
     }
 
+    /**
+     * A user can provide either the config OPTIMIZE which means all optimizations rules will be
+     * applied or they can provide a list of optimization rules.
+     */
+    private void optimizeTopology(final Properties props) {
+        final List<String> optimizationConfigs;
+        if (props == null || !props.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) {
+            optimizationConfigs = new ArrayList<>();
+            optimizationConfigs.add(StreamsConfig.NO_OPTIMIZATION);

Review Comment:
   could be `optimizationConfigs = Collections.singletonList`



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -663,6 +663,12 @@ public class StreamsConfig extends AbstractConfig {
     public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
     private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default";
 
+    public static final String SELF_JOIN_OPTIMIZATION_CONFIG = "self.join.optimization";

Review Comment:
   Thanks for adding a separate config. I strongly feel this is the right approach for optimization flags.
   
   Can we make a config namespace convention to keep these things organized, like `topology.optimization.self.join`?



##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1263,6 +1263,37 @@ public void testInvalidSecurityProtocol() {
         assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
 
+    @Test
+    public void shouldThrowExceptionWhenTopologyOptimizationOnAndOff() {

Review Comment:
   A more general prohibition would be to disallow OPTIMIZE and NO_OPTIMIZATION in conjunction with a comma at all.



##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1263,6 +1263,37 @@ public void testInvalidSecurityProtocol() {
         assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
 
+    @Test
+    public void shouldThrowExceptionWhenTopologyOptimizationOnAndOff() {
+        final String value = String.join(",", StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION);
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+        final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+        assertTrue(exception.getMessage().contains("A topology can either not be optimized with"));
+    }
+
+    @Test
+    public void shouldEnableSelfJoin() {
+        final String value = StreamsConfig.SELF_JOIN;
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+        final StreamsConfig config = new StreamsConfig(props);
+        assertEquals(config.getString(TOPOLOGY_OPTIMIZATION_CONFIG), StreamsConfig.SELF_JOIN);
+    }
+
+    @Test
+    public void shouldMultipleOptimizations() {

Review Comment:
   I get what you mean, but "should multiple optimizations" isn't exactly a sensible statement :)
   
   By the way, we might want to add at least one more test that we get the right error if you try to include some extra garbage flag in the list.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -305,6 +307,36 @@ public void buildAndOptimizeTopology(final boolean optimizeTopology) {
         internalTopologyBuilder.validateCopartition();
     }
 
+    /**
+     * A user can provide either the config OPTIMIZE which means all optimizations rules will be
+     * applied or they can provide a list of optimization rules.
+     */
+    private void optimizeTopology(final Properties props) {
+        final List<String> optimizationConfigs;
+        if (props == null || !props.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) {
+            optimizationConfigs = new ArrayList<>();
+            optimizationConfigs.add(StreamsConfig.NO_OPTIMIZATION);

Review Comment:
   Also, maybe we can just pack this logic into the `StreamsConfig.verifyTopologyOptimizationConfigs` method:
   1. if NO_OPTIMIZATION, return empty set
   2. else if OPTIMIZE, add all the optimization flags to the set
   3. else split on comma and add each configured flag to the set
   
   Then, the later logic can do a simple `contains` check.



##########
streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java:
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import static java.time.Duration.ofMinutes;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IntegrationTest.class})
+public class SelfJoinUpgradeIntegrationTest {
+    public static final String INPUT_TOPIC = "selfjoin-input";
+    public static final String OUTPUT_TOPIC = "selfjoin-output";
+    private String inputTopic;
+    private String outputTopic;
+
+    private KafkaStreams kafkaStreams;
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @Before
+    public void createTopics() throws Exception {
+        inputTopic = INPUT_TOPIC + safeUniqueTestName(getClass(), testName);
+        outputTopic = OUTPUT_TOPIC + safeUniqueTestName(getClass(), testName);
+        CLUSTER.createTopic(inputTopic);
+        CLUSTER.createTopic(outputTopic);
+    }
+
+    private Properties props() {
+        final Properties streamsConfiguration = new Properties();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
+                                 Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
+                                 Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return streamsConfiguration;
+    }
+
+    @After
+    public void shutdown() {
+        if (kafkaStreams != null) {
+            kafkaStreams.close(Duration.ofSeconds(30L));
+            kafkaStreams.cleanUp();
+        }
+    }
+
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUpgradeWithTopologyOptimizationOff() throws Exception {
+
+        final StreamsBuilder streamsBuilderOld = new StreamsBuilder();
+        final KStream<String, String> leftOld = streamsBuilderOld.stream(
+            inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+        final ValueJoiner<String, String, String> valueJoiner = (v, v2) -> v + v2;
+        final KStream<String, String> joinedOld = leftOld.join(
+            leftOld,
+            valueJoiner,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMinutes(100))
+        );
+        joinedOld.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
+        kafkaStreams.start();
+
+        final long currentTime = CLUSTER.time.milliseconds();
+        processKeyValueAndVerifyCount("1", "A", currentTime + 42L, asList(
+            new KeyValueTimestamp<String, String>("1", "AA", currentTime + 42L)));
+
+        processKeyValueAndVerifyCount("1", "B", currentTime + 43L, asList(
+            new KeyValueTimestamp("1", "BA", currentTime + 43L),
+            new KeyValueTimestamp("1", "AB", currentTime + 43L),
+            new KeyValueTimestamp("1", "BB", currentTime + 43L)));
+
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+        final StreamsBuilder streamsBuilderNew = new StreamsBuilder();
+
+        final KStream<String, String> leftNew = streamsBuilderNew.stream(
+            inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> joinedNew = leftNew.join(
+            leftNew,
+            valueJoiner,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMinutes(100))
+        );
+        joinedNew.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

Review Comment:
   This is the same as the first StreamsBuilder, right? I don't think you need a new one for the new instance.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java:
##########
@@ -181,13 +181,22 @@ public <K, V1, V2, VOut> KStream<K, VOut> join(final KStream<K, V1> lhs,
             sharedTimeTracker
         );
 
+        final KStreamKStreamSelfJoin<K, V1, V2, VOut> selfJoin = new KStreamKStreamSelfJoin<>(
+            thisWindowStore.name(),
+            internalWindows,
+            joiner,
+            AbstractStream.reverseJoinerWithKey(joiner),
+            sharedTimeTracker
+        );

Review Comment:
   Interesting... So, we're unconditionally building this node in case it turns out to be an optimizable self join later?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(false, false);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(
+        final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
         mergeDuplicateSourceNodes();
         if (optimizeTopology) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
             optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
+            if (optimizeSelfJoin) {

Review Comment:
   Do you want to couple this to the top-level optimization flag? I'm ok with it, but I think it would be less confusing if they were just independent. Otherwise, you can't enable this optimization without enabling the other ones as well.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinProcessor.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.slf4j.Logger;
+
+public abstract class StreamStreamJoinProcessor<KIn, VIn, KOut, VOut> extends ContextualProcessor<KIn, VIn, KOut, VOut> {
+
+    protected boolean skipRecord(

Review Comment:
   We don't need an inheritance hierarchy just to add a utility method. Can we instead make this an uninstantiable util class with a static version of this method?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -122,26 +120,8 @@ public void init(final ProcessorContext<K, VOut> context) {
         @SuppressWarnings("unchecked")
         @Override
         public void process(final Record<K, V1> record) {
-            // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-            //
-            // we also ignore the record if value is null, because in a key-value data model a null-value indicates
-            // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
-            // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
-            // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-            if (record.key() == null || record.value() == null) {
-                if (context().recordMetadata().isPresent()) {
-                    final RecordMetadata recordMetadata = context().recordMetadata().get();
-                    LOG.warn(
-                        "Skipping record due to null key or value. "
-                            + "topic=[{}] partition=[{}] offset=[{}]",
-                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
-                    );
-                } else {
-                    LOG.warn(
-                        "Skipping record due to null key or value. Topic, partition, and offset not known."
-                    );
-                }
-                droppedRecordsSensor.record();
+            System.out.println("---> IsLeft: " + isLeftSide + ".Processing record: " + record);

Review Comment:
   Obviously, we'll need to make a pass to remove stuff like this before merging.



##########
streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java:
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import static java.time.Duration.ofMinutes;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IntegrationTest.class})
+public class SelfJoinUpgradeIntegrationTest {
+    public static final String INPUT_TOPIC = "selfjoin-input";
+    public static final String OUTPUT_TOPIC = "selfjoin-output";
+    private String inputTopic;
+    private String outputTopic;
+
+    private KafkaStreams kafkaStreams;
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @Before
+    public void createTopics() throws Exception {
+        inputTopic = INPUT_TOPIC + safeUniqueTestName(getClass(), testName);
+        outputTopic = OUTPUT_TOPIC + safeUniqueTestName(getClass(), testName);
+        CLUSTER.createTopic(inputTopic);
+        CLUSTER.createTopic(outputTopic);
+    }
+
+    private Properties props() {
+        final Properties streamsConfiguration = new Properties();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
+                                 Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
+                                 Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return streamsConfiguration;
+    }
+
+    @After
+    public void shutdown() {
+        if (kafkaStreams != null) {
+            kafkaStreams.close(Duration.ofSeconds(30L));
+            kafkaStreams.cleanUp();
+        }
+    }
+
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUpgradeWithTopologyOptimizationOff() throws Exception {
+
+        final StreamsBuilder streamsBuilderOld = new StreamsBuilder();
+        final KStream<String, String> leftOld = streamsBuilderOld.stream(
+            inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+        final ValueJoiner<String, String, String> valueJoiner = (v, v2) -> v + v2;
+        final KStream<String, String> joinedOld = leftOld.join(
+            leftOld,
+            valueJoiner,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMinutes(100))
+        );
+        joinedOld.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
+        kafkaStreams.start();
+
+        final long currentTime = CLUSTER.time.milliseconds();
+        processKeyValueAndVerifyCount("1", "A", currentTime + 42L, asList(
+            new KeyValueTimestamp<String, String>("1", "AA", currentTime + 42L)));
+
+        processKeyValueAndVerifyCount("1", "B", currentTime + 43L, asList(
+            new KeyValueTimestamp("1", "BA", currentTime + 43L),
+            new KeyValueTimestamp("1", "AB", currentTime + 43L),
+            new KeyValueTimestamp("1", "BB", currentTime + 43L)));
+
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+        final StreamsBuilder streamsBuilderNew = new StreamsBuilder();
+
+        final KStream<String, String> leftNew = streamsBuilderNew.stream(
+            inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> joinedNew = leftNew.join(
+            leftNew,
+            valueJoiner,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMinutes(100))
+        );
+        joinedNew.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+        kafkaStreams = new KafkaStreams(streamsBuilderNew.build(), props);
+        kafkaStreams.start();
+
+        final long currentTimeNew = CLUSTER.time.milliseconds();
+
+        processKeyValueAndVerifyCount("1", "C", currentTimeNew + 44L, asList(
+            new KeyValueTimestamp("1", "CA", currentTimeNew + 44L),
+            new KeyValueTimestamp("1", "CB", currentTimeNew + 44L),
+            new KeyValueTimestamp("1", "AC", currentTimeNew + 44L),
+            new KeyValueTimestamp("1", "BC", currentTimeNew + 44L),
+            new KeyValueTimestamp("1", "CC", currentTimeNew + 44L)));
+
+
+        kafkaStreams.close();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUpgradeWithTopologyOptimizationOn() throws Exception {

Review Comment:
   What's the upgrade here? Is it just a restart test?



##########
streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java:
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import static java.time.Duration.ofMinutes;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IntegrationTest.class})
+public class SelfJoinUpgradeIntegrationTest {
+    public static final String INPUT_TOPIC = "selfjoin-input";
+    public static final String OUTPUT_TOPIC = "selfjoin-output";
+    private String inputTopic;
+    private String outputTopic;
+
+    private KafkaStreams kafkaStreams;
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @Before
+    public void createTopics() throws Exception {
+        inputTopic = INPUT_TOPIC + safeUniqueTestName(getClass(), testName);
+        outputTopic = OUTPUT_TOPIC + safeUniqueTestName(getClass(), testName);
+        CLUSTER.createTopic(inputTopic);
+        CLUSTER.createTopic(outputTopic);
+    }
+
+    private Properties props() {
+        final Properties streamsConfiguration = new Properties();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
+                                 Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
+                                 Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return streamsConfiguration;
+    }
+
+    @After
+    public void shutdown() {
+        if (kafkaStreams != null) {
+            kafkaStreams.close(Duration.ofSeconds(30L));
+            kafkaStreams.cleanUp();
+        }
+    }
+
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUpgradeWithTopologyOptimizationOff() throws Exception {
+
+        final StreamsBuilder streamsBuilderOld = new StreamsBuilder();
+        final KStream<String, String> leftOld = streamsBuilderOld.stream(
+            inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+        final ValueJoiner<String, String, String> valueJoiner = (v, v2) -> v + v2;
+        final KStream<String, String> joinedOld = leftOld.join(
+            leftOld,
+            valueJoiner,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMinutes(100))
+        );
+        joinedOld.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);

Review Comment:
   It would be more clear if we explicitly set NO_OPTIMIZATION instead of setting nothing here.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -661,7 +682,9 @@ public class StreamsConfig extends AbstractConfig {
 
     /** {@code topology.optimization} */
     public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
-    private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default";
+    private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka "
+        + "Streams if it should optimize the topology and what optimizations to apply in a comma "
+        + "separated list. Disabled by default";

Review Comment:
   We should enumerate the options here, like `"Acceptable values are: "+NO_OPTIMIZATION+", "+OPTIMIZE+", or a comma separated list of specific optimizations ("+REUSE_KTABLE_SOURCE_TOPICS+", "+MERGE_REPARTITION_TOPICS+", or "+SELF_JOIN+")"`.
   
   Or something like that.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {

Review Comment:
   I'm also confused. Maybe you can expand the comment to explain more.
   
   I think what's happening here is that the parent might have other children unrelated to the join, but with respect to the join, there are exactly two graph nodes we care about: the JoinThis and JoinOther. Then, the purpose of this loop is to iterate over the children and look for those two nodes.
   
   Did I get that right?
   
   Are we going to get confused if the source is _also_ involved in another join (independent of the one we're optimizing)? If so, we might need some extra metadata to track the "identity" of the join nodes.



##########
streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java:
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import static java.time.Duration.ofMinutes;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IntegrationTest.class})
+public class SelfJoinUpgradeIntegrationTest {
+    public static final String INPUT_TOPIC = "selfjoin-input";
+    public static final String OUTPUT_TOPIC = "selfjoin-output";
+    private String inputTopic;
+    private String outputTopic;
+
+    private KafkaStreams kafkaStreams;
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @Before
+    public void createTopics() throws Exception {
+        inputTopic = INPUT_TOPIC + safeUniqueTestName(getClass(), testName);
+        outputTopic = OUTPUT_TOPIC + safeUniqueTestName(getClass(), testName);

Review Comment:
   probably better to just call this method once.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org