You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/07/20 17:47:54 UTC
kafka git commit: KAFKA-3856;
Cleanup Kafka Stream builder API (KIP-120)
Repository: kafka
Updated Branches:
refs/heads/trunk e0099e1f5 -> b04bed022
KAFKA-3856; Cleanup Kafka Stream builder API (KIP-120)
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy <da...@gmail.com>, Bill Bejeck <bb...@gmail.com>
Closes #2301 from mjsax/kafka-3856-topology-builder-API
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b04bed02
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b04bed02
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b04bed02
Branch: refs/heads/trunk
Commit: b04bed022aec0f1f478a03383ab5184f048133b6
Parents: e0099e1
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Jul 20 18:47:47 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Thu Jul 20 18:47:47 2017 +0100
----------------------------------------------------------------------
checkstyle/suppressions.xml | 4 +-
.../streams/processor/TopologyBuilder.java | 144 +++++-
.../streams/processor/TopologyDescription.java | 476 +++++++++++++++++++
.../kafka/streams/processor/TopologyTest.java | 405 ++++++++++++++++
4 files changed, 1012 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b04bed02/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 5c00ede..6d2b559 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -117,7 +117,7 @@
<!-- Streams -->
<suppress checks="ClassFanOutComplexity"
- files="(KafkaStreams|KStreamImpl|KTableImpl|StreamThread|StreamTask).java"/>
+ files="(TopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl|StreamThread|StreamTask).java"/>
<suppress checks="MethodLength"
files="StreamPartitionAssignor.java"/>
@@ -128,7 +128,7 @@
files="RocksDBWindowStoreSupplier.java"/>
<suppress checks="ClassDataAbstractionCoupling"
- files="(KStreamImpl|StreamPartitionAssignor|KafkaStreams|KTableImpl).java"/>
+ files="(TopologyBuilder|KStreamImpl|StreamPartitionAssignor|KafkaStreams|KTableImpl).java"/>
<suppress checks="CyclomaticComplexity"
files="TopologyBuilder.java"/>
http://git-wip-us.apache.org/repos/asf/kafka/blob/b04bed02/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index c3614bb..4508c77 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -144,23 +144,25 @@ public class TopologyBuilder {
}
private static abstract class NodeFactory {
- public final String name;
+ final String name;
+ final String[] parents;
- NodeFactory(String name) {
+ NodeFactory(final String name, final String[] parents) {
this.name = name;
+ this.parents = parents;
}
public abstract ProcessorNode build();
+
+ abstract TopologyDescription.AbstractNode describe();
}
private static class ProcessorNodeFactory extends NodeFactory {
- private final String[] parents;
private final ProcessorSupplier<?, ?> supplier;
private final Set<String> stateStoreNames = new HashSet<>();
ProcessorNodeFactory(String name, String[] parents, ProcessorSupplier<?, ?> supplier) {
- super(name);
- this.parents = parents.clone();
+ super(name, parents.clone());
this.supplier = supplier;
}
@@ -172,6 +174,11 @@ public class TopologyBuilder {
public ProcessorNode build() {
return new ProcessorNode<>(name, supplier.get(), stateStoreNames);
}
+
+ @Override
+ TopologyDescription.Processor describe() {
+ return new TopologyDescription.Processor(name, new HashSet<>(stateStoreNames));
+ }
}
private class SourceNodeFactory extends NodeFactory {
@@ -187,7 +194,7 @@ public class TopologyBuilder {
final TimestampExtractor timestampExtractor,
final Deserializer<?> keyDeserializer,
final Deserializer<?> valDeserializer) {
- super(name);
+ super(name, new String[0]);
this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<String>();
this.pattern = pattern;
this.keyDeserializer = keyDeserializer;
@@ -236,18 +243,30 @@ public class TopologyBuilder {
private boolean isMatch(String topic) {
return this.pattern.matcher(topic).matches();
}
+
+ @Override
+ TopologyDescription.Source describe() {
+ String sourceTopics;
+
+ if (pattern == null) {
+ sourceTopics = topics.toString();
+ sourceTopics = sourceTopics.substring(1, sourceTopics.length() - 1); // trim first and last, ie. []
+ } else {
+ sourceTopics = pattern.toString();
+ }
+
+ return new TopologyDescription.Source(name, sourceTopics);
+ }
}
private class SinkNodeFactory<K, V> extends NodeFactory {
- private final String[] parents;
private final String topic;
private final Serializer<K> keySerializer;
private final Serializer<V> valSerializer;
private final StreamPartitioner<? super K, ? super V> partitioner;
private SinkNodeFactory(String name, String[] parents, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner) {
- super(name);
- this.parents = parents.clone();
+ super(name, parents.clone());
this.topic = topic;
this.keySerializer = keySerializer;
this.valSerializer = valSerializer;
@@ -263,6 +282,11 @@ public class TopologyBuilder {
return new SinkNode<>(name, topic, keySerializer, valSerializer, partitioner);
}
}
+
+ @Override
+ TopologyDescription.Sink describe() {
+ return new TopologyDescription.Sink(name, topic);
+ }
}
public static class TopicsInfo {
@@ -1196,12 +1220,8 @@ public class TopologyBuilder {
for (final Map.Entry<Integer, Set<String>> nodeGroup : nodeGroups().entrySet()) {
final Set<String> nodes = nodeGroup.getValue();
for (String node : nodes) {
- final NodeFactory nodeFactory = nodeFactories.get(node);
- if (nodeFactory instanceof SourceNodeFactory) {
- final List<String> topics = ((SourceNodeFactory) nodeFactory).topics;
- if (topics != null && topics.size() == 1 && globalTopics.contains(topics.get(0))) {
- globalGroups.addAll(nodes);
- }
+ if (isGlobalSource(node)) {
+ globalGroups.addAll(nodes);
}
}
}
@@ -1558,4 +1578,98 @@ public class TopologyBuilder {
setRegexMatchedTopicsToSourceNodes();
setRegexMatchedTopicToStateStore();
}
+
+ private boolean isGlobalSource(final String nodeName) {
+ final NodeFactory nodeFactory = nodeFactories.get(nodeName);
+
+ if (nodeFactory instanceof SourceNodeFactory) {
+ final List<String> topics = ((SourceNodeFactory) nodeFactory).topics;
+ if (topics != null && topics.size() == 1 && globalTopics.contains(topics.get(0))) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ TopologyDescription describe() {
+ final TopologyDescription description = new TopologyDescription();
+
+ describeSubtopologies(description);
+ describeGlobalStores(description);
+
+ return description;
+ }
+
+ private void describeSubtopologies(final TopologyDescription description) {
+ for (final Map.Entry<Integer, Set<String>> nodeGroup : makeNodeGroups().entrySet()) {
+
+ final Set<String> allNodesOfGroups = nodeGroup.getValue();
+ final boolean isNodeGroupOfGlobalStores = nodeGroupContainsGlobalSourceNode(allNodesOfGroups);
+
+ if (!isNodeGroupOfGlobalStores) {
+ describeSubtopology(description, nodeGroup.getKey(), allNodesOfGroups);
+ }
+ }
+ }
+
+ private boolean nodeGroupContainsGlobalSourceNode(final Set<String> allNodesOfGroups) {
+ for (final String node : allNodesOfGroups) {
+ if (isGlobalSource(node)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private void describeSubtopology(final TopologyDescription description,
+ final Integer subtopologyId,
+ final Set<String> nodeNames) {
+
+ final HashMap<String, TopologyDescription.AbstractNode> nodesByName = new HashMap<>();
+
+ // add all nodes
+ for (final String nodeName : nodeNames) {
+ nodesByName.put(nodeName, nodeFactories.get(nodeName).describe());
+ }
+
+ // connect each node to its predecessors and successors
+ for (final TopologyDescription.AbstractNode node : nodesByName.values()) {
+ for (final String predecessorName : nodeFactories.get(node.name()).parents) {
+ final TopologyDescription.AbstractNode predecessor = nodesByName.get(predecessorName);
+ node.addPredecessor(predecessor);
+ predecessor.addSuccessor(node);
+ }
+ }
+
+ description.addSubtopology(new TopologyDescription.Subtopology(
+ subtopologyId,
+ new HashSet<TopologyDescription.Node>(nodesByName.values())));
+ }
+
+ private void describeGlobalStores(final TopologyDescription description) {
+ for (final Map.Entry<Integer, Set<String>> nodeGroup : makeNodeGroups().entrySet()) {
+ final Set<String> nodes = nodeGroup.getValue();
+
+ final Iterator<String> it = nodes.iterator();
+ while (it.hasNext()) {
+ final String node = it.next();
+
+ if (isGlobalSource(node)) {
+ // we found a GlobalStore node group; those contain exactly two nodes: {sourceNode, processorNode}
+ it.remove(); // remove sourceNode from group
+ final String processorNode = nodes.iterator().next(); // get remaining processorNode
+
+ description.addGlobalStore(new TopologyDescription.GlobalStore(
+ node,
+ processorNode,
+ ((ProcessorNodeFactory) nodeFactories.get(processorNode)).stateStoreNames.iterator().next(),
+ nodeToSourceTopics.get(node).get(0)
+ ));
+ break;
+ }
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b04bed02/streams/src/main/java/org/apache/kafka/streams/processor/TopologyDescription.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyDescription.java
new file mode 100644
index 0000000..0949bf5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyDescription.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.processor.internals.StreamTask;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * A meta representation of a {@link Topology topology}.
+ * <p>
+ * The nodes of a topology are grouped into {@link Subtopology sub-topologies} if they are connected.
+ * In contrast, two sub-topologies are not connected but can be linked to each other via topics, i.e., if one
+ * sub-topology {@link Topology#addSink(String, String, String...) writes} into a topic and another sub-topology
+ * {@link Topology#addSource(String, String...) reads} from the same topic.
+ * <p>
+ * For {@link KafkaStreams#start() execution} sub-topologies are translated into {@link StreamTask tasks}.
+ */
+// TODO make public (hide until KIP-120 is fully implemented)
+final class TopologyDescription {
+ private final Set<Subtopology> subtopologies = new HashSet<>();
+ private final Set<GlobalStore> globalStores = new HashSet<>();
+
+ /**
+ * A connected sub-graph of a {@link Topology}.
+ * <p>
+ * Nodes of a {@code Subtopology} are connected {@link Topology#addProcessor(String, ProcessorSupplier, String...)
+ * directly} or indirectly via {@link Topology#connectProcessorAndStateStores(String, String...) state stores}
+ * (i.e., if multiple processors share the same state).
+ */
+ public final static class Subtopology {
+ private final int id;
+ private final Set<Node> nodes;
+
+ Subtopology(final int id,
+ final Set<Node> nodes) {
+ this.id = id;
+ this.nodes = nodes;
+ }
+
+ /**
+ * Internally assigned unique ID.
+ * @return the ID of the sub-topology
+ */
+ public int id() {
+ return id;
+ }
+
+ /**
+ * All nodes of this sub-topology.
+ * @return set of all nodes within the sub-topology
+ */
+ public Set<Node> nodes() {
+ return Collections.unmodifiableSet(nodes);
+ }
+
+ @Override
+ public String toString() {
+ return "Sub-topology: " + id + "\n" + nodesAsString();
+ }
+
+ private String nodesAsString() {
+ final StringBuilder sb = new StringBuilder();
+ for (final Node node : nodes) {
+ sb.append(" ");
+ sb.append(node);
+ sb.append('\n');
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final Subtopology that = (Subtopology) o;
+ return id == that.id
+ && nodes.equals(that.nodes);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, nodes);
+ }
+ }
+
+ /**
+ * Represents a {@link Topology#addGlobalStore(StateStoreSupplier, String,
+ * org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer, String,
+ * String, ProcessorSupplier) global store}.
+ * Adding a global store results in adding a source node and one stateful processor node.
+ * Note, that all added global stores form a single unit (similar to a {@link Subtopology}) even if different
+ * global stores are not connected to each other.
+ * Furthermore, global stores are available to all processors without connecting them explicitly, and thus global
+ * stores will never be part of any {@link Subtopology}.
+ */
+ public final static class GlobalStore {
+ private final Source source;
+ private final Processor processor;
+
+ GlobalStore(final String sourceName,
+ final String processorName,
+ final String storeName,
+ final String topicName) {
+ source = new Source(sourceName, topicName);
+ processor = new Processor(processorName, Collections.singleton(storeName));
+ source.successors.add(processor);
+ processor.predecessors.add(source);
+ }
+
+ /**
+ * The source node reading from a "global" topic.
+ * @return the "global" source node
+ */
+ public Source source() {
+ return source;
+ }
+
+ /**
+ * The processor node maintaining the global store.
+ * @return the "global" processor node
+ */
+ public Processor processor() {
+ return processor;
+ }
+
+ @Override
+ public String toString() {
+ return "GlobalStore: " + source.name + "(topic: " + source.topics + ") -> "
+ + processor.name + "(store: " + processor.stores.iterator().next() + ")\n";
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final GlobalStore that = (GlobalStore) o;
+ return source.equals(that.source)
+ && processor.equals(that.processor);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(source, processor);
+ }
+ }
+
+ /**
+ * A node of a topology. Can be a source, sink, or processor node.
+ */
+ public interface Node {
+ /**
+ * The name of the node. Will never be {@code null}.
+ * @return the name of the node
+ */
+ String name();
+ /**
+ * The predecessors of this node within a sub-topology.
+ * Note, sources do not have any predecessors.
+ * Will never be {@code null}.
+ * @return set of all predecessors
+ */
+ Set<Node> predecessors();
+ /**
+ * The successors of this node within a sub-topology.
+ * Note, sinks do not have any successors.
+ * Will never be {@code null}.
+ * @return set of all successors
+ */
+ Set<Node> successors();
+ }
+
+ abstract static class AbstractNode implements Node {
+ final String name;
+ final Set<Node> predecessors = new HashSet<>();
+ final Set<Node> successors = new HashSet<>();
+
+ AbstractNode(final String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public Set<Node> predecessors() {
+ return Collections.unmodifiableSet(predecessors);
+ }
+
+ @Override
+ public Set<Node> successors() {
+ return Collections.unmodifiableSet(successors);
+ }
+
+ void addPredecessor(final Node predecessor) {
+ predecessors.add(predecessor);
+ }
+
+ void addSuccessor(final Node successor) {
+ successors.add(successor);
+ }
+ }
+
+ /**
+ * A source node of a topology.
+ */
+ public final static class Source extends AbstractNode {
+ private final String topics;
+
+ Source(final String name,
+ final String topics) {
+ super(name);
+ this.topics = topics;
+ }
+
+ /**
+ * The topic names this source node is reading from.
+ * @return comma separated list of topic names or pattern (as String)
+ */
+ public String topics() {
+ return topics;
+ }
+
+ @Override
+ void addPredecessor(final Node predecessor) {
+ throw new UnsupportedOperationException("Sources don't have predecessors.");
+ }
+
+ @Override
+ public String toString() {
+ return "Source: " + name + "(topics: " + topics + ") --> " + nodeNames(successors);
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final Source source = (Source) o;
+ // omit successor to avoid infinite loops
+ return name.equals(source.name)
+ && topics.equals(source.topics);
+ }
+
+ @Override
+ public int hashCode() {
+ // omit successor as it might change and alter the hash code
+ return Objects.hash(name, topics);
+ }
+ }
+
+ /**
+ * A processor node of a topology.
+ */
+ public final static class Processor extends AbstractNode {
+ private final Set<String> stores;
+
+ Processor(final String name,
+ final Set<String> stores) {
+ super(name);
+ this.stores = stores;
+ }
+
+ /**
+ * The names of all connected stores.
+ * @return set of store names
+ */
+ public Set<String> stores() {
+ return Collections.unmodifiableSet(stores);
+ }
+
+ @Override
+ public String toString() {
+ return "Processor: " + name + "(stores: " + stores + ") --> " + nodeNames(successors) + " <-- " + nodeNames(predecessors);
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final Processor processor = (Processor) o;
+ // omit successor to avoid infinite loops
+ return name.equals(processor.name)
+ && stores.equals(processor.stores)
+ && predecessors.equals(processor.predecessors);
+ }
+
+ @Override
+ public int hashCode() {
+ // omit successor as it might change and alter the hash code
+ return Objects.hash(name, stores);
+ }
+ }
+
+ /**
+ * A sink node of a topology.
+ */
+ public final static class Sink extends AbstractNode {
+ private final String topic;
+
+ Sink(final String name,
+ final String topic) {
+ super(name);
+ this.topic = topic;
+ }
+
+ /**
+ * The topic name this sink node is writing to.
+ * @return a topic name
+ */
+ public String topic() {
+ return topic;
+ }
+
+ @Override
+ void addSuccessor(final Node successor) {
+ throw new UnsupportedOperationException("Sinks don't have successors.");
+ }
+
+ @Override
+ public String toString() {
+ return "Sink: " + name + "(topic: " + topic + ") <-- " + nodeNames(predecessors);
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final Sink sink = (Sink) o;
+ return name.equals(sink.name)
+ && topic.equals(sink.topic)
+ && predecessors.equals(sink.predecessors);
+ }
+
+ @Override
+ public int hashCode() {
+ // omit predecessors as it might change and alter the hash code
+ return Objects.hash(name, topic);
+ }
+ }
+
+ void addSubtopology(final Subtopology subtopology) {
+ subtopologies.add(subtopology);
+ }
+
+ void addGlobalStore(final GlobalStore globalStore) {
+ globalStores.add(globalStore);
+ }
+
+ /**
+ * All sub-topologies of the represented topology.
+ * @return set of all sub-topologies
+ */
+ public Set<Subtopology> subtopologies() {
+ return Collections.unmodifiableSet(subtopologies);
+ }
+
+ /**
+ * All global stores of the represented topology.
+ * @return set of all global stores
+ */
+ public Set<GlobalStore> globalStores() {
+ return Collections.unmodifiableSet(globalStores);
+ }
+
+ @Override
+ public String toString() {
+ return subtopologiesAsString() + globalStoresAsString();
+ }
+
+ private static String nodeNames(final Set<Node> nodes) {
+ final StringBuilder sb = new StringBuilder();
+ if (!nodes.isEmpty()) {
+ for (final Node n : nodes) {
+ sb.append(n.name());
+ sb.append(", ");
+ }
+ sb.deleteCharAt(sb.length() - 1);
+ sb.deleteCharAt(sb.length() - 1);
+ }
+ return sb.toString();
+ }
+
+ private String subtopologiesAsString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("Sub-topologies: \n");
+ if (subtopologies.isEmpty()) {
+ sb.append(" none\n");
+ } else {
+ for (final Subtopology st : subtopologies) {
+ sb.append(" ");
+ sb.append(st);
+ }
+ }
+ return sb.toString();
+ }
+
+ private String globalStoresAsString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("Global Stores:\n");
+ if (globalStores.isEmpty()) {
+ sb.append(" none\n");
+ } else {
+ for (final GlobalStore gs : globalStores) {
+ sb.append(" ");
+ sb.append(gs);
+ }
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final TopologyDescription that = (TopologyDescription) o;
+ return subtopologies.equals(that.subtopologies)
+ && globalStores.equals(that.globalStores);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(subtopologies, globalStores);
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/b04bed02/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java
new file mode 100644
index 0000000..17c5640
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStateStoreSupplier;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+// TODO (remove this comment) Test name ok, we just use TopologyBuilder for now in this test until Topology gets added
+public class TopologyTest {
+ // TODO change from TopologyBuilder to Topology
+ private final TopologyBuilder topology = new TopologyBuilder();
+ private final TopologyDescription expectedDescription = new TopologyDescription();
+
+ @Test
+ public void shouldDescribeEmptyTopology() {
+ assertThat(topology.describe(), equalTo(expectedDescription));
+ }
+
+ @Test
+ public void singleSourceShouldHaveSingleSubtopology() {
+ final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
+
+ expectedDescription.addSubtopology(
+ new TopologyDescription.Subtopology(0,
+ Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
+
+ assertThat(topology.describe(), equalTo(expectedDescription));
+ }
+
+ @Test
+ public void singleSourceWithListOfTopicsShouldHaveSingleSubtopology() {
+ final TopologyDescription.Source expectedSourceNode = addSource("source", "topic1", "topic2", "topic3");
+
+ expectedDescription.addSubtopology(
+ new TopologyDescription.Subtopology(0,
+ Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
+
+ assertThat(topology.describe(), equalTo(expectedDescription));
+ }
+
+ @Test
+ public void singleSourcePatternShouldHaveSingleSubtopology() {
+ final TopologyDescription.Source expectedSourceNode = addSource("source", Pattern.compile("topic[0-9]"));
+
+ expectedDescription.addSubtopology(
+ new TopologyDescription.Subtopology(0,
+ Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
+
+ assertThat(topology.describe(), equalTo(expectedDescription));
+ }
+
+ @Test
+ public void multipleSourcesShouldHaveDistinctSubtopologies() {
+ final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1");
+ expectedDescription.addSubtopology(
+ new TopologyDescription.Subtopology(0,
+ Collections.<TopologyDescription.Node>singleton(expectedSourceNode1)));
+
+ final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
+ expectedDescription.addSubtopology(
+ new TopologyDescription.Subtopology(1,
+ Collections.<TopologyDescription.Node>singleton(expectedSourceNode2)));
+
+ final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
+ expectedDescription.addSubtopology(
+ new TopologyDescription.Subtopology(2,
+ Collections.<TopologyDescription.Node>singleton(expectedSourceNode3)));
+
+ assertThat(topology.describe(), equalTo(expectedDescription));
+ }
+
+ @Test
+ public void sourceAndProcessorShouldHaveSingleSubtopology() {
+ final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
+ final TopologyDescription.Processor expectedProcessorNode = addProcessor("processor", expectedSourceNode);
+
+ final Set<TopologyDescription.Node> allNodes = new HashSet<>();
+ allNodes.add(expectedSourceNode);
+ allNodes.add(expectedProcessorNode);
+ expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes));
+
+ assertThat(topology.describe(), equalTo(expectedDescription));
+ }
+
+ @Test
+ public void sourceAndProcessorWithStateShouldHaveSingleSubtopology() {
+ final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
+ final String[] store = new String[] {"store"};
+ final TopologyDescription.Processor expectedProcessorNode
+ = addProcessorWithNewStore("processor", store, expectedSourceNode);
+
+ final Set<TopologyDescription.Node> allNodes = new HashSet<>();
+ allNodes.add(expectedSourceNode);
+ allNodes.add(expectedProcessorNode);
+ expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes));
+
+ assertThat(topology.describe(), equalTo(expectedDescription));
+ }
+
+
+ @Test
+ public void sourceAndProcessorWithMultipleStatesShouldHaveSingleSubtopology() {
+ final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
+ final String[] stores = new String[] {"store1", "store2"};
+ final TopologyDescription.Processor expectedProcessorNode
+ = addProcessorWithNewStore("processor", stores, expectedSourceNode);
+
+ final Set<TopologyDescription.Node> allNodes = new HashSet<>();
+ allNodes.add(expectedSourceNode);
+ allNodes.add(expectedProcessorNode);
+ expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes));
+
+ assertThat(topology.describe(), equalTo(expectedDescription));
+ }
+
+ @Test
+ public void sourceWithMultipleProcessorsShouldHaveSingleSubtopology() {
+ final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
+ final TopologyDescription.Processor expectedProcessorNode1 = addProcessor("processor1", expectedSourceNode);
+ final TopologyDescription.Processor expectedProcessorNode2 = addProcessor("processor2", expectedSourceNode);
+
+ final Set<TopologyDescription.Node> allNodes = new HashSet<>();
+ allNodes.add(expectedSourceNode);
+ allNodes.add(expectedProcessorNode1);
+ allNodes.add(expectedProcessorNode2);
+ expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes));
+
+ assertThat(topology.describe(), equalTo(expectedDescription));
+ }
+
+ @Test
+ public void processorWithMultipleSourcesShouldHaveSingleSubtopology() {
+ final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic0");
+ final TopologyDescription.Source expectedSourceNode2 = addSource("source2", Pattern.compile("topic[1-9]"));
+ final TopologyDescription.Processor expectedProcessorNode = addProcessor("processor", expectedSourceNode1, expectedSourceNode2);
+
+ final Set<TopologyDescription.Node> allNodes = new HashSet<>();
+ allNodes.add(expectedSourceNode1);
+ allNodes.add(expectedSourceNode2);
+ allNodes.add(expectedProcessorNode);
+ expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes));
+
+ assertThat(topology.describe(), equalTo(expectedDescription));
+ }
+
+ @Test
+ public void multipleSourcesWithProcessorsShouldHaveDistinctSubtopologies() {
+ final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1");
+ final TopologyDescription.Processor expectedProcessorNode1 = addProcessor("processor1", expectedSourceNode1);
+
+ final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
+ final TopologyDescription.Processor expectedProcessorNode2 = addProcessor("processor2", expectedSourceNode2);
+
+ final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
+ final TopologyDescription.Processor expectedProcessorNode3 = addProcessor("processor3", expectedSourceNode3);
+
+ final Set<TopologyDescription.Node> allNodes1 = new HashSet<>();
+ allNodes1.add(expectedSourceNode1);
+ allNodes1.add(expectedProcessorNode1);
+ expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes1));
+
+ final Set<TopologyDescription.Node> allNodes2 = new HashSet<>();
+ allNodes2.add(expectedSourceNode2);
+ allNodes2.add(expectedProcessorNode2);
+ expectedDescription.addSubtopology(new TopologyDescription.Subtopology(1, allNodes2));
+
+ final Set<TopologyDescription.Node> allNodes3 = new HashSet<>();
+ allNodes3.add(expectedSourceNode3);
+ allNodes3.add(expectedProcessorNode3);
+ expectedDescription.addSubtopology(new TopologyDescription.Subtopology(2, allNodes3));
+
+ assertThat(topology.describe(), equalTo(expectedDescription));
+ }
+
+ @Test
+ public void multipleSourcesWithSinksShouldHaveDistinctSubtopologies() {
+ final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1");
+ final TopologyDescription.Sink expectedSinkNode1 = addSink("sink1", "sinkTopic1", expectedSourceNode1);
+
+ final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
+ final TopologyDescription.Sink expectedSinkNode2 = addSink("sink2", "sinkTopic2", expectedSourceNode2);
+
+ final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
+ final TopologyDescription.Sink expectedSinkNode3 = addSink("sink3", "sinkTopic3", expectedSourceNode3);
+
+ final Set<TopologyDescription.Node> allNodes1 = new HashSet<>();
+ allNodes1.add(expectedSourceNode1);
+ allNodes1.add(expectedSinkNode1);
+ expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes1));
+
+ final Set<TopologyDescription.Node> allNodes2 = new HashSet<>();
+ allNodes2.add(expectedSourceNode2);
+ allNodes2.add(expectedSinkNode2);
+ expectedDescription.addSubtopology(new TopologyDescription.Subtopology(1, allNodes2));
+
+ final Set<TopologyDescription.Node> allNodes3 = new HashSet<>();
+ allNodes3.add(expectedSourceNode3);
+ allNodes3.add(expectedSinkNode3);
+ expectedDescription.addSubtopology(new TopologyDescription.Subtopology(2, allNodes3));
+
+ assertThat(topology.describe(), equalTo(expectedDescription));
+ }
+
+ @Test
+ public void processorsWithSameSinkShouldHaveSameSubtopology() {
+ final TopologyDescription.Source expectedSourceNode1 = addSource("source", "topic");
+ final TopologyDescription.Processor expectedProcessorNode1 = addProcessor("processor1", expectedSourceNode1);
+
+ final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
+ final TopologyDescription.Processor expectedProcessorNode2 = addProcessor("processor2", expectedSourceNode2);
+
+ final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
+ final TopologyDescription.Processor expectedProcessorNode3 = addProcessor("processor3", expectedSourceNode3);
+
+ final TopologyDescription.Sink expectedSinkNode = addSink(
+ "sink",
+ "sinkTopic",
+ expectedProcessorNode1,
+ expectedProcessorNode2,
+ expectedProcessorNode3);
+
+ final Set<TopologyDescription.Node> allNodes = new HashSet<>();
+ allNodes.add(expectedSourceNode1);
+ allNodes.add(expectedProcessorNode1);
+ allNodes.add(expectedSourceNode2);
+ allNodes.add(expectedProcessorNode2);
+ allNodes.add(expectedSourceNode3);
+ allNodes.add(expectedProcessorNode3);
+ allNodes.add(expectedSinkNode);
+ expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes));
+
+ assertThat(topology.describe(), equalTo(expectedDescription));
+ }
+
+ /**
+ * Builds three source-to-processor chains where the first two processors each register a
+ * new store and the third processor is connected to both of those stores. Expects the
+ * description to contain a single subtopology (id 0) holding all six nodes.
+ */
+ @Test
+ public void processorsWithSharedStateShouldHaveSameSubtopology() {
+ final String[] firstStore = {"store1"};
+ final String[] secondStore = {"store2"};
+ final String[] sharedStores = {firstStore[0], secondStore[0]};
+
+ final TopologyDescription.Source firstSource = addSource("source", "topic");
+ final TopologyDescription.Processor firstProcessor
+ = addProcessorWithNewStore("processor1", firstStore, firstSource);
+
+ final TopologyDescription.Source secondSource = addSource("source2", "topic2");
+ final TopologyDescription.Processor secondProcessor
+ = addProcessorWithNewStore("processor2", secondStore, secondSource);
+
+ // The third processor reuses both existing stores instead of registering new ones.
+ final TopologyDescription.Source thirdSource = addSource("source3", "topic3");
+ final TopologyDescription.Processor thirdProcessor
+ = addProcessorWithExistingStore("processor3", sharedStores, thirdSource);
+
+ final Set<TopologyDescription.Node> everyNode = new HashSet<>(Arrays.<TopologyDescription.Node>asList(
+ firstSource,
+ firstProcessor,
+ secondSource,
+ secondProcessor,
+ thirdSource,
+ thirdProcessor));
+ expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, everyNode));
+
+ final TopologyDescription actualDescription = topology.describe();
+ assertThat(actualDescription, equalTo(expectedDescription));
+ }
+
+ /**
+ * Registers one global store on both the topology and the expected description, then
+ * checks that the topology describes itself as expected.
+ */
+ @Test
+ public void shouldDescribeGlobalStoreTopology() {
+ addGlobalStoreToTopologyAndExpectedDescription("globalStore", "source", "globalTopic", "processor");
+
+ final TopologyDescription actualDescription = topology.describe();
+ assertThat(actualDescription, equalTo(expectedDescription));
+ }
+
+ /**
+ * Registers two global stores on both the topology and the expected description, then
+ * checks that the topology describes itself as expected.
+ */
+ @Test
+ public void shouldDescribeMultipleGlobalStoreTopology() {
+ addGlobalStoreToTopologyAndExpectedDescription("globalStore1", "source1", "globalTopic1", "processor1");
+ addGlobalStoreToTopologyAndExpectedDescription("globalStore2", "source2", "globalTopic2", "processor2");
+
+ final TopologyDescription actualDescription = topology.describe();
+ assertThat(actualDescription, equalTo(expectedDescription));
+ }
+
+ /**
+ * Adds a source node reading from the given topics to the topology under test and builds
+ * the matching expected description node.
+ *
+ * @param sourceName name of the source node
+ * @param sourceTopic topics of the source; the first element is read unconditionally, so at
+ * least one topic must be passed
+ * @return expected source node whose topic string is all topics joined with ", "
+ */
+ private TopologyDescription.Source addSource(final String sourceName,
+ final String... sourceTopic) {
+ topology.addSource(sourceName, sourceTopic);
+
+ // Join the topic names into one comma-separated string for the expected node.
+ String joinedTopics = sourceTopic[0];
+ int next = 1;
+ while (next < sourceTopic.length) {
+ joinedTopics = joinedTopics + ", " + sourceTopic[next];
+ ++next;
+ }
+
+ return new TopologyDescription.Source(sourceName, joinedTopics);
+ }
+
+ private TopologyDescription.Source addSource(final String sourceName,
+ final Pattern sourcePattern) {
+ topology.addSource(sourceName, sourcePattern);
+ return new TopologyDescription.Source(sourceName, sourcePattern.toString());
+ }
+
+ private TopologyDescription.Processor addProcessor(final String processorName,
+ final TopologyDescription.AbstractNode... parents) {
+ return addProcessorWithNewStore(processorName, new String[0], parents);
+ }
+
+ private TopologyDescription.Processor addProcessorWithNewStore(final String processorName,
+ final String[] storeNames,
+ final TopologyDescription.AbstractNode... parents) {
+ return addProcessorWithStore(processorName, storeNames, true, parents);
+ }
+
+ private TopologyDescription.Processor addProcessorWithExistingStore(final String processorName,
+ final String[] storeNames,
+ final TopologyDescription.AbstractNode... parents) {
+ return addProcessorWithStore(processorName, storeNames, false, parents);
+ }
+
+ /**
+ * Adds a processor to the topology under test, attaches state stores to it, and builds the
+ * matching expected description node wired to its parents.
+ *
+ * @param processorName name of the processor node
+ * @param storeNames names of the stores the processor uses
+ * @param newStores {@code true} to register each store as a new state store for this
+ * processor; {@code false} to connect the processor to the named stores
+ * @param parents expected description nodes of the processor's parents; each one gets the
+ * new node added as a successor
+ * @return expected processor node with the given stores and the parents as predecessors
+ */
+ private TopologyDescription.Processor addProcessorWithStore(final String processorName,
+ final String[] storeNames,
+ final boolean newStores,
+ final TopologyDescription.AbstractNode... parents) {
+ // The topology is addressed by node names, so collect the parents' names first.
+ final String[] parentNames = new String[parents.length];
+ int position = 0;
+ for (final TopologyDescription.AbstractNode parent : parents) {
+ parentNames[position] = parent.name();
+ ++position;
+ }
+
+ topology.addProcessor(processorName, new MockProcessorSupplier(), parentNames);
+ if (!newStores) {
+ topology.connectProcessorAndStateStores(processorName, storeNames);
+ } else {
+ for (final String storeName : storeNames) {
+ topology.addStateStore(new MockStateStoreSupplier(storeName, false), processorName);
+ }
+ }
+
+ final TopologyDescription.Processor processorNode
+ = new TopologyDescription.Processor(processorName, new HashSet<>(Arrays.asList(storeNames)));
+
+ // Link the expected node with every parent in both directions.
+ for (final TopologyDescription.AbstractNode parent : parents) {
+ parent.addSuccessor(processorNode);
+ processorNode.addPredecessor(parent);
+ }
+
+ return processorNode;
+ }
+
+ /**
+ * Adds a sink to the topology under test and builds the matching expected description node
+ * wired to its parents.
+ *
+ * @param sinkName name of the sink node
+ * @param sinkTopic topic the sink is created with
+ * @param parents expected description nodes of the sink's parents; each one gets the new
+ * node added as a successor
+ * @return expected sink node with the parents as predecessors
+ */
+ private TopologyDescription.Sink addSink(final String sinkName,
+ final String sinkTopic,
+ final TopologyDescription.AbstractNode... parents) {
+ // The topology is addressed by node names, so collect the parents' names first.
+ final String[] parentNames = new String[parents.length];
+ int position = 0;
+ for (final TopologyDescription.AbstractNode parent : parents) {
+ parentNames[position] = parent.name();
+ ++position;
+ }
+
+ topology.addSink(sinkName, sinkTopic, parentNames);
+
+ final TopologyDescription.Sink sinkNode = new TopologyDescription.Sink(sinkName, sinkTopic);
+
+ // Link the expected node with every parent in both directions.
+ for (final TopologyDescription.AbstractNode parent : parents) {
+ parent.addSuccessor(sinkNode);
+ sinkNode.addPredecessor(parent);
+ }
+
+ return sinkNode;
+ }
+
+ /**
+ * Registers a global store on the topology under test and records the corresponding
+ * expected global-store node on {@code expectedDescription}.
+ *
+ * @param globalStoreName name of the global store
+ * @param sourceName name of the source node feeding the store
+ * @param globalTopicName topic the global store is registered with
+ * @param processorName name of the processor node attached to the store
+ */
+ private void addGlobalStoreToTopologyAndExpectedDescription(final String globalStoreName,
+ final String sourceName,
+ final String globalTopicName,
+ final String processorName) {
+ final MockStateStoreSupplier storeSupplier = new MockStateStoreSupplier(globalStoreName, false, false);
+ final MockProcessorSupplier processorSupplier = new MockProcessorSupplier();
+
+ // The third and fourth arguments are deliberately left null, as in the original call.
+ topology.addGlobalStore(storeSupplier, sourceName, null, null, globalTopicName, processorName, processorSupplier);
+
+ expectedDescription.addGlobalStore(new TopologyDescription.GlobalStore(
+ sourceName,
+ processorName,
+ globalStoreName,
+ globalTopicName));
+ }
+
+}