You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/02/10 21:55:40 UTC
[kafka] branch 2.5 updated: KAFKA-9517: Fix default serdes with FK
join (#8061)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 661a660 KAFKA-9517: Fix default serdes with FK join (#8061)
661a660 is described below
commit 661a660cf37be25f8795665e0de74be13bdf44a4
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Mon Feb 10 15:33:23 2020 -0600
KAFKA-9517: Fix default serdes with FK join (#8061)
During the KIP-213 implementation and verification, we neglected to test the
code path for falling back to default serdes if none are given in the topology.
Cherry-pick of 520a76155c66486cbcd0c519ef2980359964fd7c from trunk
Reviewer: Bill Bejeck <bb...@gmail.com>
---
.../kstream/internals/ChangedDeserializer.java | 10 +-
.../kstream/internals/ChangedSerializer.java | 10 +-
.../streams/kstream/internals/KTableImpl.java | 4 +-
.../internals/WrappingNullableDeserializer.java | 23 +++
.../internals/WrappingNullableSerializer.java | 23 +++
.../foreignkeyjoin/CombinedKeySchema.java | 8 +-
...reignJoinSubscriptionSendProcessorSupplier.java | 5 +-
.../SubscriptionResolverJoinProcessorSupplier.java | 11 +-
.../SubscriptionResponseWrapperSerde.java | 33 +++-
.../SubscriptionStoreReceiveProcessorSupplier.java | 2 +
.../foreignkeyjoin/SubscriptionWrapperSerde.java | 35 +++-
.../streams/processor/internals/SinkNode.java | 10 +-
.../streams/processor/internals/SourceNode.java | 10 +-
.../streams/integration/ForeignKeyJoinSuite.java | 1 +
...KTableKTableForeignKeyJoinDefaultSerdeTest.java | 178 +++++++++++++++++++++
.../internals/StreamsPartitionAssignorTest.java | 1 -
16 files changed, 325 insertions(+), 39 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
index 36f77b8..90d5882 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
@@ -20,8 +20,9 @@ import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import java.nio.ByteBuffer;
+import java.util.Objects;
-public class ChangedDeserializer<T> implements Deserializer<Change<T>> {
+public class ChangedDeserializer<T> implements Deserializer<Change<T>>, WrappingNullableDeserializer<Change<T>, T> {
private static final int NEWFLAG_SIZE = 1;
@@ -35,8 +36,11 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>> {
return inner;
}
- public void setInner(final Deserializer<T> inner) {
- this.inner = inner;
+ @Override
+ public void setIfUnset(final Deserializer<T> defaultDeserializer) {
+ if (inner == null) {
+ inner = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer cannot be null");
+ }
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index bfd0afa..551d948 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -21,8 +21,9 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.StreamsException;
import java.nio.ByteBuffer;
+import java.util.Objects;
-public class ChangedSerializer<T> implements Serializer<Change<T>> {
+public class ChangedSerializer<T> implements Serializer<Change<T>>, WrappingNullableSerializer<Change<T>, T> {
private static final int NEWFLAG_SIZE = 1;
@@ -36,8 +37,11 @@ public class ChangedSerializer<T> implements Serializer<Change<T>> {
return inner;
}
- public void setInner(final Serializer<T> inner) {
- this.inner = inner;
+ @Override
+ public void setIfUnset(final Serializer<T> defaultSerializer) {
+ if (inner == null) {
+ inner = Objects.requireNonNull(defaultSerializer, "defaultSerializer cannot be null");
+ }
}
/**
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 38e5c49..f5c5aec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -970,7 +970,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
foreignKeyExtractor,
foreignKeySerde,
subscriptionTopicName,
- valSerde.serializer(),
+ valSerde == null ? null : valSerde.serializer(),
leftJoin
),
renamed.suffixWithOrElseGet("-subscription-registration-processor", builder, SUBSCRIPTION_REGISTRATION)
@@ -1073,7 +1073,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final KTableValueGetterSupplier<K, V> primaryKeyValueGetter = valueGetterSupplier();
final SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> resolverProcessorSupplier = new SubscriptionResolverJoinProcessorSupplier<>(
primaryKeyValueGetter,
- valueSerde().serializer(),
+ valSerde == null ? null : valSerde.serializer(),
joiner,
leftJoin
);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java
new file mode 100644
index 0000000..a57e9a1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+
+public interface WrappingNullableDeserializer<Outer, Inner> extends Deserializer<Outer> {
+ void setIfUnset(final Deserializer<Inner> defaultDeserializer);
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java
new file mode 100644
index 0000000..2d28e52
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serializer;
+
+public interface WrappingNullableSerializer<Outer, Inner> extends Serializer<Outer> {
+ void setIfUnset(final Serializer<Inner> defaultSerializer);
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
index 8abe583..7cda404 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
@@ -36,10 +36,10 @@ public class CombinedKeySchema<KO, K> {
public CombinedKeySchema(final String serdeTopic, final Serde<KO> foreignKeySerde, final Serde<K> primaryKeySerde) {
this.serdeTopic = serdeTopic;
- primaryKeySerializer = primaryKeySerde.serializer();
- primaryKeyDeserializer = primaryKeySerde.deserializer();
- foreignKeyDeserializer = foreignKeySerde.deserializer();
- foreignKeySerializer = foreignKeySerde.serializer();
+ primaryKeySerializer = primaryKeySerde == null ? null : primaryKeySerde.serializer();
+ primaryKeyDeserializer = primaryKeySerde == null ? null : primaryKeySerde.deserializer();
+ foreignKeyDeserializer = foreignKeySerde == null ? null : foreignKeySerde.deserializer();
+ foreignKeySerializer = foreignKeySerde == null ? null : foreignKeySerde.serializer();
}
@SuppressWarnings("unchecked")
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
index 00815f9..06bd7d2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
@@ -44,9 +44,9 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
private final Function<V, KO> foreignKeyExtractor;
private final String repartitionTopicName;
- private final Serializer<V> valueSerializer;
private final boolean leftJoin;
private Serializer<KO> foreignKeySerializer;
+ private Serializer<V> valueSerializer;
public ForeignJoinSubscriptionSendProcessorSupplier(final Function<V, KO> foreignKeyExtractor,
final Serde<KO> foreignKeySerde,
@@ -77,6 +77,9 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
if (foreignKeySerializer == null) {
foreignKeySerializer = (Serializer<KO>) context.keySerde().serializer();
}
+ if (valueSerializer == null) {
+ valueSerializer = (Serializer<V>) context.valueSerde().serializer();
+ }
droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(
Thread.currentThread().getName(),
context.taskId().toString(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
index a188f15..8fa77aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
@@ -41,7 +41,7 @@ import org.apache.kafka.streams.state.internals.Murmur3;
*/
public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSupplier<K, SubscriptionResponseWrapper<VO>> {
private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
- private final Serializer<V> valueSerializer;
+ private final Serializer<V> constructionTimeValueSerializer;
private final ValueJoiner<V, VO, VR> joiner;
private final boolean leftJoin;
@@ -50,7 +50,7 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements
final ValueJoiner<V, VO, VR> joiner,
final boolean leftJoin) {
this.valueGetterSupplier = valueGetterSupplier;
- this.valueSerializer = valueSerializer;
+ constructionTimeValueSerializer = valueSerializer;
this.joiner = joiner;
this.leftJoin = leftJoin;
}
@@ -58,14 +58,19 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements
@Override
public Processor<K, SubscriptionResponseWrapper<VO>> get() {
return new AbstractProcessor<K, SubscriptionResponseWrapper<VO>>() {
+ private Serializer<V> runtimeValueSerializer = constructionTimeValueSerializer;
private KTableValueGetter<K, V> valueGetter;
+ @SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
valueGetter = valueGetterSupplier.get();
valueGetter.init(context);
+ if (runtimeValueSerializer == null) {
+ runtimeValueSerializer = (Serializer<V>) context.valueSerde().serializer();
+ }
}
@Override
@@ -86,7 +91,7 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements
final String dummySerializationTopic = context().topic() + "-join-resolver";
final long[] currentHash = currentValueWithTimestamp == null ?
null :
- Murmur3.hash128(valueSerializer.serialize(dummySerializationTopic, currentValueWithTimestamp.value()));
+ Murmur3.hash128(runtimeValueSerializer.serialize(dummySerializationTopic, currentValueWithTimestamp.value()));
final long[] messageHash = value.getOriginalValueHash();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
index 4277f9a..31317c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
@@ -20,16 +20,19 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
import java.nio.ByteBuffer;
+import java.util.Objects;
public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionResponseWrapper<V>> {
private final SubscriptionResponseWrapperSerializer<V> serializer;
private final SubscriptionResponseWrapperDeserializer<V> deserializer;
public SubscriptionResponseWrapperSerde(final Serde<V> foreignValueSerde) {
- serializer = new SubscriptionResponseWrapperSerializer<>(foreignValueSerde.serializer());
- deserializer = new SubscriptionResponseWrapperDeserializer<>(foreignValueSerde.deserializer());
+ serializer = new SubscriptionResponseWrapperSerializer<>(foreignValueSerde == null ? null : foreignValueSerde.serializer());
+ deserializer = new SubscriptionResponseWrapperDeserializer<>(foreignValueSerde == null ? null : foreignValueSerde.deserializer());
}
@Override
@@ -42,14 +45,23 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
return deserializer;
}
- private static final class SubscriptionResponseWrapperSerializer<V> implements Serializer<SubscriptionResponseWrapper<V>> {
- private final Serializer<V> serializer;
+ private static final class SubscriptionResponseWrapperSerializer<V>
+ implements Serializer<SubscriptionResponseWrapper<V>>, WrappingNullableSerializer<SubscriptionResponseWrapper<V>, V> {
+
+ private Serializer<V> serializer;
private SubscriptionResponseWrapperSerializer(final Serializer<V> serializer) {
this.serializer = serializer;
}
@Override
+ public void setIfUnset(final Serializer<V> defaultSerializer) {
+ if (serializer == null) {
+ serializer = Objects.requireNonNull(defaultSerializer, "defaultSerializer cannot be null");
+ }
+ }
+
+ @Override
public byte[] serialize(final String topic, final SubscriptionResponseWrapper<V> data) {
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized data}
@@ -81,14 +93,23 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
}
- private static final class SubscriptionResponseWrapperDeserializer<V> implements Deserializer<SubscriptionResponseWrapper<V>> {
- private final Deserializer<V> deserializer;
+ private static final class SubscriptionResponseWrapperDeserializer<V>
+ implements Deserializer<SubscriptionResponseWrapper<V>>, WrappingNullableDeserializer<SubscriptionResponseWrapper<V>, V> {
+
+ private Deserializer<V> deserializer;
private SubscriptionResponseWrapperDeserializer(final Deserializer<V> deserializer) {
this.deserializer = deserializer;
}
@Override
+ public void setIfUnset(final Deserializer<V> defaultDeserializer) {
+ if (deserializer == null) {
+ deserializer = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer cannot be null");
+ }
+ }
+
+ @Override
public SubscriptionResponseWrapper<V> deserialize(final String topic, final byte[] data) {
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized data}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
index 9cbeadd..61fb1c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
@@ -68,6 +68,8 @@ public class SubscriptionStoreReceiveProcessorSupplier<K, KO>
internalProcessorContext.metrics()
);
store = internalProcessorContext.getStateStore(storeBuilder);
+
+ keySchema.init(context);
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
index ae53ba8..1cb3293 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
@@ -20,16 +20,19 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
import java.nio.ByteBuffer;
+import java.util.Objects;
public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>> {
private final SubscriptionWrapperSerializer<K> serializer;
private final SubscriptionWrapperDeserializer<K> deserializer;
public SubscriptionWrapperSerde(final Serde<K> primaryKeySerde) {
- serializer = new SubscriptionWrapperSerializer<>(primaryKeySerde.serializer());
- deserializer = new SubscriptionWrapperDeserializer<>(primaryKeySerde.deserializer());
+ serializer = new SubscriptionWrapperSerializer<>(primaryKeySerde == null ? null : primaryKeySerde.serializer());
+ deserializer = new SubscriptionWrapperDeserializer<>(primaryKeySerde == null ? null : primaryKeySerde.deserializer());
}
@Override
@@ -42,13 +45,23 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
return deserializer;
}
- private static class SubscriptionWrapperSerializer<K> implements Serializer<SubscriptionWrapper<K>> {
- private final Serializer<K> primaryKeySerializer;
+ public static class SubscriptionWrapperSerializer<K>
+ implements Serializer<SubscriptionWrapper<K>>, WrappingNullableSerializer<SubscriptionWrapper<K>, K> {
+
+ private Serializer<K> primaryKeySerializer;
+
SubscriptionWrapperSerializer(final Serializer<K> primaryKeySerializer) {
this.primaryKeySerializer = primaryKeySerializer;
}
@Override
+ public void setIfUnset(final Serializer<K> defaultSerializer) {
+ if (primaryKeySerializer == null) {
+ primaryKeySerializer = Objects.requireNonNull(defaultSerializer, "defaultSerializer cannot be null");
+ }
+ }
+
+ @Override
public byte[] serialize(final String topic, final SubscriptionWrapper<K> data) {
//{1-bit-isHashNull}{7-bits-version}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}
@@ -81,13 +94,23 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
}
- private static class SubscriptionWrapperDeserializer<K> implements Deserializer<SubscriptionWrapper<K>> {
- private final Deserializer<K> primaryKeyDeserializer;
+ public static class SubscriptionWrapperDeserializer<K>
+ implements Deserializer<SubscriptionWrapper<K>>, WrappingNullableDeserializer<SubscriptionWrapper<K>, K> {
+
+ private Deserializer<K> primaryKeyDeserializer;
+
SubscriptionWrapperDeserializer(final Deserializer<K> primaryKeyDeserializer) {
this.primaryKeyDeserializer = primaryKeyDeserializer;
}
@Override
+ public void setIfUnset(final Deserializer<K> defaultDeserializer) {
+ if (primaryKeyDeserializer == null) {
+ primaryKeyDeserializer = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer cannot be null");
+ }
+ }
+
+ @Override
public SubscriptionWrapper<K> deserialize(final String topic, final byte[] data) {
//{7-bits-version}{1-bit-isHashNull}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}
final ByteBuffer buf = ByteBuffer.wrap(data);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index b94291b..e3333be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -18,7 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.kstream.internals.ChangedSerializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
@@ -66,10 +66,10 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
valSerializer = (Serializer<V>) context.valueSerde().serializer();
}
- // if value serializers are for {@code Change} values, set the inner serializer when necessary
- if (valSerializer instanceof ChangedSerializer &&
- ((ChangedSerializer) valSerializer).inner() == null) {
- ((ChangedSerializer) valSerializer).setInner(context.valueSerde().serializer());
+ // if value serializers are internal wrapping serializers that may need to be given the default serializer
+ // then pass it the default one from the context
+ if (valSerializer instanceof WrappingNullableSerializer) {
+ ((WrappingNullableSerializer) valSerializer).setIfUnset(context.valueSerde().serializer());
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 33d08b1..853520a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.streams.kstream.internals.ChangedDeserializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
@@ -88,10 +88,10 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
this.valDeserializer = (Deserializer<V>) context.valueSerde().deserializer();
}
- // if value deserializers are for {@code Change} values, set the inner deserializer when necessary
- if (this.valDeserializer instanceof ChangedDeserializer &&
- ((ChangedDeserializer) this.valDeserializer).inner() == null) {
- ((ChangedDeserializer) this.valDeserializer).setInner(context.valueSerde().deserializer());
+ // if value deserializers are internal wrapping deserializers that may need to be given the default
+ // then pass it the default one from the context
+ if (valDeserializer instanceof WrappingNullableDeserializer) {
+ ((WrappingNullableDeserializer) valDeserializer).setIfUnset(context.valueSerde().deserializer());
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java b/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java
index 47e2d95..dbe5bc2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java
@@ -39,6 +39,7 @@ import org.junit.runners.Suite;
KTableKTableForeignKeyInnerJoinMultiIntegrationTest.class,
KTableKTableForeignKeyJoinIntegrationTest.class,
KTableKTableForeignKeyJoinMaterializationIntegrationTest.class,
+ KTableKTableForeignKeyJoinDefaultSerdeTest.class,
CombinedKeySchemaTest.class,
SubscriptionWrapperSerdeTest.class,
SubscriptionResponseWrapperSerdeTest.class,
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java
new file mode 100644
index 0000000..df6349a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+public class KTableKTableForeignKeyJoinDefaultSerdeTest {
+ @Test
+ public void shouldWorkWithDefaultSerdes() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KTable<String, String> aTable = builder.table("A");
+ final KTable<String, String> bTable = builder.table("B");
+
+ final KTable<String, String> fkJoinResult = aTable.join(
+ bTable,
+ value -> value.split("-")[0],
+ (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
+ Materialized.as("asdf")
+ );
+
+ final KTable<String, String> finalJoinResult = aTable.join(
+ fkJoinResult,
+ (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
+ );
+
+ finalJoinResult.toStream().to("output");
+
+ validateTopologyCanProcessData(builder);
+ }
+
+ @Test
+ public void shouldWorkWithDefaultAndConsumedSerdes() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KTable<String, String> aTable = builder.table("A", Consumed.with(Serdes.String(), Serdes.String()));
+ final KTable<String, String> bTable = builder.table("B");
+
+ final KTable<String, String> fkJoinResult = aTable.join(
+ bTable,
+ value -> value.split("-")[0],
+ (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
+ Materialized.as("asdf")
+ );
+
+ final KTable<String, String> finalJoinResult = aTable.join(
+ fkJoinResult,
+ (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
+ );
+
+ finalJoinResult.toStream().to("output");
+
+ validateTopologyCanProcessData(builder);
+ }
+
+ @Test
+ public void shouldWorkWithDefaultAndJoinResultSerdes() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KTable<String, String> aTable = builder.table("A");
+ final KTable<String, String> bTable = builder.table("B");
+
+ final KTable<String, String> fkJoinResult = aTable.join(
+ bTable,
+ value -> value.split("-")[0],
+ (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
+ Materialized
+ .<String, String, KeyValueStore<Bytes, byte[]>>as("asdf")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.String())
+ );
+
+ final KTable<String, String> finalJoinResult = aTable.join(
+ fkJoinResult,
+ (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
+ );
+
+ finalJoinResult.toStream().to("output");
+
+ validateTopologyCanProcessData(builder);
+ }
+
+ @Test
+ public void shouldWorkWithDefaultAndEquiJoinResultSerdes() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KTable<String, String> aTable = builder.table("A");
+ final KTable<String, String> bTable = builder.table("B");
+
+ final KTable<String, String> fkJoinResult = aTable.join(
+ bTable,
+ value -> value.split("-")[0],
+ (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
+ Materialized.as("asdf")
+ );
+
+ final KTable<String, String> finalJoinResult = aTable.join(
+ fkJoinResult,
+ (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")",
+ Materialized.with(Serdes.String(), Serdes.String())
+ );
+
+ finalJoinResult.toStream().to("output");
+
+ validateTopologyCanProcessData(builder);
+ }
+
+ @Test
+ public void shouldWorkWithDefaultAndProducedSerdes() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KTable<String, String> aTable = builder.table("A");
+ final KTable<String, String> bTable = builder.table("B");
+
+ final KTable<String, String> fkJoinResult = aTable.join(
+ bTable,
+ value -> value.split("-")[0],
+ (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
+ Materialized.as("asdf")
+ );
+
+ final KTable<String, String> finalJoinResult = aTable.join(
+ fkJoinResult,
+ (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
+ );
+
+ finalJoinResult.toStream().to("output", Produced.with(Serdes.String(), Serdes.String()));
+
+ validateTopologyCanProcessData(builder);
+ }
+
+ private static void validateTopologyCanProcessData(final StreamsBuilder builder) {
+ final Properties config = new Properties();
+ config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
+ config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
+ config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
+ config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
+ try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), config)) {
+ final TestInputTopic<String, String> aTopic = topologyTestDriver.createInputTopic("A", new StringSerializer(), new StringSerializer());
+ final TestInputTopic<String, String> bTopic = topologyTestDriver.createInputTopic("B", new StringSerializer(), new StringSerializer());
+ final TestOutputTopic<String, String> output = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
+ aTopic.pipeInput("a1", "b1-alpha");
+ bTopic.pipeInput("b1", "beta");
+ final Map<String, String> x = output.readKeyValuesToMap();
+ assertThat(x, is(Collections.singletonMap("a1", "(b1-alpha,(b1-alpha,beta))")));
+ }
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 12fc9c7..23e4c91 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -1755,7 +1755,6 @@ public class StreamsPartitionAssignorTest {
createMockTaskManager(emptyTasks, emptyTasks, uuid1, builder);
EasyMock.replay(taskManager);
- streamsMetadataState = EasyMock.createNiceMock(StreamsMetadataState.class);
configurePartitionAssignor(emptyMap());
final MockInternalTopicManager internalTopicManager =
new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer);