You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/08/22 20:37:34 UTC

[kafka] branch 2.3 updated: KAFKA-8824: bypass value serde on null (#7235)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 9100009  KAFKA-8824: bypass value serde on null (#7235)
9100009 is described below

commit 91000095c2685cac3ac6788fad42076a795f8085
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Thu Aug 22 15:35:33 2019 -0500

    KAFKA-8824: bypass value serde on null (#7235)
    
    In a KTable context, we should not pass null into a user-supplied serde.
    
    Testing: I verified that the change to the test results in test failures without the patch.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>,
---
 .../state/internals/InMemoryTimeOrderedKeyValueBuffer.java |  2 +-
 .../state/internals/TimeOrderedKeyValueBufferTest.java     | 14 +++++++++++++-
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 6c5022f..14a41ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -448,7 +448,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
         final byte[] serializedPriorValue;
         if (buffered == null) {
             final V priorValue = value.oldValue;
-            serializedPriorValue = valueSerde.innerSerde().serializer().serialize(changelogTopic, priorValue);
+            serializedPriorValue = (priorValue == null) ? null : valueSerde.innerSerde().serializer().serialize(changelogTopic, priorValue);
         } else {
             serializedPriorValue = buffered.priorValue();
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 941832b..e8a62d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -68,6 +70,16 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     private final Function<String, B> bufferSupplier;
     private final String testName;
 
+    public static final class NullRejectingStringSerializer extends StringSerializer {
+        @Override
+        public byte[] serialize(final String topic, final String data) {
+            if (data == null) {
+                throw new IllegalArgumentException();
+            }
+            return super.serialize(topic, data);
+        }
+    }
+
     // As we add more buffer implementations/configurations, we can add them here
     @Parameterized.Parameters(name = "{index}: test={0}")
     public static Collection<Object[]> parameters() {
@@ -76,7 +88,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                 "in-memory buffer",
                 (Function<String, InMemoryTimeOrderedKeyValueBuffer<String, String>>) name ->
                     new InMemoryTimeOrderedKeyValueBuffer
-                        .Builder<>(name, Serdes.String(), Serdes.String())
+                        .Builder<>(name, Serdes.String(), Serdes.serdeFrom(new NullRejectingStringSerializer(), new StringDeserializer()))
                         .build()
             }
         );