You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/06/18 19:02:20 UTC

kafka git commit: MINOR: Check null keys in KTableSource

Repository: kafka
Updated Branches:
  refs/heads/trunk 0b0925a16 -> 91135ea33


MINOR: Check null keys in KTableSource

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Damian Guy, Matthias J. Sax

Closes #1521 from guozhangwang/Kminor-check-nullkey-ktable-source


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/91135ea3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/91135ea3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/91135ea3

Branch: refs/heads/trunk
Commit: 91135ea33a7231e9b17cfc74c78dc5223475834b
Parents: 0b0925a
Author: Guozhang Wang <wa...@gmail.com>
Authored: Sat Jun 18 12:02:17 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sat Jun 18 12:02:17 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/kstream/KStreamBuilder.java  |  2 ++
 .../kafka/streams/kstream/internals/KTableSource.java     | 10 +++++++++-
 2 files changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/91135ea3/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 37d8921..307dcab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -116,6 +116,7 @@ public class KStreamBuilder extends TopologyBuilder {
 
     /**
      * Create a {@link KTable} instance for the specified topic.
+     * Record keys of the topic should never be null, otherwise an exception will be thrown at runtime.
      * The default deserializers specified in the config are used.
      *
      * @param topic     the topic name; cannot be null
@@ -127,6 +128,7 @@ public class KStreamBuilder extends TopologyBuilder {
 
     /**
      * Create a {@link KTable} instance for the specified topic.
+     * Record keys of the topic should never be null, otherwise an exception will be thrown at runtime.
      *
      * @param keySerde   key serde used to send key-value pairs,
      *                   if not specified the default key serde defined in the configuration will be used

http://git-wip-us.apache.org/repos/asf/kafka/blob/91135ea3/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 8010b3a..5aafc02 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -54,6 +55,10 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
     private class KTableSourceProcessor extends AbstractProcessor<K, V> {
         @Override
         public void process(K key, V value) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for the source KTable from topic " + topic + " should not be null.");
+
             context().forward(key, new Change<>(value, null));
         }
     }
@@ -71,11 +76,14 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
 
         @Override
         public void process(K key, V value) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for the source KTable from topic " + topic + " should not be null.");
+
             V oldValue = sendOldValues ? store.get(key) : null;
             store.put(key, value);
 
             context().forward(key, new Change<>(value, oldValue));
         }
     }
-
 }