You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/06/16 02:20:47 UTC
kafka git commit: KAFKA-3443: support for adding sources to
KafkaStreams via Pattern.
Repository: kafka
Updated Branches:
refs/heads/trunk 1ef7b494b -> fb42558e2
KAFKA-3443: support for adding sources to KafkaStreams via Pattern.
This PR is the follow on to the closed PR #1410.
Author: bbejeck <bb...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #1477 from bbejeck/KAFKA-3443_streams_support_for_regex_sources
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fb42558e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fb42558e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fb42558e
Branch: refs/heads/trunk
Commit: fb42558e2500835722a4e5028896ddae4f407d6f
Parents: 1ef7b49
Author: bbejeck <bb...@gmail.com>
Authored: Wed Jun 15 19:20:43 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jun 15 19:20:43 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/kafka/clients/Metadata.java | 19 +-
.../kafka/streams/kstream/KStreamBuilder.java | 39 ++
.../streams/processor/TopologyBuilder.java | 150 +++++++-
.../internals/StreamPartitionAssignor.java | 37 +-
.../processor/internals/StreamThread.java | 10 +-
.../integration/RegexSourceIntegrationTest.java | 365 +++++++++++++++++++
.../utils/EmbeddedSingleNodeKafkaCluster.java | 7 +-
.../integration/utils/KafkaEmbedded.java | 15 +
.../streams/processor/TopologyBuilderTest.java | 43 ++-
9 files changed, 664 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 54b19a3..3934627 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -12,6 +12,13 @@
*/
package org.apache.kafka.clients;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -22,13 +29,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* A class encapsulating some of the logic around metadata.
* <p>
@@ -292,7 +292,10 @@ public final class Metadata {
unauthorizedTopics.retainAll(this.topics.keySet());
for (String topic : this.topics.keySet()) {
- partitionInfos.addAll(cluster.partitionsForTopic(topic));
+ List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
+ if (partitionInfoList != null) {
+ partitionInfos.addAll(partitionInfoList);
+ }
}
nodes = cluster.nodes();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 9d90ba0..53b2f4e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
/**
* {@link KStreamBuilder} is a subclass of {@link TopologyBuilder} that provides the Kafka Streams DSL
@@ -55,6 +56,22 @@ public class KStreamBuilder extends TopologyBuilder {
return stream(null, null, topics);
}
+
+ /**
+ * Create a {@link KStream} instance from the specified Pattern.
+ * The default deserializers specified in the config are used.
+ * <p>
+ * If multiple topics are matched by the specified pattern, the created stream will read data from all of them,
+ * and there is no ordering guarantee between records from different topics.
+ *
+ * @param topicPattern the Pattern to match for topic names
+ * @return a {@link KStream} for topics matching the regex pattern.
+ */
+ public <K, V> KStream<K, V> stream(Pattern topicPattern) {
+ return stream(null, null, topicPattern);
+ }
+
+
/**
* Create a {@link KStream} instance from the specified topics.
* <p>
@@ -75,6 +92,28 @@ public class KStreamBuilder extends TopologyBuilder {
return new KStreamImpl<>(this, name, Collections.singleton(name));
}
+
+ /**
+ * Create a {@link KStream} instance from the specified Pattern.
+ * <p>
+ * If multiple topics are matched by the specified pattern, the created stream will read data from all of them,
+ * and there is no ordering guarantee between records from different topics.
+ *
+ * @param keySerde key serde used to read this source {@link KStream},
+ * if not specified the default serde defined in the configs will be used
+ * @param valSerde value serde used to read this source {@link KStream},
+ * if not specified the default serde defined in the configs will be used
+ * @param topicPattern the Pattern to match for topic names
+ * @return a {@link KStream} for topics matching the regex pattern.
+ */
+ public <K, V> KStream<K, V> stream(Serde<K> keySerde, Serde<V> valSerde, Pattern topicPattern) {
+ String name = newName(KStreamImpl.SOURCE_NAME);
+
+ addSource(name, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern);
+
+ return new KStreamImpl<>(this, name, Collections.singleton(name));
+ }
+
/**
* Create a {@link KTable} instance for the specified topic.
* The default deserializers specified in the config are used.
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 5425149..1743baf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.QuickUnion;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
+import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
import java.util.ArrayList;
import java.util.Arrays;
@@ -39,6 +40,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.regex.Pattern;
/**
* A component that is used to build a {@link ProcessorTopology}. A topology contains an acyclic graph of sources, processors,
@@ -62,8 +64,14 @@ public class TopologyBuilder {
private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
private final List<Set<String>> copartitionSourceGroups = new ArrayList<>();
private final HashMap<String, String[]> nodeToSourceTopics = new HashMap<>();
+ private final HashMap<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<>();
+ private final HashMap<String, Pattern> topicToPatterns = new HashMap<>();
private final HashMap<String, String> nodeToSinkTopic = new HashMap<>();
+ private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
private Map<Integer, Set<String>> nodeGroups = null;
+ private Pattern topicPattern;
+
+
private static class StateStoreFactory {
public final Set<String> users;
@@ -110,23 +118,49 @@ public class TopologyBuilder {
}
}
- private static class SourceNodeFactory extends NodeFactory {
- public final String[] topics;
+ private class SourceNodeFactory extends NodeFactory {
+ private final String[] topics;
+ public final Pattern pattern;
private Deserializer keyDeserializer;
private Deserializer valDeserializer;
- private SourceNodeFactory(String name, String[] topics, Deserializer keyDeserializer, Deserializer valDeserializer) {
+ private SourceNodeFactory(String name, String[] topics, Pattern pattern, Deserializer keyDeserializer, Deserializer valDeserializer) {
super(name);
- this.topics = topics.clone();
+ this.topics = topics != null ? topics.clone() : null;
+ this.pattern = pattern;
this.keyDeserializer = keyDeserializer;
this.valDeserializer = valDeserializer;
}
+ public String[] getTopics() {
+ return topics;
+ }
+
+ public String[] getTopics(Collection<String> subscribedTopics) {
+ List<String> matchedTopics = new ArrayList<>();
+ for (String update : subscribedTopics) {
+ if (this.pattern == topicToPatterns.get(update)) {
+ matchedTopics.add(update);
+ // not the same pattern instance, but it still matches: overlapping patterns are not allowed
+ } else if (topicToPatterns.containsKey(update) && isMatch(update)) {
+ throw new TopologyBuilderException("Topic " + update + " already matched check for overlapping regex patterns");
+ } else if (isMatch(update)) {
+ topicToPatterns.put(update, this.pattern);
+ matchedTopics.add(update);
+ }
+ }
+ return matchedTopics.toArray(new String[matchedTopics.size()]);
+ }
+
@SuppressWarnings("unchecked")
@Override
public ProcessorNode build(String applicationId) {
return new SourceNode(name, keyDeserializer, valDeserializer);
}
+
+ private boolean isMatch(String topic) {
+ return this.pattern.matcher(topic).matches();
+ }
}
private class SinkNodeFactory extends NodeFactory {
@@ -193,7 +227,7 @@ public class TopologyBuilder {
public TopologyBuilder() {}
/**
- * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+ * Add a new source that consumes the named topics and forward the records to child processor and/or sink nodes.
* The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and
* {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
@@ -207,6 +241,23 @@ public class TopologyBuilder {
return addSource(name, (Deserializer) null, (Deserializer) null, topics);
}
+
+ /**
+ * Add a new source that consumes from topics matching the given pattern
+ * and forwards the records to child processor and/or sink nodes.
+ * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and
+ * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
+ *
+ * @param name the unique name of the source used to reference this node when
+ * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+ * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
+ * @return this builder instance so methods can be chained together; never null
+ */
+ public final TopologyBuilder addSource(String name, Pattern topicPattern) {
+ return addSource(name, (Deserializer) null, (Deserializer) null, topicPattern);
+ }
+
/**
* Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
* The source will use the specified key and value deserializers.
@@ -231,10 +282,16 @@ public class TopologyBuilder {
if (sourceTopicNames.contains(topic))
throw new TopologyBuilderException("Topic " + topic + " has already been registered by another source.");
+ for (Pattern pattern : nodeToSourcePatterns.values()) {
+ if (pattern.matcher(topic).matches()) {
+ throw new TopologyBuilderException("Topic " + topic + " matches a Pattern already registered by another source.");
+ }
+ }
+
sourceTopicNames.add(topic);
}
- nodeFactories.put(name, new SourceNodeFactory(name, topics, keyDeserializer, valDeserializer));
+ nodeFactories.put(name, new SourceNodeFactory(name, topics, null, keyDeserializer, valDeserializer));
nodeToSourceTopics.put(name, topics.clone());
nodeGrouper.add(name);
@@ -242,6 +299,49 @@ public class TopologyBuilder {
}
/**
+ * Add a new source that consumes from topics matching the given pattern
+ * and forwards the records to child processor and/or sink nodes.
+ * The source will use the specified key and value deserializers. The provided
+ * de-/serializers will be used for all matched topics, so care should be taken to specify patterns for
+ * topics that share the same key-value data format.
+ *
+ * @param name the unique name of the source used to reference this node when
+ * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+ * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming records; may be null if the source
+ * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
+ * @param valDeserializer the {@link Deserializer value deserializer} used when consuming records; may be null if the source
+ * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
+ * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
+ * @return this builder instance so methods can be chained together; never null
+ * @throws TopologyBuilderException if the pattern is null, if a processor with this name is already added, or if the pattern matches a topic that has already been registered by name
+ */
+
+ public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern) {
+
+ if (topicPattern == null) {
+ throw new TopologyBuilderException("Pattern can't be null");
+ }
+
+ if (nodeFactories.containsKey(name)) {
+ throw new TopologyBuilderException("Processor " + name + " is already added.");
+ }
+
+ for (String sourceTopicName : sourceTopicNames) {
+ if (topicPattern.matcher(sourceTopicName).matches()) {
+ throw new TopologyBuilderException("Pattern " + topicPattern + " will match a topic that has already been registered by another source.");
+ }
+ }
+
+ nodeToSourcePatterns.put(name, topicPattern);
+ nodeFactories.put(name, new SourceNodeFactory(name, null, topicPattern, keyDeserializer, valDeserializer));
+ nodeGrouper.add(name);
+
+ return this;
+ }
+
+ /**
* Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
* The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} and
* {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
@@ -504,9 +604,19 @@ public class TopologyBuilder {
public Map<Integer, TopicsInfo> topicGroups(String applicationId) {
Map<Integer, TopicsInfo> topicGroups = new HashMap<>();
+
+ if (subscriptionUpdates.hasUpdates()) {
+ for (Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) {
+ SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
+ //need to update nodeToSourceTopics with topics matched from given regex
+ nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates()));
+ }
+ }
+
if (nodeGroups == null)
nodeGroups = makeNodeGroups();
+
for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
Set<String> sinkTopics = new HashSet<>();
Set<String> sourceTopics = new HashSet<>();
@@ -677,7 +787,9 @@ public class TopologyBuilder {
}
}
} else if (factory instanceof SourceNodeFactory) {
- for (String topic : ((SourceNodeFactory) factory).topics) {
+ SourceNodeFactory sourceNodeFactory = (SourceNodeFactory) factory;
+ String[] topics = (sourceNodeFactory.pattern != null) ? sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates()) : sourceNodeFactory.getTopics();
+ for (String topic : topics) {
if (internalTopicNames.contains(topic)) {
// prefix the internal topic name with the application id
topicSourceMap.put(applicationId + "-" + topic, (SourceNode) node);
@@ -713,4 +825,28 @@ public class TopologyBuilder {
}
return Collections.unmodifiableSet(topics);
}
+
+ public Pattern sourceTopicPattern() {
+ if (this.topicPattern == null && !nodeToSourcePatterns.isEmpty()) {
+ StringBuilder builder = new StringBuilder();
+ for (Pattern pattern : nodeToSourcePatterns.values()) {
+ builder.append(pattern.pattern()).append("|");
+ }
+ if (!nodeToSourceTopics.isEmpty()) {
+ for (String[] topics : nodeToSourceTopics.values()) {
+ for (String topic : topics) {
+ builder.append(topic).append("|");
+ }
+ }
+ }
+
+ builder.setLength(builder.length() - 1);
+ this.topicPattern = Pattern.compile(builder.toString());
+ }
+ return this.topicPattern;
+ }
+
+ public void updateSubscriptions(SubscriptionUpdates subscriptionUpdates) {
+ this.subscriptionUpdates = subscriptionUpdates;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 085ff94..adefab9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -118,8 +118,6 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
streamThread = (StreamThread) o;
streamThread.partitionAssignor(this);
- this.topicGroups = streamThread.builder.topicGroups(streamThread.applicationId);
-
if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
internalTopicManager = new InternalTopicManager(
(String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG),
@@ -228,12 +226,17 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
// 2. within each client, tasks are assigned to consumer clients in round-robin manner.
Map<UUID, Set<String>> consumersByClient = new HashMap<>();
Map<UUID, ClientState<TaskId>> states = new HashMap<>();
-
+ SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
// decode subscription info
for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
String consumerId = entry.getKey();
Subscription subscription = entry.getValue();
+ if (streamThread.builder.sourceTopicPattern() != null) {
+ // update the topic groups with the returned subscription list for regex pattern subscriptions
+ subscriptionUpdates.updateTopics(subscription.topics());
+ }
+
SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
Set<String> consumers = consumersByClient.get(info.processId);
@@ -255,6 +258,9 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
state.capacity = state.capacity + 1d;
}
+ streamThread.builder.updateSubscriptions(subscriptionUpdates);
+ this.topicGroups = streamThread.builder.topicGroups(streamThread.applicationId);
+
// ensure the co-partitioning topics within the group have the same number of partitions,
// and enforce the number of partitions for those internal topics.
internalSourceTopicToTaskIds = new HashMap<>();
@@ -486,4 +492,29 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
public void setInternalTopicManager(InternalTopicManager internalTopicManager) {
this.internalTopicManager = internalTopicManager;
}
+
+ /**
+ * Used to capture subscribed topics via Patterns discovered during the
+ * partition assignment process.
+ */
+ public static class SubscriptionUpdates {
+
+ private final Set<String> updatedTopicSubscriptions = new HashSet<>();
+
+
+ private void updateTopics(Collection<String> topicNames) {
+ updatedTopicSubscriptions.clear();
+ updatedTopicSubscriptions.addAll(topicNames);
+ }
+
+ public Collection<String> getUpdates() {
+ return Collections.unmodifiableSet(new HashSet<>(updatedTopicSubscriptions));
+ }
+
+ public boolean hasUpdates() {
+ return !updatedTopicSubscriptions.isEmpty();
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 72eeef5..64127a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -62,6 +62,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
import static java.util.Collections.singleton;
@@ -78,6 +79,7 @@ public class StreamThread extends Thread {
protected final StreamsConfig config;
protected final TopologyBuilder builder;
protected final Set<String> sourceTopics;
+ protected final Pattern topicPattern;
protected final Producer<byte[], byte[]> producer;
protected final Consumer<byte[], byte[]> consumer;
protected final Consumer<byte[], byte[]> restoreConsumer;
@@ -160,6 +162,7 @@ public class StreamThread extends Thread {
this.config = config;
this.builder = builder;
this.sourceTopics = builder.sourceTopics(applicationId);
+ this.topicPattern = builder.sourceTopicPattern();
this.clientId = clientId;
this.processId = processId;
this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
@@ -283,7 +286,12 @@ public class StreamThread extends Thread {
long lastPoll = 0L;
boolean requiresPoll = true;
- consumer.subscribe(new ArrayList<>(sourceTopics), rebalanceListener);
+ if (topicPattern != null) {
+ consumer.subscribe(topicPattern, rebalanceListener);
+ } else {
+ consumer.subscribe(new ArrayList<>(sourceTopics), rebalanceListener);
+ }
+
while (stillRunning()) {
// try to fetch some records if necessary
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
new file mode 100644
index 0000000..7e18cff
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import org.apache.kafka.streams.processor.internals.StreamTask;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.regex.Pattern;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * End-to-end integration test based on using regex and named topics for creating sources, using
+ * an embedded Kafka cluster.
+ */
+
+public class RegexSourceIntegrationTest {
+ @ClassRule
+ public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
+
+ private static final String TOPIC_1 = "topic-1";
+ private static final String TOPIC_2 = "topic-2";
+ private static final String TOPIC_A = "topic-A";
+ private static final String TOPIC_C = "topic-C";
+ private static final String TOPIC_Y = "topic-Y";
+ private static final String TOPIC_Z = "topic-Z";
+ private static final String FA_TOPIC = "fa";
+ private static final String FOO_TOPIC = "foo";
+
+ private static final int FIRST_UPDATE = 0;
+ private static final int SECOND_UPDATE = 1;
+
+ private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
+ private Properties streamsConfiguration;
+
+
+ @BeforeClass
+ public static void startKafkaCluster() throws Exception {
+ CLUSTER.createTopic(TOPIC_1);
+ CLUSTER.createTopic(TOPIC_2);
+ CLUSTER.createTopic(TOPIC_A);
+ CLUSTER.createTopic(TOPIC_C);
+ CLUSTER.createTopic(TOPIC_Y);
+ CLUSTER.createTopic(TOPIC_Z);
+ CLUSTER.createTopic(FA_TOPIC);
+ CLUSTER.createTopic(FOO_TOPIC);
+
+ }
+
+ @Before
+ public void setUp() {
+ streamsConfiguration = getStreamsConfig();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ // Remove any state from previous test runs
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+ }
+
+ @Test
+ public void testRegexMatchesTopicsAWhenCreated() throws Exception {
+
+ final Serde<String> stringSerde = Serdes.String();
+
+ StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);
+
+ CLUSTER.createTopic("TEST-TOPIC-1");
+
+ KStreamBuilder builder = new KStreamBuilder();
+
+ KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
+
+ pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+
+ KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+
+ Field streamThreadsField = streams.getClass().getDeclaredField("threads");
+ streamThreadsField.setAccessible(true);
+ StreamThread[] streamThreads = (StreamThread[]) streamThreadsField.get(streams);
+ StreamThread originalThread = streamThreads[0];
+
+ TestStreamThread testStreamThread = new TestStreamThread(builder, streamsConfig,
+ new DefaultKafkaClientSupplier(),
+ originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), new SystemTime());
+
+ streamThreads[0] = testStreamThread;
+ streams.start();
+ testStreamThread.waitUntilTasksUpdated();
+
+ CLUSTER.createTopic("TEST-TOPIC-2");
+
+ testStreamThread.waitUntilTasksUpdated();
+
+ streams.close();
+
+ List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-1");
+ List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
+
+ assertThat(testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE), equalTo(expectedFirstAssignment));
+ assertThat(testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE), equalTo(expectedSecondAssignment));
+ }
+
+ @Test
+ public void testRegexMatchesTopicsAWhenDeleted() throws Exception {
+
+ final Serde<String> stringSerde = Serdes.String();
+
+ StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);
+
+ CLUSTER.createTopic("TEST-TOPIC-A");
+ CLUSTER.createTopic("TEST-TOPIC-B");
+
+ KStreamBuilder builder = new KStreamBuilder();
+
+ KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]"));
+
+ pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+
+ KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+
+ Field streamThreadsField = streams.getClass().getDeclaredField("threads");
+ streamThreadsField.setAccessible(true);
+ StreamThread[] streamThreads = (StreamThread[]) streamThreadsField.get(streams);
+ StreamThread originalThread = streamThreads[0];
+
+ TestStreamThread testStreamThread = new TestStreamThread(builder, streamsConfig,
+ new DefaultKafkaClientSupplier(),
+ originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), new SystemTime());
+
+ streamThreads[0] = testStreamThread;
+ streams.start();
+
+ testStreamThread.waitUntilTasksUpdated();
+
+ CLUSTER.deleteTopic("TEST-TOPIC-A");
+
+ testStreamThread.waitUntilTasksUpdated();
+
+ streams.close();
+
+ List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B");
+ List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-B");
+
+ assertThat(testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE), equalTo(expectedFirstAssignment));
+ assertThat(testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE), equalTo(expectedSecondAssignment));
+ }
+
+
+ /**
+  * Records sent to topics matched by two regex sources and to two named topics must all reach {@code DEFAULT_OUTPUT_TOPIC}.
+  */
+ @Test
+ public void testShouldReadFromRegexAndNamedTopics() throws Exception {
+     String topic1TestMessage = "topic-1 test";
+     String topic2TestMessage = "topic-2 test";
+     String topicATestMessage = "topic-A test";
+     String topicCTestMessage = "topic-C test";
+     String topicYTestMessage = "topic-Y test";
+     String topicZTestMessage = "topic-Z test";
+
+     final Serde<String> stringSerde = Serdes.String();
+     KStreamBuilder builder = new KStreamBuilder();
+
+     KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("topic-\\d"));
+     KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]"));
+     KStream<String, String> namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z);
+
+     pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+     pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+     namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+
+     List<String> expectedReceivedValues = Arrays.asList(topicATestMessage, topic1TestMessage, topic2TestMessage, topicCTestMessage, topicYTestMessage, topicZTestMessage);
+     List<String> actualValues = new ArrayList<>(expectedReceivedValues.size());
+
+     KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+     streams.start();
+     try {
+         Properties producerConfig = getProducerConfig();
+         IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList(topic1TestMessage), producerConfig);
+         IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Arrays.asList(topic2TestMessage), producerConfig);
+         IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Arrays.asList(topicATestMessage), producerConfig);
+         IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Arrays.asList(topicCTestMessage), producerConfig);
+         IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Arrays.asList(topicYTestMessage), producerConfig);
+         IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Arrays.asList(topicZTestMessage), producerConfig);
+         List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(getConsumerConfig(), DEFAULT_OUTPUT_TOPIC, expectedReceivedValues.size());
+         for (KeyValue<String, String> receivedKeyValue : receivedKeyValues) {
+             actualValues.add(receivedKeyValue.value);
+         }
+     } finally {
+         // Stop the streams instance even when producing or waiting for the output fails.
+         streams.close();
+     }
+
+     // Both lists are sorted first, so the comparison does not depend on arrival order.
+     Collections.sort(actualValues);
+     Collections.sort(expectedReceivedValues);
+     assertThat(actualValues, equalTo(expectedReceivedValues));
+ }
+
+ /**
+  * Registers two overlapping regex sources, produces one record to FA_TOPIC and one to FOO_TOPIC,
+  * then waits for two records on DEFAULT_OUTPUT_TOPIC. Passes only if an AssertionError is raised.
+  */
+ //TODO should be updated to expected = TopologyBuilderException after KAFKA-3708
+ @Test(expected = AssertionError.class)
+ public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception {
+
+     final String fooMessage = "fooMessage";
+     final String fMessage = "fMessage";
+
+     final Serde<String> serde = Serdes.String();
+
+     final KStreamBuilder topology = new KStreamBuilder();
+
+     // The two patterns below overlap, so no messages should be sent: a
+     // TopologyBuilderException will be thrown when the processor topology is built.
+     final KStream<String, String> fooPrefixed = topology.stream(Pattern.compile("foo.*"));
+     final KStream<String, String> fPrefixed = topology.stream(Pattern.compile("f.*"));
+
+     fooPrefixed.to(serde, serde, DEFAULT_OUTPUT_TOPIC);
+     fPrefixed.to(serde, serde, DEFAULT_OUTPUT_TOPIC);
+
+     // Remove any state from previous test runs
+     IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+
+     final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
+     kafkaStreams.start();
+
+     final Properties producerProps = getProducerConfig();
+     IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Collections.singletonList(fMessage), producerProps);
+     IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Collections.singletonList(fooMessage), producerProps);
+
+     final Properties consumerProps = getConsumerConfig();
+
+     try {
+         // NOTE(review): 5000 is presumably the wait timeout in ms, and the expected AssertionError
+         // presumably comes from this wait giving up - confirm against IntegrationTestUtils.
+         IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProps, DEFAULT_OUTPUT_TOPIC, 2, 5000);
+     } finally {
+         kafkaStreams.close();
+     }
+
+ }
+
+ private Properties getProducerConfig() { // producer settings: CLUSTER bootstrap servers, String serializers
+     final Properties props = new Properties();
+     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+     props.put(ProducerConfig.ACKS_CONFIG, "all");
+     props.put(ProducerConfig.RETRIES_CONFIG, 0);
+     return props;
+ }
+
+ private Properties getStreamsConfig() { // Streams settings pointing at the embedded CLUSTER
+     final Properties config = new Properties();
+     config.put(StreamsConfig.APPLICATION_ID_CONFIG, "regex-source-integration-test");
+     config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+     config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+     config.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
+     config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+     config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+     config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+     config.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
+     return config;
+ }
+
+ /** Builds consumer settings: CLUSTER bootstrap servers, a fixed group id, "earliest" reset, String deserializers. */
+ private Properties getConsumerConfig() {
+     final Properties props = new Properties();
+     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+     props.put(ConsumerConfig.GROUP_ID_CONFIG, "regex-source-integration-consumer");
+     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+     return props;
+ }
+
+ /** StreamThread that records the sorted topic names passed to each createStreamTask call. */
+ private class TestStreamThread extends StreamThread {
+     // createStreamTask call counter (0, 1, ...) -> sorted topics of the partitions passed to that call
+     public final Map<Integer, List<String>> assignedTopicPartitions = new HashMap<>();
+     private int index = 0;
+     public volatile boolean streamTaskUpdated = false;
+
+     public TestStreamThread(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier, String applicationId, String clientId, UUID processId, Metrics metrics, Time time) {
+         super(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time);
+     }
+
+     @Override
+     public StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
+         List<String> assignedTopics = new ArrayList<>();
+         for (TopicPartition partition : partitions) {
+             assignedTopics.add(partition.topic());
+         }
+         Collections.sort(assignedTopics);
+         assignedTopicPartitions.put(index++, assignedTopics);
+         // Volatile write comes last so that a waiter observing the flag also observes the map entry.
+         streamTaskUpdated = true;
+         return super.createStreamTask(id, partitions);
+     }
+
+     /** Polls for up to 30 seconds until createStreamTask has run, then clears the flag. */
+     void waitUntilTasksUpdated() {
+         long deadline = System.currentTimeMillis() + 30000;
+         while (!streamTaskUpdated && System.currentTimeMillis() < deadline) {
+             java.util.concurrent.locks.LockSupport.parkNanos(10000000L); // ~10 ms back-off instead of busy-spinning
+         }
+         streamTaskUpdated = false;
+     }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
index 34753ae..d3ba065 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
@@ -19,12 +19,12 @@ package org.apache.kafka.streams.integration.utils;
import kafka.server.KafkaConfig$;
import kafka.zk.EmbeddedZookeeper;
+import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Properties;
-import org.junit.rules.ExternalResource;
/**
* Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and 1 Kafka broker.
@@ -48,6 +48,7 @@ public class EmbeddedSingleNodeKafkaCluster extends ExternalResource {
log.debug("ZooKeeper instance is running at {}", zKConnectString());
brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT);
+ brokerConfig.put(KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
log.debug("Starting a Kafka instance on port {} ...", brokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp()));
broker = new KafkaEmbedded(brokerConfig);
@@ -125,4 +126,8 @@ public class EmbeddedSingleNodeKafkaCluster extends ExternalResource {
Properties topicConfig) {
broker.createTopic(topic, partitions, replication, topicConfig);
}
+
+ public void deleteTopic(String topic) {
+ broker.deleteTopic(topic); // forwards the request to the embedded broker wrapper (KafkaEmbedded)
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index 348b46b..43b82d6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -186,4 +186,19 @@ public class KafkaEmbedded {
AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
zkClient.close();
}
+
+ /** Requests deletion of {@code topic} via AdminUtils.deleteTopic using a short-lived ZkClient that is always closed. */
+ public void deleteTopic(String topic) {
+     log.debug("Deleting topic { name: {} }", topic);
+     ZkClient zkClient = new ZkClient(zookeeperConnect(), DEFAULT_ZK_SESSION_TIMEOUT_MS,
+         DEFAULT_ZK_CONNECTION_TIMEOUT_MS, ZKStringSerializer$.MODULE$);
+     try {
+         // Third ZkUtils argument is isSecure; this helper always passes false.
+         ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), false);
+         AdminUtils.deleteTopic(zkUtils, topic);
+     } finally {
+         zkClient.close();
+     }
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 9af313a..28acf09 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -18,10 +18,10 @@
package org.apache.kafka.streams.processor;
import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.junit.Test;
@@ -33,6 +33,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.regex.Pattern;
import static org.apache.kafka.common.utils.Utils.mkList;
import static org.apache.kafka.common.utils.Utils.mkSet;
@@ -152,6 +153,46 @@ public class TopologyBuilderTest {
assertEquals(expected, builder.sourceTopics("X"));
}
+ @Test
+ public void testPatternSourceTopic() {
+     // With a single pattern source, the source topic pattern must be that regex unchanged.
+     final TopologyBuilder topology = new TopologyBuilder();
+     topology.addSource("source-1", Pattern.compile("topic-\\d"));
+     assertEquals("topic-\\d", topology.sourceTopicPattern().pattern());
+ }
+
+ @Test
+ public void testAddMoreThanOnePatternSourceNode() {
+     // Two pattern sources are expected to be joined with '|' in the order they were added.
+     final TopologyBuilder topology = new TopologyBuilder();
+     topology.addSource("source-1", Pattern.compile("topics[A-Z]"));
+     topology.addSource("source-2", Pattern.compile(".*-\\d"));
+     assertEquals("topics[A-Z]|.*-\\d", topology.sourceTopicPattern().pattern());
+ }
+
+ @Test
+ public void testSubscribeTopicNameAndPattern() {
+     // Expected combined pattern: the regex source first, then the named topics, joined with '|'.
+     final TopologyBuilder topology = new TopologyBuilder();
+     topology.addSource("source-1", "topic-foo", "topic-bar");
+     topology.addSource("source-2", Pattern.compile(".*-\\d"));
+     assertEquals(".*-\\d|topic-foo|topic-bar", topology.sourceTopicPattern().pattern());
+ }
+
+ @Test(expected = TopologyBuilderException.class)
+ public void testPatternMatchesAlreadyProvidedTopicSource() {
+     final TopologyBuilder topology = new TopologyBuilder();
+     topology.addSource("source-1", "foo");
+     topology.addSource("source-2", Pattern.compile("f.*")); // "f.*" matches the already registered topic "foo"
+ }
+
+ @Test(expected = TopologyBuilderException.class)
+ public void testNamedTopicMatchesAlreadyProvidedPattern() {
+     final TopologyBuilder topology = new TopologyBuilder();
+     topology.addSource("source-1", Pattern.compile("f.*"));
+     topology.addSource("source-2", "foo"); // "foo" is matched by the already registered pattern "f.*"
+ }
+
@Test(expected = TopologyBuilderException.class)
public void testAddStateStoreWithNonExistingProcessor() {
final TopologyBuilder builder = new TopologyBuilder();