You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2022/05/14 00:21:40 UTC

[kafka] branch trunk updated: MINOR: Add upgrade tests for FK joins (#12122)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 78dd40123cc MINOR: Add upgrade tests for FK joins (#12122)
78dd40123cc is described below

commit 78dd40123cc47e425264f516a4b8c6c2003c99b8
Author: Alex Sorokoumov <91...@users.noreply.github.com>
AuthorDate: Sat May 14 02:21:27 2022 +0200

    MINOR: Add upgrade tests for FK joins (#12122)
    
    Follow up PR for KAFKA-13769.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 .../kafka/streams/tests/SmokeTestDriver.java       | 104 ++++++++++++------
 .../kafka/streams/tests/SmokeTestDriver.java       | 113 ++++++++++++++------
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  45 ++++++--
 .../kafka/streams/tests/SmokeTestDriver.java       | 112 ++++++++++++++------
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  45 ++++++--
 .../kafka/streams/tests/SmokeTestDriver.java       | 114 ++++++++++++++------
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  45 ++++++--
 .../kafka/streams/tests/SmokeTestDriver.java       | 101 ++++++++++++++----
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  45 ++++++--
 .../kafka/streams/tests/SmokeTestDriver.java       | 112 ++++++++++++++------
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  45 ++++++--
 .../kafka/streams/tests/SmokeTestDriver.java       | 112 ++++++++++++++------
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  45 ++++++--
 .../kafka/streams/tests/SmokeTestDriver.java       | 116 +++++++++++++++------
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  45 ++++++--
 tests/kafkatest/services/streams.py                |   5 +-
 .../tests/streams/streams_upgrade_test.py          |  77 +++++++++-----
 tests/kafkatest/version.py                         |   3 +
 18 files changed, 962 insertions(+), 322 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index ac83cd95eba..2bbb25db395 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
@@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 
 public class SmokeTestDriver extends SmokeTestUtil {
-    private static final String[] TOPICS = {
+    private static final String[] NUMERIC_VALUE_TOPICS = {
         "data",
         "echo",
         "max",
@@ -72,6 +73,15 @@ public class SmokeTestDriver extends SmokeTestUtil {
         "avg",
         "tagg"
     };
+    private static final String[] STRING_VALUE_TOPICS = {
+        "fk"
+    };
+
+    private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
+    static {
+        System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
+        System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
+    }
 
     private static final int MAX_RECORD_EMPTY_RETRIES = 30;
 
@@ -163,7 +173,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
 
         final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
 
-        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();
 
         try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
             while (remaining > 0) {
@@ -183,7 +194,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
                             intSerde.serializer().serialize("", value)
                         );
 
-                    producer.send(record, new TestCallback(record, needRetry));
+                    producer.send(record, new TestCallback(record, dataNeedRetry));
+
+                    final ProducerRecord<byte[], byte[]> fkRecord =
+                        new ProducerRecord<>(
+                            "fk",
+                            intSerde.serializer().serialize("", value),
+                            stringSerde.serializer().serialize("", key)
+                        );
+
+                    producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));
 
                     numRecordsProduced++;
                     allData.get(key).add(value);
@@ -195,36 +215,60 @@ public class SmokeTestDriver extends SmokeTestUtil {
             }
             producer.flush();
 
-            int remainingRetries = 5;
-            while (!needRetry.isEmpty()) {
-                final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
-                for (final ProducerRecord<byte[], byte[]> record : needRetry) {
-                    System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
-                    producer.send(record, new TestCallback(record, needRetry2));
-                }
-                producer.flush();
-                needRetry = needRetry2;
+            retry(producer, dataNeedRetry, stringSerde);
+            retry(producer, fkNeedRetry, intSerde);
 
-                if (--remainingRetries == 0 && !needRetry.isEmpty()) {
-                    System.err.println("Failed to produce all records after multiple retries");
-                    Exit.exit(1);
-                }
+            flush(producer,
+                "data",
+                stringSerde.serializer().serialize("", "flush"),
+                intSerde.serializer().serialize("", 0)
+            );
+            flush(producer,
+                "fk",
+                intSerde.serializer().serialize("", 0),
+                stringSerde.serializer().serialize("", "flush")
+            );
+
+        }
+        return Collections.unmodifiableMap(allData);
+    }
+
+    private static void retry(final KafkaProducer<byte[], byte[]> producer,
+                              List<ProducerRecord<byte[], byte[]>> needRetry,
+                              final Serde<?> keySerde) {
+        int remainingRetries = 5;
+        while (!needRetry.isEmpty()) {
+            final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+            for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                System.out.println("retry producing " + keySerde.deserializer().deserialize("", record.key()));
+                producer.send(record, new TestCallback(record, needRetry2));
             }
+            producer.flush();
+            needRetry = needRetry2;
 
-            // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
-            // all suppressed records.
-            final List<PartitionInfo> partitions = producer.partitionsFor("data");
-            for (final PartitionInfo partition : partitions) {
-                producer.send(new ProducerRecord<>(
-                    partition.topic(),
-                    partition.partition(),
-                    System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
-                    stringSerde.serializer().serialize("", "flush"),
-                    intSerde.serializer().serialize("", 0)
-                ));
+            if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+                System.err.println("Failed to produce all records after multiple retries");
+                Exit.exit(1);
             }
         }
-        return Collections.unmodifiableMap(allData);
+    }
+
+    private static void flush(final KafkaProducer<byte[], byte[]> producer,
+                              final String topic,
+                              final byte[] keyBytes,
+                              final byte[] valBytes) {
+        // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+        // all suppressed records.
+        final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+        for (final PartitionInfo partition : partitions) {
+            producer.send(new ProducerRecord<>(
+                partition.topic(),
+                partition.partition(),
+                System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+                keyBytes,
+                valBytes
+            ));
+        }
     }
 
     private static Properties generatorProperties(final String kafka) {
@@ -315,14 +359,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
 
         final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
-        final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+        final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
         consumer.assign(partitions);
         consumer.seekToBeginning(partitions);
 
         final int recordsGenerated = inputs.size() * maxRecordsPerKey;
         int recordsProcessed = 0;
         final Map<String, AtomicInteger> processed =
-            Stream.of(TOPICS)
+            Stream.of(NUMERIC_VALUE_TOPICS)
                   .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
 
         final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
diff --git a/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index ac83cd95eba..5c4a8cd615b 100644
--- a/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
@@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 
 public class SmokeTestDriver extends SmokeTestUtil {
-    private static final String[] TOPICS = {
+    private static final String[] NUMERIC_VALUE_TOPICS = {
         "data",
         "echo",
         "max",
@@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
         "avg",
         "tagg"
     };
+    private static final String[] STRING_VALUE_TOPICS = {
+        "fk"
+    };
+    private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
+    static {
+        System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
+        System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
+    }
 
     private static final int MAX_RECORD_EMPTY_RETRIES = 30;
 
@@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
                         stringSerde.serializer().serialize("", key),
                         intSerde.serializer().serialize("", value)
                     );
-
                 producer.send(record);
 
+                final ProducerRecord<byte[], byte[]> fkRecord =
+                    new ProducerRecord<>(
+                        "fk",
+                        intSerde.serializer().serialize("", value),
+                        stringSerde.serializer().serialize("", key)
+                    );
+                producer.send(fkRecord);
+
                 numRecordsProduced++;
                 if (numRecordsProduced % 100 == 0) {
                     System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
@@ -148,7 +164,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
                                                      final Duration timeToSpend) {
         final Properties producerProps = generatorProperties(kafka);
 
-
         int numRecordsProduced = 0;
 
         final Map<String, Set<Integer>> allData = new HashMap<>();
@@ -163,7 +178,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
 
         final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
 
-        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();
 
         try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
             while (remaining > 0) {
@@ -175,15 +191,21 @@ public class SmokeTestDriver extends SmokeTestUtil {
                     remaining--;
                     data[index] = data[remaining];
                 } else {
-
                     final ProducerRecord<byte[], byte[]> record =
                         new ProducerRecord<>(
                             "data",
                             stringSerde.serializer().serialize("", key),
                             intSerde.serializer().serialize("", value)
                         );
+                    producer.send(record, new TestCallback(record, dataNeedRetry));
 
-                    producer.send(record, new TestCallback(record, needRetry));
+                    final ProducerRecord<byte[], byte[]> fkRecord =
+                        new ProducerRecord<>(
+                            "fk",
+                            intSerde.serializer().serialize("", value),
+                            stringSerde.serializer().serialize("", key)
+                        );
+                    producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));
 
                     numRecordsProduced++;
                     allData.get(key).add(value);
@@ -195,36 +217,59 @@ public class SmokeTestDriver extends SmokeTestUtil {
             }
             producer.flush();
 
-            int remainingRetries = 5;
-            while (!needRetry.isEmpty()) {
-                final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
-                for (final ProducerRecord<byte[], byte[]> record : needRetry) {
-                    System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
-                    producer.send(record, new TestCallback(record, needRetry2));
-                }
-                producer.flush();
-                needRetry = needRetry2;
+            retry(producer, dataNeedRetry, stringSerde);
+            retry(producer, fkNeedRetry, intSerde);
+
+            flush(producer,
+                "data",
+                stringSerde.serializer().serialize("", "flush"),
+                intSerde.serializer().serialize("", 0)
+            );
+            flush(producer,
+                "fk",
+                intSerde.serializer().serialize("", 0),
+                stringSerde.serializer().serialize("", "flush")
+            );
+        }
+        return Collections.unmodifiableMap(allData);
+    }
 
-                if (--remainingRetries == 0 && !needRetry.isEmpty()) {
-                    System.err.println("Failed to produce all records after multiple retries");
-                    Exit.exit(1);
-                }
+    private static void retry(final KafkaProducer<byte[], byte[]> producer,
+        List<ProducerRecord<byte[], byte[]>> needRetry,
+        final Serde<?> keySerde) {
+        int remainingRetries = 5;
+        while (!needRetry.isEmpty()) {
+            final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+            for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                System.out.println(
+                    "retry producing " + keySerde.deserializer().deserialize("", record.key()));
+                producer.send(record, new TestCallback(record, needRetry2));
             }
-
-            // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
-            // all suppressed records.
-            final List<PartitionInfo> partitions = producer.partitionsFor("data");
-            for (final PartitionInfo partition : partitions) {
-                producer.send(new ProducerRecord<>(
-                    partition.topic(),
-                    partition.partition(),
-                    System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
-                    stringSerde.serializer().serialize("", "flush"),
-                    intSerde.serializer().serialize("", 0)
-                ));
+            producer.flush();
+            needRetry = needRetry2;
+            if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+                System.err.println("Failed to produce all records after multiple retries");
+                Exit.exit(1);
             }
         }
-        return Collections.unmodifiableMap(allData);
+    }
+
+    private static void flush(final KafkaProducer<byte[], byte[]> producer,
+        final String topic,
+        final byte[] keyBytes,
+        final byte[] valBytes) {
+        // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+        // all suppressed records.
+        final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+        for (final PartitionInfo partition : partitions) {
+            producer.send(new ProducerRecord<>(
+                partition.topic(),
+                partition.partition(),
+                System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+                keyBytes,
+                valBytes
+            ));
+        }
     }
 
     private static Properties generatorProperties(final String kafka) {
@@ -315,14 +360,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
 
         final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
-        final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+        final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
         consumer.assign(partitions);
         consumer.seekToBeginning(partitions);
 
         final int recordsGenerated = inputs.size() * maxRecordsPerKey;
         int recordsProcessed = 0;
         final Map<String, AtomicInteger> processed =
-            Stream.of(TOPICS)
+            Stream.of(NUMERIC_VALUE_TOPICS)
                   .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
 
         final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
diff --git a/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index c0c8c72c599..9d08663d9b3 100644
--- a/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -16,11 +16,18 @@
  */
 package org.apache.kafka.streams.tests;
 
+import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
+
+import java.util.Random;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -42,12 +49,29 @@ public class StreamsUpgradeTest {
         System.out.println("props=" + streamsProperties);
 
         final StreamsBuilder builder = new StreamsBuilder();
-        final KStream dataStream = builder.stream("data");
-        dataStream.process(printProcessorSupplier());
+        final KTable<String, Integer> dataTable = builder.table(
+            "data", Consumed.with(stringSerde, intSerde));
+        final KStream<String, Integer> dataStream = dataTable.toStream();
+        dataStream.process(printProcessorSupplier("data"));
         dataStream.to("echo");
 
+        final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
+            "test.run_fk_join",
+            "false"));
+        if (runFkJoin) {
+            try {
+                final KTable<Integer, String> fkTable = builder.table(
+                    "fk", Consumed.with(intSerde, stringSerde));
+                buildFKTable(dataTable, fkTable);
+            } catch (final Exception e) {
+                System.err.println("Caught " + e.getMessage());
+            }
+        }
+
         final Properties config = new Properties();
-        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(
+            StreamsConfig.APPLICATION_ID_CONFIG,
+            "StreamsUpgradeTest-" + new Random().nextLong());
         config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
         config.putAll(streamsProperties);
 
@@ -61,13 +85,22 @@ public class StreamsUpgradeTest {
         }));
     }
 
-    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+    private static void buildFKTable(final KTable<String, Integer> primaryTable,
+                                     final KTable<Integer, String> otherTable) {
+        final KStream<String, String> kStream = primaryTable
+            .join(otherTable, v -> v, (k0, v0) -> v0)
+            .toStream();
+        kStream.process(printProcessorSupplier("fk"));
+        kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier(final String topic) {
         return () -> new AbstractProcessor<K, V>() {
             private int numRecordsProcessed = 0;
 
             @Override
             public void init(final ProcessorContext context) {
-                System.out.println("[2.4] initializing processor: topic=data taskId=" + context.taskId());
+                System.out.println("[2.4] initializing processor: topic=" + topic + " taskId=" + context.taskId());
                 numRecordsProcessed = 0;
             }
 
@@ -75,7 +108,7 @@ public class StreamsUpgradeTest {
             public void process(final K key, final V value) {
                 numRecordsProcessed++;
                 if (numRecordsProcessed % 100 == 0) {
-                    System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                    System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
                 }
             }
 
diff --git a/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index ac83cd95eba..4dae6eae575 100644
--- a/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
@@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 
 public class SmokeTestDriver extends SmokeTestUtil {
-    private static final String[] TOPICS = {
+    private static final String[] NUMERIC_VALUE_TOPICS = {
         "data",
         "echo",
         "max",
@@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
         "avg",
         "tagg"
     };
+    private static final String[] STRING_VALUE_TOPICS = {
+        "fk"
+    };
+    private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
+    static {
+        System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
+        System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
+    }
 
     private static final int MAX_RECORD_EMPTY_RETRIES = 30;
 
@@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
                         stringSerde.serializer().serialize("", key),
                         intSerde.serializer().serialize("", value)
                     );
-
                 producer.send(record);
 
+                final ProducerRecord<byte[], byte[]> fkRecord =
+                    new ProducerRecord<>(
+                        "fk",
+                        intSerde.serializer().serialize("", value),
+                        stringSerde.serializer().serialize("", key)
+                    );
+                producer.send(fkRecord);
+
                 numRecordsProduced++;
                 if (numRecordsProduced % 100 == 0) {
                     System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
@@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
 
         final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
 
-        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();
 
         try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
             while (remaining > 0) {
@@ -175,15 +192,21 @@ public class SmokeTestDriver extends SmokeTestUtil {
                     remaining--;
                     data[index] = data[remaining];
                 } else {
-
                     final ProducerRecord<byte[], byte[]> record =
                         new ProducerRecord<>(
                             "data",
                             stringSerde.serializer().serialize("", key),
                             intSerde.serializer().serialize("", value)
                         );
+                    producer.send(record, new TestCallback(record, dataNeedRetry));
 
-                    producer.send(record, new TestCallback(record, needRetry));
+                    final ProducerRecord<byte[], byte[]> fkRecord =
+                        new ProducerRecord<>(
+                            "fk",
+                            intSerde.serializer().serialize("", value),
+                            stringSerde.serializer().serialize("", key)
+                        );
+                    producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));
 
                     numRecordsProduced++;
                     allData.get(key).add(value);
@@ -195,36 +218,59 @@ public class SmokeTestDriver extends SmokeTestUtil {
             }
             producer.flush();
 
-            int remainingRetries = 5;
-            while (!needRetry.isEmpty()) {
-                final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
-                for (final ProducerRecord<byte[], byte[]> record : needRetry) {
-                    System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
-                    producer.send(record, new TestCallback(record, needRetry2));
-                }
-                producer.flush();
-                needRetry = needRetry2;
+            retry(producer, dataNeedRetry, stringSerde);
+            retry(producer, fkNeedRetry, intSerde);
+
+            flush(producer,
+                "data",
+                stringSerde.serializer().serialize("", "flush"),
+                intSerde.serializer().serialize("", 0)
+            );
+            flush(producer,
+                "fk",
+                intSerde.serializer().serialize("", 0),
+                stringSerde.serializer().serialize("", "flush")
+            );
+        }
+        return Collections.unmodifiableMap(allData);
+    }
 
-                if (--remainingRetries == 0 && !needRetry.isEmpty()) {
-                    System.err.println("Failed to produce all records after multiple retries");
-                    Exit.exit(1);
-                }
+    private static void retry(final KafkaProducer<byte[], byte[]> producer,
+        List<ProducerRecord<byte[], byte[]>> needRetry,
+        final Serde<?> keySerde) {
+        int remainingRetries = 5;
+        while (!needRetry.isEmpty()) {
+            final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+            for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                System.out.println(
+                    "retry producing " + keySerde.deserializer().deserialize("", record.key()));
+                producer.send(record, new TestCallback(record, needRetry2));
             }
-
-            // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
-            // all suppressed records.
-            final List<PartitionInfo> partitions = producer.partitionsFor("data");
-            for (final PartitionInfo partition : partitions) {
-                producer.send(new ProducerRecord<>(
-                    partition.topic(),
-                    partition.partition(),
-                    System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
-                    stringSerde.serializer().serialize("", "flush"),
-                    intSerde.serializer().serialize("", 0)
-                ));
+            producer.flush();
+            needRetry = needRetry2;
+            if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+                System.err.println("Failed to produce all records after multiple retries");
+                Exit.exit(1);
             }
         }
-        return Collections.unmodifiableMap(allData);
+    }
+
+    private static void flush(final KafkaProducer<byte[], byte[]> producer,
+        final String topic,
+        final byte[] keyBytes,
+        final byte[] valBytes) {
+        // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+        // all suppressed records.
+        final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+        for (final PartitionInfo partition : partitions) {
+            producer.send(new ProducerRecord<>(
+                partition.topic(),
+                partition.partition(),
+                System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+                keyBytes,
+                valBytes
+            ));
+        }
     }
 
     private static Properties generatorProperties(final String kafka) {
@@ -315,14 +361,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
 
         final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
-        final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+        final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
         consumer.assign(partitions);
         consumer.seekToBeginning(partitions);
 
         final int recordsGenerated = inputs.size() * maxRecordsPerKey;
         int recordsProcessed = 0;
         final Map<String, AtomicInteger> processed =
-            Stream.of(TOPICS)
+            Stream.of(NUMERIC_VALUE_TOPICS)
                   .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
 
         final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
diff --git a/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 0fea040bcb4..69c46de37af 100644
--- a/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -16,11 +16,18 @@
  */
 package org.apache.kafka.streams.tests;
 
+import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
+
+import java.util.Random;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -42,12 +49,29 @@ public class StreamsUpgradeTest {
         System.out.println("props=" + streamsProperties);
 
         final StreamsBuilder builder = new StreamsBuilder();
-        final KStream dataStream = builder.stream("data");
-        dataStream.process(printProcessorSupplier());
+        final KTable<String, Integer> dataTable = builder.table(
+            "data", Consumed.with(stringSerde, intSerde));
+        final KStream<String, Integer> dataStream = dataTable.toStream();
+        dataStream.process(printProcessorSupplier("data"));
         dataStream.to("echo");
 
+        final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
+            "test.run_fk_join",
+            "false"));
+        if (runFkJoin) {
+            try {
+                final KTable<Integer, String> fkTable = builder.table(
+                    "fk", Consumed.with(intSerde, stringSerde));
+                buildFKTable(dataTable, fkTable);
+            } catch (final Exception e) {
+                System.err.println("Caught " + e.getMessage());
+            }
+        }
+
         final Properties config = new Properties();
-        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(
+            StreamsConfig.APPLICATION_ID_CONFIG,
+            "StreamsUpgradeTest-" + new Random().nextLong());
         config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
         config.putAll(streamsProperties);
 
@@ -61,13 +85,22 @@ public class StreamsUpgradeTest {
         }));
     }
 
-    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+    private static void buildFKTable(final KTable<String, Integer> primaryTable,
+                                     final KTable<Integer, String> otherTable) {
+        final KStream<String, String> kStream = primaryTable
+            .join(otherTable, v -> v, (k0, v0) -> v0)
+            .toStream();
+        kStream.process(printProcessorSupplier("fk"));
+        kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier(final String topic) {
         return () -> new AbstractProcessor<K, V>() {
             private int numRecordsProcessed = 0;
 
             @Override
             public void init(final ProcessorContext context) {
-                System.out.println("[2.5] initializing processor: topic=data taskId=" + context.taskId());
+                System.out.println("[2.5] initializing processor: topic=" + topic + " taskId=" + context.taskId());
                 numRecordsProcessed = 0;
             }
 
@@ -75,7 +108,7 @@ public class StreamsUpgradeTest {
             public void process(final K key, final V value) {
                 numRecordsProcessed++;
                 if (numRecordsProcessed % 100 == 0) {
-                    System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                    System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
                 }
             }
 
diff --git a/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index ac83cd95eba..0e08771495f 100644
--- a/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
@@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 
 public class SmokeTestDriver extends SmokeTestUtil {
-    private static final String[] TOPICS = {
+    private static final String[] NUMERIC_VALUE_TOPICS = {
         "data",
         "echo",
         "max",
@@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
         "avg",
         "tagg"
     };
+    private static final String[] STRING_VALUE_TOPICS = {
+        "fk"
+    };
+    private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
+    static {
+        System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
+        System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
+    }
 
     private static final int MAX_RECORD_EMPTY_RETRIES = 30;
 
@@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
                         stringSerde.serializer().serialize("", key),
                         intSerde.serializer().serialize("", value)
                     );
-
                 producer.send(record);
 
+                final ProducerRecord<byte[], byte[]> fkRecord =
+                    new ProducerRecord<>(
+                        "fk",
+                        intSerde.serializer().serialize("", value),
+                        stringSerde.serializer().serialize("", key)
+                    );
+                producer.send(fkRecord);
+
                 numRecordsProduced++;
                 if (numRecordsProduced % 100 == 0) {
                     System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
@@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
 
         final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
 
-        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();
 
         try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
             while (remaining > 0) {
@@ -175,7 +192,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
                     remaining--;
                     data[index] = data[remaining];
                 } else {
-
                     final ProducerRecord<byte[], byte[]> record =
                         new ProducerRecord<>(
                             "data",
@@ -183,7 +199,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
                             intSerde.serializer().serialize("", value)
                         );
 
-                    producer.send(record, new TestCallback(record, needRetry));
+                    producer.send(record, new TestCallback(record, dataNeedRetry));
+
+                    final ProducerRecord<byte[], byte[]> fkRecord =
+                        new ProducerRecord<>(
+                            "fk",
+                            intSerde.serializer().serialize("", value),
+                            stringSerde.serializer().serialize("", key)
+                        );
+
+                    producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));
 
                     numRecordsProduced++;
                     allData.get(key).add(value);
@@ -195,36 +220,59 @@ public class SmokeTestDriver extends SmokeTestUtil {
             }
             producer.flush();
 
-            int remainingRetries = 5;
-            while (!needRetry.isEmpty()) {
-                final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
-                for (final ProducerRecord<byte[], byte[]> record : needRetry) {
-                    System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
-                    producer.send(record, new TestCallback(record, needRetry2));
-                }
-                producer.flush();
-                needRetry = needRetry2;
+            retry(producer, dataNeedRetry, stringSerde);
+            retry(producer, fkNeedRetry, intSerde);
+
+            flush(producer,
+                "data",
+                stringSerde.serializer().serialize("", "flush"),
+                intSerde.serializer().serialize("", 0)
+            );
+            flush(producer,
+                "fk",
+                intSerde.serializer().serialize("", 0),
+                stringSerde.serializer().serialize("", "flush")
+            );
+        }
+        return Collections.unmodifiableMap(allData);
+    }
 
-                if (--remainingRetries == 0 && !needRetry.isEmpty()) {
-                    System.err.println("Failed to produce all records after multiple retries");
-                    Exit.exit(1);
-                }
+    private static void retry(final KafkaProducer<byte[], byte[]> producer,
+        List<ProducerRecord<byte[], byte[]>> needRetry,
+        final Serde<?> keySerde) {
+        int remainingRetries = 5;
+        while (!needRetry.isEmpty()) {
+            final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+            for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                System.out.println(
+                    "retry producing " + keySerde.deserializer().deserialize("", record.key()));
+                producer.send(record, new TestCallback(record, needRetry2));
             }
-
-            // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
-            // all suppressed records.
-            final List<PartitionInfo> partitions = producer.partitionsFor("data");
-            for (final PartitionInfo partition : partitions) {
-                producer.send(new ProducerRecord<>(
-                    partition.topic(),
-                    partition.partition(),
-                    System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
-                    stringSerde.serializer().serialize("", "flush"),
-                    intSerde.serializer().serialize("", 0)
-                ));
+            producer.flush();
+            needRetry = needRetry2;
+            if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+                System.err.println("Failed to produce all records after multiple retries");
+                Exit.exit(1);
             }
         }
-        return Collections.unmodifiableMap(allData);
+    }
+
+    private static void flush(final KafkaProducer<byte[], byte[]> producer,
+        final String topic,
+        final byte[] keyBytes,
+        final byte[] valBytes) {
+        // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+        // all suppressed records.
+        final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+        for (final PartitionInfo partition : partitions) {
+            producer.send(new ProducerRecord<>(
+                partition.topic(),
+                partition.partition(),
+                System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+                keyBytes,
+                valBytes
+            ));
+        }
     }
 
     private static Properties generatorProperties(final String kafka) {
@@ -315,14 +363,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
 
         final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
-        final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+        final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
         consumer.assign(partitions);
         consumer.seekToBeginning(partitions);
 
         final int recordsGenerated = inputs.size() * maxRecordsPerKey;
         int recordsProcessed = 0;
         final Map<String, AtomicInteger> processed =
-            Stream.of(TOPICS)
+            Stream.of(NUMERIC_VALUE_TOPICS)
                   .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
 
         final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
diff --git a/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index e1b294ff15b..0844552134a 100644
--- a/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -16,11 +16,18 @@
  */
 package org.apache.kafka.streams.tests;
 
+import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
+
+import java.util.Random;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -42,12 +49,29 @@ public class StreamsUpgradeTest {
         System.out.println("props=" + streamsProperties);
 
         final StreamsBuilder builder = new StreamsBuilder();
-        final KStream dataStream = builder.stream("data");
-        dataStream.process(printProcessorSupplier());
+        final KTable<String, Integer> dataTable = builder.table(
+            "data", Consumed.with(stringSerde, intSerde));
+        final KStream<String, Integer> dataStream = dataTable.toStream();
+        dataStream.process(printProcessorSupplier("data"));
         dataStream.to("echo");
 
+        final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
+            "test.run_fk_join",
+            "false"));
+        if (runFkJoin) {
+            try {
+                final KTable<Integer, String> fkTable = builder.table(
+                    "fk", Consumed.with(intSerde, stringSerde));
+                buildFKTable(dataTable, fkTable);
+            } catch (final Exception e) {
+                System.err.println("Caught " + e.getMessage());
+            }
+        }
+
         final Properties config = new Properties();
-        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(
+            StreamsConfig.APPLICATION_ID_CONFIG,
+            "StreamsUpgradeTest-" + new Random().nextLong());
         config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
         config.putAll(streamsProperties);
 
@@ -61,13 +85,22 @@ public class StreamsUpgradeTest {
         }));
     }
 
-    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+    private static void buildFKTable(final KTable<String, Integer> primaryTable,
+                                     final KTable<Integer, String> otherTable) {
+        final KStream<String, String> kStream = primaryTable
+            .join(otherTable, v -> v, (k0, v0) -> v0)
+            .toStream();
+        kStream.process(printProcessorSupplier("fk"));
+        kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier(final String topic) {
         return () -> new AbstractProcessor<K, V>() {
             private int numRecordsProcessed = 0;
 
             @Override
             public void init(final ProcessorContext context) {
-                System.out.println("[2.6] initializing processor: topic=data taskId=" + context.taskId());
+                System.out.println("[2.6] initializing processor: topic=" + topic + " taskId=" + context.taskId());
                 numRecordsProcessed = 0;
             }
 
@@ -75,7 +108,7 @@ public class StreamsUpgradeTest {
             public void process(final K key, final V value) {
                 numRecordsProcessed++;
                 if (numRecordsProcessed % 100 == 0) {
-                    System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                    System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
                 }
             }
 
diff --git a/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index ac83cd95eba..ac7482cfb2d 100644
--- a/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
@@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 
 public class SmokeTestDriver extends SmokeTestUtil {
-    private static final String[] TOPICS = {
+    private static final String[] NUMERIC_VALUE_TOPICS = {
         "data",
         "echo",
         "max",
@@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
         "avg",
         "tagg"
     };
+    private static final String[] STRING_VALUE_TOPICS = {
+        "fk"
+    };
+    private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
+    static {
+        System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
+        System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
+    }
 
     private static final int MAX_RECORD_EMPTY_RETRIES = 30;
 
@@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
                         stringSerde.serializer().serialize("", key),
                         intSerde.serializer().serialize("", value)
                     );
-
                 producer.send(record);
 
+                final ProducerRecord<byte[], byte[]> fkRecord =
+                    new ProducerRecord<>(
+                        "fk",
+                        intSerde.serializer().serialize("", value),
+                        stringSerde.serializer().serialize("", key)
+                    );
+                producer.send(fkRecord);
+
                 numRecordsProduced++;
                 if (numRecordsProduced % 100 == 0) {
                     System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
@@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
 
         final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
 
-        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();
 
         try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
             while (remaining > 0) {
@@ -175,15 +192,21 @@ public class SmokeTestDriver extends SmokeTestUtil {
                     remaining--;
                     data[index] = data[remaining];
                 } else {
-
                     final ProducerRecord<byte[], byte[]> record =
                         new ProducerRecord<>(
                             "data",
                             stringSerde.serializer().serialize("", key),
                             intSerde.serializer().serialize("", value)
                         );
+                    producer.send(record, new TestCallback(record, dataNeedRetry));
 
-                    producer.send(record, new TestCallback(record, needRetry));
+                    final ProducerRecord<byte[], byte[]> fkRecord =
+                        new ProducerRecord<>(
+                            "fk",
+                            intSerde.serializer().serialize("", value),
+                            stringSerde.serializer().serialize("", key)
+                        );
+                    producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));
 
                     numRecordsProduced++;
                     allData.get(key).add(value);
@@ -195,21 +218,19 @@ public class SmokeTestDriver extends SmokeTestUtil {
             }
             producer.flush();
 
-            int remainingRetries = 5;
-            while (!needRetry.isEmpty()) {
-                final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
-                for (final ProducerRecord<byte[], byte[]> record : needRetry) {
-                    System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
-                    producer.send(record, new TestCallback(record, needRetry2));
-                }
-                producer.flush();
-                needRetry = needRetry2;
+            retry(producer, dataNeedRetry, stringSerde);
+            retry(producer, fkNeedRetry, intSerde);
 
-                if (--remainingRetries == 0 && !needRetry.isEmpty()) {
-                    System.err.println("Failed to produce all records after multiple retries");
-                    Exit.exit(1);
-                }
-            }
+            flush(producer,
+                "data",
+                stringSerde.serializer().serialize("", "flush"),
+                intSerde.serializer().serialize("", 0)
+            );
+            flush(producer,
+                "fk",
+                intSerde.serializer().serialize("", 0),
+                stringSerde.serializer().serialize("", "flush")
+            );
 
             // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
             // all suppressed records.
@@ -227,6 +248,44 @@ public class SmokeTestDriver extends SmokeTestUtil {
         return Collections.unmodifiableMap(allData);
     }
 
+    private static void retry(final KafkaProducer<byte[], byte[]> producer,
+        List<ProducerRecord<byte[], byte[]>> needRetry,
+        final Serde<?> keySerde) {
+        int remainingRetries = 5;
+        while (!needRetry.isEmpty()) {
+            final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+            for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                System.out.println(
+                    "retry producing " + keySerde.deserializer().deserialize("", record.key()));
+                producer.send(record, new TestCallback(record, needRetry2));
+            }
+            producer.flush();
+            needRetry = needRetry2;
+            if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+                System.err.println("Failed to produce all records after multiple retries");
+                Exit.exit(1);
+            }
+        }
+    }
+
+    private static void flush(final KafkaProducer<byte[], byte[]> producer,
+        final String topic,
+        final byte[] keyBytes,
+        final byte[] valBytes) {
+        // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+        // all suppressed records.
+        final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+        for (final PartitionInfo partition : partitions) {
+            producer.send(new ProducerRecord<>(
+                partition.topic(),
+                partition.partition(),
+                System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+                keyBytes,
+                valBytes
+            ));
+        }
+    }
+
     private static Properties generatorProperties(final String kafka) {
         final Properties producerProps = new Properties();
         producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
@@ -315,14 +374,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
 
         final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
-        final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+        final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
         consumer.assign(partitions);
         consumer.seekToBeginning(partitions);
 
         final int recordsGenerated = inputs.size() * maxRecordsPerKey;
         int recordsProcessed = 0;
         final Map<String, AtomicInteger> processed =
-            Stream.of(TOPICS)
+            Stream.of(NUMERIC_VALUE_TOPICS)
                   .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
 
         final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
diff --git a/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 6f485e694cf..32d8d9408f5 100644
--- a/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -16,11 +16,18 @@
  */
 package org.apache.kafka.streams.tests;
 
+import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
+
+import java.util.Random;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -42,12 +49,29 @@ public class StreamsUpgradeTest {
         System.out.println("props=" + streamsProperties);
 
         final StreamsBuilder builder = new StreamsBuilder();
-        final KStream dataStream = builder.stream("data");
-        dataStream.process(printProcessorSupplier());
+        final KTable<String, Integer> dataTable = builder.table(
+            "data", Consumed.with(stringSerde, intSerde));
+        final KStream<String, Integer> dataStream = dataTable.toStream();
+        dataStream.process(printProcessorSupplier("data"));
         dataStream.to("echo");
 
+        final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
+            "test.run_fk_join",
+            "false"));
+        if (runFkJoin) {
+            try {
+                final KTable<Integer, String> fkTable = builder.table(
+                    "fk", Consumed.with(intSerde, stringSerde));
+                buildFKTable(dataTable, fkTable);
+            } catch (final Exception e) {
+                System.err.println("Caught " + e.getMessage());
+            }
+        }
+
         final Properties config = new Properties();
-        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(
+            StreamsConfig.APPLICATION_ID_CONFIG,
+            "StreamsUpgradeTest-" + new Random().nextLong());
         config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
         config.putAll(streamsProperties);
 
@@ -61,13 +85,22 @@ public class StreamsUpgradeTest {
         }));
     }
 
-    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+    private static void buildFKTable(final KTable<String, Integer> primaryTable,
+                                     final KTable<Integer, String> otherTable) {
+        final KStream<String, String> kStream = primaryTable
+            .join(otherTable, v -> v, (k0, v0) -> v0)
+            .toStream();
+        kStream.process(printProcessorSupplier("fk"));
+        kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier(final String topic) {
         return () -> new AbstractProcessor<K, V>() {
             private int numRecordsProcessed = 0;
 
             @Override
             public void init(final ProcessorContext context) {
-                System.out.println("[2.7] initializing processor: topic=data taskId=" + context.taskId());
+                System.out.println("[2.7] initializing processor: topic=" + topic + " taskId=" + context.taskId());
                 numRecordsProcessed = 0;
             }
 
@@ -75,7 +108,7 @@ public class StreamsUpgradeTest {
             public void process(final K key, final V value) {
                 numRecordsProcessed++;
                 if (numRecordsProcessed % 100 == 0) {
-                    System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                    System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
                 }
             }
 
diff --git a/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index ac83cd95eba..4dae6eae575 100644
--- a/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
@@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 
 public class SmokeTestDriver extends SmokeTestUtil {
-    private static final String[] TOPICS = {
+    private static final String[] NUMERIC_VALUE_TOPICS = {
         "data",
         "echo",
         "max",
@@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
         "avg",
         "tagg"
     };
+    private static final String[] STRING_VALUE_TOPICS = {
+        "fk"
+    };
+    private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
+    static {
+        System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
+        System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
+    }
 
     private static final int MAX_RECORD_EMPTY_RETRIES = 30;
 
@@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
                         stringSerde.serializer().serialize("", key),
                         intSerde.serializer().serialize("", value)
                     );
-
                 producer.send(record);
 
+                final ProducerRecord<byte[], byte[]> fkRecord =
+                    new ProducerRecord<>(
+                        "fk",
+                        intSerde.serializer().serialize("", value),
+                        stringSerde.serializer().serialize("", key)
+                    );
+                producer.send(fkRecord);
+
                 numRecordsProduced++;
                 if (numRecordsProduced % 100 == 0) {
                     System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
@@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
 
         final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
 
-        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();
 
         try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
             while (remaining > 0) {
@@ -175,15 +192,21 @@ public class SmokeTestDriver extends SmokeTestUtil {
                     remaining--;
                     data[index] = data[remaining];
                 } else {
-
                     final ProducerRecord<byte[], byte[]> record =
                         new ProducerRecord<>(
                             "data",
                             stringSerde.serializer().serialize("", key),
                             intSerde.serializer().serialize("", value)
                         );
+                    producer.send(record, new TestCallback(record, dataNeedRetry));
 
-                    producer.send(record, new TestCallback(record, needRetry));
+                    final ProducerRecord<byte[], byte[]> fkRecord =
+                        new ProducerRecord<>(
+                            "fk",
+                            intSerde.serializer().serialize("", value),
+                            stringSerde.serializer().serialize("", key)
+                        );
+                    producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));
 
                     numRecordsProduced++;
                     allData.get(key).add(value);
@@ -195,36 +218,59 @@ public class SmokeTestDriver extends SmokeTestUtil {
             }
             producer.flush();
 
-            int remainingRetries = 5;
-            while (!needRetry.isEmpty()) {
-                final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
-                for (final ProducerRecord<byte[], byte[]> record : needRetry) {
-                    System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
-                    producer.send(record, new TestCallback(record, needRetry2));
-                }
-                producer.flush();
-                needRetry = needRetry2;
+            retry(producer, dataNeedRetry, stringSerde);
+            retry(producer, fkNeedRetry, intSerde);
+
+            flush(producer,
+                "data",
+                stringSerde.serializer().serialize("", "flush"),
+                intSerde.serializer().serialize("", 0)
+            );
+            flush(producer,
+                "fk",
+                intSerde.serializer().serialize("", 0),
+                stringSerde.serializer().serialize("", "flush")
+            );
+        }
+        return Collections.unmodifiableMap(allData);
+    }
 
-                if (--remainingRetries == 0 && !needRetry.isEmpty()) {
-                    System.err.println("Failed to produce all records after multiple retries");
-                    Exit.exit(1);
-                }
+    private static void retry(final KafkaProducer<byte[], byte[]> producer,
+        List<ProducerRecord<byte[], byte[]>> needRetry,
+        final Serde<?> keySerde) {
+        int remainingRetries = 5;
+        while (!needRetry.isEmpty()) {
+            final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+            for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                System.out.println(
+                    "retry producing " + keySerde.deserializer().deserialize("", record.key()));
+                producer.send(record, new TestCallback(record, needRetry2));
             }
-
-            // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
-            // all suppressed records.
-            final List<PartitionInfo> partitions = producer.partitionsFor("data");
-            for (final PartitionInfo partition : partitions) {
-                producer.send(new ProducerRecord<>(
-                    partition.topic(),
-                    partition.partition(),
-                    System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
-                    stringSerde.serializer().serialize("", "flush"),
-                    intSerde.serializer().serialize("", 0)
-                ));
+            producer.flush();
+            needRetry = needRetry2;
+            if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+                System.err.println("Failed to produce all records after multiple retries");
+                Exit.exit(1);
             }
         }
-        return Collections.unmodifiableMap(allData);
+    }
+
+    private static void flush(final KafkaProducer<byte[], byte[]> producer,
+        final String topic,
+        final byte[] keyBytes,
+        final byte[] valBytes) {
+        // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+        // all suppressed records.
+        final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+        for (final PartitionInfo partition : partitions) {
+            producer.send(new ProducerRecord<>(
+                partition.topic(),
+                partition.partition(),
+                System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+                keyBytes,
+                valBytes
+            ));
+        }
     }
 
     private static Properties generatorProperties(final String kafka) {
@@ -315,14 +361,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
 
         final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
-        final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+        final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
         consumer.assign(partitions);
         consumer.seekToBeginning(partitions);
 
         final int recordsGenerated = inputs.size() * maxRecordsPerKey;
         int recordsProcessed = 0;
         final Map<String, AtomicInteger> processed =
-            Stream.of(TOPICS)
+            Stream.of(NUMERIC_VALUE_TOPICS)
                   .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
 
         final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
diff --git a/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 4f2825d23d6..db17d73bcba 100644
--- a/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -16,11 +16,18 @@
  */
 package org.apache.kafka.streams.tests;
 
+import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
+
+import java.util.Random;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -42,12 +49,29 @@ public class StreamsUpgradeTest {
         System.out.println("props=" + streamsProperties);
 
         final StreamsBuilder builder = new StreamsBuilder();
-        final KStream dataStream = builder.stream("data");
-        dataStream.process(printProcessorSupplier());
+        final KTable<String, Integer> dataTable = builder.table(
+            "data", Consumed.with(stringSerde, intSerde));
+        final KStream<String, Integer> dataStream = dataTable.toStream();
+        dataStream.process(printProcessorSupplier("data"));
         dataStream.to("echo");
 
+        final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
+            "test.run_fk_join",
+            "false"));
+        if (runFkJoin) {
+            try {
+                final KTable<Integer, String> fkTable = builder.table(
+                    "fk", Consumed.with(intSerde, stringSerde));
+                buildFKTable(dataTable, fkTable);
+            } catch (final Exception e) {
+                System.err.println("Caught " + e.getMessage());
+            }
+        }
+
         final Properties config = new Properties();
-        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(
+            StreamsConfig.APPLICATION_ID_CONFIG,
+            "StreamsUpgradeTest-" + new Random().nextLong());
         config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
         config.putAll(streamsProperties);
 
@@ -61,13 +85,22 @@ public class StreamsUpgradeTest {
         }));
     }
 
-    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+    private static void buildFKTable(final KTable<String, Integer> primaryTable,
+        final KTable<Integer, String> otherTable) {
+        final KStream<String, String> kStream = primaryTable
+            .join(otherTable, v -> v, (k0, v0) -> v0)
+            .toStream();
+        kStream.process(printProcessorSupplier("fk"));
+        kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier(final String topic) {
         return () -> new AbstractProcessor<K, V>() {
             private int numRecordsProcessed = 0;
 
             @Override
             public void init(final ProcessorContext context) {
-                System.out.println("[2.8] initializing processor: topic=data taskId=" + context.taskId());
+                System.out.println("[2.8] initializing processor: topic=" + topic + " taskId=" + context.taskId());
                 numRecordsProcessed = 0;
             }
 
@@ -75,7 +108,7 @@ public class StreamsUpgradeTest {
             public void process(final K key, final V value) {
                 numRecordsProcessed++;
                 if (numRecordsProcessed % 100 == 0) {
-                    System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                    System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
                 }
             }
 
diff --git a/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index ac83cd95eba..4dae6eae575 100644
--- a/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
@@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 
 public class SmokeTestDriver extends SmokeTestUtil {
-    private static final String[] TOPICS = {
+    private static final String[] NUMERIC_VALUE_TOPICS = {
         "data",
         "echo",
         "max",
@@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
         "avg",
         "tagg"
     };
+    private static final String[] STRING_VALUE_TOPICS = {
+        "fk"
+    };
+    private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
+    static {
+        System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
+        System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
+    }
 
     private static final int MAX_RECORD_EMPTY_RETRIES = 30;
 
@@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
                         stringSerde.serializer().serialize("", key),
                         intSerde.serializer().serialize("", value)
                     );
-
                 producer.send(record);
 
+                final ProducerRecord<byte[], byte[]> fkRecord =
+                    new ProducerRecord<>(
+                        "fk",
+                        intSerde.serializer().serialize("", value),
+                        stringSerde.serializer().serialize("", key)
+                    );
+                producer.send(fkRecord);
+
                 numRecordsProduced++;
                 if (numRecordsProduced % 100 == 0) {
                     System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
@@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
 
         final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
 
-        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();
 
         try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
             while (remaining > 0) {
@@ -175,15 +192,21 @@ public class SmokeTestDriver extends SmokeTestUtil {
                     remaining--;
                     data[index] = data[remaining];
                 } else {
-
                     final ProducerRecord<byte[], byte[]> record =
                         new ProducerRecord<>(
                             "data",
                             stringSerde.serializer().serialize("", key),
                             intSerde.serializer().serialize("", value)
                         );
+                    producer.send(record, new TestCallback(record, dataNeedRetry));
 
-                    producer.send(record, new TestCallback(record, needRetry));
+                    final ProducerRecord<byte[], byte[]> fkRecord =
+                        new ProducerRecord<>(
+                            "fk",
+                            intSerde.serializer().serialize("", value),
+                            stringSerde.serializer().serialize("", key)
+                        );
+                    producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));
 
                     numRecordsProduced++;
                     allData.get(key).add(value);
@@ -195,36 +218,59 @@ public class SmokeTestDriver extends SmokeTestUtil {
             }
             producer.flush();
 
-            int remainingRetries = 5;
-            while (!needRetry.isEmpty()) {
-                final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
-                for (final ProducerRecord<byte[], byte[]> record : needRetry) {
-                    System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
-                    producer.send(record, new TestCallback(record, needRetry2));
-                }
-                producer.flush();
-                needRetry = needRetry2;
+            retry(producer, dataNeedRetry, stringSerde);
+            retry(producer, fkNeedRetry, intSerde);
+
+            flush(producer,
+                "data",
+                stringSerde.serializer().serialize("", "flush"),
+                intSerde.serializer().serialize("", 0)
+            );
+            flush(producer,
+                "fk",
+                intSerde.serializer().serialize("", 0),
+                stringSerde.serializer().serialize("", "flush")
+            );
+        }
+        return Collections.unmodifiableMap(allData);
+    }
 
-                if (--remainingRetries == 0 && !needRetry.isEmpty()) {
-                    System.err.println("Failed to produce all records after multiple retries");
-                    Exit.exit(1);
-                }
+    private static void retry(final KafkaProducer<byte[], byte[]> producer,
+        List<ProducerRecord<byte[], byte[]>> needRetry,
+        final Serde<?> keySerde) {
+        int remainingRetries = 5;
+        while (!needRetry.isEmpty()) {
+            final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+            for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                System.out.println(
+                    "retry producing " + keySerde.deserializer().deserialize("", record.key()));
+                producer.send(record, new TestCallback(record, needRetry2));
             }
-
-            // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
-            // all suppressed records.
-            final List<PartitionInfo> partitions = producer.partitionsFor("data");
-            for (final PartitionInfo partition : partitions) {
-                producer.send(new ProducerRecord<>(
-                    partition.topic(),
-                    partition.partition(),
-                    System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
-                    stringSerde.serializer().serialize("", "flush"),
-                    intSerde.serializer().serialize("", 0)
-                ));
+            producer.flush();
+            needRetry = needRetry2;
+            if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+                System.err.println("Failed to produce all records after multiple retries");
+                Exit.exit(1);
             }
         }
-        return Collections.unmodifiableMap(allData);
+    }
+
+    private static void flush(final KafkaProducer<byte[], byte[]> producer,
+        final String topic,
+        final byte[] keyBytes,
+        final byte[] valBytes) {
+        // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+        // all suppressed records.
+        final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+        for (final PartitionInfo partition : partitions) {
+            producer.send(new ProducerRecord<>(
+                partition.topic(),
+                partition.partition(),
+                System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+                keyBytes,
+                valBytes
+            ));
+        }
     }
 
     private static Properties generatorProperties(final String kafka) {
@@ -315,14 +361,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
 
         final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
-        final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+        final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
         consumer.assign(partitions);
         consumer.seekToBeginning(partitions);
 
         final int recordsGenerated = inputs.size() * maxRecordsPerKey;
         int recordsProcessed = 0;
         final Map<String, AtomicInteger> processed =
-            Stream.of(TOPICS)
+            Stream.of(NUMERIC_VALUE_TOPICS)
                   .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
 
         final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
diff --git a/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index b097de71d41..0751516d76c 100644
--- a/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -16,11 +16,18 @@
  */
 package org.apache.kafka.streams.tests;
 
+import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
+
+import java.util.Random;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
@@ -44,12 +51,29 @@ public class StreamsUpgradeTest {
         System.out.println("props=" + streamsProperties);
 
         final StreamsBuilder builder = new StreamsBuilder();
-        final KStream dataStream = builder.stream("data");
-        dataStream.process(printProcessorSupplier());
+        final KTable<String, Integer> dataTable = builder.table(
+            "data", Consumed.with(stringSerde, intSerde));
+        final KStream<String, Integer> dataStream = dataTable.toStream();
+        dataStream.process(printProcessorSupplier("data"));
         dataStream.to("echo");
 
+        final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
+            "test.run_fk_join",
+            "false"));
+        if (runFkJoin) {
+            try {
+                final KTable<Integer, String> fkTable = builder.table(
+                    "fk", Consumed.with(intSerde, stringSerde));
+                buildFKTable(dataTable, fkTable);
+            } catch (final Exception e) {
+                System.err.println("Caught " + e.getMessage());
+            }
+        }
+
         final Properties config = new Properties();
-        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(
+            StreamsConfig.APPLICATION_ID_CONFIG,
+            "StreamsUpgradeTest-" + new Random().nextLong());
         config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
         config.putAll(streamsProperties);
 
@@ -63,13 +87,22 @@ public class StreamsUpgradeTest {
         }));
     }
 
-    private static <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> printProcessorSupplier() {
+    private static void buildFKTable(final KTable<String, Integer> primaryTable,
+        final KTable<Integer, String> otherTable) {
+        final KStream<String, String> kStream = primaryTable
+            .join(otherTable, v -> v, (k0, v0) -> v0)
+            .toStream();
+        kStream.process(printProcessorSupplier("fk"));
+        kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
+    }
+
+    private static <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> printProcessorSupplier(final String topic) {
         return () -> new ContextualProcessor<KIn, VIn, KOut, VOut>() {
             private int numRecordsProcessed = 0;
 
             @Override
             public void init(final ProcessorContext<KOut, VOut> context) {
-                System.out.println("[3.0] initializing processor: topic=data taskId=" + context.taskId());
+                System.out.println("[3.0] initializing processor: topic=" + topic + " taskId=" + context.taskId());
                 numRecordsProcessed = 0;
             }
 
@@ -77,7 +110,7 @@ public class StreamsUpgradeTest {
             public void process(final Record<KIn, VIn> record) {
                 numRecordsProcessed++;
                 if (numRecordsProcessed % 100 == 0) {
-                    System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                    System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
                 }
             }
 
diff --git a/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index ac83cd95eba..dbacbb9625b 100644
--- a/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
@@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 
 public class SmokeTestDriver extends SmokeTestUtil {
-    private static final String[] TOPICS = {
+    private static final String[] NUMERIC_VALUE_TOPICS = {
         "data",
         "echo",
         "max",
@@ -72,6 +73,15 @@ public class SmokeTestDriver extends SmokeTestUtil {
         "avg",
         "tagg"
     };
+    private static final String[] STRING_VALUE_TOPICS = {
+        "fk"
+    };
+
+    private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
+    static {
+        System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
+        System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
+    }
 
     private static final int MAX_RECORD_EMPTY_RETRIES = 30;
 
@@ -130,9 +140,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
                         stringSerde.serializer().serialize("", key),
                         intSerde.serializer().serialize("", value)
                     );
-
                 producer.send(record);
 
+                final ProducerRecord<byte[], byte[]> fkRecord =
+                    new ProducerRecord<>(
+                        "fk",
+                        intSerde.serializer().serialize("", value),
+                        stringSerde.serializer().serialize("", key)
+                    );
+                producer.send(fkRecord);
+
                 numRecordsProduced++;
                 if (numRecordsProduced % 100 == 0) {
                     System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
@@ -148,7 +165,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
                                                      final Duration timeToSpend) {
         final Properties producerProps = generatorProperties(kafka);
 
-
         int numRecordsProduced = 0;
 
         final Map<String, Set<Integer>> allData = new HashMap<>();
@@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
 
         final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
 
-        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();
 
         try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
             while (remaining > 0) {
@@ -175,7 +192,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
                     remaining--;
                     data[index] = data[remaining];
                 } else {
-
                     final ProducerRecord<byte[], byte[]> record =
                         new ProducerRecord<>(
                             "data",
@@ -183,7 +199,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
                             intSerde.serializer().serialize("", value)
                         );
 
-                    producer.send(record, new TestCallback(record, needRetry));
+                    producer.send(record, new TestCallback(record, dataNeedRetry));
+
+                    final ProducerRecord<byte[], byte[]> fkRecord =
+                        new ProducerRecord<>(
+                            "fk",
+                            intSerde.serializer().serialize("", value),
+                            stringSerde.serializer().serialize("", key)
+                        );
+
+                    producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));
 
                     numRecordsProduced++;
                     allData.get(key).add(value);
@@ -195,36 +220,59 @@ public class SmokeTestDriver extends SmokeTestUtil {
             }
             producer.flush();
 
-            int remainingRetries = 5;
-            while (!needRetry.isEmpty()) {
-                final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
-                for (final ProducerRecord<byte[], byte[]> record : needRetry) {
-                    System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
-                    producer.send(record, new TestCallback(record, needRetry2));
-                }
-                producer.flush();
-                needRetry = needRetry2;
+            retry(producer, dataNeedRetry, stringSerde);
+            retry(producer, fkNeedRetry, intSerde);
+
+            flush(producer,
+                "data",
+                stringSerde.serializer().serialize("", "flush"),
+                intSerde.serializer().serialize("", 0)
+            );
+            flush(producer,
+                "fk",
+                intSerde.serializer().serialize("", 0),
+                stringSerde.serializer().serialize("", "flush")
+            );
+        }
+        return Collections.unmodifiableMap(allData);
+    }
 
-                if (--remainingRetries == 0 && !needRetry.isEmpty()) {
-                    System.err.println("Failed to produce all records after multiple retries");
-                    Exit.exit(1);
-                }
+    private static void retry(final KafkaProducer<byte[], byte[]> producer,
+        List<ProducerRecord<byte[], byte[]>> needRetry,
+        final Serde<?> keySerde) {
+        int remainingRetries = 5;
+        while (!needRetry.isEmpty()) {
+            final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+            for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                System.out.println(
+                    "retry producing " + keySerde.deserializer().deserialize("", record.key()));
+                producer.send(record, new TestCallback(record, needRetry2));
             }
-
-            // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
-            // all suppressed records.
-            final List<PartitionInfo> partitions = producer.partitionsFor("data");
-            for (final PartitionInfo partition : partitions) {
-                producer.send(new ProducerRecord<>(
-                    partition.topic(),
-                    partition.partition(),
-                    System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
-                    stringSerde.serializer().serialize("", "flush"),
-                    intSerde.serializer().serialize("", 0)
-                ));
+            producer.flush();
+            needRetry = needRetry2;
+            if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+                System.err.println("Failed to produce all records after multiple retries");
+                Exit.exit(1);
             }
         }
-        return Collections.unmodifiableMap(allData);
+    }
+
+    private static void flush(final KafkaProducer<byte[], byte[]> producer,
+        final String topic,
+        final byte[] keyBytes,
+        final byte[] valBytes) {
+        // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+        // all suppressed records.
+        final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+        for (final PartitionInfo partition : partitions) {
+            producer.send(new ProducerRecord<>(
+                partition.topic(),
+                partition.partition(),
+                System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+                keyBytes,
+                valBytes
+            ));
+        }
     }
 
     private static Properties generatorProperties(final String kafka) {
@@ -315,14 +363,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
 
         final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
-        final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+        final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
         consumer.assign(partitions);
         consumer.seekToBeginning(partitions);
 
         final int recordsGenerated = inputs.size() * maxRecordsPerKey;
         int recordsProcessed = 0;
         final Map<String, AtomicInteger> processed =
-            Stream.of(TOPICS)
+            Stream.of(NUMERIC_VALUE_TOPICS)
                   .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
 
         final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
diff --git a/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 6657c5f2f23..311d30ba400 100644
--- a/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -16,11 +16,18 @@
  */
 package org.apache.kafka.streams.tests;
 
+import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
+
+import java.util.Random;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
@@ -44,12 +51,29 @@ public class StreamsUpgradeTest {
         System.out.println("props=" + streamsProperties);
 
         final StreamsBuilder builder = new StreamsBuilder();
-        final KStream dataStream = builder.stream("data");
-        dataStream.process(printProcessorSupplier());
+        final KTable<String, Integer> dataTable = builder.table(
+            "data", Consumed.with(stringSerde, intSerde));
+        final KStream<String, Integer> dataStream = dataTable.toStream();
+        dataStream.process(printProcessorSupplier("data"));
         dataStream.to("echo");
 
+        final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
+            "test.run_fk_join",
+            "false"));
+        if (runFkJoin) {
+            try {
+                final KTable<Integer, String> fkTable = builder.table(
+                    "fk", Consumed.with(intSerde, stringSerde));
+                buildFKTable(dataStream, fkTable);
+            } catch (final Exception e) {
+                System.err.println("Caught " + e.getMessage());
+            }
+        }
+
         final Properties config = new Properties();
-        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(
+            StreamsConfig.APPLICATION_ID_CONFIG,
+            "StreamsUpgradeTest-" + new Random().nextLong());
         config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
         config.putAll(streamsProperties);
 
@@ -63,13 +87,22 @@ public class StreamsUpgradeTest {
         }));
     }
 
-    private static <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> printProcessorSupplier() {
+    private static void buildFKTable(final KStream<String, Integer> primaryTable,
+                                     final KTable<Integer, String> otherTable) {
+        final KStream<String, String> kStream = primaryTable.toTable()
+            .join(otherTable, v -> v, (k0, v0) -> v0)
+            .toStream();
+        kStream.process(printProcessorSupplier("fk"));
+        kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
+    }
+
+    private static <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> printProcessorSupplier(final String topic) {
         return () -> new ContextualProcessor<KIn, VIn, KOut, VOut>() {
             private int numRecordsProcessed = 0;
 
             @Override
             public void init(final ProcessorContext<KOut, VOut> context) {
-                System.out.println("[3.1] initializing processor: topic=data taskId=" + context.taskId());
+                System.out.println("[3.1] initializing processor: topic=" + topic + "taskId=" + context.taskId());
                 numRecordsProcessed = 0;
             }
 
@@ -77,7 +110,7 @@ public class StreamsUpgradeTest {
             public void process(final Record<KIn, VIn> record) {
                 numRecordsProcessed++;
                 if (numRecordsProcessed % 100 == 0) {
-                    System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                    System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
                 }
             }
 
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 5dedc579163..38b303281d2 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -22,7 +22,7 @@ from ducktape.utils.util import wait_until
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.kafka import KafkaConfig
 from kafkatest.services.monitor.jmx import JmxMixin
-from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1
+from kafkatest.version import KafkaVersion, LATEST_0_10_0, LATEST_0_10_1
 
 STATE_DIR = "state.dir"
 
@@ -616,6 +616,9 @@ class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
 
         if self.UPGRADE_FROM is not None:
             properties['upgrade.from'] = self.UPGRADE_FROM
+        if (self.UPGRADE_FROM is not None and KafkaVersion(self.UPGRADE_FROM).supports_fk_joins()) or \
+            (self.KAFKA_STREAMS_VERSION is not None and KafkaVersion(self.KAFKA_STREAMS_VERSION).supports_fk_joins()):
+            properties['test.run_fk_join'] = "true"
         if self.UPGRADE_TO == "future_version":
             properties['test.future.metadata'] = "any_value"
 
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 57c89aa4c83..7639b33f1ed 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -37,6 +37,8 @@ broker_upgrade_versions = [str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
 
 metadata_1_versions = [str(LATEST_0_10_0)]
 metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
+fk_join_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), 
+                    str(LATEST_3_0), str(LATEST_3_1)]
 
 """
 After each release one should first check that the released version has been uploaded to 
@@ -86,9 +88,11 @@ class StreamsUpgradeTest(Test):
         self.topics = {
             'echo' : { 'partitions': 5 },
             'data' : { 'partitions': 5 },
+            'fk' : { 'partitions': 5 },
         }
 
-    processed_msg = "processed [0-9]* records"
+    processed_data_msg = "processed [0-9]* records from topic=data"
+    processed_fk_msg = "processed [0-9]* records from topic=fk"
     base_version_number = str(DEV_VERSION).split("-")[0]
 
     def perform_broker_upgrade(self, to_version):
@@ -159,9 +163,9 @@ class StreamsUpgradeTest(Test):
 
             with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
                 processor.start()
-                monitor.wait_until(self.processed_msg,
+                monitor.wait_until(self.processed_data_msg,
                                    timeout_sec=60,
-                                   err_msg="Never saw output '%s' on " % self.processed_msg + str(processor.node))
+                                   err_msg="Never saw output '%s' on " % self.processed_data_msg + str(processor.node))
 
             connected_message = "Discovered group coordinator"
             with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor:
@@ -172,9 +176,9 @@ class StreamsUpgradeTest(Test):
                                            timeout_sec=120,
                                            err_msg=("Never saw output '%s' on " % connected_message) + str(processor.node.account))
 
-                    stdout_monitor.wait_until(self.processed_msg,
+                    stdout_monitor.wait_until(self.processed_data_msg,
                                               timeout_sec=60,
-                                              err_msg="Never saw output '%s' on" % self.processed_msg + str(processor.node.account))
+                                              err_msg="Never saw output '%s' on" % self.processed_data_msg + str(processor.node.account))
 
             # SmokeTestDriver allows up to 6 minutes to consume all
             # records for the verification step so this timeout is set to
@@ -192,8 +196,12 @@ class StreamsUpgradeTest(Test):
     @cluster(num_nodes=6)
     @matrix(from_version=metadata_1_versions, to_version=[str(DEV_VERSION)])
     @matrix(from_version=metadata_2_versions, to_version=[str(DEV_VERSION)])
-    def test_metadata_upgrade(self, from_version, to_version):
+    @matrix(from_version=fk_join_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
         """
+        This test verifies that the cluster successfully upgrades despite changes in the metadata and FK
+        join protocols.
+        
         Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
         """
 
@@ -311,9 +319,14 @@ class StreamsUpgradeTest(Test):
                 log_monitor.wait_until(kafka_version_str,
                                        timeout_sec=60,
                                        err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account))
-                monitor.wait_until(self.processed_msg,
+                monitor.wait_until(self.processed_data_msg,
                                    timeout_sec=60,
-                                   err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
+                                   err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node1.account))
+                if KafkaVersion(version).supports_fk_joins():
+                    monitor.wait_until(self.processed_fk_msg,
+                    timeout_sec=60,
+                    err_msg="Never saw output '%s' on " % self.processed_fk_msg + str(node1.account))
+
 
         # start second with <version>
         self.prepare_for(self.processor2, version)
@@ -325,12 +338,16 @@ class StreamsUpgradeTest(Test):
                     log_monitor.wait_until(kafka_version_str,
                                            timeout_sec=60,
                                            err_msg="Could not detect Kafka Streams version " + version + " on " + str(node2.account))
-                    first_monitor.wait_until(self.processed_msg,
+                    first_monitor.wait_until(self.processed_data_msg,
                                              timeout_sec=60,
-                                             err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
-                    second_monitor.wait_until(self.processed_msg,
+                                             err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node1.account))
+                    second_monitor.wait_until(self.processed_data_msg,
                                               timeout_sec=60,
-                                              err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account))
+                                              err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node2.account))
+                    if KafkaVersion(version).supports_fk_joins():
+                        second_monitor.wait_until(self.processed_fk_msg,
+                        timeout_sec=60,
+                        err_msg="Never saw output '%s' on " % self.processed_fk_msg + str(node2.account))
 
         # start third with <version>
         self.prepare_for(self.processor3, version)
@@ -343,15 +360,19 @@ class StreamsUpgradeTest(Test):
                         log_monitor.wait_until(kafka_version_str,
                                                timeout_sec=60,
                                                err_msg="Could not detect Kafka Streams version " + version + " on " + str(node3.account))
-                        first_monitor.wait_until(self.processed_msg,
+                        first_monitor.wait_until(self.processed_data_msg,
                                                  timeout_sec=60,
-                                                 err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
-                        second_monitor.wait_until(self.processed_msg,
+                                                 err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node1.account))
+                        second_monitor.wait_until(self.processed_data_msg,
                                                   timeout_sec=60,
-                                                  err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account))
-                        third_monitor.wait_until(self.processed_msg,
+                                                  err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node2.account))
+                        third_monitor.wait_until(self.processed_data_msg,
                                                  timeout_sec=60,
-                                                 err_msg="Never saw output '%s' on " % self.processed_msg + str(node3.account))
+                                                 err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node3.account))
+                        if KafkaVersion(version).supports_fk_joins():
+                            third_monitor.wait_until(self.processed_fk_msg,
+                            timeout_sec=60,
+                            err_msg="Never saw output '%s' on " % self.processed_fk_msg + str(node2.account))
 
     @staticmethod
     def prepare_for(processor, version):
@@ -381,12 +402,12 @@ class StreamsUpgradeTest(Test):
         with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
             with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
                 processor.stop()
-                first_other_monitor.wait_until(self.processed_msg,
+                first_other_monitor.wait_until(self.processed_data_msg,
                                                timeout_sec=60,
-                                               err_msg="Never saw output '%s' on " % self.processed_msg + str(first_other_node.account))
-                second_other_monitor.wait_until(self.processed_msg,
+                                               err_msg="Never saw output '%s' on " % self.processed_data_msg + str(first_other_node.account))
+                second_other_monitor.wait_until(self.processed_data_msg,
                                                 timeout_sec=60,
-                                                err_msg="Never saw output '%s' on " % self.processed_msg + str(second_other_node.account))
+                                                err_msg="Never saw output '%s' on " % self.processed_data_msg + str(second_other_node.account))
         node.account.ssh_capture("grep UPGRADE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
 
         if upgrade_from is None:  # upgrade disabled -- second round of rolling bounces
@@ -414,23 +435,23 @@ class StreamsUpgradeTest(Test):
                         log_monitor.wait_until(kafka_version_str,
                                                timeout_sec=60,
                                                err_msg="Could not detect Kafka Streams version " + new_version + " on " + str(node.account))
-                        first_other_monitor.wait_until(self.processed_msg,
+                        first_other_monitor.wait_until(self.processed_data_msg,
                                                        timeout_sec=60,
-                                                       err_msg="Never saw output '%s' on " % self.processed_msg + str(first_other_node.account))
+                                                       err_msg="Never saw output '%s' on " % self.processed_data_msg + str(first_other_node.account))
                         found = list(first_other_node.account.ssh_capture(grep_metadata_error + first_other_processor.STDERR_FILE, allow_fail=True))
                         if len(found) > 0:
                             raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
 
-                        second_other_monitor.wait_until(self.processed_msg,
+                        second_other_monitor.wait_until(self.processed_data_msg,
                                                         timeout_sec=60,
-                                                        err_msg="Never saw output '%s' on " % self.processed_msg + str(second_other_node.account))
+                                                        err_msg="Never saw output '%s' on " % self.processed_data_msg + str(second_other_node.account))
                         found = list(second_other_node.account.ssh_capture(grep_metadata_error + second_other_processor.STDERR_FILE, allow_fail=True))
                         if len(found) > 0:
                             raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
 
-                        monitor.wait_until(self.processed_msg,
+                        monitor.wait_until(self.processed_data_msg,
                                            timeout_sec=60,
-                                           err_msg="Never saw output '%s' on " % self.processed_msg + str(node.account))
+                                           err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node.account))
 
 
     def do_rolling_bounce(self, processor, counter, current_generation):
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 3a01a5adf58..64f0bf2c53d 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -106,6 +106,9 @@ class KafkaVersion(LooseVersion):
         # Self-managed clusters always support topic ID, so this method only applies to ZK clusters.
         return self >= V_2_8_0
 
+    def supports_fk_joins(self):
+        return hasattr(self, "version") and self >= V_2_4_0
+
 def get_version(node=None):
     """Return the version attached to the given node.
     Default to DEV_BRANCH if node or node.version is undefined (aka None)