You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/04 18:52:08 UTC
kafka git commit: KAFKA-2727: Topology partial construction
Repository: kafka
Updated Branches:
refs/heads/trunk 7ded64bc2 -> 421de0a3f
KAFKA-2727: Topology partial construction
guozhangwang
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang
Closes #411 from ymatsuda/topology_partial_construction
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/421de0a3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/421de0a3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/421de0a3
Branch: refs/heads/trunk
Commit: 421de0a3f93f8d21f7ca0f1287a4305c00edaa08
Parents: 7ded64b
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Wed Nov 4 09:57:50 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 4 09:57:50 2015 -0800
----------------------------------------------------------------------
.../streams/processor/TopologyBuilder.java | 111 ++++++++++++-------
.../processor/internals/StreamThread.java | 4 +-
.../kstream/internals/KStreamImplTest.java | 2 +-
.../streams/processor/TopologyBuilderTest.java | 58 +++++++---
.../internals/ProcessorTopologyTest.java | 2 +-
.../processor/internals/StreamTaskTest.java | 66 +++++------
.../processor/internals/StreamThreadTest.java | 9 +-
.../apache/kafka/test/KStreamTestDriver.java | 2 +-
.../kafka/test/ProcessorTopologyTestDriver.java | 2 +-
9 files changed, 155 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/421de0a3/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 5b6d4ae..893f7de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -63,18 +63,23 @@ public class TopologyBuilder {
private Map<String, StateStoreSupplier> stateStores = new HashMap<>();
private Map<String, Set<String>> stateStoreUsers = new HashMap();
- private interface NodeFactory {
- ProcessorNode build();
+ private static abstract class NodeFactory {
+ public final String name;
+
+ NodeFactory(String name) {
+ this.name = name;
+ }
+
+ public abstract ProcessorNode build();
}
- private class ProcessorNodeFactory implements NodeFactory {
+ private static class ProcessorNodeFactory extends NodeFactory {
public final String[] parents;
- private final String name;
private final ProcessorSupplier supplier;
private final Set<String> stateStoreNames = new HashSet<>();
public ProcessorNodeFactory(String name, String[] parents, ProcessorSupplier supplier) {
- this.name = name;
+ super(name);
this.parents = parents.clone();
this.supplier = supplier;
}
@@ -89,14 +94,13 @@ public class TopologyBuilder {
}
}
- private class SourceNodeFactory implements NodeFactory {
+ private static class SourceNodeFactory extends NodeFactory {
public final String[] topics;
- private final String name;
private Deserializer keyDeserializer;
private Deserializer valDeserializer;
private SourceNodeFactory(String name, String[] topics, Deserializer keyDeserializer, Deserializer valDeserializer) {
- this.name = name;
+ super(name);
this.topics = topics.clone();
this.keyDeserializer = keyDeserializer;
this.valDeserializer = valDeserializer;
@@ -108,15 +112,14 @@ public class TopologyBuilder {
}
}
- private class SinkNodeFactory implements NodeFactory {
+ private static class SinkNodeFactory extends NodeFactory {
public final String[] parents;
public final String topic;
- private final String name;
private Serializer keySerializer;
private Serializer valSerializer;
private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer) {
- this.name = name;
+ super(name);
this.parents = parents.clone();
this.topic = topic;
this.keySerializer = keySerializer;
@@ -330,11 +333,8 @@ public class TopologyBuilder {
public Map<Integer, Set<String>> topicGroups() {
Map<Integer, Set<String>> topicGroups = new HashMap<>();
- if (nodeGroups == null) {
- nodeGroups = nodeGroups();
- } else if (!nodeGroups.equals(nodeGroups())) {
- throw new TopologyException("topology has mutated");
- }
+ if (nodeGroups == null)
+ nodeGroups = makeNodeGroups();
for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
Set<String> topicGroup = new HashSet<>();
@@ -349,7 +349,19 @@ public class TopologyBuilder {
return Collections.unmodifiableMap(topicGroups);
}
- private Map<Integer, Set<String>> nodeGroups() {
+ /**
+ * Returns the map of node groups keyed by the topic group id.
+ *
+ * @return groups of node names
+ */
+ public Map<Integer, Set<String>> nodeGroups() {
+ if (nodeGroups == null)
+ nodeGroups = makeNodeGroups();
+
+ return nodeGroups;
+ }
+
+ private Map<Integer, Set<String>> makeNodeGroups() {
HashMap<Integer, Set<String>> nodeGroups = new HashMap<>();
HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>();
@@ -383,14 +395,16 @@ public class TopologyBuilder {
return nodeGroups;
}
-
+
/**
* Asserts that the streams of the specified source nodes must be copartitioned.
*
* @param sourceNodes a set of source node names
+ * @return this builder instance so methods can be chained together; never null
*/
- public void copartitionSources(Collection<String> sourceNodes) {
+ public final TopologyBuilder copartitionSources(Collection<String> sourceNodes) {
copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
+ return this;
}
/**
@@ -414,13 +428,24 @@ public class TopologyBuilder {
}
/**
- * Build the topology. This is typically called automatically when passing this builder into the
+ * Build the topology for the specified topic group. This is called automatically when passing this builder into the
* {@link KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig)} constructor.
*
* @see KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig)
*/
+ public ProcessorTopology build(Integer topicGroupId) {
+ Set<String> nodeGroup;
+ if (topicGroupId != null) {
+ nodeGroup = nodeGroups().get(topicGroupId);
+ } else {
+ // when topicGroupId is null, nodeGroup is left null and we build the full topology. this is used in some tests.
+ nodeGroup = null;
+ }
+ return build(nodeGroup);
+ }
+
@SuppressWarnings("unchecked")
- public ProcessorTopology build() {
+ private ProcessorTopology build(Set<String> nodeGroup) {
List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
Map<String, ProcessorNode> processorMap = new HashMap<>();
Map<String, SourceNode> topicSourceMap = new HashMap<>();
@@ -429,29 +454,31 @@ public class TopologyBuilder {
try {
// create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
for (NodeFactory factory : nodeFactories.values()) {
- ProcessorNode node = factory.build();
- processorNodes.add(node);
- processorMap.put(node.name(), node);
-
- if (factory instanceof ProcessorNodeFactory) {
- for (String parent : ((ProcessorNodeFactory) factory).parents) {
- processorMap.get(parent).addChild(node);
- }
- for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
- if (!stateStoreMap.containsKey(stateStoreName)) {
- stateStoreMap.put(stateStoreName, stateStores.get(stateStoreName));
+ if (nodeGroup == null || nodeGroup.contains(factory.name)) {
+ ProcessorNode node = factory.build();
+ processorNodes.add(node);
+ processorMap.put(node.name(), node);
+
+ if (factory instanceof ProcessorNodeFactory) {
+ for (String parent : ((ProcessorNodeFactory) factory).parents) {
+ processorMap.get(parent).addChild(node);
}
+ for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
+ if (!stateStoreMap.containsKey(stateStoreName)) {
+ stateStoreMap.put(stateStoreName, stateStores.get(stateStoreName));
+ }
+ }
+ } else if (factory instanceof SourceNodeFactory) {
+ for (String topic : ((SourceNodeFactory) factory).topics) {
+ topicSourceMap.put(topic, (SourceNode) node);
+ }
+ } else if (factory instanceof SinkNodeFactory) {
+ for (String parent : ((SinkNodeFactory) factory).parents) {
+ processorMap.get(parent).addChild(node);
+ }
+ } else {
+ throw new TopologyException("Unknown definition class: " + factory.getClass().getName());
}
- } else if (factory instanceof SourceNodeFactory) {
- for (String topic : ((SourceNodeFactory) factory).topics) {
- topicSourceMap.put(topic, (SourceNode) node);
- }
- } else if (factory instanceof SinkNodeFactory) {
- for (String parent : ((SinkNodeFactory) factory).parents) {
- processorMap.get(parent).addChild(node);
- }
- } else {
- throw new TopologyException("Unknown definition class: " + factory.getClass().getName());
}
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/421de0a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 0bf51d7..ba81421 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -418,7 +418,9 @@ public class StreamThread extends Thread {
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
sensors.taskCreationSensor.record();
- return new StreamTask(id, consumer, producer, restoreConsumer, partitionsForTask, builder.build(), config, sensors);
+ ProcessorTopology topology = builder.build(id.topicGroupId);
+
+ return new StreamTask(id, consumer, producer, restoreConsumer, partitionsForTask, topology, config, sensors);
}
private void addPartitions(Collection<TopicPartition> assignment) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/421de0a3/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 2db488c..d924a34 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -132,6 +132,6 @@ public class KStreamImplTest {
1 + // to
2 + // through
1, // process
- builder.build().processors().size());
+ builder.build(null).processors().size());
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/421de0a3/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index de1328e..b1b71b6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -20,11 +20,14 @@ package org.apache.kafka.streams.processor;
import static org.junit.Assert.assertEquals;
import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.common.utils.Utils.mkList;
+
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.junit.Test;
-import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -145,13 +148,13 @@ public class TopologyBuilderTest {
StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
builder.addStateStore(supplier);
- suppliers = builder.build().stateStoreSuppliers();
+ suppliers = builder.build(null).stateStoreSuppliers();
assertEquals(0, suppliers.size());
builder.addSource("source-1", "topic-1");
builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
builder.connectProcessorAndStateStores("processor-1", "store-1");
- suppliers = builder.build().stateStoreSuppliers();
+ suppliers = builder.build(null).stateStoreSuppliers();
assertEquals(1, suppliers.size());
assertEquals(supplier.name(), suppliers.get(0).name());
}
@@ -169,16 +172,16 @@ public class TopologyBuilderTest {
builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
- builder.copartitionSources(list("source-1", "source-2"));
+ builder.copartitionSources(mkList("source-1", "source-2"));
builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
Map<Integer, Set<String>> topicGroups = builder.topicGroups();
Map<Integer, Set<String>> expectedTopicGroups = new HashMap<>();
- expectedTopicGroups.put(0, set("topic-1", "topic-1x", "topic-2"));
- expectedTopicGroups.put(1, set("topic-3", "topic-4"));
- expectedTopicGroups.put(2, set("topic-5"));
+ expectedTopicGroups.put(0, mkSet("topic-1", "topic-1x", "topic-2"));
+ expectedTopicGroups.put(1, mkSet("topic-3", "topic-4"));
+ expectedTopicGroups.put(2, mkSet("topic-5"));
assertEquals(3, topicGroups.size());
assertEquals(expectedTopicGroups, topicGroups);
@@ -209,24 +212,43 @@ public class TopologyBuilderTest {
Map<Integer, Set<String>> topicGroups = builder.topicGroups();
Map<Integer, Set<String>> expectedTopicGroups = new HashMap<>();
- expectedTopicGroups.put(0, set("topic-1", "topic-1x", "topic-2"));
- expectedTopicGroups.put(1, set("topic-3", "topic-4"));
- expectedTopicGroups.put(2, set("topic-5"));
+ expectedTopicGroups.put(0, mkSet("topic-1", "topic-1x", "topic-2"));
+ expectedTopicGroups.put(1, mkSet("topic-3", "topic-4"));
+ expectedTopicGroups.put(2, mkSet("topic-5"));
assertEquals(3, topicGroups.size());
assertEquals(expectedTopicGroups, topicGroups);
}
- private <T> Set<T> set(T... items) {
- Set<T> set = new HashSet<>();
- for (T item : items) {
- set.add(item);
- }
- return set;
+ @Test
+ public void testBuild() {
+ final TopologyBuilder builder = new TopologyBuilder();
+
+ builder.addSource("source-1", "topic-1", "topic-1x");
+ builder.addSource("source-2", "topic-2");
+ builder.addSource("source-3", "topic-3");
+ builder.addSource("source-4", "topic-4");
+ builder.addSource("source-5", "topic-5");
+
+ builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
+ builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
+ builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
+
+ ProcessorTopology topology0 = builder.build(0);
+ ProcessorTopology topology1 = builder.build(1);
+ ProcessorTopology topology2 = builder.build(2);
+
+ assertEquals(mkSet("source-1", "source-2", "processor-1", "processor-2"), nodeNames(topology0.processors()));
+ assertEquals(mkSet("source-3", "source-4", "processor-3"), nodeNames(topology1.processors()));
+ assertEquals(mkSet("source-5"), nodeNames(topology2.processors()));
}
- private <T> List<T> list(T... elems) {
- return Arrays.asList(elems);
+ private Set<String> nodeNames(Collection<ProcessorNode> nodes) {
+ Set<String> nodeNames = new HashSet<>();
+ for (ProcessorNode node : nodes) {
+ nodeNames.add(node.name());
+ }
+ return nodeNames;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/421de0a3/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 803d4b0..54096b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -95,7 +95,7 @@ public class ProcessorTopologyTest {
builder.addSink("sink-1", "topic-3", "processor-1");
builder.addSink("sink-2", "topic-4", "processor-1", "processor-2");
- final ProcessorTopology topology = builder.build();
+ final ProcessorTopology topology = builder.build(null);
assertEquals(6, topology.processors().size());
http://git-wip-us.apache.org/repos/asf/kafka/blob/421de0a3/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index d80e98c..a95c2fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -40,8 +40,8 @@ import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Properties;
+import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -54,7 +54,7 @@ public class StreamTaskTest {
private final TopicPartition partition1 = new TopicPartition("topic1", 1);
private final TopicPartition partition2 = new TopicPartition("topic2", 1);
- private final HashSet<TopicPartition> partitions = new HashSet<>(Arrays.asList(partition1, partition2));
+ private final Set<TopicPartition> partitions = Utils.mkSet(partition1, partition2);
private final MockSourceNode source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
private final MockSourceNode source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
@@ -117,29 +117,29 @@ public class StreamTaskTest {
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey, recordValue)
));
- assertEquals(task.process(), 5);
- assertEquals(source1.numReceived, 1);
- assertEquals(source2.numReceived, 0);
+ assertEquals(5, task.process());
+ assertEquals(1, source1.numReceived);
+ assertEquals(0, source2.numReceived);
- assertEquals(task.process(), 4);
- assertEquals(source1.numReceived, 1);
- assertEquals(source2.numReceived, 1);
+ assertEquals(4, task.process());
+ assertEquals(1, source1.numReceived);
+ assertEquals(1, source2.numReceived);
- assertEquals(task.process(), 3);
- assertEquals(source1.numReceived, 2);
- assertEquals(source2.numReceived, 1);
+ assertEquals(3, task.process());
+ assertEquals(2, source1.numReceived);
+ assertEquals(1, source2.numReceived);
- assertEquals(task.process(), 2);
- assertEquals(source1.numReceived, 3);
- assertEquals(source2.numReceived, 1);
+ assertEquals(2, task.process());
+ assertEquals(3, source1.numReceived);
+ assertEquals(1, source2.numReceived);
- assertEquals(task.process(), 1);
- assertEquals(source1.numReceived, 3);
- assertEquals(source2.numReceived, 2);
+ assertEquals(1, task.process());
+ assertEquals(3, source1.numReceived);
+ assertEquals(2, source2.numReceived);
- assertEquals(task.process(), 0);
- assertEquals(source1.numReceived, 3);
- assertEquals(source2.numReceived, 3);
+ assertEquals(0, task.process());
+ assertEquals(3, source1.numReceived);
+ assertEquals(3, source2.numReceived);
task.close();
@@ -168,11 +168,11 @@ public class StreamTaskTest {
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, recordKey, recordValue)
));
- assertEquals(task.process(), 5);
- assertEquals(source1.numReceived, 1);
- assertEquals(source2.numReceived, 0);
+ assertEquals(5, task.process());
+ assertEquals(1, source1.numReceived);
+ assertEquals(0, source2.numReceived);
- assertEquals(consumer.paused().size(), 1);
+ assertEquals(1, consumer.paused().size());
assertTrue(consumer.paused().contains(partition2));
task.addRecords(partition1, records(
@@ -181,22 +181,22 @@ public class StreamTaskTest {
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 50, recordKey, recordValue)
));
- assertEquals(consumer.paused().size(), 2);
+ assertEquals(2, consumer.paused().size());
assertTrue(consumer.paused().contains(partition1));
assertTrue(consumer.paused().contains(partition2));
- assertEquals(task.process(), 7);
- assertEquals(source1.numReceived, 1);
- assertEquals(source2.numReceived, 1);
+ assertEquals(7, task.process());
+ assertEquals(1, source1.numReceived);
+ assertEquals(1, source2.numReceived);
- assertEquals(consumer.paused().size(), 1);
+ assertEquals(1, consumer.paused().size());
assertTrue(consumer.paused().contains(partition1));
- assertEquals(task.process(), 6);
- assertEquals(source1.numReceived, 2);
- assertEquals(source2.numReceived, 1);
+ assertEquals(6, task.process());
+ assertEquals(2, source1.numReceived);
+ assertEquals(1, source2.numReceived);
- assertEquals(consumer.paused().size(), 0);
+ assertEquals(0, consumer.paused().size());
task.close();
http://git-wip-us.apache.org/repos/asf/kafka/blob/421de0a3/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index d5011a3..d8141d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -142,7 +142,8 @@ public class StreamThreadTest {
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", new Metrics(), new SystemTime()) {
@Override
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
- return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config);
+ ProcessorTopology topology = builder.build(id.topicGroupId);
+ return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, topology, config);
}
};
@@ -266,7 +267,8 @@ public class StreamThreadTest {
@Override
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
- return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config);
+ ProcessorTopology topology = builder.build(id.topicGroupId);
+ return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, topology, config);
}
};
@@ -387,7 +389,8 @@ public class StreamThreadTest {
@Override
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
- return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config);
+ ProcessorTopology topology = builder.build(id.topicGroupId);
+ return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, topology, config);
}
};
http://git-wip-us.apache.org/repos/asf/kafka/blob/421de0a3/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 2c42e6c..ca5f33d 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -37,7 +37,7 @@ public class KStreamTestDriver {
}
public KStreamTestDriver(KStreamBuilder builder, Serializer<?> serializer, Deserializer<?> deserializer) {
- this.topology = builder.build();
+ this.topology = builder.build(null);
this.context = new MockProcessorContext(this, serializer, deserializer);
for (ProcessorNode node : topology.processors()) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/421de0a3/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index fc83762..5f796c6 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -142,7 +142,7 @@ public class ProcessorTopologyTestDriver {
*/
public ProcessorTopologyTestDriver(StreamingConfig config, TopologyBuilder builder, String... storeNames) {
id = new TaskId(0, 0);
- topology = builder.build();
+ topology = builder.build(null);
// Set up the consumer and producer ...
consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);