You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2021/05/05 17:24:05 UTC

[kafka] branch trunk updated: KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied (#10613)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d915ce5  KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied (#10613)
d915ce5 is described below

commit d915ce58d2adcfd6113f961ecbf337770dbe760b
Author: Sergio Peña <se...@confluent.io>
AuthorDate: Wed May 5 12:21:43 2021 -0500

    KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied (#10613)
    
    When users supply in-memory stores for left/outer joins, then the internal shared outer store must be switch to in-memory store too. This will allow users who want to keep all stores in memory to continue doing so.
    
    Added unit tests to validate topology and left/outer joins work fine with an in-memory shared store.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../streams/kstream/internals/KStreamImplJoin.java |  76 ++++++----
 .../org/apache/kafka/streams/TopologyTest.java     | 165 +++++++++++++++++++++
 .../internals/KStreamKStreamLeftJoinTest.java      |  33 ++++-
 .../internals/KStreamKStreamOuterJoinTest.java     |  33 ++++-
 4 files changed, 276 insertions(+), 31 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
index 0977640..c5e918f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
@@ -144,18 +144,7 @@ class KStreamImplJoin {
 
         Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
         if (leftOuter) {
-            final String outerJoinSuffix = rightOuter ? "-outer-shared-join" : "-left-shared-join";
-
-            // Get the suffix index of the joinThisGeneratedName to build the outer join store name.
-            final String outerJoinStoreGeneratedName = KStreamImpl.OUTERSHARED_NAME
-                + joinThisGeneratedName.substring(
-                    rightOuter
-                        ? KStreamImpl.OUTERTHIS_NAME.length()
-                        : KStreamImpl.JOINTHIS_NAME.length());
-
-            final String outerJoinStoreName = userProvidedBaseStoreName == null ? outerJoinStoreGeneratedName : userProvidedBaseStoreName + outerJoinSuffix;
-
-            outerJoinWindowStore = Optional.of(sharedOuterJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal));
+            outerJoinWindowStore = Optional.of(sharedOuterJoinWindowStoreBuilder(windows, streamJoinedInternal, joinThisGeneratedName));
         }
 
         // Time shared between joins to keep track of the maximum stream time
@@ -263,20 +252,57 @@ class KStreamImplJoin {
         return builder;
     }
 
+    private <K, V1, V2> String buildOuterJoinWindowStoreName(final StreamJoinedInternal<K, V1, V2> streamJoinedInternal, final String joinThisGeneratedName) {
+        final String outerJoinSuffix = rightOuter ? "-outer-shared-join" : "-left-shared-join";
+
+        if (streamJoinedInternal.thisStoreSupplier() != null && !streamJoinedInternal.thisStoreSupplier().name().isEmpty()) {
+            return streamJoinedInternal.thisStoreSupplier().name() + outerJoinSuffix;
+        } else if (streamJoinedInternal.storeName() != null) {
+            return streamJoinedInternal.storeName() + outerJoinSuffix;
+        } else {
+            return KStreamImpl.OUTERSHARED_NAME
+                + joinThisGeneratedName.substring(
+                rightOuter
+                    ? KStreamImpl.OUTERTHIS_NAME.length()
+                    : KStreamImpl.JOINTHIS_NAME.length());
+        }
+    }
+
     @SuppressWarnings("unchecked")
-    private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> sharedOuterJoinWindowStoreBuilder(final String storeName,
-                                                                                                                                        final JoinWindows windows,
-                                                                                                                                        final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
-        final StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>(
-            persistentTimeOrderedWindowStore(
-                storeName + "-store",
-                Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
-                Duration.ofMillis(windows.size())
-            ),
-            new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
-            new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()),
-            Time.SYSTEM
-        );
+    private <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> sharedOuterJoinWindowStoreBuilder(final JoinWindows windows,
+                                                                                                                                 final StreamJoinedInternal<K, V1, V2> streamJoinedInternal,
+                                                                                                                                 final String joinThisGeneratedName) {
+        final boolean persistent = streamJoinedInternal.thisStoreSupplier() == null || streamJoinedInternal.thisStoreSupplier().get().persistent();
+        final String storeName = buildOuterJoinWindowStoreName(streamJoinedInternal, joinThisGeneratedName);
+
+        final KeyAndJoinSideSerde keyAndJoinSideSerde = new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde());
+        final LeftOrRightValueSerde leftOrRightValueSerde = new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde());
+
+        final StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder;
+        if (persistent) {
+            builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>(
+                persistentTimeOrderedWindowStore(
+                    storeName + "-store",
+                    Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+                    Duration.ofMillis(windows.size())
+                ),
+                keyAndJoinSideSerde,
+                leftOrRightValueSerde,
+                Time.SYSTEM
+            );
+        } else {
+            builder = Stores.windowStoreBuilder(
+                Stores.inMemoryWindowStore(
+                    storeName + "-store",
+                    Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+                    Duration.ofMillis(windows.size()),
+                    false
+                ),
+                keyAndJoinSideSerde,
+                leftOrRightValueSerde
+            );
+        }
+
         if (streamJoinedInternal.loggingEnabled()) {
             builder.withLoggingEnabled(streamJoinedInternal.logConfig());
         } else {
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 526284b..66c678d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -36,6 +36,8 @@ import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockKeyValueStore;
@@ -45,6 +47,7 @@ import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -779,6 +782,60 @@ public class TopologyTest {
     }
 
     @Test
+    public void streamStreamJoinTopologyWithCustomStoresSuppliers() {
+        final StreamsBuilder builder  = new StreamsBuilder();
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+
+        stream1 = builder.stream("input-topic1");
+        stream2 = builder.stream("input-topic2");
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+
+        final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
+            Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+            Duration.ofMillis(joinWindows.size()), true);
+
+        final WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store-other",
+            Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+            Duration.ofMillis(joinWindows.size()), true);
+
+        stream1.join(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            joinWindows,
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+                .withThisStoreSupplier(thisStoreSupplier)
+                .withOtherStoreSupplier(otherStoreSupplier));
+
+        final TopologyDescription describe = builder.build().describe();
+
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n" +
+                "      --> KSTREAM-WINDOWED-0000000002\n" +
+                "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n" +
+                "      --> KSTREAM-WINDOWED-0000000003\n" +
+                "    Processor: KSTREAM-WINDOWED-0000000002 (stores: [in-memory-join-store])\n" +
+                "      --> KSTREAM-JOINTHIS-0000000004\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n" +
+                "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [in-memory-join-store-other])\n" +
+                "      --> KSTREAM-JOINOTHER-0000000005\n" +
+                "      <-- KSTREAM-SOURCE-0000000001\n" +
+                "    Processor: KSTREAM-JOINOTHER-0000000005 (stores: [in-memory-join-store])\n" +
+                "      --> KSTREAM-MERGE-0000000006\n" +
+                "      <-- KSTREAM-WINDOWED-0000000003\n" +
+                "    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [in-memory-join-store-other])\n" +
+                "      --> KSTREAM-MERGE-0000000006\n" +
+                "      <-- KSTREAM-WINDOWED-0000000002\n" +
+                "    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n\n",
+            describe.toString());
+    }
+
+    @Test
     public void streamStreamLeftJoinTopologyWithDefaultStoresNames() {
         final StreamsBuilder builder  = new StreamsBuilder();
         final KStream<Integer, String> stream1;
@@ -864,6 +921,60 @@ public class TopologyTest {
     }
 
     @Test
+    public void streamStreamLeftJoinTopologyWithCustomStoresSuppliers() {
+        final StreamsBuilder builder  = new StreamsBuilder();
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+
+        stream1 = builder.stream("input-topic1");
+        stream2 = builder.stream("input-topic2");
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+
+        final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
+            Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+            Duration.ofMillis(joinWindows.size()), true);
+
+        final WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store-other",
+            Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+            Duration.ofMillis(joinWindows.size()), true);
+
+        stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            joinWindows,
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+                .withThisStoreSupplier(thisStoreSupplier)
+                .withOtherStoreSupplier(otherStoreSupplier));
+
+        final TopologyDescription describe = builder.build().describe();
+
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n" +
+                "      --> KSTREAM-WINDOWED-0000000002\n" +
+                "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n" +
+                "      --> KSTREAM-WINDOWED-0000000003\n" +
+                "    Processor: KSTREAM-WINDOWED-0000000002 (stores: [in-memory-join-store])\n" +
+                "      --> KSTREAM-JOINTHIS-0000000004\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n" +
+                "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [in-memory-join-store-other])\n" +
+                "      --> KSTREAM-OUTEROTHER-0000000005\n" +
+                "      <-- KSTREAM-SOURCE-0000000001\n" +
+                "    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [in-memory-join-store-other, in-memory-join-store-left-shared-join-store])\n" +
+                "      --> KSTREAM-MERGE-0000000006\n" +
+                "      <-- KSTREAM-WINDOWED-0000000002\n" +
+                "    Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [in-memory-join-store, in-memory-join-store-left-shared-join-store])\n" +
+                "      --> KSTREAM-MERGE-0000000006\n" +
+                "      <-- KSTREAM-WINDOWED-0000000003\n" +
+                "    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-OUTEROTHER-0000000005\n\n",
+            describe.toString());
+    }
+
+    @Test
     public void streamStreamOuterJoinTopologyWithDefaultStoresNames() {
         final StreamsBuilder builder  = new StreamsBuilder();
         final KStream<Integer, String> stream1;
@@ -949,6 +1060,60 @@ public class TopologyTest {
     }
 
     @Test
+    public void streamStreamOuterJoinTopologyWithCustomStoresSuppliers() {
+        final StreamsBuilder builder  = new StreamsBuilder();
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+
+        stream1 = builder.stream("input-topic1");
+        stream2 = builder.stream("input-topic2");
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+
+        final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
+            Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+            Duration.ofMillis(joinWindows.size()), true);
+
+        final WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store-other",
+            Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+            Duration.ofMillis(joinWindows.size()), true);
+
+        stream1.outerJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            joinWindows,
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+                .withThisStoreSupplier(thisStoreSupplier)
+                .withOtherStoreSupplier(otherStoreSupplier));
+
+        final TopologyDescription describe = builder.build().describe();
+
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n" +
+                "      --> KSTREAM-WINDOWED-0000000002\n" +
+                "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n" +
+                "      --> KSTREAM-WINDOWED-0000000003\n" +
+                "    Processor: KSTREAM-WINDOWED-0000000002 (stores: [in-memory-join-store])\n" +
+                "      --> KSTREAM-OUTERTHIS-0000000004\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n" +
+                "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [in-memory-join-store-other])\n" +
+                "      --> KSTREAM-OUTEROTHER-0000000005\n" +
+                "      <-- KSTREAM-SOURCE-0000000001\n" +
+                "    Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [in-memory-join-store-outer-shared-join-store, in-memory-join-store])\n" +
+                "      --> KSTREAM-MERGE-0000000006\n" +
+                "      <-- KSTREAM-WINDOWED-0000000003\n" +
+                "    Processor: KSTREAM-OUTERTHIS-0000000004 (stores: [in-memory-join-store-other, in-memory-join-store-outer-shared-join-store])\n" +
+                "      --> KSTREAM-MERGE-0000000006\n" +
+                "      <-- KSTREAM-WINDOWED-0000000002\n" +
+                "    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-OUTERTHIS-0000000004, KSTREAM-OUTEROTHER-0000000005\n\n",
+            describe.toString());
+    }
+
+    @Test
     public void topologyWithDynamicRoutingShouldDescribeExtractorClass() {
         final StreamsBuilder builder  = new StreamsBuilder();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 5c4bccf..50c2f86 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -30,6 +30,8 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.kstream.StreamJoined;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
@@ -429,7 +431,32 @@ public class KStreamKStreamLeftJoinTest {
     }
 
     @Test
-    public void testLeftJoin() {
+    public void testLeftJoinWithInMemoryCustomSuppliers() {
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+
+        final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
+            Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+            Duration.ofMillis(joinWindows.size()), true);
+
+        final WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store-other",
+            Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+            Duration.ofMillis(joinWindows.size()), true);
+
+        final StreamJoined<Integer, String, String> streamJoined = StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());
+
+        runLeftJoin(streamJoined.withThisStoreSupplier(thisStoreSupplier).withOtherStoreSupplier(otherStoreSupplier), joinWindows);
+    }
+
+    @Test
+    public void testLeftJoinWithDefaultSuppliers() {
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+        final StreamJoined<Integer, String, String> streamJoined = StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());
+
+        runLeftJoin(streamJoined, joinWindows);
+    }
+
+    public void runLeftJoin(final StreamJoined<Integer, String, String> streamJoined,
+                            final JoinWindows joinWindows) {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[] {0, 1, 2, 3};
@@ -444,8 +471,8 @@ public class KStreamKStreamLeftJoinTest {
         joined = stream1.leftJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.of(ofMillis(100)),
-            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
+            joinWindows,
+            streamJoined);
         joined.process(supplier);
 
         final Collection<Set<String>> copartitionGroups =
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
index c0f88d8..2bf7fef 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
@@ -30,6 +30,8 @@ import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.StreamJoined;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
@@ -498,7 +500,32 @@ public class KStreamKStreamOuterJoinTest {
     }
 
     @Test
-    public void testOuterJoin() {
+    public void testOuterJoinWithInMemoryCustomSuppliers() {
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+
+        final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
+            Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+            Duration.ofMillis(joinWindows.size()), true);
+
+        final WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store-other",
+            Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+            Duration.ofMillis(joinWindows.size()), true);
+
+        final StreamJoined<Integer, String, String> streamJoined = StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());
+
+        runOuterJoin(streamJoined.withThisStoreSupplier(thisStoreSupplier).withOtherStoreSupplier(otherStoreSupplier), joinWindows);
+    }
+
+    @Test
+    public void testOuterJoinWithDefaultSuppliers() {
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+        final StreamJoined<Integer, String, String> streamJoined = StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());
+
+        runOuterJoin(streamJoined, joinWindows);
+    }
+
+    public void runOuterJoin(final StreamJoined<Integer, String, String> streamJoined,
+                             final JoinWindows joinWindows) {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[] {0, 1, 2, 3};
@@ -513,8 +540,8 @@ public class KStreamKStreamOuterJoinTest {
         joined = stream1.outerJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.of(ofMillis(100)).grace(ofMillis(0)),
-            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
+            joinWindows,
+            streamJoined);
         joined.process(supplier);
 
         final Collection<Set<String>> copartitionGroups =