Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/20 22:24:10 UTC

[kafka] branch 2.7 updated: KAFKA-10515: Properly initialize nullable Serdes with default values (#9338)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 4a572f8  KAFKA-10515: Properly initialize nullable Serdes with default values (#9338)
4a572f8 is described below

commit 4a572f8f1a9935c3e5ca13ab043aa64b19feb7db
Author: Thorsten Hake <th...@gmail.com>
AuthorDate: Wed Oct 21 00:03:39 2020 +0200

    KAFKA-10515: Properly initialize nullable Serdes with default values (#9338)
    
    Also introduced the notion of a WrappingNullableSerde (aligned with the concepts
    of WrappingNullableSerializer and WrappingNullableDeserializer) and centralized
    the initialization logic in WrappingNullableUtils.
    
    The added integration test KTableKTableForeignKeyJoinDistributedTest verifies
    that all serdes are now correctly set on all Streams clients.
    
    Reviewers: John Roesler <vv...@apache.org>
---
 .../kstream/internals/WrappingNullableSerde.java   |  68 ++++++
 .../kstream/internals/WrappingNullableUtils.java   |  97 +++++++++
 .../foreignkeyjoin/SubscriptionWrapperSerde.java   |  26 +--
 .../processor/internals/ProcessorContextUtils.java |  24 +++
 .../streams/processor/internals/SinkNode.java      |  26 +--
 .../streams/processor/internals/SourceNode.java    |  25 +--
 .../state/internals/MeteredKeyValueStore.java      |  25 ++-
 .../state/internals/MeteredSessionStore.java       |  12 +-
 .../internals/MeteredTimestampedKeyValueStore.java |  34 +--
 .../internals/MeteredTimestampedWindowStore.java   |  36 +---
 .../state/internals/MeteredWindowStore.java        |  19 +-
 .../internals/ValueAndTimestampDeserializer.java   |  11 +-
 .../state/internals/ValueAndTimestampSerde.java    |  41 +---
 .../internals/ValueAndTimestampSerializer.java     |  12 +-
 .../KTableKTableForeignKeyJoinDistributedTest.java | 235 +++++++++++++++++++++
 .../internals/GlobalStateStoreProviderTest.java    |   8 +
 .../MeteredTimestampedKeyValueStoreTest.java       |  17 +-
 17 files changed, 542 insertions(+), 174 deletions(-)

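For orientation before the full diff: rather than each node and store casting the context's
default serdes inside its own init() method, the new WrappingNullableUtils helpers either
substitute the context default (when no specific serde was given) or push the defaults into a
WrappingNullableSerde via setIfUnset(). The minimal sketch below compiles only against the
classes added or changed in this commit; the wrapper class and main method are illustrative
and are not part of the commit.

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.ValueAndTimestampSerde;

public class WrappingNullableSketch {
    public static void main(final String[] args) {
        // Case 1: no store-level value serde was configured, so the helper simply
        // falls back to the context's default value serde (Serdes.Long() here).
        final Serde<Long> resolved =
            WrappingNullableUtils.prepareValueSerde(null, Serdes.String(), Serdes.Long());

        // Case 2: a wrapping serde was configured. It is kept as-is, but setIfUnset()
        // pushes the context defaults into any inner serializer/deserializer that was
        // constructed around null, which is the gap this commit closes for serdes such
        // as SubscriptionWrapperSerde when an application relies on default serdes.
        final Serde<ValueAndTimestamp<Long>> wrapping = new ValueAndTimestampSerde<>(Serdes.Long());
        final Serde<ValueAndTimestamp<Long>> prepared =
            WrappingNullableUtils.prepareValueSerde(wrapping, Serdes.String(), Serdes.Long());

        System.out.println(resolved + " / " + prepared);
    }
}
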
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerde.java
new file mode 100644
index 0000000..c15ff23
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerde.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.util.Map;
+import java.util.Objects;
+
+public abstract class WrappingNullableSerde<T, InnerK, InnerV> implements Serde<T> {
+    private final WrappingNullableSerializer<T, InnerK, InnerV> serializer;
+    private final WrappingNullableDeserializer<T, InnerK, InnerV> deserializer;
+
+    protected WrappingNullableSerde(final WrappingNullableSerializer<T, InnerK, InnerV> serializer,
+                                    final WrappingNullableDeserializer<T, InnerK, InnerV> deserializer) {
+        Objects.requireNonNull(serializer, "serializer can't be null");
+        Objects.requireNonNull(deserializer, "deserializer can't be null");
+        this.serializer = serializer;
+        this.deserializer = deserializer;
+    }
+
+    @Override
+    public Serializer<T> serializer() {
+        return serializer;
+    }
+
+    @Override
+    public Deserializer<T> deserializer() {
+        return deserializer;
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs,
+                          final boolean isKey) {
+        serializer.configure(configs, isKey);
+        deserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public void close() {
+        serializer.close();
+        deserializer.close();
+    }
+
+    public void setIfUnset(final Serde<InnerK> defaultKeySerde, final Serde<InnerV> defaultValueSerde) {
+        Objects.requireNonNull(defaultKeySerde);
+        Objects.requireNonNull(defaultValueSerde);
+        serializer.setIfUnset(defaultKeySerde.serializer(), defaultValueSerde.serializer());
+        deserializer.setIfUnset(defaultKeySerde.deserializer(), defaultValueSerde.deserializer());
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
new file mode 100644
index 0000000..23954d2
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+
+/**
+ * If a component's serdes are Wrapping serdes, then they require a little extra setup
+ * to be fully initialized at run time.
+ */
+public class WrappingNullableUtils {
+
+    @SuppressWarnings("unchecked")
+    private static <T> Deserializer<T> prepareDeserializer(final Deserializer<T> specificDeserializer, final Deserializer<?> contextKeyDeserializer, final Deserializer<?> contextValueDeserializer, final boolean isKey) {
+        Deserializer<T> deserializerToUse = specificDeserializer;
+        if (deserializerToUse == null) {
+            deserializerToUse = (Deserializer<T>) (isKey ? contextKeyDeserializer : contextValueDeserializer);
+        } else {
+            initNullableDeserializer(deserializerToUse, contextKeyDeserializer, contextValueDeserializer);
+        }
+        return deserializerToUse;
+    }
+    @SuppressWarnings("unchecked")
+    private static <T> Serializer<T> prepareSerializer(final Serializer<T> specificSerializer, final Serializer<?> contextKeySerializer, final Serializer<?> contextValueSerializer, final boolean isKey) {
+        Serializer<T> serializerToUse = specificSerializer;
+        if (serializerToUse == null) {
+            serializerToUse = (Serializer<T>) (isKey ? contextKeySerializer : contextValueSerializer);
+        } else {
+            initNullableSerializer(serializerToUse, contextKeySerializer, contextValueSerializer);
+        }
+        return serializerToUse;
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private static <T> Serde<T> prepareSerde(final Serde<T> specificSerde, final Serde<?> contextKeySerde, final Serde<?> contextValueSerde, final boolean isKey) {
+        Serde<T> serdeToUse = specificSerde;
+        if (serdeToUse == null) {
+            serdeToUse = (Serde<T>) (isKey ?  contextKeySerde : contextValueSerde);
+        } else if (serdeToUse instanceof WrappingNullableSerde) {
+            ((WrappingNullableSerde) serdeToUse).setIfUnset(contextKeySerde, contextValueSerde);
+        }
+        return serdeToUse;
+    }
+
+    public static <K> Deserializer<K> prepareKeyDeserializer(final Deserializer<K> specificDeserializer, final Deserializer<?> contextKeyDeserializer, final Deserializer<?> contextValueDeserializer) {
+        return prepareDeserializer(specificDeserializer, contextKeyDeserializer, contextValueDeserializer, true);
+    }
+
+    public static <V> Deserializer<V> prepareValueDeserializer(final Deserializer<V> specificDeserializer, final Deserializer<?> contextKeyDeserializer, final Deserializer<?> contextValueDeserializer) {
+        return prepareDeserializer(specificDeserializer, contextKeyDeserializer, contextValueDeserializer, false);
+    }
+
+    public static <K> Serializer<K> prepareKeySerializer(final Serializer<K> specificSerializer, final Serializer<?> contextKeySerializer, final Serializer<?> contextValueSerializer) {
+        return prepareSerializer(specificSerializer, contextKeySerializer, contextValueSerializer, true);
+    }
+
+    public static <V> Serializer<V> prepareValueSerializer(final Serializer<V> specificSerializer, final Serializer<?> contextKeySerializer, final Serializer<?> contextValueSerializer) {
+        return prepareSerializer(specificSerializer, contextKeySerializer, contextValueSerializer, false);
+    }
+
+    public static <K> Serde<K> prepareKeySerde(final Serde<K> specificSerde, final Serde<?> keySerde, final Serde<?> valueSerde) {
+        return prepareSerde(specificSerde, keySerde, valueSerde, true);
+    }
+
+    public static <V> Serde<V> prepareValueSerde(final Serde<V> specificSerde, final Serde<?> keySerde, final Serde<?> valueSerde) {
+        return prepareSerde(specificSerde, keySerde, valueSerde, false);
+    }
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static <T> void initNullableSerializer(final Serializer<T> specificSerializer, final Serializer<?> contextKeySerializer, final Serializer<?> contextValueSerializer) {
+        if (specificSerializer instanceof WrappingNullableSerializer) {
+            ((WrappingNullableSerializer) specificSerializer).setIfUnset(contextKeySerializer, contextValueSerializer);
+        }
+    }
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static <T> void initNullableDeserializer(final Deserializer<T> specificDeserializer, final Deserializer<?> contextKeyDeserializer, final Deserializer<?> contextValueDeserializer) {
+        if (specificDeserializer instanceof WrappingNullableDeserializer) {
+            ((WrappingNullableDeserializer) specificDeserializer).setIfUnset(contextKeyDeserializer, contextValueDeserializer);
+        }
+    }
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
index d2cc989..7fe124a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
@@ -21,32 +21,22 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
 
 import java.nio.ByteBuffer;
 import java.util.Objects;
 import java.util.function.Supplier;
 
-public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>> {
-    private final SubscriptionWrapperSerializer<K> serializer;
-    private final SubscriptionWrapperDeserializer<K> deserializer;
-
+public class SubscriptionWrapperSerde<K> extends WrappingNullableSerde<SubscriptionWrapper<K>, K, Void> {
     public SubscriptionWrapperSerde(final Supplier<String> primaryKeySerializationPseudoTopicSupplier,
                                     final Serde<K> primaryKeySerde) {
-        serializer = new SubscriptionWrapperSerializer<>(primaryKeySerializationPseudoTopicSupplier,
-                                                         primaryKeySerde == null ? null : primaryKeySerde.serializer());
-        deserializer = new SubscriptionWrapperDeserializer<>(primaryKeySerializationPseudoTopicSupplier,
-                                                             primaryKeySerde == null ? null : primaryKeySerde.deserializer());
-    }
-
-    @Override
-    public Serializer<SubscriptionWrapper<K>> serializer() {
-        return serializer;
-    }
-
-    @Override
-    public Deserializer<SubscriptionWrapper<K>> deserializer() {
-        return deserializer;
+        super(
+            new SubscriptionWrapperSerializer<>(primaryKeySerializationPseudoTopicSupplier,
+                                                primaryKeySerde == null ? null : primaryKeySerde.serializer()),
+            new SubscriptionWrapperDeserializer<>(primaryKeySerializationPseudoTopicSupplier,
+                                                  primaryKeySerde == null ? null : primaryKeySerde.deserializer())
+        );
     }
 
     private static class SubscriptionWrapperSerializer<K>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
index 9c84aab..8d053df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -86,4 +89,25 @@ public final class ProcessorContextUtils {
             );
         }
     }
+
+    public static Serializer<?> getKeySerializer(final ProcessorContext processorContext) {
+        return getSerializer(processorContext, true);
+    }
+    public static Serializer<?> getValueSerializer(final ProcessorContext processorContext) {
+        return getSerializer(processorContext, false);
+    }
+    private static Serializer<?> getSerializer(final ProcessorContext processorContext, final boolean key) {
+        final Serde<?> serde = key ? processorContext.keySerde() : processorContext.valueSerde();
+        return serde == null ? null : serde.serializer();
+    }
+    public static Deserializer<?> getKeyDeserializer(final ProcessorContext processorContext) {
+        return getDeserializer(processorContext, true);
+    }
+    public static Deserializer<?> getValueDeserializer(final ProcessorContext processorContext) {
+        return getDeserializer(processorContext, false);
+    }
+    private static Deserializer<?> getDeserializer(final ProcessorContext processorContext, final boolean key) {
+        final Serde<?> serde = key ? processorContext.keySerde() : processorContext.valueSerde();
+        return serde == null ? null : serde.deserializer();
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 2e4307c..2efa537 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -17,11 +17,13 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.api.Record;
 
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerializer;
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueSerializer;
+
 public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
 
     private Serializer<KIn> keySerializer;
@@ -52,28 +54,14 @@ public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut
         throw new UnsupportedOperationException("sink node does not allow addChild");
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void init(final InternalProcessorContext context) {
         super.init(context);
         this.context = context;
-
-        // if serializers are null, get the default ones from the context
-        if (keySerializer == null) {
-            keySerializer = (Serializer<KIn>) context.keySerde().serializer();
-        }
-        if (valSerializer == null) {
-            valSerializer = (Serializer<VIn>) context.valueSerde().serializer();
-        }
-
-        // if serializers are internal wrapping serializers that may need to be given the default serializer
-        // then pass it the default one from the context
-        if (valSerializer instanceof WrappingNullableSerializer) {
-            ((WrappingNullableSerializer) valSerializer).setIfUnset(
-                context.keySerde().serializer(),
-                context.valueSerde().serializer()
-            );
-        }
+        final Serializer<?> contextKeySerializer = ProcessorContextUtils.getKeySerializer(context);
+        final Serializer<?> contextValueSerializer = ProcessorContextUtils.getValueSerializer(context);
+        keySerializer = prepareKeySerializer(keySerializer, contextKeySerializer, contextValueSerializer);
+        valSerializer = prepareValueSerializer(valSerializer, contextKeySerializer, contextValueSerializer);
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 7fa8c64..7198f2f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -19,11 +19,13 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
 
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeyDeserializer;
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueDeserializer;
+
 public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
 
     private InternalProcessorContext context;
@@ -56,7 +58,6 @@ public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KO
         return valDeserializer.deserialize(topic, headers, data);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void init(final InternalProcessorContext context) {
         // It is important to first create the sensor before calling init on the
@@ -73,22 +74,10 @@ public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KO
         super.init(context);
         this.context = context;
 
-        // if deserializers are null, get the default ones from the context
-        if (this.keyDeserializer == null) {
-            this.keyDeserializer = (Deserializer<KIn>) context.keySerde().deserializer();
-        }
-        if (this.valDeserializer == null) {
-            this.valDeserializer = (Deserializer<VIn>) context.valueSerde().deserializer();
-        }
-
-        // if deserializers are internal wrapping deserializers that may need to be given the default
-        // then pass it the default one from the context
-        if (valDeserializer instanceof WrappingNullableDeserializer) {
-            ((WrappingNullableDeserializer) valDeserializer).setIfUnset(
-                    context.keySerde().deserializer(),
-                    context.valueSerde().deserializer()
-            );
-        }
+        final Deserializer<?> contextKeyDeserializer = ProcessorContextUtils.getKeyDeserializer(context);
+        final Deserializer<?> contextValueDeserializer = ProcessorContextUtils.getValueDeserializer(context);
+        keyDeserializer = prepareKeyDeserializer(keyDeserializer, contextKeyDeserializer, contextValueDeserializer);
+        valDeserializer = prepareValueDeserializer(valDeserializer, contextKeyDeserializer, contextValueDeserializer);
     }
 
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 2c45358..a9470ee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
@@ -37,6 +38,7 @@ import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
 
 /**
@@ -130,29 +132,34 @@ public class MeteredKeyValueStore<K, V>
         e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics);
     }
 
+    protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde, final Serde<?> contextKeySerde, final Serde<?> contextValueSerde) {
+        return WrappingNullableUtils.prepareValueSerde(valueSerde, contextKeySerde, contextValueSerde);
+    }
+
+
     @Deprecated
-    @SuppressWarnings("unchecked")
-    void initStoreSerde(final ProcessorContext context) {
+    private void initStoreSerde(final ProcessorContext context) {
         final String storeName = name();
         final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
         serdes = new StateSerdes<>(
             changelogTopic != null ?
                 changelogTopic :
                 ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+            prepareKeySerde(keySerde, context.keySerde(), context.valueSerde()),
+            prepareValueSerdeForStore(valueSerde, context.keySerde(), context.valueSerde())
+        );
     }
 
-    @SuppressWarnings("unchecked")
-    void initStoreSerde(final StateStoreContext context) {
+    private void initStoreSerde(final StateStoreContext context) {
         final String storeName = name();
         final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
         serdes = new StateSerdes<>(
-             changelogTopic != null ?
+            changelogTopic != null ?
                 changelogTopic :
                 ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+            prepareKeySerde(keySerde, context.keySerde(), context.valueSerde()),
+            prepareValueSerdeForStore(valueSerde, context.keySerde(), context.valueSerde())
+        );
     }
 
     @SuppressWarnings("unchecked")
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index f7f25c0..0419192 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
@@ -111,7 +112,7 @@ public class MeteredSessionStore<K, V>
         e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics);
     }
 
-    @SuppressWarnings("unchecked")
+
     private void initStoreSerde(final ProcessorContext context) {
         final String storeName = name();
         final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
@@ -119,12 +120,11 @@ public class MeteredSessionStore<K, V>
             changelogTopic != null ?
                 changelogTopic :
                 ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde
+                WrappingNullableUtils.prepareKeySerde(keySerde, context.keySerde(), context.valueSerde()),
+                WrappingNullableUtils.prepareValueSerde(valueSerde, context.keySerde(), context.valueSerde())
         );
     }
 
-    @SuppressWarnings("unchecked")
     private void initStoreSerde(final StateStoreContext context) {
         final String storeName = name();
         final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
@@ -132,8 +132,8 @@ public class MeteredSessionStore<K, V>
             changelogTopic != null ?
                 changelogTopic :
                 ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde
+                WrappingNullableUtils.prepareKeySerde(keySerde, context.keySerde(), context.valueSerde()),
+                WrappingNullableUtils.prepareValueSerde(valueSerde, context.keySerde(), context.valueSerde())
         );
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index bd9cb92..19d3900 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -22,12 +22,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreContext;
-import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
@@ -51,31 +46,18 @@ public class MeteredTimestampedKeyValueStore<K, V>
         super(inner, metricScope, time, keySerde, valueSerde);
     }
 
-    @Deprecated
-    @SuppressWarnings("unchecked")
-    void initStoreSerde(final ProcessorContext context) {
-        final String storeName = name();
-        final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
-        serdes = new StateSerdes<>(
-            changelogTopic != null ?
-                changelogTopic :
-                ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
-    }
 
     @SuppressWarnings("unchecked")
-    void initStoreSerde(final StateStoreContext context) {
-        final String storeName = name();
-        final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
-        serdes = new StateSerdes<>(
-            changelogTopic != null ?
-                changelogTopic :
-                ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
+    @Override
+    protected Serde<ValueAndTimestamp<V>> prepareValueSerdeForStore(final Serde<ValueAndTimestamp<V>> valueSerde, final Serde<?> contextKeySerde, final Serde<?> contextValueSerde) {
+        if (valueSerde == null) {
+            return new ValueAndTimestampSerde<>((Serde<V>) contextValueSerde);
+        } else {
+            return super.prepareValueSerdeForStore(valueSerde, contextKeySerde, contextValueSerde);
+        }
     }
 
+
     public RawAndDeserializedValue<V> getWithBinary(final K key) {
         try {
             return maybeMeasureLatency(() -> { 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
index 258db89..7d37f16 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
@@ -19,11 +19,6 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreContext;
-import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowStore;
@@ -50,32 +45,13 @@ class MeteredTimestampedWindowStore<K, V>
         super(inner, windowSizeMs, metricScope, time, keySerde, valueSerde);
     }
 
-    @Deprecated
     @SuppressWarnings("unchecked")
     @Override
-    void initStoreSerde(final ProcessorContext context) {
-        final String storeName = name();
-        final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
-        serdes = new StateSerdes<>(
-            changelogTopic != null ?
-                changelogTopic :
-                ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde
-        );
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    void initStoreSerde(final StateStoreContext context) {
-        final String storeName = name();
-        final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
-        serdes = new StateSerdes<>(
-            changelogTopic != null ?
-                changelogTopic :
-                ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde
-        );
+    protected Serde<ValueAndTimestamp<V>> prepareValueSerde(final Serde<ValueAndTimestamp<V>> valueSerde, final Serde<?> contextKeySerde, final Serde<?> contextValueSerde) {
+        if (valueSerde == null) {
+            return new ValueAndTimestampSerde<>((Serde<V>) contextValueSerde);
+        } else {
+            return super.prepareValueSerde(valueSerde, contextKeySerde, contextValueSerde);
+        }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index d47233b..1b419ec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
@@ -35,6 +36,7 @@ import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
 
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
 
 public class MeteredWindowStore<K, V>
@@ -103,6 +105,9 @@ public class MeteredWindowStore<K, V>
         // register and possibly restore the state from the logs
         maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
     }
+    protected Serde<V> prepareValueSerde(final Serde<V> valueSerde, final Serde<?> contextKeySerde, final Serde<?> contextValueSerde) {
+        return WrappingNullableUtils.prepareValueSerde(valueSerde, contextKeySerde, contextValueSerde);
+    }
 
     private void registerMetrics() {
         putSensor = StateStoreMetrics.putSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
@@ -112,28 +117,26 @@ public class MeteredWindowStore<K, V>
     }
 
     @Deprecated
-    @SuppressWarnings("unchecked")
-    void initStoreSerde(final ProcessorContext context) {
+    private void initStoreSerde(final ProcessorContext context) {
         final String storeName = name();
         final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
         serdes = new StateSerdes<>(
             changelogTopic != null ?
                 changelogTopic :
                 ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+            prepareKeySerde(keySerde, context.keySerde(), context.valueSerde()),
+            prepareValueSerde(valueSerde, context.keySerde(), context.valueSerde()));
     }
 
-    @SuppressWarnings("unchecked")
-    void initStoreSerde(final StateStoreContext context) {
+    private void initStoreSerde(final StateStoreContext context) {
         final String storeName = name();
         final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
         serdes = new StateSerdes<>(
             changelogTopic != null ?
                 changelogTopic :
                 ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+            prepareKeySerde(keySerde, context.keySerde(), context.valueSerde()),
+            prepareValueSerde(valueSerde, context.keySerde(), context.valueSerde()));
     }
 
     @SuppressWarnings("unchecked")
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
index 7cd37d2..b7e56a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
@@ -18,13 +18,16 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Objects;
 
-class ValueAndTimestampDeserializer<V> implements Deserializer<ValueAndTimestamp<V>> {
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer;
+
+class ValueAndTimestampDeserializer<V> implements WrappingNullableDeserializer<ValueAndTimestamp<V>, Void, V> {
     private final static LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
 
     public final Deserializer<V> valueDeserializer;
@@ -81,4 +84,10 @@ class ValueAndTimestampDeserializer<V> implements Deserializer<ValueAndTimestamp
         return LONG_DESERIALIZER.deserialize(null, rawTimestamp(rawValueAndTimestamp));
     }
 
+    @Override
+    public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, final Deserializer<V> defaultValueDeserializer) {
+        // ValueAndTimestampDeserializer never wraps a null deserializer (or configure would throw),
+        // but it may wrap a deserializer that itself wraps a null deserializer.
+        initNullableDeserializer(valueDeserializer, defaultKeyDeserializer, defaultValueDeserializer);
+    }
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
index c02992f..1936d29 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
@@ -16,44 +16,17 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
-import java.util.Map;
-import java.util.Objects;
-
-public class ValueAndTimestampSerde<V> implements Serde<ValueAndTimestamp<V>> {
-    private final ValueAndTimestampSerializer<V> valueAndTimestampSerializer;
-    private final ValueAndTimestampDeserializer<V> valueAndTimestampDeserializer;
+import static java.util.Objects.requireNonNull;
 
+public class ValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
     public ValueAndTimestampSerde(final Serde<V> valueSerde) {
-        Objects.requireNonNull(valueSerde);
-        valueAndTimestampSerializer = new ValueAndTimestampSerializer<>(valueSerde.serializer());
-        valueAndTimestampDeserializer = new ValueAndTimestampDeserializer<>(valueSerde.deserializer());
-    }
-
-    @Override
-    public void configure(final Map<String, ?> configs,
-                          final boolean isKey) {
-        valueAndTimestampSerializer.configure(configs, isKey);
-        valueAndTimestampDeserializer.configure(configs, isKey);
-    }
-
-    @Override
-    public void close() {
-        valueAndTimestampSerializer.close();
-        valueAndTimestampDeserializer.close();
-    }
-
-    @Override
-    public Serializer<ValueAndTimestamp<V>> serializer() {
-        return valueAndTimestampSerializer;
-    }
-
-    @Override
-    public Deserializer<ValueAndTimestamp<V>> deserializer() {
-        return valueAndTimestampDeserializer;
+        super(
+            new ValueAndTimestampSerializer<>(requireNonNull(valueSerde, "valueSerde was null").serializer()),
+            new ValueAndTimestampDeserializer<>(requireNonNull(valueSerde, "valueSerde was null").deserializer())
+        );
     }
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
index 8193ab2..8f01dee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
@@ -18,13 +18,16 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Objects;
 
-public class ValueAndTimestampSerializer<V> implements Serializer<ValueAndTimestamp<V>> {
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+
+public class ValueAndTimestampSerializer<V> implements WrappingNullableSerializer<ValueAndTimestamp<V>, Void, V> {
     public final Serializer<V> valueSerializer;
     private final Serializer<Long> timestampSerializer;
 
@@ -119,4 +122,11 @@ public class ValueAndTimestampSerializer<V> implements Serializer<ValueAndTimest
         }
         return true;
     }
+
+    @Override
+    public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serializer<V> defaultValueSerializer) {
+        // ValueAndTimestampSerializer never wraps a null serializer (or configure would throw),
+        // but it may wrap a serializer that itself wraps a null serializer.
+        initNullableSerializer(valueSerializer, defaultKeySerializer, defaultValueSerializer);
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
new file mode 100644
index 0000000..5b1304f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.junit.Assert.assertEquals;
+
+@Category({IntegrationTest.class})
+public class KTableKTableForeignKeyJoinDistributedTest {
+    private static final int NUM_BROKERS = 1;
+    private static final String LEFT_TABLE = "left_table";
+    private static final String RIGHT_TABLE = "right_table";
+    private static final String OUTPUT = "output-topic";
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private static final Properties CONSUMER_CONFIG = new Properties();
+
+    @Rule
+    public TestName testName = new TestName();
+
+
+    private static final String INPUT_TOPIC = "input-topic";
+
+    private KafkaStreams client1;
+    private KafkaStreams client2;
+
+    private volatile boolean client1IsOk = false;
+    private volatile boolean client2IsOk = false;
+
+    @BeforeClass
+    public static void createTopics() throws InterruptedException {
+        CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
+    }
+
+    @Before
+    public void setupTopics() throws InterruptedException {
+        CLUSTER.createTopic(LEFT_TABLE, 1, 1);
+        CLUSTER.createTopic(RIGHT_TABLE, 1, 1);
+        CLUSTER.createTopic(OUTPUT, 11, 1);
+
+        //Fill test tables
+        final Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        final List<KeyValue<String, String>> leftTable = Arrays.asList(
+                new KeyValue<>("lhsValue1", "lhsValue1|rhs1"),
+                new KeyValue<>("lhsValue2", "lhsValue2|rhs2"),
+                new KeyValue<>("lhsValue3", "lhsValue3|rhs3"),
+                new KeyValue<>("lhsValue4", "lhsValue4|rhs4")
+        );
+        final List<KeyValue<String, String>> rightTable = Arrays.asList(
+                new KeyValue<>("rhs1", "rhsValue1"),
+                new KeyValue<>("rhs2", "rhsValue2"),
+                new KeyValue<>("rhs3", "rhsValue3")
+        );
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTable, producerConfig, CLUSTER.time);
+        IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightTable, producerConfig, CLUSTER.time);
+
+        CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "ktable-ktable-distributed-consumer");
+        CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    }
+
+    @After
+    public void after() {
+        client1.close();
+        client2.close();
+        quietlyCleanStateAfterTest(CLUSTER, client1);
+        quietlyCleanStateAfterTest(CLUSTER, client2);
+    }
+
+    public Properties getStreamsConfiguration() {
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+        return streamsConfiguration;
+    }
+
+
+    private void configureBuilder(final StreamsBuilder builder) {
+        final KTable<String, String> left = builder.table(
+                LEFT_TABLE
+        );
+        final KTable<String, String> right = builder.table(
+                RIGHT_TABLE
+        );
+
+        final Function<String, String> extractor = value -> value.split("\\|")[1];
+        final ValueJoiner<String, String, String> joiner = (value1, value2) -> "(" + value1 + "," + value2 + ")";
+
+        final KTable<String, String> fkJoin = left.join(right, extractor, joiner);
+        fkJoin
+                .toStream()
+                .to(OUTPUT);
+    }
+
+    @Test
+    public void shouldBeInitializedWithDefaultSerde() throws Exception {
+        final Properties streamsConfiguration1 = getStreamsConfiguration();
+        final Properties streamsConfiguration2 = getStreamsConfiguration();
+
+        //Each streams client needs to have its own StreamsBuilder in order to simulate
+        //a truly distributed run
+        final StreamsBuilder builder1 = new StreamsBuilder();
+        configureBuilder(builder1);
+        final StreamsBuilder builder2 = new StreamsBuilder();
+        configureBuilder(builder2);
+
+
+        createClients(
+                builder1.build(streamsConfiguration1),
+                streamsConfiguration1,
+                builder2.build(streamsConfiguration2),
+                streamsConfiguration2
+        );
+
+        setStateListenersForVerification(thread -> !thread.activeTasks().isEmpty());
+
+        startClients();
+
+        waitUntilBothClientAreOK(
+                "At least one client did not reach state RUNNING with active tasks"
+        );
+        final Set<KeyValue<String, String>> expectedResult = new HashSet<>();
+        expectedResult.add(new KeyValue<>("lhsValue1", "(lhsValue1|rhs1,rhsValue1)"));
+        expectedResult.add(new KeyValue<>("lhsValue2", "(lhsValue2|rhs2,rhsValue2)"));
+        expectedResult.add(new KeyValue<>("lhsValue3", "(lhsValue3|rhs3,rhsValue3)"));
+        final Set<KeyValue<String, String>> result = new HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+                CONSUMER_CONFIG,
+                OUTPUT,
+                expectedResult.size()));
+
+        assertEquals(expectedResult, result);
+        //Check that both clients are still running
+        assertEquals(KafkaStreams.State.RUNNING, client1.state());
+        assertEquals(KafkaStreams.State.RUNNING, client2.state());
+    }
+
+    private void createClients(final Topology topology1,
+                               final Properties streamsConfiguration1,
+                               final Topology topology2,
+                               final Properties streamsConfiguration2) {
+
+        client1 = new KafkaStreams(topology1, streamsConfiguration1);
+        client2 = new KafkaStreams(topology2, streamsConfiguration2);
+    }
+
+    private void setStateListenersForVerification(final Predicate<ThreadMetadata> taskCondition) {
+        client1.setStateListener((newState, oldState) -> {
+            if (newState == KafkaStreams.State.RUNNING &&
+                    client1.localThreadsMetadata().stream().allMatch(taskCondition)) {
+                client1IsOk = true;
+            }
+        });
+        client2.setStateListener((newState, oldState) -> {
+            if (newState == KafkaStreams.State.RUNNING &&
+                    client2.localThreadsMetadata().stream().allMatch(taskCondition)) {
+                client2IsOk = true;
+            }
+        });
+    }
+
+    private void startClients() {
+        client1.start();
+        client2.start();
+    }
+
+    private void waitUntilBothClientAreOK(final String message) throws Exception {
+        TestUtils.waitForCondition(() -> client1IsOk && client2IsOk,
+                30 * 1000,
+                message + ": "
+                        + "Client 1 is " + (!client1IsOk ? "NOT " : "") + "OK, "
+                        + "client 2 is " + (!client2IsOk ? "NOT " : "") + "OK."
+        );
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index 851bb66..cb6f209 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
@@ -108,12 +109,19 @@ public class GlobalStateStoreProviderTest {
             );
         expect(mockContext.taskId()).andStubReturn(new TaskId(0, 0));
         expect(mockContext.recordCollector()).andStubReturn(null);
+        expectSerdes(mockContext);
         replay(mockContext);
         for (final StateStore store : stores.values()) {
             store.init((StateStoreContext) mockContext, null);
         }
     }
 
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private static void expectSerdes(final ProcessorContextImpl context) {
+        expect(context.keySerde()).andStubReturn((Serde) Serdes.String());
+        expect(context.valueSerde()).andStubReturn((Serde) Serdes.Long());
+    }
+
     @Test
     public void shouldReturnSingleItemListIfStoreExists() {
         final GlobalStateStoreProvider provider =
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 405ab4f..c0ef7c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -145,6 +145,7 @@ public class MeteredTimestampedKeyValueStoreTest {
             .andStubReturn(new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion, mockTime));
         expect(context.taskId()).andStubReturn(taskId);
         expect(context.changelogFor(STORE_NAME)).andStubReturn(CHANGELOG_TOPIC);
+        expectSerdes();
         expect(inner.name()).andStubReturn(STORE_NAME);
         storeLevelGroup =
             StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion) ? STORE_LEVEL_GROUP_FROM_0100_TO_24 : STORE_LEVEL_GROUP;
@@ -158,6 +159,12 @@ public class MeteredTimestampedKeyValueStoreTest {
 
     }
 
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private void expectSerdes() {
+        expect(context.keySerde()).andStubReturn((Serde) Serdes.String());
+        expect(context.valueSerde()).andStubReturn((Serde) Serdes.Long());
+    }
+
     private void init() {
         replay(inner, context);
         metered.init((StateStoreContext) context, metered);
@@ -459,8 +466,6 @@ public class MeteredTimestampedKeyValueStoreTest {
     @Test
     @SuppressWarnings("unchecked")
     public void shouldNotThrowExceptionIfSerdesCorrectlySetFromProcessorContext() {
-        expect(context.keySerde()).andStubReturn((Serde) Serdes.String());
-        expect(context.valueSerde()).andStubReturn((Serde) Serdes.Long());
         final MeteredTimestampedKeyValueStore<String, Long> store = new MeteredTimestampedKeyValueStore<>(
             inner,
             STORE_TYPE,
@@ -475,9 +480,13 @@ public class MeteredTimestampedKeyValueStoreTest {
             store.put("key", ValueAndTimestamp.make(42L, 60000));
         } catch (final StreamsException exception) {
             if (exception.getCause() instanceof ClassCastException) {
-                fail("Serdes are not correctly set from processor context.");
+                throw new AssertionError(
+                    "Serdes are not correctly set from processor context.",
+                    exception
+                );
+            } else {
+                throw exception;
             }
-            throw exception;
         }
     }