You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/02/10 21:55:40 UTC

[kafka] branch 2.5 updated: KAFKA-9517: Fix default serdes with FK join (#8061)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 661a660  KAFKA-9517: Fix default serdes with FK join (#8061)
661a660 is described below

commit 661a660cf37be25f8795665e0de74be13bdf44a4
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Mon Feb 10 15:33:23 2020 -0600

    KAFKA-9517: Fix default serdes with FK join (#8061)
    
    During the KIP-213 implementation and verification, we neglected to test the
    code path for falling back to default serdes if none are given in the topology.
    
    Cherry-pick of 520a76155c66486cbcd0c519ef2980359964fd7c from trunk
    
    Reviewer: Bill Bejeck <bb...@gmail.com>
---
 .../kstream/internals/ChangedDeserializer.java     |  10 +-
 .../kstream/internals/ChangedSerializer.java       |  10 +-
 .../streams/kstream/internals/KTableImpl.java      |   4 +-
 .../internals/WrappingNullableDeserializer.java    |  23 +++
 .../internals/WrappingNullableSerializer.java      |  23 +++
 .../foreignkeyjoin/CombinedKeySchema.java          |   8 +-
 ...reignJoinSubscriptionSendProcessorSupplier.java |   5 +-
 .../SubscriptionResolverJoinProcessorSupplier.java |  11 +-
 .../SubscriptionResponseWrapperSerde.java          |  33 +++-
 .../SubscriptionStoreReceiveProcessorSupplier.java |   2 +
 .../foreignkeyjoin/SubscriptionWrapperSerde.java   |  35 +++-
 .../streams/processor/internals/SinkNode.java      |  10 +-
 .../streams/processor/internals/SourceNode.java    |  10 +-
 .../streams/integration/ForeignKeyJoinSuite.java   |   1 +
 ...KTableKTableForeignKeyJoinDefaultSerdeTest.java | 178 +++++++++++++++++++++
 .../internals/StreamsPartitionAssignorTest.java    |   1 -
 16 files changed, 325 insertions(+), 39 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
index 36f77b8..90d5882 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
@@ -20,8 +20,9 @@ import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
 
 import java.nio.ByteBuffer;
+import java.util.Objects;
 
-public class ChangedDeserializer<T> implements Deserializer<Change<T>> {
+public class ChangedDeserializer<T> implements Deserializer<Change<T>>, WrappingNullableDeserializer<Change<T>, T> {
 
     private static final int NEWFLAG_SIZE = 1;
 
@@ -35,8 +36,11 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>> {
         return inner;
     }
 
-    public void setInner(final Deserializer<T> inner) {
-        this.inner = inner;
+    @Override
+    public void setIfUnset(final Deserializer<T> defaultDeserializer) {
+        if (inner == null) {
+            inner = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer cannot be null");
+        }
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index bfd0afa..551d948 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -21,8 +21,9 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.errors.StreamsException;
 
 import java.nio.ByteBuffer;
+import java.util.Objects;
 
-public class ChangedSerializer<T> implements Serializer<Change<T>> {
+public class ChangedSerializer<T> implements Serializer<Change<T>>, WrappingNullableSerializer<Change<T>, T> {
 
     private static final int NEWFLAG_SIZE = 1;
 
@@ -36,8 +37,11 @@ public class ChangedSerializer<T> implements Serializer<Change<T>> {
         return inner;
     }
 
-    public void setInner(final Serializer<T> inner) {
-        this.inner = inner;
+    @Override
+    public void setIfUnset(final Serializer<T> defaultSerializer) {
+        if (inner == null) {
+            inner = Objects.requireNonNull(defaultSerializer, "defaultSerializer cannot be null");
+        }
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 38e5c49..f5c5aec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -970,7 +970,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
                     foreignKeyExtractor,
                     foreignKeySerde,
                     subscriptionTopicName,
-                    valSerde.serializer(),
+                    valSerde == null ? null : valSerde.serializer(),
                     leftJoin
                 ),
                 renamed.suffixWithOrElseGet("-subscription-registration-processor", builder, SUBSCRIPTION_REGISTRATION)
@@ -1073,7 +1073,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         final KTableValueGetterSupplier<K, V> primaryKeyValueGetter = valueGetterSupplier();
         final SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> resolverProcessorSupplier = new SubscriptionResolverJoinProcessorSupplier<>(
             primaryKeyValueGetter,
-            valueSerde().serializer(),
+            valSerde == null ? null : valSerde.serializer(),
             joiner,
             leftJoin
         );
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java
new file mode 100644
index 0000000..a57e9a1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+
+public interface WrappingNullableDeserializer<Outer, Inner> extends Deserializer<Outer> {
+    void setIfUnset(final Deserializer<Inner> defaultDeserializer);
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java
new file mode 100644
index 0000000..2d28e52
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serializer;
+
+public interface WrappingNullableSerializer<Outer, Inner> extends Serializer<Outer> {
+    void setIfUnset(final Serializer<Inner> defaultSerializer);
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
index 8abe583..7cda404 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
@@ -36,10 +36,10 @@ public class CombinedKeySchema<KO, K> {
 
     public CombinedKeySchema(final String serdeTopic, final Serde<KO> foreignKeySerde, final Serde<K> primaryKeySerde) {
         this.serdeTopic = serdeTopic;
-        primaryKeySerializer = primaryKeySerde.serializer();
-        primaryKeyDeserializer = primaryKeySerde.deserializer();
-        foreignKeyDeserializer = foreignKeySerde.deserializer();
-        foreignKeySerializer = foreignKeySerde.serializer();
+        primaryKeySerializer = primaryKeySerde == null ? null : primaryKeySerde.serializer();
+        primaryKeyDeserializer = primaryKeySerde == null ? null : primaryKeySerde.deserializer();
+        foreignKeyDeserializer = foreignKeySerde == null ? null : foreignKeySerde.deserializer();
+        foreignKeySerializer = foreignKeySerde == null ? null : foreignKeySerde.serializer();
     }
 
     @SuppressWarnings("unchecked")
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
index 00815f9..06bd7d2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
@@ -44,9 +44,9 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
 
     private final Function<V, KO> foreignKeyExtractor;
     private final String repartitionTopicName;
-    private final Serializer<V> valueSerializer;
     private final boolean leftJoin;
     private Serializer<KO> foreignKeySerializer;
+    private Serializer<V> valueSerializer;
 
     public ForeignJoinSubscriptionSendProcessorSupplier(final Function<V, KO> foreignKeyExtractor,
                                                         final Serde<KO> foreignKeySerde,
@@ -77,6 +77,9 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
             if (foreignKeySerializer == null) {
                 foreignKeySerializer = (Serializer<KO>) context.keySerde().serializer();
             }
+            if (valueSerializer == null) {
+                valueSerializer = (Serializer<V>) context.valueSerde().serializer();
+            }
             droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(
                 Thread.currentThread().getName(),
                 context.taskId().toString(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
index a188f15..8fa77aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
@@ -41,7 +41,7 @@ import org.apache.kafka.streams.state.internals.Murmur3;
  */
 public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSupplier<K, SubscriptionResponseWrapper<VO>> {
     private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
-    private final Serializer<V> valueSerializer;
+    private final Serializer<V> constructionTimeValueSerializer;
     private final ValueJoiner<V, VO, VR> joiner;
     private final boolean leftJoin;
 
@@ -50,7 +50,7 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements
                                                      final ValueJoiner<V, VO, VR> joiner,
                                                      final boolean leftJoin) {
         this.valueGetterSupplier = valueGetterSupplier;
-        this.valueSerializer = valueSerializer;
+        constructionTimeValueSerializer = valueSerializer;
         this.joiner = joiner;
         this.leftJoin = leftJoin;
     }
@@ -58,14 +58,19 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements
     @Override
     public Processor<K, SubscriptionResponseWrapper<VO>> get() {
         return new AbstractProcessor<K, SubscriptionResponseWrapper<VO>>() {
+            private Serializer<V> runtimeValueSerializer = constructionTimeValueSerializer;
 
             private KTableValueGetter<K, V> valueGetter;
 
+            @SuppressWarnings("unchecked")
             @Override
             public void init(final ProcessorContext context) {
                 super.init(context);
                 valueGetter = valueGetterSupplier.get();
                 valueGetter.init(context);
+                if (runtimeValueSerializer == null) {
+                    runtimeValueSerializer = (Serializer<V>) context.valueSerde().serializer();
+                }
             }
 
             @Override
@@ -86,7 +91,7 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements
                 final String dummySerializationTopic = context().topic() + "-join-resolver";
                 final long[] currentHash = currentValueWithTimestamp == null ?
                     null :
-                    Murmur3.hash128(valueSerializer.serialize(dummySerializationTopic, currentValueWithTimestamp.value()));
+                    Murmur3.hash128(runtimeValueSerializer.serialize(dummySerializationTopic, currentValueWithTimestamp.value()));
 
                 final long[] messageHash = value.getOriginalValueHash();
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
index 4277f9a..31317c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
@@ -20,16 +20,19 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
 
 import java.nio.ByteBuffer;
+import java.util.Objects;
 
 public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionResponseWrapper<V>> {
     private final SubscriptionResponseWrapperSerializer<V> serializer;
     private final SubscriptionResponseWrapperDeserializer<V> deserializer;
 
     public SubscriptionResponseWrapperSerde(final Serde<V> foreignValueSerde) {
-        serializer = new SubscriptionResponseWrapperSerializer<>(foreignValueSerde.serializer());
-        deserializer = new SubscriptionResponseWrapperDeserializer<>(foreignValueSerde.deserializer());
+        serializer = new SubscriptionResponseWrapperSerializer<>(foreignValueSerde == null ? null : foreignValueSerde.serializer());
+        deserializer = new SubscriptionResponseWrapperDeserializer<>(foreignValueSerde == null ? null : foreignValueSerde.deserializer());
     }
 
     @Override
@@ -42,14 +45,23 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
         return deserializer;
     }
 
-    private static final class SubscriptionResponseWrapperSerializer<V> implements Serializer<SubscriptionResponseWrapper<V>> {
-        private final Serializer<V> serializer;
+    private static final class SubscriptionResponseWrapperSerializer<V>
+        implements Serializer<SubscriptionResponseWrapper<V>>, WrappingNullableSerializer<SubscriptionResponseWrapper<V>, V> {
+
+        private Serializer<V> serializer;
 
         private SubscriptionResponseWrapperSerializer(final Serializer<V> serializer) {
             this.serializer = serializer;
         }
 
         @Override
+        public void setIfUnset(final Serializer<V> defaultSerializer) {
+            if (serializer == null) {
+                serializer = Objects.requireNonNull(defaultSerializer, "defaultSerializer cannot be null");
+            }
+        }
+
+        @Override
         public byte[] serialize(final String topic, final SubscriptionResponseWrapper<V> data) {
             //{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized data}
 
@@ -81,14 +93,23 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
 
     }
 
-    private static final class SubscriptionResponseWrapperDeserializer<V> implements Deserializer<SubscriptionResponseWrapper<V>> {
-        private final Deserializer<V> deserializer;
+    private static final class SubscriptionResponseWrapperDeserializer<V>
+        implements Deserializer<SubscriptionResponseWrapper<V>>, WrappingNullableDeserializer<SubscriptionResponseWrapper<V>, V> {
+
+        private Deserializer<V> deserializer;
 
         private SubscriptionResponseWrapperDeserializer(final Deserializer<V> deserializer) {
             this.deserializer = deserializer;
         }
 
         @Override
+        public void setIfUnset(final Deserializer<V> defaultDeserializer) {
+            if (deserializer == null) {
+                deserializer = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer cannot be null");
+            }
+        }
+
+        @Override
         public SubscriptionResponseWrapper<V> deserialize(final String topic, final byte[] data) {
             //{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized data}
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
index 9cbeadd..61fb1c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
@@ -68,6 +68,8 @@ public class SubscriptionStoreReceiveProcessorSupplier<K, KO>
                     internalProcessorContext.metrics()
                 );
                 store = internalProcessorContext.getStateStore(storeBuilder);
+
+                keySchema.init(context);
             }
 
             @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
index ae53ba8..1cb3293 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
@@ -20,16 +20,19 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
 
 import java.nio.ByteBuffer;
+import java.util.Objects;
 
 public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>> {
     private final SubscriptionWrapperSerializer<K> serializer;
     private final SubscriptionWrapperDeserializer<K> deserializer;
 
     public SubscriptionWrapperSerde(final Serde<K> primaryKeySerde) {
-        serializer = new SubscriptionWrapperSerializer<>(primaryKeySerde.serializer());
-        deserializer = new SubscriptionWrapperDeserializer<>(primaryKeySerde.deserializer());
+        serializer = new SubscriptionWrapperSerializer<>(primaryKeySerde == null ? null : primaryKeySerde.serializer());
+        deserializer = new SubscriptionWrapperDeserializer<>(primaryKeySerde == null ? null : primaryKeySerde.deserializer());
     }
 
     @Override
@@ -42,13 +45,23 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
         return deserializer;
     }
 
-    private static class SubscriptionWrapperSerializer<K> implements Serializer<SubscriptionWrapper<K>> {
-        private final Serializer<K> primaryKeySerializer;
+    public static class SubscriptionWrapperSerializer<K>
+        implements Serializer<SubscriptionWrapper<K>>, WrappingNullableSerializer<SubscriptionWrapper<K>, K> {
+
+        private Serializer<K> primaryKeySerializer;
+
         SubscriptionWrapperSerializer(final Serializer<K> primaryKeySerializer) {
             this.primaryKeySerializer = primaryKeySerializer;
         }
 
         @Override
+        public void setIfUnset(final Serializer<K> defaultSerializer) {
+            if (primaryKeySerializer == null) {
+                primaryKeySerializer = Objects.requireNonNull(defaultSerializer, "defaultSerializer cannot be null");
+            }
+        }
+
+        @Override
         public byte[] serialize(final String topic, final SubscriptionWrapper<K> data) {
             //{1-bit-isHashNull}{7-bits-version}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}
 
@@ -81,13 +94,23 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
 
     }
 
-    private static class SubscriptionWrapperDeserializer<K> implements Deserializer<SubscriptionWrapper<K>> {
-        private final Deserializer<K> primaryKeyDeserializer;
+    public static class SubscriptionWrapperDeserializer<K>
+        implements Deserializer<SubscriptionWrapper<K>>, WrappingNullableDeserializer<SubscriptionWrapper<K>, K> {
+
+        private Deserializer<K> primaryKeyDeserializer;
+
         SubscriptionWrapperDeserializer(final Deserializer<K> primaryKeyDeserializer) {
             this.primaryKeyDeserializer = primaryKeyDeserializer;
         }
 
         @Override
+        public void setIfUnset(final Deserializer<K> defaultDeserializer) {
+            if (primaryKeyDeserializer == null) {
+                primaryKeyDeserializer = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer cannot be null");
+            }
+        }
+
+        @Override
         public SubscriptionWrapper<K> deserialize(final String topic, final byte[] data) {
             //{7-bits-version}{1-bit-isHashNull}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}
             final ByteBuffer buf = ByteBuffer.wrap(data);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index b94291b..e3333be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -18,7 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.kstream.internals.ChangedSerializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 
@@ -66,10 +66,10 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
             valSerializer = (Serializer<V>) context.valueSerde().serializer();
         }
 
-        // if value serializers are for {@code Change} values, set the inner serializer when necessary
-        if (valSerializer instanceof ChangedSerializer &&
-                ((ChangedSerializer) valSerializer).inner() == null) {
-            ((ChangedSerializer) valSerializer).setInner(context.valueSerde().serializer());
+        // if the value serializer is an internal wrapping serializer that may need to be given the default serializer,
+        // then pass it the default one from the context
+        if (valSerializer instanceof WrappingNullableSerializer) {
+            ((WrappingNullableSerializer) valSerializer).setIfUnset(context.valueSerde().serializer());
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 33d08b1..853520a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.streams.kstream.internals.ChangedDeserializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
@@ -88,10 +88,10 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
             this.valDeserializer = (Deserializer<V>) context.valueSerde().deserializer();
         }
 
-        // if value deserializers are for {@code Change} values, set the inner deserializer when necessary
-        if (this.valDeserializer instanceof ChangedDeserializer &&
-                ((ChangedDeserializer) this.valDeserializer).inner() == null) {
-            ((ChangedDeserializer) this.valDeserializer).setInner(context.valueSerde().deserializer());
+        // if the value deserializer is an internal wrapping deserializer that may need to be given the default
+        // deserializer, then pass it the default one from the context
+        if (valDeserializer instanceof WrappingNullableDeserializer) {
+            ((WrappingNullableDeserializer) valDeserializer).setIfUnset(context.valueSerde().deserializer());
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java b/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java
index 47e2d95..dbe5bc2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java
@@ -39,6 +39,7 @@ import org.junit.runners.Suite;
     KTableKTableForeignKeyInnerJoinMultiIntegrationTest.class,
     KTableKTableForeignKeyJoinIntegrationTest.class,
     KTableKTableForeignKeyJoinMaterializationIntegrationTest.class,
+    KTableKTableForeignKeyJoinDefaultSerdeTest.class,
     CombinedKeySchemaTest.class,
     SubscriptionWrapperSerdeTest.class,
     SubscriptionResponseWrapperSerdeTest.class,
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java
new file mode 100644
index 0000000..df6349a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+public class KTableKTableForeignKeyJoinDefaultSerdeTest { // runs a foreign-key join feeding a second table-table join, with serdes left partly or wholly to the configured defaults
+    @Test
+    public void shouldWorkWithDefaultSerdes() { // no serde is supplied anywhere in the topology
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, String> aTable = builder.table("A");
+        final KTable<String, String> bTable = builder.table("B");
+
+        final KTable<String, String> fkJoinResult = aTable.join(
+            bTable,
+            value -> value.split("-")[0], // foreign key: the part of A's value before the first "-"
+            (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
+            Materialized.as("asdf")
+        );
+
+        final KTable<String, String> finalJoinResult = aTable.join( // joins A with the foreign-key join result; no key extractor is passed here
+            fkJoinResult,
+            (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
+        );
+
+        finalJoinResult.toStream().to("output");
+
+        validateTopologyCanProcessData(builder);
+    }
+
+    @Test
+    public void shouldWorkWithDefaultAndConsumedSerdes() { // only source table A is given explicit serdes (via Consumed)
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, String> aTable = builder.table("A", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> bTable = builder.table("B");
+
+        final KTable<String, String> fkJoinResult = aTable.join(
+            bTable,
+            value -> value.split("-")[0],
+            (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
+            Materialized.as("asdf")
+        );
+
+        final KTable<String, String> finalJoinResult = aTable.join(
+            fkJoinResult,
+            (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
+        );
+
+        finalJoinResult.toStream().to("output");
+
+        validateTopologyCanProcessData(builder);
+    }
+
+    @Test
+    public void shouldWorkWithDefaultAndJoinResultSerdes() { // only the materialized foreign-key join result is given explicit key and value serdes
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, String> aTable = builder.table("A");
+        final KTable<String, String> bTable = builder.table("B");
+
+        final KTable<String, String> fkJoinResult = aTable.join(
+            bTable,
+            value -> value.split("-")[0],
+            (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
+            Materialized
+                .<String, String, KeyValueStore<Bytes, byte[]>>as("asdf")
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.String())
+        );
+
+        final KTable<String, String> finalJoinResult = aTable.join(
+            fkJoinResult,
+            (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
+        );
+
+        finalJoinResult.toStream().to("output");
+
+        validateTopologyCanProcessData(builder);
+    }
+
+    @Test
+    public void shouldWorkWithDefaultAndEquiJoinResultSerdes() { // only the second join (the one without a key extractor) is given explicit serdes
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, String> aTable = builder.table("A");
+        final KTable<String, String> bTable = builder.table("B");
+
+        final KTable<String, String> fkJoinResult = aTable.join(
+            bTable,
+            value -> value.split("-")[0],
+            (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
+            Materialized.as("asdf")
+        );
+
+        final KTable<String, String> finalJoinResult = aTable.join(
+            fkJoinResult,
+            (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")",
+            Materialized.with(Serdes.String(), Serdes.String())
+        );
+
+        finalJoinResult.toStream().to("output");
+
+        validateTopologyCanProcessData(builder);
+    }
+
+    @Test
+    public void shouldWorkWithDefaultAndProducedSerdes() { // only the write to the "output" topic is given explicit serdes (via Produced)
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, String> aTable = builder.table("A");
+        final KTable<String, String> bTable = builder.table("B");
+
+        final KTable<String, String> fkJoinResult = aTable.join(
+            bTable,
+            value -> value.split("-")[0],
+            (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
+            Materialized.as("asdf")
+        );
+
+        final KTable<String, String> finalJoinResult = aTable.join(
+            fkJoinResult,
+            (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
+        );
+
+        finalJoinResult.toStream().to("output", Produced.with(Serdes.String(), Serdes.String()));
+
+        validateTopologyCanProcessData(builder);
+    }
+
+    private static void validateTopologyCanProcessData(final StreamsBuilder builder) { // builds the topology with String default serdes, pipes one record into each input topic and asserts the joined output
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"); // placeholder; presumably never contacted by TopologyTestDriver - TODO confirm
+        config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); // default key serde for operators that were given none
+        config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); // default value serde for operators that were given none
+        try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), config)) { // try-with-resources closes the driver when the block exits
+            final TestInputTopic<String, String> aTopic = topologyTestDriver.createInputTopic("A", new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> bTopic = topologyTestDriver.createInputTopic("B", new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> output = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
+            aTopic.pipeInput("a1", "b1-alpha"); // A record: key a1, extracted foreign key b1
+            bTopic.pipeInput("b1", "beta"); // B record whose key matches a1's foreign key
+            final Map<String, String> x = output.readKeyValuesToMap(); // output records collected into a map by key
+            assertThat(x, is(Collections.singletonMap("a1", "(b1-alpha,(b1-alpha,beta))"))); // second join wraps A's value around the foreign-key join result "(b1-alpha,beta)"
+        }
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 12fc9c7..23e4c91 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -1755,7 +1755,6 @@ public class StreamsPartitionAssignorTest {
         createMockTaskManager(emptyTasks, emptyTasks, uuid1, builder);
         EasyMock.replay(taskManager);
 
-        streamsMetadataState = EasyMock.createNiceMock(StreamsMetadataState.class);
         configurePartitionAssignor(emptyMap());
         final MockInternalTopicManager internalTopicManager =
             new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer);