You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/27 19:50:24 UTC
kafka git commit: KAFKA-3629;
KStreamImpl.to(...) throws NPE when the value SerDe is null
Repository: kafka
Updated Branches:
refs/heads/trunk 2885bc33d -> 4c76b5fa6
KAFKA-3629; KStreamImpl.to(...) throws NPE when the value SerDe is null
guozhangwang
Author: Damian Guy <da...@gmail.com>
Reviewers: Matthias J. Sax, Guozhang Wang
Closes #1272 from dguy/kstreamimpl-to-npe and squashes the following commits:
49d48fb [Damian Guy] actually commit the fix
07ce589 [Damian Guy] fix npe in KStreamImpl.to(..)
74d396d [Damian Guy] fix npe in KStreamImpl.to(..)
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4c76b5fa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4c76b5fa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4c76b5fa
Branch: refs/heads/trunk
Commit: 4c76b5fa6a72412efa5936c284800148c2c69c24
Parents: 2885bc3
Author: Damian Guy <da...@gmail.com>
Authored: Wed Apr 27 10:50:20 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Apr 27 10:50:20 2016 -0700
----------------------------------------------------------------------
.../apache/kafka/streams/kstream/internals/KStreamImpl.java | 2 +-
.../kafka/streams/kstream/internals/KStreamImplTest.java | 7 +++++++
2 files changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4c76b5fa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index a84b4aa..91bcef9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -298,7 +298,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
String name = topology.newName(SINK_NAME);
Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
- Serializer<V> valSerializer = keySerde == null ? null : valSerde.serializer();
+ Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer();
if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) {
WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
http://git-wip-us.apache.org/repos/asf/kafka/blob/4c76b5fa/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index b5c3d47..3d45d1d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -133,4 +133,11 @@ public class KStreamImplTest {
1, // process
builder.build("X", null).processors().size());
}
+
+ @Test
+ public void testToWithNullValueSerdeDoesntNPE() {
+ final KStreamBuilder builder = new KStreamBuilder();
+ final KStream<String, String> inputStream = builder.stream(stringSerde, stringSerde, "input");
+ inputStream.to(stringSerde, null, "output");
+ }
}