You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/04/19 23:34:44 UTC

[kafka] branch trunk updated: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest` (#13609)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 11c8bf48261 KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest` (#13609)
11c8bf48261 is described below

commit 11c8bf4826197533807b2132cfc6599ba70de1c1
Author: Victoria Xia <vi...@confluent.io>
AuthorDate: Wed Apr 19 19:34:36 2023 -0400

    KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest` (#13609)
    
    This PR fixes a bug in how table-table joins handle out-of-order records from versioned tables: if the latest value for a particular key is a tombstone, the out-of-order record was not detected and therefore not dropped. The fix uses the isLatest value from the Change object instead of calling get(key) on the state store to fetch timestamps to compare against. As part of this fix, this PR also updates table-table joins to determine whether upstream tables are versioned by using the GraphNode mechanism, instead of checking the table's value getter.
    
    Part of KIP-914.
    
    Reviewer: Matthias J. Sax <ma...@confluent.io>
---
 .../kstream/internals/InternalStreamsBuilder.java  |  10 +-
 .../streams/kstream/internals/KTableImpl.java      |  22 +-
 .../internals/KTableKTableAbstractJoin.java        |  20 +-
 .../kstream/internals/KTableKTableInnerJoin.java   |  30 +-
 .../kstream/internals/KTableKTableLeftJoin.java    |  30 +-
 .../kstream/internals/KTableKTableOuterJoin.java   |  30 +-
 .../kstream/internals/KTableKTableRightJoin.java   |  30 +-
 .../ForeignTableJoinProcessorSupplier.java         |  40 +--
 .../SubscriptionJoinProcessorSupplier.java         |   2 +-
 .../SubscriptionReceiveProcessorSupplier.java      |   2 +-
 .../SubscriptionSendProcessorSupplier.java         |  46 +--
 .../internals/graph/BaseJoinProcessorNode.java     |   6 +-
 ...e.java => ForeignJoinSubscriptionSendNode.java} |  25 +-
 ...leFilterNode.java => ForeignTableJoinNode.java} |  28 +-
 .../internals/graph/KTableKTableJoinNode.java      |  37 ++-
 .../kstream/internals/graph/TableFilterNode.java   |   2 +-
 .../internals/graph/TableRepartitionMapNode.java   |   2 +-
 .../graph/VersionedSemanticsGraphNode.java         |   2 +-
 .../integration/AbstractJoinIntegrationTest.java   |  53 ++-
 ...ableForeignKeyVersionedJoinIntegrationTest.java |  72 ++--
 .../StreamStreamJoinIntegrationTest.java           | 334 +------------------
 .../StreamTableJoinIntegrationTest.java            |  46 ++-
 .../integration/TableTableJoinIntegrationTest.java | 363 +++++++++------------
 .../internals/InternalStreamsBuilderTest.java      | 201 ++++++++++++
 .../streams/kstream/internals/KStreamImplTest.java |   8 +-
 25 files changed, 671 insertions(+), 770 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index f9517afe425..d928f5b92d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -620,11 +620,17 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     }
 
     private void enableVersionedSemantics() {
-        versionedSemanticsNodes.forEach(node -> ((VersionedSemanticsGraphNode) node).enableVersionedSemantics(isVersionedUpstream(node)));
+        versionedSemanticsNodes.forEach(node -> {
+            for (final GraphNode parentNode : node.parentNodes()) {
+                if (isVersionedOrVersionedUpstream(parentNode)) {
+                    ((VersionedSemanticsGraphNode) node).enableVersionedSemantics(true, parentNode.nodeName());
+                }
+            }
+        });
         tableSuppressNodesNodes.forEach(node -> {
             if (isVersionedUpstream(node)) {
                 throw new TopologyException("suppress() is only supported for non-versioned KTables " +
-                        "(note that version semantics might be inherited from upstream)");
+                    "(note that version semantics might be inherited from upstream)");
             }
         });
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 494969460f0..0527c69c55e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -48,6 +48,8 @@ import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionRes
 import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionReceiveProcessorSupplier;
 import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
 import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde;
+import org.apache.kafka.streams.kstream.internals.graph.ForeignTableJoinNode;
+import org.apache.kafka.streams.kstream.internals.graph.ForeignJoinSubscriptionSendNode;
 import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
@@ -804,6 +806,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         kTableKTableJoinNode.setOutputVersioned(isOutputVersioned);
 
         builder.addGraphNode(this.graphNode, kTableKTableJoinNode);
+        builder.addGraphNode(((KTableImpl<?, ?, ?>) other).graphNode, kTableKTableJoinNode);
 
         // we can inherit parent key serde if user do not provide specific overrides
         return new KTableImpl<K, Change<VR>, VR>(
@@ -1098,7 +1101,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         //not be done needlessly.
         ((KTableImpl<?, ?, ?>) foreignKeyTable).enableSendingOldValues(true);
 
-        //Old values must be sent such that the ForeignJoinSubscriptionSendProcessorSupplier can propagate deletions to the correct node.
+        //Old values must be sent such that the SubscriptionSendProcessorSupplier can propagate deletions to the correct node.
         //This occurs whenever the extracted foreignKey changes values.
         enableSendingOldValues(true);
 
@@ -1138,8 +1141,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
             keySerde
         );
 
-        final KTableValueGetterSupplier<K, V> primaryKeyValueGetter = valueGetterSupplier();
-        final StatefulProcessorNode<K, Change<V>> subscriptionSendNode = new StatefulProcessorNode<>(
+        final ProcessorGraphNode<K, Change<V>> subscriptionSendNode = new ForeignJoinSubscriptionSendNode<>(
             new ProcessorParameters<>(
                 new SubscriptionSendProcessorSupplier<>(
                     foreignKeyExtractor,
@@ -1147,13 +1149,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
                     valueHashSerdePseudoTopic,
                     foreignKeySerde,
                     valueSerde == null ? null : valueSerde.serializer(),
-                    leftJoin,
-                    primaryKeyValueGetter
+                    leftJoin
                 ),
                 renamed.suffixWithOrElseGet("-subscription-registration-processor", builder, SUBSCRIPTION_REGISTRATION)
-            ),
-            Collections.emptySet(),
-            Collections.singleton(primaryKeyValueGetter)
+            )
         );
         builder.addGraphNode(graphNode, subscriptionSendNode);
 
@@ -1219,13 +1218,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
             );
         builder.addGraphNode(subscriptionReceiveNode, subscriptionJoinNode);
 
-        final StatefulProcessorNode<KO, Change<VO>> foreignTableJoinNode = new StatefulProcessorNode<>(
+        final StatefulProcessorNode<KO, Change<VO>> foreignTableJoinNode = new ForeignTableJoinNode<>(
             new ProcessorParameters<>(
-                new ForeignTableJoinProcessorSupplier<>(subscriptionStore, combinedKeySchema, foreignKeyValueGetter),
+                new ForeignTableJoinProcessorSupplier<>(subscriptionStore, combinedKeySchema),
                 renamed.suffixWithOrElseGet("-foreign-join-subscription", builder, SUBSCRIPTION_PROCESSOR)
             ),
             Collections.singleton(subscriptionStore),
-            Collections.singleton(foreignKeyValueGetter)
+            Collections.emptySet()
         );
         builder.addGraphNode(((KTableImpl<KO, VO, ?>) foreignKeyTable).graphNode, foreignTableJoinNode);
 
@@ -1259,6 +1258,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         resultSourceNodes.add(foreignResponseSource.nodeName());
         builder.internalTopologyBuilder.copartitionSources(resultSourceNodes);
 
+        final KTableValueGetterSupplier<K, V> primaryKeyValueGetter = valueGetterSupplier();
         final StatefulProcessorNode<K, SubscriptionResponseWrapper<VO>> responseJoinNode = new StatefulProcessorNode<>(
             new ProcessorParameters<>(
                 new ResponseJoinProcessorSupplier<>(
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
index 665d839f2f4..21339c0b649 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
@@ -18,17 +18,16 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.ValueJoiner;
 
-abstract class KTableKTableAbstractJoin<K, V1, V2, VOut> implements
+public abstract class KTableKTableAbstractJoin<K, V1, V2, VOut> implements
     KTableProcessorSupplier<K, V1, K, VOut> {
 
     private final KTableImpl<K, ?, V1> table1;
     private final KTableImpl<K, ?, V2> table2;
     final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
     final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
-    final KTableValueGetter<K, V1> valueGetter1;
-    final KTableValueGetter<K, V2> valueGetter2;
     final ValueJoiner<? super V1, ? super V2, ? extends VOut> joiner;
 
+    boolean useVersionedSemantics = false;
     boolean sendOldValues = false;
 
     KTableKTableAbstractJoin(final KTableImpl<K, ?, V1> table1,
@@ -38,8 +37,6 @@ abstract class KTableKTableAbstractJoin<K, V1, V2, VOut> implements
         this.table2 = table2;
         this.valueGetterSupplier1 = table1.valueGetterSupplier();
         this.valueGetterSupplier2 = table2.valueGetterSupplier();
-        this.valueGetter1 = valueGetterSupplier1.get();
-        this.valueGetter2 = valueGetterSupplier2.get();
         this.joiner = joiner;
     }
 
@@ -51,4 +48,17 @@ abstract class KTableKTableAbstractJoin<K, V1, V2, VOut> implements
         sendOldValues = true;
         return true;
     }
+
+    public void setUseVersionedSemantics(final boolean useVersionedSemantics) {
+        this.useVersionedSemantics = useVersionedSemantics;
+    }
+
+    // VisibleForTesting
+    public boolean isUseVersionedSemantics() {
+        return useVersionedSemantics;
+    }
+
+    public String joinThisParentNodeName() {
+        return table1.graphNode.nodeName();
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
index 98587b16fe5..517a741aaf6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
@@ -42,7 +42,7 @@ class KTableKTableInnerJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
 
     @Override
     public Processor<K, Change<V1>, K, Change<VOut>> get() {
-        return new KTableKTableJoinProcessor(valueGetter1, valueGetter2);
+        return new KTableKTableJoinProcessor(valueGetterSupplier2.get());
     }
 
     @Override
@@ -64,14 +64,11 @@ class KTableKTableInnerJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
 
     private class KTableKTableJoinProcessor extends ContextualProcessor<K, Change<V1>, K, Change<VOut>> {
 
-        private final KTableValueGetter<K, V1> thisValueGetter;
-        private final KTableValueGetter<K, V2> otherValueGetter;
+        private final KTableValueGetter<K, V2> valueGetter;
         private Sensor droppedRecordsSensor;
 
-        KTableKTableJoinProcessor(final KTableValueGetter<K, V1> thisValueGetter,
-                                  final KTableValueGetter<K, V2> otherValueGetter) {
-            this.thisValueGetter = thisValueGetter;
-            this.otherValueGetter = otherValueGetter;
+        KTableKTableJoinProcessor(final KTableValueGetter<K, V2> valueGetter) {
+            this.valueGetter = valueGetter;
         }
 
         @Override
@@ -82,8 +79,7 @@ class KTableKTableInnerJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
                 context.taskId().toString(),
                 (StreamsMetricsImpl) context.metrics()
             );
-            thisValueGetter.init(context);
-            otherValueGetter.init(context);
+            valueGetter.init(context);
         }
 
         @Override
@@ -107,20 +103,17 @@ class KTableKTableInnerJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
             }
 
             // drop out-of-order records from versioned tables (cf. KIP-914)
-            if (thisValueGetter.isVersioned()) {
-                final ValueAndTimestamp<V1> valueAndTimestampLeft = thisValueGetter.get(record.key());
-                if (valueAndTimestampLeft != null && valueAndTimestampLeft.timestamp() > record.timestamp()) {
-                    LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
-                    droppedRecordsSensor.record();
-                    return;
-                }
+            if (useVersionedSemantics && !record.value().isLatest) {
+                LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
+                droppedRecordsSensor.record();
+                return;
             }
 
             VOut newValue = null;
             final long resultTimestamp;
             VOut oldValue = null;
 
-            final ValueAndTimestamp<V2> valueAndTimestampRight = otherValueGetter.get(record.key());
+            final ValueAndTimestamp<V2> valueAndTimestampRight = valueGetter.get(record.key());
             final V2 valueRight = getValueOrNull(valueAndTimestampRight);
             if (valueRight == null) {
                 return;
@@ -141,8 +134,7 @@ class KTableKTableInnerJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
 
         @Override
         public void close() {
-            thisValueGetter.close();
-            otherValueGetter.close();
+            valueGetter.close();
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index 46bcbda4e5e..7484c8efc1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -43,7 +43,7 @@ class KTableKTableLeftJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
 
     @Override
     public Processor<K, Change<V1>, K, Change<VOut>> get() {
-        return new KTableKTableLeftJoinProcessor(valueGetter1, valueGetter2);
+        return new KTableKTableLeftJoinProcessor(valueGetterSupplier2.get());
     }
 
     @Override
@@ -66,14 +66,11 @@ class KTableKTableLeftJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
 
     private class KTableKTableLeftJoinProcessor extends ContextualProcessor<K, Change<V1>, K, Change<VOut>> {
 
-        private final KTableValueGetter<K, V1> thisValueGetter;
-        private final KTableValueGetter<K, V2> otherValueGetter;
+        private final KTableValueGetter<K, V2> valueGetter;
         private Sensor droppedRecordsSensor;
 
-        KTableKTableLeftJoinProcessor(final KTableValueGetter<K, V1> thisValueGetter,
-                                      final KTableValueGetter<K, V2> otherValueGetter) {
-            this.thisValueGetter = thisValueGetter;
-            this.otherValueGetter = otherValueGetter;
+        KTableKTableLeftJoinProcessor(final KTableValueGetter<K, V2> valueGetter) {
+            this.valueGetter = valueGetter;
         }
 
         @Override
@@ -84,8 +81,7 @@ class KTableKTableLeftJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
                 context.taskId().toString(),
                 (StreamsMetricsImpl) context.metrics()
             );
-            thisValueGetter.init(context);
-            otherValueGetter.init(context);
+            valueGetter.init(context);
         }
 
         @Override
@@ -109,20 +105,17 @@ class KTableKTableLeftJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
             }
 
             // drop out-of-order records from versioned tables (cf. KIP-914)
-            if (thisValueGetter.isVersioned()) {
-                final ValueAndTimestamp<V1> valueAndTimestampLeft = thisValueGetter.get(record.key());
-                if (valueAndTimestampLeft != null && valueAndTimestampLeft.timestamp() > record.timestamp()) {
-                    LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
-                    droppedRecordsSensor.record();
-                    return;
-                }
+            if (useVersionedSemantics && !record.value().isLatest) {
+                LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
+                droppedRecordsSensor.record();
+                return;
             }
 
             VOut newValue = null;
             final long resultTimestamp;
             VOut oldValue = null;
 
-            final ValueAndTimestamp<V2> valueAndTimestampRight = otherValueGetter.get(record.key());
+            final ValueAndTimestamp<V2> valueAndTimestampRight = valueGetter.get(record.key());
             final V2 value2 = getValueOrNull(valueAndTimestampRight);
             final long timestampRight;
 
@@ -150,8 +143,7 @@ class KTableKTableLeftJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
 
         @Override
         public void close() {
-            thisValueGetter.close();
-            otherValueGetter.close();
+            valueGetter.close();
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index a06a7cc4479..e8f74075d45 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -43,7 +43,7 @@ class KTableKTableOuterJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
 
     @Override
     public Processor<K, Change<V1>, K, Change<VOut>> get() {
-        return new KTableKTableOuterJoinProcessor(valueGetter1, valueGetter2);
+        return new KTableKTableOuterJoinProcessor(valueGetterSupplier2.get());
     }
 
     @Override
@@ -65,14 +65,11 @@ class KTableKTableOuterJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
 
     private class KTableKTableOuterJoinProcessor extends ContextualProcessor<K, Change<V1>, K, Change<VOut>> {
 
-        private final KTableValueGetter<K, V2> otherValueGetter;
-        private final KTableValueGetter<K, V1> thisValueGetter;
+        private final KTableValueGetter<K, V2> valueGetter;
         private Sensor droppedRecordsSensor;
 
-        KTableKTableOuterJoinProcessor(final KTableValueGetter<K, V1> thisValueGetter,
-                                       final KTableValueGetter<K, V2> otherValueGetter) {
-            this.thisValueGetter = thisValueGetter;
-            this.otherValueGetter = otherValueGetter;
+        KTableKTableOuterJoinProcessor(final KTableValueGetter<K, V2> valueGetter) {
+            this.valueGetter = valueGetter;
         }
 
         @Override
@@ -83,8 +80,7 @@ class KTableKTableOuterJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
                 context.taskId().toString(),
                 (StreamsMetricsImpl) context.metrics()
             );
-            thisValueGetter.init(context);
-            otherValueGetter.init(context);
+            valueGetter.init(context);
         }
 
         @Override
@@ -108,20 +104,17 @@ class KTableKTableOuterJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
             }
 
             // drop out-of-order records from versioned tables (cf. KIP-914)
-            if (thisValueGetter.isVersioned()) {
-                final ValueAndTimestamp<V1> valueAndTimestamp1 = thisValueGetter.get(record.key());
-                if (valueAndTimestamp1 != null && valueAndTimestamp1.timestamp() > record.timestamp()) {
-                    LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
-                    droppedRecordsSensor.record();
-                    return;
-                }
+            if (useVersionedSemantics && !record.value().isLatest) {
+                LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
+                droppedRecordsSensor.record();
+                return;
             }
 
             VOut newValue = null;
             final long resultTimestamp;
             VOut oldValue = null;
 
-            final ValueAndTimestamp<V2> valueAndTimestamp2 = otherValueGetter.get(record.key());
+            final ValueAndTimestamp<V2> valueAndTimestamp2 = valueGetter.get(record.key());
             final V2 value2 = getValueOrNull(valueAndTimestamp2);
             if (value2 == null) {
                 if (record.value().newValue == null && record.value().oldValue == null) {
@@ -145,8 +138,7 @@ class KTableKTableOuterJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
 
         @Override
         public void close() {
-            thisValueGetter.close();
-            otherValueGetter.close();
+            valueGetter.close();
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index 08d7241fd70..082a0fdae0e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -42,7 +42,7 @@ class KTableKTableRightJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
 
     @Override
     public Processor<K, Change<V1>, K, Change<VOut>> get() {
-        return new KTableKTableRightJoinProcessor(valueGetter1, valueGetter2);
+        return new KTableKTableRightJoinProcessor(valueGetterSupplier2.get());
     }
 
     @Override
@@ -64,14 +64,11 @@ class KTableKTableRightJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
 
     private class KTableKTableRightJoinProcessor extends ContextualProcessor<K, Change<V1>, K, Change<VOut>> {
 
-        private final KTableValueGetter<K, V1> thisValueGetter;
-        private final KTableValueGetter<K, V2> otherValueGetter;
+        private final KTableValueGetter<K, V2> valueGetter;
         private Sensor droppedRecordsSensor;
 
-        KTableKTableRightJoinProcessor(final KTableValueGetter<K, V1> thisValueGetter,
-                                       final KTableValueGetter<K, V2> otherValueGetter) {
-            this.thisValueGetter = thisValueGetter;
-            this.otherValueGetter = otherValueGetter;
+        KTableKTableRightJoinProcessor(final KTableValueGetter<K, V2> valueGetter) {
+            this.valueGetter = valueGetter;
         }
 
         @Override
@@ -82,8 +79,7 @@ class KTableKTableRightJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
                 context.taskId().toString(),
                 (StreamsMetricsImpl) context.metrics()
             );
-            thisValueGetter.init(context);
-            otherValueGetter.init(context);
+            valueGetter.init(context);
         }
 
         @Override
@@ -107,20 +103,17 @@ class KTableKTableRightJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
             }
 
             // drop out-of-order records from versioned tables (cf. KIP-914)
-            if (thisValueGetter.isVersioned()) {
-                final ValueAndTimestamp<V1> valueAndTimestampRight = thisValueGetter.get(record.key());
-                if (valueAndTimestampRight != null && valueAndTimestampRight.timestamp() > record.timestamp()) {
-                    LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
-                    droppedRecordsSensor.record();
-                    return;
-                }
+            if (useVersionedSemantics && !record.value().isLatest) {
+                LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
+                droppedRecordsSensor.record();
+                return;
             }
 
             final VOut newValue;
             final long resultTimestamp;
             VOut oldValue = null;
 
-            final ValueAndTimestamp<V2> valueAndTimestampLeft = otherValueGetter.get(record.key());
+            final ValueAndTimestamp<V2> valueAndTimestampLeft = valueGetter.get(record.key());
             final V2 valueLeft = getValueOrNull(valueAndTimestampLeft);
             if (valueLeft == null) {
                 return;
@@ -141,8 +134,7 @@ class KTableKTableRightJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
 
         @Override
         public void close() {
-            thisValueGetter.close();
-            otherValueGetter.close();
+            valueGetter.close();
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
index a9cae203378..314ed7cdfd3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
@@ -21,8 +21,6 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.internals.Change;
-import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
-import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
@@ -45,32 +43,33 @@ public class ForeignTableJoinProcessorSupplier<K, KO, VO> implements
     private static final Logger LOG = LoggerFactory.getLogger(ForeignTableJoinProcessorSupplier.class);
     private final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder;
     private final CombinedKeySchema<KO, K> keySchema;
-    private final KTableValueGetterSupplier<KO, VO> foreignKeyValueGetterSupplier;
+    private boolean useVersionedSemantics = false;
 
     public ForeignTableJoinProcessorSupplier(
         final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder,
-        final CombinedKeySchema<KO, K> keySchema,
-        final KTableValueGetterSupplier<KO, VO> foreignKeyValueGetterSupplier) {
+        final CombinedKeySchema<KO, K> keySchema) {
 
         this.storeBuilder = storeBuilder;
         this.keySchema = keySchema;
-        this.foreignKeyValueGetterSupplier = foreignKeyValueGetterSupplier;
     }
 
     @Override
     public Processor<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> get() {
-        return new KTableKTableJoinProcessor(foreignKeyValueGetterSupplier.get());
+        return new KTableKTableJoinProcessor();
     }
 
+    public void setUseVersionedSemantics(final boolean useVersionedSemantics) {
+        this.useVersionedSemantics = useVersionedSemantics;
+    }
+
+    // VisibleForTesting
+    public boolean isUseVersionedSemantics() {
+        return useVersionedSemantics;
+    }
 
     private final class KTableKTableJoinProcessor extends ContextualProcessor<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> {
         private Sensor droppedRecordsSensor;
         private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> subscriptionStore;
-        private final KTableValueGetter<KO, VO> foreignKeyValueGetter;
-
-        private KTableKTableJoinProcessor(final KTableValueGetter<KO, VO> foreignKeyValueGetter) {
-            this.foreignKeyValueGetter = foreignKeyValueGetter;
-        }
 
         @Override
         public void init(final ProcessorContext<K, SubscriptionResponseWrapper<VO>> context) {
@@ -82,7 +81,6 @@ public class ForeignTableJoinProcessorSupplier<K, KO, VO> implements
                 internalProcessorContext.metrics()
             );
             subscriptionStore = internalProcessorContext.getStateStore(storeBuilder);
-            foreignKeyValueGetter.init(context);
         }
 
         @Override
@@ -107,13 +105,10 @@ public class ForeignTableJoinProcessorSupplier<K, KO, VO> implements
             }
 
             // drop out-of-order records from versioned tables (cf. KIP-914)
-            if (foreignKeyValueGetter.isVersioned()) {
-                final ValueAndTimestamp<VO> latestValueAndTimestamp = foreignKeyValueGetter.get(record.key());
-                if (latestValueAndTimestamp != null && latestValueAndTimestamp.timestamp() > record.timestamp()) {
-                    LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
-                    droppedRecordsSensor.record();
-                    return;
-                }
+            if (useVersionedSemantics && !record.value().isLatest) {
+                LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
+                droppedRecordsSensor.record();
+                return;
             }
 
             final Bytes prefixBytes = keySchema.prefixBytes(record.key());
@@ -139,11 +134,6 @@ public class ForeignTableJoinProcessorSupplier<K, KO, VO> implements
             }
         }
 
-        @Override
-        public void close() {
-            foreignKeyValueGetter.close();
-        }
-
         private boolean prefixEquals(final byte[] x, final byte[] y) {
             final int min = Math.min(x.length, y.length);
             final ByteBuffer xSlice = ByteBuffer.wrap(x, 0, min);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
index 490f70ffc8d..7d31ef44223 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
@@ -33,7 +33,7 @@ import java.util.Objects;
 /**
  * Receives {@code SubscriptionWrapper<K>} events and processes them according to their Instruction.
  * Depending on the results, {@code SubscriptionResponseWrapper}s are created, which will be propagated to
- * the {@code SubscriptionResolverJoinProcessorSupplier} instance.
+ * the {@code ResponseJoinProcessorSupplier} instance.
  *
  * @param <K> Type of primary keys
  * @param <KO> Type of foreign key
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
index 5c386cc735b..cf88aec6f9a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
@@ -103,7 +103,7 @@ public class SubscriptionReceiveProcessorSupplier<K, KO>
                 final ValueAndTimestamp<SubscriptionWrapper<K>> newValue = ValueAndTimestamp.make(record.value(), record.timestamp());
                 final ValueAndTimestamp<SubscriptionWrapper<K>> oldValue = store.get(subscriptionKey);
 
-                //This store is used by the prefix scanner in ForeignJoinSubscriptionProcessorSupplier
+                //This store is used by the prefix scanner in ForeignTableJoinProcessorSupplier
                 if (record.value().getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE) ||
                     record.value().getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE)) {
                     store.delete(subscriptionKey);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
index feb06b02e13..8a6298c28de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
@@ -21,8 +21,6 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.internals.Change;
-import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
-import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
@@ -31,7 +29,6 @@ import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.Murmur3;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,29 +49,36 @@ public class SubscriptionSendProcessorSupplier<K, KO, V> implements ProcessorSup
     private final Supplier<String> foreignKeySerdeTopicSupplier;
     private final Supplier<String> valueSerdeTopicSupplier;
     private final boolean leftJoin;
-    private final KTableValueGetterSupplier<K, V> primaryKeyValueGetterSupplier;
     private Serializer<KO> foreignKeySerializer;
     private Serializer<V> valueSerializer;
+    private boolean useVersionedSemantics;
 
     public SubscriptionSendProcessorSupplier(final Function<V, KO> foreignKeyExtractor,
                                              final Supplier<String> foreignKeySerdeTopicSupplier,
                                              final Supplier<String> valueSerdeTopicSupplier,
                                              final Serde<KO> foreignKeySerde,
                                              final Serializer<V> valueSerializer,
-                                             final boolean leftJoin,
-                                             final KTableValueGetterSupplier<K, V> primaryKeyValueGetterSupplier) {
+                                             final boolean leftJoin) {
         this.foreignKeyExtractor = foreignKeyExtractor;
         this.foreignKeySerdeTopicSupplier = foreignKeySerdeTopicSupplier;
         this.valueSerdeTopicSupplier = valueSerdeTopicSupplier;
         this.valueSerializer = valueSerializer;
         this.leftJoin = leftJoin;
-        this.primaryKeyValueGetterSupplier = primaryKeyValueGetterSupplier;
         foreignKeySerializer = foreignKeySerde == null ? null : foreignKeySerde.serializer();
     }
 
     @Override
     public Processor<K, Change<V>, KO, SubscriptionWrapper<K>> get() {
-        return new UnbindChangeProcessor(primaryKeyValueGetterSupplier.get());
+        return new UnbindChangeProcessor();
+    }
+
+    public void setUseVersionedSemantics(final boolean useVersionedSemantics) {
+        this.useVersionedSemantics = useVersionedSemantics;
+    }
+
+    // VisibleForTesting
+    public boolean isUseVersionedSemantics() {
+        return useVersionedSemantics;
     }
 
     private class UnbindChangeProcessor extends ContextualProcessor<K, Change<V>, KO, SubscriptionWrapper<K>> {
@@ -82,11 +86,6 @@ public class SubscriptionSendProcessorSupplier<K, KO, V> implements ProcessorSup
         private Sensor droppedRecordsSensor;
         private String foreignKeySerdeTopic;
         private String valueSerdeTopic;
-        private final KTableValueGetter<K, V> primaryKeyValueGetter;
-
-        private UnbindChangeProcessor(final KTableValueGetter<K, V> primaryKeyValueGetter) {
-            this.primaryKeyValueGetter = primaryKeyValueGetter;
-        }
 
         @SuppressWarnings("unchecked")
         @Override
@@ -106,23 +105,15 @@ public class SubscriptionSendProcessorSupplier<K, KO, V> implements ProcessorSup
                 context.taskId().toString(),
                 (StreamsMetricsImpl) context.metrics()
             );
-            primaryKeyValueGetter.init(context);
         }
 
         @Override
         public void process(final Record<K, Change<V>> record) {
             // drop out-of-order records from versioned tables (cf. KIP-914)
-            if (primaryKeyValueGetter.isVersioned()) {
-                // key-value stores do not contain data for null keys, so skip the check
-                // if the key is null
-                if (record.key() != null) {
-                    final ValueAndTimestamp<V> latestValueAndTimestamp = primaryKeyValueGetter.get(record.key());
-                    if (latestValueAndTimestamp != null && latestValueAndTimestamp.timestamp() > record.timestamp()) {
-                        LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
-                        droppedRecordsSensor.record();
-                        return;
-                    }
-                }
+            if (useVersionedSemantics && !record.value().isLatest) {
+                LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
+                droppedRecordsSensor.record();
+                return;
             }
 
             final long[] currentHash = record.value().newValue == null ?
@@ -208,11 +199,6 @@ public class SubscriptionSendProcessorSupplier<K, KO, V> implements ProcessorSup
             }
         }
 
-        @Override
-        public void close() {
-            primaryKeyValueGetter.close();
-        }
-
         private void logSkippedRecordDueToNullForeignKey() {
             if (context().recordMetadata().isPresent()) {
                 final RecordMetadata recordMetadata = context().recordMetadata().get();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java
index 121db537943..0c0dcb3bec9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java
@@ -51,11 +51,13 @@ abstract class BaseJoinProcessorNode<K, V1, V2, VR> extends GraphNode {
         this.otherJoinSideNodeName = otherJoinSideNodeName;
     }
 
-    ProcessorParameters<K, V1, ?, ?> thisProcessorParameters() {
+    // VisibleForTesting
+    public ProcessorParameters<K, V1, ?, ?> thisProcessorParameters() {
         return joinThisProcessorParameters;
     }
 
-    ProcessorParameters<K, V2, ?, ?> otherProcessorParameters() {
+    // VisibleForTesting
+    public ProcessorParameters<K, V2, ?, ?> otherProcessorParameters() {
         return joinOtherProcessorParameters;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignJoinSubscriptionSendNode.java
similarity index 56%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java
copy to streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignJoinSubscriptionSendNode.java
index e53d5b43a0d..4efbd9b29f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignJoinSubscriptionSendNode.java
@@ -14,30 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.kstream.internals.graph;
 
-import org.apache.kafka.streams.kstream.internals.KTableFilter;
+import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionSendProcessorSupplier;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
-import org.apache.kafka.streams.state.StoreBuilder;
 
-public class TableFilterNode<K, V> extends TableProcessorNode<K, V> implements VersionedSemanticsGraphNode {
+public class ForeignJoinSubscriptionSendNode<K, V> extends ProcessorGraphNode<K, V> implements VersionedSemanticsGraphNode {
 
-    public TableFilterNode(final String nodeName,
-                           final ProcessorParameters<K, V, ?, ?> processorParameters,
-                           final StoreBuilder<?> storeBuilder) {
-        super(nodeName, processorParameters, storeBuilder);
+    public ForeignJoinSubscriptionSendNode(final ProcessorParameters<K, V, ?, ?> processorParameters) {
+        super(processorParameters);
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public void enableVersionedSemantics(final boolean useVersionedSemantics) {
-        final ProcessorSupplier<K, V, ?, ?> processorSupplier = processorParameters().processorSupplier();
-        if (!(processorSupplier instanceof KTableFilter)) {
-            throw new IllegalStateException("Unexpected processor type for table filter: " + processorSupplier.getClass().getName());
+    public void enableVersionedSemantics(final boolean useVersionedSemantics, final String parentNodeName) {
+        final ProcessorSupplier<?, ?, ?, ?> processorSupplier = processorParameters().processorSupplier();
+        if (!(processorSupplier instanceof SubscriptionSendProcessorSupplier)) {
+            throw new IllegalStateException("Unexpected processor type for foreign-key table-table join subscription send processor: " + processorSupplier.getClass().getName());
         }
 
-        final KTableFilter<K, V> tableFilter = (KTableFilter<K, V>) processorSupplier;
-        tableFilter.setUseVersionedSemantics(useVersionedSemantics);
+        final SubscriptionSendProcessorSupplier<?, ?, ?> subscriptionSendProcessor
+            = (SubscriptionSendProcessorSupplier<?, ?, ?>) processorSupplier;
+        subscriptionSendProcessor.setUseVersionedSemantics(useVersionedSemantics);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignTableJoinNode.java
similarity index 50%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java
copy to streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignTableJoinNode.java
index e53d5b43a0d..82d9d6486ad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignTableJoinNode.java
@@ -14,30 +14,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.kstream.internals.graph;
 
-import org.apache.kafka.streams.kstream.internals.KTableFilter;
+import java.util.Set;
+import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
+import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignTableJoinProcessorSupplier;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.state.StoreBuilder;
 
-public class TableFilterNode<K, V> extends TableProcessorNode<K, V> implements VersionedSemanticsGraphNode {
+public class ForeignTableJoinNode<K, V> extends StatefulProcessorNode<K, V> implements VersionedSemanticsGraphNode {
 
-    public TableFilterNode(final String nodeName,
-                           final ProcessorParameters<K, V, ?, ?> processorParameters,
-                           final StoreBuilder<?> storeBuilder) {
-        super(nodeName, processorParameters, storeBuilder);
+    public ForeignTableJoinNode(final ProcessorParameters<K, V, ?, ?> processorParameters,
+                                final Set<StoreBuilder<?>> preRegisteredStores,
+                                final Set<KTableValueGetterSupplier<?, ?>> valueGetterSuppliers) {
+        super(processorParameters, preRegisteredStores, valueGetterSuppliers);
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public void enableVersionedSemantics(final boolean useVersionedSemantics) {
-        final ProcessorSupplier<K, V, ?, ?> processorSupplier = processorParameters().processorSupplier();
-        if (!(processorSupplier instanceof KTableFilter)) {
-            throw new IllegalStateException("Unexpected processor type for table filter: " + processorSupplier.getClass().getName());
+    public void enableVersionedSemantics(final boolean useVersionedSemantics, final String parentNodeName) {
+        final ProcessorSupplier<?, ?, ?, ?> processorSupplier = processorParameters().processorSupplier();
+        if (!(processorSupplier instanceof ForeignTableJoinProcessorSupplier)) {
+            throw new IllegalStateException("Unexpected processor type for foreign-key table-table join subscription processor: " + processorSupplier.getClass().getName());
         }
 
-        final KTableFilter<K, V> tableFilter = (KTableFilter<K, V>) processorSupplier;
-        tableFilter.setUseVersionedSemantics(useVersionedSemantics);
+        final ForeignTableJoinProcessorSupplier<?, ?, ?> subscriptionProcessor
+            = (ForeignTableJoinProcessorSupplier<?, ?, ?>) processorSupplier;
+        subscriptionProcessor.setUseVersionedSemantics(useVersionedSemantics);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
index 8c2f945a64e..ea07b4ba2f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
@@ -17,11 +17,9 @@
 
 package org.apache.kafka.streams.kstream.internals.graph;
 
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.KTableKTableAbstractJoin;
 import org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger;
 import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
@@ -33,7 +31,7 @@ import java.util.Arrays;
 /**
  * Too much specific information to generalize so the KTable-KTable join requires a specific node.
  */
-public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K, Change<V1>, Change<V2>, Change<VR>> {
+public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K, Change<V1>, Change<V2>, Change<VR>> implements VersionedSemanticsGraphNode {
 
     private final Serde<K> keySerde;
     private final Serde<VR> valueSerde;
@@ -97,6 +95,27 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
         return (KTableKTableJoinMerger<K, VR>) kChangeProcessorSupplier;
     }
 
+    @Override
+    public void enableVersionedSemantics(final boolean useVersionedSemantics, final String parentNodeName) {
+        enableVersionedSemantics(thisProcessorParameters(), useVersionedSemantics, parentNodeName);
+        enableVersionedSemantics(otherProcessorParameters(), useVersionedSemantics, parentNodeName);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void enableVersionedSemantics(final ProcessorParameters<K, ?, ?, ?> processorParameters,
+                                          final boolean useVersionedSemantics,
+                                          final String parentNodeName) {
+        final ProcessorSupplier<K, ?, ?, ?> processorSupplier = processorParameters.processorSupplier();
+        if (!(processorSupplier instanceof KTableKTableAbstractJoin)) {
+            throw new IllegalStateException("Unexpected processor type for table-table join: " + processorSupplier.getClass().getName());
+        }
+        final KTableKTableAbstractJoin<K, ?, ?, ?> tableJoin = (KTableKTableAbstractJoin<K, ?, ?, ?>) processorSupplier;
+
+        if (parentNodeName.equals(tableJoin.joinThisParentNodeName())) {
+            tableJoin.setUseVersionedSemantics(useVersionedSemantics);
+        }
+    }
+
     @Override
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
         final String thisProcessorName = thisProcessorParameters().processorName();
@@ -119,14 +138,8 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
             thisProcessorName,
             otherProcessorName);
 
-        // join processors for both sides of the join should have access to both stores
-        final Set<String> stores = new HashSet<>(joinThisStoreNames.length + joinOtherStoreNames.length);
-        Collections.addAll(stores, joinThisStoreNames);
-        Collections.addAll(stores, joinOtherStoreNames);
-        final String[] mergedStoreNames = stores.toArray(new String[0]);
-
-        topologyBuilder.connectProcessorAndStateStores(thisProcessorName, mergedStoreNames);
-        topologyBuilder.connectProcessorAndStateStores(otherProcessorName, mergedStoreNames);
+        topologyBuilder.connectProcessorAndStateStores(thisProcessorName, joinOtherStoreNames);
+        topologyBuilder.connectProcessorAndStateStores(otherProcessorName, joinThisStoreNames);
 
         if (storeBuilder != null) {
             topologyBuilder.addStateStore(storeBuilder, mergeProcessorName);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java
index e53d5b43a0d..e13adbc9c39 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java
@@ -31,7 +31,7 @@ public class TableFilterNode<K, V> extends TableProcessorNode<K, V> implements V
 
     @SuppressWarnings("unchecked")
     @Override
-    public void enableVersionedSemantics(final boolean useVersionedSemantics) {
+    public void enableVersionedSemantics(final boolean useVersionedSemantics, final String parentNodeName) {
         final ProcessorSupplier<K, V, ?, ?> processorSupplier = processorParameters().processorSupplier();
         if (!(processorSupplier instanceof KTableFilter)) {
             throw new IllegalStateException("Unexpected processor type for table filter: " + processorSupplier.getClass().getName());
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableRepartitionMapNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableRepartitionMapNode.java
index 4d5e283aa62..6f56e052b69 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableRepartitionMapNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableRepartitionMapNode.java
@@ -29,7 +29,7 @@ public class TableRepartitionMapNode<K, V> extends ProcessorGraphNode<K, V> impl
 
     @SuppressWarnings("unchecked")
     @Override
-    public void enableVersionedSemantics(final boolean useVersionedSemantics) {
+    public void enableVersionedSemantics(final boolean useVersionedSemantics, final String parentNodeName) {
         final ProcessorSupplier<K, V, ?, ?> processorSupplier = processorParameters().processorSupplier();
         if (!(processorSupplier instanceof KTableRepartitionMap)) {
             throw new IllegalStateException("Unexpected processor type for table repartition map: " + processorSupplier.getClass().getName());
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/VersionedSemanticsGraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/VersionedSemanticsGraphNode.java
index 9b7e578bfe0..c4cc9c3934d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/VersionedSemanticsGraphNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/VersionedSemanticsGraphNode.java
@@ -28,6 +28,6 @@ public interface VersionedSemanticsGraphNode {
     /**
      * @param useVersionedSemantics whether versioned semantics should be enabled
      */
-    void enableVersionedSemantics(final boolean useVersionedSemantics);
+    void enableVersionedSemantics(boolean useVersionedSemantics, String parentNodeName);
 
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index 5805ecfeff6..ff4d2e69e1a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.integration;
 
+import java.util.Collections;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
@@ -87,7 +88,37 @@ public abstract class AbstractJoinIntegrationTest {
 
     StreamsBuilder builder;
 
-    private final List<Input<String>> input = Arrays.asList(
+    protected final List<Input<String>> input = Arrays.asList(
+        new Input<>(INPUT_TOPIC_LEFT, null, 1),
+        new Input<>(INPUT_TOPIC_RIGHT, null, 2),
+        new Input<>(INPUT_TOPIC_LEFT, "A", 3),
+        new Input<>(INPUT_TOPIC_RIGHT, "a", 4),
+        new Input<>(INPUT_TOPIC_LEFT, "B", 5),
+        new Input<>(INPUT_TOPIC_RIGHT, "b", 6),
+        new Input<>(INPUT_TOPIC_LEFT, null, 7),
+        new Input<>(INPUT_TOPIC_RIGHT, null, 8),
+        new Input<>(INPUT_TOPIC_LEFT, "C", 9),
+        new Input<>(INPUT_TOPIC_RIGHT, "c", 10),
+        new Input<>(INPUT_TOPIC_RIGHT, null, 11),
+        new Input<>(INPUT_TOPIC_LEFT, null, 12),
+        new Input<>(INPUT_TOPIC_RIGHT, null, 13),
+        new Input<>(INPUT_TOPIC_RIGHT, "d", 7), // out-of-order data with null as latest
+        new Input<>(INPUT_TOPIC_LEFT, "D", 6),
+        new Input<>(INPUT_TOPIC_LEFT, null, 2),
+        new Input<>(INPUT_TOPIC_RIGHT, null, 3),
+        new Input<>(INPUT_TOPIC_RIGHT, "e", 14),
+        new Input<>(INPUT_TOPIC_LEFT, "E", 15),
+        new Input<>(INPUT_TOPIC_LEFT, null, 10), // out-of-order data with non-null as latest
+        new Input<>(INPUT_TOPIC_RIGHT, null, 9),
+        new Input<>(INPUT_TOPIC_LEFT, "F", 4),
+        new Input<>(INPUT_TOPIC_RIGHT, "f", 3)
+    );
+
+    // used for stream-stream join tests where out-of-order data does not meaningfully affect
+    // the result, and the main `input` list results in too many result records/test noise.
+    // also used for table-table multi-join tests, since out-of-order data with table-table
+    // joins is already tested in non-multi-join settings.
+    protected final List<Input<String>> inputWithoutOutOfOrderData = Arrays.asList(
         new Input<>(INPUT_TOPIC_LEFT, null, 1),
         new Input<>(INPUT_TOPIC_RIGHT, null, 2),
         new Input<>(INPUT_TOPIC_LEFT, "A", 3),
@@ -102,13 +133,10 @@ public abstract class AbstractJoinIntegrationTest {
         new Input<>(INPUT_TOPIC_LEFT, null, 12),
         new Input<>(INPUT_TOPIC_RIGHT, null, 13),
         new Input<>(INPUT_TOPIC_RIGHT, "d", 14),
-        new Input<>(INPUT_TOPIC_LEFT, "D", 15),
-        new Input<>(INPUT_TOPIC_LEFT, "E", 4), // out-of-order data
-        new Input<>(INPUT_TOPIC_RIGHT, "e", 3),
-        new Input<>(INPUT_TOPIC_RIGHT, "f", 7),
-        new Input<>(INPUT_TOPIC_LEFT, "F", 8)
+        new Input<>(INPUT_TOPIC_LEFT, "D", 15)
     );
 
+    // used for stream-stream self joins where only one input topic is needed
     private final List<Input<String>> leftInput = Arrays.asList(
         new Input<>(INPUT_TOPIC_LEFT, null, 1),
         new Input<>(INPUT_TOPIC_LEFT, "A", 2),
@@ -144,11 +172,11 @@ public abstract class AbstractJoinIntegrationTest {
         STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
     }
 
-    void runTestWithDriver(final List<List<TestRecord<Long, String>>> expectedResult) {
-        runTestWithDriver(expectedResult, null);
+    void runTestWithDriver(final List<Input<String>> input, final List<List<TestRecord<Long, String>>> expectedResult) {
+        runTestWithDriver(input, expectedResult, null);
     }
 
-    void runTestWithDriver(final List<List<TestRecord<Long, String>>> expectedResult, final String storeName) {
+    void runTestWithDriver(final List<Input<String>> input, final List<List<TestRecord<Long, String>>> expectedResult, final String storeName) {
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(STREAMS_CONFIG), STREAMS_CONFIG)) {
             final TestInputTopic<Long, String> right = driver.createInputTopic(INPUT_TOPIC_RIGHT, new LongSerializer(), new StringSerializer());
             final TestInputTopic<Long, String> left = driver.createInputTopic(INPUT_TOPIC_LEFT, new LongSerializer(), new StringSerializer());
@@ -175,6 +203,9 @@ public abstract class AbstractJoinIntegrationTest {
                     final List<TestRecord<Long, String>> output = outputTopic.readRecordsToList();
                     assertThat(output, equalTo(updatedExpected));
                     expectedFinalResult = updatedExpected.get(expected.size() - 1);
+                } else {
+                    final List<TestRecord<Long, String>> output = outputTopic.readRecordsToList();
+                    assertThat(output, equalTo(Collections.emptyList()));
                 }
             }
 
@@ -184,7 +215,7 @@ public abstract class AbstractJoinIntegrationTest {
         }
     }
 
-    void runTestWithDriver(final TestRecord<Long, String> expectedFinalResult, final String storeName) throws InterruptedException {
+    void runTestWithDriver(final List<Input<String>> input, final TestRecord<Long, String> expectedFinalResult, final String storeName) throws InterruptedException {
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(STREAMS_CONFIG), STREAMS_CONFIG)) {
             final TestInputTopic<Long, String> right = driver.createInputTopic(INPUT_TOPIC_RIGHT, new LongSerializer(), new StringSerializer());
             final TestInputTopic<Long, String> left = driver.createInputTopic(INPUT_TOPIC_LEFT, new LongSerializer(), new StringSerializer());
@@ -258,7 +289,7 @@ public abstract class AbstractJoinIntegrationTest {
         }
     }
 
-    private static final class Input<V> {
+    protected static final class Input<V> {
         String topic;
         KeyValue<Long, V> record;
         long timestamp;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyVersionedJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyVersionedJoinIntegrationTest.java
index 10f632ffdea..3703c910388 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyVersionedJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyVersionedJoinIntegrationTest.java
@@ -85,7 +85,6 @@ public class KTableKTableForeignKeyVersionedJoinIntegrationTest extends KTableKT
             // LHS records with match to existing RHS record
             left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 3);
             left.pipeInput("lhs2", "lhsValue2|rhs1", baseTimestamp + 5);
-
             {
                 final Map<String, String> expected = mkMap(
                     mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
@@ -103,8 +102,28 @@ public class KTableKTableForeignKeyVersionedJoinIntegrationTest extends KTableKT
                 }
             }
 
+            // replace with tombstone, to validate behavior when latest record is null
+            left.pipeInput("lhs2", null, baseTimestamp + 6);
+            {
+                assertThat(
+                    outputTopic.readKeyValuesToMap(),
+                    is(mkMap(
+                        mkEntry("lhs2", null)
+                    ))
+                );
+                if (materialized) {
+                    assertThat(
+                        asMap(store),
+                        is(mkMap(
+                            mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
+                        ))
+                    );
+                }
+            }
+
             // out-of-order LHS record (for existing key) does not produce a new result iff LHS is versioned
             left.pipeInput("lhs1", "lhsValue1_ooo|rhs1", baseTimestamp + 2);
+            left.pipeInput("lhs2", "lhsValue2_ooo|rhs1", baseTimestamp + 2);
             if (leftVersioned) {
                 assertThat(
                     outputTopic.readKeyValuesToMap(),
@@ -114,25 +133,23 @@ public class KTableKTableForeignKeyVersionedJoinIntegrationTest extends KTableKT
                     assertThat(
                         asMap(store),
                         is(mkMap(
-                            mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
-                            mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")
+                            mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
                         ))
                     );
                 }
             } else {
+                final Map<String, String> expected = mkMap(
+                    mkEntry("lhs1", "(lhsValue1_ooo|rhs1,rhsValue1)"),
+                    mkEntry("lhs2", "(lhsValue2_ooo|rhs1,rhsValue1)")
+                );
                 assertThat(
                     outputTopic.readKeyValuesToMap(),
-                    is(mkMap(
-                        mkEntry("lhs1", "(lhsValue1_ooo|rhs1,rhsValue1)")
-                    ))
+                    is(expected)
                 );
                 if (materialized) {
                     assertThat(
                         asMap(store),
-                        is(mkMap(
-                            mkEntry("lhs1", "(lhsValue1_ooo|rhs1,rhsValue1)"),
-                            mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")
-                        ))
+                        is(expected)
                     );
                 }
             }
@@ -148,8 +165,7 @@ public class KTableKTableForeignKeyVersionedJoinIntegrationTest extends KTableKT
                     assertThat(
                         asMap(store),
                         is(mkMap(
-                            mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
-                            mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")
+                            mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
                         ))
                     );
                 }
@@ -164,7 +180,7 @@ public class KTableKTableForeignKeyVersionedJoinIntegrationTest extends KTableKT
                     assertThat(
                         asMap(store),
                         is(mkMap(
-                            mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")
+                            mkEntry("lhs2", "(lhsValue2_ooo|rhs1,rhsValue1)")
                         ))
                     );
                 }
@@ -172,20 +188,20 @@ public class KTableKTableForeignKeyVersionedJoinIntegrationTest extends KTableKT
 
             // LHS record with larger timestamp always produces a new result
             left.pipeInput("lhs1", "lhsValue1_new|rhs1", baseTimestamp + 8);
+            left.pipeInput("lhs2", "lhsValue2_new|rhs1", baseTimestamp + 8);
             {
+                final Map<String, String> expected = mkMap(
+                    mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1)"),
+                    mkEntry("lhs2", "(lhsValue2_new|rhs1,rhsValue1)")
+                );
                 assertThat(
                     outputTopic.readKeyValuesToMap(),
-                    is(mkMap(
-                        mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1)")
-                    ))
+                    is(expected)
                 );
                 if (materialized) {
                     assertThat(
                         asMap(store),
-                        is(mkMap(
-                            mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1)"),
-                            mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")
-                        ))
+                        is(expected)
                     );
                 }
             }
@@ -202,7 +218,7 @@ public class KTableKTableForeignKeyVersionedJoinIntegrationTest extends KTableKT
                         asMap(store),
                         is(mkMap(
                             mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1)"),
-                            mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")
+                            mkEntry("lhs2", "(lhsValue2_new|rhs1,rhsValue1)")
                         ))
                     );
                 }
@@ -211,7 +227,7 @@ public class KTableKTableForeignKeyVersionedJoinIntegrationTest extends KTableKT
                     outputTopic.readKeyValuesToMap(),
                     is(mkMap(
                         mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1_ooo)"),
-                        mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1_ooo)")
+                        mkEntry("lhs2", "(lhsValue2_new|rhs1,rhsValue1_ooo)")
                     ))
                 );
                 if (materialized) {
@@ -219,7 +235,7 @@ public class KTableKTableForeignKeyVersionedJoinIntegrationTest extends KTableKT
                         asMap(store),
                         is(mkMap(
                             mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1_ooo)"),
-                            mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1_ooo)")
+                            mkEntry("lhs2", "(lhsValue2_new|rhs1,rhsValue1_ooo)")
                         ))
                     );
                 }
@@ -237,7 +253,7 @@ public class KTableKTableForeignKeyVersionedJoinIntegrationTest extends KTableKT
                         asMap(store),
                         is(mkMap(
                             mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1)"),
-                            mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")
+                            mkEntry("lhs2", "(lhsValue2_new|rhs1,rhsValue1)")
                         ))
                     );
                 }
@@ -247,7 +263,7 @@ public class KTableKTableForeignKeyVersionedJoinIntegrationTest extends KTableKT
                         outputTopic.readKeyValuesToMap(),
                         is(mkMap(
                             mkEntry("lhs1", "(lhsValue1_new|rhs1,null)"),
-                            mkEntry("lhs2", "(lhsValue2|rhs1,null)")
+                            mkEntry("lhs2", "(lhsValue2_new|rhs1,null)")
                         ))
                     );
                     if (materialized) {
@@ -255,7 +271,7 @@ public class KTableKTableForeignKeyVersionedJoinIntegrationTest extends KTableKT
                             asMap(store),
                             is(mkMap(
                                 mkEntry("lhs1", "(lhsValue1_new|rhs1,null)"),
-                                mkEntry("lhs2", "(lhsValue2|rhs1,null)")
+                                mkEntry("lhs2", "(lhsValue2_new|rhs1,null)")
                             ))
                         );
                     }
@@ -283,7 +299,7 @@ public class KTableKTableForeignKeyVersionedJoinIntegrationTest extends KTableKT
                     outputTopic.readKeyValuesToMap(),
                     is(mkMap(
                         mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1_new)"),
-                        mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1_new)")
+                        mkEntry("lhs2", "(lhsValue2_new|rhs1,rhsValue1_new)")
                     ))
                 );
                 if (materialized) {
@@ -291,7 +307,7 @@ public class KTableKTableForeignKeyVersionedJoinIntegrationTest extends KTableKT
                         asMap(store),
                         is(mkMap(
                             mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1_new)"),
-                            mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1_new)")
+                            mkEntry("lhs2", "(lhsValue2_new|rhs1,rhsValue1_new)")
                         ))
                     );
                 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
index 725ed7e3f66..554c5692a96 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
@@ -137,31 +137,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-a", null, 4L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-b", null, 6L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-e", null, 3L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-e", null, 5L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-e", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-e", null, 15L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-f", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-f", null, 15L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-a", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-b", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null, 14L))
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
         );
 
         leftStream.join(
@@ -170,7 +146,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
             JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24))
         ).to(OUTPUT_TOPIC);
 
-        runTestWithDriver(expectedResult);
+        runTestWithDriver(inputWithoutOutOfOrderData, expectedResult);
     }
 
     @Test
@@ -206,31 +182,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-a", null, 4L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-b", null, 6L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-e", null, 3L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-e", null, 5L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-e", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-e", null, 15L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-f", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-f", null, 15L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-a", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-b", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null, 14L))
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
         );
 
         leftStream.map(MockMapper.noOpKeyValueMapper())
@@ -241,7 +193,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
                 JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24))
             ).to(OUTPUT_TOPIC);
 
-        runTestWithDriver(expectedResult);
+        runTestWithDriver(inputWithoutOutOfOrderData, expectedResult);
     }
 
     @Test
@@ -277,31 +229,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-a", null, 4L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-b", null, 6L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-e", null, 3L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-e", null, 5L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-e", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-e", null, 15L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-f", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-f", null, 15L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-a", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-b", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null, 14L))
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
         );
 
         leftStream.leftJoin(
@@ -310,7 +238,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
             JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24))
         ).to(OUTPUT_TOPIC);
 
-        runTestWithDriver(expectedResult);
+        runTestWithDriver(inputWithoutOutOfOrderData, expectedResult);
     }
 
     @Test
@@ -346,31 +274,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-a", null, 4L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-b", null, 6L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-e", null, 3L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-e", null, 5L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-e", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-e", null, 15L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-f", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-f", null, 15L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-a", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-b", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null, 14L))
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
         );
 
         leftStream.map(MockMapper.noOpKeyValueMapper())
@@ -381,7 +285,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
                 JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24))
             ).to(OUTPUT_TOPIC);
 
-        runTestWithDriver(expectedResult);
+        runTestWithDriver(inputWithoutOutOfOrderData, expectedResult);
     }
 
     @Test
@@ -417,31 +321,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-a", null, 4L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-b", null, 6L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-e", null, 3L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-e", null, 5L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-e", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-e", null, 15L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-f", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-f", null, 15L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-a", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-b", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null, 14L))
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
         );
 
         leftStream.outerJoin(
@@ -450,7 +330,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
             JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24))
         ).to(OUTPUT_TOPIC);
 
-        runTestWithDriver(expectedResult);
+        runTestWithDriver(inputWithoutOutOfOrderData, expectedResult);
     }
 
     @Test
@@ -486,31 +366,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-a", null, 4L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-b", null, 6L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-e", null, 3L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-e", null, 5L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-e", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-e", null, 15L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-f", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-f", null, 15L)),
-            Arrays.asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-a", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-b", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null, 14L))
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
         );
 
         leftStream.map(MockMapper.noOpKeyValueMapper())
@@ -521,7 +377,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
                 JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24))
             ).to(OUTPUT_TOPIC);
 
-        runTestWithDriver(expectedResult);
+        runTestWithDriver(inputWithoutOutOfOrderData, expectedResult);
     }
 
     @Test
@@ -548,7 +404,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
                 new TestRecord<>(ANY_UNIQUE_KEY, "C-a-b", null, 9L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "C-b-a", null, 9L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "C-b-b", null, 9L)),
-            Arrays.<TestRecord<Long, String>>asList(
+            Arrays.asList(
                 new TestRecord<>(ANY_UNIQUE_KEY, "A-c-a", null, 10L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "A-c-b", null, 10L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "B-c-a", null, 10L),
@@ -567,7 +423,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
             null,
             null,
             null,
-            Arrays.<TestRecord<Long, String>>asList(
+            Arrays.asList(
                 new TestRecord<>(ANY_UNIQUE_KEY, "A-d-a", null, 14L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "A-d-b", null, 14L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "A-d-c", null, 14L),
@@ -589,7 +445,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
                 new TestRecord<>(ANY_UNIQUE_KEY, "A-d-d", null, 14L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "B-d-d", null, 14L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "C-d-d", null, 14L)),
-            Arrays.<TestRecord<Long, String>>asList(
+            Arrays.asList(
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-a-a", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-a-b", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-a-c", null, 15L),
@@ -605,163 +461,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-d-a", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-d-b", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-d-c", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)),
-            Arrays.<TestRecord<Long, String>>asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-a-a", null, 4L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-a-b", null, 6L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-a-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-a-d", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-b-a", null, 6L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-b-b", null, 6L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-b-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-b-d", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-c-a", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-c-b", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-c-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-c-d", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-d-a", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-d-b", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-d-c", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null, 14L)),
-            Arrays.<TestRecord<Long, String>>asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-e-a", null, 4L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-e-b", null, 6L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-e-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-e-d", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-e-a", null, 4L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-e-b", null, 6L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-e-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-e-d", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-e-a", null, 5L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-e-b", null, 6L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-e-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-e-d", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-e-a", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-e-b", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-e-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-e-d", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-e-a", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-e-b", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-e-c", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-e-d", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-e-e", null, 3L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-a-e", null, 4L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-a-e", null, 4L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-a-e", null, 5L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-e-e", null, 5L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-b-e", null, 6L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-b-e", null, 6L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-b-e", null, 6L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-a-e", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-b-e", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-e-e", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-c-e", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-c-e", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-c-e", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-c-e", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-d-e", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-d-e", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-d-e", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-d-e", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-a-e", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-b-e", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-c-e", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-d-e", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-e-e", null, 15L)),
-            Arrays.<TestRecord<Long, String>>asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-f-e", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-f-a", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-f-b", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-f-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-f-d", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-f-e", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-f-a", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-f-b", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-f-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-f-d", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-f-e", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-f-a", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-f-b", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-f-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-f-d", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-f-e", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-f-a", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-f-b", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-f-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-f-d", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-f-e", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-f-a", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-f-b", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-f-c", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-f-d", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-e-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-a-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-a-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-e-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-a-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-e-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-b-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-b-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-b-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-f-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-f-f", null, 7L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-a-f", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-b-f", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-e-f", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-f-f", null, 9L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-c-f", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-c-f", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-c-f", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-c-f", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "A-d-f", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "B-d-f", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "C-d-f", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "E-d-f", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-a-f", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-b-f", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-c-f", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-d-f", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-e-f", null, 15L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "D-f-f", null, 15L)),
-            Arrays.<TestRecord<Long, String>>asList(
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-e-e", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-e-a", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-e-b", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-e-f", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-e-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-e-d", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-a-e", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-a-a", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-a-b", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-a-f", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-a-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-a-d", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-b-e", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-b-a", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-b-b", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-b-f", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-b-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-b-d", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-f-e", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-f-a", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-f-b", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 8L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-f-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-f-d", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-c-e", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-c-a", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-c-b", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-c-f", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-c-c", null, 10L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-c-d", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-d-e", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-d-a", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-d-b", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-d-f", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-d-c", null, 14L),
-                new TestRecord<>(ANY_UNIQUE_KEY, "F-d-d", null, 14L))
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L))
         );
 
         leftStream.join(
@@ -774,6 +474,6 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
             JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24))
         ).to(OUTPUT_TOPIC);
 
-        runTestWithDriver(expectedResult);
+        runTestWithDriver(inputWithoutOutOfOrderData, expectedResult);
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
index bd95f57b15a..e4e46ec69a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
@@ -87,14 +87,18 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest
             null,
             null,
             null,
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  15L)),
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null,  4L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  6L)),
             null,
             null,
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null,  8L))
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null,  15L)),
+            null,
+            null,
+            null,
+            null
         );
 
-        runTestWithDriver(expectedResult);
+        runTestWithDriver(input, expectedResult);
     }
 
     @Test
@@ -120,14 +124,18 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest
             null,
             null,
             null,
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null,  4L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 6L)),
+            null,
+            null,
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null,  15L)),
             null,
             null,
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null,  8L))
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-null", null,  4L)),
+            null
         );
 
-        runTestWithDriver(expectedResult);
+        runTestWithDriver(input, expectedResult);
     }
 
     @Test
@@ -154,14 +162,18 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest
             null,
             null,
             null,
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  15L)),
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-a", null,  4L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null,  6L)),
+            null,
+            null,
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null,  15L)),
             null,
             null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-a", null,  4L)),
             null
         );
 
-        runTestWithDriver(expectedResult);
+        runTestWithDriver(input, expectedResult);
     }
 
     @Test
@@ -188,13 +200,17 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest
             null,
             null,
             null,
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-a", null,  4L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 6L)),
             null,
             null,
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-null", null,  8L))
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null,  15L)),
+            null,
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-a", null,  4L)),
+            null
         );
 
-        runTestWithDriver(expectedResult);
+        runTestWithDriver(input, expectedResult);
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
index 261ac165cb5..31bcc9e0e09 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
@@ -63,11 +63,10 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
         builder = new StreamsBuilder();
     }
 
-    private final TestRecord<Long, String> expectedFinalJoinResultUnversioned = new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L);
-    private final TestRecord<Long, String> expectedFinalJoinResultVersioned = new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L);
-    private final TestRecord<Long, String> expectedFinalJoinResultLeftVersionedOnly = new TestRecord<>(ANY_UNIQUE_KEY, "D-f", null, 15L);
-    private final TestRecord<Long, String> expectedFinalJoinResultRightVersionedOnly = new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null, 14L);
-    private final TestRecord<Long, String> expectedFinalMultiJoinResult = new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null,  8L);
+    private final TestRecord<Long, String> expectedFinalJoinResultUnversioned = new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 4L);
+    private final TestRecord<Long, String> expectedFinalJoinResultLeftVersionedOnly = new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 15L);
+    private final TestRecord<Long, String> expectedFinalJoinResultRightVersionedOnly = new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 14L);
+    private final TestRecord<Long, String> expectedFinalMultiJoinResult = new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null,  4L);
     private final String storeName = appID + "-store";
 
     private final Materialized<Long, String, KeyValueStore<Bytes, byte[]>> materialized = Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(storeName)
@@ -85,7 +84,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
         leftTable.join(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
-            runTestWithDriver(expectedFinalJoinResultUnversioned, storeName);
+            runTestWithDriver(input, expectedFinalJoinResultUnversioned, storeName);
         } else {
             final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
                 null,
@@ -102,14 +101,18 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                 null,
                 null,
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  15L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null,  14L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null,  4L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null,  7L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null,  8L))
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  7L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  7L)),
+                null,
+                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null,  15L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  14L)),
+                null,
+                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null,  4L))
             );
 
-            runTestWithDriver(expectedResult, storeName);
+            runTestWithDriver(input, expectedResult, storeName);
         }
     }
 
@@ -122,7 +125,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
         leftTable.leftJoin(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
-            runTestWithDriver(expectedFinalJoinResultUnversioned, storeName);
+            runTestWithDriver(input, expectedFinalJoinResultUnversioned, storeName);
         } else {
             final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
                 null,
@@ -139,14 +142,18 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  12L)),
                 null,
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  15L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null,  14L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null,  4L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null,  7L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null,  8L))
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  7L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  7L)),
+                null,
+                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null,  15L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  14L)),
+                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-null", null,  4L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null,  4L))
             );
 
-            runTestWithDriver(expectedResult, storeName);
+            runTestWithDriver(input, expectedResult, storeName);
         }
     }
 
@@ -159,7 +166,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
         leftTable.outerJoin(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
-            runTestWithDriver(expectedFinalJoinResultUnversioned, storeName);
+            runTestWithDriver(input, expectedFinalJoinResultUnversioned, storeName);
         } else {
             final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
                 null,
@@ -175,15 +182,19 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null,  11L)),
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  12L)),
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 14L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  15L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null,  14L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null,  4L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null,  7L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null,  8L))
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 7L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  7L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 7L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  3L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-e", null, 14L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null,  15L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-e", null, 14L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  9L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-null", null,  4L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null,  4L))
             );
 
-            runTestWithDriver(expectedResult, storeName);
+            runTestWithDriver(input, expectedResult, storeName);
         }
     }
 
@@ -215,14 +226,18 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
             null,
             null,
             null,
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  15L)),
+            null,
+            null,
+            null,
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null,  15L)),
             null,
             null,
             null,
             null
         );
 
-        runTestWithDriver(expectedResult, storeName);
+        runTestWithDriver(input, expectedResult, storeName);
     }
 
     @Test
@@ -253,14 +268,18 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
             Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  12L)),
             null,
             null,
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  15L)),
+            null,
+            null,
+            null,
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null,  15L)),
             null,
             null,
             null,
             null
         );
 
-        runTestWithDriver(expectedResult, storeName);
+        runTestWithDriver(input, expectedResult, storeName);
     }
 
     @Test
@@ -290,15 +309,19 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
             Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null,  11L)),
             Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  12L)),
             null,
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 14L)),
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  15L)),
+            null,
+            null,
+            null,
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-e", null, 14L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null,  15L)),
             null,
             null,
             null,
             null
         );
 
-        runTestWithDriver(expectedResult, storeName);
+        runTestWithDriver(input, expectedResult, storeName);
     }
 
     @Test
@@ -312,7 +335,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
         leftTable.join(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
-            runTestWithDriver(expectedFinalJoinResultLeftVersionedOnly, storeName);
+            runTestWithDriver(input, expectedFinalJoinResultLeftVersionedOnly, storeName);
         } else {
             final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
                 null,
@@ -329,14 +352,18 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                 null,
                 null,
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  15L)),
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-e", null,  15L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-f", null,  15L)),
-                null
+                null,
+                null,
+                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null,  15L)),
+                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  15L)),
+                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null,  15L))
             );
 
-            runTestWithDriver(expectedResult, storeName);
+            runTestWithDriver(input, expectedResult, storeName);
         }
     }
 
@@ -351,7 +378,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
         leftTable.leftJoin(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
-            runTestWithDriver(expectedFinalJoinResultLeftVersionedOnly, storeName);
+            runTestWithDriver(input, expectedFinalJoinResultLeftVersionedOnly, storeName);
         } else {
             final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
                 null,
@@ -368,14 +395,18 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  12L)),
                 null,
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  15L)),
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-e", null,  15L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-f", null,  15L)),
-                null
+                null,
+                null,
+                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null,  15L)),
+                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-null", null,  15L)),
+                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null,  15L))
             );
 
-            runTestWithDriver(expectedResult, storeName);
+            runTestWithDriver(input, expectedResult, storeName);
         }
     }
 
@@ -390,31 +421,35 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
         leftTable.outerJoin(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
-            runTestWithDriver(expectedFinalJoinResultLeftVersionedOnly, storeName);
+            runTestWithDriver(input, expectedFinalJoinResultLeftVersionedOnly, storeName);
         } else {
             final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
                 null,
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null,  3L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null,  4L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null,  5L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null,  6L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)),
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-b", null, 7L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  8L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null,  9L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null,  10L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null,  11L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  12L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 8L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 9L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 11L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)),
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 14L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  15L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 7L)),
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-e", null,  15L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-f", null,  15L)),
-                null
+                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 3L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-e", null, 14L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)),
+                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-null", null, 15L)),
+                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 15L))
             );
 
-            runTestWithDriver(expectedResult, storeName);
+            runTestWithDriver(input, expectedResult, storeName);
         }
     }
 
@@ -429,7 +464,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
         leftTable.join(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
-            runTestWithDriver(expectedFinalJoinResultRightVersionedOnly, storeName);
+            runTestWithDriver(input, expectedFinalJoinResultRightVersionedOnly, storeName);
         } else {
             final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
                 null,
@@ -446,14 +481,18 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                 null,
                 null,
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  15L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null,  14L)),
                 null,
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null,  14L))
+                null,
+                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null,  15L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  14L)),
+                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null,  14L)),
+                null
             );
 
-            runTestWithDriver(expectedResult, storeName);
+            runTestWithDriver(input, expectedResult, storeName);
         }
     }
 
@@ -468,7 +507,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
         leftTable.leftJoin(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
-            runTestWithDriver(expectedFinalJoinResultRightVersionedOnly, storeName);
+            runTestWithDriver(input, expectedFinalJoinResultRightVersionedOnly, storeName);
         } else {
             final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
                 null,
@@ -485,14 +524,18 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  12L)),
                 null,
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  15L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null,  14L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-null", null,  6L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  2L)),
+                null,
                 null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null,  15L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  14L)),
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null,  14L))
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null,  14L)),
+                null
             );
 
-            runTestWithDriver(expectedResult, storeName);
+            runTestWithDriver(input, expectedResult, storeName);
         }
     }
 
@@ -507,7 +550,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
         leftTable.outerJoin(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
-            runTestWithDriver(expectedFinalJoinResultRightVersionedOnly, storeName);
+            runTestWithDriver(input, expectedFinalJoinResultRightVersionedOnly, storeName);
         } else {
             final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
                 null,
@@ -523,15 +566,19 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null,  11L)),
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  12L)),
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 14L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  15L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null,  14L)),
                 null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-null", null,  6L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  2L)),
+                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-e", null,  14L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null,  15L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-e", null,  14L)),
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null,  14L))
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null,  14L)),
+                null
             );
 
-            runTestWithDriver(expectedResult, storeName);
+            runTestWithDriver(input, expectedResult, storeName);
         }
     }
 
@@ -547,7 +594,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                  .to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
-            runTestWithDriver(expectedFinalMultiJoinResult, storeName);
+            runTestWithDriver(input, expectedFinalMultiJoinResult, storeName);
         } else {
             // TODO K6443: the duplicate below for all the multi-joins are due to
             //             KAFKA-6443, should be updated once it is fixed.
@@ -573,18 +620,10 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                 null,
                 null,
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null,  15L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null,  14L)),
-                Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null,  4L),
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null,  4L)),
-                Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null,  7L),
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null,  7L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null,  8L))
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null,  15L))
             );
 
-            runTestWithDriver(expectedResult, storeName);
+            runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName);
         }
     }
 
@@ -600,7 +639,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                  .to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
-            runTestWithDriver(expectedFinalMultiJoinResult, storeName);
+            runTestWithDriver(input, expectedFinalMultiJoinResult, storeName);
         } else {
             final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
                 null,
@@ -623,18 +662,10 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                 null,
                 null,
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null,  15L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null,  14L)),
-                Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null,  4L),
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null,  4L)),
-                Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null,  7L),
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null,  7L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null,  8L))
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null,  15L))
             );
 
-            runTestWithDriver(expectedResult, storeName);
+            runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName);
         }
     }
 
@@ -650,7 +681,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                  .to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
-            runTestWithDriver(expectedFinalMultiJoinResult, storeName);
+            runTestWithDriver(input, expectedFinalMultiJoinResult, storeName);
         } else {
             final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
                 null,
@@ -674,22 +705,12 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                     new TestRecord<>(ANY_UNIQUE_KEY, null, null,  11L)),
                 null,
                 null,
-                null,
-                Arrays.asList(
-                    // incorrect result `null-d` is caused by self-join of `rightTable`
-                    new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 14L),
-                    new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null,  15L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null,  14L)),
-                Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null,  4L),
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null,  4L)),
-                Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null,  7L),
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null,  7L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null,  8L))
+                // incorrect result `null-d` is caused by self-join of `rightTable`
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 14L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null,  15L))
             );
 
-            runTestWithDriver(expectedResult, storeName);
+            runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName);
         }
     }
 
@@ -705,7 +726,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                  .to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
-            runTestWithDriver(expectedFinalMultiJoinResult, storeName);
+            runTestWithDriver(input, expectedFinalMultiJoinResult, storeName);
         } else {
             final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
                 null,
@@ -728,18 +749,10 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                 null,
                 null,
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null,  15L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null,  14L)),
-                Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null,  4L),
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null,  4L)),
-                Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null,  7L),
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null,  7L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null,  8L))
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null,  15L))
             );
 
-            runTestWithDriver(expectedResult, storeName);
+            runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName);
         }
     }
 
@@ -755,14 +768,13 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                  .to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
-            runTestWithDriver(expectedFinalMultiJoinResult, storeName);
+            runTestWithDriver(input, expectedFinalMultiJoinResult, storeName);
         } else {
             final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
                 null,
                 null,
-                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null-null", null,  3L)),
                 Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "A-null-null", null,  3L),
                     new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null,  4L),
                     new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null,  4L)),
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null,  5L)),
@@ -771,9 +783,8 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                     new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null,  6L)),
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  7L)),
                 null,
-                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null,  9L)),
                 Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null,  9L),
                     new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null,  10L),
                     new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null,  10L)),
                 Arrays.asList(
@@ -782,18 +793,10 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  12L)),
                 null,
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null,  15L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null,  14L)),
-                Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null,  4L),
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null,  4L)),
-                Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null,  7L),
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null,  7L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null,  8L))
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null,  15L))
             );
 
-            runTestWithDriver(expectedResult, storeName);
+            runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName);
         }
     }
 
@@ -809,14 +812,13 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                  .to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
-            runTestWithDriver(expectedFinalMultiJoinResult, storeName);
+            runTestWithDriver(input, expectedFinalMultiJoinResult, storeName);
         } else {
             final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
                 null,
                 null,
-                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null-null", null,  3L)),
                 Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "A-null-null", null,  3L),
                     new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null,  4L),
                     new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null,  4L)),
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null,  5L)),
@@ -825,9 +827,8 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                     new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null,  6L)),
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-b", null, 7L)),
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  8L)),
-                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null,  9L)),
                 Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null,  9L),
                     new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null,  10L),
                     new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null,  10L)),
                 Arrays.asList(
@@ -835,21 +836,11 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                     new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null,  11L)),
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  12L)),
                 null,
-                null,
-                Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 14L),
-                    new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null,  15L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null,  14L)),
-                Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null,  4L),
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null,  4L)),
-                Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null,  7L),
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null,  7L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null,  8L))
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 14L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null,  15L))
             );
 
-            runTestWithDriver(expectedResult, storeName);
+            runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName);
         }
     }
 
@@ -865,7 +856,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                  .to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
-            runTestWithDriver(expectedFinalMultiJoinResult, storeName);
+            runTestWithDriver(input, expectedFinalMultiJoinResult, storeName);
         } else {
             final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
                 null,
@@ -890,18 +881,10 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                 Arrays.asList(
                     new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L),
                     new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null,  15L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null,  14L)),
-                Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null,  4L),
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null,  4L)),
-                Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null,  7L),
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null,  7L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null,  8L))
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null,  15L))
             );
 
-            runTestWithDriver(expectedResult, storeName);
+            runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName);
         }
     }
 
@@ -917,14 +900,13 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                  .to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
-            runTestWithDriver(expectedFinalMultiJoinResult, storeName);
+            runTestWithDriver(input, expectedFinalMultiJoinResult, storeName);
         } else {
             final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
                 null,
                 null,
-                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null-null", null,  3L)),
                 Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "A-null-null", null,  3L),
                     new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null,  4L),
                     new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null,  4L)),
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null,  5L)),
@@ -933,9 +915,8 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                     new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null,  6L)),
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-b-b", null, 7L)),
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  8L)),
-                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null,  9L)),
                 Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null,  9L),
                     new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null,  10L),
                     new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null,  10L)),
                 Arrays.asList(
@@ -946,18 +927,10 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                 Arrays.asList(
                     new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L),
                     new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null,  15L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null,  14L)),
-                Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null,  4L),
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null,  4L)),
-                Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null,  7L),
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null,  7L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null,  8L))
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null,  15L))
             );
 
-            runTestWithDriver(expectedResult, storeName);
+            runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName);
         }
     }
 
@@ -973,14 +946,13 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                  .to(OUTPUT_TOPIC);
 
         if (cacheEnabled) {
-            runTestWithDriver(expectedFinalMultiJoinResult, storeName);
+            runTestWithDriver(input, expectedFinalMultiJoinResult, storeName);
         } else {
             final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
                 null,
                 null,
-                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null-null", null,  3L)),
                 Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "A-null-null", null,  3L),
                     new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null,  4L),
                     new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null,  4L)),
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null,  5L)),
@@ -1000,21 +972,12 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                     new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null,  11L)),
                 Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  12L)),
                 null,
-                null,
                 Arrays.asList(
                     new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null,  14L),
-                    new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null,  14L),
-                    new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null,  15L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null,  14L)),
-                Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null,  4L),
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null,  4L)),
-                Arrays.asList(
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null,  7L),
-                    new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null,  7L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null,  8L))
+                    new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null,  14L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null,  15L))
             );
-            runTestWithDriver(expectedResult, storeName);
+            runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName);
         }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 7449f8d3cde..1106e914cf3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -33,7 +33,12 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignTableJoinProcessorSupplier;
+import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionSendProcessorSupplier;
+import org.apache.kafka.streams.kstream.internals.graph.ForeignTableJoinNode;
+import org.apache.kafka.streams.kstream.internals.graph.ForeignJoinSubscriptionSendNode;
 import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
+import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode;
 import org.apache.kafka.streams.kstream.internals.graph.TableFilterNode;
 import org.apache.kafka.streams.kstream.internals.graph.TableRepartitionMapNode;
@@ -1009,6 +1014,176 @@ public class InternalStreamsBuilderTest {
         verifyVersionedSemantics((TableRepartitionMapNode<?, ?>) repartitionMap, true);
     }
 
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableJoin() {
+        // Given: a table-table join (no foreign-key extractor) whose two input tables are each materialized with their own versioned store
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2);
+        table1.join(table2, (v1, v2) -> v1 + v2);
+
+        // When: buildAndOptimizeTopology() is invoked on the builder
+        builder.buildAndOptimizeTopology();
+
+        // Then: a KTableKTableJoinNode is reachable from the root, and both its left and right sides are expected to use versioned semantics
+        final GraphNode join = getNodeByType(builder.root, KTableKTableJoinNode.class, new HashSet<>());
+        assertNotNull(join);
+        verifyVersionedSemantics((KTableKTableJoinNode<?, ?, ?, ?>) join, true, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableJoinLeftOnly() {
+        // Given: a table-table join where only table1 (the table join() is called on) uses a versioned store; table2 is materialized by name only
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> unversionedMaterialize =
+            new MaterializedInternal<>(Materialized.as("unversioned"), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, unversionedMaterialize);
+        table1.join(table2, (v1, v2) -> v1 + v2);
+
+        // When: buildAndOptimizeTopology() is invoked on the builder
+        builder.buildAndOptimizeTopology();
+
+        // Then: the join node is expected to use versioned semantics on its left side only
+        final GraphNode join = getNodeByType(builder.root, KTableKTableJoinNode.class, new HashSet<>());
+        assertNotNull(join);
+        verifyVersionedSemantics((KTableKTableJoinNode<?, ?, ?, ?>) join, true, false);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableJoinRightOnly() {
+        // Given: a table-table join where only table2 (the argument passed to join()) uses a versioned store; table1 is materialized by name only
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> unversionedMaterialize =
+            new MaterializedInternal<>(Materialized.as("unversioned"), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, unversionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize);
+        table1.join(table2, (v1, v2) -> v1 + v2);
+
+        // When: buildAndOptimizeTopology() is invoked on the builder
+        builder.buildAndOptimizeTopology();
+
+        // Then: the join node is expected to use versioned semantics on its right side only
+        final GraphNode join = getNodeByType(builder.root, KTableKTableJoinNode.class, new HashSet<>());
+        assertNotNull(join);
+        verifyVersionedSemantics((KTableKTableJoinNode<?, ?, ?, ?>) join, false, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableSelfJoin() {
+        // Given: a single table backed by a versioned store, joined with itself (the same KTable is both receiver and argument of join())
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        table1.join(table1, (v1, v2) -> v1 + v2);
+
+        // When: buildAndOptimizeTopology() is invoked on the builder
+        builder.buildAndOptimizeTopology();
+
+        // Then: both sides of the join node are expected to use versioned semantics, since both sides are the same versioned table
+        final GraphNode join = getNodeByType(builder.root, KTableKTableJoinNode.class, new HashSet<>());
+        assertNotNull(join);
+        verifyVersionedSemantics((KTableKTableJoinNode<?, ?, ?, ?>) join, true, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableForeignJoin() {
+        // Given: a foreign-key join (join() called with a key extractor) between two tables that are each materialized with their own versioned store
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2);
+        table1.join(table2, v -> v, (v1, v2) -> v1 + v2);
+
+        // When: buildAndOptimizeTopology() is invoked on the builder
+        builder.buildAndOptimizeTopology();
+
+        // Then: both the subscription-send node and the foreign-table join node are expected to use versioned semantics
+        final GraphNode joinThis = getNodeByType(builder.root, ForeignJoinSubscriptionSendNode.class, new HashSet<>());
+        assertNotNull(joinThis);
+        verifyVersionedSemantics((ForeignJoinSubscriptionSendNode<?, ?>) joinThis, true);
+
+        final GraphNode joinOther = getNodeByType(builder.root, ForeignTableJoinNode.class, new HashSet<>());
+        assertNotNull(joinOther);
+        verifyVersionedSemantics((ForeignTableJoinNode<?, ?>) joinOther, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableForeignJoinLeftOnly() {
+        // Given: a foreign-key join where only table1 (the table join() is called on) uses a versioned store; table2 is materialized by name only
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> unversionedMaterialize =
+            new MaterializedInternal<>(Materialized.as("unversioned"), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, unversionedMaterialize);
+        table1.join(table2, v -> v, (v1, v2) -> v1 + v2);
+
+        // When: buildAndOptimizeTopology() is invoked on the builder
+        builder.buildAndOptimizeTopology();
+
+        // Then: the subscription-send node is expected to use versioned semantics, while the foreign-table join node is not
+        final GraphNode joinThis = getNodeByType(builder.root, ForeignJoinSubscriptionSendNode.class, new HashSet<>());
+        assertNotNull(joinThis);
+        verifyVersionedSemantics((ForeignJoinSubscriptionSendNode<?, ?>) joinThis, true);
+
+        final GraphNode joinOther = getNodeByType(builder.root, ForeignTableJoinNode.class, new HashSet<>());
+        assertNotNull(joinOther);
+        verifyVersionedSemantics((ForeignTableJoinNode<?, ?>) joinOther, false);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableForeignJoinRightOnly() {
+        // Given: a foreign-key join where only table2 (the argument passed to join()) uses a versioned store; table1 is materialized by name only
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> unversionedMaterialize =
+            new MaterializedInternal<>(Materialized.as("unversioned"), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, unversionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize);
+        table1.join(table2, v -> v, (v1, v2) -> v1 + v2);
+
+        // When: buildAndOptimizeTopology() is invoked on the builder
+        builder.buildAndOptimizeTopology();
+
+        // Then: the subscription-send node is not expected to use versioned semantics, while the foreign-table join node is
+        final GraphNode joinThis = getNodeByType(builder.root, ForeignJoinSubscriptionSendNode.class, new HashSet<>());
+        assertNotNull(joinThis);
+        verifyVersionedSemantics((ForeignJoinSubscriptionSendNode<?, ?>) joinThis, false);
+
+        final GraphNode joinOther = getNodeByType(builder.root, ForeignTableJoinNode.class, new HashSet<>());
+        assertNotNull(joinOther);
+        verifyVersionedSemantics((ForeignTableJoinNode<?, ?>) joinOther, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableForeignSelfJoin() {
+        // Given: a single table backed by a versioned store, foreign-key joined with itself (the same KTable is both receiver and argument of join())
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        table1.join(table1, v -> v, (v1, v2) -> v1 + v2);
+
+        // When: buildAndOptimizeTopology() is invoked on the builder
+        builder.buildAndOptimizeTopology();
+
+        // Then: both the subscription-send node and the foreign-table join node are expected to use versioned semantics
+        final GraphNode joinThis = getNodeByType(builder.root, ForeignJoinSubscriptionSendNode.class, new HashSet<>());
+        assertNotNull(joinThis);
+        verifyVersionedSemantics((ForeignJoinSubscriptionSendNode<?, ?>) joinThis, true);
+
+        final GraphNode joinOther = getNodeByType(builder.root, ForeignTableJoinNode.class, new HashSet<>());
+        assertNotNull(joinOther);
+        verifyVersionedSemantics((ForeignTableJoinNode<?, ?>) joinOther, true);
+    }
+
     private void verifyVersionedSemantics(final TableFilterNode<?, ?> filterNode, final boolean expectedValue) {
         final ProcessorSupplier<?, ?, ?, ?> processorSupplier = filterNode.processorParameters().processorSupplier();
         assertTrue(processorSupplier instanceof KTableFilter);
@@ -1023,6 +1198,32 @@ public class InternalStreamsBuilderTest {
         assertEquals(expectedValue, repartitionMap.isUseVersionedSemantics());
     }
 
+    private void verifyVersionedSemantics(final KTableKTableJoinNode<?, ?, ?, ?> joinNode, final boolean expectedValueLeft, final boolean expectedValueRight) { // "this" supplier is checked against the left expectation, "other" against the right
+        final ProcessorSupplier<?, ?, ?, ?> thisProcessorSupplier = joinNode.thisProcessorParameters().processorSupplier();
+        assertTrue(thisProcessorSupplier instanceof KTableKTableAbstractJoin);
+        final KTableKTableAbstractJoin<?, ?, ?, ?> thisJoin = (KTableKTableAbstractJoin<?, ?, ?, ?>) thisProcessorSupplier;
+        assertEquals(expectedValueLeft, thisJoin.isUseVersionedSemantics());
+        // Repeat the same supplier-type and flag checks for the "other" side of the join node.
+        final ProcessorSupplier<?, ?, ?, ?> otherProcessorSupplier = joinNode.otherProcessorParameters().processorSupplier();
+        assertTrue(otherProcessorSupplier instanceof KTableKTableAbstractJoin);
+        final KTableKTableAbstractJoin<?, ?, ?, ?> otherJoin = (KTableKTableAbstractJoin<?, ?, ?, ?>) otherProcessorSupplier;
+        assertEquals(expectedValueRight, otherJoin.isUseVersionedSemantics());
+    }
+
+    private void verifyVersionedSemantics(final ForeignJoinSubscriptionSendNode<?, ?> joinThisNode, final boolean expectedValue) { // asserts the node's supplier type and its versioned-semantics flag
+        final ProcessorSupplier<?, ?, ?, ?> thisProcessorSupplier = joinThisNode.processorParameters().processorSupplier();
+        assertTrue(thisProcessorSupplier instanceof SubscriptionSendProcessorSupplier); // fail with an assertion rather than a ClassCastException on the next line
+        final SubscriptionSendProcessorSupplier<?, ?, ?> joinThis = (SubscriptionSendProcessorSupplier<?, ?, ?>) thisProcessorSupplier;
+        assertEquals(expectedValue, joinThis.isUseVersionedSemantics());
+    }
+
+    private void verifyVersionedSemantics(final ForeignTableJoinNode<?, ?> joinOtherNode, final boolean expectedValue) { // asserts the node's supplier type and its versioned-semantics flag
+        final ProcessorSupplier<?, ?, ?, ?> otherProcessorSupplier = joinOtherNode.processorParameters().processorSupplier();
+        assertTrue(otherProcessorSupplier instanceof ForeignTableJoinProcessorSupplier); // fail with an assertion rather than a ClassCastException on the next line
+        final ForeignTableJoinProcessorSupplier<?, ?, ?> joinOther = (ForeignTableJoinProcessorSupplier<?, ?, ?>) otherProcessorSupplier;
+        assertEquals(expectedValue, joinOther.isUseVersionedSemantics());
+    }
+
     private GraphNode getNodeByType(
         final GraphNode currentNode,
         final Class<? extends GraphNode> clazz,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 54bb14aa0ec..81a2f1daf5b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -3149,7 +3149,7 @@ public class KStreamImplTest {
                 "    Processor: KTABLE-FK-JOIN-OUTPUT-0000000018 (stores: [])\n" +
                 "      --> KTABLE-TOSTREAM-0000000020\n" +
                 "      <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000017\n" +
-                "    Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000007 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000002])\n" +
+                "    Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000007 (stores: [])\n" +
                 "      --> KTABLE-SINK-0000000008\n" +
                 "      <-- KSTREAM-TOTABLE-0000000001\n" +
                 "    Processor: KTABLE-TOSTREAM-0000000020 (stores: [])\n" +
@@ -3174,7 +3174,7 @@ public class KStreamImplTest {
                 "    Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000012 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000005])\n" +
                 "      --> KTABLE-SINK-0000000015\n" +
                 "      <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000011\n" +
-                "    Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000013 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000005, KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000010])\n" +
+                "    Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000013 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000010])\n" +
                 "      --> KTABLE-SINK-0000000015\n" +
                 "      <-- KSTREAM-TOTABLE-0000000004\n" +
                 "    Sink: KTABLE-SINK-0000000015 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic)\n" +
@@ -3254,10 +3254,10 @@ public class KStreamImplTest {
                 "    Processor: KSTREAM-TOTABLE-0000000004 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000005])\n" +
                 "      --> KTABLE-JOINOTHER-0000000008\n" +
                 "      <-- KSTREAM-SOURCE-0000000003\n" +
-                "    Processor: KTABLE-JOINOTHER-0000000008 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000005, KSTREAM-TOTABLE-STATE-STORE-0000000002])\n" +
+                "    Processor: KTABLE-JOINOTHER-0000000008 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000002])\n" +
                 "      --> KTABLE-MERGE-0000000006\n" +
                 "      <-- KSTREAM-TOTABLE-0000000004\n" +
-                "    Processor: KTABLE-JOINTHIS-0000000007 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000005, KSTREAM-TOTABLE-STATE-STORE-0000000002])\n" +
+                "    Processor: KTABLE-JOINTHIS-0000000007 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000005])\n" +
                 "      --> KTABLE-MERGE-0000000006\n" +
                 "      <-- KSTREAM-TOTABLE-0000000001\n" +
                 "    Processor: KTABLE-MERGE-0000000006 (stores: [])\n" +