You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/12 01:09:28 UTC
[12/50] [abbrv] kafka git commit: MINOR: add null check for aggregate
and reduce operators
MINOR: add null check for aggregate and reduce operators
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Yasuhiro Matsuda, Gwen Shapira
Closes #1175 from guozhangwang/KSNullPointerException
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ae939467
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ae939467
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ae939467
Branch: refs/heads/0.10.0
Commit: ae939467e8aec38f47e2474e74e7ab7ea29c2840
Parents: 77142f6
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Apr 1 13:14:47 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Fri Apr 1 13:14:47 2016 -0700
----------------------------------------------------------------------
.../kafka/streams/kstream/internals/KStreamAggregate.java | 5 +++++
.../apache/kafka/streams/kstream/internals/KStreamReduce.java | 5 +++++
.../apache/kafka/streams/kstream/internals/KTableAggregate.java | 5 +++++
.../apache/kafka/streams/kstream/internals/KTableReduce.java | 5 +++++
4 files changed, 20 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ae939467/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index f41bfa6..871a12d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.processor.AbstractProcessor;
@@ -62,6 +63,10 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
@Override
public void process(K key, V value) {
+ // the keys should never be null
+ if (key == null)
+ throw new StreamsException("Record key for KStream aggregate operator with state " + storeName + " should not be null.");
+
T oldAgg = store.get(key);
if (oldAgg == null)
http://git-wip-us.apache.org/repos/asf/kafka/blob/ae939467/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 0ec0465..e37fe34 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
@@ -59,6 +60,10 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
@Override
public void process(K key, V value) {
+ // the keys should never be null
+ if (key == null)
+ throw new StreamsException("Record key for KStream reduce operator with state " + storeName + " should not be null.");
+
V oldAgg = store.get(key);
V newAgg = oldAgg;
http://git-wip-us.apache.org/repos/asf/kafka/blob/ae939467/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index 6ce776a..806c6e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.processor.AbstractProcessor;
@@ -64,6 +65,10 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
@Override
public void process(K key, Change<V> value) {
+ // the keys should never be null
+ if (key == null)
+ throw new StreamsException("Record key for KTable aggregate operator with state " + storeName + " should not be null.");
+
T oldAgg = store.get(key);
if (oldAgg == null)
http://git-wip-us.apache.org/repos/asf/kafka/blob/ae939467/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 0d1b55a..d56b3ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
@@ -61,6 +62,10 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
@Override
public void process(K key, Change<V> value) {
+ // the keys should never be null
+ if (key == null)
+ throw new StreamsException("Record key for KTable reduce operator with state " + storeName + " should not be null.");
+
V oldAgg = store.get(key);
V newAgg = oldAgg;