You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/06/12 16:19:44 UTC

[kafka] branch 2.6 updated: KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes (#8764)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new a98c007  KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes (#8764)
a98c007 is described below

commit a98c0077f83a44bfed5a5b521af54684ea7b3c65
Author: Adam Bellemare <ad...@shopify.com>
AuthorDate: Fri Jun 12 11:00:38 2020 -0400

    KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes (#8764)
    
    Bug Details:
    Mistakenly setting the value serde to the key serde for an internal wrapped serde in the FKJ workflow.
    
    Testing:
    Modified the existing test to reproduce the issue, then verified that the test passes.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, John Roesler <vv...@apache.org>
---
 checkstyle/import-control.xml                      |   1 +
 .../kstream/internals/ChangedDeserializer.java     |   6 +-
 .../kstream/internals/ChangedSerializer.java       |   6 +-
 .../internals/WrappingNullableDeserializer.java    |   6 +-
 .../internals/WrappingNullableSerializer.java      |   4 +-
 .../SubscriptionResponseWrapperSerde.java          |  12 +--
 .../foreignkeyjoin/SubscriptionWrapperSerde.java   |  12 +--
 .../streams/processor/internals/SinkNode.java      |   7 +-
 .../streams/processor/internals/SourceNode.java    |   7 +-
 .../KTableKTableForeignKeyJoinScenarioTest.java    | 103 +++++++++++----------
 10 files changed, 86 insertions(+), 78 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 7119265..8c3cb286 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -261,6 +261,7 @@
       <allow pkg="kafka.log" />
       <allow pkg="scala" />
       <allow class="kafka.zk.EmbeddedZookeeper"/>
+      <allow pkg="com.fasterxml.jackson" />
     </subpackage>
 
     <subpackage name="test">
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
index 90d5882..433a18d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
-public class ChangedDeserializer<T> implements Deserializer<Change<T>>, WrappingNullableDeserializer<Change<T>, T> {
+public class ChangedDeserializer<T> implements Deserializer<Change<T>>, WrappingNullableDeserializer<Change<T>, Void, T> {
 
     private static final int NEWFLAG_SIZE = 1;
 
@@ -37,9 +37,9 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>>, Wrapping
     }
 
     @Override
-    public void setIfUnset(final Deserializer<T> defaultDeserializer) {
+    public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, final Deserializer<T> defaultValueDeserializer) {
         if (inner == null) {
-            inner = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer cannot be null");
+            inner = Objects.requireNonNull(defaultValueDeserializer);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index 551d948..f5d63cd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
-public class ChangedSerializer<T> implements Serializer<Change<T>>, WrappingNullableSerializer<Change<T>, T> {
+public class ChangedSerializer<T> implements Serializer<Change<T>>, WrappingNullableSerializer<Change<T>, Void, T> {
 
     private static final int NEWFLAG_SIZE = 1;
 
@@ -38,9 +38,9 @@ public class ChangedSerializer<T> implements Serializer<Change<T>>, WrappingNull
     }
 
     @Override
-    public void setIfUnset(final Serializer<T> defaultSerializer) {
+    public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serializer<T> defaultValueSerializer) {
         if (inner == null) {
-            inner = Objects.requireNonNull(defaultSerializer, "defaultSerializer cannot be null");
+            inner = Objects.requireNonNull(defaultValueSerializer);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java
index a57e9a1..d0c0b14 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java
@@ -18,6 +18,6 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
 
-public interface WrappingNullableDeserializer<Outer, Inner> extends Deserializer<Outer> {
-    void setIfUnset(final Deserializer<Inner> defaultDeserializer);
-}
+public interface WrappingNullableDeserializer<Outer, InnerK, InnerV> extends Deserializer<Outer> {
+    void setIfUnset(final Deserializer<InnerK> defaultKeyDeserializer, final Deserializer<InnerV> defaultValueDeserializer);
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java
index 2d28e52..8854a8d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java
@@ -18,6 +18,6 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serializer;
 
-public interface WrappingNullableSerializer<Outer, Inner> extends Serializer<Outer> {
-    void setIfUnset(final Serializer<Inner> defaultSerializer);
+public interface WrappingNullableSerializer<Outer, InnerK, InnerV> extends Serializer<Outer> {
+    void setIfUnset(final Serializer<InnerK> defaultKeySerializer, final Serializer<InnerV> defaultValueSerializer);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
index 31317c5..8619111 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
@@ -46,7 +46,7 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
     }
 
     private static final class SubscriptionResponseWrapperSerializer<V>
-        implements Serializer<SubscriptionResponseWrapper<V>>, WrappingNullableSerializer<SubscriptionResponseWrapper<V>, V> {
+        implements Serializer<SubscriptionResponseWrapper<V>>, WrappingNullableSerializer<SubscriptionResponseWrapper<V>, Void, V> {
 
         private Serializer<V> serializer;
 
@@ -55,9 +55,9 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
         }
 
         @Override
-        public void setIfUnset(final Serializer<V> defaultSerializer) {
+        public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serializer<V> defaultValueSerializer) {
             if (serializer == null) {
-                serializer = Objects.requireNonNull(defaultSerializer, "defaultSerializer cannot be null");
+                serializer = Objects.requireNonNull(defaultValueSerializer);
             }
         }
 
@@ -94,7 +94,7 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
     }
 
     private static final class SubscriptionResponseWrapperDeserializer<V>
-        implements Deserializer<SubscriptionResponseWrapper<V>>, WrappingNullableDeserializer<SubscriptionResponseWrapper<V>, V> {
+        implements Deserializer<SubscriptionResponseWrapper<V>>, WrappingNullableDeserializer<SubscriptionResponseWrapper<V>, Void, V> {
 
         private Deserializer<V> deserializer;
 
@@ -103,9 +103,9 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
         }
 
         @Override
-        public void setIfUnset(final Deserializer<V> defaultDeserializer) {
+        public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, final Deserializer<V> defaultValueDeserializer) {
             if (deserializer == null) {
-                deserializer = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer cannot be null");
+                deserializer = Objects.requireNonNull(defaultValueDeserializer);
             }
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
index 136128c..d2cc989 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
@@ -50,7 +50,7 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
     }
 
     private static class SubscriptionWrapperSerializer<K>
-        implements Serializer<SubscriptionWrapper<K>>, WrappingNullableSerializer<SubscriptionWrapper<K>, K> {
+        implements Serializer<SubscriptionWrapper<K>>, WrappingNullableSerializer<SubscriptionWrapper<K>, K, Void> {
 
         private final Supplier<String> primaryKeySerializationPseudoTopicSupplier;
         private String primaryKeySerializationPseudoTopic = null;
@@ -63,9 +63,9 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
         }
 
         @Override
-        public void setIfUnset(final Serializer<K> defaultSerializer) {
+        public void setIfUnset(final Serializer<K> defaultKeySerializer, final Serializer<Void> defaultValueSerializer) {
             if (primaryKeySerializer == null) {
-                primaryKeySerializer = Objects.requireNonNull(defaultSerializer, "defaultSerializer cannot be null");
+                primaryKeySerializer = Objects.requireNonNull(defaultKeySerializer);
             }
         }
 
@@ -110,7 +110,7 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
     }
 
     private static class SubscriptionWrapperDeserializer<K>
-        implements Deserializer<SubscriptionWrapper<K>>, WrappingNullableDeserializer<SubscriptionWrapper<K>, K> {
+        implements Deserializer<SubscriptionWrapper<K>>, WrappingNullableDeserializer<SubscriptionWrapper<K>, K, Void> {
 
         private final Supplier<String> primaryKeySerializationPseudoTopicSupplier;
         private String primaryKeySerializationPseudoTopic = null;
@@ -123,9 +123,9 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
         }
 
         @Override
-        public void setIfUnset(final Deserializer<K> defaultDeserializer) {
+        public void setIfUnset(final Deserializer<K> defaultKeyDeserializer, final Deserializer<Void> defaultValueDeserializer) {
             if (primaryKeyDeserializer == null) {
-                primaryKeyDeserializer = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer cannot be null");
+                primaryKeyDeserializer = Objects.requireNonNull(defaultKeyDeserializer);
             }
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index e0f2510..9b0a254 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -66,10 +66,13 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
             valSerializer = (Serializer<V>) context.valueSerde().serializer();
         }
 
-        // if value serializers are internal wrapping serializers that may need to be given the default serializer
+        // if serializers are internal wrapping serializers that may need to be given the default serializer
         // then pass it the default one from the context
         if (valSerializer instanceof WrappingNullableSerializer) {
-            ((WrappingNullableSerializer) valSerializer).setIfUnset(context.valueSerde().serializer());
+            ((WrappingNullableSerializer) valSerializer).setIfUnset(
+                context.keySerde().serializer(),
+                context.valueSerde().serializer()
+            );
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 39b8c0e..8508a7d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -80,10 +80,13 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
             this.valDeserializer = (Deserializer<V>) context.valueSerde().deserializer();
         }
 
-        // if value deserializers are internal wrapping deserializers that may need to be given the default
+        // if deserializers are internal wrapping deserializers that may need to be given the default
         // then pass it the default one from the context
         if (valDeserializer instanceof WrappingNullableDeserializer) {
-            ((WrappingNullableDeserializer) valDeserializer).setIfUnset(context.valueSerde().deserializer());
+            ((WrappingNullableDeserializer) valDeserializer).setIfUnset(
+                    context.keySerde().deserializer(),
+                    context.valueSerde().deserializer()
+            );
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
index ab84e05..eb5a4cd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -61,17 +63,17 @@ public class KTableKTableForeignKeyJoinScenarioTest {
     @Test
     public void shouldWorkWithDefaultSerdes() {
         final StreamsBuilder builder = new StreamsBuilder();
-        final KTable<String, String> aTable = builder.table("A");
-        final KTable<String, String> bTable = builder.table("B");
+        final KTable<Integer, String> aTable = builder.table("A");
+        final KTable<Integer, String> bTable = builder.table("B");
 
-        final KTable<String, String> fkJoinResult = aTable.join(
+        final KTable<Integer, String> fkJoinResult = aTable.join(
             bTable,
-            value -> value.split("-")[0],
+            value -> Integer.parseInt(value.split("-")[0]),
             (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
             Materialized.as("asdf")
         );
 
-        final KTable<String, String> finalJoinResult = aTable.join(
+        final KTable<Integer, String> finalJoinResult = aTable.join(
             fkJoinResult,
             (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
         );
@@ -84,17 +86,17 @@ public class KTableKTableForeignKeyJoinScenarioTest {
     @Test
     public void shouldWorkWithDefaultAndConsumedSerdes() {
         final StreamsBuilder builder = new StreamsBuilder();
-        final KTable<String, String> aTable = builder.table("A", Consumed.with(Serdes.String(), Serdes.String()));
-        final KTable<String, String> bTable = builder.table("B");
+        final KTable<Integer, String> aTable = builder.table("A", Consumed.with(Serdes.Integer(), Serdes.String()));
+        final KTable<Integer, String> bTable = builder.table("B");
 
-        final KTable<String, String> fkJoinResult = aTable.join(
+        final KTable<Integer, String> fkJoinResult = aTable.join(
             bTable,
-            value -> value.split("-")[0],
+            value -> Integer.parseInt(value.split("-")[0]),
             (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
             Materialized.as("asdf")
         );
 
-        final KTable<String, String> finalJoinResult = aTable.join(
+        final KTable<Integer, String> finalJoinResult = aTable.join(
             fkJoinResult,
             (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
         );
@@ -107,20 +109,19 @@ public class KTableKTableForeignKeyJoinScenarioTest {
     @Test
     public void shouldWorkWithDefaultAndJoinResultSerdes() {
         final StreamsBuilder builder = new StreamsBuilder();
-        final KTable<String, String> aTable = builder.table("A");
-        final KTable<String, String> bTable = builder.table("B");
+        final KTable<Integer, String> aTable = builder.table("A");
+        final KTable<Integer, String> bTable = builder.table("B");
 
-        final KTable<String, String> fkJoinResult = aTable.join(
+        final KTable<Integer, String> fkJoinResult = aTable.join(
             bTable,
-            value -> value.split("-")[0],
+            value -> Integer.parseInt(value.split("-")[0]),
             (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
-            Materialized
-                .<String, String, KeyValueStore<Bytes, byte[]>>as("asdf")
-                .withKeySerde(Serdes.String())
-                .withValueSerde(Serdes.String())
+            Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as("asdf")
+                    .withKeySerde(Serdes.Integer())
+                    .withValueSerde(Serdes.String())
         );
 
-        final KTable<String, String> finalJoinResult = aTable.join(
+        final KTable<Integer, String> finalJoinResult = aTable.join(
             fkJoinResult,
             (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
         );
@@ -133,20 +134,20 @@ public class KTableKTableForeignKeyJoinScenarioTest {
     @Test
     public void shouldWorkWithDefaultAndEquiJoinResultSerdes() {
         final StreamsBuilder builder = new StreamsBuilder();
-        final KTable<String, String> aTable = builder.table("A");
-        final KTable<String, String> bTable = builder.table("B");
+        final KTable<Integer, String> aTable = builder.table("A");
+        final KTable<Integer, String> bTable = builder.table("B");
 
-        final KTable<String, String> fkJoinResult = aTable.join(
+        final KTable<Integer, String> fkJoinResult = aTable.join(
             bTable,
-            value -> value.split("-")[0],
+            value -> Integer.parseInt(value.split("-")[0]),
             (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
             Materialized.as("asdf")
         );
 
-        final KTable<String, String> finalJoinResult = aTable.join(
+        final KTable<Integer, String> finalJoinResult = aTable.join(
             fkJoinResult,
             (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")",
-            Materialized.with(Serdes.String(), Serdes.String())
+            Materialized.with(Serdes.Integer(), Serdes.String())
         );
 
         finalJoinResult.toStream().to("output");
@@ -157,22 +158,22 @@ public class KTableKTableForeignKeyJoinScenarioTest {
     @Test
     public void shouldWorkWithDefaultAndProducedSerdes() {
         final StreamsBuilder builder = new StreamsBuilder();
-        final KTable<String, String> aTable = builder.table("A");
-        final KTable<String, String> bTable = builder.table("B");
+        final KTable<Integer, String> aTable = builder.table("A");
+        final KTable<Integer, String> bTable = builder.table("B");
 
-        final KTable<String, String> fkJoinResult = aTable.join(
+        final KTable<Integer, String> fkJoinResult = aTable.join(
             bTable,
-            value -> value.split("-")[0],
+            value -> Integer.parseInt(value.split("-")[0]),
             (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
             Materialized.as("asdf")
         );
 
-        final KTable<String, String> finalJoinResult = aTable.join(
+        final KTable<Integer, String> finalJoinResult = aTable.join(
             fkJoinResult,
             (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
         );
 
-        finalJoinResult.toStream().to("output", Produced.with(Serdes.String(), Serdes.String()));
+        finalJoinResult.toStream().to("output", Produced.with(Serdes.Integer(), Serdes.String()));
 
         validateTopologyCanProcessData(builder);
     }
@@ -189,20 +190,20 @@ public class KTableKTableForeignKeyJoinScenarioTest {
         final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final KTable<String, String> left = builder.table(
+        final KTable<Integer, String> left = builder.table(
             LEFT_TABLE,
-            Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
-                          serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+            Consumed.with(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true),
+                        serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
         );
-        final KTable<String, String> right = builder.table(
-            RIGHT_TABLE,
-            Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
-                          serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+        final KTable<Integer, String> right = builder.table(
+                RIGHT_TABLE,
+                Consumed.with(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true),
+                              serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
         );
 
         left.join(
             right,
-            value -> value.split("\\|")[1],
+            value -> Integer.parseInt(value.split("\\|")[1]),
             (value1, value2) -> "(" + value1 + "," + value2 + ")",
             Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)
             ))
@@ -212,10 +213,10 @@ public class KTableKTableForeignKeyJoinScenarioTest {
 
         final Topology topology = builder.build(streamsConfig);
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
-            final TestInputTopic<String, String> leftInput = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
-            final TestInputTopic<String, String> rightInput = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
-            leftInput.pipeInput("lhs1", "lhsValue1|rhs1");
-            rightInput.pipeInput("rhs1", "rhsValue1");
+            final TestInputTopic<Integer, String> leftInput = driver.createInputTopic(LEFT_TABLE, new IntegerSerializer(), new StringSerializer());
+            final TestInputTopic<Integer, String> rightInput = driver.createInputTopic(RIGHT_TABLE, new IntegerSerializer(), new StringSerializer());
+            leftInput.pipeInput(2, "lhsValue1|1");
+            rightInput.pipeInput(1, "rhsValue1");
         }
         // verifying primarily that no extra pseudo-topics were used, but it's nice to also verify the rest of the
         // topics our serdes serialize data for
@@ -243,17 +244,17 @@ public class KTableKTableForeignKeyJoinScenarioTest {
         final String safeTestName = safeUniqueTestName(getClass(), testName);
         config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-" + safeTestName);
         config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
-        config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
+        config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class.getName());
         config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
         config.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), config)) {
-            final TestInputTopic<String, String> aTopic = topologyTestDriver.createInputTopic("A", new StringSerializer(), new StringSerializer());
-            final TestInputTopic<String, String> bTopic = topologyTestDriver.createInputTopic("B", new StringSerializer(), new StringSerializer());
-            final TestOutputTopic<String, String> output = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
-            aTopic.pipeInput("a1", "b1-alpha");
-            bTopic.pipeInput("b1", "beta");
-            final Map<String, String> x = output.readKeyValuesToMap();
-            assertThat(x, is(Collections.singletonMap("a1", "(b1-alpha,(b1-alpha,beta))")));
+            final TestInputTopic<Integer, String> aTopic = topologyTestDriver.createInputTopic("A", new IntegerSerializer(), new StringSerializer());
+            final TestInputTopic<Integer, String> bTopic = topologyTestDriver.createInputTopic("B", new IntegerSerializer(), new StringSerializer());
+            final TestOutputTopic<Integer, String> output = topologyTestDriver.createOutputTopic("output", new IntegerDeserializer(), new StringDeserializer());
+            aTopic.pipeInput(1, "999-alpha");
+            bTopic.pipeInput(999, "beta");
+            final Map<Integer, String> x = output.readKeyValuesToMap();
+            assertThat(x, is(Collections.singletonMap(1, "(999-alpha,(999-alpha,beta))")));
         }
     }
 }