Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/04 18:52:08 UTC

kafka git commit: KAFKA-2727: Topology partial construction

Repository: kafka
Updated Branches:
  refs/heads/trunk 7ded64bc2 -> 421de0a3f


KAFKA-2727: Topology partial construction

TopologyBuilder can now build the ProcessorTopology for a single topic group (selected by its topic group id) instead of always building the full topology, so each StreamTask is created with only the sub-topology it actually needs; build(null) still returns the complete topology.

guozhangwang

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang

Closes #411 from ymatsuda/topology_partial_construction


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/421de0a3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/421de0a3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/421de0a3

Branch: refs/heads/trunk
Commit: 421de0a3f93f8d21f7ca0f1287a4305c00edaa08
Parents: 7ded64b
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Wed Nov 4 09:57:50 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 4 09:57:50 2015 -0800

----------------------------------------------------------------------
 .../streams/processor/TopologyBuilder.java      | 111 ++++++++++++-------
 .../processor/internals/StreamThread.java       |   4 +-
 .../kstream/internals/KStreamImplTest.java      |   2 +-
 .../streams/processor/TopologyBuilderTest.java  |  58 +++++++---
 .../internals/ProcessorTopologyTest.java        |   2 +-
 .../processor/internals/StreamTaskTest.java     |  66 +++++------
 .../processor/internals/StreamThreadTest.java   |   9 +-
 .../apache/kafka/test/KStreamTestDriver.java    |   2 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   2 +-
 9 files changed, 155 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
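
For context, a minimal sketch (not part of the commit; all class, node, and topic names below are made up) of what partial construction looks like from a caller's point of view: the builder splits the topology into connected subgraphs, build(topicGroupId) materializes just one of them, and build(null) still yields the complete topology.

    import org.apache.kafka.streams.processor.TopologyBuilder;
    import org.apache.kafka.streams.processor.internals.ProcessorTopology;

    public class PartialBuildSketch {
        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();

            // Two disconnected subgraphs; names and topics are illustrative only.
            builder.addSource("source-a", "topic-a");
            builder.addSink("sink-a", "out-a", "source-a");

            builder.addSource("source-b", "topic-b");
            builder.addSink("sink-b", "out-b", "source-b");

            // Subgraphs are numbered in the order their sources were added, so group 0
            // should be {source-a, sink-a} and group 1 should be {source-b, sink-b}.
            ProcessorTopology group0 = builder.build(0);    // just the first subgraph
            ProcessorTopology full = builder.build(null);   // whole topology, as before

            System.out.println(group0.processors().size()); // expected: 2
            System.out.println(full.processors().size());   // expected: 4
        }
    }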


http://git-wip-us.apache.org/repos/asf/kafka/blob/421de0a3/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 5b6d4ae..893f7de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -63,18 +63,23 @@ public class TopologyBuilder {
     private Map<String, StateStoreSupplier> stateStores = new HashMap<>();
     private Map<String, Set<String>> stateStoreUsers = new HashMap();
 
-    private interface NodeFactory {
-        ProcessorNode build();
+    private static abstract class NodeFactory {
+        public final String name;
+
+        NodeFactory(String name) {
+            this.name = name;
+        }
+
+        public abstract ProcessorNode build();
     }
 
-    private class ProcessorNodeFactory implements NodeFactory {
+    private static class ProcessorNodeFactory extends NodeFactory {
         public final String[] parents;
-        private final String name;
         private final ProcessorSupplier supplier;
         private final Set<String> stateStoreNames = new HashSet<>();
 
         public ProcessorNodeFactory(String name, String[] parents, ProcessorSupplier supplier) {
-            this.name = name;
+            super(name);
             this.parents = parents.clone();
             this.supplier = supplier;
         }
@@ -89,14 +94,13 @@ public class TopologyBuilder {
         }
     }
 
-    private class SourceNodeFactory implements NodeFactory {
+    private static class SourceNodeFactory extends NodeFactory {
         public final String[] topics;
-        private final String name;
         private Deserializer keyDeserializer;
         private Deserializer valDeserializer;
 
         private SourceNodeFactory(String name, String[] topics, Deserializer keyDeserializer, Deserializer valDeserializer) {
-            this.name = name;
+            super(name);
             this.topics = topics.clone();
             this.keyDeserializer = keyDeserializer;
             this.valDeserializer = valDeserializer;
@@ -108,15 +112,14 @@ public class TopologyBuilder {
         }
     }
 
-    private class SinkNodeFactory implements NodeFactory {
+    private static class SinkNodeFactory extends NodeFactory {
         public final String[] parents;
         public final String topic;
-        private final String name;
         private Serializer keySerializer;
         private Serializer valSerializer;
 
         private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer) {
-            this.name = name;
+            super(name);
             this.parents = parents.clone();
             this.topic = topic;
             this.keySerializer = keySerializer;
@@ -330,11 +333,8 @@ public class TopologyBuilder {
     public Map<Integer, Set<String>> topicGroups() {
         Map<Integer, Set<String>> topicGroups = new HashMap<>();
 
-        if (nodeGroups == null) {
-            nodeGroups = nodeGroups();
-        } else if (!nodeGroups.equals(nodeGroups())) {
-            throw new TopologyException("topology has mutated");
-        }
+        if (nodeGroups == null)
+            nodeGroups = makeNodeGroups();
 
         for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
             Set<String> topicGroup = new HashSet<>();
@@ -349,7 +349,19 @@ public class TopologyBuilder {
         return Collections.unmodifiableMap(topicGroups);
     }
 
-    private Map<Integer, Set<String>> nodeGroups() {
+    /**
+     * Returns the map of node groups keyed by the topic group id.
+     *
+     * @return groups of node names
+     */
+    public Map<Integer, Set<String>> nodeGroups() {
+        if (nodeGroups == null)
+            nodeGroups = makeNodeGroups();
+
+        return nodeGroups;
+    }
+
+    private Map<Integer, Set<String>> makeNodeGroups() {
         HashMap<Integer, Set<String>> nodeGroups = new HashMap<>();
         HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>();
 
@@ -383,14 +395,16 @@ public class TopologyBuilder {
 
         return nodeGroups;
     }
-
+    
     /**
      * Asserts that the streams of the specified source nodes must be copartitioned.
      *
      * @param sourceNodes a set of source node names
+     * @return this builder instance so methods can be chained together; never null
      */
-    public void copartitionSources(Collection<String> sourceNodes) {
+    public final TopologyBuilder copartitionSources(Collection<String> sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
+        return this;
     }
 
     /**
@@ -414,13 +428,24 @@ public class TopologyBuilder {
     }
 
     /**
-     * Build the topology. This is typically called automatically when passing this builder into the
+     * Build the topology for the specified topic group. This is called automatically when passing this builder into the
      * {@link KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig)} constructor.
      *
      * @see KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig)
      */
+    public ProcessorTopology build(Integer topicGroupId) {
+        Set<String> nodeGroup;
+        if (topicGroupId != null) {
+            nodeGroup = nodeGroups().get(topicGroupId);
+        } else {
+            // when nodeGroup is null, we build the full topology. this is used in some tests.
+            nodeGroup = null;
+        }
+        return build(nodeGroup);
+    }
+
     @SuppressWarnings("unchecked")
-    public ProcessorTopology build() {
+    private ProcessorTopology build(Set<String> nodeGroup) {
         List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
         Map<String, ProcessorNode> processorMap = new HashMap<>();
         Map<String, SourceNode> topicSourceMap = new HashMap<>();
@@ -429,29 +454,31 @@ public class TopologyBuilder {
         try {
             // create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
             for (NodeFactory factory : nodeFactories.values()) {
-                ProcessorNode node = factory.build();
-                processorNodes.add(node);
-                processorMap.put(node.name(), node);
-
-                if (factory instanceof ProcessorNodeFactory) {
-                    for (String parent : ((ProcessorNodeFactory) factory).parents) {
-                        processorMap.get(parent).addChild(node);
-                    }
-                    for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
-                        if (!stateStoreMap.containsKey(stateStoreName)) {
-                            stateStoreMap.put(stateStoreName, stateStores.get(stateStoreName));
+                if (nodeGroup == null || nodeGroup.contains(factory.name)) {
+                    ProcessorNode node = factory.build();
+                    processorNodes.add(node);
+                    processorMap.put(node.name(), node);
+
+                    if (factory instanceof ProcessorNodeFactory) {
+                        for (String parent : ((ProcessorNodeFactory) factory).parents) {
+                            processorMap.get(parent).addChild(node);
                         }
+                        for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
+                            if (!stateStoreMap.containsKey(stateStoreName)) {
+                                stateStoreMap.put(stateStoreName, stateStores.get(stateStoreName));
+                            }
+                        }
+                    } else if (factory instanceof SourceNodeFactory) {
+                        for (String topic : ((SourceNodeFactory) factory).topics) {
+                            topicSourceMap.put(topic, (SourceNode) node);
+                        }
+                    } else if (factory instanceof SinkNodeFactory) {
+                        for (String parent : ((SinkNodeFactory) factory).parents) {
+                            processorMap.get(parent).addChild(node);
+                        }
+                    } else {
+                        throw new TopologyException("Unknown definition class: " + factory.getClass().getName());
                     }
-                } else if (factory instanceof SourceNodeFactory) {
-                    for (String topic : ((SourceNodeFactory) factory).topics) {
-                        topicSourceMap.put(topic, (SourceNode) node);
-                    }
-                } else if (factory instanceof SinkNodeFactory) {
-                    for (String parent : ((SinkNodeFactory) factory).parents) {
-                        processorMap.get(parent).addChild(node);
-                    }
-                } else {
-                    throw new TopologyException("Unknown definition class: " + factory.getClass().getName());
                 }
             }
         } catch (Exception e) {
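
A short sketch of the two smaller API changes in this file (again illustrative, not from the commit): nodeGroups() is now public and the grouping is computed once and cached, and copartitionSources() returns the builder so the call can be chained.

    import static org.apache.kafka.common.utils.Utils.mkList;

    import java.util.Map;
    import java.util.Set;

    import org.apache.kafka.streams.processor.TopologyBuilder;

    public class NodeGroupsSketch {
        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();
            builder.addSource("source-a", "topic-a");   // hypothetical names
            builder.addSource("source-b", "topic-b");

            // nodeGroups() is public now; the grouping is computed once and cached,
            // and the same cached map backs topicGroups() and build(topicGroupId).
            Map<Integer, Set<String>> groups = builder.nodeGroups();
            System.out.println(groups); // expected: {0=[source-a], 1=[source-b]}

            // copartitionSources() now returns this builder, so it can be chained.
            TopologyBuilder same = builder.copartitionSources(mkList("source-a", "source-b"));
            System.out.println(same == builder); // true
        }
    }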

http://git-wip-us.apache.org/repos/asf/kafka/blob/421de0a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 0bf51d7..ba81421 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -418,7 +418,9 @@ public class StreamThread extends Thread {
     protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
         sensors.taskCreationSensor.record();
 
-        return new StreamTask(id, consumer, producer, restoreConsumer, partitionsForTask, builder.build(), config, sensors);
+        ProcessorTopology topology = builder.build(id.topicGroupId);
+
+        return new StreamTask(id, consumer, producer, restoreConsumer, partitionsForTask, topology, config, sensors);
     }
 
     private void addPartitions(Collection<TopicPartition> assignment) {
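
The effect on the thread is that each task now receives only the sub-topology for its topic group. A minimal sketch of that lookup (the helper class is hypothetical and the import paths are assumed; only TaskId.topicGroupId and build(Integer) come from the commit):

    import org.apache.kafka.streams.processor.TaskId;
    import org.apache.kafka.streams.processor.TopologyBuilder;
    import org.apache.kafka.streams.processor.internals.ProcessorTopology;

    // Hypothetical helper: resolves a task's sub-topology the same way
    // createStreamTask() now does, keyed by the task's topic group id.
    class TaskTopologySketch {
        static ProcessorTopology topologyFor(TopologyBuilder builder, TaskId id) {
            return builder.build(id.topicGroupId);
        }
    }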

http://git-wip-us.apache.org/repos/asf/kafka/blob/421de0a3/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 2db488c..d924a34 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -132,6 +132,6 @@ public class KStreamImplTest {
             1 + // to
             2 + // through
             1, // process
-            builder.build().processors().size());
+            builder.build(null).processors().size());
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/421de0a3/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index de1328e..b1b71b6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -20,11 +20,14 @@ package org.apache.kafka.streams.processor;
 import static org.junit.Assert.assertEquals;
 
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.common.utils.Utils.mkList;
+
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -145,13 +148,13 @@ public class TopologyBuilderTest {
 
         StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
         builder.addStateStore(supplier);
-        suppliers = builder.build().stateStoreSuppliers();
+        suppliers = builder.build(null).stateStoreSuppliers();
         assertEquals(0, suppliers.size());
 
         builder.addSource("source-1", "topic-1");
         builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
         builder.connectProcessorAndStateStores("processor-1", "store-1");
-        suppliers = builder.build().stateStoreSuppliers();
+        suppliers = builder.build(null).stateStoreSuppliers();
         assertEquals(1, suppliers.size());
         assertEquals(supplier.name(), suppliers.get(0).name());
     }
@@ -169,16 +172,16 @@ public class TopologyBuilderTest {
         builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
 
         builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
-        builder.copartitionSources(list("source-1", "source-2"));
+        builder.copartitionSources(mkList("source-1", "source-2"));
 
         builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
 
         Map<Integer, Set<String>> topicGroups = builder.topicGroups();
 
         Map<Integer, Set<String>> expectedTopicGroups = new HashMap<>();
-        expectedTopicGroups.put(0, set("topic-1", "topic-1x", "topic-2"));
-        expectedTopicGroups.put(1, set("topic-3", "topic-4"));
-        expectedTopicGroups.put(2, set("topic-5"));
+        expectedTopicGroups.put(0, mkSet("topic-1", "topic-1x", "topic-2"));
+        expectedTopicGroups.put(1, mkSet("topic-3", "topic-4"));
+        expectedTopicGroups.put(2, mkSet("topic-5"));
 
         assertEquals(3, topicGroups.size());
         assertEquals(expectedTopicGroups, topicGroups);
@@ -209,24 +212,43 @@ public class TopologyBuilderTest {
         Map<Integer, Set<String>> topicGroups = builder.topicGroups();
 
         Map<Integer, Set<String>> expectedTopicGroups = new HashMap<>();
-        expectedTopicGroups.put(0, set("topic-1", "topic-1x", "topic-2"));
-        expectedTopicGroups.put(1, set("topic-3", "topic-4"));
-        expectedTopicGroups.put(2, set("topic-5"));
+        expectedTopicGroups.put(0, mkSet("topic-1", "topic-1x", "topic-2"));
+        expectedTopicGroups.put(1, mkSet("topic-3", "topic-4"));
+        expectedTopicGroups.put(2, mkSet("topic-5"));
 
         assertEquals(3, topicGroups.size());
         assertEquals(expectedTopicGroups, topicGroups);
     }
 
-    private <T> Set<T> set(T... items) {
-        Set<T> set = new HashSet<>();
-        for (T item : items) {
-            set.add(item);
-        }
-        return set;
+    @Test
+    public void testBuild() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addSource("source-1", "topic-1", "topic-1x");
+        builder.addSource("source-2", "topic-2");
+        builder.addSource("source-3", "topic-3");
+        builder.addSource("source-4", "topic-4");
+        builder.addSource("source-5", "topic-5");
+
+        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
+        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
+        builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
+
+        ProcessorTopology topology0 = builder.build(0);
+        ProcessorTopology topology1 = builder.build(1);
+        ProcessorTopology topology2 = builder.build(2);
+
+        assertEquals(mkSet("source-1", "source-2", "processor-1", "processor-2"), nodeNames(topology0.processors()));
+        assertEquals(mkSet("source-3", "source-4", "processor-3"), nodeNames(topology1.processors()));
+        assertEquals(mkSet("source-5"), nodeNames(topology2.processors()));
     }
 
-    private <T> List<T> list(T... elems) {
-        return Arrays.asList(elems);
+    private Set<String> nodeNames(Collection<ProcessorNode> nodes) {
+        Set<String> nodeNames = new HashSet<>();
+        for (ProcessorNode node : nodes) {
+            nodeNames.add(node.name());
+        }
+        return nodeNames;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/421de0a3/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 803d4b0..54096b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -95,7 +95,7 @@ public class ProcessorTopologyTest {
         builder.addSink("sink-1", "topic-3", "processor-1");
         builder.addSink("sink-2", "topic-4", "processor-1", "processor-2");
 
-        final ProcessorTopology topology = builder.build();
+        final ProcessorTopology topology = builder.build(null);
 
         assertEquals(6, topology.processors().size());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/421de0a3/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index d80e98c..a95c2fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -40,8 +40,8 @@ import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Properties;
+import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -54,7 +54,7 @@ public class StreamTaskTest {
 
     private final TopicPartition partition1 = new TopicPartition("topic1", 1);
     private final TopicPartition partition2 = new TopicPartition("topic2", 1);
-    private final HashSet<TopicPartition> partitions = new HashSet<>(Arrays.asList(partition1, partition2));
+    private final Set<TopicPartition> partitions = Utils.mkSet(partition1, partition2);
 
     private final MockSourceNode source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
     private final MockSourceNode source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
@@ -117,29 +117,29 @@ public class StreamTaskTest {
                     new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey, recordValue)
             ));
 
-            assertEquals(task.process(), 5);
-            assertEquals(source1.numReceived, 1);
-            assertEquals(source2.numReceived, 0);
+            assertEquals(5, task.process());
+            assertEquals(1, source1.numReceived);
+            assertEquals(0, source2.numReceived);
 
-            assertEquals(task.process(), 4);
-            assertEquals(source1.numReceived, 1);
-            assertEquals(source2.numReceived, 1);
+            assertEquals(4, task.process());
+            assertEquals(1, source1.numReceived);
+            assertEquals(1, source2.numReceived);
 
-            assertEquals(task.process(), 3);
-            assertEquals(source1.numReceived, 2);
-            assertEquals(source2.numReceived, 1);
+            assertEquals(3, task.process());
+            assertEquals(2, source1.numReceived);
+            assertEquals(1, source2.numReceived);
 
-            assertEquals(task.process(), 2);
-            assertEquals(source1.numReceived, 3);
-            assertEquals(source2.numReceived, 1);
+            assertEquals(2, task.process());
+            assertEquals(3, source1.numReceived);
+            assertEquals(1, source2.numReceived);
 
-            assertEquals(task.process(), 1);
-            assertEquals(source1.numReceived, 3);
-            assertEquals(source2.numReceived, 2);
+            assertEquals(1, task.process());
+            assertEquals(3, source1.numReceived);
+            assertEquals(2, source2.numReceived);
 
-            assertEquals(task.process(), 0);
-            assertEquals(source1.numReceived, 3);
-            assertEquals(source2.numReceived, 3);
+            assertEquals(0, task.process());
+            assertEquals(3, source1.numReceived);
+            assertEquals(3, source2.numReceived);
 
             task.close();
 
@@ -168,11 +168,11 @@ public class StreamTaskTest {
                     new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, recordKey, recordValue)
             ));
 
-            assertEquals(task.process(), 5);
-            assertEquals(source1.numReceived, 1);
-            assertEquals(source2.numReceived, 0);
+            assertEquals(5, task.process());
+            assertEquals(1, source1.numReceived);
+            assertEquals(0, source2.numReceived);
 
-            assertEquals(consumer.paused().size(), 1);
+            assertEquals(1, consumer.paused().size());
             assertTrue(consumer.paused().contains(partition2));
 
             task.addRecords(partition1, records(
@@ -181,22 +181,22 @@ public class StreamTaskTest {
                     new ConsumerRecord<>(partition1.topic(), partition1.partition(), 50, recordKey, recordValue)
             ));
 
-            assertEquals(consumer.paused().size(), 2);
+            assertEquals(2, consumer.paused().size());
             assertTrue(consumer.paused().contains(partition1));
             assertTrue(consumer.paused().contains(partition2));
 
-            assertEquals(task.process(), 7);
-            assertEquals(source1.numReceived, 1);
-            assertEquals(source2.numReceived, 1);
+            assertEquals(7, task.process());
+            assertEquals(1, source1.numReceived);
+            assertEquals(1, source2.numReceived);
 
-            assertEquals(consumer.paused().size(), 1);
+            assertEquals(1, consumer.paused().size());
             assertTrue(consumer.paused().contains(partition1));
 
-            assertEquals(task.process(), 6);
-            assertEquals(source1.numReceived, 2);
-            assertEquals(source2.numReceived, 1);
+            assertEquals(6, task.process());
+            assertEquals(2, source1.numReceived);
+            assertEquals(1, source2.numReceived);
 
-            assertEquals(consumer.paused().size(), 0);
+            assertEquals(0, consumer.paused().size());
 
             task.close();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/421de0a3/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index d5011a3..d8141d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -142,7 +142,8 @@ public class StreamThreadTest {
         StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", new Metrics(), new SystemTime()) {
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
-                return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config);
+                ProcessorTopology topology = builder.build(id.topicGroupId);
+                return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, topology, config);
             }
         };
 
@@ -266,7 +267,8 @@ public class StreamThreadTest {
 
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
-                    return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config);
+                    ProcessorTopology topology = builder.build(id.topicGroupId);
+                    return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, topology, config);
                 }
             };
 
@@ -387,7 +389,8 @@ public class StreamThreadTest {
 
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
-                    return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config);
+                    ProcessorTopology topology = builder.build(id.topicGroupId);
+                    return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, topology, config);
                 }
             };
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/421de0a3/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 2c42e6c..ca5f33d 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -37,7 +37,7 @@ public class KStreamTestDriver {
     }
 
     public KStreamTestDriver(KStreamBuilder builder, Serializer<?> serializer, Deserializer<?> deserializer) {
-        this.topology = builder.build();
+        this.topology = builder.build(null);
         this.context = new MockProcessorContext(this, serializer, deserializer);
 
         for (ProcessorNode node : topology.processors()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/421de0a3/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index fc83762..5f796c6 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -142,7 +142,7 @@ public class ProcessorTopologyTestDriver {
      */
     public ProcessorTopologyTestDriver(StreamingConfig config, TopologyBuilder builder, String... storeNames) {
         id = new TaskId(0, 0);
-        topology = builder.build();
+        topology = builder.build(null);
 
         // Set up the consumer and producer ...
         consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);