You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2022/03/15 17:33:46 UTC
[kafka] branch 3.1 updated: KAFKA-13721: asymmetric join-windows should not emit spurious left/outer join results (#11875)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.1 by this push:
new fe52a95 KAFKA-13721: asymmetric join-windows should not emit spurious left/outer join results (#11875)
fe52a95 is described below
commit fe52a95f50b17713b80a27e5f4d33b3fb18d6101
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Tue Mar 15 09:37:01 2022 -0700
KAFKA-13721: asymmetric join-windows should not emit spurious left/outer join results (#11875)
Reviewers: Sergio Peña <se...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../kstream/internals/KStreamKStreamJoin.java | 5 +-
.../kstream/internals/KStreamKStreamJoinTest.java | 11 +-
.../internals/KStreamKStreamLeftJoinTest.java | 99 ++++++++--
.../internals/KStreamKStreamOuterJoinTest.java | 207 +++++++++++++++++++--
4 files changed, 287 insertions(+), 35 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 305cb38..4d9ac2e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -51,6 +51,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
private final long joinAfterMs;
private final long joinGraceMs;
private final boolean enableSpuriousResultFix;
+ private final long joinSpuriousLookBackTimeMs;
private final boolean outer;
private final boolean isLeftSide;
@@ -71,9 +72,11 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
if (isLeftSide) {
this.joinBeforeMs = windows.beforeMs;
this.joinAfterMs = windows.afterMs;
+ this.joinSpuriousLookBackTimeMs = windows.beforeMs;
} else {
this.joinBeforeMs = windows.afterMs;
this.joinAfterMs = windows.beforeMs;
+ this.joinSpuriousLookBackTimeMs = windows.afterMs;
}
this.joinGraceMs = windows.gracePeriodMs();
this.enableSpuriousResultFix = windows.spuriousResultFixEnabled();
@@ -211,7 +214,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
// to reduce runtime cost, we try to avoid paying those cost
// only try to emit left/outer join results if there _might_ be any result records
- if (sharedTimeTracker.minTime >= sharedTimeTracker.streamTime - joinAfterMs - joinGraceMs) {
+ if (sharedTimeTracker.minTime >= sharedTimeTracker.streamTime - joinSpuriousLookBackTimeMs - joinGraceMs) {
return;
}
// throttle the emit frequency to a (configurable) interval;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 08b0e93..ea4eaae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -105,7 +105,6 @@ public class KStreamKStreamJoinTest {
}
}
-
@Test
public void shouldReuseRepartitionTopicWithGeneratedName() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -139,8 +138,7 @@ public class KStreamKStreamJoinTest {
@Test
public void shouldDisableLoggingOnStreamJoined() {
-
- final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+ final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100), Duration.ofMillis(50));
final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined
.with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
.withStoreName("store")
@@ -166,8 +164,7 @@ public class KStreamKStreamJoinTest {
@Test
public void shouldEnableLoggingWithCustomConfigOnStreamJoined() {
-
- final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+ final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100), Duration.ofMillis(50));
final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined
.with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
.withStoreName("store")
@@ -1199,7 +1196,7 @@ public class KStreamKStreamJoinTest {
joined = stream1.join(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.of(ofMillis(0)).after(ofMillis(100)).grace(ofMillis(0)),
+ JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(0)).after(ofMillis(100)),
StreamJoined.with(Serdes.Integer(),
Serdes.String(),
Serdes.String())
@@ -1868,4 +1865,4 @@ public class KStreamKStreamJoinTest {
" <-- KSTREAM-MERGE-0000000011\n" +
" Sink: KSTREAM-SINK-0000000021 (topic: out-to)\n" +
" <-- KSTREAM-MERGE-0000000020\n\n";
-}
\ No newline at end of file
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 4e2b6d8..55aa7c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
@@ -34,6 +35,7 @@ import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.BeforeClass;
import org.junit.Test;
import java.time.Duration;
@@ -44,6 +46,7 @@ import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
+import static java.time.Duration.ZERO;
import static java.time.Duration.ofMillis;
import static org.junit.Assert.assertEquals;
@@ -55,8 +58,14 @@ public class KStreamKStreamLeftJoinTest {
private final String topic1 = "topic1";
private final String topic2 = "topic2";
private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
- private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+ private final static Properties PROPS = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+ @BeforeClass
+ public static void beforeClass() {
+ PROPS.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0L);
+ }
+
+ @SuppressWarnings("deprecation") // old join semantics; can be removed when `JoinWindows.of()` is removed
@Test
public void testLeftJoinWithSpuriousResultFixDisabledOldApi() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -78,7 +87,7 @@ public class KStreamKStreamLeftJoinTest {
);
joined.process(supplier);
- try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(props), props)) {
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(PROPS), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
@@ -116,6 +125,7 @@ public class KStreamKStreamLeftJoinTest {
}
}
+ @SuppressWarnings("deprecation") // old join semantics; can be removed when `JoinWindows.of()` is removed
@Test
public void testLeftJoinDuplicatesWithSpuriousResultFixDisabledOldApi() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -135,7 +145,7 @@ public class KStreamKStreamLeftJoinTest {
);
joined.process(supplier);
- try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(props), props)) {
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(PROPS), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
@@ -177,7 +187,7 @@ public class KStreamKStreamLeftJoinTest {
);
joined.process(supplier);
- try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
@@ -233,7 +243,7 @@ public class KStreamKStreamLeftJoinTest {
);
joined.process(supplier);
- try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
@@ -288,7 +298,7 @@ public class KStreamKStreamLeftJoinTest {
);
joined.process(supplier);
- try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
@@ -343,7 +353,7 @@ public class KStreamKStreamLeftJoinTest {
);
joined.process(supplier);
- try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
@@ -395,7 +405,7 @@ public class KStreamKStreamLeftJoinTest {
);
joined.process(supplier);
- try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
@@ -480,7 +490,7 @@ public class KStreamKStreamLeftJoinTest {
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
@@ -585,7 +595,7 @@ public class KStreamKStreamLeftJoinTest {
);
joined.process(supplier);
- try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
@@ -642,7 +652,7 @@ public class KStreamKStreamLeftJoinTest {
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
@@ -714,7 +724,7 @@ public class KStreamKStreamLeftJoinTest {
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
@@ -749,6 +759,70 @@ public class KStreamKStreamLeftJoinTest {
}
}
+ @Test
+ public void shouldNotEmitLeftJoinResultForAsymmetricWindow() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+ final KStream<Integer, String> stream1;
+ final KStream<Integer, String> stream2;
+ final KStream<Integer, String> joined;
+ final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ stream1 = builder.stream(topic1, consumed);
+ stream2 = builder.stream(topic2, consumed);
+
+ joined = stream1.leftJoin(
+ stream2,
+ MockValueJoiner.TOSTRING_JOINER,
+ JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).before(ZERO),
+ StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+ );
+ joined.process(supplier);
+
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+ final TestInputTopic<Integer, String> inputTopic1 =
+ driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+ final TestInputTopic<Integer, String> inputTopic2 =
+ driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+ final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ long time = 0L;
+
+ // push two items to the primary stream; the other window is empty; this should not produce any items
+ // w1 = {}
+ // w2 = {}
+ // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+ // --> w2 = {}
+ for (int i = 0; i < 2; i++) {
+ inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], time + i);
+ }
+ processor.checkAndClearProcessResult();
+
+ // push one item to the other stream; this should produce one full-join item
+ // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+ // w2 = {}
+ // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+ // --> w2 = { 0:a0 (ts: 100) }
+ time += 100L;
+ inputTopic2.pipeInput(expectedKeys[0], "a" + expectedKeys[0], time);
+
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "A0+a0", 100L)
+ );
+
+ // push one item to the other stream; this should produce one left-join item
+ // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+ // w2 = { 0:a0 (ts: 100) }
+ // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+ // --> w2 = { 0:a0 (ts: 100), 1:a1 (ts: 102) }
+ time += 2;
+ inputTopic2.pipeInput(expectedKeys[1], "a" + expectedKeys[1], time);
+
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(1, "A1+null", 1L)
+ );
+ }
+ }
+
private void testUpperWindowBound(final int[] expectedKeys,
final TopologyTestDriver driver,
final MockProcessor<Integer, String> processor) {
@@ -877,7 +951,6 @@ public class KStreamKStreamLeftJoinTest {
// push a dummy record to produce all left-join non-joined items
time += 301L;
- driver.advanceWallClockTime(Duration.ofMillis(1000L));
inputTopic1.pipeInput(0, "dummy", time);
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "C0+null", 1101L),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
index 2d9e320..133d8d5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
@@ -34,6 +35,7 @@ import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.BeforeClass;
import org.junit.Test;
import java.time.Duration;
@@ -44,6 +46,7 @@ import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
+import static java.time.Duration.ZERO;
import static java.time.Duration.ofMillis;
import static org.junit.Assert.assertEquals;
@@ -53,8 +56,14 @@ public class KStreamKStreamOuterJoinTest {
private final String topic1 = "topic1";
private final String topic2 = "topic2";
private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
- private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+ private final static Properties PROPS = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+ @BeforeClass
+ public static void beforeClass() {
+ PROPS.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0L);
+ }
+
+ @SuppressWarnings("deprecation") // old join semantics; can be removed when `JoinWindows.of()` is removed
@Test
public void testOuterJoinDuplicatesWithFixDisabledOldApi() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -74,7 +83,7 @@ public class KStreamKStreamOuterJoinTest {
);
joined.process(supplier);
- try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(props), props)) {
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(PROPS), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
@@ -118,7 +127,7 @@ public class KStreamKStreamOuterJoinTest {
);
joined.process(supplier);
- try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
@@ -156,7 +165,6 @@ public class KStreamKStreamOuterJoinTest {
// this record should expire non-joined records; only null+a0 will be emitted because
// it did not have a join
- driver.advanceWallClockTime(Duration.ofMillis(1000L));
inputTopic2.pipeInput(3, "dummy", 1500L);
processor.checkAndClearProcessResult(
@@ -185,7 +193,7 @@ public class KStreamKStreamOuterJoinTest {
);
joined.process(supplier);
- try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
@@ -240,7 +248,7 @@ public class KStreamKStreamOuterJoinTest {
);
joined.process(supplier);
- try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
@@ -295,7 +303,7 @@ public class KStreamKStreamOuterJoinTest {
);
joined.process(supplier);
- try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
@@ -350,7 +358,7 @@ public class KStreamKStreamOuterJoinTest {
);
joined.process(supplier);
- try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
@@ -405,7 +413,7 @@ public class KStreamKStreamOuterJoinTest {
);
joined.process(supplier);
- try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
@@ -461,7 +469,7 @@ public class KStreamKStreamOuterJoinTest {
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
@@ -560,7 +568,7 @@ public class KStreamKStreamOuterJoinTest {
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
@@ -673,7 +681,7 @@ public class KStreamKStreamOuterJoinTest {
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
@@ -709,6 +717,178 @@ public class KStreamKStreamOuterJoinTest {
}
}
+ @Test
+ public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+ final KStream<Integer, String> stream1;
+ final KStream<Integer, String> stream2;
+ final KStream<Integer, String> joined;
+ final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ stream1 = builder.stream(topic1, consumed);
+ stream2 = builder.stream(topic2, consumed);
+
+ joined = stream1.outerJoin(
+ stream2,
+ MockValueJoiner.TOSTRING_JOINER,
+ JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).before(ZERO),
+ StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+ );
+ joined.process(supplier);
+
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+ final TestInputTopic<Integer, String> inputTopic1 =
+ driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+ final TestInputTopic<Integer, String> inputTopic2 =
+ driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+ final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ long time = 0L;
+
+ // push two items to the primary stream; the other window is empty; this should not produce any items
+ // w1 = {}
+ // w2 = {}
+ // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+ // --> w2 = {}
+ for (int i = 0; i < 2; i++) {
+ inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], time + i);
+ }
+ processor.checkAndClearProcessResult();
+
+ // push one item to the other stream; this should produce one full-join item
+ // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+ // w2 = {}
+ // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+ // --> w2 = { 0:a0 (ts: 100) }
+ time += 100L;
+ inputTopic2.pipeInput(expectedKeys[0], "a" + expectedKeys[0], time);
+
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "A0+a0", 100L)
+ );
+
+ // push one item to the other stream; this should produce one left-join item
+ // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+ // w2 = { 0:a0 (ts: 100) }
+ // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+ // --> w2 = { 0:a0 (ts: 100), 1:a1 (ts: 102) }
+ time += 2;
+ inputTopic2.pipeInput(expectedKeys[1], "a" + expectedKeys[1], time);
+
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(1, "A1+null", 1L)
+ );
+
+ // push one item to the other stream; this should emit the non-joined right-side record (null+a1)
+ // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+ // w2 = { 0:a0 (ts: 100), 1:a1 (ts: 102) }
+ // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+ // --> w2 = { 0:a0 (ts: 100), 1:a1 (ts: 102), 2:a2 (ts: 103) }
+ time += 1;
+ inputTopic2.pipeInput(expectedKeys[2], "a" + expectedKeys[2], time);
+
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(1, "null+a1", 102L)
+ );
+
+ // push one item to the first stream; this should produce one full-join item
+ // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+ // w2 = { 0:a0 (ts: 100), 1:a1 (ts: 102), 2:a2 (ts: 103) }
+ // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1), 2:A2 (ts: 103) }
+ // --> w2 = { 0:a0 (ts: 100), 1:a1 (ts: 102), 2:a2 (ts: 103) }
+ inputTopic1.pipeInput(expectedKeys[2], "A" + expectedKeys[2], time);
+
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(2, "A2+a2", 103L)
+ );
+ }
+ }
+
+ @Test
+ public void shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+ final KStream<Integer, String> stream1;
+ final KStream<Integer, String> stream2;
+ final KStream<Integer, String> joined;
+ final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ stream1 = builder.stream(topic1, consumed);
+ stream2 = builder.stream(topic2, consumed);
+
+ joined = stream1.outerJoin(
+ stream2,
+ MockValueJoiner.TOSTRING_JOINER,
+ JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
+ StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+ );
+ joined.process(supplier);
+
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+ final TestInputTopic<Integer, String> inputTopic1 =
+ driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+ final TestInputTopic<Integer, String> inputTopic2 =
+ driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+ final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ long time = 0L;
+
+ // push two items to the primary stream; the other window is empty; this should not produce any item
+ // w1 = {}
+ // w2 = {}
+ // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+ // --> w2 = {}
+ for (int i = 0; i < 2; i++) {
+ inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], time + i);
+ }
+ processor.checkAndClearProcessResult();
+
+ // push one item to the other stream; this should produce one full-join item
+ // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+ // w2 = {}
+ // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+ // --> w2 = { 1:a1 (ts: 1) }
+ time += 1;
+ inputTopic2.pipeInput(expectedKeys[1], "a" + expectedKeys[1], time);
+
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(1, "A1+a1", 1L)
+ );
+
+ // push one item to the other stream; this should produce one left-join item
+ // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+ // w2 = { 1:a1 (ts: 1) }
+ // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+ // --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101) }
+ time += 100;
+ inputTopic2.pipeInput(expectedKeys[2], "a" + expectedKeys[2], time);
+
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "A0+null", 0L)
+ );
+
+ // push one item to the other stream; this should not produce any item
+ // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+ // w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101) }
+ // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+ // --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) }
+ inputTopic2.pipeInput(expectedKeys[3], "a" + expectedKeys[3], time);
+
+ processor.checkAndClearProcessResult();
+
+ // push one item to the first stream; this should produce one full-join item
+ // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
+ // w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) }
+ // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1), 2:A2 (ts: 201) }
+ // --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) }
+ time += 100;
+ inputTopic1.pipeInput(expectedKeys[2], "A" + expectedKeys[2], time);
+
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(2, "A2+a2", 201L)
+ );
+ }
+ }
+
private void testUpperWindowBound(final int[] expectedKeys,
final TopologyTestDriver driver,
final MockProcessor<Integer, String> processor) {
@@ -840,7 +1020,6 @@ public class KStreamKStreamOuterJoinTest {
// push a dummy record to produce all left-join non-joined items
time += 301L;
- driver.advanceWallClockTime(Duration.ofMillis(1000L));
inputTopic1.pipeInput(0, "dummy", time);
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "C0+null", 1101L),
@@ -1034,4 +1213,4 @@ public class KStreamKStreamOuterJoinTest {
new KeyValueTimestamp<>(0, "dummy+null", 1103L)
);
}
-}
\ No newline at end of file
+}