You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/08/09 18:31:07 UTC
kafka git commit: KAFKA-3936: Validate parameters as early as possible
Repository: kafka
Updated Branches:
refs/heads/trunk 8abcece40 -> caa9bd0fc
KAFKA-3936: Validate parameters as early as possible
Added non-null checks to parameters supplied via the DSL and `TopologyBuilder`
Author: Damian Guy <da...@gmail.com>
Reviewers: Edward Ribeiro <ed...@gmail.com>, Guozhang Wang <wa...@gmail.com>
Closes #1711 from dguy/kafka-3936
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/caa9bd0f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/caa9bd0f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/caa9bd0f
Branch: refs/heads/trunk
Commit: caa9bd0fcd2fab4758791408e2b145532153910e
Parents: 8abcece
Author: Damian Guy <da...@gmail.com>
Authored: Tue Aug 9 11:31:04 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Aug 9 11:31:04 2016 -0700
----------------------------------------------------------------------
.../kstream/internals/KGroupedStreamImpl.java | 13 ++
.../kstream/internals/KGroupedTableImpl.java | 8 ++
.../streams/kstream/internals/KStreamImpl.java | 26 ++++
.../streams/kstream/internals/KTableImpl.java | 15 +++
.../streams/processor/TopologyBuilder.java | 21 +++-
.../streams/kstream/KStreamBuilderTest.java | 11 ++
.../internals/KGroupedStreamImplTest.java | 100 +++++++++++++++
.../internals/KGroupedTableImplTest.java | 76 ++++++++++++
.../kstream/internals/KStreamImplTest.java | 122 +++++++++++++++++++
.../kstream/internals/KTableImplTest.java | 86 +++++++++++++
.../streams/processor/TopologyBuilderTest.java | 65 ++++++++++
.../StreamThreadStateStoreProviderTest.java | 2 +-
12 files changed, 540 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 51fd116..78e6a2c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import java.util.Collections;
+import java.util.Objects;
import java.util.Set;
public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStream<K, V> {
@@ -55,6 +56,8 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
@Override
public KTable<K, V> reduce(final Reducer<V> reducer,
final String storeName) {
+ Objects.requireNonNull(reducer, "reducer can't be null");
+ Objects.requireNonNull(storeName, "storeName can't be null");
return doAggregate(
new KStreamReduce<K, V>(storeName, reducer),
REDUCE_NAME,
@@ -67,6 +70,9 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
public <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer,
Windows<W> windows,
final String storeName) {
+ Objects.requireNonNull(reducer, "reducer can't be null");
+ Objects.requireNonNull(windows, "windows can't be null");
+ Objects.requireNonNull(storeName, "storeName can't be null");
return (KTable<Windowed<K>, V>) doAggregate(
new KStreamWindowReduce<K, V, W>(windows, storeName, reducer),
REDUCE_NAME,
@@ -79,6 +85,9 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
final Aggregator<K, V, T> aggregator,
final Serde<T> aggValueSerde,
final String storeName) {
+ Objects.requireNonNull(initializer, "initializer can't be null");
+ Objects.requireNonNull(aggregator, "aggregator can't be null");
+ Objects.requireNonNull(storeName, "storeName can't be null");
return doAggregate(
new KStreamAggregate<>(storeName, initializer, aggregator),
AGGREGATE_NAME,
@@ -92,6 +101,10 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
final Windows<W> windows,
final Serde<T> aggValueSerde,
final String storeName) {
+ Objects.requireNonNull(initializer, "initializer can't be null");
+ Objects.requireNonNull(aggregator, "aggregator can't be null");
+ Objects.requireNonNull(windows, "windows can't be null");
+ Objects.requireNonNull(storeName, "storeName can't be null");
return (KTable<Windowed<K>, T>) doAggregate(
new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator),
AGGREGATE_NAME,
http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 5039c04..6d6e8cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import java.util.Collections;
+import java.util.Objects;
/**
* The implementation class of {@link KGroupedTable}.
@@ -65,6 +66,10 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
Serde<T> aggValueSerde,
String storeName) {
+ Objects.requireNonNull(initializer, "initializer can't be null");
+ Objects.requireNonNull(adder, "adder can't be null");
+ Objects.requireNonNull(subtractor, "subtractor can't be null");
+ Objects.requireNonNull(storeName, "storeName can't be null");
ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(storeName, initializer, adder, subtractor);
return doAggregate(aggregateSupplier, aggValueSerde, AGGREGATE_NAME, storeName);
}
@@ -121,6 +126,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
public KTable<K, V> reduce(Reducer<V> adder,
Reducer<V> subtractor,
String storeName) {
+ Objects.requireNonNull(adder, "adder can't be null");
+ Objects.requireNonNull(subtractor, "subtractor can't be null");
+ Objects.requireNonNull(storeName, "storeName can't be null");
ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(storeName, adder, subtractor);
return doAggregate(aggregateSupplier, valSerde, REDUCE_NAME, storeName);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 1859503..4d39b18 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -44,6 +44,7 @@ import java.io.PrintStream;
import java.lang.reflect.Array;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Objects;
import java.util.Set;
public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V> {
@@ -104,6 +105,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public KStream<K, V> filter(Predicate<K, V> predicate) {
+ Objects.requireNonNull(predicate, "predicate can't be null");
String name = topology.newName(FILTER_NAME);
topology.addProcessor(name, new KStreamFilter<>(predicate, false), this.name);
@@ -113,6 +115,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public KStream<K, V> filterNot(final Predicate<K, V> predicate) {
+ Objects.requireNonNull(predicate, "predicate can't be null");
String name = topology.newName(FILTER_NAME);
topology.addProcessor(name, new KStreamFilter<>(predicate, true), this.name);
@@ -123,6 +126,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
@SuppressWarnings("unchecked")
public <K1> KStream<K1, V> selectKey(final KeyValueMapper<K, V, K1> mapper) {
+ Objects.requireNonNull(mapper, "mapper can't be null");
return new KStreamImpl<>(topology, internalSelectKey(mapper), sourceNodes, true);
}
@@ -139,6 +143,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
+ Objects.requireNonNull(mapper, "mapper can't be null");
String name = topology.newName(MAP_NAME);
topology.addProcessor(name, new KStreamMap<>(mapper), this.name);
@@ -149,6 +154,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper) {
+ Objects.requireNonNull(mapper, "mapper can't be null");
String name = topology.newName(MAPVALUES_NAME);
topology.addProcessor(name, new KStreamMapValues<>(mapper), this.name);
@@ -200,6 +206,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
*/
@Override
public void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde) {
+ Objects.requireNonNull(filePath, "filePath can't be null");
String name = topology.newName(PRINTING_NAME);
streamName = (streamName == null) ? this.name : streamName;
try {
@@ -216,6 +223,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper) {
+ Objects.requireNonNull(mapper, "mapper can't be null");
String name = topology.newName(FLATMAP_NAME);
topology.addProcessor(name, new KStreamFlatMap<>(mapper), this.name);
@@ -225,6 +233,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> mapper) {
+ Objects.requireNonNull(mapper, "mapper can't be null");
String name = topology.newName(FLATMAPVALUES_NAME);
topology.addProcessor(name, new KStreamFlatMapValues<>(mapper), this.name);
@@ -235,6 +244,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
@SuppressWarnings("unchecked")
public KStream<K, V>[] branch(Predicate<K, V>... predicates) {
+ if (predicates.length == 0) {
+ throw new IllegalArgumentException("you must provide at least one predicate");
+ }
+ for (Predicate<K, V> predicate : predicates) {
+ Objects.requireNonNull(predicate, "predicates can't have null values");
+ }
String branchName = topology.newName(BRANCH_NAME);
topology.addProcessor(branchName, new KStreamBranch(predicates.clone()), this.name);
@@ -283,6 +298,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public void foreach(ForeachAction<K, V> action) {
+ Objects.requireNonNull(action, "action can't be null");
String name = topology.newName(FOREACH_NAME);
topology.addProcessor(name, new KStreamForeach<>(action), this.name);
@@ -321,6 +337,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@SuppressWarnings("unchecked")
@Override
public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) {
+ Objects.requireNonNull(topic, "topic can't be null");
String name = topology.newName(SINK_NAME);
Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
@@ -336,6 +353,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames) {
+ Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
String name = topology.newName(TRANSFORM_NAME);
topology.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name);
@@ -346,6 +364,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<V, V1> valueTransformerSupplier, String... stateStoreNames) {
+ Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");
String name = topology.newName(TRANSFORMVALUES_NAME);
topology.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name);
@@ -430,6 +449,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
Serde<V> thisValueSerde,
Serde<V1> otherValueSerde,
KStreamImplJoin join) {
+ Objects.requireNonNull(other, "other KStream can't be null");
+ Objects.requireNonNull(joiner, "joiner can't be null");
+ Objects.requireNonNull(windows, "windows can't be null");
+
KStreamImpl<K, V> joinThis = this;
KStreamImpl<K, V1> joinOther = (KStreamImpl) other;
@@ -541,6 +564,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
ValueJoiner<V, V1, R> joiner,
Serde<K> keySerde,
Serde<V> valueSerde) {
+ Objects.requireNonNull(other, "other KTable can't be null");
+ Objects.requireNonNull(joiner, "joiner can't be null");
if (repartitionRequired) {
KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(keySerde,
@@ -574,6 +599,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
Serde<K1> keySerde,
Serde<V> valSerde) {
+ Objects.requireNonNull(selector, "selector can't be null");
String selectName = internalSelectKey(selector);
return new KGroupedStreamImpl<>(topology,
selectName,
http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index f4d4855..2f36183 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.errors.StreamsException;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
+import java.util.Objects;
import java.util.Set;
/**
@@ -113,6 +114,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
@Override
public KTable<K, V> filter(Predicate<K, V> predicate) {
+ Objects.requireNonNull(predicate, "predicate can't be null");
String name = topology.newName(FILTER_NAME);
KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, false);
topology.addProcessor(name, processorSupplier, this.name);
@@ -122,6 +124,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
@Override
public KTable<K, V> filterNot(final Predicate<K, V> predicate) {
+ Objects.requireNonNull(predicate, "predicate can't be null");
String name = topology.newName(FILTER_NAME);
KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, true);
@@ -132,6 +135,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
@Override
public <V1> KTable<K, V1> mapValues(ValueMapper<V, V1> mapper) {
+ Objects.requireNonNull(mapper);
String name = topology.newName(MAPVALUES_NAME);
KTableProcessorSupplier<K, V, V1> processorSupplier = new KTableMapValues<>(this, mapper);
@@ -196,6 +200,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
@Override
public void foreach(final ForeachAction<K, V> action) {
+ Objects.requireNonNull(action, "action can't be null");
String name = topology.newName(FOREACH_NAME);
KStreamForeach<K, Change<V>> processorSupplier = new KStreamForeach<>(new ForeachAction<K, Change<V>>() {
@Override
@@ -212,6 +217,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
StreamPartitioner<K, V> partitioner,
String topic,
final String storeName) {
+ Objects.requireNonNull(storeName, "storeName can't be null");
to(keySerde, valSerde, partitioner, topic);
return topology.table(keySerde, valSerde, topic, storeName);
@@ -274,6 +280,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
@SuppressWarnings("unchecked")
@Override
public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
+ Objects.requireNonNull(other, "other can't be null");
+ Objects.requireNonNull(joiner, "joiner can't be null");
+
Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
String joinThisName = topology.newName(JOINTHIS_NAME);
@@ -297,6 +306,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
@SuppressWarnings("unchecked")
@Override
public <V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
+ Objects.requireNonNull(other, "other can't be null");
+ Objects.requireNonNull(joiner, "joiner can't be null");
+
Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
String joinThisName = topology.newName(OUTERTHIS_NAME);
@@ -320,6 +332,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
@SuppressWarnings("unchecked")
@Override
public <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
+ Objects.requireNonNull(other, "other can't be null");
+ Objects.requireNonNull(joiner, "joiner can't be null");
Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
String joinThisName = topology.newName(LEFTTHIS_NAME);
@@ -345,6 +359,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
Serde<K1> keySerde,
Serde<V1> valueSerde) {
+ Objects.requireNonNull(selector, "selector can't be null");
String selectName = topology.newName(SELECT_NAME);
KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index b8851b4..8e3dc7a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -39,6 +39,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;
@@ -278,10 +279,15 @@ public class TopologyBuilder {
* @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
*/
public synchronized final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
+ if (topics.length == 0) {
+ throw new TopologyBuilderException("You must provide at least one topic");
+ }
+ Objects.requireNonNull(name, "name must not be null");
if (nodeFactories.containsKey(name))
throw new TopologyBuilderException("Processor " + name + " is already added.");
for (String topic : topics) {
+ Objects.requireNonNull(topic, "topic names cannot be null");
if (sourceTopicNames.contains(topic))
throw new TopologyBuilderException("Topic " + topic + " has already been registered by another source.");
@@ -322,10 +328,8 @@ public class TopologyBuilder {
*/
public synchronized final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern) {
-
- if (topicPattern == null) {
- throw new TopologyBuilderException("Pattern can't be null");
- }
+ Objects.requireNonNull(topicPattern, "topicPattern can't be null");
+ Objects.requireNonNull(name, "name can't be null");
if (nodeFactories.containsKey(name)) {
throw new TopologyBuilderException("Processor " + name + " is already added.");
@@ -435,6 +439,8 @@ public class TopologyBuilder {
* @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name
*/
public synchronized final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner, String... parentNames) {
+ Objects.requireNonNull(name, "name must not be null");
+ Objects.requireNonNull(topic, "topic must not be null");
if (nodeFactories.containsKey(name))
throw new TopologyBuilderException("Processor " + name + " is already added.");
@@ -467,6 +473,8 @@ public class TopologyBuilder {
* @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name
*/
public synchronized final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames) {
+ Objects.requireNonNull(name, "name must not be null");
+ Objects.requireNonNull(supplier, "supplier must not be null");
if (nodeFactories.containsKey(name))
throw new TopologyBuilderException("Processor " + name + " is already added.");
@@ -495,6 +503,7 @@ public class TopologyBuilder {
* @throws TopologyBuilderException if state store supplier is already added
*/
public synchronized final TopologyBuilder addStateStore(StateStoreSupplier supplier, boolean isInternal, String... processorNames) {
+ Objects.requireNonNull(supplier, "supplier can't be null");
if (stateFactories.containsKey(supplier.name())) {
throw new TopologyBuilderException("StateStore " + supplier.name() + " is already added.");
}
@@ -528,6 +537,7 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
*/
public synchronized final TopologyBuilder connectProcessorAndStateStores(String processorName, String... stateStoreNames) {
+ Objects.requireNonNull(processorName, "processorName can't be null");
if (stateStoreNames != null) {
for (String stateStoreName : stateStoreNames) {
connectProcessorAndStateStore(processorName, stateStoreName);
@@ -584,6 +594,7 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
*/
public synchronized final TopologyBuilder addInternalTopic(String topicName) {
+ Objects.requireNonNull(topicName, "topicName can't be null");
this.internalTopicNames.add(topicName);
return this;
@@ -793,6 +804,7 @@ public class TopologyBuilder {
* @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
*/
public synchronized ProcessorTopology build(String applicationId, Integer topicGroupId) {
+ Objects.requireNonNull(applicationId, "applicationId can't be null");
Set<String> nodeGroup;
if (topicGroupId != null) {
nodeGroup = nodeGroups().get(topicGroupId);
@@ -910,6 +922,7 @@ public class TopologyBuilder {
* {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG}
*/
public synchronized void setApplicationId(String applicationId) {
+ Objects.requireNonNull(applicationId, "applicationId can't be null");
this.applicationId = applicationId;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index cdf28db..c776b8a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.errors.TopologyBuilderException;
@@ -88,4 +89,14 @@ public class KStreamBuilderTest {
assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
}
+ @Test(expected = TopologyBuilderException.class)
+ public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
+ new KStreamBuilder().stream();
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowExceptionWhenTopicNamesAreNull() throws Exception {
+ new KStreamBuilder().stream(Serdes.String(), Serdes.String(), null, null);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
new file mode 100644
index 0000000..a95d1fb
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockReducer;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KGroupedStreamImplTest {
+
+ private KGroupedStream<String, String> groupedStream;
+
+ @Before
+ public void before() {
+ final KStream<String, String> stream = new KStreamBuilder().stream("topic");
+ groupedStream = stream.groupByKey();
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotHaveNullReducerOnReduce() throws Exception {
+ groupedStream.reduce(null, "store");
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotHaveNullStoreNameOnReduce() throws Exception {
+ groupedStream.reduce(MockReducer.STRING_ADDER, null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotHaveNullReducerWithWindowedReduce() throws Exception {
+ groupedStream.reduce(null, TimeWindows.of(10), "store");
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotHaveNullWindowsWithWindowedReduce() throws Exception {
+ groupedStream.reduce(MockReducer.STRING_ADDER, null, "store");
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotHaveNullStoreNameWithWindowedReduce() throws Exception {
+ groupedStream.reduce(MockReducer.STRING_ADDER, TimeWindows.of(10), null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotHaveNullInitializerOnAggregate() throws Exception {
+ groupedStream.aggregate(null, MockAggregator.STRING_ADDER, Serdes.String(), "store");
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotHaveNullAdderOnAggregate() throws Exception {
+ groupedStream.aggregate(MockInitializer.STRING_INIT, null, Serdes.String(), "store");
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotHaveNullStoreNameOnAggregate() throws Exception {
+ groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, Serdes.String(), null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotHaveNullInitializerOnWindowedAggregate() throws Exception {
+ groupedStream.aggregate(null, MockAggregator.STRING_ADDER, TimeWindows.of(10), Serdes.String(), "store");
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotHaveNullAdderOnWindowedAggregate() throws Exception {
+ groupedStream.aggregate(MockInitializer.STRING_INIT, null, TimeWindows.of(10), Serdes.String(), "store");
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotHaveNullWindowsOnWindowedAggregate() throws Exception {
+ groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, null, Serdes.String(), "store");
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotHaveNullStoreNameOnWindowedAggregate() throws Exception {
+ groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, TimeWindows.of(10), Serdes.String(), null);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
new file mode 100644
index 0000000..0846066
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.KGroupedTable;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockReducer;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class KGroupedTableImplTest {
+
+ private KGroupedTable<String, String> groupedTable;
+
+ @Before
+ public void before() {
+ final KStreamBuilder builder = new KStreamBuilder();
+ groupedTable = builder.table(Serdes.String(), Serdes.String(), "blah", "blah")
+ .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullStoreNameOnAggregate() throws Exception {
+ groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullInitializerOnAggregate() throws Exception {
+ groupedTable.aggregate(null, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, "store");
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullAdderOnAggregate() throws Exception {
+ groupedTable.aggregate(MockInitializer.STRING_INIT, null, MockAggregator.STRING_REMOVER, "store");
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullSubtractorOnAggregate() throws Exception {
+ groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, null, "store");
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullAdderOnReduce() throws Exception {
+ groupedTable.reduce(null, MockReducer.STRING_REMOVER, "store");
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullSubtractorOnReduce() throws Exception {
+ groupedTable.reduce(MockReducer.STRING_ADDER, null, "store");
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullStoreNameOnReduce() throws Exception {
+ groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, null);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index d5fc41b..9cbc156 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -22,10 +22,13 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
@@ -37,6 +40,14 @@ public class KStreamImplTest {
final private Serde<String> stringSerde = Serdes.String();
final private Serde<Integer> intSerde = Serdes.Integer();
+ private KStream<String, String> testStream;
+ private KStreamBuilder builder;
+
+ @Before
+ public void before() {
+ builder = new KStreamBuilder();
+ testStream = builder.stream("source");
+ }
@Test
public void testNumProcesses() {
@@ -141,4 +152,115 @@ public class KStreamImplTest {
final KStream<String, String> inputStream = builder.stream(stringSerde, stringSerde, "input");
inputStream.to(stringSerde, null, "output");
}
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullPredicateOnFilter() throws Exception {
+ testStream.filter(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullPredicateOnFilterNot() throws Exception {
+ testStream.filterNot(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullMapperOnSelectKey() throws Exception {
+ testStream.selectKey(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullMapperOnMap() throws Exception {
+ testStream.map(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullMapperOnMapValues() throws Exception {
+ testStream.mapValues(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullFilePathOnWriteAsText() throws Exception {
+ testStream.writeAsText(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullMapperOnFlatMap() throws Exception {
+ testStream.flatMap(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullMapperOnFlatMapValues() throws Exception {
+ testStream.flatMapValues(null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldHaveAtLeastOnPredicateWhenBranching() throws Exception {
+ testStream.branch(); // zero predicates supplied ("On" in the test name is a typo for "One")
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldCantHaveNullPredicate() throws Exception {
+ testStream.branch((Predicate<String, String>) null); // cast so varargs wraps a single null predicate instead of passing a null array
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullTopicOnThrough() throws Exception {
+ testStream.through(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullTopicOnTo() throws Exception {
+ testStream.to(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullTransformSupplierOnTransform() throws Exception {
+ testStream.transform(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullTransformSupplierOnTransformValues() throws Exception {
+ testStream.transformValues(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullProcessSupplier() throws Exception {
+ testStream.process(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullOtherStreamOnJoin() throws Exception {
+ testStream.join(null, MockValueJoiner.STRING_JOINER, JoinWindows.of(10));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullValueJoinerOnJoin() throws Exception {
+ testStream.join(testStream, null, JoinWindows.of(10));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullJoinWindowsOnJoin() throws Exception {
+ testStream.join(testStream, MockValueJoiner.STRING_JOINER, null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullTableOnTableJoin() throws Exception {
+ testStream.leftJoin((KTable<String, String>) null, MockValueJoiner.STRING_JOINER); // parameterized cast keeps the call free of raw types
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullValueMapperOnTableJoin() throws Exception {
+ testStream.leftJoin(builder.table(Serdes.String(), Serdes.String(), "topic", "store"), null); // the null sits in the joiner position, despite "ValueMapper" in the test name
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullSelectorOnGroupBy() throws Exception {
+ testStream.groupBy(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullActionOnForEach() throws Exception {
+ testStream.foreach(null);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 617a2a1..7edaac9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
@@ -52,6 +53,8 @@ public class KTableImplTest {
private KStreamTestDriver driver = null;
private File stateDir = null;
+ private KStreamBuilder builder;
+ private KTable<String, String> table;
@After
public void tearDown() {
@@ -64,6 +67,8 @@ public class KTableImplTest {
@Before
public void setUp() throws IOException {
stateDir = TestUtils.tempDirectory("kafka-test");
+ builder = new KStreamBuilder();
+ table = builder.table("test", "test");
}
@Test
@@ -356,4 +361,85 @@ public class KTableImplTest {
assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000007")).valueSerializer()).inner());
assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000008")).valueDeserializer()).inner());
}
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullSelectorOnToStream() throws Exception {
+ table.toStream(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullTopicOnTo() throws Exception {
+ table.to(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullPredicateOnFilter() throws Exception {
+ table.filter(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullPredicateOnFilterNot() throws Exception {
+ table.filterNot(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullMapperOnMapValues() throws Exception {
+ table.mapValues(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullFilePathOnWriteAsText() throws Exception {
+ table.writeAsText(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullActionOnForEach() throws Exception {
+ table.foreach(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullTopicInThrough() throws Exception {
+ table.through(null, "store");
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullStoreInThrough() throws Exception {
+ table.through("topic", null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullSelectorOnGroupBy() throws Exception {
+ table.groupBy(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullOtherTableOnJoin() throws Exception {
+ table.join(null, MockValueJoiner.STRING_JOINER);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullJoinerJoin() throws Exception {
+ table.join(table, null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullOtherTableOnOuterJoin() throws Exception {
+ table.outerJoin(null, MockValueJoiner.STRING_JOINER);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullJoinerOnOuterJoin() throws Exception {
+ table.outerJoin(table, null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullJoinerOnLeftJoin() throws Exception {
+ table.leftJoin(table, null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullOtherTableOnLeftJoin() throws Exception {
+ table.leftJoin(null, MockValueJoiner.STRING_JOINER);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 107d832..f6ca6db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -331,6 +331,71 @@ public class TopologyBuilderTest {
assertEquals(mkSet("source-5"), nodeNames(topology2.processors()));
}
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullNameWhenAddingSink() throws Exception {
+ final TopologyBuilder builder = new TopologyBuilder();
+ builder.addSink(null, "topic");
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullTopicWhenAddingSink() throws Exception {
+ final TopologyBuilder builder = new TopologyBuilder();
+ builder.addSink("name", null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullNameWhenAddingProcessor() throws Exception {
+ final TopologyBuilder builder = new TopologyBuilder();
+ builder.addProcessor(null, new ProcessorSupplier() { // non-null stub supplier, so the name is the only null argument
+ @Override
+ public Processor get() {
+ return null;
+ }
+ });
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullProcessorSupplier() throws Exception {
+ final TopologyBuilder builder = new TopologyBuilder();
+ builder.addProcessor("name", null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullNameWhenAddingSource() throws Exception {
+ final TopologyBuilder builder = new TopologyBuilder();
+ builder.addSource(null, Pattern.compile(".*"));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() throws Exception {
+ final TopologyBuilder builder = new TopologyBuilder();
+ builder.connectProcessorAndStateStores(null, "store");
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAddNullInternalTopic() throws Exception {
+ final TopologyBuilder builder = new TopologyBuilder();
+ builder.addInternalTopic(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullApplicationIdOnBuild() throws Exception {
+ final TopologyBuilder builder = new TopologyBuilder();
+ builder.build(null, 1);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotSetApplicationIdToNull() throws Exception {
+ final TopologyBuilder builder = new TopologyBuilder();
+ builder.setApplicationId(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAddNullStateStoreSupplier() throws Exception {
+ final TopologyBuilder builder = new TopologyBuilder();
+ builder.addStateStore(null, true);
+ }
+
private Set<String> nodeNames(Collection<ProcessorNode> nodes) {
Set<String> nodeNames = new HashSet<>();
for (ProcessorNode node : nodes) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 6563e26..a112a5a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -65,7 +65,7 @@ public class StreamThreadStateStoreProviderTest {
@Before
public void before() throws IOException {
final TopologyBuilder builder = new TopologyBuilder();
- builder.addSource("the-source");
+ builder.addSource("the-source", "the-source");
builder.addProcessor("the-processor", new MockProcessorSupplier());
builder.addStateStore(Stores.create("kv-store")
.withStringKeys()