You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/04/13 02:05:17 UTC

[kafka] branch trunk updated: KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables (#13522)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1d5d003ff48 KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables (#13522)
1d5d003ff48 is described below

commit 1d5d003ff48097c17464ebadee58182114ee1a7f
Author: Victoria Xia <vi...@confluent.io>
AuthorDate: Wed Apr 12 22:05:10 2023 -0400

    KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables (#13522)
    
    This PR updates foreign-key table-table join processors to ignore out-of-order records from versioned tables, as specified in KIP-914.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 checkstyle/suppressions.xml                        |   6 +-
 .../streams/kstream/internals/KTableImpl.java      |  22 +-
 .../ForeignJoinSubscriptionProcessorSupplier.java  |  36 ++-
 ...reignJoinSubscriptionSendProcessorSupplier.java |  93 ++++---
 .../KTableKTableForeignKeyJoinIntegrationTest.java | 219 +++++++++------
 ...ableForeignKeyVersionedJoinIntegrationTest.java | 301 +++++++++++++++++++++
 .../streams/kstream/internals/KStreamImplTest.java |   4 +-
 7 files changed, 534 insertions(+), 147 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index d3e458082f4..28cefdd30c2 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -230,19 +230,19 @@
               files=".*[/\\]streams[/\\].*test[/\\].*.java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>
+              files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|KTableKTableForeignKeyVersionedJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>
 
     <suppress checks="JavaNCSS"
               files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest).java"/>
 
     <suppress checks="NPathComplexity"
-              files="(EosV2UpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest|TopologyTestDriverTest).java"/>
+              files="(EosV2UpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|KTableKTableForeignKeyVersionedJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest|TopologyTestDriverTest).java"/>
 
     <suppress checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
               files="Murmur3Test.java"/>
 
     <suppress checks="MethodLength"
-              files="(KStreamSlidingWindowAggregateTest|KStreamKStreamLeftJoinTest|KStreamKStreamOuterJoinTest).java"/>
+              files="(KStreamSlidingWindowAggregateTest|KStreamKStreamLeftJoinTest|KStreamKStreamOuterJoinTest|KTableKTableForeignKeyVersionedJoinIntegrationTest).java"/>
 
     <suppress checks="ClassFanOutComplexity"
               files="StreamTaskTest.java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index cd8c5abc280..82438ff59a6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -1116,7 +1116,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
             keySerde
         );
 
-        final ProcessorGraphNode<K, Change<V>> subscriptionNode = new ProcessorGraphNode<>(
+        final KTableValueGetterSupplier<K, V> primaryKeyValueGetter = valueGetterSupplier();
+        final StatefulProcessorNode<K, Change<V>> subscriptionNode = new StatefulProcessorNode<>(
             new ProcessorParameters<>(
                 new ForeignJoinSubscriptionSendProcessorSupplier<>(
                     foreignKeyExtractor,
@@ -1124,10 +1125,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
                     valueHashSerdePseudoTopic,
                     foreignKeySerde,
                     valueSerde == null ? null : valueSerde.serializer(),
-                    leftJoin
+                    leftJoin,
+                    primaryKeyValueGetter
                 ),
                 renamed.suffixWithOrElseGet("-subscription-registration-processor", builder, SUBSCRIPTION_REGISTRATION)
-            )
+            ),
+            Collections.emptySet(),
+            Collections.singleton(primaryKeyValueGetter)
         );
         builder.addGraphNode(graphNode, subscriptionNode);
 
@@ -1179,26 +1183,27 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
             );
         builder.addGraphNode(subscriptionSource, subscriptionReceiveNode);
 
+        final KTableValueGetterSupplier<KO, VO> foreignKeyValueGetter = ((KTableImpl<KO, VO, VO>) foreignKeyTable).valueGetterSupplier();
         final StatefulProcessorNode<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinForeignNode =
             new StatefulProcessorNode<>(
                 new ProcessorParameters<>(
                     new SubscriptionJoinForeignProcessorSupplier<>(
-                        ((KTableImpl<KO, VO, VO>) foreignKeyTable).valueGetterSupplier()
+                        foreignKeyValueGetter
                     ),
                     renamed.suffixWithOrElseGet("-subscription-join-foreign", builder, SUBSCRIPTION_PROCESSOR)
                 ),
                 Collections.emptySet(),
-                Collections.singleton(((KTableImpl<KO, VO, VO>) foreignKeyTable).valueGetterSupplier())
+                Collections.singleton(foreignKeyValueGetter)
             );
         builder.addGraphNode(subscriptionReceiveNode, subscriptionJoinForeignNode);
 
-        final StatefulProcessorNode<KO, Change<Object>> foreignJoinSubscriptionNode = new StatefulProcessorNode<>(
+        final StatefulProcessorNode<KO, Change<VO>> foreignJoinSubscriptionNode = new StatefulProcessorNode<>(
             new ProcessorParameters<>(
-                new ForeignJoinSubscriptionProcessorSupplier<>(subscriptionStore, combinedKeySchema),
+                new ForeignJoinSubscriptionProcessorSupplier<>(subscriptionStore, combinedKeySchema, foreignKeyValueGetter),
                 renamed.suffixWithOrElseGet("-foreign-join-subscription", builder, SUBSCRIPTION_PROCESSOR)
             ),
             Collections.singleton(subscriptionStore),
-            Collections.emptySet()
+            Collections.singleton(foreignKeyValueGetter)
         );
         builder.addGraphNode(((KTableImpl<KO, VO, ?>) foreignKeyTable).graphNode, foreignJoinSubscriptionNode);
 
@@ -1232,7 +1237,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         resultSourceNodes.add(foreignResponseSource.nodeName());
         builder.internalTopologyBuilder.copartitionSources(resultSourceNodes);
 
-        final KTableValueGetterSupplier<K, V> primaryKeyValueGetter = valueGetterSupplier();
         final SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> resolverProcessorSupplier = new SubscriptionResolverJoinProcessorSupplier<>(
             primaryKeyValueGetter,
             valueSerde == null ? null : valueSerde.serializer(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
index 55e40fce64f..46e2bd24c25 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
+import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
@@ -43,24 +45,32 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements
     private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionProcessorSupplier.class);
     private final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder;
     private final CombinedKeySchema<KO, K> keySchema;
+    private final KTableValueGetterSupplier<KO, VO> foreignKeyValueGetterSupplier;
 
     public ForeignJoinSubscriptionProcessorSupplier(
         final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder,
-        final CombinedKeySchema<KO, K> keySchema) {
+        final CombinedKeySchema<KO, K> keySchema,
+        final KTableValueGetterSupplier<KO, VO> foreignKeyValueGetterSupplier) {
 
         this.storeBuilder = storeBuilder;
         this.keySchema = keySchema;
+        this.foreignKeyValueGetterSupplier = foreignKeyValueGetterSupplier;
     }
 
     @Override
     public Processor<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> get() {
-        return new KTableKTableJoinProcessor();
+        return new KTableKTableJoinProcessor(foreignKeyValueGetterSupplier.get());
     }
 
 
     private final class KTableKTableJoinProcessor extends ContextualProcessor<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> {
         private Sensor droppedRecordsSensor;
-        private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> store;
+        private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> subscriptionStore;
+        private final KTableValueGetter<KO, VO> foreignKeyValueGetter;
+
+        private KTableKTableJoinProcessor(final KTableValueGetter<KO, VO> foreignKeyValueGetter) {
+            this.foreignKeyValueGetter = foreignKeyValueGetter;
+        }
 
         @Override
         public void init(final ProcessorContext<K, SubscriptionResponseWrapper<VO>> context) {
@@ -71,7 +81,8 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements
                 internalProcessorContext.taskId().toString(),
                 internalProcessorContext.metrics()
             );
-            store = internalProcessorContext.getStateStore(storeBuilder);
+            subscriptionStore = internalProcessorContext.getStateStore(storeBuilder);
+            foreignKeyValueGetter.init(context);
         }
 
         @Override
@@ -95,11 +106,21 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements
                 return;
             }
 
+            // drop out-of-order records from versioned tables (cf. KIP-914)
+            if (foreignKeyValueGetter.isVersioned()) {
+                final ValueAndTimestamp<VO> latestValueAndTimestamp = foreignKeyValueGetter.get(record.key());
+                if (latestValueAndTimestamp != null && latestValueAndTimestamp.timestamp() > record.timestamp()) {
+                    LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
+                    droppedRecordsSensor.record();
+                    return;
+                }
+            }
+
             final Bytes prefixBytes = keySchema.prefixBytes(record.key());
 
             //Perform the prefixScan and propagate the results
             try (final KeyValueIterator<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> prefixScanResults =
-                     store.range(prefixBytes, Bytes.increment(prefixBytes))) {
+                     subscriptionStore.range(prefixBytes, Bytes.increment(prefixBytes))) {
 
                 while (prefixScanResults.hasNext()) {
                     final KeyValue<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> next = prefixScanResults.next();
@@ -118,6 +139,11 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements
             }
         }
 
+        @Override
+        public void close() {
+            foreignKeyValueGetter.close();
+        }
+
         private boolean prefixEquals(final byte[] x, final byte[] y) {
             final int min = Math.min(x.length, y.length);
             final ByteBuffer xSlice = ByteBuffer.wrap(x, 0, min);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
index 0efe4da2bcb..3d8b5dd222e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
+import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
@@ -29,6 +31,7 @@ import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.Murmur3;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +52,7 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
     private final Supplier<String> foreignKeySerdeTopicSupplier;
     private final Supplier<String> valueSerdeTopicSupplier;
     private final boolean leftJoin;
+    private final KTableValueGetterSupplier<K, V> primaryKeyValueGetterSupplier;
     private Serializer<KO> foreignKeySerializer;
     private Serializer<V> valueSerializer;
 
@@ -57,18 +61,20 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
                                                         final Supplier<String> valueSerdeTopicSupplier,
                                                         final Serde<KO> foreignKeySerde,
                                                         final Serializer<V> valueSerializer,
-                                                        final boolean leftJoin) {
+                                                        final boolean leftJoin,
+                                                        final KTableValueGetterSupplier<K, V> primaryKeyValueGetterSupplier) {
         this.foreignKeyExtractor = foreignKeyExtractor;
         this.foreignKeySerdeTopicSupplier = foreignKeySerdeTopicSupplier;
         this.valueSerdeTopicSupplier = valueSerdeTopicSupplier;
         this.valueSerializer = valueSerializer;
         this.leftJoin = leftJoin;
+        this.primaryKeyValueGetterSupplier = primaryKeyValueGetterSupplier;
         foreignKeySerializer = foreignKeySerde == null ? null : foreignKeySerde.serializer();
     }
 
     @Override
     public Processor<K, Change<V>, KO, SubscriptionWrapper<K>> get() {
-        return new UnbindChangeProcessor();
+        return new UnbindChangeProcessor(primaryKeyValueGetterSupplier.get());
     }
 
     private class UnbindChangeProcessor extends ContextualProcessor<K, Change<V>, KO, SubscriptionWrapper<K>> {
@@ -76,6 +82,11 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
         private Sensor droppedRecordsSensor;
         private String foreignKeySerdeTopic;
         private String valueSerdeTopic;
+        private final KTableValueGetter<K, V> primaryKeyValueGetter;
+
+        private UnbindChangeProcessor(final KTableValueGetter<K, V> primaryKeyValueGetter) {
+            this.primaryKeyValueGetter = primaryKeyValueGetter;
+        }
 
         @SuppressWarnings("unchecked")
         @Override
@@ -95,10 +106,25 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
                 context.taskId().toString(),
                 (StreamsMetricsImpl) context.metrics()
             );
+            primaryKeyValueGetter.init(context);
         }
 
         @Override
         public void process(final Record<K, Change<V>> record) {
+            // drop out-of-order records from versioned tables (cf. KIP-914)
+            if (primaryKeyValueGetter.isVersioned()) {
+                // key-value stores do not contain data for null keys, so skip the check
+                // if the key is null
+                if (record.key() != null) {
+                    final ValueAndTimestamp<V> latestValueAndTimestamp = primaryKeyValueGetter.get(record.key());
+                    if (latestValueAndTimestamp != null && latestValueAndTimestamp.timestamp() > record.timestamp()) {
+                        LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
+                        droppedRecordsSensor.record();
+                        return;
+                    }
+                }
+            }
+
             final long[] currentHash = record.value().newValue == null ?
                 null :
                 Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue));
@@ -107,37 +133,13 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
             if (record.value().oldValue != null) {
                 final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
                 if (oldForeignKey == null) {
-                    if (context().recordMetadata().isPresent()) {
-                        final RecordMetadata recordMetadata = context().recordMetadata().get();
-                        LOG.warn(
-                            "Skipping record due to null foreign key. "
-                                + "topic=[{}] partition=[{}] offset=[{}]",
-                            recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
-                        );
-                    } else {
-                        LOG.warn(
-                            "Skipping record due to null foreign key. Topic, partition, and offset not known."
-                        );
-                    }
-                    droppedRecordsSensor.record();
+                    logSkippedRecordDueToNullForeignKey();
                     return;
                 }
                 if (record.value().newValue != null) {
                     final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
                     if (newForeignKey == null) {
-                        if (context().recordMetadata().isPresent()) {
-                            final RecordMetadata recordMetadata = context().recordMetadata().get();
-                            LOG.warn(
-                                "Skipping record due to null foreign key. "
-                                    + "topic=[{}] partition=[{}] offset=[{}]",
-                                recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
-                            );
-                        } else {
-                            LOG.warn(
-                                "Skipping record due to null foreign key. Topic, partition, and offset not known."
-                            );
-                        }
-                        droppedRecordsSensor.record();
+                        logSkippedRecordDueToNullForeignKey();
                         return;
                     }
 
@@ -193,19 +195,7 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
                 }
                 final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
                 if (newForeignKey == null) {
-                    if (context().recordMetadata().isPresent()) {
-                        final RecordMetadata recordMetadata = context().recordMetadata().get();
-                        LOG.warn(
-                            "Skipping record due to null foreign key. "
-                                + "topic=[{}] partition=[{}] offset=[{}]",
-                            recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
-                        );
-                    } else {
-                        LOG.warn(
-                            "Skipping record due to null foreign key. Topic, partition, and offset not known."
-                        );
-                    }
-                    droppedRecordsSensor.record();
+                    logSkippedRecordDueToNullForeignKey();
                 } else {
                     context().forward(
                         record.withKey(newForeignKey)
@@ -217,5 +207,26 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
                 }
             }
         }
+
+        @Override
+        public void close() {
+            primaryKeyValueGetter.close();
+        }
+
+        private void logSkippedRecordDueToNullForeignKey() {
+            if (context().recordMetadata().isPresent()) {
+                final RecordMetadata recordMetadata = context().recordMetadata().get();
+                LOG.warn(
+                    "Skipping record due to null foreign key. "
+                        + "topic=[{}] partition=[{}] offset=[{}]",
+                    recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
+                );
+            } else {
+                LOG.warn(
+                    "Skipping record due to null foreign key. Topic, partition, and offset not known."
+                );
+            }
+            droppedRecordsSensor.record();
+        }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index 931aaf8e53e..04b124a726b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -16,10 +16,12 @@
  */
 package org.apache.kafka.streams.integration;
 
+import java.time.Duration;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TestInputTopic;
@@ -66,25 +68,45 @@ import static org.hamcrest.MatcherAssert.assertThat;
 public class KTableKTableForeignKeyJoinIntegrationTest {
     @Rule
     public Timeout globalTimeout = Timeout.seconds(600);
-    private static final String LEFT_TABLE = "left_table";
-    private static final String RIGHT_TABLE = "right_table";
-    private static final String OUTPUT = "output-topic";
+    protected static final String LEFT_TABLE = "left_table";
+    protected static final String RIGHT_TABLE = "right_table";
+    protected static final String OUTPUT = "output-topic";
     private static final String REJOIN_OUTPUT = "rejoin-output-topic";
-    private final boolean leftJoin;
-    private final boolean materialized;
+
+    private final MockTime time = new MockTime();
+
+    protected final boolean leftJoin;
+    protected final boolean materialized;
     private final String optimization;
-    private final boolean rejoin;
+    protected final boolean rejoin;
+    protected final boolean leftVersioned;
+    protected final boolean rightVersioned;
 
-    private Properties streamsConfig;
+    protected Properties streamsConfig;
+    protected long baseTimestamp;
 
     public KTableKTableForeignKeyJoinIntegrationTest(final boolean leftJoin,
                                                      final String optimization,
                                                      final boolean materialized,
                                                      final boolean rejoin) {
+        // versioning is disabled for these tests, even though the code supports building a
+        // topology with versioned tables, since KTableKTableForeignKeyVersionedJoinIntegrationTest
+        // extends this test class.
+        this(leftJoin, optimization, materialized, rejoin, false, false);
+    }
+
+    protected KTableKTableForeignKeyJoinIntegrationTest(final boolean leftJoin,
+                                                        final String optimization,
+                                                        final boolean materialized,
+                                                        final boolean rejoin,
+                                                        final boolean leftVersioned,
+                                                        final boolean rightVersioned) {
         this.rejoin = rejoin;
         this.leftJoin = leftJoin;
         this.materialized = materialized;
         this.optimization = optimization;
+        this.leftVersioned = leftVersioned;
+        this.rightVersioned = rightVersioned;
     }
 
     @Rule
@@ -96,6 +118,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
             mkEntry(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimization)
         ));
+        baseTimestamp = time.milliseconds();
     }
 
     @Parameterized.Parameters(name = "leftJoin={0}, optimization={1}, materialized={2}, rejoin={3}")
@@ -105,7 +128,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
         return buildParameters(booleans, optimizations, booleans, booleans);
     }
 
-    private static Collection<Object[]> buildParameters(final List<?>... argOptions) {
+    protected static Collection<Object[]> buildParameters(final List<?>... argOptions) {
         List<Object[]> result = new LinkedList<>();
         result.add(new Object[0]);
 
@@ -131,7 +154,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
 
     @Test
     public void doJoinFromLeftThenDeleteLeftEntity() {
-        final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin);
+        final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
             final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
@@ -140,9 +163,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
 
             // Pre-populate the RHS records. This test is all about what happens when we add/remove LHS records
-            right.pipeInput("rhs1", "rhsValue1");
-            right.pipeInput("rhs2", "rhsValue2");
-            right.pipeInput("rhs3", "rhsValue3"); // this unreferenced FK won't show up in any results
+            right.pipeInput("rhs1", "rhsValue1", baseTimestamp);
+            right.pipeInput("rhs2", "rhsValue2", baseTimestamp + 1);
+            right.pipeInput("rhs3", "rhsValue3", baseTimestamp + 2); // this unreferenced FK won't show up in any results
 
             assertThat(
                 outputTopic.readKeyValuesToMap(),
@@ -161,8 +184,8 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                 );
             }
 
-            left.pipeInput("lhs1", "lhsValue1|rhs1");
-            left.pipeInput("lhs2", "lhsValue2|rhs2");
+            left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 3);
+            left.pipeInput("lhs2", "lhsValue2|rhs2", baseTimestamp + 4);
 
             {
                 final Map<String, String> expected = mkMap(
@@ -191,7 +214,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             }
 
             // Add another reference to an existing FK
-            left.pipeInput("lhs3", "lhsValue3|rhs1");
+            left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 5);
             {
                 assertThat(
                     outputTopic.readKeyValuesToMap(),
@@ -220,7 +243,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             }
             // Now delete one LHS entity such that one delete is propagated down to the output.
 
-            left.pipeInput("lhs1", (String) null);
+            left.pipeInput("lhs1", (String) null, baseTimestamp + 6);
             assertThat(
                 outputTopic.readKeyValuesToMap(),
                 is(mkMap(
@@ -249,7 +272,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
 
     @Test
     public void doJoinFromRightThenDeleteRightEntity() {
-        final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin);
+        final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
             final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
@@ -257,54 +280,54 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
 
             // Pre-populate the LHS records. This test is all about what happens when we add/remove RHS records
-            left.pipeInput("lhs1", "lhsValue1|rhs1");
-            left.pipeInput("lhs2", "lhsValue2|rhs2");
-            left.pipeInput("lhs3", "lhsValue3|rhs1");
+            left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
+            left.pipeInput("lhs2", "lhsValue2|rhs2", baseTimestamp + 1);
+            left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 2);
 
             assertThat(
                 outputTopic.readKeyValuesToMap(),
                 is(leftJoin
-                       ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
-                               mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
-                               mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
-                       : emptyMap()
+                    ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
+                    mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
+                    mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
+                    : emptyMap()
                 )
             );
             if (materialized) {
                 assertThat(
                     asMap(store),
                     is(leftJoin
-                           ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
-                                   mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
-                                   mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
-                           : emptyMap()
+                        ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
+                        mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
+                        mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
+                        : emptyMap()
                     )
                 );
             }
 
-            right.pipeInput("rhs1", "rhsValue1");
+            right.pipeInput("rhs1", "rhsValue1", baseTimestamp + 3);
 
             assertThat(
                 outputTopic.readKeyValuesToMap(),
                 is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
-                         mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
+                    mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
                 )
             );
             if (materialized) {
                 assertThat(
                     asMap(store),
                     is(leftJoin
-                           ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
-                                   mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
-                                   mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
+                        ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+                        mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
+                        mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
 
-                           : mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
-                                   mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
+                        : mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+                        mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
                     )
                 );
             }
 
-            right.pipeInput("rhs2", "rhsValue2");
+            right.pipeInput("rhs2", "rhsValue2", baseTimestamp + 4);
 
             assertThat(
                 outputTopic.readKeyValuesToMap(),
@@ -314,13 +337,13 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                 assertThat(
                     asMap(store),
                     is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
-                             mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
-                             mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
+                        mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
+                        mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
                     )
                 );
             }
 
-            right.pipeInput("rhs3", "rhsValue3"); // this unreferenced FK won't show up in any results
+            right.pipeInput("rhs3", "rhsValue3", baseTimestamp + 5); // this unreferenced FK won't show up in any results
 
             assertThat(
                 outputTopic.readKeyValuesToMap(),
@@ -330,30 +353,30 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                 assertThat(
                     asMap(store),
                     is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
-                             mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
-                             mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
+                        mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
+                        mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
                     )
                 );
             }
 
             // Now delete the RHS entity such that all matching keys have deletes propagated.
-            right.pipeInput("rhs1", (String) null);
+            right.pipeInput("rhs1", (String) null, baseTimestamp + 6);
 
             assertThat(
                 outputTopic.readKeyValuesToMap(),
                 is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs1,null)" : null),
-                         mkEntry("lhs3", leftJoin ? "(lhsValue3|rhs1,null)" : null))
+                    mkEntry("lhs3", leftJoin ? "(lhsValue3|rhs1,null)" : null))
                 )
             );
             if (materialized) {
                 assertThat(
                     asMap(store),
                     is(leftJoin
-                           ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
-                                   mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
-                                   mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
+                        ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
+                        mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
+                        mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
 
-                           : mkMap(mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"))
+                        : mkMap(mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"))
                     )
                 );
             }
@@ -362,13 +385,13 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
 
     @Test
     public void shouldEmitTombstoneWhenDeletingNonJoiningRecords() {
-        final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin);
+        final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
             final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
             final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
 
-            left.pipeInput("lhs1", "lhsValue1|rhs1");
+            left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
 
             {
                 final Map<String, String> expected =
@@ -388,7 +411,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             // Deleting a non-joining record produces an unnecessary tombstone for inner joins, because
             // it's not possible to know whether a result was previously emitted.
             // For the left join, the tombstone is necessary.
-            left.pipeInput("lhs1", (String) null);
+            left.pipeInput("lhs1", (String) null, baseTimestamp + 1);
             {
                 assertThat(
                     outputTopic.readKeyValuesToMap(),
@@ -403,7 +426,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             }
 
             // Deleting a non-existing record is idempotent
-            left.pipeInput("lhs1", (String) null);
+            left.pipeInput("lhs1", (String) null, baseTimestamp + 2);
             {
                 assertThat(
                     outputTopic.readKeyValuesToMap(),
@@ -421,14 +444,14 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
 
     @Test
     public void shouldNotEmitTombstonesWhenDeletingNonExistingRecords() {
-        final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin);
+        final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
             final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
             final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
 
             // Deleting a record that never existed doesn't need to emit tombstones.
-            left.pipeInput("lhs1", (String) null);
+            left.pipeInput("lhs1", (String) null, baseTimestamp);
             {
                 assertThat(
                     outputTopic.readKeyValuesToMap(),
@@ -446,14 +469,14 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
 
     @Test
     public void joinShouldProduceNullsWhenValueHasNonMatchingForeignKey() {
-        final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin);
+        final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
             final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
             final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
             final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
 
-            left.pipeInput("lhs1", "lhsValue1|rhs1");
+            left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
             // no output for a new inner join on a non-existent FK
             // the left join of course emits the half-joined output
             assertThat(
@@ -470,7 +493,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             // since it impossible to know whether the prior FK existed or not (and thus whether any results have
             // previously been emitted)
             // The left join emits a _necessary_ update (since the lhs record has actually changed)
-            left.pipeInput("lhs1", "lhsValue1|rhs2");
+            left.pipeInput("lhs1", "lhsValue1|rhs2", baseTimestamp + 1);
             assertThat(
                 outputTopic.readKeyValuesToMap(),
                 is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" : null)))
@@ -482,7 +505,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                 );
             }
             // of course, moving it again to yet another non-existent FK has the same effect
-            left.pipeInput("lhs1", "lhsValue1|rhs3");
+            left.pipeInput("lhs1", "lhsValue1|rhs3", baseTimestamp + 2);
             assertThat(
                 outputTopic.readKeyValuesToMap(),
                 is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs3,null)" : null)))
@@ -497,7 +520,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             // Adding an RHS record now, so that we can demonstrate "moving" from a non-existent FK to an existent one
             // This RHS key was previously referenced, but it's not referenced now, so adding this record should
             // result in no changes whatsoever.
-            right.pipeInput("rhs1", "rhsValue1");
+            right.pipeInput("rhs1", "rhsValue1", baseTimestamp + 3);
             assertThat(
                 outputTopic.readKeyValuesToMap(),
                 is(emptyMap())
@@ -510,7 +533,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             }
 
             // now, we change to a FK that exists, and see the join completes
-            left.pipeInput("lhs1", "lhsValue1|rhs1");
+            left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 4);
             assertThat(
                 outputTopic.readKeyValuesToMap(),
                 is(mkMap(
@@ -528,7 +551,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
 
             // but if we update it again to a non-existent one, we'll get a tombstone for the inner join, and the
             // left join updates appropriately.
-            left.pipeInput("lhs1", "lhsValue1|rhs2");
+            left.pipeInput("lhs1", "lhsValue1|rhs2", baseTimestamp + 5);
             assertThat(
                 outputTopic.readKeyValuesToMap(),
                 is(mkMap(
@@ -546,7 +569,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
 
     @Test
     public void shouldUnsubscribeOldForeignKeyIfLeftSideIsUpdated() {
-        final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin);
+        final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
             final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
@@ -555,8 +578,8 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
 
             // Pre-populate the RHS records. This test is all about what happens when we change LHS records foreign key reference
             // then populate update on RHS
-            right.pipeInput("rhs1", "rhsValue1");
-            right.pipeInput("rhs2", "rhsValue2");
+            right.pipeInput("rhs1", "rhsValue1", baseTimestamp);
+            right.pipeInput("rhs2", "rhsValue2", baseTimestamp + 1);
 
             assertThat(
                 outputTopic.readKeyValuesToMap(),
@@ -569,7 +592,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                 );
             }
 
-            left.pipeInput("lhs1", "lhsValue1|rhs1");
+            left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 2);
             {
                 final Map<String, String> expected = mkMap(
                     mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
@@ -587,7 +610,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             }
 
             // Change LHS foreign key reference
-            left.pipeInput("lhs1", "lhsValue1|rhs2");
+            left.pipeInput("lhs1", "lhsValue1|rhs2", baseTimestamp + 3);
             {
                 final Map<String, String> expected = mkMap(
                     mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
@@ -605,7 +628,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             }
 
             // Populate RHS update on old LHS foreign key ref
-            right.pipeInput("rhs1", "rhsValue1Delta");
+            right.pipeInput("rhs1", "rhsValue1Delta", baseTimestamp + 4);
             {
                 assertThat(
                     outputTopic.readKeyValuesToMap(),
@@ -623,29 +646,52 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
         }
     }
 
-    private static Map<String, String> asMap(final KeyValueStore<String, String> store) {
+    protected static Map<String, String> asMap(final KeyValueStore<String, String> store) {
         final HashMap<String, String> result = new HashMap<>();
         store.all().forEachRemaining(kv -> result.put(kv.key, kv.value));
         return result;
     }
 
-    private static Topology getTopology(final Properties streamsConfig,
-                                        final String queryableStoreName,
-                                        final boolean leftJoin,
-                                        final boolean rejoin) {
+    protected static Topology getTopology(final Properties streamsConfig,
+                                          final String queryableStoreName,
+                                          final boolean leftJoin,
+                                          final boolean rejoin,
+                                          final boolean leftVersioned,
+                                          final boolean rightVersioned) {
         final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final KTable<String, String> left = builder.table(
-            LEFT_TABLE,
-            Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
-                          serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
-        );
-        final KTable<String, String> right = builder.table(
-            RIGHT_TABLE,
-            Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
-                          serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
-        );
+        final KTable<String, String> left;
+        if (leftVersioned) {
+            left = builder.table(
+                LEFT_TABLE,
+                Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
+                    serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)),
+                Materialized.as(Stores.persistentVersionedKeyValueStore("left", Duration.ofMinutes(5)))
+            );
+        } else {
+            left = builder.table(
+                LEFT_TABLE,
+                Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
+                    serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+            );
+        }
+
+        final KTable<String, String> right;
+        if (rightVersioned) {
+            right = builder.table(
+                RIGHT_TABLE,
+                Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
+                    serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)),
+                Materialized.as(Stores.persistentVersionedKeyValueStore("right", Duration.ofMinutes(5)))
+            );
+        } else {
+            right = builder.table(
+                RIGHT_TABLE,
+                Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
+                    serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+            );
+        }
 
         final Function<String, String> extractor = value -> value.split("\\|")[1];
         final ValueJoiner<String, String, String> joiner = (value1, value2) -> "(" + value1 + "," + value2 + ")";
@@ -678,13 +724,13 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                 left.leftJoin(right, extractor, joiner, mainMaterialized);
 
             fkJoin.toStream()
-                  .to(OUTPUT);
+                .to(OUTPUT);
 
             // also make sure the FK join is set up right for downstream operations that require materialization
             if (rejoin) {
                 fkJoin.leftJoin(left, rejoiner, rejoinMaterialized)
-                      .toStream()
-                      .to(REJOIN_OUTPUT);
+                    .toStream()
+                    .to(REJOIN_OUTPUT);
             }
         } else {
             final KTable<String, String> fkJoin = left.join(right, extractor, joiner, mainMaterialized);
@@ -696,12 +742,11 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             // also make sure the FK join is set up right for downstream operations that require materialization
             if (rejoin) {
                 fkJoin.join(left, rejoiner, rejoinMaterialized)
-                      .toStream()
-                      .to(REJOIN_OUTPUT);
+                    .toStream()
+                    .to(REJOIN_OUTPUT);
             }
         }
 
-
         return builder.build(streamsConfig);
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyVersionedJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyVersionedJoinIntegrationTest.java
new file mode 100644
index 00000000000..10f632ffdea
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyVersionedJoinIntegrationTest.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+@Category(IntegrationTest.class)
+public class KTableKTableForeignKeyVersionedJoinIntegrationTest extends KTableKTableForeignKeyJoinIntegrationTest {
+
+    public KTableKTableForeignKeyVersionedJoinIntegrationTest(final boolean leftJoin,
+                                                              final boolean materialized,
+                                                              final boolean leftVersioned,
+                                                              final boolean rightVersioned) {
+        // optimizations and rejoin are disabled for these tests, as these tests focus on versioning.
+        // see KTableKTableForeignKeyJoinIntegrationTest for test coverage for optimizations and rejoin
+        super(leftJoin, StreamsConfig.NO_OPTIMIZATION, materialized, false, leftVersioned, rightVersioned);
+    }
+
+    @Parameterized.Parameters(name = "leftJoin={0}, materialized={1}, leftVersioned={2}, rightVersioned={3}")
+    public static Collection<Object[]> data() {
+        final List<Boolean> booleans = Arrays.asList(true, false);
+        return buildParameters(booleans, booleans, booleans, booleans);
+    }
+
+    @Test
+    public void shouldIgnoreOutOfOrderRecordsIffVersioned() {
+        final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
+            final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+            final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
+
+            // RHS record
+            right.pipeInput("rhs1", "rhsValue1", baseTimestamp + 4);
+
+            assertThat(
+                outputTopic.readKeyValuesToMap(),
+                is(emptyMap())
+            );
+            if (materialized) {
+                assertThat(
+                    asMap(store),
+                    is(emptyMap())
+                );
+            }
+
+            // LHS records with match to existing RHS record
+            left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 3);
+            left.pipeInput("lhs2", "lhsValue2|rhs1", baseTimestamp + 5);
+
+            {
+                final Map<String, String> expected = mkMap(
+                    mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+                    mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")
+                );
+                assertThat(
+                    outputTopic.readKeyValuesToMap(),
+                    is(expected)
+                );
+                if (materialized) {
+                    assertThat(
+                        asMap(store),
+                        is(expected)
+                    );
+                }
+            }
+
+            // out-of-order LHS record (for existing key) does not produce a new result iff LHS is versioned
+            left.pipeInput("lhs1", "lhsValue1_ooo|rhs1", baseTimestamp + 2);
+            if (leftVersioned) {
+                assertThat(
+                    outputTopic.readKeyValuesToMap(),
+                    is(emptyMap())
+                );
+                if (materialized) {
+                    assertThat(
+                        asMap(store),
+                        is(mkMap(
+                            mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+                            mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")
+                        ))
+                    );
+                }
+            } else {
+                assertThat(
+                    outputTopic.readKeyValuesToMap(),
+                    is(mkMap(
+                        mkEntry("lhs1", "(lhsValue1_ooo|rhs1,rhsValue1)")
+                    ))
+                );
+                if (materialized) {
+                    assertThat(
+                        asMap(store),
+                        is(mkMap(
+                            mkEntry("lhs1", "(lhsValue1_ooo|rhs1,rhsValue1)"),
+                            mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")
+                        ))
+                    );
+                }
+            }
+
+            // out-of-order LHS tombstone (for existing key) is similarly ignored (iff LHS is versioned)
+            left.pipeInput("lhs1", null, baseTimestamp + 2);
+            if (leftVersioned) {
+                assertThat(
+                    outputTopic.readKeyValuesToMap(),
+                    is(emptyMap())
+                );
+                if (materialized) {
+                    assertThat(
+                        asMap(store),
+                        is(mkMap(
+                            mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+                            mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")
+                        ))
+                    );
+                }
+            } else {
+                assertThat(
+                    outputTopic.readKeyValuesToMap(),
+                    is(mkMap(
+                        mkEntry("lhs1", null)
+                    ))
+                );
+                if (materialized) {
+                    assertThat(
+                        asMap(store),
+                        is(mkMap(
+                            mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")
+                        ))
+                    );
+                }
+            }
+
+            // LHS record with larger timestamp always produces a new result
+            left.pipeInput("lhs1", "lhsValue1_new|rhs1", baseTimestamp + 8);
+            {
+                assertThat(
+                    outputTopic.readKeyValuesToMap(),
+                    is(mkMap(
+                        mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1)")
+                    ))
+                );
+                if (materialized) {
+                    assertThat(
+                        asMap(store),
+                        is(mkMap(
+                            mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1)"),
+                            mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")
+                        ))
+                    );
+                }
+            }
+
+            // out-of-order RHS record (for existing key) does not produce a new result iff RHS is versioned
+            right.pipeInput("rhs1", "rhsValue1_ooo", baseTimestamp + 1);
+            if (rightVersioned) {
+                assertThat(
+                    outputTopic.readKeyValuesToMap(),
+                    is(emptyMap())
+                );
+                if (materialized) {
+                    assertThat(
+                        asMap(store),
+                        is(mkMap(
+                            mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1)"),
+                            mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")
+                        ))
+                    );
+                }
+            } else {
+                assertThat(
+                    outputTopic.readKeyValuesToMap(),
+                    is(mkMap(
+                        mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1_ooo)"),
+                        mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1_ooo)")
+                    ))
+                );
+                if (materialized) {
+                    assertThat(
+                        asMap(store),
+                        is(mkMap(
+                            mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1_ooo)"),
+                            mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1_ooo)")
+                        ))
+                    );
+                }
+            }
+
+            // out-of-order RHS tombstone (for existing key) is similarly ignored (iff RHS is versioned)
+            right.pipeInput("rhs1", null, baseTimestamp + 1);
+            if (rightVersioned) {
+                assertThat(
+                    outputTopic.readKeyValuesToMap(),
+                    is(emptyMap())
+                );
+                if (materialized) {
+                    assertThat(
+                        asMap(store),
+                        is(mkMap(
+                            mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1)"),
+                            mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")
+                        ))
+                    );
+                }
+            } else {
+                if (leftJoin) {
+                    assertThat(
+                        outputTopic.readKeyValuesToMap(),
+                        is(mkMap(
+                            mkEntry("lhs1", "(lhsValue1_new|rhs1,null)"),
+                            mkEntry("lhs2", "(lhsValue2|rhs1,null)")
+                        ))
+                    );
+                    if (materialized) {
+                        assertThat(
+                            asMap(store),
+                            is(mkMap(
+                                mkEntry("lhs1", "(lhsValue1_new|rhs1,null)"),
+                                mkEntry("lhs2", "(lhsValue2|rhs1,null)")
+                            ))
+                        );
+                    }
+                } else {
+                    assertThat(
+                        outputTopic.readKeyValuesToMap(),
+                        is(mkMap(
+                            mkEntry("lhs1", null),
+                            mkEntry("lhs2", null)
+                        ))
+                    );
+                    if (materialized) {
+                        assertThat(
+                            asMap(store),
+                            is(emptyMap())
+                        );
+                    }
+                }
+            }
+
+            // RHS record with larger timestamps always produces new results
+            right.pipeInput("rhs1", "rhsValue1_new", baseTimestamp + 6);
+            {
+                assertThat(
+                    outputTopic.readKeyValuesToMap(),
+                    is(mkMap(
+                        mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1_new)"),
+                        mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1_new)")
+                    ))
+                );
+                if (materialized) {
+                    assertThat(
+                        asMap(store),
+                        is(mkMap(
+                            mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1_new)"),
+                            mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1_new)")
+                        ))
+                    );
+                }
+            }
+        }
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 76ec17c63b7..54bb14aa0ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -3149,7 +3149,7 @@ public class KStreamImplTest {
                 "    Processor: KTABLE-FK-JOIN-OUTPUT-0000000018 (stores: [])\n" +
                 "      --> KTABLE-TOSTREAM-0000000020\n" +
                 "      <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000017\n" +
-                "    Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000007 (stores: [])\n" +
+                "    Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000007 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000002])\n" +
                 "      --> KTABLE-SINK-0000000008\n" +
                 "      <-- KSTREAM-TOTABLE-0000000001\n" +
                 "    Processor: KTABLE-TOSTREAM-0000000020 (stores: [])\n" +
@@ -3174,7 +3174,7 @@ public class KStreamImplTest {
                 "    Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000012 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000005])\n" +
                 "      --> KTABLE-SINK-0000000015\n" +
                 "      <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000011\n" +
-                "    Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000013 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000010])\n" +
+                "    Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000013 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000005, KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000010])\n" +
                 "      --> KTABLE-SINK-0000000015\n" +
                 "      <-- KSTREAM-TOTABLE-0000000004\n" +
                 "    Sink: KTABLE-SINK-0000000015 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic)\n" +