You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/12 01:10:00 UTC

[44/50] [abbrv] kafka git commit: KAFKA-3521: validate null keys in Streams DSL implementations

KAFKA-3521: validate null keys in Streams DSL implementations

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1197 from guozhangwang/K3521


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8c595657
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8c595657
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8c595657

Branch: refs/heads/0.10.0
Commit: 8c59565761a42984335294683c3501df8427ce62
Parents: 2a8fa28
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Apr 8 13:30:46 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Fri Apr 8 13:30:46 2016 -0700

----------------------------------------------------------------------
 .../kstream/internals/ChangedSerializer.java    | 16 +++++++++++++++-
 .../kstream/internals/KStreamJoinWindow.java    |  8 ++++++--
 .../kstream/internals/KStreamKStreamJoin.java   |  5 +++++
 .../internals/KStreamKTableLeftJoin.java        |  6 +++++-
 .../kstream/internals/KStreamTransform.java     | 10 +++++-----
 .../internals/KStreamWindowAggregate.java       |  5 +++++
 .../kstream/internals/KStreamWindowReduce.java  |  5 +++++
 .../kstream/internals/KTableKTableJoin.java     |  5 +++++
 .../kstream/internals/KTableKTableLeftJoin.java |  5 +++++
 .../internals/KTableKTableOuterJoin.java        |  5 +++++
 .../internals/KTableKTableRightJoin.java        |  5 +++++
 .../kstream/internals/KTableRepartitionMap.java |  5 +++++
 .../processor/internals/StandbyContextImpl.java | 20 ++++++++++----------
 13 files changed, 81 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index e9b7cad..5ea0791 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.errors.StreamsException;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
@@ -39,8 +40,21 @@ public class ChangedSerializer<T> implements Serializer<Change<T>> {
 
     @Override
     public byte[] serialize(String topic, Change<T> data) {
+        byte[] serializedKey;
+
         // only one of the old / new values would be not null
-        byte[] serializedKey = inner.serialize(topic, data.newValue != null ? data.newValue : data.oldValue);
+        if (data.newValue != null) {
+            if (data.oldValue != null)
+                throw new StreamsException("Both old and new values are not null (" + data.oldValue
+                        + " : " + data.newValue + ") in ChangeSerializer, which is not allowed.");
+
+            serializedKey = inner.serialize(topic, data.newValue);
+        } else {
+            if (data.oldValue == null)
+                throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
+
+            serializedKey = inner.serialize(topic, data.oldValue);
+        }
 
         ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + NEWFLAG_SIZE);
         buf.put(serializedKey);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
index 5b83b28..94e0b88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
@@ -55,8 +55,12 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> {
 
         @Override
         public void process(K key, V value) {
-            context().forward(key, value);
-            window.put(key, value);
+            // if the key is null, we do not need to put the record into window store
+            // since it will never be considered for join operations
+            if (key != null) {
+                context().forward(key, value);
+                window.put(key, value);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index a4ac9b3..d8caf3c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -63,6 +64,10 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
 
         @Override
         public void process(K key, V1 value) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for KStream-KStream join operator with other window state store " + otherWindowName + " should not be null.");
+
             boolean needOuterJoin = KStreamKStreamJoin.this.outer;
 
             long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
index dfca019..92b9b59 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
@@ -55,7 +55,11 @@ class KStreamKTableLeftJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
 
         @Override
         public void process(K key, V1 value) {
-            context().forward(key, joiner.apply(value, valueGetter.get(key)));
+            // if the key is null, we do not need to proceed with joining
+            // the record with the table
+            if (key != null) {
+                context().forward(key, joiner.apply(value, valueGetter.get(key)));
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
index 4299c66..09dddfe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -34,13 +35,12 @@ public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
 
     @Override
     public Processor<K, V> get() {
-        return new KStreamTransformProcessor(transformerSupplier.get());
+        return new KStreamTransformProcessor<>(transformerSupplier.get());
     }
 
-    public static class KStreamTransformProcessor<K1, V1, K2, V2> implements Processor<K1, V1> {
+    public static class KStreamTransformProcessor<K1, V1, K2, V2> extends AbstractProcessor<K1, V1> {
 
         private final Transformer<K1, V1, KeyValue<K2, V2>> transformer;
-        private ProcessorContext context;
 
         public KStreamTransformProcessor(Transformer<K1, V1, KeyValue<K2, V2>> transformer) {
             this.transformer = transformer;
@@ -48,14 +48,14 @@ public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
 
         @Override
         public void init(ProcessorContext context) {
+            super.init(context);
             transformer.init(context);
-            this.context = context;
         }
 
         @Override
         public void process(K1 key, V1 value) {
             KeyValue<K2, V2> pair = transformer.transform(key, value);
-            context.forward(pair.key, pair.value);
+            context().forward(pair.key, pair.value);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 76964f9..f36cc8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -72,6 +72,11 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
 
         @Override
         public void process(K key, V value) {
+            // if the key is null, we do not need to proceed with
+            // aggregating the record
+            if (key == null)
+                return;
+
             // first get the matching windows
             long timestamp = context().timestamp();
             Map<Long, W> matchedWindows = windows.windowsFor(timestamp);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index d532e79..6c05ce3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -69,6 +69,11 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
 
         @Override
         public void process(K key, V value) {
+            // if the key is null, we do not need to proceed with
+            // aggregating the record
+            if (key == null)
+                return;
+
             // first get the matching windows
             long timestamp = context().timestamp();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
index 6eb27b6..24c8da6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -61,6 +62,10 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1,
 
         @Override
         public void process(K key, Change<V1> change) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for KTable join operator should not be null.");
+
             R newValue = null;
             R oldValue = null;
             V2 value2 = null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index 00e872e..4bf45ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -61,6 +62,10 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
 
         @Override
         public void process(K key, Change<V1> change) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for KTable left-join operator should not be null.");
+
             R newValue = null;
             R oldValue = null;
             V2 value2 = null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index 6ab0ae9..49eed53 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -61,6 +62,10 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
 
         @Override
         public void process(K key, Change<V1> change) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for KTable outer-join operator should not be null.");
+
             R newValue = null;
             R oldValue = null;
             V2 value2 = valueGetter.get(key);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index a6a13fc..7443d4a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -62,6 +63,10 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
 
         @Override
         public void process(K key, Change<V1> change) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for KTable right-join operator should not be null.");
+
             R newValue = null;
             R oldValue = null;
             V2 value2 = valueGetter.get(key);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index ff69c37..142a279 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -77,6 +78,10 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
         public void process(K key, Change<V> change) {
             KeyValue<K1, V1> newPair = computeValue(key, change.newValue);
 
+            // the selected repartition key should never be null
+            if (newPair.key == null)
+                throw new StreamsException("Record key for KTable repartition operator should not be null.");
+
             context().forward(newPair.key, new Change<>(newPair.value, null));
 
             if (change.oldValue != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index ea008b8..468fe74 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -108,51 +108,51 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
 
     @Override
     public StateStore getStateStore(String name) {
-        throw new UnsupportedOperationException("getStateStore() not supported.");
+        throw new UnsupportedOperationException("this should not happen: getStateStore() not supported in standby tasks.");
     }
 
     @Override
     public String topic() {
-        throw new UnsupportedOperationException("topic() not supported.");
+        throw new UnsupportedOperationException("this should not happen: topic() not supported in standby tasks.");
     }
 
     @Override
     public int partition() {
-        throw new UnsupportedOperationException("partition() not supported.");
+        throw new UnsupportedOperationException("this should not happen: partition() not supported in standby tasks.");
     }
 
     @Override
     public long offset() {
-        throw new UnsupportedOperationException("offset() not supported.");
+        throw new UnsupportedOperationException("this should not happen: offset() not supported in standby tasks.");
     }
 
     @Override
     public long timestamp() {
-        throw new UnsupportedOperationException("timestamp() not supported.");
+        throw new UnsupportedOperationException("this should not happen: timestamp() not supported in standby tasks.");
     }
 
     @Override
     public <K, V> void forward(K key, V value) {
-        throw new UnsupportedOperationException("forward() not supported.");
+        throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
     }
 
     @Override
     public <K, V> void forward(K key, V value, int childIndex) {
-        throw new UnsupportedOperationException("forward() not supported.");
+        throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
     }
 
     @Override
     public <K, V> void forward(K key, V value, String childName) {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
     }
 
     @Override
     public void commit() {
-        throw new UnsupportedOperationException("commit() not supported.");
+        throw new UnsupportedOperationException("this should not happen: commit() not supported in standby tasks.");
     }
 
     @Override
     public void schedule(long interval) {
-        throw new UnsupportedOperationException("schedule() not supported.");
+        throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks.");
     }
 }