You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/10 19:16:17 UTC

[kafka] branch 1.0 updated: KAFKA-6398: fix KTable.filter that does not include its parent's queryable storename

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new a415f06  KAFKA-6398: fix KTable.filter that does not include its parent's queryable storename
a415f06 is described below

commit a415f0681c908952a473d0c3654d8774f6c3528a
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Wed Jan 10 11:14:20 2018 -0800

    KAFKA-6398: fix KTable.filter that does not include its parent's queryable storename
    
    1. Include the parent's queryable store name in KTable.filter if this operator is not materialized.
    2. Augment InternalTopologyBuilder to check for null processor / state store names while iterating over the passed-in name lists.
    3. Add unit tests.
    
    Author: Guozhang Wang <wa...@gmail.com>
    
    Reviewers: Damian Guy <da...@gmail.com>, Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
    
    Closes #4384 from guozhangwang/K6398-topology-builder-exception
    
    (cherry picked from commit fa5ebc8a019f8cb24823dd29ef84d422d93624dc)
    Signed-off-by: Guozhang Wang <wa...@gmail.com>
---
 .../streams/kstream/internals/KTableImpl.java      |  4 ++-
 .../internals/InternalTopologyBuilder.java         |  8 +++++-
 .../apache/kafka/streams/StreamsBuilderTest.java   | 28 ++++++++++++++++++
 .../integration/InternalTopicIntegrationTest.java  |  6 ++--
 .../KStreamAggregationDedupIntegrationTest.java    |  4 +--
 .../KStreamAggregationIntegrationTest.java         |  4 +--
 .../integration/KStreamRepartitionJoinTest.java    | 12 ++++----
 .../integration/QueryableStateIntegrationTest.java |  4 +--
 .../kafka/streams/kstream/KStreamBuilderTest.java  |  4 +--
 .../internals/InternalStreamsBuilderTest.java      |  4 +--
 .../kstream/internals/KGroupedTableImplTest.java   |  8 +++---
 .../streams/kstream/internals/KStreamImplTest.java | 10 +++----
 .../kstream/internals/KTableAggregateTest.java     | 12 ++++----
 .../kstream/internals/KTableFilterTest.java        |  6 ++--
 .../streams/kstream/internals/KTableImplTest.java  |  6 ++--
 .../internals/InternalTopologyBuilderTest.java     |  5 ++++
 .../{MockKeyValueMapper.java => MockMapper.java}   | 24 +++++++++++-----
 .../java/org/apache/kafka/test/MockPredicate.java  | 33 ++++++++++++++++++++++
 18 files changed, 133 insertions(+), 49 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 8c79dec..3bc6f4b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -155,8 +155,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
         if (storeSupplier != null) {
             builder.internalTopologyBuilder.addStateStore(storeSupplier, name);
+            return new KTableImpl<>(builder, name, processorSupplier, this.keySerde, this.valSerde, sourceNodes, internalStoreName, true);
+        } else {
+            return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, this.queryableStoreName, false);
         }
-        return new KTableImpl<>(builder, name, processorSupplier, this.keySerde, this.valSerde, sourceNodes, internalStoreName, internalStoreName != null);
     }
 
     private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index f2cbf51..fc2f1f0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -457,6 +457,7 @@ public class InternalTopologyBuilder {
         }
 
         for (final String predecessor : predecessorNames) {
+            Objects.requireNonNull(predecessor, "predecessor name can't be null");
             if (predecessor.equals(name)) {
                 throw new TopologyException("Processor " + name + " cannot be a predecessor of itself.");
             }
@@ -484,6 +485,7 @@ public class InternalTopologyBuilder {
         }
 
         for (final String predecessor : predecessorNames) {
+            Objects.requireNonNull(predecessor, "predecessor name must not be null");
             if (predecessor.equals(name)) {
                 throw new TopologyException("Processor " + name + " cannot be a predecessor of itself.");
             }
@@ -509,6 +511,7 @@ public class InternalTopologyBuilder {
 
         if (processorNames != null) {
             for (final String processorName : processorNames) {
+                Objects.requireNonNull(processorName, "processor name must not be null");
                 connectProcessorAndStateStore(processorName, supplier.name());
             }
         }
@@ -525,6 +528,7 @@ public class InternalTopologyBuilder {
 
         if (processorNames != null) {
             for (final String processorName : processorNames) {
+                Objects.requireNonNull(processorName, "processor name must not be null");
                 connectProcessorAndStateStore(processorName, storeBuilder.name());
             }
         }
@@ -603,11 +607,12 @@ public class InternalTopologyBuilder {
     public final void connectProcessorAndStateStores(final String processorName,
                                                      final String... stateStoreNames) {
         Objects.requireNonNull(processorName, "processorName can't be null");
-        Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be null");
+        Objects.requireNonNull(stateStoreNames, "state store list must not be null");
         if (stateStoreNames.length == 0) {
             throw new TopologyException("Must provide at least one state store name.");
         }
         for (final String stateStoreName : stateStoreNames) {
+            Objects.requireNonNull(stateStoreName, "state store name must not be null");
             connectProcessorAndStateStore(processorName, stateStoreName);
         }
     }
@@ -628,6 +633,7 @@ public class InternalTopologyBuilder {
         }
 
         for (final String processorName : processorNames) {
+            Objects.requireNonNull(processorName, "processor name can't be null");
             if (!nodeFactories.containsKey(processorName)) {
                 throw new TopologyException("Processor " + processorName + " is not added yet.");
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index cee01dc..13b5b45 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -22,12 +22,16 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockMapper;
+import org.apache.kafka.test.MockPredicate;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Rule;
 import org.junit.Test;
@@ -60,6 +64,30 @@ public class StreamsBuilderTest {
     }
 
     @Test
+    public void shouldAllowJoinUnmaterializedFilteredKTable() {
+        final KTable<Bytes, String> filteredKTable = builder.<Bytes, String>table("table-topic").filter(MockPredicate.<Bytes, String>allGoodPredicate());
+        builder.<Bytes, String>stream("stream-topic").join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
+
+        driver.setUp(builder, TestUtils.tempDirectory());
+    }
+
+    @Test
+    public void shouldAllowJoinUnmaterializedMapValuedKTable() {
+        final KTable<Bytes, String> mappedKTable = builder.<Bytes, String>table("table-topic").mapValues(MockMapper.<String>noOpValueMapper());
+        builder.<Bytes, String>stream("stream-topic").join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
+
+        driver.setUp(builder, TestUtils.tempDirectory());
+    }
+
+    @Test
+    public void shouldAllowJoinMaterializedSourceKTable() {
+        final KTable<Bytes, String> table = builder.<Bytes, String>table("table-topic");
+        builder.<Bytes, String>stream("stream-topic").join(table, MockValueJoiner.TOSTRING_JOINER);
+
+        driver.setUp(builder, TestUtils.tempDirectory());
+    }
+
+    @Test
     public void shouldProcessingFromSinkTopic() {
         final KStream<String, String> source = builder.stream("topic-source");
         source.to("topic-sink");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 01cfa5a..06b4147 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -39,7 +39,7 @@ import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -149,7 +149,7 @@ public class InternalTopicIntegrationTest {
                     public Iterable<String> apply(final String value) {
                         return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
                     }
-                }).groupBy(MockKeyValueMapper.<String, String>SelectValueMapper())
+                }).groupBy(MockMapper.<String, String>selectValueMapper())
                 .count("Counts").toStream();
 
         wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);
@@ -196,7 +196,7 @@ public class InternalTopicIntegrationTest {
                     public Iterable<String> apply(String value) {
                         return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
                     }
-                }).groupBy(MockKeyValueMapper.<String, String>SelectValueMapper())
+                }).groupBy(MockMapper.<String, String>selectValueMapper())
                 .count(TimeWindows.of(1000).until(durationMs), "CountWindows").toStream();
 
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index cb58849..4c12bb9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -39,7 +39,7 @@ import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -99,7 +99,7 @@ public class KStreamAggregationDedupIntegrationTest {
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
         streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
-        KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.<Integer, String>SelectValueMapper();
+        KeyValueMapper<Integer, String, String> mapper = MockMapper.<Integer, String>selectValueMapper();
         stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
         groupedStream = stream
             .groupBy(
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 3500dd5..4527c19 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -52,7 +52,7 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlySessionStore;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -116,7 +116,7 @@ public class KStreamAggregationIntegrationTest {
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
 
-        final KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.SelectValueMapper();
+        final KeyValueMapper<Integer, String, String> mapper = MockMapper.selectValueMapper();
         stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
         groupedStream = stream
             .groupBy(
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index 5f6ff44..32546de 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -39,7 +39,7 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -104,7 +104,7 @@ public class KStreamRepartitionJoinTest {
         streamTwo = builder.stream(streamTwoInput, Consumed.with(Serdes.Integer(), Serdes.String()));
         streamFour = builder.stream(streamFourInput, Consumed.with(Serdes.Integer(), Serdes.String()));
 
-        keyMapper = MockKeyValueMapper.SelectValueKeyValueMapper();
+        keyMapper = MockMapper.selectValueKeyValueMapper();
     }
 
     @After
@@ -157,7 +157,7 @@ public class KStreamRepartitionJoinTest {
 
     private ExpectedOutputOnTopic mapBothStreamsAndJoin() throws InterruptedException {
         final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
-        final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
+        final KStream<Integer, String> map2 = streamTwo.map(MockMapper.<Integer, String>noOpKeyValueMapper());
 
         doJoin(map1, map2, "map-both-streams-and-join-" + testNo);
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, "map-both-streams-and-join-" + testNo);
@@ -183,7 +183,7 @@ public class KStreamRepartitionJoinTest {
     private ExpectedOutputOnTopic selectKeyAndJoin() throws Exception {
 
         final KStream<Integer, Integer> keySelected =
-            streamOne.selectKey(MockKeyValueMapper.<Long, Integer>SelectValueMapper());
+            streamOne.selectKey(MockMapper.<Long, Integer>selectValueMapper());
 
         final String outputTopic = "select-key-join-" + testNo;
         doJoin(keySelected, streamTwo, outputTopic);
@@ -222,7 +222,7 @@ public class KStreamRepartitionJoinTest {
     private ExpectedOutputOnTopic mapBothStreamsAndLeftJoin() throws InterruptedException {
         final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
 
-        final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
+        final KStream<Integer, String> map2 = streamTwo.map(MockMapper.<Integer, String>noOpKeyValueMapper());
 
 
         final String outputTopic = "left-join-" + testNo;
@@ -247,7 +247,7 @@ public class KStreamRepartitionJoinTest {
         final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
 
         final KeyValueMapper<Integer, String, KeyValue<Integer, String>>
-            kvMapper = MockKeyValueMapper.NoOpKeyValueMapper();
+            kvMapper = MockMapper.noOpKeyValueMapper();
 
         final KStream<Integer, String> map2 = streamTwo.map(kvMapper);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 569eb27..354209b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -57,7 +57,7 @@ import org.apache.kafka.streams.state.StreamsMetadata;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -218,7 +218,7 @@ public class QueryableStateIntegrationTest {
                     return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
                 }
             })
-            .groupBy(MockKeyValueMapper.<String, String>SelectValueMapper());
+            .groupBy(MockMapper.<String, String>selectValueMapper());
 
         // Create a State Store for the all time word count
         groupedByWord
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index 5ffedb8..f9949d3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -27,7 +27,7 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.MockValueJoiner;
@@ -300,7 +300,7 @@ public class KStreamBuilderTest {
         final KTable<String, String> table = builder.table("table-topic", "table-store");
         assertEquals(Collections.singletonList("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store"));
 
-        final KStream<String, String> mapped = playEvents.map(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
+        final KStream<String, String> mapped = playEvents.map(MockMapper.<String, String>selectValueKeyValueMapper());
         mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count("count");
         assertEquals(Collections.singletonList("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store"));
         assertEquals(Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000003-repartition"), builder.stateStoreNameToSourceTopics().get("count"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 05a0214..156acad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -31,7 +31,7 @@ import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.MockValueJoiner;
 import org.junit.After;
@@ -251,7 +251,7 @@ public class InternalStreamsBuilderTest {
         final KTable<String, String> table = builder.table("table-topic", consumed, materialized);
         assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store"));
 
-        final KStream<String, String> mapped = playEvents.map(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
+        final KStream<String, String> mapped = playEvents.map(MockMapper.<String, String>selectValueKeyValueMapper());
         mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count("count");
         assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store"));
         assertEquals(Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000003-repartition"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("count"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 705cf62..3bbd7e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -33,7 +33,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
@@ -62,7 +62,7 @@ public class KGroupedTableImplTest {
     @Before
     public void before() {
         groupedTable = builder.table("blah", Consumed.with(Serdes.String(), Serdes.String()))
-                .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
+                .groupBy(MockMapper.<String, String>selectValueKeyValueMapper());
     }
 
     @Test
@@ -221,7 +221,7 @@ public class KGroupedTableImplTest {
     @Test
     public void shouldCountAndMaterializeResults() {
         final KTable<String, String> table = builder.table(topic, Consumed.with(Serdes.String(), Serdes.String()));
-        table.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(),
+        table.groupBy(MockMapper.<String, String>selectValueKeyValueMapper(),
                       Serialized.with(Serdes.String(),
                                       Serdes.String()))
                 .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count")
@@ -238,7 +238,7 @@ public class KGroupedTableImplTest {
     @Test
     public void shouldAggregateAndMaterializeResults() {
         final KTable<String, String> table = builder.table(topic, Consumed.with(Serdes.String(), Serdes.String()));
-        table.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(),
+        table.groupBy(MockMapper.<String, String>selectValueKeyValueMapper(),
                       Serialized.with(Serdes.String(),
                                       Serdes.String()))
                 .aggregate(MockInitializer.STRING_INIT,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 0a0232c..562711d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -40,7 +40,7 @@ import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.junit.Before;
@@ -379,7 +379,7 @@ public class KStreamImplTest {
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullTableOnJoinWithGlobalTable() {
         testStream.join((GlobalKTable) null,
-                        MockKeyValueMapper.<String, String>SelectValueMapper(),
+                        MockMapper.<String, String>selectValueMapper(),
                         MockValueJoiner.TOSTRING_JOINER);
     }
 
@@ -393,14 +393,14 @@ public class KStreamImplTest {
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullJoinerOnJoinWithGlobalTable() {
         testStream.join(builder.globalTable("global", stringConsumed),
-                        MockKeyValueMapper.<String, String>SelectValueMapper(),
+                        MockMapper.<String, String>selectValueMapper(),
                         null);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullTableOnJLeftJoinWithGlobalTable() {
         testStream.leftJoin((GlobalKTable) null,
-                        MockKeyValueMapper.<String, String>SelectValueMapper(),
+                        MockMapper.<String, String>selectValueMapper(),
                         MockValueJoiner.TOSTRING_JOINER);
     }
 
@@ -414,7 +414,7 @@ public class KStreamImplTest {
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullJoinerOnLeftJoinWithGlobalTable() {
         testStream.leftJoin(builder.globalTable("global", stringConsumed),
-                        MockKeyValueMapper.<String, String>SelectValueMapper(),
+                        MockMapper.<String, String>selectValueMapper(),
                         null);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 0ae95dd..df8d292 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -35,7 +35,7 @@ import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
@@ -73,7 +73,7 @@ public class KTableAggregateTest {
         final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
 
         KTable<String, String> table1 = builder.table(topic1, consumed);
-        KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(),
+        KTable<String, String> table2 = table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper(),
                                                        stringSerialzied
         ).aggregate(MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER,
@@ -121,7 +121,7 @@ public class KTableAggregateTest {
         final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
 
         KTable<String, String> table1 = builder.table(topic1, consumed);
-        KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(),
+        KTable<String, String> table2 = table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper(),
                                                        stringSerialzied
         ).aggregate(MockInitializer.STRING_INIT,
             MockAggregator.TOSTRING_ADDER,
@@ -235,7 +235,7 @@ public class KTableAggregateTest {
         final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
 
         builder.table(input, consumed)
-                .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialzied)
+                .groupBy(MockMapper.<String, String>selectValueKeyValueMapper(), stringSerialzied)
                 .count("count")
                 .toStream()
                 .process(proc);
@@ -250,7 +250,7 @@ public class KTableAggregateTest {
         final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
 
         builder.table(input, consumed)
-            .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialzied)
+            .groupBy(MockMapper.<String, String>selectValueKeyValueMapper(), stringSerialzied)
             .count()
             .toStream()
             .process(proc);
@@ -265,7 +265,7 @@ public class KTableAggregateTest {
         final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
 
         builder.table(input, consumed)
-            .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialzied)
+            .groupBy(MockMapper.<String, String>selectValueKeyValueMapper(), stringSerialzied)
             .count("count")
             .toStream()
             .process(proc);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 7986277..d70d8b7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -28,7 +28,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Rule;
@@ -451,7 +451,7 @@ public class KTableFilterTest {
                 public boolean test(String key, String value) {
                     return value.equalsIgnoreCase("accept");
                 }
-            }).groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
+            }).groupBy(MockMapper.<String, String>noOpKeyValueMapper())
             .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result");
 
         doTestSkipNullOnMaterialization(builder, table1, table2, topic1);
@@ -473,7 +473,7 @@ public class KTableFilterTest {
                 public boolean test(String key, String value) {
                     return value.equalsIgnoreCase("accept");
                 }
-            }, "anyStoreNameFilter").groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
+            }, "anyStoreNameFilter").groupBy(MockMapper.<String, String>noOpKeyValueMapper())
             .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result");
 
         doTestSkipNullOnMaterialization(builder, table1, table2, topic1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 9d918e2..9539b45 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -35,7 +35,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.MockValueJoiner;
@@ -341,11 +341,11 @@ public class KTableImplTest {
                                                                            .withValueSerde(stringSerde)
                 );
 
-        table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
+        table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper())
             .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, "mock-result1");
 
 
-        table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
+        table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper())
             .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result2");
 
         driver.setUp(builder, stateDir, stringSerde, stringSerde);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index e223699..588ce46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -463,6 +463,11 @@ public class InternalTopologyBuilderTest {
     }
 
     @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullStateStoreNameWhenConnectingProcessorAndStateStores() throws Exception {
+        builder.connectProcessorAndStateStores("processor", new String[]{null});
+    }
+
+    @Test(expected = NullPointerException.class)
     public void shouldNotAddNullInternalTopic() throws Exception {
         builder.addInternalTopic(null);
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java b/streams/src/test/java/org/apache/kafka/test/MockMapper.java
similarity index 76%
rename from streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
rename to streams/src/test/java/org/apache/kafka/test/MockMapper.java
index 2ad24d7..fec9522 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockMapper.java
@@ -18,10 +18,9 @@ package org.apache.kafka.test;
 
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapper;
 
-public class MockKeyValueMapper {
-
-
+public class MockMapper {
 
     private static class NoOpKeyValueMapper<K, V> implements KeyValueMapper<K, V, KeyValue<K, V>> {
         @Override
@@ -51,20 +50,31 @@ public class MockKeyValueMapper {
         }
     }
 
-    public static <K, V> KeyValueMapper<K, V, K> SelectKeyKeyValueMapper() {
+    private static class NoOpValueMapper<V> implements ValueMapper<V, V> {
+        @Override
+        public V apply(final V value) {
+            return value;
+        }
+    }
+
+    public static <K, V> KeyValueMapper<K, V, K> selectKeyKeyValueMapper() {
         return new SelectKeyMapper<>();
     }
 
 
-    public static <K, V> KeyValueMapper<K, V, KeyValue<K, V>> NoOpKeyValueMapper() {
+    public static <K, V> KeyValueMapper<K, V, KeyValue<K, V>> noOpKeyValueMapper() {
         return new NoOpKeyValueMapper<>();
     }
 
-    public static <K, V> KeyValueMapper<K, V, KeyValue<V, V>> SelectValueKeyValueMapper() {
+    public static <K, V> KeyValueMapper<K, V, KeyValue<V, V>> selectValueKeyValueMapper() {
         return new SelectValueKeyValueMapper<>();
     }
 
-    public static <K, V> KeyValueMapper<K, V, V> SelectValueMapper() {
+    public static <K, V> KeyValueMapper<K, V, V> selectValueMapper() {
         return new SelectValueMapper<>();
     }
+
+    public static <V> ValueMapper<V, V> noOpValueMapper() {
+        return new NoOpValueMapper<>();
+    }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/test/MockPredicate.java b/streams/src/test/java/org/apache/kafka/test/MockPredicate.java
new file mode 100644
index 0000000..9d59bab
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockPredicate.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.kstream.Predicate;
+
+public class MockPredicate {
+
+    private static class AllGoodPredicate<K, V> implements Predicate<K, V> {
+        @Override
+        public boolean test(final K key, final V value) {
+            return true;
+        }
+    }
+
+    public static <K, V> Predicate<K, V> allGoodPredicate() {
+        return new AllGoodPredicate<>();
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
"commits@kafka.apache.org" <co...@kafka.apache.org>.