You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/04/29 22:06:21 UTC

[kafka] branch 2.5 updated (9e2785f -> d1ae75d)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a change to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from 9e2785f  KAFKA-9127: don't create StreamThreads for global-only topology (#8540)
     new e3a8223  MINOR: reuse pseudo-topic in FKJoin (#8296)
     new d1ae75d  KAFKA-9925: decorate pseudo-topics with app id (#8574)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../streams/kstream/internals/KTableImpl.java      | 25 ++++++--
 .../foreignkeyjoin/CombinedKeySchema.java          | 17 +++--
 ...reignJoinSubscriptionSendProcessorSupplier.java | 17 +++--
 .../SubscriptionResolverJoinProcessorSupplier.java | 15 +++--
 .../foreignkeyjoin/SubscriptionWrapperSerde.java   | 30 ++++++---
 .../internals/InternalTopologyBuilder.java         |  4 ++
 ...KTableKTableForeignKeyJoinDefaultSerdeTest.java | 74 +++++++++++++++++++++-
 .../foreignkeyjoin/CombinedKeySchemaTest.java      | 30 ++++++---
 ...scriptionResolverJoinProcessorSupplierTest.java |  6 ++
 .../SubscriptionWrapperSerdeTest.java              |  8 +--
 .../kafka/streams/utils/UniqueTopicSerdeScope.java |  6 ++
 11 files changed, 184 insertions(+), 48 deletions(-)


[kafka] 02/02: KAFKA-9925: decorate pseudo-topics with app id (#8574)

Posted by vv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit d1ae75d5f9222da42b1a396f4dc1f01e3df0ef83
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Wed Apr 29 16:17:34 2020 -0500

    KAFKA-9925: decorate pseudo-topics with app id (#8574)
    
    Reviewers: Boyang Chen <bo...@confluent.io>, Kin Siu
---
 .../streams/kstream/internals/KTableImpl.java      |  24 +++-
 .../foreignkeyjoin/CombinedKeySchema.java          |  17 ++-
 ...reignJoinSubscriptionSendProcessorSupplier.java |  17 ++-
 .../SubscriptionResolverJoinProcessorSupplier.java |  10 +-
 .../foreignkeyjoin/SubscriptionWrapperSerde.java   |  30 +++--
 .../internals/InternalTopologyBuilder.java         |   4 +
 ...KTableKTableForeignKeyJoinDefaultSerdeTest.java |  74 ++++++++++-
 .../KTableKTableForeignKeyJoinPseudoTopicTest.java | 138 ---------------------
 .../foreignkeyjoin/CombinedKeySchemaTest.java      |  30 +++--
 ...scriptionResolverJoinProcessorSupplierTest.java |  12 +-
 .../SubscriptionWrapperSerdeTest.java              |   8 +-
 11 files changed, 176 insertions(+), 188 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 75cdffa..cc7d8f2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -75,6 +75,7 @@ import java.util.HashSet;
 import java.util.Objects;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 import static org.apache.kafka.streams.kstream.internals.graph.GraphGraceSearchUtil.findAndVerifyWindowGrace;
 
@@ -953,13 +954,26 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         //This occurs whenever the extracted foreignKey changes values.
         enableSendingOldValues();
 
+        final NamedInternal renamed = new NamedInternal(joinName);
+
+        final String subscriptionTopicName = renamed.suffixWithOrElseGet(
+            "-subscription-registration",
+            builder,
+            SUBSCRIPTION_REGISTRATION
+        ) + TOPIC_SUFFIX;
 
+        // the decoration can't be performed until we have the configuration available when the app runs,
+        // so we pass Suppliers into the components, which they can call at run time
+
+        final Supplier<String> subscriptionPrimaryKeySerdePseudoTopic =
+            () -> internalTopologyBuilder().decoratePseudoTopic(subscriptionTopicName + "-pk");
+
+        final Supplier<String> subscriptionForeignKeySerdePseudoTopic =
+            () -> internalTopologyBuilder().decoratePseudoTopic(subscriptionTopicName + "-fk");
+
+        final Supplier<String> valueHashSerdePseudoTopic =
+            () -> internalTopologyBuilder().decoratePseudoTopic(subscriptionTopicName + "-vh");
 
-        final NamedInternal renamed = new NamedInternal(joinName);
-        final String subscriptionTopicName = renamed.suffixWithOrElseGet("-subscription-registration", builder, SUBSCRIPTION_REGISTRATION) + TOPIC_SUFFIX;
-        final String subscriptionPrimaryKeySerdePseudoTopic = subscriptionTopicName + "-pk";
-        final String subscriptionForeignKeySerdePseudoTopic = subscriptionTopicName + "-fk";
-        final String valueHashSerdePseudoTopic = subscriptionTopicName + "-vh";
         builder.internalTopologyBuilder.addInternalTopic(subscriptionTopicName);
 
         final Serde<KO> foreignKeySerde = ((KTableImpl<KO, VO, ?>) foreignKeyTable).keySerde;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
index 92fb72c..57bc646 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
@@ -23,24 +23,27 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
 import java.nio.ByteBuffer;
+import java.util.function.Supplier;
 
 /**
  * Factory for creating CombinedKey serializers / deserializers.
  */
 public class CombinedKeySchema<KO, K> {
-    private final String primaryKeySerdeTopic;
-    private final String foreignKeySerdeTopic;
+    private final Supplier<String> undecoratedPrimaryKeySerdeTopicSupplier;
+    private final Supplier<String> undecoratedForeignKeySerdeTopicSupplier;
+    private String primaryKeySerdeTopic;
+    private String foreignKeySerdeTopic;
     private Serializer<K> primaryKeySerializer;
     private Deserializer<K> primaryKeyDeserializer;
     private Serializer<KO> foreignKeySerializer;
     private Deserializer<KO> foreignKeyDeserializer;
 
-    public CombinedKeySchema(final String foreignKeySerdeTopic,
+    public CombinedKeySchema(final Supplier<String> foreignKeySerdeTopicSupplier,
                              final Serde<KO> foreignKeySerde,
-                             final String primaryKeySerdeTopic,
+                             final Supplier<String> primaryKeySerdeTopicSupplier,
                              final Serde<K> primaryKeySerde) {
-        this.primaryKeySerdeTopic = primaryKeySerdeTopic;
-        this.foreignKeySerdeTopic = foreignKeySerdeTopic;
+        undecoratedPrimaryKeySerdeTopicSupplier = primaryKeySerdeTopicSupplier;
+        undecoratedForeignKeySerdeTopicSupplier = foreignKeySerdeTopicSupplier;
         primaryKeySerializer = primaryKeySerde == null ? null : primaryKeySerde.serializer();
         primaryKeyDeserializer = primaryKeySerde == null ? null : primaryKeySerde.deserializer();
         foreignKeyDeserializer = foreignKeySerde == null ? null : foreignKeySerde.deserializer();
@@ -49,6 +52,8 @@ public class CombinedKeySchema<KO, K> {
 
     @SuppressWarnings("unchecked")
     public void init(final ProcessorContext context) {
+        primaryKeySerdeTopic = undecoratedPrimaryKeySerdeTopicSupplier.get();
+        foreignKeySerdeTopic = undecoratedForeignKeySerdeTopicSupplier.get();
         primaryKeySerializer = primaryKeySerializer == null ? (Serializer<K>) context.keySerde().serializer() : primaryKeySerializer;
         primaryKeyDeserializer = primaryKeyDeserializer == null ? (Deserializer<K>) context.keySerde().deserializer() : primaryKeyDeserializer;
         foreignKeySerializer = foreignKeySerializer == null ? (Serializer<KO>) context.keySerde().serializer() : foreignKeySerializer;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
index ba794f7..9787875 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE;
 import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE;
@@ -43,21 +44,21 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
     private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionSendProcessorSupplier.class);
 
     private final Function<V, KO> foreignKeyExtractor;
-    private final String foreignKeySerdeTopic;
-    private final String valueSerdeTopic;
+    private final Supplier<String> foreignKeySerdeTopicSupplier;
+    private final Supplier<String> valueSerdeTopicSupplier;
     private final boolean leftJoin;
     private Serializer<KO> foreignKeySerializer;
     private Serializer<V> valueSerializer;
 
     public ForeignJoinSubscriptionSendProcessorSupplier(final Function<V, KO> foreignKeyExtractor,
-                                                        final String foreignKeySerdeTopic,
-                                                        final String valueSerdeTopic,
+                                                        final Supplier<String> foreignKeySerdeTopicSupplier,
+                                                        final Supplier<String> valueSerdeTopicSupplier,
                                                         final Serde<KO> foreignKeySerde,
                                                         final Serializer<V> valueSerializer,
                                                         final boolean leftJoin) {
         this.foreignKeyExtractor = foreignKeyExtractor;
-        this.foreignKeySerdeTopic = foreignKeySerdeTopic;
-        this.valueSerdeTopic = valueSerdeTopic;
+        this.foreignKeySerdeTopicSupplier = foreignKeySerdeTopicSupplier;
+        this.valueSerdeTopicSupplier = valueSerdeTopicSupplier;
         this.valueSerializer = valueSerializer;
         this.leftJoin = leftJoin;
         foreignKeySerializer = foreignKeySerde == null ? null : foreignKeySerde.serializer();
@@ -71,11 +72,15 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
     private class UnbindChangeProcessor extends AbstractProcessor<K, Change<V>> {
 
         private Sensor droppedRecordsSensor;
+        private String foreignKeySerdeTopic;
+        private String valueSerdeTopic;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
+            foreignKeySerdeTopic = foreignKeySerdeTopicSupplier.get();
+            valueSerdeTopic = valueSerdeTopicSupplier.get();
             // get default key serde if it wasn't supplied directly at construction
             if (foreignKeySerializer == null) {
                 foreignKeySerializer = (Serializer<KO>) context.keySerde().serializer();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
index 31de068..3cd0636 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
@@ -29,6 +29,8 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.Murmur3;
 
+import java.util.function.Supplier;
+
 /**
  * Receives {@code SubscriptionResponseWrapper<VO>} events and filters out events which do not match the current hash
  * of the primary key. This eliminates race-condition results for rapidly-changing foreign-keys for a given primary key.
@@ -42,18 +44,18 @@ import org.apache.kafka.streams.state.internals.Murmur3;
 public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSupplier<K, SubscriptionResponseWrapper<VO>> {
     private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
     private final Serializer<V> constructionTimeValueSerializer;
-    private final String valueHashSerdePseudoTopic;
+    private final Supplier<String> valueHashSerdePseudoTopicSupplier;
     private final ValueJoiner<V, VO, VR> joiner;
     private final boolean leftJoin;
 
     public SubscriptionResolverJoinProcessorSupplier(final KTableValueGetterSupplier<K, V> valueGetterSupplier,
                                                      final Serializer<V> valueSerializer,
-                                                     final String valueHashSerdePseudoTopic,
+                                                     final Supplier<String> valueHashSerdePseudoTopicSupplier,
                                                      final ValueJoiner<V, VO, VR> joiner,
                                                      final boolean leftJoin) {
         this.valueGetterSupplier = valueGetterSupplier;
         constructionTimeValueSerializer = valueSerializer;
-        this.valueHashSerdePseudoTopic = valueHashSerdePseudoTopic;
+        this.valueHashSerdePseudoTopicSupplier = valueHashSerdePseudoTopicSupplier;
         this.joiner = joiner;
         this.leftJoin = leftJoin;
     }
@@ -61,6 +63,7 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements
     @Override
     public Processor<K, SubscriptionResponseWrapper<VO>> get() {
         return new AbstractProcessor<K, SubscriptionResponseWrapper<VO>>() {
+            private String valueHashSerdePseudoTopic;
             private Serializer<V> runtimeValueSerializer = constructionTimeValueSerializer;
 
             private KTableValueGetter<K, V> valueGetter;
@@ -69,6 +72,7 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements
             @Override
             public void init(final ProcessorContext context) {
                 super.init(context);
+                valueHashSerdePseudoTopic = valueHashSerdePseudoTopicSupplier.get();
                 valueGetter = valueGetterSupplier.get();
                 valueGetter.init(context);
                 if (runtimeValueSerializer == null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
index 42aed94..136128c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
@@ -25,16 +25,17 @@ import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
 
 import java.nio.ByteBuffer;
 import java.util.Objects;
+import java.util.function.Supplier;
 
 public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>> {
     private final SubscriptionWrapperSerializer<K> serializer;
     private final SubscriptionWrapperDeserializer<K> deserializer;
 
-    public SubscriptionWrapperSerde(final String primaryKeySerializationPseudoTopic,
+    public SubscriptionWrapperSerde(final Supplier<String> primaryKeySerializationPseudoTopicSupplier,
                                     final Serde<K> primaryKeySerde) {
-        serializer = new SubscriptionWrapperSerializer<>(primaryKeySerializationPseudoTopic,
+        serializer = new SubscriptionWrapperSerializer<>(primaryKeySerializationPseudoTopicSupplier,
                                                          primaryKeySerde == null ? null : primaryKeySerde.serializer());
-        deserializer = new SubscriptionWrapperDeserializer<>(primaryKeySerializationPseudoTopic,
+        deserializer = new SubscriptionWrapperDeserializer<>(primaryKeySerializationPseudoTopicSupplier,
                                                              primaryKeySerde == null ? null : primaryKeySerde.deserializer());
     }
 
@@ -51,12 +52,13 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
     private static class SubscriptionWrapperSerializer<K>
         implements Serializer<SubscriptionWrapper<K>>, WrappingNullableSerializer<SubscriptionWrapper<K>, K> {
 
-        private final String primaryKeySerializationPseudoTopic;
+        private final Supplier<String> primaryKeySerializationPseudoTopicSupplier;
+        private String primaryKeySerializationPseudoTopic = null;
         private Serializer<K> primaryKeySerializer;
 
-        SubscriptionWrapperSerializer(final String primaryKeySerializationPseudoTopic,
+        SubscriptionWrapperSerializer(final Supplier<String> primaryKeySerializationPseudoTopicSupplier,
                                       final Serializer<K> primaryKeySerializer) {
-            this.primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopic;
+            this.primaryKeySerializationPseudoTopicSupplier = primaryKeySerializationPseudoTopicSupplier;
             this.primaryKeySerializer = primaryKeySerializer;
         }
 
@@ -76,6 +78,10 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
                 throw new UnsupportedVersionException("SubscriptionWrapper version is larger than maximum supported 0x7F");
             }
 
+            if (primaryKeySerializationPseudoTopic == null) {
+                primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopicSupplier.get();
+            }
+
             final byte[] primaryKeySerializedData = primaryKeySerializer.serialize(
                 primaryKeySerializationPseudoTopic,
                 data.getPrimaryKey()
@@ -106,12 +112,13 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
     private static class SubscriptionWrapperDeserializer<K>
         implements Deserializer<SubscriptionWrapper<K>>, WrappingNullableDeserializer<SubscriptionWrapper<K>, K> {
 
-        private final String primaryKeySerializationPseudoTopic;
+        private final Supplier<String> primaryKeySerializationPseudoTopicSupplier;
+        private String primaryKeySerializationPseudoTopic = null;
         private Deserializer<K> primaryKeyDeserializer;
 
-        SubscriptionWrapperDeserializer(final String primaryKeySerializationPseudoTopic,
+        SubscriptionWrapperDeserializer(final Supplier<String> primaryKeySerializationPseudoTopicSupplier,
                                         final Deserializer<K> primaryKeyDeserializer) {
-            this.primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopic;
+            this.primaryKeySerializationPseudoTopicSupplier = primaryKeySerializationPseudoTopicSupplier;
             this.primaryKeyDeserializer = primaryKeyDeserializer;
         }
 
@@ -144,6 +151,11 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
 
             final byte[] primaryKeyRaw = new byte[data.length - lengthSum]; //The remaining data is the serialized pk
             buf.get(primaryKeyRaw, 0, primaryKeyRaw.length);
+
+            if (primaryKeySerializationPseudoTopic == null) {
+                primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopicSupplier.get();
+            }
+
             final K primaryKey = primaryKeyDeserializer.deserialize(primaryKeySerializationPseudoTopic,
                                                                     primaryKeyRaw);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index f36324d..41123b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -1203,6 +1203,10 @@ public class InternalTopologyBuilder {
         return decoratedTopics;
     }
 
+    public String decoratePseudoTopic(final String topic) {
+        return decorateTopic(topic);
+    }
+
     private String decorateTopic(final String topic) {
         if (applicationId == null) {
             throw new TopologyException("there are internal topics and "
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java
index 5af0530..64c6b06 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java
@@ -24,12 +24,14 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
@@ -38,10 +40,15 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 
 public class KTableKTableForeignKeyJoinDefaultSerdeTest {
+
     @Test
     public void shouldWorkWithDefaultSerdes() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -161,7 +168,72 @@ public class KTableKTableForeignKeyJoinDefaultSerdeTest {
         validateTopologyCanProcessData(builder);
     }
 
-    private static void validateTopologyCanProcessData(final StreamsBuilder builder) {
+    @Test
+    public void shouldUseExpectedTopicsWithSerde() {
+        final String applicationId = "ktable-ktable-joinOnForeignKey";
+        final Properties streamsConfig = mkProperties(mkMap(
+            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId),
+            mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
+            mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
+        ));
+
+        final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final String leftTable = "left_table";
+        final String rightTable = "right_table";
+        final String output = "output-topic";
+
+        final KTable<String, String> left = builder.table(
+            leftTable,
+            Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
+                          serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+        );
+        final KTable<String, String> right = builder.table(
+            rightTable,
+            Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
+                          serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+        );
+
+        left.join(
+            right,
+            value -> value.split("\\|")[1],
+            (value1, value2) -> "(" + value1 + "," + value2 + ")",
+            Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)
+            ))
+            .toStream()
+            .to(output);
+
+
+        final Topology topology = builder.build(streamsConfig);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
+            final TestInputTopic<String, String> leftInput = driver.createInputTopic(leftTable, new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> rightInput = driver.createInputTopic(rightTable, new StringSerializer(), new StringSerializer());
+            leftInput.pipeInput("lhs1", "lhsValue1|rhs1");
+            rightInput.pipeInput("rhs1", "rhsValue1");
+        }
+        // verifying primarily that no extra pseudo-topics were used, but it's nice to also verify the rest of the
+        // topics our serdes serialize data for
+        assertThat(serdeScope.registeredTopics(), is(mkSet(
+            // expected pseudo-topics
+            applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-fk--key",
+            applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-pk--key",
+            applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-vh--value",
+            // internal topics
+            applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic--key",
+            applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic--key",
+            applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic--value",
+            applicationId + "-left_table-STATE-STORE-0000000000-changelog--key",
+            applicationId + "-left_table-STATE-STORE-0000000000-changelog--value",
+            applicationId + "-right_table-STATE-STORE-0000000003-changelog--key",
+            applicationId + "-right_table-STATE-STORE-0000000003-changelog--value",
+            // output topics
+            "output-topic--key",
+            "output-topic--value"
+        )));
+    }
+
+    private void validateTopologyCanProcessData(final StreamsBuilder builder) {
         final Properties config = new Properties();
         config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-" + UUID.randomUUID());
         config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinPseudoTopicTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinPseudoTopicTest.java
deleted file mode 100644
index b08c293..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinPseudoTopicTest.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Test;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.common.utils.Utils.mkProperties;
-import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-
-public class KTableKTableForeignKeyJoinPseudoTopicTest {
-
-    private static final String LEFT_TABLE = "left_table";
-    private static final String RIGHT_TABLE = "right_table";
-    private static final String OUTPUT = "output-topic";
-    private static final String REJOIN_OUTPUT = "rejoin-output-topic";
-    private final Properties streamsConfig = mkProperties(mkMap(
-        mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-ktable-joinOnForeignKey"),
-        mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
-        mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
-    ));
-
-
-    private static Collection<Object[]> buildParameters(final List<?>... argOptions) {
-        List<Object[]> result = new LinkedList<>();
-        result.add(new Object[0]);
-
-        for (final List<?> argOption : argOptions) {
-            result = times(result, argOption);
-        }
-
-        return result;
-    }
-
-    private static List<Object[]> times(final List<Object[]> left, final List<?> right) {
-        final List<Object[]> result = new LinkedList<>();
-        for (final Object[] args : left) {
-            for (final Object rightElem : right) {
-                final Object[] resArgs = new Object[args.length + 1];
-                System.arraycopy(args, 0, resArgs, 0, args.length);
-                resArgs[args.length] = rightElem;
-                result.add(resArgs);
-            }
-        }
-        return result;
-    }
-
-
-    @Test
-    public void shouldUseExpectedTopicsWithSerde() {
-        final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        final KTable<String, String> left = builder.table(
-            LEFT_TABLE,
-            Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
-                          serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
-        );
-        final KTable<String, String> right = builder.table(
-            RIGHT_TABLE,
-            Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
-                          serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
-        );
-
-        left.join(
-            right,
-            value -> value.split("\\|")[1],
-            (value1, value2) -> "(" + value1 + "," + value2 + ")",
-            Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)
-            ))
-            .toStream()
-            .to(OUTPUT);
-
-
-        final Topology topology = builder.build(streamsConfig);
-        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
-            final TestInputTopic<String, String> leftInput = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
-            final TestInputTopic<String, String> rightInput = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
-            leftInput.pipeInput("lhs1", "lhsValue1|rhs1");
-            rightInput.pipeInput("rhs1", "rhsValue1");
-        }
-        // verifying primarily that no extra pseudo-topics were used, but it's nice to also verify the rest of the
-        // topics our serdes serialize data for
-        assertThat(serdeScope.registeredTopics(), is(mkSet(
-            // expected pseudo-topics
-            "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-fk--key",
-            "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-pk--key",
-            "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-vh--value",
-            // internal topics
-            "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic--key",
-            "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic--key",
-            "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic--value",
-            "ktable-ktable-joinOnForeignKey-left_table-STATE-STORE-0000000000-changelog--key",
-            "ktable-ktable-joinOnForeignKey-left_table-STATE-STORE-0000000000-changelog--value",
-            "ktable-ktable-joinOnForeignKey-right_table-STATE-STORE-0000000003-changelog--key",
-            "ktable-ktable-joinOnForeignKey-right_table-STATE-STORE-0000000003-changelog--value",
-            // output topics
-            "output-topic--key",
-            "output-topic--value"
-        )));
-    }
-
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
index cb1ef59..17f0c79 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
@@ -28,8 +28,10 @@ public class CombinedKeySchemaTest {
 
     @Test
     public void nonNullPrimaryKeySerdeTest() {
-        final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("fkTopic", Serdes.String(),
-                                                                               "pkTopic", Serdes.Integer());
+        final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
+            () -> "fkTopic", Serdes.String(),
+            () -> "pkTopic", Serdes.Integer()
+        );
         final Integer primary = -999;
         final Bytes result = cks.toBytes("foreignKey", primary);
 
@@ -40,22 +42,28 @@ public class CombinedKeySchemaTest {
 
     @Test(expected = NullPointerException.class)
     public void nullPrimaryKeySerdeTest() {
-        final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("fkTopic", Serdes.String(),
-                                                                               "pkTopic", Serdes.Integer());
+        final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
+            () -> "fkTopic", Serdes.String(),
+            () -> "pkTopic", Serdes.Integer()
+        );
         cks.toBytes("foreignKey", null);
     }
 
     @Test(expected = NullPointerException.class)
     public void nullForeignKeySerdeTest() {
-        final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("fkTopic", Serdes.String(),
-                                                                               "pkTopic", Serdes.Integer());
+        final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
+            () -> "fkTopic", Serdes.String(),
+            () -> "pkTopic", Serdes.Integer()
+        );
         cks.toBytes(null, 10);
     }
 
     @Test
     public void prefixKeySerdeTest() {
-        final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("fkTopic", Serdes.String(),
-                                                                               "pkTopic", Serdes.Integer());
+        final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
+            () -> "fkTopic", Serdes.String(),
+            () -> "pkTopic", Serdes.Integer()
+        );
         final String foreignKey = "someForeignKey";
         final byte[] foreignKeySerializedData =
             Serdes.String().serializer().serialize("fkTopic", foreignKey);
@@ -71,8 +79,10 @@ public class CombinedKeySchemaTest {
 
     @Test(expected = NullPointerException.class)
     public void nullPrefixKeySerdeTest() {
-        final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("fkTopic", Serdes.String(),
-                                                                               "pkTopic", Serdes.Integer());
+        final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
+            () -> "fkTopic", Serdes.String(),
+            () -> "pkTopic", Serdes.Integer()
+        );
         final String foreignKey = null;
         cks.prefixBytes(foreignKey);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
index aae99ec..b95569f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
@@ -82,7 +82,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
-                "value-hash-dummy-topic",
+                () -> "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
@@ -107,7 +107,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
-                "value-hash-dummy-topic",
+                () -> "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
@@ -132,7 +132,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
-                "value-hash-dummy-topic",
+                () -> "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
@@ -158,7 +158,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
-                "value-hash-dummy-topic",
+                () -> "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
@@ -184,7 +184,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
-                "value-hash-dummy-topic",
+                () -> "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
@@ -210,7 +210,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
-                "value-hash-dummy-topic",
+                () -> "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
index 5c6551c..dd67b4b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
@@ -31,7 +31,7 @@ public class SubscriptionWrapperSerdeTest {
     @SuppressWarnings("unchecked")
     public void shouldSerdeTest() {
         final String originalKey = "originalKey";
-        final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>("pkTopic", Serdes.String());
+        final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
         final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, originalKey);
         final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
@@ -46,7 +46,7 @@ public class SubscriptionWrapperSerdeTest {
     @SuppressWarnings("unchecked")
     public void shouldSerdeNullHashTest() {
         final String originalKey = "originalKey";
-        final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>("pkTopic", Serdes.String());
+        final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
         final long[] hashedValue = null;
         final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey);
         final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
@@ -61,7 +61,7 @@ public class SubscriptionWrapperSerdeTest {
     @SuppressWarnings("unchecked")
     public void shouldThrowExceptionOnNullKeyTest() {
         final String originalKey = null;
-        final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>("pkTopic", Serdes.String());
+        final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
         final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey);
         swSerde.serializer().serialize(null, wrapper);
@@ -71,7 +71,7 @@ public class SubscriptionWrapperSerdeTest {
     @SuppressWarnings("unchecked")
     public void shouldThrowExceptionOnNullInstructionTest() {
         final String originalKey = "originalKey";
-        final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>("pkTopic", Serdes.String());
+        final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
         final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, null, originalKey);
         swSerde.serializer().serialize(null, wrapper);


[kafka] 01/02: MINOR: reuse pseudo-topic in FKJoin (#8296)

Posted by vv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit e3a8223e3c511a8a17bac1b60b1b9ef1c2b4c9de
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri Mar 13 23:04:14 2020 -0500

    MINOR: reuse pseudo-topic in FKJoin (#8296)
    
    Reuse the same pseudo-topic for serializing the LHS value in the foreign-key join resolver as
    we originally used to serialize it before sending the subscription request.
    
    Reviewers: Boyang Chen <bo...@confluent.io>
---
 .../streams/kstream/internals/KTableImpl.java      |   1 +
 .../SubscriptionResolverJoinProcessorSupplier.java |  11 +-
 .../KTableKTableForeignKeyJoinPseudoTopicTest.java | 138 +++++++++++++++++++++
 ...scriptionResolverJoinProcessorSupplierTest.java |   6 +
 .../kafka/streams/utils/UniqueTopicSerdeScope.java |   6 +
 5 files changed, 155 insertions(+), 7 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index ddff497..75cdffa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -1085,6 +1085,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         final SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> resolverProcessorSupplier = new SubscriptionResolverJoinProcessorSupplier<>(
             primaryKeyValueGetter,
             valSerde == null ? null : valSerde.serializer(),
+            valueHashSerdePseudoTopic,
             joiner,
             leftJoin
         );
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
index 8fa77aa..31de068 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
@@ -42,15 +42,18 @@ import org.apache.kafka.streams.state.internals.Murmur3;
 public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSupplier<K, SubscriptionResponseWrapper<VO>> {
     private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
     private final Serializer<V> constructionTimeValueSerializer;
+    private final String valueHashSerdePseudoTopic;
     private final ValueJoiner<V, VO, VR> joiner;
     private final boolean leftJoin;
 
     public SubscriptionResolverJoinProcessorSupplier(final KTableValueGetterSupplier<K, V> valueGetterSupplier,
                                                      final Serializer<V> valueSerializer,
+                                                     final String valueHashSerdePseudoTopic,
                                                      final ValueJoiner<V, VO, VR> joiner,
                                                      final boolean leftJoin) {
         this.valueGetterSupplier = valueGetterSupplier;
         constructionTimeValueSerializer = valueSerializer;
+        this.valueHashSerdePseudoTopic = valueHashSerdePseudoTopic;
         this.joiner = joiner;
         this.leftJoin = leftJoin;
     }
@@ -83,15 +86,9 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements
                 }
                 final ValueAndTimestamp<V> currentValueWithTimestamp = valueGetter.get(key);
 
-                //We are unable to access the actual source topic name for the valueSerializer at runtime, without
-                //tightly coupling to KTableRepartitionProcessorSupplier.
-                //While we can use the source topic from where the events came from, we shouldn't serialize against it
-                //as it causes problems with the confluent schema registry, which requires each topic have only a single
-                //registered schema.
-                final String dummySerializationTopic = context().topic() + "-join-resolver";
                 final long[] currentHash = currentValueWithTimestamp == null ?
                     null :
-                    Murmur3.hash128(runtimeValueSerializer.serialize(dummySerializationTopic, currentValueWithTimestamp.value()));
+                    Murmur3.hash128(runtimeValueSerializer.serialize(valueHashSerdePseudoTopic, currentValueWithTimestamp.value()));
 
                 final long[] messageHash = value.getOriginalValueHash();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinPseudoTopicTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinPseudoTopicTest.java
new file mode 100644
index 0000000..b08c293
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinPseudoTopicTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+
+public class KTableKTableForeignKeyJoinPseudoTopicTest {
+
+    private static final String LEFT_TABLE = "left_table";
+    private static final String RIGHT_TABLE = "right_table";
+    private static final String OUTPUT = "output-topic";
+    private static final String REJOIN_OUTPUT = "rejoin-output-topic";
+    private final Properties streamsConfig = mkProperties(mkMap(
+        mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-ktable-joinOnForeignKey"),
+        mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
+        mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
+    ));
+
+
+    private static Collection<Object[]> buildParameters(final List<?>... argOptions) {
+        List<Object[]> result = new LinkedList<>();
+        result.add(new Object[0]);
+
+        for (final List<?> argOption : argOptions) {
+            result = times(result, argOption);
+        }
+
+        return result;
+    }
+
+    private static List<Object[]> times(final List<Object[]> left, final List<?> right) {
+        final List<Object[]> result = new LinkedList<>();
+        for (final Object[] args : left) {
+            for (final Object rightElem : right) {
+                final Object[] resArgs = new Object[args.length + 1];
+                System.arraycopy(args, 0, resArgs, 0, args.length);
+                resArgs[args.length] = rightElem;
+                result.add(resArgs);
+            }
+        }
+        return result;
+    }
+
+
+    @Test
+    public void shouldUseExpectedTopicsWithSerde() {
+        final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTable<String, String> left = builder.table(
+            LEFT_TABLE,
+            Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
+                          serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+        );
+        final KTable<String, String> right = builder.table(
+            RIGHT_TABLE,
+            Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
+                          serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+        );
+
+        left.join(
+            right,
+            value -> value.split("\\|")[1],
+            (value1, value2) -> "(" + value1 + "," + value2 + ")",
+            Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)
+            ))
+            .toStream()
+            .to(OUTPUT);
+
+
+        final Topology topology = builder.build(streamsConfig);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
+            final TestInputTopic<String, String> leftInput = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> rightInput = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
+            leftInput.pipeInput("lhs1", "lhsValue1|rhs1");
+            rightInput.pipeInput("rhs1", "rhsValue1");
+        }
+        // verifying primarily that no extra pseudo-topics were used, but it's nice to also verify the rest of the
+        // topics our serdes serialize data for
+        assertThat(serdeScope.registeredTopics(), is(mkSet(
+            // expected pseudo-topics
+            "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-fk--key",
+            "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-pk--key",
+            "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-vh--value",
+            // internal topics
+            "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic--key",
+            "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic--key",
+            "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic--value",
+            "ktable-ktable-joinOnForeignKey-left_table-STATE-STORE-0000000000-changelog--key",
+            "ktable-ktable-joinOnForeignKey-left_table-STATE-STORE-0000000000-changelog--value",
+            "ktable-ktable-joinOnForeignKey-right_table-STATE-STORE-0000000003-changelog--key",
+            "ktable-ktable-joinOnForeignKey-right_table-STATE-STORE-0000000003-changelog--value",
+            // output topics
+            "output-topic--key",
+            "output-topic--value"
+        )));
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
index 3ec19de..aae99ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
@@ -82,6 +82,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
+                "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
@@ -106,6 +107,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
+                "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
@@ -130,6 +132,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
+                "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
@@ -155,6 +158,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
+                "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
@@ -180,6 +184,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
+                "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
@@ -205,6 +210,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
+                "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
diff --git a/streams/src/test/java/org/apache/kafka/streams/utils/UniqueTopicSerdeScope.java b/streams/src/test/java/org/apache/kafka/streams/utils/UniqueTopicSerdeScope.java
index 04b1a7b..c385187 100644
--- a/streams/src/test/java/org/apache/kafka/streams/utils/UniqueTopicSerdeScope.java
+++ b/streams/src/test/java/org/apache/kafka/streams/utils/UniqueTopicSerdeScope.java
@@ -21,8 +21,10 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
@@ -41,6 +43,10 @@ public class UniqueTopicSerdeScope {
         return decorator;
     }
 
+    public Set<String> registeredTopics() {
+        return Collections.unmodifiableSet(topicTypeRegistry.keySet());
+    }
+
     public class UniqueTopicSerdeDecorator<T> implements Serde<T> {
         private final AtomicBoolean isKey = new AtomicBoolean(false);
         private final Serde<T> delegate;