You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/04/20 13:22:34 UTC

[kafka] branch trunk updated: KAFKA-5253: Fixed TopologyTestDriver to handle streams created with patterns (#4793)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b510737  KAFKA-5253: Fixed TopologyTestDriver to handle streams created with patterns (#4793)
b510737 is described below

commit b510737e761e21e63e6ed15e5111358d7515fdd2
Author: Jagadesh Adireddi <ad...@gmail.com>
AuthorDate: Fri Apr 20 18:52:28 2018 +0530

    KAFKA-5253: Fixed TopologyTestDriver to handle streams created with patterns (#4793)
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../internals/InternalTopologyBuilder.java         |   4 +
 .../streams/kstream/internals/KStreamImplTest.java |  46 ++++++++++
 .../org/apache/kafka/test/KStreamTestDriver.java   |  13 ++-
 .../apache/kafka/streams/TopologyTestDriver.java   |  29 +++++-
 .../kafka/streams/TopologyTestDriverTest.java      | 100 ++++++++++++++++++++-
 5 files changed, 186 insertions(+), 6 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 535f035..b1d60a9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -1880,4 +1880,8 @@ public class InternalTopologyBuilder {
         subscriptionUpdates.updateTopics(topics);
         updateSubscriptions(subscriptionUpdates, logPrefix);
     }
+
+    public synchronized Set<String> getSourceTopicNames() {
+        return sourceTopicNames;
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 2009806..ef65bb3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -53,6 +53,7 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
@@ -542,4 +543,49 @@ public class KStreamImplTest {
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee", "F:ff", "G:gg", "H:hh"),
                      processorSupplier.processed);
     }
+
+    @Test
+    public void shouldProcessFromSourceThatMatchPattern() {
+        final KStream<String, String> pattern2Source = builder.stream(Pattern.compile("topic-\\d"));
+
+        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+        pattern2Source.process(processorSupplier);
+
+        driver.setUp(builder);
+        driver.setTime(0L);
+
+        driver.process("topic-3", "A", "aa");
+        driver.process("topic-4", "B", "bb");
+        driver.process("topic-5", "C", "cc");
+        driver.process("topic-6", "D", "dd");
+        driver.process("topic-7", "E", "ee");
+
+        assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"),
+                processorSupplier.processed);
+    }
+
+    @Test
+    public void shouldProcessFromSourcesThatMatchMultiplePattern() {
+        final String topic3 = "topic-without-pattern";
+
+        final KStream<String, String> pattern2Source1 = builder.stream(Pattern.compile("topic-\\d"));
+        final KStream<String, String> pattern2Source2 = builder.stream(Pattern.compile("topic-[A-Z]"));
+        final KStream<String, String> source3 = builder.stream(topic3);
+        final KStream<String, String> merged = pattern2Source1.merge(pattern2Source2).merge(source3);
+
+        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+        merged.process(processorSupplier);
+
+        driver.setUp(builder);
+        driver.setTime(0L);
+
+        driver.process("topic-3", "A", "aa");
+        driver.process("topic-4", "B", "bb");
+        driver.process("topic-A", "C", "cc");
+        driver.process("topic-Z", "D", "dd");
+        driver.process(topic3, "E", "ee");
+
+        assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"),
+                processorSupplier.processed);
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index aebb849..eb137db 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -42,6 +42,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 public class KStreamTestDriver extends ExternalResource {
 
@@ -184,9 +185,15 @@ public class KStreamTestDriver extends ExternalResource {
 
     private ProcessorNode sourceNodeByTopicName(final String topicName) {
         ProcessorNode topicNode = topology.source(topicName);
-
-        if (topicNode == null && globalTopology != null) {
-            topicNode = globalTopology.source(topicName);
+        if (topicNode == null) {
+            for (final String sourceTopic : topology.sourceTopics()) {
+                if (Pattern.compile(sourceTopic).matcher(topicName).matches()) {
+                    return topology.source(sourceTopic);
+                }
+            }
+            if (globalTopology != null) {
+                topicNode = globalTopology.source(topicName);
+            }
         }
 
         return topicNode;
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 7730bed..c03bf1a 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
@@ -75,6 +76,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
 
 /**
  * This class makes it easier to write tests to verify the behavior of topologies created with {@link Topology} or
@@ -328,7 +330,10 @@ public class TopologyTestDriver implements Closeable {
     public void pipeInput(final ConsumerRecord<byte[], byte[]> consumerRecord) {
         final String topicName = consumerRecord.topic();
 
-        final TopicPartition topicPartition = partitionsByTopic.get(topicName);
+        if (!internalTopologyBuilder.getSourceTopicNames().isEmpty()) {
+            validateSourceTopicNameRegexPattern(consumerRecord.topic());
+        }
+        final TopicPartition topicPartition = getTopicPartition(topicName);
         if (topicPartition != null) {
             final long offset = offsetsByTopicPartition.get(topicPartition).incrementAndGet() - 1;
             task.addRecords(topicPartition, Collections.singleton(new ConsumerRecord<>(
@@ -371,6 +376,28 @@ public class TopologyTestDriver implements Closeable {
         }
     }
 
+    private void validateSourceTopicNameRegexPattern(final String inputRecordTopic) {
+        for (final String sourceTopicName : internalTopologyBuilder.getSourceTopicNames()) {
+            if (!sourceTopicName.equals(inputRecordTopic) && Pattern.compile(sourceTopicName).matcher(inputRecordTopic).matches()) {
+                throw new TopologyException("Topology add source of type String for topic: " + sourceTopicName +
+                        " cannot contain regex pattern for input record topic: " + inputRecordTopic +
+                        " and hence cannot process the message.");
+            }
+        }
+    }
+
+    private TopicPartition getTopicPartition(final String topicName) {
+        final TopicPartition topicPartition = partitionsByTopic.get(topicName);
+        if (topicPartition == null) {
+            for (final Map.Entry<String, TopicPartition> entry : partitionsByTopic.entrySet()) {
+                if (Pattern.compile(entry.getKey()).matcher(topicName).matches()) {
+                    return entry.getValue();
+                }
+            }
+        }
+        return topicPartition;
+    }
+
     private void captureOutputRecords() {
         // Capture all the records sent to the producer ...
         final List<ProducerRecord<byte[], byte[]>> output = producer.history();
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index d757f33..077b8ca 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -54,6 +55,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
@@ -902,11 +904,11 @@ public class TopologyTestDriverTest {
             );
         }
     }
-    
+
     @Test
     public void shouldFeedStoreFromGlobalKTable() {
         final StreamsBuilder builder = new StreamsBuilder();
-        builder.globalTable("topic",  
+        builder.globalTable("topic",
             Consumed.with(Serdes.String(), Serdes.String()),
             Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("globalStore"));
         try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), config)) {
@@ -919,4 +921,98 @@ public class TopologyTestDriverTest {
             Assert.assertEquals("value1", globalStore.get("k1"));
         }
     }
+
+    private Topology setupMultipleSourcesPatternTopology(final Pattern... sourceTopicPatternNames) {
+        final Topology topology = new Topology();
+
+        final String[] processorNames = new String[sourceTopicPatternNames.length];
+        int i = 0;
+        for (final Pattern sourceTopicPatternName : sourceTopicPatternNames) {
+            final String sourceName = sourceTopicPatternName + "-source";
+            final String processorName = sourceTopicPatternName + "-processor";
+            topology.addSource(sourceName, sourceTopicPatternName);
+            processorNames[i++] = processorName;
+            topology.addProcessor(processorName, new MockProcessorSupplier(), sourceName);
+        }
+        topology.addSink("sink-topic", SINK_TOPIC_1, processorNames);
+        return topology;
+    }
+
+    @Test
+    public void shouldProcessFromSourcesThatMatchMultiplePattern() {
+
+        final  Pattern pattern2Source1 = Pattern.compile("source-topic-\\d");
+        final  Pattern pattern2Source2 = Pattern.compile("source-topic-[A-Z]");
+        final  String consumerTopic2 = "source-topic-Z";
+
+        final ConsumerRecord<byte[], byte[]> consumerRecord2 = consumerRecordFactory.create(consumerTopic2, key2, value2, timestamp2);
+
+        testDriver = new TopologyTestDriver(setupMultipleSourcesPatternTopology(pattern2Source1, pattern2Source2), config);
+
+        final List<Record> processedRecords1 = mockProcessors.get(0).processedRecords;
+        final List<Record> processedRecords2 = mockProcessors.get(1).processedRecords;
+
+        testDriver.pipeInput(consumerRecord1);
+
+        assertEquals(1, processedRecords1.size());
+        assertEquals(0, processedRecords2.size());
+
+        final Record record1 = processedRecords1.get(0);
+        final Record expectedResult1 = new Record(consumerRecord1);
+        expectedResult1.offset = 0L;
+        assertThat(record1, equalTo(expectedResult1));
+
+        testDriver.pipeInput(consumerRecord2);
+
+        assertEquals(1, processedRecords1.size());
+        assertEquals(1, processedRecords2.size());
+
+        final Record record2 = processedRecords2.get(0);
+        final Record expectedResult2 = new Record(consumerRecord2);
+        expectedResult2.offset = 0L;
+        assertThat(record2, equalTo(expectedResult2));
+    }
+
+    @Test
+    public void shouldProcessFromSourceThatMatchPattern() {
+        final String sourceName = "source";
+        final Pattern pattern2Source1 = Pattern.compile("source-topic-\\d");
+
+        final Topology topology = new Topology();
+
+        topology.addSource(sourceName, pattern2Source1);
+        topology.addSink("sink", SINK_TOPIC_1, sourceName);
+
+        testDriver = new TopologyTestDriver(topology, config);
+        testDriver.pipeInput(consumerRecord1);
+
+        final ProducerRecord outputRecord = testDriver.readOutput(SINK_TOPIC_1);
+        assertEquals(key1, outputRecord.key());
+        assertEquals(value1, outputRecord.value());
+        assertEquals(SINK_TOPIC_1, outputRecord.topic());
+    }
+
+    @Test
+    public void shouldThrowPatternNotValidForTopicNameException() {
+        final String sourceName = "source";
+        final String pattern2Source1 = "source-topic-\\d";
+
+        final Topology topology = new Topology();
+
+        topology.addSource(sourceName, pattern2Source1);
+        topology.addSink("sink", SINK_TOPIC_1, sourceName);
+
+        testDriver = new TopologyTestDriver(topology, config);
+        try {
+            testDriver.pipeInput(consumerRecord1);
+        } catch (final TopologyException exception) {
+            String str =
+                    String.format(
+                            "Invalid topology: Topology add source of type String for topic: %s cannot contain regex pattern for " +
+                                    "input record topic: %s and hence cannot process the message.",
+                            pattern2Source1,
+                            SOURCE_TOPIC_1);
+            assertEquals(str, exception.getMessage());
+        }
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.