You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2022/03/15 17:33:46 UTC

[kafka] branch 3.1 updated: KAFKA-13721: asymetric join-winodws should not emit spurious left/outer join results (#11875)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new fe52a95  KAFKA-13721: asymetric join-winodws should not emit spurious left/outer join results (#11875)
fe52a95 is described below

commit fe52a95f50b17713b80a27e5f4d33b3fb18d6101
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Tue Mar 15 09:37:01 2022 -0700

    KAFKA-13721: asymetric join-winodws should not emit spurious left/outer join results (#11875)
    
    Reviewers:  Sergio Peña <se...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../kstream/internals/KStreamKStreamJoin.java      |   5 +-
 .../kstream/internals/KStreamKStreamJoinTest.java  |  11 +-
 .../internals/KStreamKStreamLeftJoinTest.java      |  99 ++++++++--
 .../internals/KStreamKStreamOuterJoinTest.java     | 207 +++++++++++++++++++--
 4 files changed, 287 insertions(+), 35 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 305cb38..4d9ac2e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -51,6 +51,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
     private final long joinAfterMs;
     private final long joinGraceMs;
     private final boolean enableSpuriousResultFix;
+    private final long joinSpuriousLookBackTimeMs;
 
     private final boolean outer;
     private final boolean isLeftSide;
@@ -71,9 +72,11 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
         if (isLeftSide) {
             this.joinBeforeMs = windows.beforeMs;
             this.joinAfterMs = windows.afterMs;
+            this.joinSpuriousLookBackTimeMs = windows.beforeMs;
         } else {
             this.joinBeforeMs = windows.afterMs;
             this.joinAfterMs = windows.beforeMs;
+            this.joinSpuriousLookBackTimeMs = windows.afterMs;
         }
         this.joinGraceMs = windows.gracePeriodMs();
         this.enableSpuriousResultFix = windows.spuriousResultFixEnabled();
@@ -211,7 +214,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
             // to reduce runtime cost, we try to avoid paying those cost
 
             // only try to emit left/outer join results if there _might_ be any result records
-            if (sharedTimeTracker.minTime >= sharedTimeTracker.streamTime - joinAfterMs - joinGraceMs) {
+            if (sharedTimeTracker.minTime >= sharedTimeTracker.streamTime - joinSpuriousLookBackTimeMs - joinGraceMs) {
                 return;
             }
             // throttle the emit frequency to a (configurable) interval;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 08b0e93..ea4eaae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -105,7 +105,6 @@ public class KStreamKStreamJoinTest {
         }
     }
 
-
     @Test
     public void shouldReuseRepartitionTopicWithGeneratedName() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -139,8 +138,7 @@ public class KStreamKStreamJoinTest {
 
     @Test
     public void shouldDisableLoggingOnStreamJoined() {
-
-        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100), Duration.ofMillis(50));
         final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined
             .with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
             .withStoreName("store")
@@ -166,8 +164,7 @@ public class KStreamKStreamJoinTest {
 
     @Test
     public void shouldEnableLoggingWithCustomConfigOnStreamJoined() {
-
-        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100), Duration.ofMillis(50));
         final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined
             .with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
             .withStoreName("store")
@@ -1199,7 +1196,7 @@ public class KStreamKStreamJoinTest {
         joined = stream1.join(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.of(ofMillis(0)).after(ofMillis(100)).grace(ofMillis(0)),
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(0)).after(ofMillis(100)),
             StreamJoined.with(Serdes.Integer(),
                 Serdes.String(),
                 Serdes.String())
@@ -1868,4 +1865,4 @@ public class KStreamKStreamJoinTest {
             "      <-- KSTREAM-MERGE-0000000011\n" +
             "    Sink: KSTREAM-SINK-0000000021 (topic: out-to)\n" +
             "      <-- KSTREAM-MERGE-0000000020\n\n";
-}
\ No newline at end of file
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 4e2b6d8..55aa7c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -34,6 +35,7 @@ import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.time.Duration;
@@ -44,6 +46,7 @@ import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
 
+import static java.time.Duration.ZERO;
 import static java.time.Duration.ofMillis;
 import static org.junit.Assert.assertEquals;
 
@@ -55,8 +58,14 @@ public class KStreamKStreamLeftJoinTest {
     private final String topic1 = "topic1";
     private final String topic2 = "topic2";
     private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
-    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+    private final static Properties PROPS = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
+    @BeforeClass
+    public static void beforeClass() {
+        PROPS.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0L);
+    }
+
+    @SuppressWarnings("deprecation") // old join semantics; can be removed when `JoinWindows.of()` is removed
     @Test
     public void testLeftJoinWithSpuriousResultFixDisabledOldApi() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -78,7 +87,7 @@ public class KStreamKStreamLeftJoinTest {
         );
         joined.process(supplier);
 
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(props), props)) {
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(PROPS), PROPS)) {
             final TestInputTopic<Integer, String> inputTopic1 =
                     driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
@@ -116,6 +125,7 @@ public class KStreamKStreamLeftJoinTest {
         }
     }
 
+    @SuppressWarnings("deprecation") // old join semantics; can be removed when `JoinWindows.of()` is removed
     @Test
     public void testLeftJoinDuplicatesWithSpuriousResultFixDisabledOldApi() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -135,7 +145,7 @@ public class KStreamKStreamLeftJoinTest {
         );
         joined.process(supplier);
 
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(props), props)) {
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(PROPS), PROPS)) {
             final TestInputTopic<Integer, String> inputTopic1 =
                     driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
@@ -177,7 +187,7 @@ public class KStreamKStreamLeftJoinTest {
         );
         joined.process(supplier);
 
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
             final TestInputTopic<Integer, String> inputTopic1 =
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
@@ -233,7 +243,7 @@ public class KStreamKStreamLeftJoinTest {
         );
         joined.process(supplier);
 
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
             final TestInputTopic<Integer, String> inputTopic1 =
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
@@ -288,7 +298,7 @@ public class KStreamKStreamLeftJoinTest {
         );
         joined.process(supplier);
 
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
             final TestInputTopic<Integer, String> inputTopic1 =
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
@@ -343,7 +353,7 @@ public class KStreamKStreamLeftJoinTest {
         );
         joined.process(supplier);
 
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
             final TestInputTopic<Integer, String> inputTopic1 =
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
@@ -395,7 +405,7 @@ public class KStreamKStreamLeftJoinTest {
         );
         joined.process(supplier);
 
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
             final TestInputTopic<Integer, String> inputTopic1 =
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
@@ -480,7 +490,7 @@ public class KStreamKStreamLeftJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
             final TestInputTopic<Integer, String> inputTopic1 =
                     driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
@@ -585,7 +595,7 @@ public class KStreamKStreamLeftJoinTest {
         );
         joined.process(supplier);
 
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
             final TestInputTopic<Integer, String> inputTopic1 =
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
@@ -642,7 +652,7 @@ public class KStreamKStreamLeftJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
             final TestInputTopic<Integer, String> inputTopic1 =
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
@@ -714,7 +724,7 @@ public class KStreamKStreamLeftJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
             final TestInputTopic<Integer, String> inputTopic1 =
                     driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
@@ -749,6 +759,70 @@ public class KStreamKStreamLeftJoinTest {
         }
     }
 
+    @Test
+    public void shouldNotEmitLeftJoinResultForAsymmetricWindow() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+
+        joined = stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).before(ZERO),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+        );
+        joined.process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            long time = 0L;
+
+            // push two items to the primary stream; the other window is empty; this should not produce any items
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // --> w2 = {}
+            for (int i = 0; i < 2; i++) {
+                inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], time + i);
+            }
+            processor.checkAndClearProcessResult();
+
+            // push one item to the other stream; this should produce one full-join item
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // --> w2 = { 0:a0 (ts: 100) }
+            time += 100L;
+            inputTopic2.pipeInput(expectedKeys[0], "a" + expectedKeys[0], time);
+
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+a0", 100L)
+            );
+
+            // push one item to the other stream; this should produce one left-join item
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // w2 = { 0:a0 (ts: 100) }
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // --> w2 = { 0:a0 (ts: 100), 1:a1 (ts: 102) }
+            time += 2;
+            inputTopic2.pipeInput(expectedKeys[1], "a" + expectedKeys[1], time);
+
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+null", 1L)
+            );
+        }
+    }
+
     private void testUpperWindowBound(final int[] expectedKeys,
                                       final TopologyTestDriver driver,
                                       final MockProcessor<Integer, String> processor) {
@@ -877,7 +951,6 @@ public class KStreamKStreamLeftJoinTest {
 
         // push a dummy record to produce all left-join non-joined items
         time += 301L;
-        driver.advanceWallClockTime(Duration.ofMillis(1000L));
         inputTopic1.pipeInput(0, "dummy", time);
         processor.checkAndClearProcessResult(
             new KeyValueTimestamp<>(0, "C0+null", 1101L),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
index 2d9e320..133d8d5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyWrapper;
@@ -34,6 +35,7 @@ import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.time.Duration;
@@ -44,6 +46,7 @@ import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
 
+import static java.time.Duration.ZERO;
 import static java.time.Duration.ofMillis;
 import static org.junit.Assert.assertEquals;
 
@@ -53,8 +56,14 @@ public class KStreamKStreamOuterJoinTest {
     private final String topic1 = "topic1";
     private final String topic2 = "topic2";
     private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
-    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+    private final static Properties PROPS = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
+    @BeforeClass
+    public static void beforeClass() {
+        PROPS.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0L);
+    }
+
+    @SuppressWarnings("deprecation") // old join semantics; can be removed when `JoinWindows.of()` is removed
     @Test
     public void testOuterJoinDuplicatesWithFixDisabledOldApi() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -74,7 +83,7 @@ public class KStreamKStreamOuterJoinTest {
         );
         joined.process(supplier);
 
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(props), props)) {
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(PROPS), PROPS)) {
             final TestInputTopic<Integer, String> inputTopic1 =
                     driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
@@ -118,7 +127,7 @@ public class KStreamKStreamOuterJoinTest {
         );
         joined.process(supplier);
 
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
             final TestInputTopic<Integer, String> inputTopic1 =
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
@@ -156,7 +165,6 @@ public class KStreamKStreamOuterJoinTest {
 
             // this record should expired non-joined records; only null+a0 will be emitted because
             // it did not have a join
-            driver.advanceWallClockTime(Duration.ofMillis(1000L));
             inputTopic2.pipeInput(3, "dummy", 1500L);
 
             processor.checkAndClearProcessResult(
@@ -185,7 +193,7 @@ public class KStreamKStreamOuterJoinTest {
         );
         joined.process(supplier);
 
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
             final TestInputTopic<Integer, String> inputTopic1 =
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
@@ -240,7 +248,7 @@ public class KStreamKStreamOuterJoinTest {
         );
         joined.process(supplier);
 
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
             final TestInputTopic<Integer, String> inputTopic1 =
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
@@ -295,7 +303,7 @@ public class KStreamKStreamOuterJoinTest {
         );
         joined.process(supplier);
 
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
             final TestInputTopic<Integer, String> inputTopic1 =
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
@@ -350,7 +358,7 @@ public class KStreamKStreamOuterJoinTest {
         );
         joined.process(supplier);
 
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
             final TestInputTopic<Integer, String> inputTopic1 =
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
@@ -405,7 +413,7 @@ public class KStreamKStreamOuterJoinTest {
         );
         joined.process(supplier);
 
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
             final TestInputTopic<Integer, String> inputTopic1 =
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
@@ -461,7 +469,7 @@ public class KStreamKStreamOuterJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
             final TestInputTopic<Integer, String> inputTopic1 =
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
@@ -560,7 +568,7 @@ public class KStreamKStreamOuterJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
             final TestInputTopic<Integer, String> inputTopic1 =
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
@@ -673,7 +681,7 @@ public class KStreamKStreamOuterJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
             final TestInputTopic<Integer, String> inputTopic1 =
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
@@ -709,6 +717,178 @@ public class KStreamKStreamOuterJoinTest {
         }
     }
 
+    @Test
+    public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+
+        joined = stream1.outerJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).before(ZERO),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+        );
+        joined.process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            long time = 0L;
+
+            // push two items to the primary stream; the other window is empty; this should not produce any items
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // --> w2 = {}
+            for (int i = 0; i < 2; i++) {
+                inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], time + i);
+            }
+            processor.checkAndClearProcessResult();
+
+            // push one item to the other stream; this should produce one full-join item
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // --> w2 = { 0:a0 (ts: 100) }
+            time += 100L;
+            inputTopic2.pipeInput(expectedKeys[0], "a" + expectedKeys[0], time);
+
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+a0", 100L)
+            );
+
+            // push one item to the other stream; this should produce one left-join item
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // w2 = { 0:a0 (ts: 100) }
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // --> w2 = { 0:a0 (ts: 100), 1:a1 (ts: 102) }
+            time += 2;
+            inputTopic2.pipeInput(expectedKeys[1], "a" + expectedKeys[1], time);
+
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+null", 1L)
+            );
+
+            // push one item to the other stream; this advances stream-time past the window of a1 (ts: 102) and should produce one right-join item (null+a1)
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // w2 = { 0:a0 (ts: 100), 1:a1 (ts: 102) }
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // --> w2 = { 0:a0 (ts: 100), 1:a1 (ts: 102), 2:a2 (ts: 103) }
+            time += 1;
+            inputTopic2.pipeInput(expectedKeys[2], "a" + expectedKeys[2], time);
+
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "null+a1", 102L)
+            );
+
+            // push one item to the first stream; this should produce one full-join item
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // w2 = { 0:a0 (ts: 100), 1:a1 (ts: 102), 2:a2 (ts: 103) }
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1), 2:A2 (ts: 103) }
+            // --> w2 = { 0:a0 (ts: 100), 1:a1 (ts: 102), 2:a2 (ts: 103)  }
+            inputTopic1.pipeInput(expectedKeys[2], "A" + expectedKeys[2], time);
+
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "A2+a2", 103L)
+            );
+        }
+    }
+
+    @Test
+    public void shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+
+        joined = stream1.outerJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO), // 100 ms time difference with the "after" bound overridden to zero -> asymmetric window
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+        );
+        joined.process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            long time = 0L;
+
+            // push two items to the primary stream; the other window is empty; this should not produce any item
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // --> w2 = {}
+            for (int i = 0; i < 2; i++) {
+                inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], time + i);
+            }
+            processor.checkAndClearProcessResult();
+
+            // push one item to the other stream; this should produce one full-join item
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // --> w2 = { 1:a1 (ts: 1) }
+            time += 1;
+            inputTopic2.pipeInput(expectedKeys[1], "a" + expectedKeys[1], time);
+
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+a1", 1L)
+            );
+
+            // push one item to the other stream; this should produce one left-join item
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // w2 = { 1:a1 (ts: 1) }
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101) }
+            time += 100;
+            inputTopic2.pipeInput(expectedKeys[2], "a" + expectedKeys[2], time);
+
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", 0L)
+            );
+
+            // push one item to the other stream; this should not produce any item
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101) }
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) }
+            inputTopic2.pipeInput(expectedKeys[3], "a" + expectedKeys[3], time);
+
+            processor.checkAndClearProcessResult();
+
+            // push one item to the first stream; this should produce one full-join item
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+            // w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) }
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1), 2:A2 (ts: 201) }
+            // --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) }
+            time += 100;
+            inputTopic1.pipeInput(expectedKeys[2], "A" + expectedKeys[2], time);
+
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "A2+a2", 201L)
+            );
+        }
+    }
+
     private void testUpperWindowBound(final int[] expectedKeys,
                                       final TopologyTestDriver driver,
                                       final MockProcessor<Integer, String> processor) {
@@ -840,7 +1020,6 @@ public class KStreamKStreamOuterJoinTest {
 
         // push a dummy record to produce all left-join non-joined items
         time += 301L;
-        driver.advanceWallClockTime(Duration.ofMillis(1000L));
         inputTopic1.pipeInput(0, "dummy", time);
         processor.checkAndClearProcessResult(
             new KeyValueTimestamp<>(0, "C0+null", 1101L),
@@ -1034,4 +1213,4 @@ public class KStreamKStreamOuterJoinTest {
             new KeyValueTimestamp<>(0, "dummy+null", 1103L)
         );
     }
-}
\ No newline at end of file
+}