You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/05/07 14:58:50 UTC

[kafka] branch trunk updated: KAFKA-9865: Expose output topic names from TopologyTestDriver (#8483)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 33bfdac  KAFKA-9865: Expose output topic names from TopologyTestDriver (#8483)
33bfdac is described below

commit 33bfdacd7ab5afefbc8d494aeab6957b33b1354c
Author: Andy Coates <80...@users.noreply.github.com>
AuthorDate: Thu May 7 15:57:55 2020 +0100

    KAFKA-9865: Expose output topic names from TopologyTestDriver (#8483)
    
    Implements KIP-594
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 .../apache/kafka/streams/TopologyTestDriver.java   | 16 +++++++
 .../kafka/streams/TopologyTestDriverTest.java      | 54 ++++++++++++++++++----
 2 files changed, 61 insertions(+), 9 deletions(-)

diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index b36fdce..d902f81 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -102,6 +102,7 @@ import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
@@ -861,6 +862,21 @@ public class TopologyTestDriver implements Closeable {
         return new TestOutputTopic<>(this, topicName, keyDeserializer, valueDeserializer);
     }
 
+    /**
+     * Get the names of all the topics to which records have been produced during the test run.
+     * <p>
+     * Call this method after piping the input into the test driver to retrieve the full set of topic names the topology
+     * produced records to.
+     * <p>
+     * The returned set of topic names may include user (e.g., output) and internal (e.g., changelog, repartition) topic
+     * names.
+     *
+     * @return An unmodifiable set of the topic names the topology has produced to
+     */
+    public final Set<String> producedTopicNames() {
+        return Collections.unmodifiableSet(outputRecordsByTopic.keySet());
+    }
+
     ProducerRecord<byte[], byte[]> readRecord(final String topic) {
         final Queue<? extends ProducerRecord<byte[], byte[]>> outputRecords = getRecordsQueue(topic);
         if (outputRecords == null) {
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index fb5585f..3ecf013 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -73,7 +73,10 @@ import java.util.regex.Pattern;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.CoreMatchers.endsWith;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.hasItems;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -388,6 +391,19 @@ public class TopologyTestDriverTest {
         return topology;
     }
 
+    private Topology setupTopologyWithInternalTopic() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.stream(SOURCE_TOPIC_1)
+            .selectKey((k, v) -> v)
+            .groupByKey()
+            .count()
+            .toStream()
+            .to(SINK_TOPIC_1);
+
+        return builder.build(config);
+    }
+
     @Test
     public void shouldInitProcessor() {
         testDriver = new TopologyTestDriver(setupSingleProcessorTopology(), config);
@@ -451,6 +467,26 @@ public class TopologyTestDriverTest {
     }
 
     @Test
+    public void shouldGetSinkTopicNames() {
+        testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
+
+        pipeRecord(SOURCE_TOPIC_1, testRecord1);
+
+        assertThat(testDriver.producedTopicNames(), hasItem(SINK_TOPIC_1));
+    }
+
+    @Test
+    public void shouldGetInternalTopicNames() {
+        testDriver = new TopologyTestDriver(setupTopologyWithInternalTopic(), config);
+
+        pipeRecord(SOURCE_TOPIC_1, testRecord1);
+
+        assertThat(testDriver.producedTopicNames(), hasItems(
+            endsWith("-changelog"), endsWith("-repartition")
+        ));
+    }
+
+    @Test
     public void shouldProcessRecordForTopic() {
         testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
 
@@ -1579,19 +1615,19 @@ public class TopologyTestDriverTest {
         final Topology topology = new Topology();
         topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input");
         topology.addGlobalStore(
-            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("globule-store"), Serdes.String(), Serdes.String()).withLoggingDisabled(),
-            "globuleSource",
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("global-store"), Serdes.String(), Serdes.String()).withLoggingDisabled(),
+            "globalSource",
             new StringDeserializer(),
             new StringDeserializer(),
-            "globule-topic",
-            "globuleProcessor",
+            "global-topic",
+            "globalProcessor",
             () -> new Processor<String, String>() {
                 private KeyValueStore<String, String> stateStore;
 
                 @SuppressWarnings("unchecked")
                 @Override
                 public void init(final ProcessorContext context) {
-                    stateStore = (KeyValueStore<String, String>) context.getStateStore("globule-store");
+                    stateStore = (KeyValueStore<String, String>) context.getStateStore("global-store");
                 }
 
                 @Override
@@ -1614,23 +1650,23 @@ public class TopologyTestDriverTest {
                         context().forward(key, "recurse-" + value, To.child("recursiveSink"));
                     }
                     context().forward(key, value, To.child("sink"));
-                    context().forward(key, value, To.child("globuleSink"));
+                    context().forward(key, value, To.child("globalSink"));
                 }
             },
             "source"
         );
         topology.addSink("recursiveSink", "input", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
         topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
-        topology.addSink("globuleSink", "globule-topic", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
+        topology.addSink("globalSink", "global-topic", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
 
         try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) {
             final TestInputTopic<String, String> in = topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer());
-            final TestOutputTopic<String, String> globalTopic = topologyTestDriver.createOutputTopic("globule-topic", new StringDeserializer(), new StringDeserializer());
+            final TestOutputTopic<String, String> globalTopic = topologyTestDriver.createOutputTopic("global-topic", new StringDeserializer(), new StringDeserializer());
 
             in.pipeInput("A", "alpha");
 
             // expect the global store to correctly reflect the last update
-            final KeyValueStore<String, String> keyValueStore = topologyTestDriver.getKeyValueStore("globule-store");
+            final KeyValueStore<String, String> keyValueStore = topologyTestDriver.getKeyValueStore("global-store");
             assertThat(keyValueStore, notNullValue());
             assertThat(keyValueStore.get("A"), is("recurse-alpha"));