You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/12 01:10:00 UTC
[44/50] [abbrv] kafka git commit: KAFKA-3521: validate null keys in Streams DSL implementations
KAFKA-3521: validate null keys in Streams DSL implementations
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1197 from guozhangwang/K3521
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8c595657
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8c595657
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8c595657
Branch: refs/heads/0.10.0
Commit: 8c59565761a42984335294683c3501df8427ce62
Parents: 2a8fa28
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Apr 8 13:30:46 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Fri Apr 8 13:30:46 2016 -0700
----------------------------------------------------------------------
.../kstream/internals/ChangedSerializer.java | 16 +++++++++++++++-
.../kstream/internals/KStreamJoinWindow.java | 8 ++++++--
.../kstream/internals/KStreamKStreamJoin.java | 5 +++++
.../internals/KStreamKTableLeftJoin.java | 6 +++++-
.../kstream/internals/KStreamTransform.java | 10 +++++-----
.../internals/KStreamWindowAggregate.java | 5 +++++
.../kstream/internals/KStreamWindowReduce.java | 5 +++++
.../kstream/internals/KTableKTableJoin.java | 5 +++++
.../kstream/internals/KTableKTableLeftJoin.java | 5 +++++
.../internals/KTableKTableOuterJoin.java | 5 +++++
.../internals/KTableKTableRightJoin.java | 5 +++++
.../kstream/internals/KTableRepartitionMap.java | 5 +++++
.../processor/internals/StandbyContextImpl.java | 20 ++++++++++----------
13 files changed, 81 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index e9b7cad..5ea0791 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -18,6 +18,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.errors.StreamsException;
import java.nio.ByteBuffer;
import java.util.Map;
@@ -39,8 +40,21 @@ public class ChangedSerializer<T> implements Serializer<Change<T>> {
@Override
public byte[] serialize(String topic, Change<T> data) {
+ byte[] serializedKey;
+
// only one of the old / new values would be not null
- byte[] serializedKey = inner.serialize(topic, data.newValue != null ? data.newValue : data.oldValue);
+ if (data.newValue != null) {
+ if (data.oldValue != null)
+ throw new StreamsException("Both old and new values are not null (" + data.oldValue
+ + " : " + data.newValue + ") in ChangeSerializer, which is not allowed.");
+
+ serializedKey = inner.serialize(topic, data.newValue);
+ } else {
+ if (data.oldValue == null)
+ throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
+
+ serializedKey = inner.serialize(topic, data.oldValue);
+ }
ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + NEWFLAG_SIZE);
buf.put(serializedKey);
http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
index 5b83b28..94e0b88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
@@ -55,8 +55,12 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> {
@Override
public void process(K key, V value) {
- context().forward(key, value);
- window.put(key, value);
+ // if the key is null, we do not need to put the record into window store
+ // since it will never be considered for join operations
+ if (key != null) {
+ context().forward(key, value);
+ window.put(key, value);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index a4ac9b3..d8caf3c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -18,6 +18,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
@@ -63,6 +64,10 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
@Override
public void process(K key, V1 value) {
+ // the keys should never be null
+ if (key == null)
+ throw new StreamsException("Record key for KStream-KStream join operator with other window state store " + otherWindowName + " should not be null.");
+
boolean needOuterJoin = KStreamKStreamJoin.this.outer;
long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs);
http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
index dfca019..92b9b59 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
@@ -55,7 +55,11 @@ class KStreamKTableLeftJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
@Override
public void process(K key, V1 value) {
- context().forward(key, joiner.apply(value, valueGetter.get(key)));
+ // if the key is null, we do not need to proceed with joining
+ // the record with the table
+ if (key != null) {
+ context().forward(key, joiner.apply(value, valueGetter.get(key)));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
index 4299c66..09dddfe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -34,13 +35,12 @@ public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
@Override
public Processor<K, V> get() {
- return new KStreamTransformProcessor(transformerSupplier.get());
+ return new KStreamTransformProcessor<>(transformerSupplier.get());
}
- public static class KStreamTransformProcessor<K1, V1, K2, V2> implements Processor<K1, V1> {
+ public static class KStreamTransformProcessor<K1, V1, K2, V2> extends AbstractProcessor<K1, V1> {
private final Transformer<K1, V1, KeyValue<K2, V2>> transformer;
- private ProcessorContext context;
public KStreamTransformProcessor(Transformer<K1, V1, KeyValue<K2, V2>> transformer) {
this.transformer = transformer;
@@ -48,14 +48,14 @@ public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
@Override
public void init(ProcessorContext context) {
+ super.init(context);
transformer.init(context);
- this.context = context;
}
@Override
public void process(K1 key, V1 value) {
KeyValue<K2, V2> pair = transformer.transform(key, value);
- context.forward(pair.key, pair.value);
+ context().forward(pair.key, pair.value);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 76964f9..f36cc8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -72,6 +72,11 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
@Override
public void process(K key, V value) {
+ // if the key is null, we do not need to proceed with
+ // aggregating the record
+ if (key == null)
+ return;
+
// first get the matching windows
long timestamp = context().timestamp();
Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index d532e79..6c05ce3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -69,6 +69,11 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
@Override
public void process(K key, V value) {
+ // if the key is null, we do not need to proceed with
+ // aggregating the record
+ if (key == null)
+ return;
+
// first get the matching windows
long timestamp = context().timestamp();
http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
index 6eb27b6..24c8da6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
@@ -61,6 +62,10 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1,
@Override
public void process(K key, Change<V1> change) {
+ // the keys should never be null
+ if (key == null)
+ throw new StreamsException("Record key for KTable join operator should not be null.");
+
R newValue = null;
R oldValue = null;
V2 value2 = null;
http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index 00e872e..4bf45ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
@@ -61,6 +62,10 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
@Override
public void process(K key, Change<V1> change) {
+ // the keys should never be null
+ if (key == null)
+ throw new StreamsException("Record key for KTable left-join operator should not be null.");
+
R newValue = null;
R oldValue = null;
V2 value2 = null;
http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index 6ab0ae9..49eed53 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
@@ -61,6 +62,10 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
@Override
public void process(K key, Change<V1> change) {
+ // the keys should never be null
+ if (key == null)
+ throw new StreamsException("Record key for KTable outer-join operator should not be null.");
+
R newValue = null;
R oldValue = null;
V2 value2 = valueGetter.get(key);
http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index a6a13fc..7443d4a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
@@ -62,6 +63,10 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
@Override
public void process(K key, Change<V1> change) {
+ // the keys should never be null
+ if (key == null)
+ throw new StreamsException("Record key for KTable right-join operator should not be null.");
+
R newValue = null;
R oldValue = null;
V2 value2 = valueGetter.get(key);
http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index ff69c37..142a279 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -18,6 +18,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
@@ -77,6 +78,10 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
public void process(K key, Change<V> change) {
KeyValue<K1, V1> newPair = computeValue(key, change.newValue);
+ // the selected repartition key should never be null
+ if (newPair.key == null)
+ throw new StreamsException("Record key for KTable repartition operator should not be null.");
+
context().forward(newPair.key, new Change<>(newPair.value, null));
if (change.oldValue != null) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index ea008b8..468fe74 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -108,51 +108,51 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
@Override
public StateStore getStateStore(String name) {
- throw new UnsupportedOperationException("getStateStore() not supported.");
+ throw new UnsupportedOperationException("this should not happen: getStateStore() not supported in standby tasks.");
}
@Override
public String topic() {
- throw new UnsupportedOperationException("topic() not supported.");
+ throw new UnsupportedOperationException("this should not happen: topic() not supported in standby tasks.");
}
@Override
public int partition() {
- throw new UnsupportedOperationException("partition() not supported.");
+ throw new UnsupportedOperationException("this should not happen: partition() not supported in standby tasks.");
}
@Override
public long offset() {
- throw new UnsupportedOperationException("offset() not supported.");
+ throw new UnsupportedOperationException("this should not happen: offset() not supported in standby tasks.");
}
@Override
public long timestamp() {
- throw new UnsupportedOperationException("timestamp() not supported.");
+ throw new UnsupportedOperationException("this should not happen: timestamp() not supported in standby tasks.");
}
@Override
public <K, V> void forward(K key, V value) {
- throw new UnsupportedOperationException("forward() not supported.");
+ throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
}
@Override
public <K, V> void forward(K key, V value, int childIndex) {
- throw new UnsupportedOperationException("forward() not supported.");
+ throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
}
@Override
public <K, V> void forward(K key, V value, String childName) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
}
@Override
public void commit() {
- throw new UnsupportedOperationException("commit() not supported.");
+ throw new UnsupportedOperationException("this should not happen: commit() not supported in standby tasks.");
}
@Override
public void schedule(long interval) {
- throw new UnsupportedOperationException("schedule() not supported.");
+ throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks.");
}
}