You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/27 19:50:24 UTC

kafka git commit: KAFKA-3629; KStreamImpl.to(...) throws NPE when the value SerDe is null

Repository: kafka
Updated Branches:
  refs/heads/trunk 2885bc33d -> 4c76b5fa6


KAFKA-3629; KStreamImpl.to(...) throws NPE when the value SerDe is null

guozhangwang

Author: Damian Guy <da...@gmail.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #1272 from dguy/kstreamimpl-to-npe and squashes the following commits:

49d48fb [Damian Guy] actually commit the fix
07ce589 [Damian Guy] fix npe in KStreamImpl.to(..)
74d396d [Damian Guy] fix npe in KStreamImpl.to(..)


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4c76b5fa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4c76b5fa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4c76b5fa

Branch: refs/heads/trunk
Commit: 4c76b5fa6a72412efa5936c284800148c2c69c24
Parents: 2885bc3
Author: Damian Guy <da...@gmail.com>
Authored: Wed Apr 27 10:50:20 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Apr 27 10:50:20 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/internals/KStreamImpl.java   | 2 +-
 .../kafka/streams/kstream/internals/KStreamImplTest.java      | 7 +++++++
 2 files changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4c76b5fa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index a84b4aa..91bcef9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -298,7 +298,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         String name = topology.newName(SINK_NAME);
 
         Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
-        Serializer<V> valSerializer = keySerde == null ? null : valSerde.serializer();
+        Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer();
         
         if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) {
             WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;

http://git-wip-us.apache.org/repos/asf/kafka/blob/4c76b5fa/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index b5c3d47..3d45d1d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -133,4 +133,11 @@ public class KStreamImplTest {
             1, // process
             builder.build("X", null).processors().size());
     }
+
+    @Test
+    public void testToWithNullValueSerdeDoesntNPE() {
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<String, String> inputStream = builder.stream(stringSerde, stringSerde, "input");
+        inputStream.to(stringSerde, null, "output");
+    }
 }