You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/08/22 20:37:34 UTC
[kafka] branch 2.3 updated: KAFKA-8824: bypass value serde on null (#7235)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new 9100009 KAFKA-8824: bypass value serde on null (#7235)
9100009 is described below
commit 91000095c2685cac3ac6788fad42076a795f8085
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Thu Aug 22 15:35:33 2019 -0500
KAFKA-8824: bypass value serde on null (#7235)
In a KTable context, we should not pass null into a user-supplied serde.
Testing: I verified that the change to the test results in test failures without the patch.
Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../state/internals/InMemoryTimeOrderedKeyValueBuffer.java | 2 +-
.../state/internals/TimeOrderedKeyValueBufferTest.java | 14 +++++++++++++-
2 files changed, 14 insertions(+), 2 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 6c5022f..14a41ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -448,7 +448,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
final byte[] serializedPriorValue;
if (buffered == null) {
final V priorValue = value.oldValue;
- serializedPriorValue = valueSerde.innerSerde().serializer().serialize(changelogTopic, priorValue);
+ serializedPriorValue = (priorValue == null) ? null : valueSerde.innerSerde().serializer().serialize(changelogTopic, priorValue);
} else {
serializedPriorValue = buffered.priorValue();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 941832b..e8a62d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
@@ -68,6 +70,16 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
private final Function<String, B> bufferSupplier;
private final String testName;
+ public static final class NullRejectingStringSerializer extends StringSerializer {
+ @Override
+ public byte[] serialize(final String topic, final String data) {
+ if (data == null) {
+ throw new IllegalArgumentException();
+ }
+ return super.serialize(topic, data);
+ }
+ }
+
// As we add more buffer implementations/configurations, we can add them here
@Parameterized.Parameters(name = "{index}: test={0}")
public static Collection<Object[]> parameters() {
@@ -76,7 +88,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
"in-memory buffer",
(Function<String, InMemoryTimeOrderedKeyValueBuffer<String, String>>) name ->
new InMemoryTimeOrderedKeyValueBuffer
- .Builder<>(name, Serdes.String(), Serdes.String())
+ .Builder<>(name, Serdes.String(), Serdes.serdeFrom(new NullRejectingStringSerializer(), new StringDeserializer()))
.build()
}
);