You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/12 01:09:28 UTC

[12/50] [abbrv] kafka git commit: MINOR: add null check for aggregate and reduce operators

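MINOR: add null check for aggregate and reduce operators (a null record key now raises a StreamsException naming the operator and its state store, before the store lookup)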

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Yasuhiro Matsuda, Gwen Shapira

Closes #1175 from guozhangwang/KSNullPointerException


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ae939467
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ae939467
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ae939467

Branch: refs/heads/0.10.0
Commit: ae939467e8aec38f47e2474e74e7ab7ea29c2840
Parents: 77142f6
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Apr 1 13:14:47 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Fri Apr 1 13:14:47 2016 -0700

----------------------------------------------------------------------
 .../kafka/streams/kstream/internals/KStreamAggregate.java       | 5 +++++
 .../apache/kafka/streams/kstream/internals/KStreamReduce.java   | 5 +++++
 .../apache/kafka/streams/kstream/internals/KTableAggregate.java | 5 +++++
 .../apache/kafka/streams/kstream/internals/KTableReduce.java    | 5 +++++
 4 files changed, 20 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ae939467/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index f41bfa6..871a12d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.processor.AbstractProcessor;
@@ -62,6 +63,10 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
 
         @Override
         public void process(K key, V value) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for KStream aggregate operator with state " + storeName + " should not be null.");
+
             T oldAgg = store.get(key);
 
             if (oldAgg == null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae939467/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 0ec0465..e37fe34 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -59,6 +60,10 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
 
         @Override
         public void process(K key, V value) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for KStream reduce operator with state " + storeName + " should not be null.");
+
             V oldAgg = store.get(key);
             V newAgg = oldAgg;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae939467/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index 6ce776a..806c6e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.processor.AbstractProcessor;
@@ -64,6 +65,10 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
 
         @Override
         public void process(K key, Change<V> value) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for KTable aggregate operator with state " + storeName + " should not be null.");
+
             T oldAgg = store.get(key);
 
             if (oldAgg == null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae939467/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 0d1b55a..d56b3ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -61,6 +62,10 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
 
         @Override
         public void process(K key, Change<V> value) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for KTable reduce operator with state " + storeName + " should not be null.");
+
             V oldAgg = store.get(key);
             V newAgg = oldAgg;