You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2021/05/05 17:24:05 UTC
[kafka] branch trunk updated: KAFKA-10847: Set shared outer store
to an in-memory store when in-memory stores are supplied (#10613)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d915ce5 KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied (#10613)
d915ce5 is described below
commit d915ce58d2adcfd6113f961ecbf337770dbe760b
Author: Sergio Peña <se...@confluent.io>
AuthorDate: Wed May 5 12:21:43 2021 -0500
KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied (#10613)
When users supply in-memory stores for left/outer joins, the internal shared outer store must be switched to an in-memory store too. This allows users who want to keep all stores in memory to continue doing so.
Added unit tests to validate that the topology and the left/outer joins work correctly with an in-memory shared store.
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../streams/kstream/internals/KStreamImplJoin.java | 76 ++++++----
.../org/apache/kafka/streams/TopologyTest.java | 165 +++++++++++++++++++++
.../internals/KStreamKStreamLeftJoinTest.java | 33 ++++-
.../internals/KStreamKStreamOuterJoinTest.java | 33 ++++-
4 files changed, 276 insertions(+), 31 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
index 0977640..c5e918f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
@@ -144,18 +144,7 @@ class KStreamImplJoin {
Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
if (leftOuter) {
- final String outerJoinSuffix = rightOuter ? "-outer-shared-join" : "-left-shared-join";
-
- // Get the suffix index of the joinThisGeneratedName to build the outer join store name.
- final String outerJoinStoreGeneratedName = KStreamImpl.OUTERSHARED_NAME
- + joinThisGeneratedName.substring(
- rightOuter
- ? KStreamImpl.OUTERTHIS_NAME.length()
- : KStreamImpl.JOINTHIS_NAME.length());
-
- final String outerJoinStoreName = userProvidedBaseStoreName == null ? outerJoinStoreGeneratedName : userProvidedBaseStoreName + outerJoinSuffix;
-
- outerJoinWindowStore = Optional.of(sharedOuterJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal));
+ outerJoinWindowStore = Optional.of(sharedOuterJoinWindowStoreBuilder(windows, streamJoinedInternal, joinThisGeneratedName));
}
// Time shared between joins to keep track of the maximum stream time
@@ -263,20 +252,57 @@ class KStreamImplJoin {
return builder;
}
+ private <K, V1, V2> String buildOuterJoinWindowStoreName(final StreamJoinedInternal<K, V1, V2> streamJoinedInternal, final String joinThisGeneratedName) {
+ final String outerJoinSuffix = rightOuter ? "-outer-shared-join" : "-left-shared-join";
+
+ if (streamJoinedInternal.thisStoreSupplier() != null && !streamJoinedInternal.thisStoreSupplier().name().isEmpty()) {
+ return streamJoinedInternal.thisStoreSupplier().name() + outerJoinSuffix;
+ } else if (streamJoinedInternal.storeName() != null) {
+ return streamJoinedInternal.storeName() + outerJoinSuffix;
+ } else {
+ return KStreamImpl.OUTERSHARED_NAME
+ + joinThisGeneratedName.substring(
+ rightOuter
+ ? KStreamImpl.OUTERTHIS_NAME.length()
+ : KStreamImpl.JOINTHIS_NAME.length());
+ }
+ }
+
@SuppressWarnings("unchecked")
- private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> sharedOuterJoinWindowStoreBuilder(final String storeName,
- final JoinWindows windows,
- final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
- final StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>(
- persistentTimeOrderedWindowStore(
- storeName + "-store",
- Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
- Duration.ofMillis(windows.size())
- ),
- new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
- new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()),
- Time.SYSTEM
- );
+ private <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> sharedOuterJoinWindowStoreBuilder(final JoinWindows windows,
+ final StreamJoinedInternal<K, V1, V2> streamJoinedInternal,
+ final String joinThisGeneratedName) {
+ final boolean persistent = streamJoinedInternal.thisStoreSupplier() == null || streamJoinedInternal.thisStoreSupplier().get().persistent();
+ final String storeName = buildOuterJoinWindowStoreName(streamJoinedInternal, joinThisGeneratedName);
+
+ final KeyAndJoinSideSerde keyAndJoinSideSerde = new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde());
+ final LeftOrRightValueSerde leftOrRightValueSerde = new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde());
+
+ final StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder;
+ if (persistent) {
+ builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>(
+ persistentTimeOrderedWindowStore(
+ storeName + "-store",
+ Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+ Duration.ofMillis(windows.size())
+ ),
+ keyAndJoinSideSerde,
+ leftOrRightValueSerde,
+ Time.SYSTEM
+ );
+ } else {
+ builder = Stores.windowStoreBuilder(
+ Stores.inMemoryWindowStore(
+ storeName + "-store",
+ Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+ Duration.ofMillis(windows.size()),
+ false
+ ),
+ keyAndJoinSideSerde,
+ leftOrRightValueSerde
+ );
+ }
+
if (streamJoinedInternal.loggingEnabled()) {
builder.withLoggingEnabled(streamJoinedInternal.logConfig());
} else {
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 526284b..66c678d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -36,6 +36,8 @@ import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockKeyValueStore;
@@ -45,6 +47,7 @@ import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@@ -779,6 +782,60 @@ public class TopologyTest {
}
@Test
+ public void streamStreamJoinTopologyWithCustomStoresSuppliers() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KStream<Integer, String> stream1;
+ final KStream<Integer, String> stream2;
+
+ stream1 = builder.stream("input-topic1");
+ stream2 = builder.stream("input-topic2");
+
+ final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+
+ final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
+ Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+ Duration.ofMillis(joinWindows.size()), true);
+
+ final WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store-other",
+ Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+ Duration.ofMillis(joinWindows.size()), true);
+
+ stream1.join(
+ stream2,
+ MockValueJoiner.TOSTRING_JOINER,
+ joinWindows,
+ StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+ .withThisStoreSupplier(thisStoreSupplier)
+ .withOtherStoreSupplier(otherStoreSupplier));
+
+ final TopologyDescription describe = builder.build().describe();
+
+ assertEquals(
+ "Topologies:\n" +
+ " Sub-topology: 0\n" +
+ " Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n" +
+ " --> KSTREAM-WINDOWED-0000000002\n" +
+ " Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n" +
+ " --> KSTREAM-WINDOWED-0000000003\n" +
+ " Processor: KSTREAM-WINDOWED-0000000002 (stores: [in-memory-join-store])\n" +
+ " --> KSTREAM-JOINTHIS-0000000004\n" +
+ " <-- KSTREAM-SOURCE-0000000000\n" +
+ " Processor: KSTREAM-WINDOWED-0000000003 (stores: [in-memory-join-store-other])\n" +
+ " --> KSTREAM-JOINOTHER-0000000005\n" +
+ " <-- KSTREAM-SOURCE-0000000001\n" +
+ " Processor: KSTREAM-JOINOTHER-0000000005 (stores: [in-memory-join-store])\n" +
+ " --> KSTREAM-MERGE-0000000006\n" +
+ " <-- KSTREAM-WINDOWED-0000000003\n" +
+ " Processor: KSTREAM-JOINTHIS-0000000004 (stores: [in-memory-join-store-other])\n" +
+ " --> KSTREAM-MERGE-0000000006\n" +
+ " <-- KSTREAM-WINDOWED-0000000002\n" +
+ " Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" +
+ " --> none\n" +
+ " <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n\n",
+ describe.toString());
+ }
+
+ @Test
public void streamStreamLeftJoinTopologyWithDefaultStoresNames() {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Integer, String> stream1;
@@ -864,6 +921,60 @@ public class TopologyTest {
}
@Test
+ public void streamStreamLeftJoinTopologyWithCustomStoresSuppliers() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KStream<Integer, String> stream1;
+ final KStream<Integer, String> stream2;
+
+ stream1 = builder.stream("input-topic1");
+ stream2 = builder.stream("input-topic2");
+
+ final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+
+ final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
+ Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+ Duration.ofMillis(joinWindows.size()), true);
+
+ final WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store-other",
+ Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+ Duration.ofMillis(joinWindows.size()), true);
+
+ stream1.leftJoin(
+ stream2,
+ MockValueJoiner.TOSTRING_JOINER,
+ joinWindows,
+ StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+ .withThisStoreSupplier(thisStoreSupplier)
+ .withOtherStoreSupplier(otherStoreSupplier));
+
+ final TopologyDescription describe = builder.build().describe();
+
+ assertEquals(
+ "Topologies:\n" +
+ " Sub-topology: 0\n" +
+ " Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n" +
+ " --> KSTREAM-WINDOWED-0000000002\n" +
+ " Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n" +
+ " --> KSTREAM-WINDOWED-0000000003\n" +
+ " Processor: KSTREAM-WINDOWED-0000000002 (stores: [in-memory-join-store])\n" +
+ " --> KSTREAM-JOINTHIS-0000000004\n" +
+ " <-- KSTREAM-SOURCE-0000000000\n" +
+ " Processor: KSTREAM-WINDOWED-0000000003 (stores: [in-memory-join-store-other])\n" +
+ " --> KSTREAM-OUTEROTHER-0000000005\n" +
+ " <-- KSTREAM-SOURCE-0000000001\n" +
+ " Processor: KSTREAM-JOINTHIS-0000000004 (stores: [in-memory-join-store-other, in-memory-join-store-left-shared-join-store])\n" +
+ " --> KSTREAM-MERGE-0000000006\n" +
+ " <-- KSTREAM-WINDOWED-0000000002\n" +
+ " Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [in-memory-join-store, in-memory-join-store-left-shared-join-store])\n" +
+ " --> KSTREAM-MERGE-0000000006\n" +
+ " <-- KSTREAM-WINDOWED-0000000003\n" +
+ " Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" +
+ " --> none\n" +
+ " <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-OUTEROTHER-0000000005\n\n",
+ describe.toString());
+ }
+
+ @Test
public void streamStreamOuterJoinTopologyWithDefaultStoresNames() {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Integer, String> stream1;
@@ -949,6 +1060,60 @@ public class TopologyTest {
}
@Test
+ public void streamStreamOuterJoinTopologyWithCustomStoresSuppliers() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KStream<Integer, String> stream1;
+ final KStream<Integer, String> stream2;
+
+ stream1 = builder.stream("input-topic1");
+ stream2 = builder.stream("input-topic2");
+
+ final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+
+ final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
+ Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+ Duration.ofMillis(joinWindows.size()), true);
+
+ final WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store-other",
+ Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+ Duration.ofMillis(joinWindows.size()), true);
+
+ stream1.outerJoin(
+ stream2,
+ MockValueJoiner.TOSTRING_JOINER,
+ joinWindows,
+ StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+ .withThisStoreSupplier(thisStoreSupplier)
+ .withOtherStoreSupplier(otherStoreSupplier));
+
+ final TopologyDescription describe = builder.build().describe();
+
+ assertEquals(
+ "Topologies:\n" +
+ " Sub-topology: 0\n" +
+ " Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n" +
+ " --> KSTREAM-WINDOWED-0000000002\n" +
+ " Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n" +
+ " --> KSTREAM-WINDOWED-0000000003\n" +
+ " Processor: KSTREAM-WINDOWED-0000000002 (stores: [in-memory-join-store])\n" +
+ " --> KSTREAM-OUTERTHIS-0000000004\n" +
+ " <-- KSTREAM-SOURCE-0000000000\n" +
+ " Processor: KSTREAM-WINDOWED-0000000003 (stores: [in-memory-join-store-other])\n" +
+ " --> KSTREAM-OUTEROTHER-0000000005\n" +
+ " <-- KSTREAM-SOURCE-0000000001\n" +
+ " Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [in-memory-join-store-outer-shared-join-store, in-memory-join-store])\n" +
+ " --> KSTREAM-MERGE-0000000006\n" +
+ " <-- KSTREAM-WINDOWED-0000000003\n" +
+ " Processor: KSTREAM-OUTERTHIS-0000000004 (stores: [in-memory-join-store-other, in-memory-join-store-outer-shared-join-store])\n" +
+ " --> KSTREAM-MERGE-0000000006\n" +
+ " <-- KSTREAM-WINDOWED-0000000002\n" +
+ " Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" +
+ " --> none\n" +
+ " <-- KSTREAM-OUTERTHIS-0000000004, KSTREAM-OUTEROTHER-0000000005\n\n",
+ describe.toString());
+ }
+
+ @Test
public void topologyWithDynamicRoutingShouldDescribeExtractorClass() {
final StreamsBuilder builder = new StreamsBuilder();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 5c4bccf..50c2f86 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -30,6 +30,8 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
@@ -429,7 +431,32 @@ public class KStreamKStreamLeftJoinTest {
}
@Test
- public void testLeftJoin() {
+ public void testLeftJoinWithInMemoryCustomSuppliers() {
+ final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+
+ final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
+ Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+ Duration.ofMillis(joinWindows.size()), true);
+
+ final WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store-other",
+ Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+ Duration.ofMillis(joinWindows.size()), true);
+
+ final StreamJoined<Integer, String, String> streamJoined = StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());
+
+ runLeftJoin(streamJoined.withThisStoreSupplier(thisStoreSupplier).withOtherStoreSupplier(otherStoreSupplier), joinWindows);
+ }
+
+ @Test
+ public void testLeftJoinWithDefaultSuppliers() {
+ final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+ final StreamJoined<Integer, String, String> streamJoined = StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());
+
+ runLeftJoin(streamJoined, joinWindows);
+ }
+
+ public void runLeftJoin(final StreamJoined<Integer, String, String> streamJoined,
+ final JoinWindows joinWindows) {
final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[] {0, 1, 2, 3};
@@ -444,8 +471,8 @@ public class KStreamKStreamLeftJoinTest {
joined = stream1.leftJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.of(ofMillis(100)),
- StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
+ joinWindows,
+ streamJoined);
joined.process(supplier);
final Collection<Set<String>> copartitionGroups =
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
index c0f88d8..2bf7fef 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
@@ -30,6 +30,8 @@ import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
@@ -498,7 +500,32 @@ public class KStreamKStreamOuterJoinTest {
}
@Test
- public void testOuterJoin() {
+ public void testOuterJoinWithInMemoryCustomSuppliers() {
+ final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+
+ final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
+ Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+ Duration.ofMillis(joinWindows.size()), true);
+
+ final WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store-other",
+ Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+ Duration.ofMillis(joinWindows.size()), true);
+
+ final StreamJoined<Integer, String, String> streamJoined = StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());
+
+ runOuterJoin(streamJoined.withThisStoreSupplier(thisStoreSupplier).withOtherStoreSupplier(otherStoreSupplier), joinWindows);
+ }
+
+ @Test
+ public void testOuterJoinWithDefaultSuppliers() {
+ final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+ final StreamJoined<Integer, String, String> streamJoined = StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());
+
+ runOuterJoin(streamJoined, joinWindows);
+ }
+
+ public void runOuterJoin(final StreamJoined<Integer, String, String> streamJoined,
+ final JoinWindows joinWindows) {
final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[] {0, 1, 2, 3};
@@ -513,8 +540,8 @@ public class KStreamKStreamOuterJoinTest {
joined = stream1.outerJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.of(ofMillis(100)).grace(ofMillis(0)),
- StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
+ joinWindows,
+ streamJoined);
joined.process(supplier);
final Collection<Set<String>> copartitionGroups =