You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/04/20 13:22:34 UTC
[kafka] branch trunk updated: KAFKA-5253: Fixed TopologyTestDriver to handle streams created with patterns (#4793)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b510737 KAFKA-5253: Fixed TopologyTestDriver to handle streams created with patterns (#4793)
b510737 is described below
commit b510737e761e21e63e6ed15e5111358d7515fdd2
Author: Jagadesh Adireddi <ad...@gmail.com>
AuthorDate: Fri Apr 20 18:52:28 2018 +0530
KAFKA-5253: Fixed TopologyTestDriver to handle streams created with patterns (#4793)
Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../internals/InternalTopologyBuilder.java | 4 +
.../streams/kstream/internals/KStreamImplTest.java | 46 ++++++++++
.../org/apache/kafka/test/KStreamTestDriver.java | 13 ++-
.../apache/kafka/streams/TopologyTestDriver.java | 29 +++++-
.../kafka/streams/TopologyTestDriverTest.java | 100 ++++++++++++++++++++-
5 files changed, 186 insertions(+), 6 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 535f035..b1d60a9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -1880,4 +1880,8 @@ public class InternalTopologyBuilder {
subscriptionUpdates.updateTopics(topics);
updateSubscriptions(subscriptionUpdates, logPrefix);
}
+
+ public synchronized Set<String> getSourceTopicNames() {
+ return sourceTopicNames;
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 2009806..ef65bb3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -53,6 +53,7 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
@@ -542,4 +543,49 @@ public class KStreamImplTest {
assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee", "F:ff", "G:gg", "H:hh"),
processorSupplier.processed);
}
+
+ @Test
+ public void shouldProcessFromSourceThatMatchPattern() {
+ final KStream<String, String> pattern2Source = builder.stream(Pattern.compile("topic-\\d"));
+
+ final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+ pattern2Source.process(processorSupplier);
+
+ driver.setUp(builder);
+ driver.setTime(0L);
+
+ driver.process("topic-3", "A", "aa");
+ driver.process("topic-4", "B", "bb");
+ driver.process("topic-5", "C", "cc");
+ driver.process("topic-6", "D", "dd");
+ driver.process("topic-7", "E", "ee");
+
+ assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"),
+ processorSupplier.processed);
+ }
+
+ @Test
+ public void shouldProcessFromSourcesThatMatchMultiplePattern() {
+ final String topic3 = "topic-without-pattern";
+
+ final KStream<String, String> pattern2Source1 = builder.stream(Pattern.compile("topic-\\d"));
+ final KStream<String, String> pattern2Source2 = builder.stream(Pattern.compile("topic-[A-Z]"));
+ final KStream<String, String> source3 = builder.stream(topic3);
+ final KStream<String, String> merged = pattern2Source1.merge(pattern2Source2).merge(source3);
+
+ final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+ merged.process(processorSupplier);
+
+ driver.setUp(builder);
+ driver.setTime(0L);
+
+ driver.process("topic-3", "A", "aa");
+ driver.process("topic-4", "B", "bb");
+ driver.process("topic-A", "C", "cc");
+ driver.process("topic-Z", "D", "dd");
+ driver.process(topic3, "E", "ee");
+
+ assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"),
+ processorSupplier.processed);
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index aebb849..eb137db 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -42,6 +42,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.regex.Pattern;
public class KStreamTestDriver extends ExternalResource {
@@ -184,9 +185,15 @@ public class KStreamTestDriver extends ExternalResource {
private ProcessorNode sourceNodeByTopicName(final String topicName) {
ProcessorNode topicNode = topology.source(topicName);
-
- if (topicNode == null && globalTopology != null) {
- topicNode = globalTopology.source(topicName);
+ if (topicNode == null) {
+ for (final String sourceTopic : topology.sourceTopics()) {
+ if (Pattern.compile(sourceTopic).matcher(topicName).matches()) {
+ return topology.source(sourceTopic);
+ }
+ }
+ if (globalTopology != null) {
+ topicNode = globalTopology.source(topicName);
+ }
}
return topicNode;
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 7730bed..c03bf1a 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
+import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
@@ -75,6 +76,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
/**
* This class makes it easier to write tests to verify the behavior of topologies created with {@link Topology} or
@@ -328,7 +330,10 @@ public class TopologyTestDriver implements Closeable {
public void pipeInput(final ConsumerRecord<byte[], byte[]> consumerRecord) {
final String topicName = consumerRecord.topic();
- final TopicPartition topicPartition = partitionsByTopic.get(topicName);
+ if (!internalTopologyBuilder.getSourceTopicNames().isEmpty()) {
+ validateSourceTopicNameRegexPattern(consumerRecord.topic());
+ }
+ final TopicPartition topicPartition = getTopicPartition(topicName);
if (topicPartition != null) {
final long offset = offsetsByTopicPartition.get(topicPartition).incrementAndGet() - 1;
task.addRecords(topicPartition, Collections.singleton(new ConsumerRecord<>(
@@ -371,6 +376,28 @@ public class TopologyTestDriver implements Closeable {
}
}
+ private void validateSourceTopicNameRegexPattern(final String inputRecordTopic) {
+ for (final String sourceTopicName : internalTopologyBuilder.getSourceTopicNames()) {
+ if (!sourceTopicName.equals(inputRecordTopic) && Pattern.compile(sourceTopicName).matcher(inputRecordTopic).matches()) {
+ throw new TopologyException("Topology add source of type String for topic: " + sourceTopicName +
+ " cannot contain regex pattern for input record topic: " + inputRecordTopic +
+ " and hence cannot process the message.");
+ }
+ }
+ }
+
+ private TopicPartition getTopicPartition(final String topicName) {
+ final TopicPartition topicPartition = partitionsByTopic.get(topicName);
+ if (topicPartition == null) {
+ for (final Map.Entry<String, TopicPartition> entry : partitionsByTopic.entrySet()) {
+ if (Pattern.compile(entry.getKey()).matcher(topicName).matches()) {
+ return entry.getValue();
+ }
+ }
+ }
+ return topicPartition;
+ }
+
private void captureOutputRecords() {
// Capture all the records sent to the producer ...
final List<ProducerRecord<byte[], byte[]>> output = producer.history();
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index d757f33..077b8ca 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -54,6 +55,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
+import java.util.regex.Pattern;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
@@ -902,11 +904,11 @@ public class TopologyTestDriverTest {
);
}
}
-
+
@Test
public void shouldFeedStoreFromGlobalKTable() {
final StreamsBuilder builder = new StreamsBuilder();
- builder.globalTable("topic",
+ builder.globalTable("topic",
Consumed.with(Serdes.String(), Serdes.String()),
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("globalStore"));
try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), config)) {
@@ -919,4 +921,98 @@ public class TopologyTestDriverTest {
Assert.assertEquals("value1", globalStore.get("k1"));
}
}
+
+ private Topology setupMultipleSourcesPatternTopology(final Pattern... sourceTopicPatternNames) {
+ final Topology topology = new Topology();
+
+ final String[] processorNames = new String[sourceTopicPatternNames.length];
+ int i = 0;
+ for (final Pattern sourceTopicPatternName : sourceTopicPatternNames) {
+ final String sourceName = sourceTopicPatternName + "-source";
+ final String processorName = sourceTopicPatternName + "-processor";
+ topology.addSource(sourceName, sourceTopicPatternName);
+ processorNames[i++] = processorName;
+ topology.addProcessor(processorName, new MockProcessorSupplier(), sourceName);
+ }
+ topology.addSink("sink-topic", SINK_TOPIC_1, processorNames);
+ return topology;
+ }
+
+ @Test
+ public void shouldProcessFromSourcesThatMatchMultiplePattern() {
+
+ final Pattern pattern2Source1 = Pattern.compile("source-topic-\\d");
+ final Pattern pattern2Source2 = Pattern.compile("source-topic-[A-Z]");
+ final String consumerTopic2 = "source-topic-Z";
+
+ final ConsumerRecord<byte[], byte[]> consumerRecord2 = consumerRecordFactory.create(consumerTopic2, key2, value2, timestamp2);
+
+ testDriver = new TopologyTestDriver(setupMultipleSourcesPatternTopology(pattern2Source1, pattern2Source2), config);
+
+ final List<Record> processedRecords1 = mockProcessors.get(0).processedRecords;
+ final List<Record> processedRecords2 = mockProcessors.get(1).processedRecords;
+
+ testDriver.pipeInput(consumerRecord1);
+
+ assertEquals(1, processedRecords1.size());
+ assertEquals(0, processedRecords2.size());
+
+ final Record record1 = processedRecords1.get(0);
+ final Record expectedResult1 = new Record(consumerRecord1);
+ expectedResult1.offset = 0L;
+ assertThat(record1, equalTo(expectedResult1));
+
+ testDriver.pipeInput(consumerRecord2);
+
+ assertEquals(1, processedRecords1.size());
+ assertEquals(1, processedRecords2.size());
+
+ final Record record2 = processedRecords2.get(0);
+ final Record expectedResult2 = new Record(consumerRecord2);
+ expectedResult2.offset = 0L;
+ assertThat(record2, equalTo(expectedResult2));
+ }
+
+ @Test
+ public void shouldProcessFromSourceThatMatchPattern() {
+ final String sourceName = "source";
+ final Pattern pattern2Source1 = Pattern.compile("source-topic-\\d");
+
+ final Topology topology = new Topology();
+
+ topology.addSource(sourceName, pattern2Source1);
+ topology.addSink("sink", SINK_TOPIC_1, sourceName);
+
+ testDriver = new TopologyTestDriver(topology, config);
+ testDriver.pipeInput(consumerRecord1);
+
+ final ProducerRecord outputRecord = testDriver.readOutput(SINK_TOPIC_1);
+ assertEquals(key1, outputRecord.key());
+ assertEquals(value1, outputRecord.value());
+ assertEquals(SINK_TOPIC_1, outputRecord.topic());
+ }
+
+ @Test
+ public void shouldThrowPatternNotValidForTopicNameException() {
+ final String sourceName = "source";
+ final String pattern2Source1 = "source-topic-\\d";
+
+ final Topology topology = new Topology();
+
+ topology.addSource(sourceName, pattern2Source1);
+ topology.addSink("sink", SINK_TOPIC_1, sourceName);
+
+ testDriver = new TopologyTestDriver(topology, config);
+ try {
+ testDriver.pipeInput(consumerRecord1);
+ } catch (final TopologyException exception) {
+ String str =
+ String.format(
+ "Invalid topology: Topology add source of type String for topic: %s cannot contain regex pattern for " +
+ "input record topic: %s and hence cannot process the message.",
+ pattern2Source1,
+ SOURCE_TOPIC_1);
+ assertEquals(str, exception.getMessage());
+ }
+ }
}
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.