You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/04/29 22:06:23 UTC

[kafka] 02/02: KAFKA-9925: decorate pseudo-topics with app id (#8574)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit d1ae75d5f9222da42b1a396f4dc1f01e3df0ef83
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Wed Apr 29 16:17:34 2020 -0500

    KAFKA-9925: decorate pseudo-topics with app id (#8574)
    
    Reviewers: Boyang Chen <bo...@confluent.io>, Kin Siu
---
 .../streams/kstream/internals/KTableImpl.java      |  24 +++-
 .../foreignkeyjoin/CombinedKeySchema.java          |  17 ++-
 ...reignJoinSubscriptionSendProcessorSupplier.java |  17 ++-
 .../SubscriptionResolverJoinProcessorSupplier.java |  10 +-
 .../foreignkeyjoin/SubscriptionWrapperSerde.java   |  30 +++--
 .../internals/InternalTopologyBuilder.java         |   4 +
 ...KTableKTableForeignKeyJoinDefaultSerdeTest.java |  74 ++++++++++-
 .../KTableKTableForeignKeyJoinPseudoTopicTest.java | 138 ---------------------
 .../foreignkeyjoin/CombinedKeySchemaTest.java      |  30 +++--
 ...scriptionResolverJoinProcessorSupplierTest.java |  12 +-
 .../SubscriptionWrapperSerdeTest.java              |   8 +-
 11 files changed, 176 insertions(+), 188 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 75cdffa..cc7d8f2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -75,6 +75,7 @@ import java.util.HashSet;
 import java.util.Objects;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 import static org.apache.kafka.streams.kstream.internals.graph.GraphGraceSearchUtil.findAndVerifyWindowGrace;
 
@@ -953,13 +954,26 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         //This occurs whenever the extracted foreignKey changes values.
         enableSendingOldValues();
 
+        final NamedInternal renamed = new NamedInternal(joinName);
+
+        final String subscriptionTopicName = renamed.suffixWithOrElseGet(
+            "-subscription-registration",
+            builder,
+            SUBSCRIPTION_REGISTRATION
+        ) + TOPIC_SUFFIX;
 
+        // the decoration can't be performed until we have the configuration available when the app runs,
+        // so we pass Suppliers into the components, which they can call at run time
+
+        final Supplier<String> subscriptionPrimaryKeySerdePseudoTopic =
+            () -> internalTopologyBuilder().decoratePseudoTopic(subscriptionTopicName + "-pk");
+
+        final Supplier<String> subscriptionForeignKeySerdePseudoTopic =
+            () -> internalTopologyBuilder().decoratePseudoTopic(subscriptionTopicName + "-fk");
+
+        final Supplier<String> valueHashSerdePseudoTopic =
+            () -> internalTopologyBuilder().decoratePseudoTopic(subscriptionTopicName + "-vh");
 
-        final NamedInternal renamed = new NamedInternal(joinName);
-        final String subscriptionTopicName = renamed.suffixWithOrElseGet("-subscription-registration", builder, SUBSCRIPTION_REGISTRATION) + TOPIC_SUFFIX;
-        final String subscriptionPrimaryKeySerdePseudoTopic = subscriptionTopicName + "-pk";
-        final String subscriptionForeignKeySerdePseudoTopic = subscriptionTopicName + "-fk";
-        final String valueHashSerdePseudoTopic = subscriptionTopicName + "-vh";
         builder.internalTopologyBuilder.addInternalTopic(subscriptionTopicName);
 
         final Serde<KO> foreignKeySerde = ((KTableImpl<KO, VO, ?>) foreignKeyTable).keySerde;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
index 92fb72c..57bc646 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
@@ -23,24 +23,27 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
 import java.nio.ByteBuffer;
+import java.util.function.Supplier;
 
 /**
  * Factory for creating CombinedKey serializers / deserializers.
  */
 public class CombinedKeySchema<KO, K> {
-    private final String primaryKeySerdeTopic;
-    private final String foreignKeySerdeTopic;
+    private final Supplier<String> undecoratedPrimaryKeySerdeTopicSupplier;
+    private final Supplier<String> undecoratedForeignKeySerdeTopicSupplier;
+    private String primaryKeySerdeTopic;
+    private String foreignKeySerdeTopic;
     private Serializer<K> primaryKeySerializer;
     private Deserializer<K> primaryKeyDeserializer;
     private Serializer<KO> foreignKeySerializer;
     private Deserializer<KO> foreignKeyDeserializer;
 
-    public CombinedKeySchema(final String foreignKeySerdeTopic,
+    public CombinedKeySchema(final Supplier<String> foreignKeySerdeTopicSupplier,
                              final Serde<KO> foreignKeySerde,
-                             final String primaryKeySerdeTopic,
+                             final Supplier<String> primaryKeySerdeTopicSupplier,
                              final Serde<K> primaryKeySerde) {
-        this.primaryKeySerdeTopic = primaryKeySerdeTopic;
-        this.foreignKeySerdeTopic = foreignKeySerdeTopic;
+        undecoratedPrimaryKeySerdeTopicSupplier = primaryKeySerdeTopicSupplier;
+        undecoratedForeignKeySerdeTopicSupplier = foreignKeySerdeTopicSupplier;
         primaryKeySerializer = primaryKeySerde == null ? null : primaryKeySerde.serializer();
         primaryKeyDeserializer = primaryKeySerde == null ? null : primaryKeySerde.deserializer();
         foreignKeyDeserializer = foreignKeySerde == null ? null : foreignKeySerde.deserializer();
@@ -49,6 +52,8 @@ public class CombinedKeySchema<KO, K> {
 
     @SuppressWarnings("unchecked")
     public void init(final ProcessorContext context) {
+        primaryKeySerdeTopic = undecoratedPrimaryKeySerdeTopicSupplier.get();
+        foreignKeySerdeTopic = undecoratedForeignKeySerdeTopicSupplier.get();
         primaryKeySerializer = primaryKeySerializer == null ? (Serializer<K>) context.keySerde().serializer() : primaryKeySerializer;
         primaryKeyDeserializer = primaryKeyDeserializer == null ? (Deserializer<K>) context.keySerde().deserializer() : primaryKeyDeserializer;
         foreignKeySerializer = foreignKeySerializer == null ? (Serializer<KO>) context.keySerde().serializer() : foreignKeySerializer;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
index ba794f7..9787875 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE;
 import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE;
@@ -43,21 +44,21 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
     private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionSendProcessorSupplier.class);
 
     private final Function<V, KO> foreignKeyExtractor;
-    private final String foreignKeySerdeTopic;
-    private final String valueSerdeTopic;
+    private final Supplier<String> foreignKeySerdeTopicSupplier;
+    private final Supplier<String> valueSerdeTopicSupplier;
     private final boolean leftJoin;
     private Serializer<KO> foreignKeySerializer;
     private Serializer<V> valueSerializer;
 
     public ForeignJoinSubscriptionSendProcessorSupplier(final Function<V, KO> foreignKeyExtractor,
-                                                        final String foreignKeySerdeTopic,
-                                                        final String valueSerdeTopic,
+                                                        final Supplier<String> foreignKeySerdeTopicSupplier,
+                                                        final Supplier<String> valueSerdeTopicSupplier,
                                                         final Serde<KO> foreignKeySerde,
                                                         final Serializer<V> valueSerializer,
                                                         final boolean leftJoin) {
         this.foreignKeyExtractor = foreignKeyExtractor;
-        this.foreignKeySerdeTopic = foreignKeySerdeTopic;
-        this.valueSerdeTopic = valueSerdeTopic;
+        this.foreignKeySerdeTopicSupplier = foreignKeySerdeTopicSupplier;
+        this.valueSerdeTopicSupplier = valueSerdeTopicSupplier;
         this.valueSerializer = valueSerializer;
         this.leftJoin = leftJoin;
         foreignKeySerializer = foreignKeySerde == null ? null : foreignKeySerde.serializer();
@@ -71,11 +72,15 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
     private class UnbindChangeProcessor extends AbstractProcessor<K, Change<V>> {
 
         private Sensor droppedRecordsSensor;
+        private String foreignKeySerdeTopic;
+        private String valueSerdeTopic;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
+            foreignKeySerdeTopic = foreignKeySerdeTopicSupplier.get();
+            valueSerdeTopic = valueSerdeTopicSupplier.get();
             // get default key serde if it wasn't supplied directly at construction
             if (foreignKeySerializer == null) {
                 foreignKeySerializer = (Serializer<KO>) context.keySerde().serializer();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
index 31de068..3cd0636 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
@@ -29,6 +29,8 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.Murmur3;
 
+import java.util.function.Supplier;
+
 /**
  * Receives {@code SubscriptionResponseWrapper<VO>} events and filters out events which do not match the current hash
  * of the primary key. This eliminates race-condition results for rapidly-changing foreign-keys for a given primary key.
@@ -42,18 +44,18 @@ import org.apache.kafka.streams.state.internals.Murmur3;
 public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSupplier<K, SubscriptionResponseWrapper<VO>> {
     private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
     private final Serializer<V> constructionTimeValueSerializer;
-    private final String valueHashSerdePseudoTopic;
+    private final Supplier<String> valueHashSerdePseudoTopicSupplier;
     private final ValueJoiner<V, VO, VR> joiner;
     private final boolean leftJoin;
 
     public SubscriptionResolverJoinProcessorSupplier(final KTableValueGetterSupplier<K, V> valueGetterSupplier,
                                                      final Serializer<V> valueSerializer,
-                                                     final String valueHashSerdePseudoTopic,
+                                                     final Supplier<String> valueHashSerdePseudoTopicSupplier,
                                                      final ValueJoiner<V, VO, VR> joiner,
                                                      final boolean leftJoin) {
         this.valueGetterSupplier = valueGetterSupplier;
         constructionTimeValueSerializer = valueSerializer;
-        this.valueHashSerdePseudoTopic = valueHashSerdePseudoTopic;
+        this.valueHashSerdePseudoTopicSupplier = valueHashSerdePseudoTopicSupplier;
         this.joiner = joiner;
         this.leftJoin = leftJoin;
     }
@@ -61,6 +63,7 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements
     @Override
     public Processor<K, SubscriptionResponseWrapper<VO>> get() {
         return new AbstractProcessor<K, SubscriptionResponseWrapper<VO>>() {
+            private String valueHashSerdePseudoTopic;
             private Serializer<V> runtimeValueSerializer = constructionTimeValueSerializer;
 
             private KTableValueGetter<K, V> valueGetter;
@@ -69,6 +72,7 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements
             @Override
             public void init(final ProcessorContext context) {
                 super.init(context);
+                valueHashSerdePseudoTopic = valueHashSerdePseudoTopicSupplier.get();
                 valueGetter = valueGetterSupplier.get();
                 valueGetter.init(context);
                 if (runtimeValueSerializer == null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
index 42aed94..136128c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
@@ -25,16 +25,17 @@ import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
 
 import java.nio.ByteBuffer;
 import java.util.Objects;
+import java.util.function.Supplier;
 
 public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>> {
     private final SubscriptionWrapperSerializer<K> serializer;
     private final SubscriptionWrapperDeserializer<K> deserializer;
 
-    public SubscriptionWrapperSerde(final String primaryKeySerializationPseudoTopic,
+    public SubscriptionWrapperSerde(final Supplier<String> primaryKeySerializationPseudoTopicSupplier,
                                     final Serde<K> primaryKeySerde) {
-        serializer = new SubscriptionWrapperSerializer<>(primaryKeySerializationPseudoTopic,
+        serializer = new SubscriptionWrapperSerializer<>(primaryKeySerializationPseudoTopicSupplier,
                                                          primaryKeySerde == null ? null : primaryKeySerde.serializer());
-        deserializer = new SubscriptionWrapperDeserializer<>(primaryKeySerializationPseudoTopic,
+        deserializer = new SubscriptionWrapperDeserializer<>(primaryKeySerializationPseudoTopicSupplier,
                                                              primaryKeySerde == null ? null : primaryKeySerde.deserializer());
     }
 
@@ -51,12 +52,13 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
     private static class SubscriptionWrapperSerializer<K>
         implements Serializer<SubscriptionWrapper<K>>, WrappingNullableSerializer<SubscriptionWrapper<K>, K> {
 
-        private final String primaryKeySerializationPseudoTopic;
+        private final Supplier<String> primaryKeySerializationPseudoTopicSupplier;
+        private String primaryKeySerializationPseudoTopic = null;
         private Serializer<K> primaryKeySerializer;
 
-        SubscriptionWrapperSerializer(final String primaryKeySerializationPseudoTopic,
+        SubscriptionWrapperSerializer(final Supplier<String> primaryKeySerializationPseudoTopicSupplier,
                                       final Serializer<K> primaryKeySerializer) {
-            this.primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopic;
+            this.primaryKeySerializationPseudoTopicSupplier = primaryKeySerializationPseudoTopicSupplier;
             this.primaryKeySerializer = primaryKeySerializer;
         }
 
@@ -76,6 +78,10 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
                 throw new UnsupportedVersionException("SubscriptionWrapper version is larger than maximum supported 0x7F");
             }
 
+            if (primaryKeySerializationPseudoTopic == null) {
+                primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopicSupplier.get();
+            }
+
             final byte[] primaryKeySerializedData = primaryKeySerializer.serialize(
                 primaryKeySerializationPseudoTopic,
                 data.getPrimaryKey()
@@ -106,12 +112,13 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
     private static class SubscriptionWrapperDeserializer<K>
         implements Deserializer<SubscriptionWrapper<K>>, WrappingNullableDeserializer<SubscriptionWrapper<K>, K> {
 
-        private final String primaryKeySerializationPseudoTopic;
+        private final Supplier<String> primaryKeySerializationPseudoTopicSupplier;
+        private String primaryKeySerializationPseudoTopic = null;
         private Deserializer<K> primaryKeyDeserializer;
 
-        SubscriptionWrapperDeserializer(final String primaryKeySerializationPseudoTopic,
+        SubscriptionWrapperDeserializer(final Supplier<String> primaryKeySerializationPseudoTopicSupplier,
                                         final Deserializer<K> primaryKeyDeserializer) {
-            this.primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopic;
+            this.primaryKeySerializationPseudoTopicSupplier = primaryKeySerializationPseudoTopicSupplier;
             this.primaryKeyDeserializer = primaryKeyDeserializer;
         }
 
@@ -144,6 +151,11 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
 
             final byte[] primaryKeyRaw = new byte[data.length - lengthSum]; //The remaining data is the serialized pk
             buf.get(primaryKeyRaw, 0, primaryKeyRaw.length);
+
+            if (primaryKeySerializationPseudoTopic == null) {
+                primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopicSupplier.get();
+            }
+
             final K primaryKey = primaryKeyDeserializer.deserialize(primaryKeySerializationPseudoTopic,
                                                                     primaryKeyRaw);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index f36324d..41123b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -1203,6 +1203,10 @@ public class InternalTopologyBuilder {
         return decoratedTopics;
     }
 
+    public String decoratePseudoTopic(final String topic) {
+        return decorateTopic(topic);
+    }
+
     private String decorateTopic(final String topic) {
         if (applicationId == null) {
             throw new TopologyException("there are internal topics and "
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java
index 5af0530..64c6b06 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java
@@ -24,12 +24,14 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
@@ -38,10 +40,15 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 
 public class KTableKTableForeignKeyJoinDefaultSerdeTest {
+
     @Test
     public void shouldWorkWithDefaultSerdes() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -161,7 +168,72 @@ public class KTableKTableForeignKeyJoinDefaultSerdeTest {
         validateTopologyCanProcessData(builder);
     }
 
-    private static void validateTopologyCanProcessData(final StreamsBuilder builder) {
+    @Test
+    public void shouldUseExpectedTopicsWithSerde() {
+        final String applicationId = "ktable-ktable-joinOnForeignKey";
+        final Properties streamsConfig = mkProperties(mkMap(
+            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId),
+            mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
+            mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
+        ));
+
+        final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final String leftTable = "left_table";
+        final String rightTable = "right_table";
+        final String output = "output-topic";
+
+        final KTable<String, String> left = builder.table(
+            leftTable,
+            Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
+                          serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+        );
+        final KTable<String, String> right = builder.table(
+            rightTable,
+            Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
+                          serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+        );
+
+        left.join(
+            right,
+            value -> value.split("\\|")[1],
+            (value1, value2) -> "(" + value1 + "," + value2 + ")",
+            Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)
+            ))
+            .toStream()
+            .to(output);
+
+
+        final Topology topology = builder.build(streamsConfig);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
+            final TestInputTopic<String, String> leftInput = driver.createInputTopic(leftTable, new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> rightInput = driver.createInputTopic(rightTable, new StringSerializer(), new StringSerializer());
+            leftInput.pipeInput("lhs1", "lhsValue1|rhs1");
+            rightInput.pipeInput("rhs1", "rhsValue1");
+        }
+        // verifying primarily that no extra pseudo-topics were used, but it's nice to also verify the rest of the
+        // topics our serdes serialize data for
+        assertThat(serdeScope.registeredTopics(), is(mkSet(
+            // expected pseudo-topics
+            applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-fk--key",
+            applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-pk--key",
+            applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-vh--value",
+            // internal topics
+            applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic--key",
+            applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic--key",
+            applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic--value",
+            applicationId + "-left_table-STATE-STORE-0000000000-changelog--key",
+            applicationId + "-left_table-STATE-STORE-0000000000-changelog--value",
+            applicationId + "-right_table-STATE-STORE-0000000003-changelog--key",
+            applicationId + "-right_table-STATE-STORE-0000000003-changelog--value",
+            // output topics
+            "output-topic--key",
+            "output-topic--value"
+        )));
+    }
+
+    private void validateTopologyCanProcessData(final StreamsBuilder builder) {
         final Properties config = new Properties();
         config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-" + UUID.randomUUID());
         config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinPseudoTopicTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinPseudoTopicTest.java
deleted file mode 100644
index b08c293..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinPseudoTopicTest.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Test;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.common.utils.Utils.mkProperties;
-import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-
-public class KTableKTableForeignKeyJoinPseudoTopicTest {
-
-    private static final String LEFT_TABLE = "left_table";
-    private static final String RIGHT_TABLE = "right_table";
-    private static final String OUTPUT = "output-topic";
-    private static final String REJOIN_OUTPUT = "rejoin-output-topic";
-    private final Properties streamsConfig = mkProperties(mkMap(
-        mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-ktable-joinOnForeignKey"),
-        mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
-        mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
-    ));
-
-
-    private static Collection<Object[]> buildParameters(final List<?>... argOptions) {
-        List<Object[]> result = new LinkedList<>();
-        result.add(new Object[0]);
-
-        for (final List<?> argOption : argOptions) {
-            result = times(result, argOption);
-        }
-
-        return result;
-    }
-
-    private static List<Object[]> times(final List<Object[]> left, final List<?> right) {
-        final List<Object[]> result = new LinkedList<>();
-        for (final Object[] args : left) {
-            for (final Object rightElem : right) {
-                final Object[] resArgs = new Object[args.length + 1];
-                System.arraycopy(args, 0, resArgs, 0, args.length);
-                resArgs[args.length] = rightElem;
-                result.add(resArgs);
-            }
-        }
-        return result;
-    }
-
-
-    @Test
-    public void shouldUseExpectedTopicsWithSerde() {
-        final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        final KTable<String, String> left = builder.table(
-            LEFT_TABLE,
-            Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
-                          serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
-        );
-        final KTable<String, String> right = builder.table(
-            RIGHT_TABLE,
-            Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
-                          serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
-        );
-
-        left.join(
-            right,
-            value -> value.split("\\|")[1],
-            (value1, value2) -> "(" + value1 + "," + value2 + ")",
-            Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)
-            ))
-            .toStream()
-            .to(OUTPUT);
-
-
-        final Topology topology = builder.build(streamsConfig);
-        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
-            final TestInputTopic<String, String> leftInput = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
-            final TestInputTopic<String, String> rightInput = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
-            leftInput.pipeInput("lhs1", "lhsValue1|rhs1");
-            rightInput.pipeInput("rhs1", "rhsValue1");
-        }
-        // verifying primarily that no extra pseudo-topics were used, but it's nice to also verify the rest of the
-        // topics our serdes serialize data for
-        assertThat(serdeScope.registeredTopics(), is(mkSet(
-            // expected pseudo-topics
-            "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-fk--key",
-            "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-pk--key",
-            "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-vh--value",
-            // internal topics
-            "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic--key",
-            "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic--key",
-            "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic--value",
-            "ktable-ktable-joinOnForeignKey-left_table-STATE-STORE-0000000000-changelog--key",
-            "ktable-ktable-joinOnForeignKey-left_table-STATE-STORE-0000000000-changelog--value",
-            "ktable-ktable-joinOnForeignKey-right_table-STATE-STORE-0000000003-changelog--key",
-            "ktable-ktable-joinOnForeignKey-right_table-STATE-STORE-0000000003-changelog--value",
-            // output topics
-            "output-topic--key",
-            "output-topic--value"
-        )));
-    }
-
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
index cb1ef59..17f0c79 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
@@ -28,8 +28,10 @@ public class CombinedKeySchemaTest {
 
     @Test
     public void nonNullPrimaryKeySerdeTest() {
-        final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("fkTopic", Serdes.String(),
-                                                                               "pkTopic", Serdes.Integer());
+        final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
+            () -> "fkTopic", Serdes.String(),
+            () -> "pkTopic", Serdes.Integer()
+        );
         final Integer primary = -999;
         final Bytes result = cks.toBytes("foreignKey", primary);
 
@@ -40,22 +42,28 @@ public class CombinedKeySchemaTest {
 
     @Test(expected = NullPointerException.class)
     public void nullPrimaryKeySerdeTest() {
-        final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("fkTopic", Serdes.String(),
-                                                                               "pkTopic", Serdes.Integer());
+        final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
+            () -> "fkTopic", Serdes.String(),
+            () -> "pkTopic", Serdes.Integer()
+        );
         cks.toBytes("foreignKey", null);
     }
 
     @Test(expected = NullPointerException.class)
     public void nullForeignKeySerdeTest() {
-        final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("fkTopic", Serdes.String(),
-                                                                               "pkTopic", Serdes.Integer());
+        final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
+            () -> "fkTopic", Serdes.String(),
+            () -> "pkTopic", Serdes.Integer()
+        );
         cks.toBytes(null, 10);
     }
 
     @Test
     public void prefixKeySerdeTest() {
-        final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("fkTopic", Serdes.String(),
-                                                                               "pkTopic", Serdes.Integer());
+        final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
+            () -> "fkTopic", Serdes.String(),
+            () -> "pkTopic", Serdes.Integer()
+        );
         final String foreignKey = "someForeignKey";
         final byte[] foreignKeySerializedData =
             Serdes.String().serializer().serialize("fkTopic", foreignKey);
@@ -71,8 +79,10 @@ public class CombinedKeySchemaTest {
 
     @Test(expected = NullPointerException.class)
     public void nullPrefixKeySerdeTest() {
-        final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("fkTopic", Serdes.String(),
-                                                                               "pkTopic", Serdes.Integer());
+        final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
+            () -> "fkTopic", Serdes.String(),
+            () -> "pkTopic", Serdes.Integer()
+        );
         final String foreignKey = null;
         cks.prefixBytes(foreignKey);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
index aae99ec..b95569f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
@@ -82,7 +82,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
-                "value-hash-dummy-topic",
+                () -> "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
@@ -107,7 +107,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
-                "value-hash-dummy-topic",
+                () -> "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
@@ -132,7 +132,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
-                "value-hash-dummy-topic",
+                () -> "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
@@ -158,7 +158,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
-                "value-hash-dummy-topic",
+                () -> "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
@@ -184,7 +184,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
-                "value-hash-dummy-topic",
+                () -> "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
@@ -210,7 +210,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
-                "value-hash-dummy-topic",
+                () -> "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
index 5c6551c..dd67b4b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
@@ -31,7 +31,7 @@ public class SubscriptionWrapperSerdeTest {
     @SuppressWarnings("unchecked")
     public void shouldSerdeTest() {
         final String originalKey = "originalKey";
-        final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>("pkTopic", Serdes.String());
+        final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
         final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, originalKey);
         final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
@@ -46,7 +46,7 @@ public class SubscriptionWrapperSerdeTest {
     @SuppressWarnings("unchecked")
     public void shouldSerdeNullHashTest() {
         final String originalKey = "originalKey";
-        final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>("pkTopic", Serdes.String());
+        final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
         final long[] hashedValue = null;
         final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey);
         final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
@@ -61,7 +61,7 @@ public class SubscriptionWrapperSerdeTest {
     @SuppressWarnings("unchecked")
     public void shouldThrowExceptionOnNullKeyTest() {
         final String originalKey = null;
-        final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>("pkTopic", Serdes.String());
+        final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
         final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey);
         swSerde.serializer().serialize(null, wrapper);
@@ -71,7 +71,7 @@ public class SubscriptionWrapperSerdeTest {
     @SuppressWarnings("unchecked")
     public void shouldThrowExceptionOnNullInstructionTest() {
         final String originalKey = "originalKey";
-        final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>("pkTopic", Serdes.String());
+        final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
         final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, null, originalKey);
         swSerde.serializer().serialize(null, wrapper);