You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/30 22:44:59 UTC
kafka git commit: KAFKA-4791: unable to add state store with regex matched topics
Repository: kafka
Updated Branches:
refs/heads/trunk 4e92fd5f7 -> 15e0234a5
KAFKA-4791: unable to add state store with regex matched topics
Fix for adding state stores with regex defined sources
Author: bbejeck <bb...@gmail.com>
Reviewers: Matthias J. Sax, Damian Guy, Guozhang Wang
Closes #2618 from bbejeck/KAFKA-4791_unable_to_add_statestore_regex_topics
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/15e0234a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/15e0234a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/15e0234a
Branch: refs/heads/trunk
Commit: 15e0234a5f4976facd4cfe61b91cfcdec6f6083c
Parents: 4e92fd5
Author: Bill Bejeck <bb...@gmail.com>
Authored: Thu Mar 30 15:44:56 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Mar 30 15:44:56 2017 -0700
----------------------------------------------------------------------
.../streams/processor/TopologyBuilder.java | 185 ++++++++++++-------
.../integration/RegexSourceIntegrationTest.java | 30 +++
.../streams/processor/TopologyBuilderTest.java | 34 ++++
3 files changed, 178 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/15e0234a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 99f5d65..7c2ec4f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -99,6 +99,10 @@ public class TopologyBuilder {
// are connected to these state stores
private final Map<String, Set<String>> stateStoreNameToSourceTopics = new HashMap<>();
+ // map from state store names to all the regex subscribed topics from source processors that
+ // are connected to these state stores
+ private final Map<String, Set<Pattern>> stateStoreNameToSourceRegex = new HashMap<>();
+
// map from state store names to this state store's corresponding changelog topic if possible,
// this is used in the extended KStreamBuilder.
private final Map<String, String> storeToChangelogTopic = new HashMap<>();
@@ -174,7 +178,7 @@ public class TopologyBuilder {
private SourceNodeFactory(String name, String[] topics, Pattern pattern, Deserializer<?> keyDeserializer, Deserializer<?> valDeserializer) {
super(name);
- this.topics = topics != null ? Arrays.asList(topics) : null;
+ this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<String>();
this.pattern = pattern;
this.keyDeserializer = keyDeserializer;
this.valDeserializer = valDeserializer;
@@ -311,7 +315,7 @@ public class TopologyBuilder {
* @param applicationId the streams applicationId. Should be the same as set by
* {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG}
*/
- public synchronized final TopologyBuilder setApplicationId(String applicationId) {
+ public synchronized final TopologyBuilder setApplicationId(final String applicationId) {
Objects.requireNonNull(applicationId, "applicationId can't be null");
this.applicationId = applicationId;
@@ -329,7 +333,7 @@ public class TopologyBuilder {
* @param topics the name of one or more Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
*/
- public synchronized final TopologyBuilder addSource(String name, String... topics) {
+ public synchronized final TopologyBuilder addSource(final String name, final String... topics) {
return addSource(null, name, null, null, topics);
}
@@ -345,7 +349,7 @@ public class TopologyBuilder {
* @param topics the name of one or more Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
*/
- public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, String... topics) {
+ public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final String... topics) {
return addSource(offsetReset, name, null, null, topics);
}
@@ -362,7 +366,7 @@ public class TopologyBuilder {
* @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
*/
- public synchronized final TopologyBuilder addSource(String name, Pattern topicPattern) {
+ public synchronized final TopologyBuilder addSource(final String name, final Pattern topicPattern) {
return addSource(null, name, null, null, topicPattern);
}
@@ -379,7 +383,7 @@ public class TopologyBuilder {
* @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
*/
- public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Pattern topicPattern) {
+ public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final Pattern topicPattern) {
return addSource(offsetReset, name, null, null, topicPattern);
}
@@ -400,7 +404,7 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
*/
- public synchronized final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
+ public synchronized final TopologyBuilder addSource(final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final String... topics) {
return addSource(null, name, keyDeserializer, valDeserializer, topics);
}
@@ -422,7 +426,7 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
*/
- public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
+ public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final String... topics) {
if (topics.length == 0) {
throw new TopologyBuilderException("You must provide at least one topic");
}
@@ -540,7 +544,7 @@ public class TopologyBuilder {
* @throws TopologyBuilderException if processor is already added or if topics have already been registered by name
*/
- public synchronized final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern) {
+ public synchronized final TopologyBuilder addSource(final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final Pattern topicPattern) {
return addSource(null, name, keyDeserializer, valDeserializer, topicPattern);
}
@@ -566,7 +570,7 @@ public class TopologyBuilder {
* @throws TopologyBuilderException if processor is already added or if topics have already been registered by name
*/
- public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern) {
+ public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final Pattern topicPattern) {
Objects.requireNonNull(topicPattern, "topicPattern can't be null");
Objects.requireNonNull(name, "name can't be null");
@@ -604,7 +608,7 @@ public class TopologyBuilder {
* @see #addSink(String, String, Serializer, Serializer, String...)
* @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
*/
- public synchronized final TopologyBuilder addSink(String name, String topic, String... parentNames) {
+ public synchronized final TopologyBuilder addSink(final String name, final String topic, final String... parentNames) {
return addSink(name, topic, null, null, parentNames);
}
@@ -631,7 +635,7 @@ public class TopologyBuilder {
* @see #addSink(String, String, Serializer, Serializer, String...)
* @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
*/
- public synchronized final TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames) {
+ public synchronized final TopologyBuilder addSink(final String name, final String topic, final StreamPartitioner partitioner, final String... parentNames) {
return addSink(name, topic, null, null, partitioner, parentNames);
}
@@ -654,7 +658,7 @@ public class TopologyBuilder {
* @see #addSink(String, String, StreamPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
*/
- public synchronized final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) {
+ public synchronized final TopologyBuilder addSink(final String name, final String topic, final Serializer keySerializer, final Serializer valSerializer, final String... parentNames) {
return addSink(name, topic, keySerializer, valSerializer, null, parentNames);
}
@@ -679,7 +683,7 @@ public class TopologyBuilder {
* @see #addSink(String, String, Serializer, Serializer, String...)
* @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name
*/
- public synchronized final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) {
+ public synchronized final <K, V> TopologyBuilder addSink(final String name, final String topic, final Serializer<K> keySerializer, final Serializer<V> valSerializer, final StreamPartitioner<? super K, ? super V> partitioner, final String... parentNames) {
Objects.requireNonNull(name, "name must not be null");
Objects.requireNonNull(topic, "topic must not be null");
if (nodeFactories.containsKey(name))
@@ -713,7 +717,7 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name
*/
- public synchronized final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames) {
+ public synchronized final TopologyBuilder addProcessor(final String name, final ProcessorSupplier supplier, final String... parentNames) {
Objects.requireNonNull(name, "name must not be null");
Objects.requireNonNull(supplier, "supplier must not be null");
if (nodeFactories.containsKey(name))
@@ -742,7 +746,7 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if state store supplier is already added
*/
- public synchronized final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames) {
+ public synchronized final TopologyBuilder addStateStore(final StateStoreSupplier supplier, final String... processorNames) {
Objects.requireNonNull(supplier, "supplier can't be null");
if (stateFactories.containsKey(supplier.name())) {
throw new TopologyBuilderException("StateStore " + supplier.name() + " is already added.");
@@ -766,7 +770,7 @@ public class TopologyBuilder {
* @param stateStoreNames the names of state stores that the processor uses
* @return this builder instance so methods can be chained together; never null
*/
- public synchronized final TopologyBuilder connectProcessorAndStateStores(String processorName, String... stateStoreNames) {
+ public synchronized final TopologyBuilder connectProcessorAndStateStores(final String processorName, final String... stateStoreNames) {
Objects.requireNonNull(processorName, "processorName can't be null");
if (stateStoreNames != null) {
for (String stateStoreName : stateStoreNames) {
@@ -781,7 +785,7 @@ public class TopologyBuilder {
* This is used only for KStreamBuilder: when adding a KTable from a source topic,
* we need to add the topic as the KTable's materialized state store's changelog.
*/
- protected synchronized final TopologyBuilder connectSourceStoreAndTopic(String sourceStoreName, String topic) {
+ protected synchronized final TopologyBuilder connectSourceStoreAndTopic(final String sourceStoreName, final String topic) {
if (storeToChangelogTopic.containsKey(sourceStoreName)) {
throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added.");
}
@@ -799,7 +803,7 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if less than two processors are specified, or if one of the processors is not added yet
*/
- public synchronized final TopologyBuilder connectProcessors(String... processorNames) {
+ public synchronized final TopologyBuilder connectProcessors(final String... processorNames) {
if (processorNames.length < 2)
throw new TopologyBuilderException("At least two processors need to participate in the connection.");
@@ -822,7 +826,7 @@ public class TopologyBuilder {
* @param topicName the name of the topic
* @return this builder instance so methods can be chained together; never null
*/
- public synchronized final TopologyBuilder addInternalTopic(String topicName) {
+ public synchronized final TopologyBuilder addInternalTopic(final String topicName) {
Objects.requireNonNull(topicName, "topicName can't be null");
this.internalTopicNames.add(topicName);
@@ -835,69 +839,85 @@ public class TopologyBuilder {
* @param sourceNodes a set of source node names
* @return this builder instance so methods can be chained together; never null
*/
- public synchronized final TopologyBuilder copartitionSources(Collection<String> sourceNodes) {
+ public synchronized final TopologyBuilder copartitionSources(final Collection<String> sourceNodes) {
copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
return this;
}
- private void connectProcessorAndStateStore(String processorName, String stateStoreName) {
+ private void connectProcessorAndStateStore(final String processorName, final String stateStoreName) {
if (!stateFactories.containsKey(stateStoreName))
throw new TopologyBuilderException("StateStore " + stateStoreName + " is not added yet.");
if (!nodeFactories.containsKey(processorName))
throw new TopologyBuilderException("Processor " + processorName + " is not added yet.");
- StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
- Iterator<String> iter = stateStoreFactory.users.iterator();
+ final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
+ final Iterator<String> iter = stateStoreFactory.users.iterator();
if (iter.hasNext()) {
- String user = iter.next();
+ final String user = iter.next();
nodeGrouper.unite(user, processorName);
}
stateStoreFactory.users.add(processorName);
NodeFactory nodeFactory = nodeFactories.get(processorName);
if (nodeFactory instanceof ProcessorNodeFactory) {
- ProcessorNodeFactory processorNodeFactory = (ProcessorNodeFactory) nodeFactory;
+ final ProcessorNodeFactory processorNodeFactory = (ProcessorNodeFactory) nodeFactory;
processorNodeFactory.addStateStore(stateStoreName);
- connectStateStoreNameToSourceTopics(stateStoreName, processorNodeFactory);
+ connectStateStoreNameToSourceTopicsOrPattern(stateStoreName, processorNodeFactory);
} else {
throw new TopologyBuilderException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
}
}
- private Set<String> findSourceTopicsForProcessorParents(String[] parents) {
- final Set<String> sourceTopics = new HashSet<>();
+ private Set<SourceNodeFactory> findSourcesForProcessorParents(final String[] parents) {
+ final Set<SourceNodeFactory> sourceNodes = new HashSet<>();
for (String parent : parents) {
- NodeFactory nodeFactory = nodeFactories.get(parent);
+ final NodeFactory nodeFactory = nodeFactories.get(parent);
if (nodeFactory instanceof SourceNodeFactory) {
- sourceTopics.addAll(((SourceNodeFactory) nodeFactory).topics);
+ sourceNodes.add((SourceNodeFactory) nodeFactory);
} else if (nodeFactory instanceof ProcessorNodeFactory) {
- sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) nodeFactory).parents));
+ sourceNodes.addAll(findSourcesForProcessorParents(((ProcessorNodeFactory) nodeFactory).parents));
}
}
- return sourceTopics;
+ return sourceNodes;
}
- private void connectStateStoreNameToSourceTopics(final String stateStoreName,
- final ProcessorNodeFactory processorNodeFactory) {
+ private void connectStateStoreNameToSourceTopicsOrPattern(final String stateStoreName,
+ final ProcessorNodeFactory processorNodeFactory) {
// we should never update the mapping from state store names to source topics if the store name already exists
// in the map; this scenario is possible, for example, that a state store underlying a source KTable is
// connecting to a join operator whose source topic is not the original KTable's source topic but an internal repartition topic.
- if (stateStoreNameToSourceTopics.containsKey(stateStoreName)) {
+
+ if (stateStoreNameToSourceTopics.containsKey(stateStoreName) || stateStoreNameToSourceRegex.containsKey(stateStoreName)) {
return;
}
- final Set<String> sourceTopics = findSourceTopicsForProcessorParents(processorNodeFactory.parents);
- if (sourceTopics.isEmpty()) {
- throw new TopologyBuilderException("can't find source topic for state store " +
- stateStoreName);
+ final Set<String> sourceTopics = new HashSet<>();
+ final Set<Pattern> sourcePatterns = new HashSet<>();
+ final Set<SourceNodeFactory> sourceNodesForParent = findSourcesForProcessorParents(processorNodeFactory.parents);
+
+ for (SourceNodeFactory sourceNodeFactory : sourceNodesForParent) {
+ if (sourceNodeFactory.pattern != null) {
+ sourcePatterns.add(sourceNodeFactory.pattern);
+ } else {
+ sourceTopics.addAll(sourceNodeFactory.topics);
+ }
+ }
+
+ if (!sourceTopics.isEmpty()) {
+ stateStoreNameToSourceTopics.put(stateStoreName,
+ Collections.unmodifiableSet(sourceTopics));
+ }
+
+ if (!sourcePatterns.isEmpty()) {
+ stateStoreNameToSourceRegex.put(stateStoreName,
+ Collections.unmodifiableSet(sourcePatterns));
}
- stateStoreNameToSourceTopics.put(stateStoreName,
- Collections.unmodifiableSet(sourceTopics));
+
}
- private <T> void maybeAddToResetList(Collection<T> earliestResets, Collection<T> latestResets, AutoOffsetReset offsetReset, T item) {
+ private <T> void maybeAddToResetList(final Collection<T> earliestResets, final Collection<T> latestResets, final AutoOffsetReset offsetReset, final T item) {
if (offsetReset != null) {
switch (offsetReset) {
case EARLIEST:
@@ -925,8 +945,8 @@ public class TopologyBuilder {
}
private Map<Integer, Set<String>> makeNodeGroups() {
- HashMap<Integer, Set<String>> nodeGroups = new LinkedHashMap<>();
- HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>();
+ final HashMap<Integer, Set<String>> nodeGroups = new LinkedHashMap<>();
+ final HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>();
int nodeGroupId = 0;
@@ -935,7 +955,7 @@ public class TopologyBuilder {
allSourceNodes.addAll(nodeToSourcePatterns.keySet());
for (String nodeName : Utils.sorted(allSourceNodes)) {
- String root = nodeGrouper.root(nodeName);
+ final String root = nodeGrouper.root(nodeName);
Set<String> nodeGroup = rootToNodeGroup.get(root);
if (nodeGroup == null) {
nodeGroup = new HashSet<>();
@@ -948,7 +968,7 @@ public class TopologyBuilder {
// Go through non-source nodes
for (String nodeName : Utils.sorted(nodeFactories.keySet())) {
if (!nodeToSourceTopics.containsKey(nodeName)) {
- String root = nodeGrouper.root(nodeName);
+ final String root = nodeGrouper.root(nodeName);
Set<String> nodeGroup = rootToNodeGroup.get(root);
if (nodeGroup == null) {
nodeGroup = new HashSet<>();
@@ -968,7 +988,7 @@ public class TopologyBuilder {
*
* @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
*/
- public synchronized ProcessorTopology build(Integer topicGroupId) {
+ public synchronized ProcessorTopology build(final Integer topicGroupId) {
Set<String> nodeGroup;
if (topicGroupId != null) {
nodeGroup = nodeGroups().get(topicGroupId);
@@ -1016,12 +1036,12 @@ public class TopologyBuilder {
return globalGroups;
}
- private ProcessorTopology build(Set<String> nodeGroup) {
- List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
- Map<String, ProcessorNode> processorMap = new HashMap<>();
- Map<String, SourceNode> topicSourceMap = new HashMap<>();
- Map<String, SinkNode> topicSinkMap = new HashMap<>();
- Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();
+ private ProcessorTopology build(final Set<String> nodeGroup) {
+ final List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
+ final Map<String, ProcessorNode> processorMap = new HashMap<>();
+ final Map<String, SourceNode> topicSourceMap = new HashMap<>();
+ final Map<String, SinkNode> topicSinkMap = new HashMap<>();
+ final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();
// create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
for (NodeFactory factory : nodeFactories.values()) {
@@ -1032,7 +1052,7 @@ public class TopologyBuilder {
if (factory instanceof ProcessorNodeFactory) {
for (String parent : ((ProcessorNodeFactory) factory).parents) {
- ProcessorNode<?, ?> parentNode = processorMap.get(parent);
+ final ProcessorNode<?, ?> parentNode = processorMap.get(parent);
parentNode.addChild(node);
}
for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
@@ -1106,19 +1126,19 @@ public class TopologyBuilder {
* @return groups of topic names
*/
public synchronized Map<Integer, TopicsInfo> topicGroups() {
- Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>();
+ final Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>();
if (nodeGroups == null)
nodeGroups = makeNodeGroups();
for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
- Set<String> sinkTopics = new HashSet<>();
- Set<String> sourceTopics = new HashSet<>();
- Map<String, InternalTopicConfig> internalSourceTopics = new HashMap<>();
- Map<String, InternalTopicConfig> stateChangelogTopics = new HashMap<>();
+ final Set<String> sinkTopics = new HashSet<>();
+ final Set<String> sourceTopics = new HashSet<>();
+ final Map<String, InternalTopicConfig> internalSourceTopics = new HashMap<>();
+ final Map<String, InternalTopicConfig> stateChangelogTopics = new HashMap<>();
for (String node : entry.getValue()) {
// if the node is a source node, add to the source topics
- List<String> topics = nodeToSourceTopics.get(node);
+ final List<String> topics = nodeToSourceTopics.get(node);
if (topics != null) {
// if some of the topics are internal, add them to the internal topics
for (String topic : topics) {
@@ -1128,7 +1148,7 @@ public class TopologyBuilder {
}
if (this.internalTopicNames.contains(topic)) {
// prefix the internal topic name with the application id
- String internalTopic = decorateTopic(topic);
+ final String internalTopic = decorateTopic(topic);
internalSourceTopics.put(internalTopic, new InternalTopicConfig(internalTopic,
Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
Collections.<String, String>emptyMap()));
@@ -1140,7 +1160,7 @@ public class TopologyBuilder {
}
// if the node is a sink node, add to the sink topics
- String topic = nodeToSinkTopic.get(node);
+ final String topic = nodeToSinkTopic.get(node);
if (topic != null) {
if (internalTopicNames.contains(topic)) {
// prefix the change log topic name with the application id
@@ -1175,7 +1195,7 @@ public class TopologyBuilder {
private void setRegexMatchedTopicsToSourceNodes() {
if (subscriptionUpdates.hasUpdates()) {
for (Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) {
- SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
+ final SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
//need to update nodeToSourceTopics with topics matched from given regex
nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates()));
log.debug("nodeToSourceTopics {}", nodeToSourceTopics);
@@ -1183,6 +1203,28 @@ public class TopologyBuilder {
}
}
+ private void setRegexMatchedTopicToStateStore() {
+ if (subscriptionUpdates.hasUpdates()) {
+ for (Map.Entry<String, Set<Pattern>> storePattern : stateStoreNameToSourceRegex.entrySet()) {
+ final Set<String> updatedTopicsForStateStore = new HashSet<>();
+ for (String subscriptionUpdateTopic : subscriptionUpdates.getUpdates()) {
+ for (Pattern pattern : storePattern.getValue()) {
+ if (pattern.matcher(subscriptionUpdateTopic).matches()) {
+ updatedTopicsForStateStore.add(subscriptionUpdateTopic);
+ }
+ }
+ }
+ if (!updatedTopicsForStateStore.isEmpty()) {
+ Collection<String> storeTopics = stateStoreNameToSourceTopics.get(storePattern.getKey());
+ if (storeTopics != null) {
+ updatedTopicsForStateStore.addAll(storeTopics);
+ }
+ stateStoreNameToSourceTopics.put(storePattern.getKey(), Collections.unmodifiableSet(updatedTopicsForStateStore));
+ }
+ }
+ }
+ }
+
private InternalTopicConfig createInternalTopicConfig(final StateStoreSupplier<?> supplier, final String name) {
if (!(supplier instanceof WindowStoreSupplier)) {
return new InternalTopicConfig(name, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), supplier.logConfig());
@@ -1223,7 +1265,7 @@ public class TopologyBuilder {
return latestPattern;
}
- private void ensureNoRegexOverlap(Pattern builtPattern, Set<Pattern> otherPatterns, Set<String> otherTopics) {
+ private void ensureNoRegexOverlap(final Pattern builtPattern, final Set<Pattern> otherPatterns, final Set<String> otherTopics) {
for (Pattern otherPattern : otherPatterns) {
if (builtPattern.pattern().contains(otherPattern.pattern())) {
@@ -1246,8 +1288,8 @@ public class TopologyBuilder {
* @param sourcePatterns Patterns for matching source topics to add to a composite pattern
* @return a Pattern that is composed of the literal source topic names and any Patterns for matching source topics
*/
- private static synchronized Pattern buildPatternForOffsetResetTopics(Collection<String> sourceTopics, Collection<Pattern> sourcePatterns) {
- StringBuilder builder = new StringBuilder();
+ private static synchronized Pattern buildPatternForOffsetResetTopics(final Collection<String> sourceTopics, final Collection<Pattern> sourcePatterns) {
+ final StringBuilder builder = new StringBuilder();
for (String topic : sourceTopics) {
builder.append(topic).append("|");
@@ -1283,7 +1325,7 @@ public class TopologyBuilder {
* @return groups of topic names
*/
public synchronized Collection<Set<String>> copartitionGroups() {
- List<Set<String>> list = new ArrayList<>(copartitionSourceGroups.size());
+ final List<Set<String>> list = new ArrayList<>(copartitionSourceGroups.size());
for (Set<String> nodeNames : copartitionSourceGroups) {
Set<String> copartitionGroup = new HashSet<>();
for (String node : nodeNames) {
@@ -1308,7 +1350,7 @@ public class TopologyBuilder {
return decoratedTopics;
}
- private String decorateTopic(String topic) {
+ private String decorateTopic(final String topic) {
if (applicationId == null) {
throw new TopologyBuilderException("there are internal topics and "
+ "applicationId hasn't been set. Call "
@@ -1320,7 +1362,7 @@ public class TopologyBuilder {
public synchronized Pattern sourceTopicPattern() {
if (this.topicPattern == null) {
- List<String> allSourceTopics = new ArrayList<>();
+ final List<String> allSourceTopics = new ArrayList<>();
if (!nodeToSourceTopics.isEmpty()) {
for (List<String> topics : nodeToSourceTopics.values()) {
allSourceTopics.addAll(maybeDecorateInternalSourceTopics(topics));
@@ -1334,9 +1376,10 @@ public class TopologyBuilder {
return this.topicPattern;
}
- public synchronized void updateSubscriptions(SubscriptionUpdates subscriptionUpdates, String threadId) {
+ public synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates, final String threadId) {
log.debug("stream-thread [{}] updating builder with {} topic(s) with possible matching regex subscription(s)", threadId, subscriptionUpdates);
this.subscriptionUpdates = subscriptionUpdates;
setRegexMatchedTopicsToSourceNodes();
+ setRegexMatchedTopicToStateStore();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/15e0234a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index a84a208..b671c4e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -32,12 +32,15 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
@@ -55,11 +58,13 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.regex.Pattern;
import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
@@ -230,6 +235,31 @@ public class RegexSourceIntegrationTest {
streams.close();
}
+ @Test
+ public void shouldAddStateStoreToRegexDefinedSource() throws Exception {
+
+ ProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+ MockStateStoreSupplier stateStoreSupplier = new MockStateStoreSupplier("testStateStore", false);
+
+ TopologyBuilder builder = new TopologyBuilder()
+ .addSource("ingest", Pattern.compile("topic-\\d+"))
+ .addProcessor("my-processor", processorSupplier, "ingest")
+ .addStateStore(stateStoreSupplier, "my-processor");
+
+
+ final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+ streams.start();
+
+ final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
+
+ IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList("message for test"), producerConfig, mockTime);
+ streams.close();
+
+ Map<String, List<String>> stateStoreToSourceTopic = builder.stateStoreNameToSourceTopics();
+
+ assertThat(stateStoreToSourceTopic.get("testStateStore").get(0), is("topic-1"));
+ }
+
@Test
public void testShouldReadFromRegexAndNamedTopics() throws Exception {
http://git-wip-us.apache.org/repos/asf/kafka/blob/15e0234a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 88a420a..7c8b15f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -51,6 +51,7 @@ import static org.apache.kafka.common.utils.Utils.mkList;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
public class TopologyBuilderTest {
@@ -697,4 +698,37 @@ public class TopologyBuilderTest {
assertTrue(topicGroups.get(2).sourceTopics.contains("topic-3"));
}
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception {
+
+ final TopologyBuilder topologyBuilder = new TopologyBuilder()
+ .addSource("ingest", Pattern.compile("topic-\\d+"))
+ .addProcessor("my-processor", new MockProcessorSupplier(), "ingest")
+ .addStateStore(new MockStateStoreSupplier("testStateStore", false), "my-processor");
+
+ final StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
+ final Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
+ updatedTopicsField.setAccessible(true);
+
+ final Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates);
+
+ updatedTopics.add("topic-2");
+ updatedTopics.add("topic-3");
+ updatedTopics.add("topic-A");
+
+ topologyBuilder.updateSubscriptions(subscriptionUpdates, "test-thread");
+ topologyBuilder.setApplicationId("test-app");
+
+ Map<String, List<String>> stateStoreAndTopics = topologyBuilder.stateStoreNameToSourceTopics();
+ List<String> topics = stateStoreAndTopics.get("testStateStore");
+
+ assertTrue("Expected to contain two topics", topics.size() == 2);
+
+ assertTrue(topics.contains("topic-2"));
+ assertTrue(topics.contains("topic-3"));
+ assertFalse(topics.contains("topic-A"));
+ }
+
}