You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/07/24 18:03:33 UTC
[4/5] kafka git commit: KAFKA-3856 (KIP-120) step two: extract
internal functions from public facing TopologyBuilder class
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 4508c77..ce6ba7b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -19,39 +19,25 @@ package org.apache.kafka.streams.processor;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.QuickUnion;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.internals.WindowStoreSupplier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;
-
/**
* A component that is used to build a {@link ProcessorTopology}. A topology contains an acyclic graph of sources, processors,
* and sinks. A {@link SourceNode source} is a node in the graph that consumes one or more Kafka topics and forwards them to
@@ -64,238 +50,30 @@ import java.util.regex.Pattern;
@InterfaceStability.Evolving
public class TopologyBuilder {
- private static final Logger log = LoggerFactory.getLogger(TopologyBuilder.class);
-
- private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
-
- // node factories in a topological order
- private final LinkedHashMap<String, NodeFactory> nodeFactories = new LinkedHashMap<>();
-
- // state factories
- private final Map<String, StateStoreFactory> stateFactories = new HashMap<>();
-
- // global state factories
- private final Map<String, StateStore> globalStateStores = new LinkedHashMap<>();
-
- // all topics subscribed from source processors (without application-id prefix for internal topics)
- private final Set<String> sourceTopicNames = new HashSet<>();
-
- // all internal topics auto-created by the topology builder and used in source / sink processors
- private final Set<String> internalTopicNames = new HashSet<>();
-
- // groups of source processors that need to be copartitioned
- private final List<Set<String>> copartitionSourceGroups = new ArrayList<>();
-
- // map from source processor names to subscribed topics (without application-id prefix for internal topics)
- private final HashMap<String, List<String>> nodeToSourceTopics = new HashMap<>();
-
- // map from source processor names to regex subscription patterns
- private final HashMap<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<>();
-
- // map from sink processor names to subscribed topic (without application-id prefix for internal topics)
- private final HashMap<String, String> nodeToSinkTopic = new HashMap<>();
-
- // map from topics to their matched regex patterns, this is to ensure one topic is passed through on source node
- // even if it can be matched by multiple regex patterns
- private final HashMap<String, Pattern> topicToPatterns = new HashMap<>();
-
- // map from state store names to all the topics subscribed from source processors that
- // are connected to these state stores
- private final Map<String, Set<String>> stateStoreNameToSourceTopics = new HashMap<>();
-
- // map from state store names to all the regex subscribed topics from source processors that
- // are connected to these state stores
- private final Map<String, Set<Pattern>> stateStoreNameToSourceRegex = new HashMap<>();
-
- // map from state store names to this state store's corresponding changelog topic if possible,
- // this is used in the extended KStreamBuilder.
- private final Map<String, String> storeToChangelogTopic = new HashMap<>();
-
- // all global topics
- private final Set<String> globalTopics = new HashSet<>();
-
- private final Set<String> earliestResetTopics = new HashSet<>();
-
- private final Set<String> latestResetTopics = new HashSet<>();
-
- private final Set<Pattern> earliestResetPatterns = new HashSet<>();
-
- private final Set<Pattern> latestResetPatterns = new HashSet<>();
-
- private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
-
- private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
-
- private String applicationId = null;
-
- private Pattern topicPattern = null;
-
- private Map<Integer, Set<String>> nodeGroups = null;
-
- private static class StateStoreFactory {
- public final Set<String> users;
-
- public final StateStoreSupplier supplier;
-
- StateStoreFactory(StateStoreSupplier supplier) {
- this.supplier = supplier;
- this.users = new HashSet<>();
- }
- }
-
- private static abstract class NodeFactory {
- final String name;
- final String[] parents;
-
- NodeFactory(final String name, final String[] parents) {
- this.name = name;
- this.parents = parents;
- }
-
- public abstract ProcessorNode build();
-
- abstract TopologyDescription.AbstractNode describe();
- }
-
- private static class ProcessorNodeFactory extends NodeFactory {
- private final ProcessorSupplier<?, ?> supplier;
- private final Set<String> stateStoreNames = new HashSet<>();
-
- ProcessorNodeFactory(String name, String[] parents, ProcessorSupplier<?, ?> supplier) {
- super(name, parents.clone());
- this.supplier = supplier;
- }
-
- public void addStateStore(String stateStoreName) {
- stateStoreNames.add(stateStoreName);
- }
-
- @Override
- public ProcessorNode build() {
- return new ProcessorNode<>(name, supplier.get(), stateStoreNames);
- }
-
- @Override
- TopologyDescription.Processor describe() {
- return new TopologyDescription.Processor(name, new HashSet<>(stateStoreNames));
- }
- }
-
- private class SourceNodeFactory extends NodeFactory {
- private final List<String> topics;
- private final Pattern pattern;
- private final Deserializer<?> keyDeserializer;
- private final Deserializer<?> valDeserializer;
- private final TimestampExtractor timestampExtractor;
-
- private SourceNodeFactory(final String name,
- final String[] topics,
- final Pattern pattern,
- final TimestampExtractor timestampExtractor,
- final Deserializer<?> keyDeserializer,
- final Deserializer<?> valDeserializer) {
- super(name, new String[0]);
- this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<String>();
- this.pattern = pattern;
- this.keyDeserializer = keyDeserializer;
- this.valDeserializer = valDeserializer;
- this.timestampExtractor = timestampExtractor;
- }
-
- List<String> getTopics(Collection<String> subscribedTopics) {
- // if it is subscribed via patterns, it is possible that the topic metadata has not been updated
- // yet and hence the map from source node to topics is stale, in this case we put the pattern as a place holder;
- // this should only happen for debugging since during runtime this function should always be called after the metadata has updated.
- if (subscribedTopics.isEmpty())
- return Collections.singletonList("" + pattern + "");
-
- List<String> matchedTopics = new ArrayList<>();
- for (String update : subscribedTopics) {
- if (this.pattern == topicToPatterns.get(update)) {
- matchedTopics.add(update);
- } else if (topicToPatterns.containsKey(update) && isMatch(update)) {
- // the same topic cannot be matched to more than one pattern
- // TODO: we should lift this requirement in the future
- throw new TopologyBuilderException("Topic " + update +
- " is already matched for another regex pattern " + topicToPatterns.get(update) +
- " and hence cannot be matched to this regex pattern " + pattern + " any more.");
- } else if (isMatch(update)) {
- topicToPatterns.put(update, this.pattern);
- matchedTopics.add(update);
- }
- }
- return matchedTopics;
- }
-
- @Override
- public ProcessorNode build() {
- final List<String> sourceTopics = nodeToSourceTopics.get(name);
-
- // if it is subscribed via patterns, it is possible that the topic metadata has not been updated
- // yet and hence the map from source node to topics is stale, in this case we put the pattern as a place holder;
- // this should only happen for debugging since during runtime this function should always be called after the metadata has updated.
- if (sourceTopics == null)
- return new SourceNode<>(name, Collections.singletonList("" + pattern + ""), timestampExtractor, keyDeserializer, valDeserializer);
- else
- return new SourceNode<>(name, maybeDecorateInternalSourceTopics(sourceTopics), timestampExtractor, keyDeserializer, valDeserializer);
- }
-
- private boolean isMatch(String topic) {
- return this.pattern.matcher(topic).matches();
- }
-
- @Override
- TopologyDescription.Source describe() {
- String sourceTopics;
-
- if (pattern == null) {
- sourceTopics = topics.toString();
- sourceTopics = sourceTopics.substring(1, sourceTopics.length() - 1); // trim first and last, ie. []
- } else {
- sourceTopics = pattern.toString();
- }
-
- return new TopologyDescription.Source(name, sourceTopics);
- }
- }
-
- private class SinkNodeFactory<K, V> extends NodeFactory {
- private final String topic;
- private final Serializer<K> keySerializer;
- private final Serializer<V> valSerializer;
- private final StreamPartitioner<? super K, ? super V> partitioner;
-
- private SinkNodeFactory(String name, String[] parents, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner) {
- super(name, parents.clone());
- this.topic = topic;
- this.keySerializer = keySerializer;
- this.valSerializer = valSerializer;
- this.partitioner = partitioner;
- }
-
- @Override
- public ProcessorNode build() {
- if (internalTopicNames.contains(topic)) {
- // prefix the internal topic name with the application id
- return new SinkNode<>(name, decorateTopic(topic), keySerializer, valSerializer, partitioner);
- } else {
- return new SinkNode<>(name, topic, keySerializer, valSerializer, partitioner);
- }
- }
-
- @Override
- TopologyDescription.Sink describe() {
- return new TopologyDescription.Sink(name, topic);
- }
- }
+ /**
+ * NOTE: this member is not needed by developers working with the processor APIs; it is only used
+ * for internal functionality.
+ * @deprecated not part of public API and for internal usage only
+ */
+ @Deprecated
+ public final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();
+ /**
+ * NOTE: this class is not needed by developers working with the processor APIs; it is only used
+ * for internal functionality.
+ * @deprecated not part of public API and for internal usage only
+ */
+ @Deprecated
public static class TopicsInfo {
public Set<String> sinkTopics;
public Set<String> sourceTopics;
public Map<String, InternalTopicConfig> stateChangelogTopics;
public Map<String, InternalTopicConfig> repartitionSourceTopics;
- TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, Map<String, InternalTopicConfig> repartitionSourceTopics, Map<String, InternalTopicConfig> stateChangelogTopics) {
+ public TopicsInfo(final Set<String> sinkTopics,
+ final Set<String> sourceTopics,
+ final Map<String, InternalTopicConfig> repartitionSourceTopics,
+ final Map<String, InternalTopicConfig> stateChangelogTopics) {
this.sinkTopics = sinkTopics;
this.sourceTopics = sourceTopics;
this.stateChangelogTopics = stateChangelogTopics;
@@ -303,10 +81,10 @@ public class TopologyBuilder {
}
@Override
- public boolean equals(Object o) {
+ public boolean equals(final Object o) {
if (o instanceof TopicsInfo) {
- TopicsInfo other = (TopicsInfo) o;
- return other.sourceTopics.equals(this.sourceTopics) && other.stateChangelogTopics.equals(this.stateChangelogTopics);
+ final TopicsInfo other = (TopicsInfo) o;
+ return other.sourceTopics.equals(sourceTopics) && other.stateChangelogTopics.equals(stateChangelogTopics);
} else {
return false;
}
@@ -314,7 +92,7 @@ public class TopologyBuilder {
@Override
public int hashCode() {
- long n = ((long) sourceTopics.hashCode() << 32) | (long) stateChangelogTopics.hashCode();
+ final long n = ((long) sourceTopics.hashCode() << 32) | (long) stateChangelogTopics.hashCode();
return (int) (n % 0xFFFFFFFFL);
}
@@ -341,19 +119,10 @@ public class TopologyBuilder {
*/
public TopologyBuilder() {}
- /**
- * Set the applicationId to be used for auto-generated internal topics.
- *
- * This is required before calling {@link #topicGroups}, {@link #copartitionSources},
- * {@link #stateStoreNameToSourceTopics} and {@link #build(Integer)}.
- *
- * @param applicationId the streams applicationId. Should be the same as set by
- * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG}
- */
+ /** @deprecated This method is not part of public API and should never be used by a developer. */
+ @Deprecated
public synchronized final TopologyBuilder setApplicationId(final String applicationId) {
- Objects.requireNonNull(applicationId, "applicationId can't be null");
- this.applicationId = applicationId;
-
+ internalTopologyBuilder.setApplicationId(applicationId);
return this;
}
@@ -369,8 +138,10 @@ public class TopologyBuilder {
* @param topics the name of one or more Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
*/
- public synchronized final TopologyBuilder addSource(final String name, final String... topics) {
- return addSource(null, name, null, null, null, topics);
+ public synchronized final TopologyBuilder addSource(final String name,
+ final String... topics) {
+ internalTopologyBuilder.addSource(null, name, null, null, null, topics);
+ return this;
}
/**
@@ -386,9 +157,11 @@ public class TopologyBuilder {
* @param topics the name of one or more Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
*/
-
- public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final String... topics) {
- return addSource(offsetReset, name, null, null, null, topics);
+ public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
+ final String name,
+ final String... topics) {
+ internalTopologyBuilder.addSource(offsetReset, name, null, null, null, topics);
+ return this;
}
/**
@@ -404,8 +177,10 @@ public class TopologyBuilder {
* @param topics the name of one or more Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
*/
- public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor, final String name, final String... topics) {
- return addSource(null, name, timestampExtractor, null, null, topics);
+ public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor,
+ final String name, final String... topics) {
+ internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topics);
+ return this;
}
/**
@@ -423,8 +198,10 @@ public class TopologyBuilder {
* @param topics the name of one or more Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
*/
- public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final String name, final String... topics) {
- return addSource(offsetReset, name, timestampExtractor, null, null, topics);
+ public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
+ final TimestampExtractor timestampExtractor, final String name, final String... topics) {
+ internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, null, null, topics);
+ return this;
}
/**
@@ -440,9 +217,10 @@ public class TopologyBuilder {
* @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
*/
-
- public synchronized final TopologyBuilder addSource(final String name, final Pattern topicPattern) {
- return addSource(null, name, null, null, null, topicPattern);
+ public synchronized final TopologyBuilder addSource(final String name,
+ final Pattern topicPattern) {
+ internalTopologyBuilder.addSource(null, name, null, null, null, topicPattern);
+ return this;
}
/**
@@ -459,9 +237,11 @@ public class TopologyBuilder {
* @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
*/
-
- public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final Pattern topicPattern) {
- return addSource(offsetReset, name, null, null, null, topicPattern);
+ public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
+ final String name,
+ final Pattern topicPattern) {
+ internalTopologyBuilder.addSource(offsetReset, name, null, null, null, topicPattern);
+ return this;
}
@@ -479,8 +259,11 @@ public class TopologyBuilder {
* @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
*/
- public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor, final String name, final Pattern topicPattern) {
- return addSource(null, name, timestampExtractor, null, null, topicPattern);
+ public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor,
+ final String name,
+ final Pattern topicPattern) {
+ internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topicPattern);
+ return this;
}
@@ -500,8 +283,12 @@ public class TopologyBuilder {
* @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
*/
- public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final String name, final Pattern topicPattern) {
- return addSource(offsetReset, name, timestampExtractor, null, null, topicPattern);
+ public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
+ final TimestampExtractor timestampExtractor,
+ final String name,
+ final Pattern topicPattern) {
+ internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, null, null, topicPattern);
+ return this;
}
@@ -521,8 +308,12 @@ public class TopologyBuilder {
* @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
*/
- public synchronized final TopologyBuilder addSource(final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final String... topics) {
- return addSource(null, name, null, keyDeserializer, valDeserializer, topics);
+ public synchronized final TopologyBuilder addSource(final String name,
+ final Deserializer keyDeserializer,
+ final Deserializer valDeserializer,
+ final String... topics) {
+ internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topics);
+ return this;
}
/**
@@ -550,24 +341,7 @@ public class TopologyBuilder {
final Deserializer keyDeserializer,
final Deserializer valDeserializer,
final String... topics) {
- if (topics.length == 0) {
- throw new TopologyBuilderException("You must provide at least one topic");
- }
- Objects.requireNonNull(name, "name must not be null");
- if (nodeFactories.containsKey(name))
- throw new TopologyBuilderException("Processor " + name + " is already added.");
-
- for (String topic : topics) {
- Objects.requireNonNull(topic, "topic names cannot be null");
- validateTopicNotAlreadyRegistered(topic);
- maybeAddToResetList(earliestResetTopics, latestResetTopics, offsetReset, topic);
- sourceTopicNames.add(topic);
- }
-
- nodeFactories.put(name, new SourceNodeFactory(name, topics, null, timestampExtractor, keyDeserializer, valDeserializer));
- nodeToSourceTopics.put(name, Arrays.asList(topics));
- nodeGrouper.add(name);
-
+ internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valDeserializer, topics);
return this;
}
@@ -600,11 +374,10 @@ public class TopologyBuilder {
final String topic,
final String processorName,
final ProcessorSupplier stateUpdateSupplier) {
- return addGlobalStore(storeSupplier, sourceName, null, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
+ internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
+ return this;
}
-
-
/**
* Adds a global {@link StateStore} to the topology. The {@link StateStore} sources its data
* from all partitions of the provided input topic. There will be exactly one instance of this
@@ -636,58 +409,8 @@ public class TopologyBuilder {
final String topic,
final String processorName,
final ProcessorSupplier stateUpdateSupplier) {
- Objects.requireNonNull(storeSupplier, "store supplier must not be null");
- Objects.requireNonNull(sourceName, "sourceName must not be null");
- Objects.requireNonNull(topic, "topic must not be null");
- Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
- Objects.requireNonNull(processorName, "processorName must not be null");
- if (nodeFactories.containsKey(sourceName)) {
- throw new TopologyBuilderException("Processor " + sourceName + " is already added.");
- }
- if (nodeFactories.containsKey(processorName)) {
- throw new TopologyBuilderException("Processor " + processorName + " is already added.");
- }
- if (stateFactories.containsKey(storeSupplier.name()) || globalStateStores.containsKey(storeSupplier.name())) {
- throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " is already added.");
- }
- if (storeSupplier.loggingEnabled()) {
- throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " for global table must not have logging enabled.");
- }
- if (sourceName.equals(processorName)) {
- throw new TopologyBuilderException("sourceName and processorName must be different.");
- }
-
- validateTopicNotAlreadyRegistered(topic);
-
- globalTopics.add(topic);
- final String[] topics = {topic};
- nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, topics, null, timestampExtractor, keyDeserializer, valueDeserializer));
- nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
- nodeGrouper.add(sourceName);
-
- final String[] parents = {sourceName};
- final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName, parents, stateUpdateSupplier);
- nodeFactory.addStateStore(storeSupplier.name());
- nodeFactories.put(processorName, nodeFactory);
- nodeGrouper.add(processorName);
- nodeGrouper.unite(processorName, parents);
-
- globalStateStores.put(storeSupplier.name(), storeSupplier.get());
- connectSourceStoreAndTopic(storeSupplier.name(), topic);
+ internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
return this;
-
- }
-
- private void validateTopicNotAlreadyRegistered(final String topic) {
- if (sourceTopicNames.contains(topic) || globalTopics.contains(topic)) {
- throw new TopologyBuilderException("Topic " + topic + " has already been registered by another source.");
- }
-
- for (Pattern pattern : nodeToSourcePatterns.values()) {
- if (pattern.matcher(topic).matches()) {
- throw new TopologyBuilderException("Topic " + topic + " matches a Pattern already registered by another source.");
- }
- }
}
/**
@@ -708,9 +431,12 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if processor is already added or if topics have already been registered by name
*/
-
- public synchronized final TopologyBuilder addSource(final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final Pattern topicPattern) {
- return addSource(null, name, null, keyDeserializer, valDeserializer, topicPattern);
+ public synchronized final TopologyBuilder addSource(final String name,
+ final Deserializer keyDeserializer,
+ final Deserializer valDeserializer,
+ final Pattern topicPattern) {
+ internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topicPattern);
+ return this;
}
/**
@@ -734,36 +460,16 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if processor is already added or if topics have already been registered by name
*/
-
public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
final String name,
final TimestampExtractor timestampExtractor,
final Deserializer keyDeserializer,
final Deserializer valDeserializer,
final Pattern topicPattern) {
- Objects.requireNonNull(topicPattern, "topicPattern can't be null");
- Objects.requireNonNull(name, "name can't be null");
-
- if (nodeFactories.containsKey(name)) {
- throw new TopologyBuilderException("Processor " + name + " is already added.");
- }
-
- for (String sourceTopicName : sourceTopicNames) {
- if (topicPattern.matcher(sourceTopicName).matches()) {
- throw new TopologyBuilderException("Pattern " + topicPattern + " will match a topic that has already been registered by another source.");
- }
- }
-
- maybeAddToResetList(earliestResetPatterns, latestResetPatterns, offsetReset, topicPattern);
-
- nodeFactories.put(name, new SourceNodeFactory(name, null, topicPattern, timestampExtractor, keyDeserializer, valDeserializer));
- nodeToSourcePatterns.put(name, topicPattern);
- nodeGrouper.add(name);
-
+ internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valDeserializer, topicPattern);
return this;
}
-
/**
* Add a new source that consumes from topics matching the given pattern
* and forwards the records to child processor and/or sink nodes.
@@ -783,37 +489,40 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if processor is already added or if topics have already been registered by name
*/
-
public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
final String name,
final Deserializer keyDeserializer,
final Deserializer valDeserializer,
final Pattern topicPattern) {
- return addSource(offsetReset, name, null, keyDeserializer, valDeserializer, topicPattern);
+ internalTopologyBuilder.addSource(offsetReset, name, null, keyDeserializer, valDeserializer, topicPattern);
+ return this;
}
/**
- * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
+ * Add a new sink that forwards records from predecessor nodes (processors and/or sources) to the named Kafka topic.
* The sink will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
* {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
*
* @param name the unique name of the sink
* @param topic the name of the Kafka topic to which this sink should write its records
- * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
+ * @param predecessorNames the name of one or more source or processor nodes whose output records this sink should consume
* and write to its topic
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, StreamPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, String...)
* @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
*/
- public synchronized final TopologyBuilder addSink(final String name, final String topic, final String... parentNames) {
- return addSink(name, topic, null, null, parentNames);
+ public synchronized final TopologyBuilder addSink(final String name,
+ final String topic,
+ final String... predecessorNames) {
+ internalTopologyBuilder.addSink(name, topic, null, null, null, predecessorNames);
+ return this;
}
/**
- * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic, using
+ * Add a new sink that forwards records from predecessor nodes (processors and/or sources) to the named Kafka topic, using
* the supplied partitioner.
* The sink will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
* {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
@@ -828,19 +537,23 @@ public class TopologyBuilder {
* @param name the unique name of the sink
* @param topic the name of the Kafka topic to which this sink should write its records
* @param partitioner the function that should be used to determine the partition for each record processed by the sink
- * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
+ * @param predecessorNames the name of one or more source or processor nodes whose output records this sink should consume
* and write to its topic
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, String...)
* @see #addSink(String, String, Serializer, Serializer, String...)
* @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
*/
- public synchronized final TopologyBuilder addSink(final String name, final String topic, final StreamPartitioner partitioner, final String... parentNames) {
- return addSink(name, topic, null, null, partitioner, parentNames);
+ public synchronized final TopologyBuilder addSink(final String name,
+ final String topic,
+ final StreamPartitioner partitioner,
+ final String... predecessorNames) {
+ internalTopologyBuilder.addSink(name, topic, null, null, partitioner, predecessorNames);
+ return this;
}
/**
- * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
+ * Add a new sink that forwards records from predecessor nodes (processors and/or sources) to the named Kafka topic.
* The sink will use the specified key and value serializers.
*
* @param name the unique name of the sink
@@ -851,19 +564,24 @@ public class TopologyBuilder {
* @param valSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink
* should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}
- * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
+ * @param predecessorNames the name of one or more source or processor nodes whose output records this sink should consume
* and write to its topic
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, String...)
* @see #addSink(String, String, StreamPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
*/
- public synchronized final TopologyBuilder addSink(final String name, final String topic, final Serializer keySerializer, final Serializer valSerializer, final String... parentNames) {
- return addSink(name, topic, keySerializer, valSerializer, null, parentNames);
+ public synchronized final TopologyBuilder addSink(final String name,
+ final String topic,
+ final Serializer keySerializer,
+ final Serializer valSerializer,
+ final String... predecessorNames) {
+ internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, null, predecessorNames);
+ return this;
}
/**
- * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
+ * Add a new sink that forwards records from predecessor nodes (processors and/or sources) to the named Kafka topic.
* The sink will use the specified key and value serializers, and the supplied partitioner.
*
* @param name the unique name of the sink
@@ -875,66 +593,41 @@ public class TopologyBuilder {
* should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}
* @param partitioner the function that should be used to determine the partition for each record processed by the sink
- * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
+ * @param predecessorNames the name of one or more source or processor nodes whose output records this sink should consume
* and write to its topic
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, String...)
* @see #addSink(String, String, StreamPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, String...)
- * @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name
- */
- public synchronized final <K, V> TopologyBuilder addSink(final String name, final String topic, final Serializer<K> keySerializer, final Serializer<V> valSerializer, final StreamPartitioner<? super K, ? super V> partitioner, final String... parentNames) {
- Objects.requireNonNull(name, "name must not be null");
- Objects.requireNonNull(topic, "topic must not be null");
- if (nodeFactories.containsKey(name))
- throw new TopologyBuilderException("Processor " + name + " is already added.");
-
- for (final String parent : parentNames) {
- if (parent.equals(name)) {
- throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself.");
- }
- if (!nodeFactories.containsKey(parent)) {
- throw new TopologyBuilderException("Parent processor " + parent + " is not added yet.");
- }
- }
-
- nodeFactories.put(name, new SinkNodeFactory<>(name, parentNames, topic, keySerializer, valSerializer, partitioner));
- nodeToSinkTopic.put(name, topic);
- nodeGrouper.add(name);
- nodeGrouper.unite(name, parentNames);
+ * @throws TopologyBuilderException if predecessor is not added yet, or if this processor's name is equal to the predecessor's name
+ */
+ public synchronized final <K, V> TopologyBuilder addSink(final String name,
+ final String topic,
+ final Serializer<K> keySerializer,
+ final Serializer<V> valSerializer,
+ final StreamPartitioner<? super K, ? super V> partitioner,
+ final String... predecessorNames) {
+ internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, predecessorNames);
return this;
}
/**
- * Add a new processor node that receives and processes records output by one or more parent source or processor node.
+ * Add a new processor node that receives and processes records output by one or more predecessor source or processor nodes.
* Any new record output by this processor will be forwarded to its child processor or sink nodes.
* @param name the unique name of the processor node
* @param supplier the supplier used to obtain this node's {@link Processor} instance
- * @param parentNames the name of one or more source or processor nodes whose output records this processor should receive
+ * @param predecessorNames the name of one or more source or processor nodes whose output records this processor should receive
* and process
* @return this builder instance so methods can be chained together; never null
- * @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+ * @throws TopologyBuilderException if predecessor is not added yet, or if this processor's name is equal to the predecessor's name
*/
- public synchronized final TopologyBuilder addProcessor(final String name, final ProcessorSupplier supplier, final String... parentNames) {
- Objects.requireNonNull(name, "name must not be null");
- Objects.requireNonNull(supplier, "supplier must not be null");
- if (nodeFactories.containsKey(name))
- throw new TopologyBuilderException("Processor " + name + " is already added.");
-
- for (final String parent : parentNames) {
- if (parent.equals(name)) {
- throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself.");
- }
- if (!nodeFactories.containsKey(parent)) {
- throw new TopologyBuilderException("Parent processor " + parent + " is not added yet.");
- }
- }
-
- nodeFactories.put(name, new ProcessorNodeFactory(name, parentNames, supplier));
- nodeGrouper.add(name);
- nodeGrouper.unite(name, parentNames);
+ public synchronized final TopologyBuilder addProcessor(final String name,
+ final ProcessorSupplier supplier,
+ final String... predecessorNames) {
+ internalTopologyBuilder.addProcessor(name, supplier, predecessorNames);
return this;
}
+
/**
* Adds a state store
*
@@ -942,20 +635,9 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if state store supplier is already added
*/
- public synchronized final TopologyBuilder addStateStore(final StateStoreSupplier supplier, final String... processorNames) {
- Objects.requireNonNull(supplier, "supplier can't be null");
- if (stateFactories.containsKey(supplier.name())) {
- throw new TopologyBuilderException("StateStore " + supplier.name() + " is already added.");
- }
-
- stateFactories.put(supplier.name(), new StateStoreFactory(supplier));
-
- if (processorNames != null) {
- for (String processorName : processorNames) {
- connectProcessorAndStateStore(processorName, supplier.name());
- }
- }
-
+ public synchronized final TopologyBuilder addStateStore(final StateStoreSupplier supplier,
+ final String... processorNames) {
+ internalTopologyBuilder.addStateStore(supplier, processorNames);
return this;
}
@@ -966,26 +648,25 @@ public class TopologyBuilder {
* @param stateStoreNames the names of state stores that the processor uses
* @return this builder instance so methods can be chained together; never null
*/
- public synchronized final TopologyBuilder connectProcessorAndStateStores(final String processorName, final String... stateStoreNames) {
- Objects.requireNonNull(processorName, "processorName can't be null");
- if (stateStoreNames != null) {
- for (String stateStoreName : stateStoreNames) {
- connectProcessorAndStateStore(processorName, stateStoreName);
- }
- }
-
+ public synchronized final TopologyBuilder connectProcessorAndStateStores(final String processorName,
+ final String... stateStoreNames) {
+ internalTopologyBuilder.connectProcessorAndStateStores(processorName, stateStoreNames);
return this;
}
/**
* This is used only for KStreamBuilder: when adding a KTable from a source topic,
* we need to add the topic as the KTable's materialized state store's changelog.
+ *
+ * NOTE this function is not needed by developers working with the processor APIs; it is only used
+ * for the high-level DSL parsing functionalities.
+ *
+ * @deprecated not part of public API and for internal usage only
*/
- protected synchronized final TopologyBuilder connectSourceStoreAndTopic(final String sourceStoreName, final String topic) {
- if (storeToChangelogTopic.containsKey(sourceStoreName)) {
- throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added.");
- }
- storeToChangelogTopic.put(sourceStoreName, topic);
+ @Deprecated
+ protected synchronized final TopologyBuilder connectSourceStoreAndTopic(final String sourceStoreName,
+ final String topic) {
+ internalTopologyBuilder.connectSourceStoreAndTopic(sourceStoreName, topic);
return this;
}
@@ -998,678 +679,206 @@ public class TopologyBuilder {
* @param processorNames the name of the processors
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if less than two processors are specified, or if one of the processors is not added yet
+ * @deprecated not part of public API and for internal usage only
*/
+ @Deprecated
public synchronized final TopologyBuilder connectProcessors(final String... processorNames) {
- if (processorNames.length < 2)
- throw new TopologyBuilderException("At least two processors need to participate in the connection.");
-
- for (String processorName : processorNames) {
- if (!nodeFactories.containsKey(processorName))
- throw new TopologyBuilderException("Processor " + processorName + " is not added yet.");
-
- }
-
- String firstProcessorName = processorNames[0];
-
- nodeGrouper.unite(firstProcessorName, Arrays.copyOfRange(processorNames, 1, processorNames.length));
-
+ internalTopologyBuilder.connectProcessors(processorNames);
return this;
}
/**
* Adds an internal topic
*
+ * NOTE this function is not needed by developers working with the processor APIs; it is only used
+ * for the high-level DSL parsing functionalities.
+ *
* @param topicName the name of the topic
* @return this builder instance so methods can be chained together; never null
+ * @deprecated not part of public API and for internal usage only
*/
+ @Deprecated
public synchronized final TopologyBuilder addInternalTopic(final String topicName) {
- Objects.requireNonNull(topicName, "topicName can't be null");
- this.internalTopicNames.add(topicName);
-
+ internalTopologyBuilder.addInternalTopic(topicName);
return this;
}
/**
* Asserts that the streams of the specified source nodes must be copartitioned.
*
+ * NOTE this function is not needed by developers working with the processor APIs; it is only used
+ * for the high-level DSL parsing functionalities.
+ *
* @param sourceNodes a set of source node names
* @return this builder instance so methods can be chained together; never null
+ * @deprecated not part of public API and for internal usage only
*/
+ @Deprecated
public synchronized final TopologyBuilder copartitionSources(final Collection<String> sourceNodes) {
- copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
+ internalTopologyBuilder.copartitionSources(sourceNodes);
return this;
}
- private void connectProcessorAndStateStore(final String processorName, final String stateStoreName) {
- if (!stateFactories.containsKey(stateStoreName))
- throw new TopologyBuilderException("StateStore " + stateStoreName + " is not added yet.");
- if (!nodeFactories.containsKey(processorName))
- throw new TopologyBuilderException("Processor " + processorName + " is not added yet.");
-
- final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
- final Iterator<String> iter = stateStoreFactory.users.iterator();
- if (iter.hasNext()) {
- final String user = iter.next();
- nodeGrouper.unite(user, processorName);
- }
- stateStoreFactory.users.add(processorName);
-
- NodeFactory nodeFactory = nodeFactories.get(processorName);
- if (nodeFactory instanceof ProcessorNodeFactory) {
- final ProcessorNodeFactory processorNodeFactory = (ProcessorNodeFactory) nodeFactory;
- processorNodeFactory.addStateStore(stateStoreName);
- connectStateStoreNameToSourceTopicsOrPattern(stateStoreName, processorNodeFactory);
- } else {
- throw new TopologyBuilderException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
- }
- }
-
- private Set<SourceNodeFactory> findSourcesForProcessorParents(final String[] parents) {
- final Set<SourceNodeFactory> sourceNodes = new HashSet<>();
- for (String parent : parents) {
- final NodeFactory nodeFactory = nodeFactories.get(parent);
- if (nodeFactory instanceof SourceNodeFactory) {
- sourceNodes.add((SourceNodeFactory) nodeFactory);
- } else if (nodeFactory instanceof ProcessorNodeFactory) {
- sourceNodes.addAll(findSourcesForProcessorParents(((ProcessorNodeFactory) nodeFactory).parents));
- }
- }
- return sourceNodes;
- }
-
- private void connectStateStoreNameToSourceTopicsOrPattern(final String stateStoreName,
- final ProcessorNodeFactory processorNodeFactory) {
-
- // we should never update the mapping from state store names to source topics if the store name already exists
- // in the map; this scenario is possible, for example, that a state store underlying a source KTable is
- // connecting to a join operator whose source topic is not the original KTable's source topic but an internal repartition topic.
-
- if (stateStoreNameToSourceTopics.containsKey(stateStoreName) || stateStoreNameToSourceRegex.containsKey(stateStoreName)) {
- return;
- }
-
- final Set<String> sourceTopics = new HashSet<>();
- final Set<Pattern> sourcePatterns = new HashSet<>();
- final Set<SourceNodeFactory> sourceNodesForParent = findSourcesForProcessorParents(processorNodeFactory.parents);
-
- for (SourceNodeFactory sourceNodeFactory : sourceNodesForParent) {
- if (sourceNodeFactory.pattern != null) {
- sourcePatterns.add(sourceNodeFactory.pattern);
- } else {
- sourceTopics.addAll(sourceNodeFactory.topics);
- }
- }
-
- if (!sourceTopics.isEmpty()) {
- stateStoreNameToSourceTopics.put(stateStoreName,
- Collections.unmodifiableSet(sourceTopics));
- }
-
- if (!sourcePatterns.isEmpty()) {
- stateStoreNameToSourceRegex.put(stateStoreName,
- Collections.unmodifiableSet(sourcePatterns));
- }
-
- }
-
-
- private <T> void maybeAddToResetList(final Collection<T> earliestResets, final Collection<T> latestResets, final AutoOffsetReset offsetReset, final T item) {
- if (offsetReset != null) {
- switch (offsetReset) {
- case EARLIEST:
- earliestResets.add(item);
- break;
- case LATEST:
- latestResets.add(item);
- break;
- default:
- throw new TopologyBuilderException(String.format("Unrecognized reset format %s", offsetReset));
- }
- }
- }
-
/**
* Returns the map of node groups keyed by the topic group id.
*
+ * NOTE this function is not needed by developers working with the processor APIs; it is only used
+ * for the high-level DSL parsing functionalities.
+ *
* @return groups of node names
+ * @deprecated not part of public API and for internal usage only
*/
+ @Deprecated
public synchronized Map<Integer, Set<String>> nodeGroups() {
- if (nodeGroups == null)
- nodeGroups = makeNodeGroups();
-
- return nodeGroups;
- }
-
- private Map<Integer, Set<String>> makeNodeGroups() {
- final HashMap<Integer, Set<String>> nodeGroups = new LinkedHashMap<>();
- final HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>();
-
- int nodeGroupId = 0;
-
- // Go through source nodes first. This makes the group id assignment easy to predict in tests
- final HashSet<String> allSourceNodes = new HashSet<>(nodeToSourceTopics.keySet());
- allSourceNodes.addAll(nodeToSourcePatterns.keySet());
-
- for (String nodeName : Utils.sorted(allSourceNodes)) {
- final String root = nodeGrouper.root(nodeName);
- Set<String> nodeGroup = rootToNodeGroup.get(root);
- if (nodeGroup == null) {
- nodeGroup = new HashSet<>();
- rootToNodeGroup.put(root, nodeGroup);
- nodeGroups.put(nodeGroupId++, nodeGroup);
- }
- nodeGroup.add(nodeName);
- }
-
- // Go through non-source nodes
- for (String nodeName : Utils.sorted(nodeFactories.keySet())) {
- if (!nodeToSourceTopics.containsKey(nodeName)) {
- final String root = nodeGrouper.root(nodeName);
- Set<String> nodeGroup = rootToNodeGroup.get(root);
- if (nodeGroup == null) {
- nodeGroup = new HashSet<>();
- rootToNodeGroup.put(root, nodeGroup);
- nodeGroups.put(nodeGroupId++, nodeGroup);
- }
- nodeGroup.add(nodeName);
- }
- }
-
- return nodeGroups;
+ return internalTopologyBuilder.nodeGroups();
}
/**
* Build the topology for the specified topic group. This is called automatically when passing this builder into the
* {@link org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)} constructor.
*
+ * NOTE this function is not needed by developers working with the processor APIs; it is only used
+ * for the high-level DSL parsing functionalities.
+ *
* @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
+ * @deprecated not part of public API and for internal usage only
*/
+ @Deprecated
public synchronized ProcessorTopology build(final Integer topicGroupId) {
- Set<String> nodeGroup;
- if (topicGroupId != null) {
- nodeGroup = nodeGroups().get(topicGroupId);
- } else {
- // when topicGroupId is null, we build the full topology minus the global groups
- final Set<String> globalNodeGroups = globalNodeGroups();
- final Collection<Set<String>> values = nodeGroups().values();
- nodeGroup = new HashSet<>();
- for (Set<String> value : values) {
- nodeGroup.addAll(value);
- }
- nodeGroup.removeAll(globalNodeGroups);
-
-
- }
- return build(nodeGroup);
+ return internalTopologyBuilder.build(topicGroupId);
}
/**
* Builds the topology for any global state stores
+ *
+ * NOTE this function is not needed by developers working with the processor APIs; it is only used
+ * for the high-level DSL parsing functionalities.
+ *
* @return ProcessorTopology
+ * @deprecated not part of public API and for internal usage only
*/
+ @Deprecated
public synchronized ProcessorTopology buildGlobalStateTopology() {
- final Set<String> globalGroups = globalNodeGroups();
- if (globalGroups.isEmpty()) {
- return null;
- }
- return build(globalGroups);
- }
-
- private Set<String> globalNodeGroups() {
- final Set<String> globalGroups = new HashSet<>();
- for (final Map.Entry<Integer, Set<String>> nodeGroup : nodeGroups().entrySet()) {
- final Set<String> nodes = nodeGroup.getValue();
- for (String node : nodes) {
- if (isGlobalSource(node)) {
- globalGroups.addAll(nodes);
- }
- }
- }
- return globalGroups;
- }
-
- private ProcessorTopology build(final Set<String> nodeGroup) {
- final List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
- final Map<String, ProcessorNode> processorMap = new HashMap<>();
- final Map<String, SourceNode> topicSourceMap = new HashMap<>();
- final Map<String, SinkNode> topicSinkMap = new HashMap<>();
- final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();
-
- // create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
- for (NodeFactory factory : nodeFactories.values()) {
- if (nodeGroup == null || nodeGroup.contains(factory.name)) {
- final ProcessorNode node = factory.build();
- processorNodes.add(node);
- processorMap.put(node.name(), node);
-
- if (factory instanceof ProcessorNodeFactory) {
- for (String parent : ((ProcessorNodeFactory) factory).parents) {
- final ProcessorNode<?, ?> parentNode = processorMap.get(parent);
- parentNode.addChild(node);
- }
- for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
- if (!stateStoreMap.containsKey(stateStoreName)) {
- StateStore stateStore;
-
- if (stateFactories.containsKey(stateStoreName)) {
- final StateStoreSupplier supplier = stateFactories.get(stateStoreName).supplier;
- stateStore = supplier.get();
-
- // remember the changelog topic if this state store is change-logging enabled
- if (supplier.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName)) {
- final String changelogTopic = ProcessorStateManager.storeChangelogTopic(this.applicationId, stateStoreName);
- storeToChangelogTopic.put(stateStoreName, changelogTopic);
- }
- } else {
- stateStore = globalStateStores.get(stateStoreName);
- }
-
- stateStoreMap.put(stateStoreName, stateStore);
- }
- }
- } else if (factory instanceof SourceNodeFactory) {
- final SourceNodeFactory sourceNodeFactory = (SourceNodeFactory) factory;
- final List<String> topics = (sourceNodeFactory.pattern != null) ?
- sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates()) :
- sourceNodeFactory.topics;
-
- for (String topic : topics) {
- if (internalTopicNames.contains(topic)) {
- // prefix the internal topic name with the application id
- topicSourceMap.put(decorateTopic(topic), (SourceNode) node);
- } else {
- topicSourceMap.put(topic, (SourceNode) node);
- }
- }
- } else if (factory instanceof SinkNodeFactory) {
- final SinkNodeFactory sinkNodeFactory = (SinkNodeFactory) factory;
-
- for (String parent : sinkNodeFactory.parents) {
- processorMap.get(parent).addChild(node);
- if (internalTopicNames.contains(sinkNodeFactory.topic)) {
- // prefix the internal topic name with the application id
- topicSinkMap.put(decorateTopic(sinkNodeFactory.topic), (SinkNode) node);
- } else {
- topicSinkMap.put(sinkNodeFactory.topic, (SinkNode) node);
- }
- }
- } else {
- throw new TopologyBuilderException("Unknown definition class: " + factory.getClass().getName());
- }
- }
- }
-
- return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()), storeToChangelogTopic, new ArrayList<>(globalStateStores.values()));
+ return internalTopologyBuilder.buildGlobalStateTopology();
}
/**
* Get any global {@link StateStore}s that are part of the
* topology
+ *
+ * NOTE this function is not needed by developers working with the processor APIs; it is only used
+ * for the high-level DSL parsing functionalities.
+ *
* @return map containing all global {@link StateStore}s
+ * @deprecated not part of public API and for internal usage only
*/
+ @Deprecated
public Map<String, StateStore> globalStateStores() {
- return Collections.unmodifiableMap(globalStateStores);
+ return internalTopologyBuilder.globalStateStores();
}
/**
* Returns the map of topic groups keyed by the group id.
* A topic group is a group of topics in the same task.
*
+ * NOTE this function is not needed by developers working with the processor APIs; it is only used
+ * for the high-level DSL parsing functionalities.
+ *
* @return groups of topic names
+ * @deprecated not part of public API and for internal usage only
*/
+ @Deprecated
public synchronized Map<Integer, TopicsInfo> topicGroups() {
- final Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>();
-
- if (nodeGroups == null)
- nodeGroups = makeNodeGroups();
-
- for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
- final Set<String> sinkTopics = new HashSet<>();
- final Set<String> sourceTopics = new HashSet<>();
- final Map<String, InternalTopicConfig> internalSourceTopics = new HashMap<>();
- final Map<String, InternalTopicConfig> stateChangelogTopics = new HashMap<>();
- for (String node : entry.getValue()) {
- // if the node is a source node, add to the source topics
- final List<String> topics = nodeToSourceTopics.get(node);
- if (topics != null) {
- // if some of the topics are internal, add them to the internal topics
- for (String topic : topics) {
- // skip global topic as they don't need partition assignment
- if (globalTopics.contains(topic)) {
- continue;
- }
- if (this.internalTopicNames.contains(topic)) {
- // prefix the internal topic name with the application id
- final String internalTopic = decorateTopic(topic);
- internalSourceTopics.put(internalTopic, new InternalTopicConfig(internalTopic,
- Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
- Collections.<String, String>emptyMap()));
- sourceTopics.add(internalTopic);
- } else {
- sourceTopics.add(topic);
- }
- }
- }
-
- // if the node is a sink node, add to the sink topics
- final String topic = nodeToSinkTopic.get(node);
- if (topic != null) {
- if (internalTopicNames.contains(topic)) {
- // prefix the change log topic name with the application id
- sinkTopics.add(decorateTopic(topic));
- } else {
- sinkTopics.add(topic);
- }
- }
-
- // if the node is connected to a state, add to the state topics
- for (StateStoreFactory stateFactory : stateFactories.values()) {
- final StateStoreSupplier supplier = stateFactory.supplier;
- if (supplier.loggingEnabled() && stateFactory.users.contains(node)) {
- final String name = ProcessorStateManager.storeChangelogTopic(applicationId, supplier.name());
- final InternalTopicConfig internalTopicConfig = createInternalTopicConfig(supplier, name);
- stateChangelogTopics.put(name, internalTopicConfig);
- }
- }
- }
- if (!sourceTopics.isEmpty()) {
- topicGroups.put(entry.getKey(), new TopicsInfo(
- Collections.unmodifiableSet(sinkTopics),
- Collections.unmodifiableSet(sourceTopics),
- Collections.unmodifiableMap(internalSourceTopics),
- Collections.unmodifiableMap(stateChangelogTopics)));
- }
- }
-
- return Collections.unmodifiableMap(topicGroups);
- }
-
- private void setRegexMatchedTopicsToSourceNodes() {
- if (subscriptionUpdates.hasUpdates()) {
- for (Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) {
- final SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
- //need to update nodeToSourceTopics with topics matched from given regex
- nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates()));
- log.debug("nodeToSourceTopics {}", nodeToSourceTopics);
- }
- }
- }
-
- private void setRegexMatchedTopicToStateStore() {
- if (subscriptionUpdates.hasUpdates()) {
- for (Map.Entry<String, Set<Pattern>> storePattern : stateStoreNameToSourceRegex.entrySet()) {
- final Set<String> updatedTopicsForStateStore = new HashSet<>();
- for (String subscriptionUpdateTopic : subscriptionUpdates.getUpdates()) {
- for (Pattern pattern : storePattern.getValue()) {
- if (pattern.matcher(subscriptionUpdateTopic).matches()) {
- updatedTopicsForStateStore.add(subscriptionUpdateTopic);
- }
- }
- }
- if (!updatedTopicsForStateStore.isEmpty()) {
- Collection<String> storeTopics = stateStoreNameToSourceTopics.get(storePattern.getKey());
- if (storeTopics != null) {
- updatedTopicsForStateStore.addAll(storeTopics);
- }
- stateStoreNameToSourceTopics.put(storePattern.getKey(), Collections.unmodifiableSet(updatedTopicsForStateStore));
- }
- }
- }
- }
-
- private InternalTopicConfig createInternalTopicConfig(final StateStoreSupplier<?> supplier, final String name) {
- if (!(supplier instanceof WindowStoreSupplier)) {
- return new InternalTopicConfig(name, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), supplier.logConfig());
- }
-
- final WindowStoreSupplier windowStoreSupplier = (WindowStoreSupplier) supplier;
- final InternalTopicConfig config = new InternalTopicConfig(name,
- Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact,
- InternalTopicConfig.CleanupPolicy.delete),
- supplier.logConfig());
- config.setRetentionMs(windowStoreSupplier.retentionPeriod());
- return config;
+ return internalTopologyBuilder.topicGroups();
}
/**
* Get the Pattern to match all topics requiring to start reading from earliest available offset
+ *
+ * NOTE this function is not needed by developers working with the processor APIs; it is only used
+ * for the high-level DSL parsing functionalities.
+ *
* @return the Pattern for matching all topics reading from earliest offset, never null
+ * @deprecated not part of public API and for internal usage only
*/
+ @Deprecated
public synchronized Pattern earliestResetTopicsPattern() {
- final List<String> topics = maybeDecorateInternalSourceTopics(earliestResetTopics);
- final Pattern earliestPattern = buildPatternForOffsetResetTopics(topics, earliestResetPatterns);
-
- ensureNoRegexOverlap(earliestPattern, latestResetPatterns, latestResetTopics);
-
- return earliestPattern;
+ return internalTopologyBuilder.earliestResetTopicsPattern();
}
/**
* Get the Pattern to match all topics requiring to start reading from latest available offset
+ *
+ * NOTE this function is not needed by developers working with the processor APIs; it is only used
+ * for the high-level DSL parsing functionalities.
+ *
* @return the Pattern for matching all topics reading from latest offset, never null
+ * @deprecated not part of public API and for internal usage only
*/
+ @Deprecated
public synchronized Pattern latestResetTopicsPattern() {
- final List<String> topics = maybeDecorateInternalSourceTopics(latestResetTopics);
- final Pattern latestPattern = buildPatternForOffsetResetTopics(topics, latestResetPatterns);
-
- ensureNoRegexOverlap(latestPattern, earliestResetPatterns, earliestResetTopics);
-
- return latestPattern;
- }
-
- private void ensureNoRegexOverlap(final Pattern builtPattern, final Set<Pattern> otherPatterns, final Set<String> otherTopics) {
-
- for (Pattern otherPattern : otherPatterns) {
- if (builtPattern.pattern().contains(otherPattern.pattern())) {
- throw new TopologyBuilderException(String.format("Found overlapping regex [%s] against [%s] for a KStream with auto offset resets", otherPattern.pattern(), builtPattern.pattern()));
- }
- }
-
- for (String otherTopic : otherTopics) {
- if (builtPattern.matcher(otherTopic).matches()) {
- throw new TopologyBuilderException(String.format("Found overlapping regex [%s] matching topic [%s] for a KStream with auto offset resets", builtPattern.pattern(), otherTopic));
- }
- }
+ return internalTopologyBuilder.latestResetTopicsPattern();
}
/**
- * Builds a composite pattern out of topic names and Pattern object for matching topic names. If the provided
- * arrays are empty a Pattern.compile("") instance is returned.
+ * NOTE this function is not needed by developers working with the processor APIs; it is only used
+ * for the high-level DSL parsing functionalities.
*
- * @param sourceTopics the name of source topics to add to a composite pattern
- * @param sourcePatterns Patterns for matching source topics to add to a composite pattern
- * @return a Pattern that is composed of the literal source topic names and any Patterns for matching source topics
- */
- private static synchronized Pattern buildPatternForOffsetResetTopics(final Collection<String> sourceTopics, final Collection<Pattern> sourcePatterns) {
- final StringBuilder builder = new StringBuilder();
-
- for (String topic : sourceTopics) {
- builder.append(topic).append("|");
- }
-
- for (Pattern sourcePattern : sourcePatterns) {
- builder.append(sourcePattern.pattern()).append("|");
- }
-
- if (builder.length() > 0) {
- builder.setLength(builder.length() - 1);
- return Pattern.compile(builder.toString());
- }
-
- return EMPTY_ZERO_LENGTH_PATTERN;
- }
-
- /**
* @return a mapping from state store name to a Set of source Topics.
+ * @deprecated not part of public API and for internal usage only
*/
+ @Deprecated
public Map<String, List<String>> stateStoreNameToSourceTopics() {
- final Map<String, List<String>> results = new HashMap<>();
- for (Map.Entry<String, Set<String>> entry : stateStoreNameToSourceTopics.entrySet()) {
- results.put(entry.getKey(), maybeDecorateInternalSourceTopics(entry.getValue()));
- }
- return results;
+ return internalTopologyBuilder.stateStoreNameToSourceTopics();
}
/**
* Returns the copartition groups.
* A copartition group is a group of source topics that are required to be copartitioned.
*
+ * NOTE this function would not be needed by developers working with the processor APIs, but is only used
+ * for the high-level DSL parsing functionalities.
+ *
* @return groups of topic names
+ * @deprecated not part of public API and for internal usage only
*/
+ @Deprecated
public synchronized Collection<Set<String>> copartitionGroups() {
- final List<Set<String>> list = new ArrayList<>(copartitionSourceGroups.size());
- for (Set<String> nodeNames : copartitionSourceGroups) {
- Set<String> copartitionGroup = new HashSet<>();
- for (String node : nodeNames) {
- final List<String> topics = nodeToSourceTopics.get(node);
- if (topics != null)
- copartitionGroup.addAll(maybeDecorateInternalSourceTopics(topics));
- }
- list.add(Collections.unmodifiableSet(copartitionGroup));
- }
- return Collections.unmodifiableList(list);
- }
-
- private List<String> maybeDecorateInternalSourceTopics(final Collection<String> sourceTopics) {
- final List<String> decoratedTopics = new ArrayList<>();
- for (String topic : sourceTopics) {
- if (internalTopicNames.contains(topic)) {
- decoratedTopics.add(decorateTopic(topic));
- } else {
- decoratedTopics.add(topic);
- }
- }
- return decoratedTopics;
- }
-
- private String decorateTopic(final String topic) {
- if (applicationId == null) {
- throw new TopologyBuilderException("there are internal topics and "
- + "applicationId hasn't been set. Call "
- + "setApplicationId first");
- }
-
- return applicationId + "-" + topic;
+ return internalTopologyBuilder.copartitionGroups();
}
+ /**
+ * NOTE this function would not be needed by developers working with the processor APIs, but is only used
+ * for the high-level DSL parsing functionalities.
+ *
+ * @deprecated not part of public API and for internal usage only
+ */
+ @Deprecated
public SubscriptionUpdates subscriptionUpdates() {
- return subscriptionUpdates;
+ return internalTopologyBuilder.subscriptionUpdates();
}
+ /**
+ * NOTE this function would not be needed by developers working with the processor APIs, but is only used
+ * for the high-level DSL parsing functionalities.
+ *
+ * @deprecated not part of public API and for internal usage only
+ */
+ @Deprecated
public synchronized Pattern sourceTopicPattern() {
- if (this.topicPattern == null) {
- final List<String> allSourceTopics = new ArrayList<>();
- if (!nodeToSourceTopics.isEmpty()) {
- for (List<String> topics : nodeToSourceTopics.values()) {
- allSourceTopics.addAll(maybeDecorateInternalSourceTopics(topics));
- }
- }
- Collections.sort(allSourceTopics);
-
- this.topicPattern = buildPatternForOffsetResetTopics(allSourceTopics, nodeToSourcePatterns.values());
- }
-
- return this.topicPattern;
- }
-
- public synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates, final String threadId) {
- log.debug("stream-thread [{}] updating builder with {} topic(s) with possible matching regex subscription(s)", threadId, subscriptionUpdates);
- this.subscriptionUpdates = subscriptionUpdates;
- setRegexMatchedTopicsToSourceNodes();
- setRegexMatchedTopicToStateStore();
- }
-
- private boolean isGlobalSource(final String nodeName) {
- final NodeFactory nodeFactory = nodeFactories.get(nodeName);
-
- if (nodeFactory instanceof SourceNodeFactory) {
- final List<String> topics = ((SourceNodeFactory) nodeFactory).topics;
- if (topics != null && topics.size() == 1 && globalTopics.contains(topics.get(0))) {
- return true;
- }
- }
-
- return false;
- }
-
- TopologyDescription describe() {
- final TopologyDescription description = new TopologyDescription();
-
- describeSubtopologies(description);
- describeGlobalStores(description);
-
- return description;
- }
-
- private void describeSubtopologies(final TopologyDescription description) {
- for (final Map.Entry<Integer, Set<String>> nodeGroup : makeNodeGroups().entrySet()) {
-
- final Set<String> allNodesOfGroups = nodeGroup.getValue();
- final boolean isNodeGroupOfGlobalStores = nodeGroupContainsGlobalSourceNode(allNodesOfGroups);
-
- if (!isNodeGroupOfGlobalStores) {
- describeSubtopology(description, nodeGroup.getKey(), allNodesOfGroups);
- }
- }
+ return internalTopologyBuilder.sourceTopicPattern();
}
- private boolean nodeGroupContainsGlobalSourceNode(final Set<String> allNodesOfGroups) {
- for (final String node : allNodesOfGroups) {
- if (isGlobalSource(node)) {
- return true;
- }
- }
- return false;
- }
-
- private void describeSubtopology(final TopologyDescription description,
- final Integer subtopologyId,
- final Set<String> nodeNames) {
-
- final HashMap<String, TopologyDescription.AbstractNode> nodesByName = new HashMap<>();
-
- // add all nodes
- for (final String nodeName : nodeNames) {
- nodesByName.put(nodeName, nodeFactories.get(nodeName).describe());
- }
-
- // connect each node to its predecessors and successors
- for (final TopologyDescription.AbstractNode node : nodesByName.values()) {
- for (final String predecessorName : nodeFactories.get(node.name()).parents) {
- final TopologyDescription.AbstractNode predecessor = nodesByName.get(predecessorName);
- node.addPredecessor(predecessor);
- predecessor.addSuccessor(node);
- }
- }
-
- description.addSubtopology(new TopologyDescription.Subtopology(
- subtopologyId,
- new HashSet<TopologyDescription.Node>(nodesByName.values())));
- }
-
- private void describeGlobalStores(final TopologyDescription description) {
- for (final Map.Entry<Integer, Set<String>> nodeGroup : makeNodeGroups().entrySet()) {
- final Set<String> nodes = nodeGroup.getValue();
-
- final Iterator<String> it = nodes.iterator();
- while (it.hasNext()) {
- final String node = it.next();
-
- if (isGlobalSource(node)) {
- // we found a GlobalStore node group; those contain exactly two node: {sourceNode,processorNode}
- it.remove(); // remove sourceNode from group
- final String processorNode = nodes.iterator().next(); // get remaining processorNode
-
- description.addGlobalStore(new TopologyDescription.GlobalStore(
- node,
- processorNode,
- ((ProcessorNodeFactory) nodeFactories.get(processorNode)).stateStoreNames.iterator().next(),
- nodeToSourceTopics.get(node).get(0)
- ));
- break;
- }
- }
- }
+ /**
+ * NOTE this function would not be needed by developers working with the processor APIs, but is only used
+ * for the high-level DSL parsing functionalities.
+ *
+ * @deprecated not part of public API and for internal usage only
+ */
+ @Deprecated
+ public synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates,
+ final String threadId) {
+ internalTopologyBuilder.updateSubscriptions(subscriptionUpdates, threadId);
}
}