You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/05/07 14:58:50 UTC
[kafka] branch trunk updated: KAFKA-9865: Expose output topic names
from TopologyTestDriver (#8483)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 33bfdac KAFKA-9865: Expose output topic names from TopologyTestDriver (#8483)
33bfdac is described below
commit 33bfdacd7ab5afefbc8d494aeab6957b33b1354c
Author: Andy Coates <80...@users.noreply.github.com>
AuthorDate: Thu May 7 15:57:55 2020 +0100
KAFKA-9865: Expose output topic names from TopologyTestDriver (#8483)
Implements KIP-594
Reviewers: Matthias J. Sax <ma...@confluent.io>
---
.../apache/kafka/streams/TopologyTestDriver.java | 16 +++++++
.../kafka/streams/TopologyTestDriverTest.java | 54 ++++++++++++++++++----
2 files changed, 61 insertions(+), 9 deletions(-)
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index b36fdce..d902f81 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -102,6 +102,7 @@ import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Properties;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
@@ -861,6 +862,21 @@ public class TopologyTestDriver implements Closeable {
return new TestOutputTopic<>(this, topicName, keyDeserializer, valueDeserializer);
}
+ /**
+ * Get all the names of all the topics to which records have been produced during the test run.
+ * <p>
+ * Call this method after piping the input into the test driver to retrieve the full set of topic names the topology
+ * produced records to.
+ * <p>
+ * The returned set of topic names may include user (e.g., output) and internal (e.g., changelog, repartition) topic
+ * names.
+ *
+ * @return The set of topic names the topology has produced to
+ */
+ public final Set<String> producedTopicNames() {
+ return Collections.unmodifiableSet(outputRecordsByTopic.keySet());
+ }
+
ProducerRecord<byte[], byte[]> readRecord(final String topic) {
final Queue<? extends ProducerRecord<byte[], byte[]>> outputRecords = getRecordsQueue(topic);
if (outputRecords == null) {
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index fb5585f..3ecf013 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -73,7 +73,10 @@ import java.util.regex.Pattern;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.CoreMatchers.endsWith;
import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -388,6 +391,19 @@ public class TopologyTestDriverTest {
return topology;
}
+ private Topology setupTopologyWithInternalTopic() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ builder.stream(SOURCE_TOPIC_1)
+ .selectKey((k, v) -> v)
+ .groupByKey()
+ .count()
+ .toStream()
+ .to(SINK_TOPIC_1);
+
+ return builder.build(config);
+ }
+
@Test
public void shouldInitProcessor() {
testDriver = new TopologyTestDriver(setupSingleProcessorTopology(), config);
@@ -451,6 +467,26 @@ public class TopologyTestDriverTest {
}
@Test
+ public void shouldGetSinkTopicNames() {
+ testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
+
+ pipeRecord(SOURCE_TOPIC_1, testRecord1);
+
+ assertThat(testDriver.producedTopicNames(), hasItem(SINK_TOPIC_1));
+ }
+
+ @Test
+ public void shouldGetInternalTopicNames() {
+ testDriver = new TopologyTestDriver(setupTopologyWithInternalTopic(), config);
+
+ pipeRecord(SOURCE_TOPIC_1, testRecord1);
+
+ assertThat(testDriver.producedTopicNames(), hasItems(
+ endsWith("-changelog"), endsWith("-repartition")
+ ));
+ }
+
+ @Test
public void shouldProcessRecordForTopic() {
testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
@@ -1579,19 +1615,19 @@ public class TopologyTestDriverTest {
final Topology topology = new Topology();
topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input");
topology.addGlobalStore(
- Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("globule-store"), Serdes.String(), Serdes.String()).withLoggingDisabled(),
- "globuleSource",
+ Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("global-store"), Serdes.String(), Serdes.String()).withLoggingDisabled(),
+ "globalSource",
new StringDeserializer(),
new StringDeserializer(),
- "globule-topic",
- "globuleProcessor",
+ "global-topic",
+ "globalProcessor",
() -> new Processor<String, String>() {
private KeyValueStore<String, String> stateStore;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
- stateStore = (KeyValueStore<String, String>) context.getStateStore("globule-store");
+ stateStore = (KeyValueStore<String, String>) context.getStateStore("global-store");
}
@Override
@@ -1614,23 +1650,23 @@ public class TopologyTestDriverTest {
context().forward(key, "recurse-" + value, To.child("recursiveSink"));
}
context().forward(key, value, To.child("sink"));
- context().forward(key, value, To.child("globuleSink"));
+ context().forward(key, value, To.child("globalSink"));
}
},
"source"
);
topology.addSink("recursiveSink", "input", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
- topology.addSink("globuleSink", "globule-topic", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
+ topology.addSink("globalSink", "global-topic", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) {
final TestInputTopic<String, String> in = topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer());
- final TestOutputTopic<String, String> globalTopic = topologyTestDriver.createOutputTopic("globule-topic", new StringDeserializer(), new StringDeserializer());
+ final TestOutputTopic<String, String> globalTopic = topologyTestDriver.createOutputTopic("global-topic", new StringDeserializer(), new StringDeserializer());
in.pipeInput("A", "alpha");
// expect the global store to correctly reflect the last update
- final KeyValueStore<String, String> keyValueStore = topologyTestDriver.getKeyValueStore("globule-store");
+ final KeyValueStore<String, String> keyValueStore = topologyTestDriver.getKeyValueStore("global-store");
assertThat(keyValueStore, notNullValue());
assertThat(keyValueStore.get("A"), is("recurse-alpha"));