You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/06/12 16:19:44 UTC
[kafka] branch 2.6 updated: KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes (#8764)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new a98c007 KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes (#8764)
a98c007 is described below
commit a98c0077f83a44bfed5a5b521af54684ea7b3c65
Author: Adam Bellemare <ad...@shopify.com>
AuthorDate: Fri Jun 12 11:00:38 2020 -0400
KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes (#8764)
Bug Details:
In the foreign-key join (FKJ) workflow, an internal wrapped serde that needed the default key serde (for the primary key) was mistakenly given the default value serde instead.
Testing:
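Modified the existing test to use Integer keys with String values, so that the default key and value serdes differ and the issue is reproduced, then verified that the test passes with the fix.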
Reviewers: Guozhang Wang <wa...@gmail.com>, John Roesler <vv...@apache.org>
---
checkstyle/import-control.xml | 1 +
.../kstream/internals/ChangedDeserializer.java | 6 +-
.../kstream/internals/ChangedSerializer.java | 6 +-
.../internals/WrappingNullableDeserializer.java | 6 +-
.../internals/WrappingNullableSerializer.java | 4 +-
.../SubscriptionResponseWrapperSerde.java | 12 +--
.../foreignkeyjoin/SubscriptionWrapperSerde.java | 12 +--
.../streams/processor/internals/SinkNode.java | 7 +-
.../streams/processor/internals/SourceNode.java | 7 +-
.../KTableKTableForeignKeyJoinScenarioTest.java | 103 +++++++++++----------
10 files changed, 86 insertions(+), 78 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 7119265..8c3cb286 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -261,6 +261,7 @@
<allow pkg="kafka.log" />
<allow pkg="scala" />
<allow class="kafka.zk.EmbeddedZookeeper"/>
+ <allow pkg="com.fasterxml.jackson" />
</subpackage>
<subpackage name="test">
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
index 90d5882..433a18d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer;
import java.nio.ByteBuffer;
import java.util.Objects;
-public class ChangedDeserializer<T> implements Deserializer<Change<T>>, WrappingNullableDeserializer<Change<T>, T> {
+public class ChangedDeserializer<T> implements Deserializer<Change<T>>, WrappingNullableDeserializer<Change<T>, Void, T> {
private static final int NEWFLAG_SIZE = 1;
@@ -37,9 +37,9 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>>, Wrapping
}
@Override
- public void setIfUnset(final Deserializer<T> defaultDeserializer) {
+ public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, final Deserializer<T> defaultValueDeserializer) {
if (inner == null) {
- inner = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer cannot be null");
+ inner = Objects.requireNonNull(defaultValueDeserializer);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index 551d948..f5d63cd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.errors.StreamsException;
import java.nio.ByteBuffer;
import java.util.Objects;
-public class ChangedSerializer<T> implements Serializer<Change<T>>, WrappingNullableSerializer<Change<T>, T> {
+public class ChangedSerializer<T> implements Serializer<Change<T>>, WrappingNullableSerializer<Change<T>, Void, T> {
private static final int NEWFLAG_SIZE = 1;
@@ -38,9 +38,9 @@ public class ChangedSerializer<T> implements Serializer<Change<T>>, WrappingNull
}
@Override
- public void setIfUnset(final Serializer<T> defaultSerializer) {
+ public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serializer<T> defaultValueSerializer) {
if (inner == null) {
- inner = Objects.requireNonNull(defaultSerializer, "defaultSerializer cannot be null");
+ inner = Objects.requireNonNull(defaultValueSerializer);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java
index a57e9a1..d0c0b14 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java
@@ -18,6 +18,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
-public interface WrappingNullableDeserializer<Outer, Inner> extends Deserializer<Outer> {
- void setIfUnset(final Deserializer<Inner> defaultDeserializer);
-}
+public interface WrappingNullableDeserializer<Outer, InnerK, InnerV> extends Deserializer<Outer> {
+ void setIfUnset(final Deserializer<InnerK> defaultKeyDeserializer, final Deserializer<InnerV> defaultValueDeserializer);
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java
index 2d28e52..8854a8d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java
@@ -18,6 +18,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serializer;
-public interface WrappingNullableSerializer<Outer, Inner> extends Serializer<Outer> {
- void setIfUnset(final Serializer<Inner> defaultSerializer);
+public interface WrappingNullableSerializer<Outer, InnerK, InnerV> extends Serializer<Outer> {
+ void setIfUnset(final Serializer<InnerK> defaultKeySerializer, final Serializer<InnerV> defaultValueSerializer);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
index 31317c5..8619111 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
@@ -46,7 +46,7 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
}
private static final class SubscriptionResponseWrapperSerializer<V>
- implements Serializer<SubscriptionResponseWrapper<V>>, WrappingNullableSerializer<SubscriptionResponseWrapper<V>, V> {
+ implements Serializer<SubscriptionResponseWrapper<V>>, WrappingNullableSerializer<SubscriptionResponseWrapper<V>, Void, V> {
private Serializer<V> serializer;
@@ -55,9 +55,9 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
}
@Override
- public void setIfUnset(final Serializer<V> defaultSerializer) {
+ public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serializer<V> defaultValueSerializer) {
if (serializer == null) {
- serializer = Objects.requireNonNull(defaultSerializer, "defaultSerializer cannot be null");
+ serializer = Objects.requireNonNull(defaultValueSerializer);
}
}
@@ -94,7 +94,7 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
}
private static final class SubscriptionResponseWrapperDeserializer<V>
- implements Deserializer<SubscriptionResponseWrapper<V>>, WrappingNullableDeserializer<SubscriptionResponseWrapper<V>, V> {
+ implements Deserializer<SubscriptionResponseWrapper<V>>, WrappingNullableDeserializer<SubscriptionResponseWrapper<V>, Void, V> {
private Deserializer<V> deserializer;
@@ -103,9 +103,9 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
}
@Override
- public void setIfUnset(final Deserializer<V> defaultDeserializer) {
+ public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, final Deserializer<V> defaultValueDeserializer) {
if (deserializer == null) {
- deserializer = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer cannot be null");
+ deserializer = Objects.requireNonNull(defaultValueDeserializer);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
index 136128c..d2cc989 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
@@ -50,7 +50,7 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
}
private static class SubscriptionWrapperSerializer<K>
- implements Serializer<SubscriptionWrapper<K>>, WrappingNullableSerializer<SubscriptionWrapper<K>, K> {
+ implements Serializer<SubscriptionWrapper<K>>, WrappingNullableSerializer<SubscriptionWrapper<K>, K, Void> {
private final Supplier<String> primaryKeySerializationPseudoTopicSupplier;
private String primaryKeySerializationPseudoTopic = null;
@@ -63,9 +63,9 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
}
@Override
- public void setIfUnset(final Serializer<K> defaultSerializer) {
+ public void setIfUnset(final Serializer<K> defaultKeySerializer, final Serializer<Void> defaultValueSerializer) {
if (primaryKeySerializer == null) {
- primaryKeySerializer = Objects.requireNonNull(defaultSerializer, "defaultSerializer cannot be null");
+ primaryKeySerializer = Objects.requireNonNull(defaultKeySerializer);
}
}
@@ -110,7 +110,7 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
}
private static class SubscriptionWrapperDeserializer<K>
- implements Deserializer<SubscriptionWrapper<K>>, WrappingNullableDeserializer<SubscriptionWrapper<K>, K> {
+ implements Deserializer<SubscriptionWrapper<K>>, WrappingNullableDeserializer<SubscriptionWrapper<K>, K, Void> {
private final Supplier<String> primaryKeySerializationPseudoTopicSupplier;
private String primaryKeySerializationPseudoTopic = null;
@@ -123,9 +123,9 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
}
@Override
- public void setIfUnset(final Deserializer<K> defaultDeserializer) {
+ public void setIfUnset(final Deserializer<K> defaultKeyDeserializer, final Deserializer<Void> defaultValueDeserializer) {
if (primaryKeyDeserializer == null) {
- primaryKeyDeserializer = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer cannot be null");
+ primaryKeyDeserializer = Objects.requireNonNull(defaultKeyDeserializer);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index e0f2510..9b0a254 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -66,10 +66,13 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
valSerializer = (Serializer<V>) context.valueSerde().serializer();
}
- // if value serializers are internal wrapping serializers that may need to be given the default serializer
+ // if serializers are internal wrapping serializers that may need to be given the default serializer
// then pass it the default one from the context
if (valSerializer instanceof WrappingNullableSerializer) {
- ((WrappingNullableSerializer) valSerializer).setIfUnset(context.valueSerde().serializer());
+ ((WrappingNullableSerializer) valSerializer).setIfUnset(
+ context.keySerde().serializer(),
+ context.valueSerde().serializer()
+ );
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 39b8c0e..8508a7d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -80,10 +80,13 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
this.valDeserializer = (Deserializer<V>) context.valueSerde().deserializer();
}
- // if value deserializers are internal wrapping deserializers that may need to be given the default
+ // if deserializers are internal wrapping deserializers that may need to be given the default
// then pass it the default one from the context
if (valDeserializer instanceof WrappingNullableDeserializer) {
- ((WrappingNullableDeserializer) valDeserializer).setIfUnset(context.valueSerde().deserializer());
+ ((WrappingNullableDeserializer) valDeserializer).setIfUnset(
+ context.keySerde().deserializer(),
+ context.valueSerde().deserializer()
+ );
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
index ab84e05..eb5a4cd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -61,17 +63,17 @@ public class KTableKTableForeignKeyJoinScenarioTest {
@Test
public void shouldWorkWithDefaultSerdes() {
final StreamsBuilder builder = new StreamsBuilder();
- final KTable<String, String> aTable = builder.table("A");
- final KTable<String, String> bTable = builder.table("B");
+ final KTable<Integer, String> aTable = builder.table("A");
+ final KTable<Integer, String> bTable = builder.table("B");
- final KTable<String, String> fkJoinResult = aTable.join(
+ final KTable<Integer, String> fkJoinResult = aTable.join(
bTable,
- value -> value.split("-")[0],
+ value -> Integer.parseInt(value.split("-")[0]),
(aVal, bVal) -> "(" + aVal + "," + bVal + ")",
Materialized.as("asdf")
);
- final KTable<String, String> finalJoinResult = aTable.join(
+ final KTable<Integer, String> finalJoinResult = aTable.join(
fkJoinResult,
(aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
);
@@ -84,17 +86,17 @@ public class KTableKTableForeignKeyJoinScenarioTest {
@Test
public void shouldWorkWithDefaultAndConsumedSerdes() {
final StreamsBuilder builder = new StreamsBuilder();
- final KTable<String, String> aTable = builder.table("A", Consumed.with(Serdes.String(), Serdes.String()));
- final KTable<String, String> bTable = builder.table("B");
+ final KTable<Integer, String> aTable = builder.table("A", Consumed.with(Serdes.Integer(), Serdes.String()));
+ final KTable<Integer, String> bTable = builder.table("B");
- final KTable<String, String> fkJoinResult = aTable.join(
+ final KTable<Integer, String> fkJoinResult = aTable.join(
bTable,
- value -> value.split("-")[0],
+ value -> Integer.parseInt(value.split("-")[0]),
(aVal, bVal) -> "(" + aVal + "," + bVal + ")",
Materialized.as("asdf")
);
- final KTable<String, String> finalJoinResult = aTable.join(
+ final KTable<Integer, String> finalJoinResult = aTable.join(
fkJoinResult,
(aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
);
@@ -107,20 +109,19 @@ public class KTableKTableForeignKeyJoinScenarioTest {
@Test
public void shouldWorkWithDefaultAndJoinResultSerdes() {
final StreamsBuilder builder = new StreamsBuilder();
- final KTable<String, String> aTable = builder.table("A");
- final KTable<String, String> bTable = builder.table("B");
+ final KTable<Integer, String> aTable = builder.table("A");
+ final KTable<Integer, String> bTable = builder.table("B");
- final KTable<String, String> fkJoinResult = aTable.join(
+ final KTable<Integer, String> fkJoinResult = aTable.join(
bTable,
- value -> value.split("-")[0],
+ value -> Integer.parseInt(value.split("-")[0]),
(aVal, bVal) -> "(" + aVal + "," + bVal + ")",
- Materialized
- .<String, String, KeyValueStore<Bytes, byte[]>>as("asdf")
- .withKeySerde(Serdes.String())
- .withValueSerde(Serdes.String())
+ Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as("asdf")
+ .withKeySerde(Serdes.Integer())
+ .withValueSerde(Serdes.String())
);
- final KTable<String, String> finalJoinResult = aTable.join(
+ final KTable<Integer, String> finalJoinResult = aTable.join(
fkJoinResult,
(aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
);
@@ -133,20 +134,20 @@ public class KTableKTableForeignKeyJoinScenarioTest {
@Test
public void shouldWorkWithDefaultAndEquiJoinResultSerdes() {
final StreamsBuilder builder = new StreamsBuilder();
- final KTable<String, String> aTable = builder.table("A");
- final KTable<String, String> bTable = builder.table("B");
+ final KTable<Integer, String> aTable = builder.table("A");
+ final KTable<Integer, String> bTable = builder.table("B");
- final KTable<String, String> fkJoinResult = aTable.join(
+ final KTable<Integer, String> fkJoinResult = aTable.join(
bTable,
- value -> value.split("-")[0],
+ value -> Integer.parseInt(value.split("-")[0]),
(aVal, bVal) -> "(" + aVal + "," + bVal + ")",
Materialized.as("asdf")
);
- final KTable<String, String> finalJoinResult = aTable.join(
+ final KTable<Integer, String> finalJoinResult = aTable.join(
fkJoinResult,
(aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")",
- Materialized.with(Serdes.String(), Serdes.String())
+ Materialized.with(Serdes.Integer(), Serdes.String())
);
finalJoinResult.toStream().to("output");
@@ -157,22 +158,22 @@ public class KTableKTableForeignKeyJoinScenarioTest {
@Test
public void shouldWorkWithDefaultAndProducedSerdes() {
final StreamsBuilder builder = new StreamsBuilder();
- final KTable<String, String> aTable = builder.table("A");
- final KTable<String, String> bTable = builder.table("B");
+ final KTable<Integer, String> aTable = builder.table("A");
+ final KTable<Integer, String> bTable = builder.table("B");
- final KTable<String, String> fkJoinResult = aTable.join(
+ final KTable<Integer, String> fkJoinResult = aTable.join(
bTable,
- value -> value.split("-")[0],
+ value -> Integer.parseInt(value.split("-")[0]),
(aVal, bVal) -> "(" + aVal + "," + bVal + ")",
Materialized.as("asdf")
);
- final KTable<String, String> finalJoinResult = aTable.join(
+ final KTable<Integer, String> finalJoinResult = aTable.join(
fkJoinResult,
(aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
);
- finalJoinResult.toStream().to("output", Produced.with(Serdes.String(), Serdes.String()));
+ finalJoinResult.toStream().to("output", Produced.with(Serdes.Integer(), Serdes.String()));
validateTopologyCanProcessData(builder);
}
@@ -189,20 +190,20 @@ public class KTableKTableForeignKeyJoinScenarioTest {
final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
final StreamsBuilder builder = new StreamsBuilder();
- final KTable<String, String> left = builder.table(
+ final KTable<Integer, String> left = builder.table(
LEFT_TABLE,
- Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
- serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+ Consumed.with(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true),
+ serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
);
- final KTable<String, String> right = builder.table(
- RIGHT_TABLE,
- Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
- serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+ final KTable<Integer, String> right = builder.table(
+ RIGHT_TABLE,
+ Consumed.with(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true),
+ serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
);
left.join(
right,
- value -> value.split("\\|")[1],
+ value -> Integer.parseInt(value.split("\\|")[1]),
(value1, value2) -> "(" + value1 + "," + value2 + ")",
Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)
))
@@ -212,10 +213,10 @@ public class KTableKTableForeignKeyJoinScenarioTest {
final Topology topology = builder.build(streamsConfig);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
- final TestInputTopic<String, String> leftInput = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
- final TestInputTopic<String, String> rightInput = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
- leftInput.pipeInput("lhs1", "lhsValue1|rhs1");
- rightInput.pipeInput("rhs1", "rhsValue1");
+ final TestInputTopic<Integer, String> leftInput = driver.createInputTopic(LEFT_TABLE, new IntegerSerializer(), new StringSerializer());
+ final TestInputTopic<Integer, String> rightInput = driver.createInputTopic(RIGHT_TABLE, new IntegerSerializer(), new StringSerializer());
+ leftInput.pipeInput(2, "lhsValue1|1");
+ rightInput.pipeInput(1, "rhsValue1");
}
// verifying primarily that no extra pseudo-topics were used, but it's nice to also verify the rest of the
// topics our serdes serialize data for
@@ -243,17 +244,17 @@ public class KTableKTableForeignKeyJoinScenarioTest {
final String safeTestName = safeUniqueTestName(getClass(), testName);
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-" + safeTestName);
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
- config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
+ config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class.getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
config.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), config)) {
- final TestInputTopic<String, String> aTopic = topologyTestDriver.createInputTopic("A", new StringSerializer(), new StringSerializer());
- final TestInputTopic<String, String> bTopic = topologyTestDriver.createInputTopic("B", new StringSerializer(), new StringSerializer());
- final TestOutputTopic<String, String> output = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
- aTopic.pipeInput("a1", "b1-alpha");
- bTopic.pipeInput("b1", "beta");
- final Map<String, String> x = output.readKeyValuesToMap();
- assertThat(x, is(Collections.singletonMap("a1", "(b1-alpha,(b1-alpha,beta))")));
+ final TestInputTopic<Integer, String> aTopic = topologyTestDriver.createInputTopic("A", new IntegerSerializer(), new StringSerializer());
+ final TestInputTopic<Integer, String> bTopic = topologyTestDriver.createInputTopic("B", new IntegerSerializer(), new StringSerializer());
+ final TestOutputTopic<Integer, String> output = topologyTestDriver.createOutputTopic("output", new IntegerDeserializer(), new StringDeserializer());
+ aTopic.pipeInput(1, "999-alpha");
+ bTopic.pipeInput(999, "beta");
+ final Map<Integer, String> x = output.readKeyValuesToMap();
+ assertThat(x, is(Collections.singletonMap(1, "(999-alpha,(999-alpha,beta))")));
}
}
}