You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/07/24 18:03:32 UTC
[3/5] kafka git commit: KAFKA-3856 (KIP-120) step two: extract
internal functions from public facing TopologyBuilder class
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/main/java/org/apache/kafka/streams/processor/TopologyDescription.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyDescription.java
deleted file mode 100644
index 0949bf5..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyDescription.java
+++ /dev/null
@@ -1,476 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.processor.internals.StreamTask;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Objects;
-import java.util.Set;
-
-/**
- * A meta representation of a {@link Topology topology}.
- * <p>
- * The nodes of a topology are grouped into {@link Subtopology sub-topologies} if they are connected.
- * In contrast, two sub-topologies are not connected but can be linked to each other via topics, i.e., if one
- * sub-topology {@link Topology#addSink(String, String, String...) writes} into a topic and another sub-topology
- * {@link Topology#addSource(String, String...) reads} from the same topic.
- * <p>
- * For {@link KafkaStreams#start() execution} sub-topologies are translated into {@link StreamTask tasks}.
- */
-// TODO make public (hide until KIP-120 if fully implemented)
-final class TopologyDescription {
- private final Set<Subtopology> subtopologies = new HashSet<>();
- private final Set<GlobalStore> globalStores = new HashSet<>();
-
- /**
- * A connected sub-graph of a {@link Topology}.
- * <p>
- * Nodes of a {@code Subtopology} are connected {@link Topology#addProcessor(String, ProcessorSupplier, String...)
- * directly} or indirectly via {@link Topology#connectProcessorAndStateStores(String, String...) state stores}
- * (i.e., if multiple processors share the same state).
- */
- public final static class Subtopology {
- private final int id;
- private final Set<Node> nodes;
-
- Subtopology(final int id,
- final Set<Node> nodes) {
- this.id = id;
- this.nodes = nodes;
- }
-
- /**
- * Internally assigned unique ID.
- * @return the ID of the sub-topology
- */
- public int id() {
- return id;
- }
-
- /**
- * All nodes of this sub-topology.
- * @return set of all nodes within the sub-topology
- */
- public Set<Node> nodes() {
- return Collections.unmodifiableSet(nodes);
- }
-
- @Override
- public String toString() {
- return "Sub-topology: " + id + "\n" + nodesAsString();
- }
-
- private String nodesAsString() {
- final StringBuilder sb = new StringBuilder();
- for (final Node node : nodes) {
- sb.append(" ");
- sb.append(node);
- sb.append('\n');
- }
- return sb.toString();
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- final Subtopology that = (Subtopology) o;
- return id == that.id
- && nodes.equals(that.nodes);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(id, nodes);
- }
- }
-
- /**
- * Represents a {@link Topology#addGlobalStore(StateStoreSupplier, String,
- * org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer, String,
- * String, ProcessorSupplier)} global store}.
- * Adding a global store results in adding a source node and one stateful processor node.
- * Note, that all added global stores form a single unit (similar to a {@link Subtopology}) even if different
- * global stores are not connected to each other.
- * Furthermore, global stores are available to all processors without connecting them explicitly, and thus global
- * stores will never be part of any {@link Subtopology}.
- */
- public final static class GlobalStore {
- private final Source source;
- private final Processor processor;
-
- GlobalStore(final String sourceName,
- final String processorName,
- final String storeName,
- final String topicName) {
- source = new Source(sourceName, topicName);
- processor = new Processor(processorName, Collections.singleton(storeName));
- source.successors.add(processor);
- processor.predecessors.add(source);
- }
-
- /**
- * The source node reading from a "global" topic.
- * @return the "global" source node
- */
- public Source source() {
- return source;
- }
-
- /**
- * The processor node maintaining the global store.
- * @return the "global" processor node
- */
- public Processor processor() {
- return processor;
- }
-
- @Override
- public String toString() {
- return "GlobalStore: " + source.name + "(topic: " + source.topics + ") -> "
- + processor.name + "(store: " + processor.stores.iterator().next() + ")\n";
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- final GlobalStore that = (GlobalStore) o;
- return source.equals(that.source)
- && processor.equals(that.processor);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(source, processor);
- }
- }
-
- /**
- * A node of a topology. Can be a source, sink, or processor node.
- */
- public interface Node {
- /**
- * The name of the node. Will never be {@code null}.
- * @return the name of the node
- */
- String name();
- /**
- * The predecessors of this node within a sub-topology.
- * Note, sources do not have any predecessors.
- * Will never be {@code null}.
- * @return set of all predecessors
- */
- Set<Node> predecessors();
- /**
- * The successor of this node within a sub-topology.
- * Note, sinks do not have any successors.
- * Will never be {@code null}.
- * @return set of all successor
- */
- Set<Node> successors();
- }
-
- abstract static class AbstractNode implements Node {
- final String name;
- final Set<Node> predecessors = new HashSet<>();
- final Set<Node> successors = new HashSet<>();
-
- AbstractNode(final String name) {
- this.name = name;
- }
-
- @Override
- public String name() {
- return name;
- }
-
- @Override
- public Set<Node> predecessors() {
- return Collections.unmodifiableSet(predecessors);
- }
-
- @Override
- public Set<Node> successors() {
- return Collections.unmodifiableSet(successors);
- }
-
- void addPredecessor(final Node predecessor) {
- predecessors.add(predecessor);
- }
-
- void addSuccessor(final Node successor) {
- successors.add(successor);
- }
- }
-
- /**
- * A source node of a topology.
- */
- public final static class Source extends AbstractNode {
- private final String topics;
-
- Source(final String name,
- final String topics) {
- super(name);
- this.topics = topics;
- }
-
- /**
- * The topic names this source node is reading from.
- * @return comma separated list of topic names or pattern (as String)
- */
- public String topics() {
- return topics;
- }
-
- @Override
- void addPredecessor(final Node predecessor) {
- throw new UnsupportedOperationException("Sources don't have predecessors.");
- }
-
- @Override
- public String toString() {
- return "Source: " + name + "(topics: " + topics + ") --> " + nodeNames(successors);
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- final Source source = (Source) o;
- // omit successor to avoid infinite loops
- return name.equals(source.name)
- && topics.equals(source.topics);
- }
-
- @Override
- public int hashCode() {
- // omit successor as it might change and alter the hash code
- return Objects.hash(name, topics);
- }
- }
-
- /**
- * A processor node of a topology.
- */
- public final static class Processor extends AbstractNode {
- private final Set<String> stores;
-
- Processor(final String name,
- final Set<String> stores) {
- super(name);
- this.stores = stores;
- }
-
- /**
- * The names of all connected stores.
- * @return set of store names
- */
- public Set<String> stores() {
- return Collections.unmodifiableSet(stores);
- }
-
- @Override
- public String toString() {
- return "Processor: " + name + "(stores: " + stores + ") --> " + nodeNames(successors) + " <-- " + nodeNames(predecessors);
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- final Processor processor = (Processor) o;
- // omit successor to avoid infinite loops
- return name.equals(processor.name)
- && stores.equals(processor.stores)
- && predecessors.equals(processor.predecessors);
- }
-
- @Override
- public int hashCode() {
- // omit successor as it might change and alter the hash code
- return Objects.hash(name, stores);
- }
- }
-
- /**
- * A sink node of a topology.
- */
- public final static class Sink extends AbstractNode {
- private final String topic;
-
- Sink(final String name,
- final String topic) {
- super(name);
- this.topic = topic;
- }
-
- /**
- * The topic name this sink node is writing to.
- * @return a topic name
- */
- public String topic() {
- return topic;
- }
-
- @Override
- void addSuccessor(final Node successor) {
- throw new UnsupportedOperationException("Sinks don't have successors.");
- }
-
- @Override
- public String toString() {
- return "Sink: " + name + "(topic: " + topic + ") <-- " + nodeNames(predecessors);
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- final Sink sink = (Sink) o;
- return name.equals(sink.name)
- && topic.equals(sink.topic)
- && predecessors.equals(sink.predecessors);
- }
-
- @Override
- public int hashCode() {
- // omit predecessors as it might change and alter the hash code
- return Objects.hash(name, topic);
- }
- }
-
- void addSubtopology(final Subtopology subtopology) {
- subtopologies.add(subtopology);
- }
-
- void addGlobalStore(final GlobalStore globalStore) {
- globalStores.add(globalStore);
- }
-
- /**
- * All sub-topologies of the represented topology.
- * @return set of all sub-topologies
- */
- public Set<Subtopology> subtopologies() {
- return Collections.unmodifiableSet(subtopologies);
- }
-
- /**
- * All global stores of the represented topology.
- * @return set of all global stores
- */
- public Set<GlobalStore> globalStores() {
- return Collections.unmodifiableSet(globalStores);
- }
-
- @Override
- public String toString() {
- return subtopologiesAsString() + globalStoresAsString();
- }
-
- private static String nodeNames(final Set<Node> nodes) {
- final StringBuilder sb = new StringBuilder();
- if (!nodes.isEmpty()) {
- for (final Node n : nodes) {
- sb.append(n.name());
- sb.append(", ");
- }
- sb.deleteCharAt(sb.length() - 1);
- sb.deleteCharAt(sb.length() - 1);
- }
- return sb.toString();
- }
-
- private String subtopologiesAsString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("Sub-topologies: \n");
- if (subtopologies.isEmpty()) {
- sb.append(" none\n");
- } else {
- for (final Subtopology st : subtopologies) {
- sb.append(" ");
- sb.append(st);
- }
- }
- return sb.toString();
- }
-
- private String globalStoresAsString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("Global Stores:\n");
- if (globalStores.isEmpty()) {
- sb.append(" none\n");
- } else {
- for (final GlobalStore gs : globalStores) {
- sb.append(" ");
- sb.append(gs);
- }
- }
- return sb.toString();
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- final TopologyDescription that = (TopologyDescription) o;
- return subtopologies.equals(that.subtopologies)
- && globalStores.equals(that.globalStores);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(subtopologies, globalStores);
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
new file mode 100644
index 0000000..ff65d31
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -0,0 +1,1491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.WindowStoreSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+
+public class InternalTopologyBuilder {
+
+ private static final Logger log = LoggerFactory.getLogger(InternalTopologyBuilder.class);
+
+ private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
+
+ private static final String[] NO_PREDECESSORS = {};
+
+ // node factories in a topological order
+ private final LinkedHashMap<String, NodeFactory> nodeFactories = new LinkedHashMap<>();
+
+ // state factories
+ private final Map<String, StateStoreFactory> stateFactories = new HashMap<>();
+
+ // global state stores (instantiated store objects, keyed by store name)
+ private final Map<String, StateStore> globalStateStores = new LinkedHashMap<>();
+
+ // all topics subscribed from source processors (without application-id prefix for internal topics)
+ private final Set<String> sourceTopicNames = new HashSet<>();
+
+ // all internal topics auto-created by the topology builder and used in source / sink processors
+ private final Set<String> internalTopicNames = new HashSet<>();
+
+ // groups of source processors that need to be copartitioned
+ private final List<Set<String>> copartitionSourceGroups = new ArrayList<>();
+
+ // map from source processor names to subscribed topics (without application-id prefix for internal topics)
+ private final HashMap<String, List<String>> nodeToSourceTopics = new HashMap<>();
+
+ // map from source processor names to regex subscription patterns
+ private final HashMap<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<>();
+
+ // map from sink processor names to the topic each sink writes to (without application-id prefix for internal topics)
+ private final HashMap<String, String> nodeToSinkTopic = new HashMap<>();
+
+ // map from topics to their matched regex patterns; this ensures that a topic is assigned to only one source node
+ // even if it can be matched by multiple regex patterns
+ private final HashMap<String, Pattern> topicToPatterns = new HashMap<>();
+
+ // map from state store names to all the topics subscribed from source processors that
+ // are connected to these state stores
+ private final Map<String, Set<String>> stateStoreNameToSourceTopics = new HashMap<>();
+
+ // map from state store names to all the regex subscribed topics from source processors that
+ // are connected to these state stores
+ private final Map<String, Set<Pattern>> stateStoreNameToSourceRegex = new HashMap<>();
+
+ // map from state store names to this state store's corresponding changelog topic if possible,
+ // this is used in the extended KStreamBuilder.
+ private final Map<String, String> storeToChangelogTopic = new HashMap<>();
+
+ // all global topics
+ private final Set<String> globalTopics = new HashSet<>();
+
+ private final Set<String> earliestResetTopics = new HashSet<>();
+
+ private final Set<String> latestResetTopics = new HashSet<>();
+
+ private final Set<Pattern> earliestResetPatterns = new HashSet<>();
+
+ private final Set<Pattern> latestResetPatterns = new HashSet<>();
+
+ private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
+
+ private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
+
+ private String applicationId = null;
+
+ private Pattern topicPattern = null;
+
+ private Map<Integer, Set<String>> nodeGroups = null;
+
+ private static class StateStoreFactory {
+ public final Set<String> users;
+
+ public final StateStoreSupplier supplier;
+
+ StateStoreFactory(final StateStoreSupplier supplier) {
+ this.supplier = supplier;
+ users = new HashSet<>();
+ }
+ }
+
+ private static abstract class NodeFactory {
+ final String name;
+ final String[] predecessors;
+
+ NodeFactory(final String name,
+ final String[] predecessors) {
+ this.name = name;
+ this.predecessors = predecessors;
+ }
+
+ public abstract ProcessorNode build();
+
+ abstract AbstractNode describe();
+ }
+
+ private static class ProcessorNodeFactory extends NodeFactory {
+ private final ProcessorSupplier<?, ?> supplier;
+ private final Set<String> stateStoreNames = new HashSet<>();
+
+ ProcessorNodeFactory(final String name,
+ final String[] predecessors,
+ final ProcessorSupplier<?, ?> supplier) {
+ super(name, predecessors.clone());
+ this.supplier = supplier;
+ }
+
+ public void addStateStore(final String stateStoreName) {
+ stateStoreNames.add(stateStoreName);
+ }
+
+ @Override
+ public ProcessorNode build() {
+ return new ProcessorNode<>(name, supplier.get(), stateStoreNames);
+ }
+
+ @Override
+ Processor describe() {
+ return new Processor(name, new HashSet<>(stateStoreNames));
+ }
+ }
+
+ private class SourceNodeFactory extends NodeFactory {
+ private final List<String> topics;
+ private final Pattern pattern;
+ private final Deserializer<?> keyDeserializer;
+ private final Deserializer<?> valDeserializer;
+ private final TimestampExtractor timestampExtractor;
+
+ private SourceNodeFactory(final String name,
+ final String[] topics,
+ final Pattern pattern,
+ final TimestampExtractor timestampExtractor,
+ final Deserializer<?> keyDeserializer,
+ final Deserializer<?> valDeserializer) {
+ super(name, NO_PREDECESSORS);
+ this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<String>();
+ this.pattern = pattern;
+ this.keyDeserializer = keyDeserializer;
+ this.valDeserializer = valDeserializer;
+ this.timestampExtractor = timestampExtractor;
+ }
+
+ List<String> getTopics(final Collection<String> subscribedTopics) {
+ // if it is subscribed via patterns, it is possible that the topic metadata has not been updated
+ // yet and hence the map from source node to topics is stale; in this case we put the pattern as a placeholder.
+ // This should only happen for debugging, since during runtime this function should always be called after the metadata has been updated.
+ if (subscribedTopics.isEmpty()) {
+ return Collections.singletonList("" + pattern + "");
+ }
+
+ final List<String> matchedTopics = new ArrayList<>();
+ for (final String update : subscribedTopics) {
+ if (pattern == topicToPatterns.get(update)) {
+ matchedTopics.add(update);
+ } else if (topicToPatterns.containsKey(update) && isMatch(update)) {
+ // the same topic cannot be matched to more than one pattern
+ // TODO: we should lift this requirement in the future
+ throw new TopologyBuilderException("Topic " + update +
+ " is already matched for another regex pattern " + topicToPatterns.get(update) +
+ " and hence cannot be matched to this regex pattern " + pattern + " any more.");
+ } else if (isMatch(update)) {
+ topicToPatterns.put(update, pattern);
+ matchedTopics.add(update);
+ }
+ }
+ return matchedTopics;
+ }
+
+ @Override
+ public ProcessorNode build() {
+ final List<String> sourceTopics = nodeToSourceTopics.get(name);
+
+ // if it is subscribed via patterns, it is possible that the topic metadata has not been updated
+ // yet and hence the map from source node to topics is stale; in this case we put the pattern as a placeholder.
+ // This should only happen for debugging, since during runtime this function should always be called after the metadata has been updated.
+ if (sourceTopics == null) {
+ return new SourceNode<>(name, Collections.singletonList("" + pattern + ""), timestampExtractor, keyDeserializer, valDeserializer);
+ } else {
+ return new SourceNode<>(name, maybeDecorateInternalSourceTopics(sourceTopics), timestampExtractor, keyDeserializer, valDeserializer);
+ }
+ }
+
+ private boolean isMatch(final String topic) {
+ return pattern.matcher(topic).matches();
+ }
+
+ @Override
+ Source describe() {
+ String sourceTopics;
+
+ if (pattern == null) {
+ sourceTopics = topics.toString();
+ sourceTopics = sourceTopics.substring(1, sourceTopics.length() - 1); // trim first and last, ie. []
+ } else {
+ sourceTopics = pattern.toString();
+ }
+
+ return new Source(name, sourceTopics);
+ }
+ }
+
+ private class SinkNodeFactory<K, V> extends NodeFactory {
+ private final String topic;
+ private final Serializer<K> keySerializer;
+ private final Serializer<V> valSerializer;
+ private final StreamPartitioner<? super K, ? super V> partitioner;
+
+ private SinkNodeFactory(final String name,
+ final String[] predecessors,
+ final String topic,
+ final Serializer<K> keySerializer,
+ final Serializer<V> valSerializer,
+ final StreamPartitioner<? super K, ? super V> partitioner) {
+ super(name, predecessors.clone());
+ this.topic = topic;
+ this.keySerializer = keySerializer;
+ this.valSerializer = valSerializer;
+ this.partitioner = partitioner;
+ }
+
+ @Override
+ public ProcessorNode build() {
+ if (internalTopicNames.contains(topic)) {
+ // prefix the internal topic name with the application id
+ return new SinkNode<>(name, decorateTopic(topic), keySerializer, valSerializer, partitioner);
+ } else {
+ return new SinkNode<>(name, topic, keySerializer, valSerializer, partitioner);
+ }
+ }
+
+ @Override
+ Sink describe() {
+ return new Sink(name, topic);
+ }
+ }
+
+ public synchronized final InternalTopologyBuilder setApplicationId(final String applicationId) {
+ Objects.requireNonNull(applicationId, "applicationId can't be null");
+ this.applicationId = applicationId;
+
+ return this;
+ }
+
+ public final void addSource(final TopologyBuilder.AutoOffsetReset offsetReset,
+ final String name,
+ final TimestampExtractor timestampExtractor,
+ final Deserializer keyDeserializer,
+ final Deserializer valDeserializer,
+ final String... topics) {
+ if (topics.length == 0) {
+ throw new TopologyBuilderException("You must provide at least one topic");
+ }
+ Objects.requireNonNull(name, "name must not be null");
+ if (nodeFactories.containsKey(name)) {
+ throw new TopologyBuilderException("Processor " + name + " is already added.");
+ }
+
+ for (final String topic : topics) {
+ Objects.requireNonNull(topic, "topic names cannot be null");
+ validateTopicNotAlreadyRegistered(topic);
+ maybeAddToResetList(earliestResetTopics, latestResetTopics, offsetReset, topic);
+ sourceTopicNames.add(topic);
+ }
+
+ nodeFactories.put(name, new SourceNodeFactory(name, topics, null, timestampExtractor, keyDeserializer, valDeserializer));
+ nodeToSourceTopics.put(name, Arrays.asList(topics));
+ nodeGrouper.add(name);
+ }
+
+ public final void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+ final String sourceName,
+ final TimestampExtractor timestampExtractor,
+ final Deserializer keyDeserializer,
+ final Deserializer valueDeserializer,
+ final String topic,
+ final String processorName,
+ final ProcessorSupplier stateUpdateSupplier) {
+ Objects.requireNonNull(storeSupplier, "store supplier must not be null");
+ Objects.requireNonNull(sourceName, "sourceName must not be null");
+ Objects.requireNonNull(topic, "topic must not be null");
+ Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
+ Objects.requireNonNull(processorName, "processorName must not be null");
+ if (nodeFactories.containsKey(sourceName)) {
+ throw new TopologyBuilderException("Processor " + sourceName + " is already added.");
+ }
+ if (nodeFactories.containsKey(processorName)) {
+ throw new TopologyBuilderException("Processor " + processorName + " is already added.");
+ }
+ if (stateFactories.containsKey(storeSupplier.name()) || globalStateStores.containsKey(storeSupplier.name())) {
+ throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " is already added.");
+ }
+ if (storeSupplier.loggingEnabled()) {
+ throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " for global table must not have logging enabled.");
+ }
+ if (sourceName.equals(processorName)) {
+ throw new TopologyBuilderException("sourceName and processorName must be different.");
+ }
+
+ validateTopicNotAlreadyRegistered(topic);
+
+ globalTopics.add(topic);
+ final String[] topics = {topic};
+ nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, topics, null, timestampExtractor, keyDeserializer, valueDeserializer));
+ nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
+ nodeGrouper.add(sourceName);
+
+ final String[] predecessors = {sourceName};
+ final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName, predecessors, stateUpdateSupplier);
+ nodeFactory.addStateStore(storeSupplier.name());
+ nodeFactories.put(processorName, nodeFactory);
+ nodeGrouper.add(processorName);
+ nodeGrouper.unite(processorName, predecessors);
+
+ globalStateStores.put(storeSupplier.name(), storeSupplier.get());
+ connectSourceStoreAndTopic(storeSupplier.name(), topic);
+ }
+
+ private void validateTopicNotAlreadyRegistered(final String topic) {
+ if (sourceTopicNames.contains(topic) || globalTopics.contains(topic)) {
+ throw new TopologyBuilderException("Topic " + topic + " has already been registered by another source.");
+ }
+
+ for (final Pattern pattern : nodeToSourcePatterns.values()) {
+ if (pattern.matcher(topic).matches()) {
+ throw new TopologyBuilderException("Topic " + topic + " matches a Pattern already registered by another source.");
+ }
+ }
+ }
+
+ public final void addSource(final TopologyBuilder.AutoOffsetReset offsetReset,
+ final String name,
+ final TimestampExtractor timestampExtractor,
+ final Deserializer keyDeserializer,
+ final Deserializer valDeserializer,
+ final Pattern topicPattern) {
+ Objects.requireNonNull(topicPattern, "topicPattern can't be null");
+ Objects.requireNonNull(name, "name can't be null");
+
+ if (nodeFactories.containsKey(name)) {
+ throw new TopologyBuilderException("Processor " + name + " is already added.");
+ }
+
+ for (final String sourceTopicName : sourceTopicNames) {
+ if (topicPattern.matcher(sourceTopicName).matches()) {
+ throw new TopologyBuilderException("Pattern " + topicPattern + " will match a topic that has already been registered by another source.");
+ }
+ }
+
+ maybeAddToResetList(earliestResetPatterns, latestResetPatterns, offsetReset, topicPattern);
+
+ nodeFactories.put(name, new SourceNodeFactory(name, null, topicPattern, timestampExtractor, keyDeserializer, valDeserializer));
+ nodeToSourcePatterns.put(name, topicPattern);
+ nodeGrouper.add(name);
+ }
+
+ public final <K, V> void addSink(final String name,
+ final String topic,
+ final Serializer<K> keySerializer,
+ final Serializer<V> valSerializer,
+ final StreamPartitioner<? super K, ? super V> partitioner,
+ final String... predecessorNames) {
+ Objects.requireNonNull(name, "name must not be null");
+ Objects.requireNonNull(topic, "topic must not be null");
+ if (nodeFactories.containsKey(name)) {
+ throw new TopologyBuilderException("Processor " + name + " is already added.");
+ }
+
+ for (final String predecessor : predecessorNames) {
+ if (predecessor.equals(name)) {
+ throw new TopologyBuilderException("Processor " + name + " cannot be a predecessor of itself.");
+ }
+ if (!nodeFactories.containsKey(predecessor)) {
+ throw new TopologyBuilderException("Predecessor processor " + predecessor + " is not added yet.");
+ }
+ }
+
+ nodeFactories.put(name, new SinkNodeFactory<>(name, predecessorNames, topic, keySerializer, valSerializer, partitioner));
+ nodeToSinkTopic.put(name, topic);
+ nodeGrouper.add(name);
+ nodeGrouper.unite(name, predecessorNames);
+ }
+
+ public final void addProcessor(final String name,
+ final ProcessorSupplier supplier,
+ final String... predecessorNames) {
+ Objects.requireNonNull(name, "name must not be null");
+ Objects.requireNonNull(supplier, "supplier must not be null");
+ if (nodeFactories.containsKey(name)) {
+ throw new TopologyBuilderException("Processor " + name + " is already added.");
+ }
+
+ for (final String predecessor : predecessorNames) {
+ if (predecessor.equals(name)) {
+ throw new TopologyBuilderException("Processor " + name + " cannot be a predecessor of itself.");
+ }
+ if (!nodeFactories.containsKey(predecessor)) {
+ throw new TopologyBuilderException("Predecessor processor " + predecessor + " is not added yet.");
+ }
+ }
+
+ nodeFactories.put(name, new ProcessorNodeFactory(name, predecessorNames, supplier));
+ nodeGrouper.add(name);
+ nodeGrouper.unite(name, predecessorNames);
+ }
+
+ /**
+  * Registers a state store and optionally connects it to processors.
+  *
+  * @param supplier       supplier of the store; must not be null
+  * @param processorNames processors that should get access to the store; may be null
+  * @throws TopologyBuilderException if a store with the same name was already added
+  */
+ public final void addStateStore(final StateStoreSupplier supplier,
+                                 final String... processorNames) {
+     Objects.requireNonNull(supplier, "supplier can't be null");
+
+     final boolean duplicate = stateFactories.containsKey(supplier.name());
+     if (duplicate) {
+         throw new TopologyBuilderException("StateStore " + supplier.name() + " is already added.");
+     }
+
+     // the store is registered first so that the connections below can find it
+     stateFactories.put(supplier.name(), new StateStoreFactory(supplier));
+
+     if (processorNames == null) {
+         return;
+     }
+     for (int i = 0; i < processorNames.length; i++) {
+         connectProcessorAndStateStore(processorNames[i], supplier.name());
+     }
+ }
+
+ /**
+  * Connects one processor to each of the given state stores.
+  *
+  * @param processorName   name of the processor; must not be null
+  * @param stateStoreNames names of the stores to connect; a null array is a no-op
+  */
+ public final void connectProcessorAndStateStores(final String processorName,
+                                                  final String... stateStoreNames) {
+     Objects.requireNonNull(processorName, "processorName can't be null");
+     if (stateStoreNames == null) {
+         return;
+     }
+     for (int i = 0; i < stateStoreNames.length; i++) {
+         connectProcessorAndStateStore(processorName, stateStoreNames[i]);
+     }
+ }
+
+ /**
+  * Records {@code topic} as the changelog topic of the given source store.
+  *
+  * @throws TopologyBuilderException if the store already has a changelog topic recorded
+  */
+ public final void connectSourceStoreAndTopic(final String sourceStoreName,
+                                              final String topic) {
+     final boolean alreadyConnected = storeToChangelogTopic.containsKey(sourceStoreName);
+     if (alreadyConnected) {
+         throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added.");
+     }
+
+     storeToChangelogTopic.put(sourceStoreName, topic);
+ }
+
+ /**
+  * Puts all given processors into the same node group.
+  *
+  * @param processorNames at least two names of already-added nodes
+  * @throws TopologyBuilderException if fewer than two names are given or a name is unknown
+  */
+ public final void connectProcessors(final String... processorNames) {
+     final int count = processorNames.length;
+     if (count < 2) {
+         throw new TopologyBuilderException("At least two processors need to participate in the connection.");
+     }
+
+     for (int i = 0; i < count; i++) {
+         final String candidate = processorNames[i];
+         if (!nodeFactories.containsKey(candidate)) {
+             throw new TopologyBuilderException("Processor " + candidate + " is not added yet.");
+         }
+     }
+
+     // unite the first processor with all remaining ones
+     final String first = processorNames[0];
+     final String[] others = Arrays.copyOfRange(processorNames, 1, count);
+     nodeGrouper.unite(first, others);
+ }
+
+ /**
+  * Marks a topic name as internal.
+  *
+  * @param topicName undecorated topic name; must not be null
+  */
+ public final void addInternalTopic(final String topicName) {
+     final String checkedName = Objects.requireNonNull(topicName, "topicName can't be null");
+     internalTopicNames.add(checkedName);
+ }
+
+ /**
+  * Registers a group of source nodes as copartitioned.
+  * The given collection is copied into an unmodifiable set before it is stored.
+  *
+  * @param sourceNodes names of the source nodes forming one copartition group
+  */
+ public final void copartitionSources(final Collection<String> sourceNodes) {
+     final Set<String> snapshot = new HashSet<>(sourceNodes);
+     final Set<String> readOnlyGroup = Collections.unmodifiableSet(snapshot);
+     copartitionSourceGroups.add(readOnlyGroup);
+ }
+
+ /**
+  * Gives a processor node access to a state store.
+  * <p>
+  * All validation happens before any builder state is modified, so a rejected
+  * connection (unknown store, unknown node, or a node that is not a processor) leaves
+  * the node grouping and the store's user list untouched.
+  *
+  * @param processorName  name of an already-added processor node
+  * @param stateStoreName name of an already-added state store
+  * @throws TopologyBuilderException if the store or the node is unknown, or if the node
+  *                                  is a source or sink node
+  */
+ private void connectProcessorAndStateStore(final String processorName,
+                                            final String stateStoreName) {
+     if (!stateFactories.containsKey(stateStoreName)) {
+         throw new TopologyBuilderException("StateStore " + stateStoreName + " is not added yet.");
+     }
+     if (!nodeFactories.containsKey(processorName)) {
+         throw new TopologyBuilderException("Processor " + processorName + " is not added yet.");
+     }
+
+     // reject source and sink nodes up front, before the grouping or the user list is changed
+     final NodeFactory nodeFactory = nodeFactories.get(processorName);
+     if (!(nodeFactory instanceof ProcessorNodeFactory)) {
+         throw new TopologyBuilderException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
+     }
+     final ProcessorNodeFactory processorNodeFactory = (ProcessorNodeFactory) nodeFactory;
+
+     // every user of the same store must end up in the same node group;
+     // uniting with any one existing user is sufficient
+     final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
+     final Iterator<String> iter = stateStoreFactory.users.iterator();
+     if (iter.hasNext()) {
+         final String user = iter.next();
+         nodeGrouper.unite(user, processorName);
+     }
+     stateStoreFactory.users.add(processorName);
+
+     processorNodeFactory.addStateStore(stateStoreName);
+     connectStateStoreNameToSourceTopicsOrPattern(stateStoreName, processorNodeFactory);
+ }
+
+ /**
+  * Walks up the predecessor chain and collects every source node factory reached.
+  * Processor predecessors are followed recursively; any other kind of predecessor is ignored.
+  *
+  * @param predecessors names of the nodes to start from
+  * @return the source node factories found upstream
+  */
+ private Set<SourceNodeFactory> findSourcesForProcessorPredecessors(final String[] predecessors) {
+     final Set<SourceNodeFactory> upstreamSources = new HashSet<>();
+     for (int i = 0; i < predecessors.length; i++) {
+         final NodeFactory parent = nodeFactories.get(predecessors[i]);
+         if (parent instanceof SourceNodeFactory) {
+             upstreamSources.add((SourceNodeFactory) parent);
+             continue;
+         }
+         if (parent instanceof ProcessorNodeFactory) {
+             final ProcessorNodeFactory processorParent = (ProcessorNodeFactory) parent;
+             upstreamSources.addAll(findSourcesForProcessorPredecessors(processorParent.predecessors));
+         }
+     }
+     return upstreamSources;
+ }
+
+ private void connectStateStoreNameToSourceTopicsOrPattern(final String stateStoreName,
+ final ProcessorNodeFactory processorNodeFactory) {
+ // we should never update the mapping from state store names to source topics if the store name already exists
+ // in the map; this scenario is possible, for example, that a state store underlying a source KTable is
+ // connecting to a join operator whose source topic is not the original KTable's source topic but an internal repartition topic.
+
+ if (stateStoreNameToSourceTopics.containsKey(stateStoreName) || stateStoreNameToSourceRegex.containsKey(stateStoreName)) {
+ return;
+ }
+
+ final Set<String> sourceTopics = new HashSet<>();
+ final Set<Pattern> sourcePatterns = new HashSet<>();
+ final Set<SourceNodeFactory> sourceNodesForPredecessor = findSourcesForProcessorPredecessors(processorNodeFactory.predecessors);
+
+ for (final SourceNodeFactory sourceNodeFactory : sourceNodesForPredecessor) {
+ if (sourceNodeFactory.pattern != null) {
+ sourcePatterns.add(sourceNodeFactory.pattern);
+ } else {
+ sourceTopics.addAll(sourceNodeFactory.topics);
+ }
+ }
+
+ if (!sourceTopics.isEmpty()) {
+ stateStoreNameToSourceTopics.put(stateStoreName,
+ Collections.unmodifiableSet(sourceTopics));
+ }
+
+ if (!sourcePatterns.isEmpty()) {
+ stateStoreNameToSourceRegex.put(stateStoreName,
+ Collections.unmodifiableSet(sourcePatterns));
+ }
+
+ }
+
+ /**
+  * Adds {@code item} to the collection matching the requested reset policy.
+  * Nothing happens when no policy is given.
+  *
+  * @param earliestResets receives the item for {@code EARLIEST}
+  * @param latestResets   receives the item for {@code LATEST}
+  * @param offsetReset    requested policy; may be null
+  * @param item           topic or pattern to register
+  * @throws TopologyBuilderException for any other policy value
+  */
+ private <T> void maybeAddToResetList(final Collection<T> earliestResets,
+                                      final Collection<T> latestResets,
+                                      final TopologyBuilder.AutoOffsetReset offsetReset,
+                                      final T item) {
+     if (offsetReset == null) {
+         return;
+     }
+
+     if (offsetReset == TopologyBuilder.AutoOffsetReset.EARLIEST) {
+         earliestResets.add(item);
+     } else if (offsetReset == TopologyBuilder.AutoOffsetReset.LATEST) {
+         latestResets.add(item);
+     } else {
+         throw new TopologyBuilderException(String.format("Unrecognized reset format %s", offsetReset));
+     }
+ }
+
+ /**
+  * Returns the node groups keyed by group id, computing and caching them on first use.
+  *
+  * @return map from group id to the names of the nodes in that group
+  */
+ public synchronized Map<Integer, Set<String>> nodeGroups() {
+     if (nodeGroups != null) {
+         return nodeGroups;
+     }
+     nodeGroups = makeNodeGroups();
+     return nodeGroups;
+ }
+
+ /**
+  * Computes the node groups from the current state of the node grouper.
+  * Group ids are handed out in increasing order, first while walking the sorted source
+  * nodes and then while walking the remaining sorted nodes.
+  *
+  * @return a new insertion-ordered map from group id to the node names of that group
+  */
+ private Map<Integer, Set<String>> makeNodeGroups() {
+     final Map<Integer, Set<String>> groupsById = new LinkedHashMap<>();
+     final Map<String, Set<String>> groupsByRoot = new HashMap<>();
+
+     int nextGroupId = 0;
+
+     // Source nodes are visited first. This makes the group id assignment easy to predict in tests
+     final Set<String> sourceNodeNames = new HashSet<>(nodeToSourceTopics.keySet());
+     sourceNodeNames.addAll(nodeToSourcePatterns.keySet());
+
+     for (final String sourceName : Utils.sorted(sourceNodeNames)) {
+         final String root = nodeGrouper.root(sourceName);
+         Set<String> group = groupsByRoot.get(root);
+         if (group == null) {
+             group = new HashSet<>();
+             groupsByRoot.put(root, group);
+             groupsById.put(nextGroupId, group);
+             nextGroupId++;
+         }
+         group.add(sourceName);
+     }
+
+     // Then every node that has no entry in nodeToSourceTopics
+     for (final String nodeName : Utils.sorted(nodeFactories.keySet())) {
+         if (nodeToSourceTopics.containsKey(nodeName)) {
+             continue;
+         }
+         final String root = nodeGrouper.root(nodeName);
+         Set<String> group = groupsByRoot.get(root);
+         if (group == null) {
+             group = new HashSet<>();
+             groupsByRoot.put(root, group);
+             groupsById.put(nextGroupId, group);
+             nextGroupId++;
+         }
+         group.add(nodeName);
+     }
+
+     return groupsById;
+ }
+
+ /**
+  * Builds the processor topology for one node group, or for all non-global groups.
+  *
+  * @param topicGroupId id of the group to build; when null, the nodes of all groups
+  *                     minus the nodes of the global groups are built
+  * @return the built topology
+  */
+ public synchronized ProcessorTopology build(final Integer topicGroupId) {
+     if (topicGroupId != null) {
+         return build(nodeGroups().get(topicGroupId));
+     }
+
+     // no id given: build the full topology minus the global groups
+     final Set<String> globalNodes = globalNodeGroups();
+     final Set<String> nonGlobalNodes = new HashSet<>();
+     for (final Set<String> group : nodeGroups().values()) {
+         nonGlobalNodes.addAll(group);
+     }
+     nonGlobalNodes.removeAll(globalNodes);
+
+     return build(nonGlobalNodes);
+ }
+
+ /**
+  * Builds the topology made of the nodes that belong to global state stores.
+  *
+  * @return the global ProcessorTopology, or {@code null} if there are no global node groups
+  */
+ public synchronized ProcessorTopology buildGlobalStateTopology() {
+     final Set<String> globalNodes = globalNodeGroups();
+     return globalNodes.isEmpty() ? null : build(globalNodes);
+ }
+
+ /**
+  * Collects the names of all nodes that live in a node group containing a global source.
+  *
+  * @return the union of all node groups that contain at least one global source node
+  */
+ private Set<String> globalNodeGroups() {
+     final Set<String> globalGroups = new HashSet<>();
+     for (final Set<String> nodes : nodeGroups().values()) {
+         for (final String node : nodes) {
+             if (isGlobalSource(node)) {
+                 globalGroups.addAll(nodes);
+                 // the whole group is collected already; scanning its remaining nodes
+                 // could only re-add the same names
+                 break;
+             }
+         }
+     }
+     return globalGroups;
+ }
+
+ /**
+  * Builds a ProcessorTopology from the node factories whose names are in {@code nodeGroup}.
+  * <p>
+  * Besides creating the nodes, this wires each node to its predecessors, instantiates the
+  * state stores used by the processors, and fills the topic-to-source and topic-to-sink maps.
+  * As a side effect it may add entries to {@code storeToChangelogTopic}.
+  *
+  * @param nodeGroup names of the nodes to build; when null, every registered node is built
+  * @return the topology assembled from the selected nodes
+  * @throws TopologyBuilderException if a factory is of an unknown kind
+  */
+ private ProcessorTopology build(final Set<String> nodeGroup) {
+ final List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
+ final Map<String, ProcessorNode> processorMap = new HashMap<>();
+ final Map<String, SourceNode> topicSourceMap = new HashMap<>();
+ final Map<String, SinkNode> topicSinkMap = new HashMap<>();
+ final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();
+
+ // create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
+ for (final NodeFactory factory : nodeFactories.values()) {
+ if (nodeGroup == null || nodeGroup.contains(factory.name)) {
+ final ProcessorNode node = factory.build();
+ processorNodes.add(node);
+ processorMap.put(node.name(), node);
+
+ if (factory instanceof ProcessorNodeFactory) {
+ // NOTE(review): predecessors are looked up among the nodes built so far; this presumably
+ // relies on every predecessor being part of the same node group - confirm
+ for (final String predecessor : ((ProcessorNodeFactory) factory).predecessors) {
+ final ProcessorNode<?, ?> predecessorNode = processorMap.get(predecessor);
+ predecessorNode.addChild(node);
+ }
+ // each store is instantiated at most once per build, even when several processors share it
+ for (final String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
+ if (!stateStoreMap.containsKey(stateStoreName)) {
+ final StateStore stateStore;
+
+ if (stateFactories.containsKey(stateStoreName)) {
+ final StateStoreSupplier supplier = stateFactories.get(stateStoreName).supplier;
+ stateStore = supplier.get();
+
+ // remember the changelog topic if this state store is change-logging enabled
+ // (an existing mapping for the store is left as it is)
+ if (supplier.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName)) {
+ final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, stateStoreName);
+ storeToChangelogTopic.put(stateStoreName, changelogTopic);
+ }
+ } else {
+ // not a regular store: fall back to the global state stores
+ stateStore = globalStateStores.get(stateStoreName);
+ }
+
+ stateStoreMap.put(stateStoreName, stateStore);
+ }
+ }
+ } else if (factory instanceof SourceNodeFactory) {
+ final SourceNodeFactory sourceNodeFactory = (SourceNodeFactory) factory;
+ // pattern sources resolve their topics from the latest subscription updates
+ final List<String> topics = (sourceNodeFactory.pattern != null) ?
+ sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates()) :
+ sourceNodeFactory.topics;
+
+ for (final String topic : topics) {
+ if (internalTopicNames.contains(topic)) {
+ // prefix the internal topic name with the application id
+ topicSourceMap.put(decorateTopic(topic), (SourceNode) node);
+ } else {
+ topicSourceMap.put(topic, (SourceNode) node);
+ }
+ }
+ } else if (factory instanceof SinkNodeFactory) {
+ final SinkNodeFactory sinkNodeFactory = (SinkNodeFactory) factory;
+
+ // the sink topic is registered inside the predecessor loop, i.e. once per predecessor;
+ // a sink without predecessors is therefore never put into topicSinkMap
+ for (final String predecessor : sinkNodeFactory.predecessors) {
+ processorMap.get(predecessor).addChild(node);
+ if (internalTopicNames.contains(sinkNodeFactory.topic)) {
+ // prefix the internal topic name with the application id
+ topicSinkMap.put(decorateTopic(sinkNodeFactory.topic), (SinkNode) node);
+ } else {
+ topicSinkMap.put(sinkNodeFactory.topic, (SinkNode) node);
+ }
+ }
+ } else {
+ throw new TopologyBuilderException("Unknown definition class: " + factory.getClass().getName());
+ }
+ }
+ }
+
+ return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()), storeToChangelogTopic, new ArrayList<>(globalStateStores.values()));
+ }
+
+ /**
+  * Returns the global {@link StateStore}s registered with this builder.
+  *
+  * @return an unmodifiable view of the global {@link StateStore}s keyed by store name
+  */
+ public Map<String, StateStore> globalStateStores() {
+     final Map<String, StateStore> readOnlyView = Collections.unmodifiableMap(globalStateStores);
+     return readOnlyView;
+ }
+
+ /**
+  * Returns the map of topic groups keyed by the group id.
+  * A topic group is a group of topics in the same task.
+  * <p>
+  * For every node group this collects the sink topics, the source topics, the internal
+  * source topics and the changelog topics of logging-enabled stores used by the group's nodes.
+  * Node groups that end up without any source topic are left out of the result.
+  *
+  * @return unmodifiable map from group id to the topics of that group
+  */
+ public synchronized Map<Integer, TopologyBuilder.TopicsInfo> topicGroups() {
+ final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = new LinkedHashMap<>();
+
+ // compute and cache the node groups if that has not happened yet
+ if (nodeGroups == null) {
+ nodeGroups = makeNodeGroups();
+ }
+
+ for (final Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
+ final Set<String> sinkTopics = new HashSet<>();
+ final Set<String> sourceTopics = new HashSet<>();
+ final Map<String, InternalTopicConfig> internalSourceTopics = new HashMap<>();
+ final Map<String, InternalTopicConfig> stateChangelogTopics = new HashMap<>();
+ for (final String node : entry.getValue()) {
+ // if the node is a source node, add to the source topics
+ final List<String> topics = nodeToSourceTopics.get(node);
+ if (topics != null) {
+ // if some of the topics are internal, add them to the internal topics
+ for (final String topic : topics) {
+ // skip global topic as they don't need partition assignment
+ if (globalTopics.contains(topic)) {
+ continue;
+ }
+ if (internalTopicNames.contains(topic)) {
+ // prefix the internal topic name with the application id
+ final String internalTopic = decorateTopic(topic);
+ internalSourceTopics.put(internalTopic, new InternalTopicConfig(internalTopic,
+ Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
+ Collections.<String, String>emptyMap()));
+ sourceTopics.add(internalTopic);
+ } else {
+ sourceTopics.add(topic);
+ }
+ }
+ }
+
+ // if the node is a sink node, add to the sink topics
+ final String topic = nodeToSinkTopic.get(node);
+ if (topic != null) {
+ if (internalTopicNames.contains(topic)) {
+ // prefix the internal topic name with the application id
+ sinkTopics.add(decorateTopic(topic));
+ } else {
+ sinkTopics.add(topic);
+ }
+ }
+
+ // if the node is connected to a state, add to the state topics
+ // (only stores with logging enabled contribute a changelog topic)
+ for (final StateStoreFactory stateFactory : stateFactories.values()) {
+ final StateStoreSupplier supplier = stateFactory.supplier;
+ if (supplier.loggingEnabled() && stateFactory.users.contains(node)) {
+ final String name = ProcessorStateManager.storeChangelogTopic(applicationId, supplier.name());
+ final InternalTopicConfig internalTopicConfig = createInternalTopicConfig(supplier, name);
+ stateChangelogTopics.put(name, internalTopicConfig);
+ }
+ }
+ }
+ // groups without source topics are not reported
+ if (!sourceTopics.isEmpty()) {
+ topicGroups.put(entry.getKey(), new TopologyBuilder.TopicsInfo(
+ Collections.unmodifiableSet(sinkTopics),
+ Collections.unmodifiableSet(sourceTopics),
+ Collections.unmodifiableMap(internalSourceTopics),
+ Collections.unmodifiableMap(stateChangelogTopics)));
+ }
+ }
+
+ return Collections.unmodifiableMap(topicGroups);
+ }
+
+ /**
+  * Refreshes {@code nodeToSourceTopics} for every pattern-subscribed source node with the
+  * topics its factory derives from the current subscription updates.
+  * Does nothing when there are no updates.
+  */
+ private void setRegexMatchedTopicsToSourceNodes() {
+     if (!subscriptionUpdates.hasUpdates()) {
+         return;
+     }
+
+     for (final Map.Entry<String, Pattern> patternSource : nodeToSourcePatterns.entrySet()) {
+         final SourceNodeFactory factory = (SourceNodeFactory) nodeFactories.get(patternSource.getKey());
+         // replace the node's topic list with the topics matched from the given regex
+         nodeToSourceTopics.put(patternSource.getKey(), factory.getTopics(subscriptionUpdates.getUpdates()));
+         log.debug("nodeToSourceTopics {}", nodeToSourceTopics);
+     }
+ }
+
+ /**
+  * Extends the store-to-source-topics mapping with the subscribed topics that match any
+  * of the source patterns recorded for a store. Stores without a matching topic are left
+  * unchanged. Does nothing when there are no subscription updates.
+  */
+ private void setRegexMatchedTopicToStateStore() {
+     if (!subscriptionUpdates.hasUpdates()) {
+         return;
+     }
+
+     for (final Map.Entry<String, Set<Pattern>> storeEntry : stateStoreNameToSourceRegex.entrySet()) {
+         final Set<String> matchedTopics = new HashSet<>();
+         for (final String updatedTopic : subscriptionUpdates.getUpdates()) {
+             for (final Pattern sourcePattern : storeEntry.getValue()) {
+                 if (sourcePattern.matcher(updatedTopic).matches()) {
+                     matchedTopics.add(updatedTopic);
+                 }
+             }
+         }
+
+         if (matchedTopics.isEmpty()) {
+             continue;
+         }
+
+         // keep the topics already known for the store and add the new matches
+         final Collection<String> knownTopics = stateStoreNameToSourceTopics.get(storeEntry.getKey());
+         if (knownTopics != null) {
+             matchedTopics.addAll(knownTopics);
+         }
+         stateStoreNameToSourceTopics.put(storeEntry.getKey(), Collections.unmodifiableSet(matchedTopics));
+     }
+ }
+
+ /**
+  * Creates the changelog topic configuration for a state store.
+  * Window stores get both the compact and delete cleanup policies plus a retention time;
+  * every other store gets the compact policy only.
+  *
+  * @param supplier supplier of the store the changelog belongs to
+  * @param name     name of the changelog topic
+  * @return the configuration for the changelog topic
+  */
+ private InternalTopicConfig createInternalTopicConfig(final StateStoreSupplier<?> supplier,
+                                                       final String name) {
+     if (supplier instanceof WindowStoreSupplier) {
+         final WindowStoreSupplier windowed = (WindowStoreSupplier) supplier;
+         final InternalTopicConfig windowedConfig = new InternalTopicConfig(name,
+             Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact,
+                 InternalTopicConfig.CleanupPolicy.delete),
+             supplier.logConfig());
+         windowedConfig.setRetentionMs(windowed.retentionPeriod());
+         return windowedConfig;
+     }
+
+     return new InternalTopicConfig(name, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), supplier.logConfig());
+ }
+
+ /**
+  * Builds the pattern covering all topics and patterns registered with the
+  * earliest reset policy.
+  *
+  * @return the combined pattern
+  * @throws TopologyBuilderException if it overlaps with the latest-reset topics or patterns
+  */
+ public synchronized Pattern earliestResetTopicsPattern() {
+     final Pattern combined = buildPatternForOffsetResetTopics(
+         maybeDecorateInternalSourceTopics(earliestResetTopics),
+         earliestResetPatterns);
+
+     ensureNoRegexOverlap(combined, latestResetPatterns, latestResetTopics);
+     return combined;
+ }
+
+ /**
+  * Builds the pattern covering all topics and patterns registered with the
+  * latest reset policy.
+  *
+  * @return the combined pattern
+  * @throws TopologyBuilderException if it overlaps with the earliest-reset topics or patterns
+  */
+ public synchronized Pattern latestResetTopicsPattern() {
+     final Pattern combined = buildPatternForOffsetResetTopics(
+         maybeDecorateInternalSourceTopics(latestResetTopics),
+         latestResetPatterns);
+
+     ensureNoRegexOverlap(combined, earliestResetPatterns, earliestResetTopics);
+     return combined;
+ }
+
+ private void ensureNoRegexOverlap(final Pattern builtPattern,
+ final Set<Pattern> otherPatterns,
+ final Set<String> otherTopics) {
+ for (final Pattern otherPattern : otherPatterns) {
+ if (builtPattern.pattern().contains(otherPattern.pattern())) {
+ throw new TopologyBuilderException(
+ String.format("Found overlapping regex [%s] against [%s] for a KStream with auto offset resets",
+ otherPattern.pattern(),
+ builtPattern.pattern()));
+ }
+ }
+
+ for (final String otherTopic : otherTopics) {
+ if (builtPattern.matcher(otherTopic).matches()) {
+ throw new TopologyBuilderException(
+ String.format("Found overlapping regex [%s] matching topic [%s] for a KStream with auto offset resets",
+ builtPattern.pattern(),
+ otherTopic));
+ }
+ }
+ }
+
+ /**
+  * Joins topic names and pattern texts with {@code |} into one alternation pattern.
+  * Topics come first, followed by the patterns, each in iteration order.
+  *
+  * @param sourceTopics   topic names, inserted into the regex as they are
+  * @param sourcePatterns patterns whose regex text is appended
+  * @return the compiled alternation, or {@code EMPTY_ZERO_LENGTH_PATTERN} if both inputs are empty
+  */
+ private static Pattern buildPatternForOffsetResetTopics(final Collection<String> sourceTopics,
+                                                         final Collection<Pattern> sourcePatterns) {
+     final StringBuilder regex = new StringBuilder();
+     boolean nothingAppended = true;
+
+     for (final String topic : sourceTopics) {
+         if (!nothingAppended) {
+             regex.append("|");
+         }
+         regex.append(topic);
+         nothingAppended = false;
+     }
+
+     for (final Pattern sourcePattern : sourcePatterns) {
+         if (!nothingAppended) {
+             regex.append("|");
+         }
+         regex.append(sourcePattern.pattern());
+         nothingAppended = false;
+     }
+
+     if (nothingAppended) {
+         return EMPTY_ZERO_LENGTH_PATTERN;
+     }
+     return Pattern.compile(regex.toString());
+ }
+
+ /**
+  * Returns, per state store, the source topics feeding it, with internal topic
+  * names decorated.
+  *
+  * @return a new map from store name to its list of source topic names
+  */
+ public Map<String, List<String>> stateStoreNameToSourceTopics() {
+     final Map<String, List<String>> topicsByStore = new HashMap<>();
+     for (final Map.Entry<String, Set<String>> storeEntry : stateStoreNameToSourceTopics.entrySet()) {
+         final List<String> decorated = maybeDecorateInternalSourceTopics(storeEntry.getValue());
+         topicsByStore.put(storeEntry.getKey(), decorated);
+     }
+     return topicsByStore;
+ }
+
+ /**
+  * Translates the registered copartition groups from source node names to topic names.
+  * Nodes without an entry in {@code nodeToSourceTopics} contribute nothing.
+  *
+  * @return unmodifiable list with one unmodifiable topic set per copartition group
+  */
+ public synchronized Collection<Set<String>> copartitionGroups() {
+     final List<Set<String>> groups = new ArrayList<>(copartitionSourceGroups.size());
+     for (final Set<String> sourceNodeNames : copartitionSourceGroups) {
+         final Set<String> groupTopics = new HashSet<>();
+         for (final String sourceNodeName : sourceNodeNames) {
+             final List<String> nodeTopics = nodeToSourceTopics.get(sourceNodeName);
+             if (nodeTopics == null) {
+                 continue;
+             }
+             groupTopics.addAll(maybeDecorateInternalSourceTopics(nodeTopics));
+         }
+         groups.add(Collections.unmodifiableSet(groupTopics));
+     }
+     return Collections.unmodifiableList(groups);
+ }
+
+ /**
+  * Copies the given topic names, replacing each internal topic name with its decorated form.
+  *
+  * @param sourceTopics topic names to translate
+  * @return a new list in the iteration order of the input
+  */
+ private List<String> maybeDecorateInternalSourceTopics(final Collection<String> sourceTopics) {
+     final List<String> result = new ArrayList<>();
+     for (final String topic : sourceTopics) {
+         final String effectiveName = internalTopicNames.contains(topic) ? decorateTopic(topic) : topic;
+         result.add(effectiveName);
+     }
+     return result;
+ }
+
+ /**
+  * Prefixes a topic name with the application id and a dash.
+  *
+  * @param topic undecorated topic name
+  * @return {@code applicationId + "-" + topic}
+  * @throws TopologyBuilderException if the application id has not been set
+  */
+ private String decorateTopic(final String topic) {
+     if (applicationId != null) {
+         return applicationId + "-" + topic;
+     }
+
+     throw new TopologyBuilderException("there are internal topics and "
+         + "applicationId hasn't been set. Call "
+         + "setApplicationId first");
+ }
+
+ /**
+  * Returns the subscription updates currently held by this builder.
+  *
+  * @return the current {@code SubscriptionUpdates} instance
+  */
+ public SubscriptionUpdates subscriptionUpdates() {
+     return this.subscriptionUpdates;
+ }
+
+ /**
+  * Returns the pattern covering all source topics and source patterns, building and
+  * caching it on first use. Topic names are sorted before they are joined.
+  *
+  * @return the cached source topic pattern
+  */
+ public synchronized Pattern sourceTopicPattern() {
+     if (topicPattern != null) {
+         return topicPattern;
+     }
+
+     final List<String> sortedTopics = new ArrayList<>();
+     for (final List<String> nodeTopics : nodeToSourceTopics.values()) {
+         sortedTopics.addAll(maybeDecorateInternalSourceTopics(nodeTopics));
+     }
+     Collections.sort(sortedTopics);
+
+     topicPattern = buildPatternForOffsetResetTopics(sortedTopics, nodeToSourcePatterns.values());
+     return topicPattern;
+ }
+
+ /**
+  * Replaces the subscription updates held by this builder and re-derives the
+  * regex-matched topics for source nodes and state stores.
+  *
+  * @param subscriptionUpdates the new subscription updates
+  * @param threadId            id of the calling stream thread, used for logging only
+  */
+ public synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates,
+                                              final String threadId) {
+     log.debug("stream-thread [{}] updating builder with {} topic(s) with possible matching regex subscription(s)",
+         threadId, subscriptionUpdates);
+
+     // store the new updates first; both refresh steps below read them from the field
+     this.subscriptionUpdates = subscriptionUpdates;
+
+     setRegexMatchedTopicsToSourceNodes();
+     setRegexMatchedTopicToStateStore();
+ }
+
+ /**
+  * Tells whether the named node is a source node reading exactly one topic that is
+  * registered as a global topic.
+  *
+  * @param nodeName name of the node to check
+  * @return true only for such a single-topic global source node
+  */
+ private boolean isGlobalSource(final String nodeName) {
+     final NodeFactory candidate = nodeFactories.get(nodeName);
+     if (!(candidate instanceof SourceNodeFactory)) {
+         return false;
+     }
+
+     final List<String> sourceTopics = ((SourceNodeFactory) candidate).topics;
+     return sourceTopics != null
+         && sourceTopics.size() == 1
+         && globalTopics.contains(sourceTopics.get(0));
+ }
+
+ /**
+  * Creates a description of the topology as currently defined in this builder.
+  *
+  * @return a new description holding the sub-topologies and the global stores
+  */
+ public TopologyDescription describe() {
+     final TopologyDescription result = new TopologyDescription();
+     describeSubtopologies(result);
+     describeGlobalStores(result);
+     return result;
+ }
+
+ /**
+  * Adds one sub-topology to the description for every node group that does not contain
+  * a global source node.
+  *
+  * @param description description to add the sub-topologies to
+  */
+ private void describeSubtopologies(final TopologyDescription description) {
+     for (final Map.Entry<Integer, Set<String>> group : makeNodeGroups().entrySet()) {
+         final Set<String> groupNodes = group.getValue();
+
+         // groups holding a global source are described as global stores instead
+         if (nodeGroupContainsGlobalSourceNode(groupNodes)) {
+             continue;
+         }
+         describeSubtopology(description, group.getKey(), groupNodes);
+     }
+ }
+
+ private boolean nodeGroupContainsGlobalSourceNode(final Set<String> allNodesOfGroups) {
+ for (final String node : allNodesOfGroups) {
+ if (isGlobalSource(node)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+  * Describes one node group as a sub-topology and adds it to the description.
+  *
+  * @param description   description to add the sub-topology to
+  * @param subtopologyId id of the node group
+  * @param nodeNames     names of the nodes in the group
+  */
+ private void describeSubtopology(final TopologyDescription description,
+                                  final Integer subtopologyId,
+                                  final Set<String> nodeNames) {
+     // first pass: create a described node for every node of the group
+     final Map<String, AbstractNode> described = new HashMap<>();
+     for (final String nodeName : nodeNames) {
+         final AbstractNode node = nodeFactories.get(nodeName).describe();
+         described.put(nodeName, node);
+     }
+
+     // second pass: link every node with its predecessors, in both directions
+     for (final AbstractNode child : described.values()) {
+         for (final String parentName : nodeFactories.get(child.name()).predecessors) {
+             final AbstractNode parent = described.get(parentName);
+             child.addPredecessor(parent);
+             parent.addSuccessor(child);
+         }
+     }
+
+     final Set<TopologyDescription.Node> subtopologyNodes = new HashSet<TopologyDescription.Node>(described.values());
+     description.addSubtopology(new Subtopology(subtopologyId, subtopologyNodes));
+ }
+
+ /**
+  * Adds a GlobalStore entry to the description for every node group that contains a
+  * global source node.
+  * <p>
+  * The node groups are freshly computed via makeNodeGroups(), so removing the source node
+  * from a group below modifies that fresh result rather than the cached {@code nodeGroups} field.
+  *
+  * @param description description to add the global stores to
+  */
+ private void describeGlobalStores(final TopologyDescription description) {
+ for (final Map.Entry<Integer, Set<String>> nodeGroup : makeNodeGroups().entrySet()) {
+ final Set<String> nodes = nodeGroup.getValue();
+
+ final Iterator<String> it = nodes.iterator();
+ while (it.hasNext()) {
+ final String node = it.next();
+
+ if (isGlobalSource(node)) {
+ // we found a GlobalStore node group; those contain exactly two node: {sourceNode,processorNode}
+ it.remove(); // remove sourceNode from group
+ // a new iterator is needed here; with the source removed, the first element is the processor
+ final String processorNode = nodes.iterator().next(); // get remaining processorNode
+
+ // store name: first state store of the processor; topic: first source topic of the source node
+ description.addGlobalStore(new GlobalStore(
+ node,
+ processorNode,
+ ((ProcessorNodeFactory) nodeFactories.get(processorNode)).stateStoreNames.iterator().next(),
+ nodeToSourceTopics.get(node).get(0)
+ ));
+ // the group was modified through "it"; stop iterating this group
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+  * Description of a global store: one source node connected to one processor node
+  * that maintains a single store.
+  */
+ public final static class GlobalStore implements TopologyDescription.GlobalStore {
+     private final Source source;
+     private final Processor processor;
+
+     /**
+      * Creates the source and processor descriptions and links them to each other.
+      *
+      * @param sourceName    name of the source node
+      * @param processorName name of the processor node
+      * @param storeName     name of the store maintained by the processor
+      * @param topicName     topic read by the source node
+      */
+     public GlobalStore(final String sourceName,
+                        final String processorName,
+                        final String storeName,
+                        final String topicName) {
+         this.source = new Source(sourceName, topicName);
+         this.processor = new Processor(processorName, Collections.singleton(storeName));
+         this.source.successors.add(this.processor);
+         this.processor.predecessors.add(this.source);
+     }
+
+     @Override
+     public TopologyDescription.Source source() {
+         return source;
+     }
+
+     @Override
+     public TopologyDescription.Processor processor() {
+         return processor;
+     }
+
+     @Override
+     public String toString() {
+         final String storeName = processor.stores.iterator().next();
+         return "GlobalStore: " + source.name + "(topic: " + source.topics + ") -> "
+             + processor.name + "(store: " + storeName + ")\n";
+     }
+
+     @Override
+     public boolean equals(final Object o) {
+         if (o == this) {
+             return true;
+         }
+         if (o == null) {
+             return false;
+         }
+         if (o.getClass() != getClass()) {
+             return false;
+         }
+
+         final GlobalStore other = (GlobalStore) o;
+         return source.equals(other.source) && processor.equals(other.processor);
+     }
+
+     @Override
+     public int hashCode() {
+         return Objects.hash(source, processor);
+     }
+ }
+
+ /**
+  * Base class of the node descriptions: a name plus mutable sets of predecessor and
+  * successor nodes. The accessors expose the sets as unmodifiable views.
+  */
+ public abstract static class AbstractNode implements TopologyDescription.Node {
+     final String name;
+     final Set<TopologyDescription.Node> predecessors = new HashSet<>();
+     final Set<TopologyDescription.Node> successors = new HashSet<>();
+
+     AbstractNode(final String name) {
+         this.name = name;
+     }
+
+     @Override
+     public String name() {
+         return this.name;
+     }
+
+     @Override
+     public Set<TopologyDescription.Node> predecessors() {
+         return Collections.unmodifiableSet(this.predecessors);
+     }
+
+     @Override
+     public Set<TopologyDescription.Node> successors() {
+         return Collections.unmodifiableSet(this.successors);
+     }
+
+     /** Records {@code predecessor} as a node that feeds this node. */
+     public void addPredecessor(final TopologyDescription.Node predecessor) {
+         this.predecessors.add(predecessor);
+     }
+
+     /** Records {@code successor} as a node that is fed by this node. */
+     public void addSuccessor(final TopologyDescription.Node successor) {
+         this.successors.add(successor);
+     }
+ }
+
+ /**
+  * Description of a source node. Sources cannot have predecessors.
+  */
+ public final static class Source extends AbstractNode implements TopologyDescription.Source {
+     private final String topics;
+
+     /**
+      * @param name   name of the source node
+      * @param topics textual description of the topics the node reads
+      */
+     public Source(final String name,
+                   final String topics) {
+         super(name);
+         this.topics = topics;
+     }
+
+     @Override
+     public String topics() {
+         return this.topics;
+     }
+
+     @Override
+     public void addPredecessor(final TopologyDescription.Node predecessor) {
+         throw new UnsupportedOperationException("Sources don't have predecessors.");
+     }
+
+     @Override
+     public String toString() {
+         final String successorNames = nodeNames(successors);
+         return "Source: " + name + "(topics: " + topics + ") --> " + successorNames;
+     }
+
+     @Override
+     public boolean equals(final Object o) {
+         if (o == this) {
+             return true;
+         }
+         if (o == null) {
+             return false;
+         }
+         if (o.getClass() != getClass()) {
+             return false;
+         }
+
+         final Source other = (Source) o;
+         // successors are left out of the comparison to avoid infinite loops
+         return name.equals(other.name) && topics.equals(other.topics);
+     }
+
+     @Override
+     public int hashCode() {
+         // successors are left out because they might change and alter the hash code
+         return Objects.hash(name, topics);
+     }
+ }
+
+ /**
+  * Description of a processor node together with the names of the stores it uses.
+  */
+ public final static class Processor extends AbstractNode implements TopologyDescription.Processor {
+     private final Set<String> stores;
+
+     /**
+      * @param name   name of the processor node
+      * @param stores names of the stores connected to the processor
+      */
+     public Processor(final String name,
+                      final Set<String> stores) {
+         super(name);
+         this.stores = stores;
+     }
+
+     @Override
+     public Set<String> stores() {
+         return Collections.unmodifiableSet(this.stores);
+     }
+
+     @Override
+     public String toString() {
+         final String successorNames = nodeNames(successors);
+         final String predecessorNames = nodeNames(predecessors);
+         return "Processor: " + name + "(stores: " + stores + ") --> " + successorNames + " <-- " + predecessorNames;
+     }
+
+     @Override
+     public boolean equals(final Object o) {
+         if (o == this) {
+             return true;
+         }
+         if (o == null) {
+             return false;
+         }
+         if (o.getClass() != getClass()) {
+             return false;
+         }
+
+         final Processor other = (Processor) o;
+         // successors are left out of the comparison to avoid infinite loops
+         return name.equals(other.name)
+             && stores.equals(other.stores)
+             && predecessors.equals(other.predecessors);
+     }
+
+     @Override
+     public int hashCode() {
+         // successors are left out because they might change and alter the hash code
+         return Objects.hash(name, stores);
+     }
+ }
+
+ /**
+  * Description of a sink node. Sinks cannot have successors.
+  */
+ public final static class Sink extends AbstractNode implements TopologyDescription.Sink {
+     private final String topic;
+
+     /**
+      * @param name  name of the sink node
+      * @param topic topic the node writes to
+      */
+     public Sink(final String name,
+                 final String topic) {
+         super(name);
+         this.topic = topic;
+     }
+
+     @Override
+     public String topic() {
+         return this.topic;
+     }
+
+     @Override
+     public void addSuccessor(final TopologyDescription.Node successor) {
+         throw new UnsupportedOperationException("Sinks don't have successors.");
+     }
+
+     @Override
+     public String toString() {
+         final String predecessorNames = nodeNames(predecessors);
+         return "Sink: " + name + "(topic: " + topic + ") <-- " + predecessorNames;
+     }
+
+     @Override
+     public boolean equals(final Object o) {
+         if (o == this) {
+             return true;
+         }
+         if (o == null) {
+             return false;
+         }
+         if (o.getClass() != getClass()) {
+             return false;
+         }
+
+         final Sink other = (Sink) o;
+         return name.equals(other.name)
+             && topic.equals(other.topic)
+             && predecessors.equals(other.predecessors);
+     }
+
+     @Override
+     public int hashCode() {
+         // predecessors are left out because they might change and alter the hash code
+         return Objects.hash(name, topic);
+     }
+ }
+
+ /**
+  * Description of one sub-topology: its id and the nodes it consists of.
+  */
+ public final static class Subtopology implements org.apache.kafka.streams.TopologyDescription.Subtopology {
+     private final int id;
+     private final Set<org.apache.kafka.streams.TopologyDescription.Node> nodes;
+
+     /**
+      * @param id    id of the sub-topology
+      * @param nodes nodes belonging to the sub-topology
+      */
+     public Subtopology(final int id,
+                        final Set<org.apache.kafka.streams.TopologyDescription.Node> nodes) {
+         this.id = id;
+         this.nodes = nodes;
+     }
+
+     @Override
+     public int id() {
+         return this.id;
+     }
+
+     @Override
+     public Set<org.apache.kafka.streams.TopologyDescription.Node> nodes() {
+         return Collections.unmodifiableSet(this.nodes);
+     }
+
+     @Override
+     public String toString() {
+         return "Sub-topology: " + id + "\n" + nodesAsString();
+     }
+
+     /** Renders one line per node, in the iteration order of the node set. */
+     private String nodesAsString() {
+         final StringBuilder text = new StringBuilder();
+         for (final org.apache.kafka.streams.TopologyDescription.Node node : nodes) {
+             text.append(" ").append(node).append('\n');
+         }
+         return text.toString();
+     }
+
+     @Override
+     public boolean equals(final Object o) {
+         if (o == this) {
+             return true;
+         }
+         if (o == null) {
+             return false;
+         }
+         if (o.getClass() != getClass()) {
+             return false;
+         }
+
+         final Subtopology other = (Subtopology) o;
+         return id == other.id && nodes.equals(other.nodes);
+     }
+
+     @Override
+     public int hashCode() {
+         return Objects.hash(id, nodes);
+     }
+ }
+
+ /**
+  * Mutable description of a topology: a set of sub-topologies and a set of global stores.
+  * The accessors expose both sets as unmodifiable views.
+  */
+ public final static class TopologyDescription implements org.apache.kafka.streams.TopologyDescription {
+     private final Set<org.apache.kafka.streams.TopologyDescription.Subtopology> subtopologies = new HashSet<>();
+     private final Set<org.apache.kafka.streams.TopologyDescription.GlobalStore> globalStores = new HashSet<>();
+
+     /** Adds a sub-topology to this description. */
+     public void addSubtopology(final org.apache.kafka.streams.TopologyDescription.Subtopology subtopology) {
+         this.subtopologies.add(subtopology);
+     }
+
+     /** Adds a global store to this description. */
+     public void addGlobalStore(final org.apache.kafka.streams.TopologyDescription.GlobalStore globalStore) {
+         this.globalStores.add(globalStore);
+     }
+
+     @Override
+     public Set<org.apache.kafka.streams.TopologyDescription.Subtopology> subtopologies() {
+         return Collections.unmodifiableSet(this.subtopologies);
+     }
+
+     @Override
+     public Set<org.apache.kafka.streams.TopologyDescription.GlobalStore> globalStores() {
+         return Collections.unmodifiableSet(this.globalStores);
+     }
+
+     @Override
+     public String toString() {
+         return subtopologiesAsString() + globalStoresAsString();
+     }
+
+     /** Renders the sub-topology section, or a "none" line when there are no sub-topologies. */
+     private String subtopologiesAsString() {
+         final StringBuilder text = new StringBuilder("Sub-topologies: \n");
+         if (subtopologies.isEmpty()) {
+             return text.append(" none\n").toString();
+         }
+         for (final org.apache.kafka.streams.TopologyDescription.Subtopology subtopology : subtopologies) {
+             text.append(" ").append(subtopology);
+         }
+         return text.toString();
+     }
+
+     /** Renders the global store section, or a "none" line when there are no global stores. */
+     private String globalStoresAsString() {
+         final StringBuilder text = new StringBuilder("Global Stores:\n");
+         if (globalStores.isEmpty()) {
+             return text.append(" none\n").toString();
+         }
+         for (final org.apache.kafka.streams.TopologyDescription.GlobalStore globalStore : globalStores) {
+             text.append(" ").append(globalStore);
+         }
+         return text.toString();
+     }
+
+     @Override
+     public boolean equals(final Object o) {
+         if (o == this) {
+             return true;
+         }
+         if (o == null) {
+             return false;
+         }
+         if (o.getClass() != getClass()) {
+             return false;
+         }
+
+         final TopologyDescription other = (TopologyDescription) o;
+         return subtopologies.equals(other.subtopologies)
+             && globalStores.equals(other.globalStores);
+     }
+
+     @Override
+     public int hashCode() {
+         return Objects.hash(subtopologies, globalStores);
+     }
+
+ }
+
+ /**
+  * Renders the names of the given nodes as a comma-separated list.
+  *
+  * @param nodes nodes whose names are listed, in the iteration order of the set
+  * @return the joined names, or {@code "none"} for an empty set
+  */
+ private static String nodeNames(final Set<TopologyDescription.Node> nodes) {
+     if (nodes.isEmpty()) {
+         return "none";
+     }
+
+     final StringBuilder joined = new StringBuilder();
+     for (final TopologyDescription.Node node : nodes) {
+         joined.append(node.name()).append(", ");
+     }
+     // drop the separator that follows the last name
+     joined.setLength(joined.length() - 2);
+     return joined.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 91856b0..e8b6a1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -356,8 +356,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
if (otherSinkTopics.contains(topicName)) {
// if this topic is one of the sink topics of this topology,
// use the maximum of all its source topic partitions as the number of partitions
- for (String sourceTopicName : otherTopicsInfo.sourceTopics) {
- Integer numPartitionsCandidate;
+ for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
+ final Integer numPartitionsCandidate;
// It is possible the sourceTopic is another internal topic, i.e,
// map().join().join(map())
if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
@@ -377,10 +377,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
}
// if we still have not find the right number of partitions,
// another iteration is needed
- if (numPartitions == UNKNOWN)
+ if (numPartitions == UNKNOWN) {
numPartitionsNeeded = true;
- else
+ } else {
repartitionTopicMetadata.get(topicName).numPartitions = numPartitions;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index eb75b14..f10bf41 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -34,9 +34,9 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
-import org.apache.kafka.common.metrics.stats.Sum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Sum;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
@@ -45,7 +45,6 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;
@@ -398,7 +397,7 @@ public class StreamThread extends Thread {
public final UUID processId;
protected final StreamsConfig config;
- protected final TopologyBuilder builder;
+ protected final InternalTopologyBuilder builder;
Producer<byte[], byte[]> threadProducer;
private final KafkaClientSupplier clientSupplier;
protected final Consumer<byte[], byte[]> consumer;
@@ -441,7 +440,7 @@ public class StreamThread extends Thread {
final ConsumerRebalanceListener rebalanceListener;
private final static int UNLIMITED_RECORDS = -1;
- public StreamThread(final TopologyBuilder builder,
+ public StreamThread(final InternalTopologyBuilder builder,
final StreamsConfig config,
final KafkaClientSupplier clientSupplier,
final String applicationId,
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index bb74b48..3fd1613 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.StreamsMetadata;
@@ -44,14 +43,14 @@ import java.util.Set;
*/
public class StreamsMetadataState {
public static final HostInfo UNKNOWN_HOST = new HostInfo("unknown", -1);
- private final TopologyBuilder builder;
+ private final InternalTopologyBuilder builder;
private final List<StreamsMetadata> allMetadata = new ArrayList<>();
private final Set<String> globalStores;
private final HostInfo thisHost;
private Cluster clusterMetadata;
private StreamsMetadata myMetadata;
- public StreamsMetadataState(final TopologyBuilder builder, final HostInfo thisHost) {
+ public StreamsMetadataState(final InternalTopologyBuilder builder, final HostInfo thisHost) {
this.builder = builder;
this.globalStores = builder.globalStateStores().keySet();
this.thisHost = thisHost;