You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/21 23:51:24 UTC

kafka git commit: KAFKA-3589: set inner serializer for ChangedSerde upon initialization

Repository: kafka
Updated Branches:
  refs/heads/trunk 5c547475d -> 74e6dc842


KAFKA-3589: set inner serializer for ChangedSerde upon initialization

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Eno Thereska <en...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #1246 from guozhangwang/K3589


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/74e6dc84
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/74e6dc84
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/74e6dc84

Branch: refs/heads/trunk
Commit: 74e6dc842559d344241c70cb6607a69291e3a20d
Parents: 5c54747
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Apr 21 14:51:21 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Apr 21 14:51:21 2016 -0700

----------------------------------------------------------------------
 .../kstream/internals/ChangedDeserializer.java  | 10 ++-
 .../kstream/internals/ChangedSerializer.java    | 10 ++-
 .../kstream/internals/KGroupedTableImpl.java    | 76 +++++++++++---------
 .../streams/processor/internals/SinkNode.java   | 14 ++++
 .../streams/processor/internals/SourceNode.java | 18 ++++-
 .../kstream/internals/KTableAggregateTest.java  | 39 ++--------
 .../kstream/internals/KTableImplTest.java       | 49 +++++++++++++
 .../apache/kafka/test/KStreamTestDriver.java    | 26 ++++++-
 .../org/apache/kafka/test/MockAggregator.java   | 43 +++++++++++
 .../org/apache/kafka/test/MockInitializer.java  | 33 +++++++++
 .../apache/kafka/test/MockKeyValueMapper.java   | 36 ++++++++++
 .../java/org/apache/kafka/test/MockReducer.java | 43 +++++++++++
 .../apache/kafka/test/NoOpKeyValueMapper.java   | 29 --------
 13 files changed, 325 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
index d4c4e2d..ce9be49 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
@@ -26,12 +26,20 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>> {
 
     private static final int NEWFLAG_SIZE = 1;
 
-    private final Deserializer<T> inner;
+    private Deserializer<T> inner;
 
     public ChangedDeserializer(Deserializer<T> inner) {
         this.inner = inner;
     }
 
+    public Deserializer<T> inner() {
+        return inner;
+    }
+
+    public void setInner(Deserializer<T> inner) {
+        this.inner = inner;
+    }
+
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
         // do nothing

http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index 5dbbac9..12e06f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -27,12 +27,20 @@ public class ChangedSerializer<T> implements Serializer<Change<T>> {
 
     private static final int NEWFLAG_SIZE = 1;
 
-    private final Serializer<T> inner;
+    private Serializer<T> inner;
 
     public ChangedSerializer(Serializer<T> inner) {
         this.inner = inner;
     }
 
+    public Serializer<T> inner() {
+        return inner;
+    }
+
+    public void setInner(Serializer<T> inner) {
+        this.inner = inner;
+    }
+
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
         // do nothing

http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index d9b0f3d..f2e2eed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -17,8 +17,10 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedTable;
@@ -48,15 +50,12 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
     protected final Serde<K> keySerde;
     protected final Serde<V> valSerde;
 
-    private final String sourceName;
-
     public KGroupedTableImpl(KStreamBuilder topology,
                              String name,
                              String sourceName,
                              Serde<K> keySerde,
                              Serde<V> valSerde) {
         super(topology, name, Collections.singleton(sourceName));
-        this.sourceName = sourceName;
         this.keySerde = keySerde;
         this.valSerde = valSerde;
     }
@@ -74,8 +73,13 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
 
         String topic = name + REPARTITION_TOPIC_SUFFIX;
 
-        ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valSerde.serializer());
-        ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valSerde.deserializer());
+        Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
+        Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
+        Serializer<V> valueSerializer = valSerde == null ? null : valSerde.serializer();
+        Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
+
+        ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
+        ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
 
         ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(name, initializer, adder, subtractor);
 
@@ -87,10 +91,10 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
 
         // send the aggregate key-value pairs to the intermediate topic for partitioning
         topology.addInternalTopic(topic);
-        topology.addSink(sinkName, topic, keySerde.serializer(), changedValueSerializer, this.name);
+        topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name);
 
         // read the intermediate topic
-        topology.addSource(sourceName, keySerde.deserializer(), changedValueDeserializer, topic);
+        topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic);
 
         // aggregate the values with the aggregator and local store
         topology.addProcessor(aggregateName, aggregateSupplier, sourceName);
@@ -110,29 +114,6 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
     }
 
     @Override
-    public KTable<K, Long> count(String name) {
-        return this.aggregate(
-                new Initializer<Long>() {
-                    @Override
-                    public Long apply() {
-                        return 0L;
-                    }
-                },
-                new Aggregator<K, V, Long>() {
-                    @Override
-                    public Long apply(K aggKey, V value, Long aggregate) {
-                        return aggregate + 1L;
-                    }
-                }, new Aggregator<K, V, Long>() {
-                    @Override
-                    public Long apply(K aggKey, V value, Long aggregate) {
-                        return aggregate - 1L;
-                    }
-                },
-                Serdes.Long(), name);
-    }
-
-    @Override
     public KTable<K, V> reduce(Reducer<V> adder,
                                Reducer<V> subtractor,
                                String name) {
@@ -143,8 +124,13 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
 
         String topic = name + REPARTITION_TOPIC_SUFFIX;
 
-        ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valSerde.serializer());
-        ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valSerde.deserializer());
+        Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
+        Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
+        Serializer<V> valueSerializer = valSerde == null ? null : valSerde.serializer();
+        Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
+
+        ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
+        ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
 
         ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(name, adder, subtractor);
 
@@ -156,10 +142,10 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
 
         // send the aggregate key-value pairs to the intermediate topic for partitioning
         topology.addInternalTopic(topic);
-        topology.addSink(sinkName, topic, keySerde.serializer(), changedValueSerializer, this.name);
+        topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name);
 
         // read the intermediate topic
-        topology.addSource(sourceName, keySerde.deserializer(), changedValueDeserializer, topic);
+        topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic);
 
         // aggregate the values with the aggregator and local store
         topology.addProcessor(reduceName, aggregateSupplier, sourceName);
@@ -169,4 +155,26 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         return new KTableImpl<>(topology, reduceName, aggregateSupplier, Collections.singleton(sourceName));
     }
 
+    @Override
+    public KTable<K, Long> count(String name) {
+        return this.aggregate(
+                new Initializer<Long>() {
+                    @Override
+                    public Long apply() {
+                        return 0L;
+                    }
+                },
+                new Aggregator<K, V, Long>() {
+                    @Override
+                    public Long apply(K aggKey, V value, Long aggregate) {
+                        return aggregate + 1L;
+                    }
+                }, new Aggregator<K, V, Long>() {
+                    @Override
+                    public Long apply(K aggKey, V value, Long aggregate) {
+                        return aggregate - 1L;
+                    }
+                },
+                Serdes.Long(), name);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index e9c2760..3795916 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.ChangedSerializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 
@@ -52,8 +53,16 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
     @Override
     public void init(ProcessorContext context) {
         this.context = context;
+
+        // if serializers are null, get the default ones from the context
         if (this.keySerializer == null) this.keySerializer = (Serializer<K>) context.keySerde().serializer();
         if (this.valSerializer == null) this.valSerializer = (Serializer<V>) context.valueSerde().serializer();
+
+        // if value serializers are for {@code Change} values, set the inner serializer when necessary
+        if (this.valSerializer instanceof ChangedSerializer &&
+                ((ChangedSerializer) this.valSerializer).inner() == null)
+            ((ChangedSerializer) this.valSerializer).setInner(context.valueSerde().serializer());
+
     }
 
     @Override
@@ -67,4 +76,9 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
     public void close() {
         // do nothing
     }
+
+    // for test only
+    public Serializer<V> valueSerializer() {
+        return valSerializer;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 1868c1b..a550344 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.streams.kstream.internals.ChangedDeserializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
 public class SourceNode<K, V> extends ProcessorNode<K, V> {
@@ -46,9 +47,16 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
     public void init(ProcessorContext context) {
         this.context = context;
 
-        // if serializers are null, get the default ones from the context
-        if (this.keyDeserializer == null) this.keyDeserializer = (Deserializer<K>) context.keySerde().deserializer();
-        if (this.valDeserializer == null) this.valDeserializer = (Deserializer<V>) context.valueSerde().deserializer();
+        // if deserializers are null, get the default ones from the context
+        if (this.keyDeserializer == null)
+            this.keyDeserializer = (Deserializer<K>) context.keySerde().deserializer();
+        if (this.valDeserializer == null)
+            this.valDeserializer = (Deserializer<V>) context.valueSerde().deserializer();
+
+        // if value deserializers are for {@code Change} values, set the inner deserializer when necessary
+        if (this.valDeserializer instanceof ChangedDeserializer &&
+                ((ChangedDeserializer) this.valDeserializer).inner() == null)
+            ((ChangedDeserializer) this.valDeserializer).setInner(context.valueSerde().deserializer());
     }
 
     @Override
@@ -61,4 +69,8 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
         // do nothing
     }
 
+    // for test only
+    public Deserializer<V> valueDeserializer() {
+        return valDeserializer;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 1564e95..be0ec19 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -20,13 +20,13 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.NoOpKeyValueMapper;
 import org.junit.Test;
 
 import java.io.File;
@@ -38,31 +38,6 @@ public class KTableAggregateTest {
 
     final private Serde<String> stringSerde = new Serdes.StringSerde();
 
-    private class StringAdd implements Aggregator<String, String, String> {
-
-        @Override
-        public String apply(String aggKey, String value, String aggregate) {
-            return aggregate + "+" + value;
-        }
-    }
-
-    private class StringRemove implements Aggregator<String, String, String> {
-
-        @Override
-        public String apply(String aggKey, String value, String aggregate) {
-            return aggregate + "-" + value;
-        }
-    }
-
-    private class StringInit implements Initializer<String> {
-
-        @Override
-        public String apply() {
-            return "0";
-        }
-    }
-
-
     @Test
     public void testAggBasic() throws Exception {
         final File baseDir = Files.createTempDirectory("test").toFile();
@@ -72,12 +47,12 @@ public class KTableAggregateTest {
             String topic1 = "topic1";
 
             KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
-            KTable<String, String> table2 = table1.groupBy(new NoOpKeyValueMapper<String, String>(),
+            KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(),
                                                            stringSerde,
                                                            stringSerde
-            ).aggregate(new StringInit(),
-                        new StringAdd(),
-                        new StringRemove(),
+            ).aggregate(MockInitializer.STRING_INIT,
+                        MockAggregator.STRING_ADDER,
+                        MockAggregator.STRING_REMOVER,
                         stringSerde,
                         "topic1-Canonized");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 6f49b6a..8a13e9a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -25,8 +25,14 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.internals.SinkNode;
+import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockReducer;
 import org.junit.Test;
 
 import java.io.File;
@@ -34,7 +40,9 @@ import java.io.IOException;
 import java.nio.file.Files;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class KTableImplTest {
 
@@ -295,6 +303,47 @@ public class KTableImplTest {
         } finally {
             Utils.delete(stateDir);
         }
+    }
+
+    @Test
+    public void testRepartition() throws IOException {
+        String topic1 = "topic1";
+
+        File stateDir = Files.createTempDirectory("test").toFile();
+        try {
+            KStreamBuilder builder = new KStreamBuilder();
+
+            KTableImpl<String, String, String> table1 =
+                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
 
+            KTableImpl<String, String, String> table1Aggregated = (KTableImpl<String, String, String>) table1
+                    .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
+                    .aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, "mock-result1");
+
+
+            KTableImpl<String, String, String> table1Reduced = (KTableImpl<String, String, String>) table1
+                    .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
+                    .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result2");
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, stringSerde, stringSerde);
+            driver.setTime(0L);
+
+            // three state stores should be created: one for the source, one for the aggregate and one for the reduce
+            assertEquals(3, driver.allStateStores().size());
+
+            // contains the corresponding repartition source / sink nodes
+            assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000003"));
+            assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000004"));
+            assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000007"));
+            assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000008"));
+
+            assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000003")).valueSerializer()).inner());
+            assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000004")).valueDeserializer()).inner());
+            assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000007")).valueSerializer()).inner());
+            assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000008")).valueDeserializer()).inner());
+
+        } finally {
+            Utils.delete(stateDir);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 2ee8730..d738794 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -31,8 +31,10 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 
 import java.io.File;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class KStreamTestDriver {
 
@@ -151,11 +153,33 @@ public class KStreamTestDriver {
         }
     }
 
+    public Set<String> allProcessorNames() {
+        Set<String> names = new HashSet<>();
+
+        List<ProcessorNode> nodes = topology.processors();
+
+        for (ProcessorNode node: nodes) {
+            names.add(node.name());
+        }
+
+        return names;
+    }
+
+    public ProcessorNode processor(String name) {
+        List<ProcessorNode> nodes = topology.processors();
+
+        for (ProcessorNode node: nodes) {
+            if (node.name().equals(name))
+                return node;
+        }
+
+        return null;
+    }
+
     public Map<String, StateStore> allStateStores() {
         return context.allStateStores();
     }
 
-
     private class MockRecordCollector extends RecordCollector {
         public MockRecordCollector() {
             super(null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/test/MockAggregator.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockAggregator.java b/streams/src/test/java/org/apache/kafka/test/MockAggregator.java
new file mode 100644
index 0000000..e8bb10b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockAggregator.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.kstream.Aggregator;
+
+public class MockAggregator {
+
+    private static class StringAdd implements Aggregator<String, String, String> {
+
+        @Override
+        public String apply(String aggKey, String value, String aggregate) {
+            return aggregate + "+" + value;
+        }
+    }
+
+    private static class StringRemove implements Aggregator<String, String, String> {
+
+        @Override
+        public String apply(String aggKey, String value, String aggregate) {
+            return aggregate + "-" + value;
+        }
+    }
+
+    public final static Aggregator<String, String, String> STRING_ADDER = new StringAdd();
+
+    public final static Aggregator<String, String, String> STRING_REMOVER = new StringRemove();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/test/MockInitializer.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInitializer.java b/streams/src/test/java/org/apache/kafka/test/MockInitializer.java
new file mode 100644
index 0000000..9bfe7f8
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockInitializer.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.kstream.Initializer;
+
+public class MockInitializer {
+
+    private static class StringInit implements Initializer<String> {
+
+        @Override
+        public String apply() {
+            return "0";
+        }
+    }
+
+    public final static Initializer<String> STRING_INIT = new StringInit();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
new file mode 100644
index 0000000..ae8c2fd
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+
+public class MockKeyValueMapper {
+
+    private static class NoOpKeyValueMapper<K, V> implements KeyValueMapper<K, V, KeyValue<K, V>> {
+
+        @Override
+        public KeyValue<K, V> apply(K key, V value) {
+            return new KeyValue<>(key, value);
+        }
+    }
+
+    public static <K, V> KeyValueMapper<K, V, KeyValue<K, V>> NoOpKeyValueMapper() {
+        return new NoOpKeyValueMapper<>();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/test/MockReducer.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockReducer.java b/streams/src/test/java/org/apache/kafka/test/MockReducer.java
new file mode 100644
index 0000000..24a8fea
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockReducer.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.kstream.Reducer;
+
+public class MockReducer {
+
+    private static class StringAdd implements Reducer<String> {
+
+        @Override
+        public String apply(String value1, String value2) {
+            return value1 + "+" + value2;
+        }
+    }
+
+    private static class StringRemove implements Reducer<String> {
+
+        @Override
+        public String apply(String value1, String value2) {
+            return value1 + "-" + value2;
+        }
+    }
+
+    public final static Reducer<String> STRING_ADDER = new StringAdd();
+
+    public final static Reducer<String> STRING_REMOVER = new StringRemove();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java b/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java
deleted file mode 100644
index 828b5ae..0000000
--- a/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.test;
-
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-
-public class NoOpKeyValueMapper<K, V> implements KeyValueMapper<K, V, KeyValue<K, V>> {
-
-    @Override
-    public KeyValue<K, V> apply(K key, V value) {
-        return new KeyValue<>(key, value);
-    }
-}