You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/02/08 21:37:14 UTC
[kafka] branch 1.1 updated: KAFKA-3625: add docs for Kafka Streams
test-utils (follow up) (#4493)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new ee7cacc KAFKA-3625: add docs for Kafka Streams test-utils (follow up) (#4493)
ee7cacc is described below
commit ee7cacc811f6db889bbe7a62bceb608e8a1af6d3
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Thu Feb 8 13:35:57 2018 -0800
KAFKA-3625: add docs for Kafka Streams test-utils (follow up) (#4493)
Adds web page docs for KIP-247
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Joel Hamill <jo...@confluent.io>, Damian Guy <da...@confluent.io>
---
.../streams/developer-guide/testing.html | 19 ++
docs/streams/developer-guide/datatypes.html | 2 +-
docs/streams/developer-guide/index.html | 1 +
.../developer-guide/interactive-queries.html | 2 +-
docs/streams/developer-guide/testing.html | 315 +++++++++++++++++++++
5 files changed, 337 insertions(+), 2 deletions(-)
diff --git a/docs/documentation/streams/developer-guide/testing.html b/docs/documentation/streams/developer-guide/testing.html
new file mode 100644
index 0000000..4753e66
--- /dev/null
+++ b/docs/documentation/streams/developer-guide/testing.html
@@ -0,0 +1,19 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- should always link the latest release's documentation -->
+<!--#include virtual="../../../streams/developer-guide/testing.html" -->
diff --git a/docs/streams/developer-guide/datatypes.html b/docs/streams/developer-guide/datatypes.html
index a86ea3e..51bd585 100644
--- a/docs/streams/developer-guide/datatypes.html
+++ b/docs/streams/developer-guide/datatypes.html
@@ -178,7 +178,7 @@
</div>
<div class="pagination">
<a href="/{{version}}/documentation/streams/developer-guide/processor-api" class="pagination__btn pagination__btn__prev">Previous</a>
- <a href="/{{version}}/documentation/streams/developer-guide/interactive-queries" class="pagination__btn pagination__btn__next">Next</a>
+ <a href="/{{version}}/documentation/streams/developer-guide/testing" class="pagination__btn pagination__btn__next">Next</a>
</div>
</script>
diff --git a/docs/streams/developer-guide/index.html b/docs/streams/developer-guide/index.html
index 443ad7d..095ec82 100644
--- a/docs/streams/developer-guide/index.html
+++ b/docs/streams/developer-guide/index.html
@@ -44,6 +44,7 @@
<li class="toctree-l1"><a class="reference internal" href="dsl-api.html">Streams DSL</a></li>
<li class="toctree-l1"><a class="reference internal" href="processor-api.html">Processor API</a></li>
<li class="toctree-l1"><a class="reference internal" href="datatypes.html">Data Types and Serialization</a></li>
+<li class="toctree-l1"><a class="reference internal" href="testing.html">Testing a Streams Application</a></li>
<li class="toctree-l1"><a class="reference internal" href="interactive-queries.html">Interactive Queries</a></li>
<li class="toctree-l1"><a class="reference internal" href="memory-mgmt.html">Memory Management</a></li>
<li class="toctree-l1"><a class="reference internal" href="running-app.html">Running Streams Applications</a></li>
diff --git a/docs/streams/developer-guide/interactive-queries.html b/docs/streams/developer-guide/interactive-queries.html
index 4358774..4675d8a 100644
--- a/docs/streams/developer-guide/interactive-queries.html
+++ b/docs/streams/developer-guide/interactive-queries.html
@@ -484,7 +484,7 @@ interactive queries</span></p>
</div>
</div>
<div class="pagination">
- <a href="/{{version}}/documentation/streams/developer-guide/datatypes" class="pagination__btn pagination__btn__prev">Previous</a>
+ <a href="/{{version}}/documentation/streams/developer-guide/testing" class="pagination__btn pagination__btn__prev">Previous</a>
<a href="/{{version}}/documentation/streams/developer-guide/memory-mgmt" class="pagination__btn pagination__btn__next">Next</a>
</div>
</script>
diff --git a/docs/streams/developer-guide/testing.html b/docs/streams/developer-guide/testing.html
new file mode 100644
index 0000000..e6886a1
--- /dev/null
+++ b/docs/streams/developer-guide/testing.html
@@ -0,0 +1,315 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script><!--#include virtual="../../js/templateData.js" --></script>
+
+<script id="content-template" type="text/x-handlebars-template">
+ <!-- h1>Developer Guide for Kafka Streams</h1 -->
+ <div class="sub-nav-sticky">
+ <div class="sticky-top">
+ <!-- div style="height:35px">
+ <a href="/{{version}}/documentation/streams/">Introduction</a>
+ <a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
+ <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
+ <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
+ <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
+ </div -->
+ </div>
+ </div>
+
+ <div class="section" id="testing">
+ <span id="streams-developer-guide-testing"></span><h1>Testing a Streams Application<a class="headerlink" href="#testing" title="Permalink to this headline"></a></h1>
+ <p>
+ To test a Kafka Streams application, Kafka provides a test-utils artifact that can be added as a regular dependency to your test code base.
+ Example <code>pom.xml</code> snippet when using Maven:
+ </p>
+ <pre>
+<dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-streams-test-utils</artifactId>
+ <version>{{fullDotVersion}}</version>
+ <scope>test</scope>
+</dependency>
+ </pre>
+ <p>
+ The test-utils package provides a <code>TopologyTestDriver</code> that can be used to pipe data through a <code>Topology</code> that is either assembled manually
+ using the Processor API or via the DSL using <code>StreamsBuilder</code>.
+ The test driver simulates the library runtime that continuously fetches records from input topics and processes them by traversing the topology.
+ You can use the test driver to verify that your specified processor topology computes the correct result with the manually piped in data records.
+ The test driver captures the result records and allows you to query its embedded state stores.
+ </p>
+ <pre>
+// Option 1: assemble the topology manually with the Processor API
+// ("..." stands for the processor supplier of your application)
+Topology topology = new Topology();
+topology.addSource("sourceProcessor", "input-topic");
+topology.addProcessor("processor", ..., "sourceProcessor");
+topology.addSink("sinkProcessor", "output-topic", "processor");
+// or
+// Option 2 (alternative to option 1): build the topology with the DSL
+// ("..." stands for the filter predicate of your application)
+StreamsBuilder builder = new StreamsBuilder();
+builder.stream("input-topic").filter(...).to("output-topic");
+Topology topology = builder.build();
+
+// setup test driver
+// NOTE(review): "dummy:1234" looks like a placeholder address that only satisfies the config - confirm
+Properties config = new Properties();
+config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
+config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);
+ </pre>
+ <p>
+ The test driver accepts <code>ConsumerRecord</code>s with key and value type <code>byte[]</code>.
+ Because <code>byte[]</code> types can be problematic, you can use the <code>ConsumerRecordFactory</code> to generate those records
+ by providing regular Java types for keys and values and the corresponding serializers.
+ </p>
+ <pre>
+// The factory's value type and serializer must match the value handed to create(): a Long here.
+ConsumerRecordFactory<String, Long> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new LongSerializer());
+// pipeInput() is the test driver method that feeds a record into the topology
+testDriver.pipeInput(factory.create("key", 42L));
+ </pre>
+ <p>
+ To verify the output, the test driver produces <code>ProducerRecord</code>s with key and value type <code>byte[]</code>.
+ You can specify the corresponding deserializers when reading an output record from the driver.
+ </p>
+ <pre>
+// The declared value type must match the value deserializer: LongDeserializer yields Long values.
+ProducerRecord<String, Long> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new LongDeserializer());
+ </pre>
+ <p>
+ For result verification, you can use <code>OutputVerifier</code>.
+ It offers helper methods to compare only certain parts of the result record:
+ for example, you might only care about the key and value, but not the timestamp of the result record.
+ </p>
+ <pre>
+OutputVerifier.compareKeyValue(outputRecord, "key", 42L); // throws AssertionError if key or value does not match
+ </pre>
+ <p>
+ <code>TopologyTestDriver</code> supports punctuations, too.
+ Event-time punctuations are triggered automatically based on the processed records' timestamps.
+ Wall-clock-time punctuations can also be triggered by advancing the test driver's wall-clock-time (the driver mocks wall-clock-time internally to give users control over it).
+ </p>
+ <pre>
+testDriver.advanceWallClockTime(20L);
+ </pre>
+ </div>
+ <p>
+ Additionally, you can access state stores via the test driver before or after a test.
+ Accessing stores before a test is useful to pre-populate a store with some initial values.
+ After data was processed, expected updates to the store can be verified.
+ </p>
+ <pre>
+KeyValueStore store = testDriver.getKeyValueStore("store-name");
+ </pre>
+ <p>
+ Note that you should always close the test driver at the end to make sure all resources are released properly.
+ </p>
+ <pre>
+testDriver.close();
+ </pre>
+
+ <h2>Example</h2>
+ <p>
+ The following example demonstrates how to use the test driver and helper classes.
+ The example creates a topology that computes the maximum value per key using a key-value-store.
+ While processing records, no output is generated; only the store is updated.
+ Output is only sent downstream based on event-time and wall-clock punctuations.
+ </p>
+ <pre>
+// Test driver for the topology under test; created in setup() and closed in tearDown().
+private TopologyTestDriver testDriver;
+// Handle to the "aggStore" state store, obtained from the test driver in setup().
+private KeyValueStore<String, Long> store;
+
+// Deserializers passed to readOutput() to decode the keys and values of output records.
+private StringDeserializer stringDeserializer = new StringDeserializer();
+private LongDeserializer longDeserializer = new LongDeserializer();
+// Creates input records from String keys and Long values; the tests pass the topic name on every create() call.
+private ConsumerRecordFactory<String, Long> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new LongSerializer());
+
+/**
+ * Assembles the max-aggregation topology with the Processor API, creates the test driver for it,
+ * and seeds "aggStore" with the entry ("a", 21).
+ */
+@Before
+public void setup() {
+    // wiring: "input-topic" -> aggregator (attached to "aggStore") -> "result-topic"
+    final Topology maxTopology = new Topology()
+        .addSource("sourceProcessor", "input-topic")
+        .addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor")
+        .addStateStore(
+            Stores.keyValueStoreBuilder(
+                Stores.inMemoryKeyValueStore("aggStore"),
+                Serdes.String(),
+                Serdes.Long()).withLoggingDisabled(), // need to disable logging to allow store pre-populating
+            "aggregator")
+        .addSink("sinkProcessor", "result-topic", "aggregator");
+
+    // configuration handed to the test driver
+    final Properties driverProps = new Properties();
+    driverProps.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "maxAggregation");
+    driverProps.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+    driverProps.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+    driverProps.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
+    testDriver = new TopologyTestDriver(maxTopology, driverProps);
+
+    // seed the store so that every test starts with ("a", 21) already present
+    store = testDriver.getKeyValueStore("aggStore");
+    store.put("a", 21L);
+}
+
+/** Closes the test driver after each test. */
+@After
+public void tearDown() {
+ testDriver.close();
+}
+
+/**
+ * Pipes one record for key "a" (value 1, timestamp 9999) and expects exactly one
+ * output record, ("a", 21), on "result-topic".
+ */
+@Test
+public void shouldFlushStoreForFirstInput() {
+    final String outputTopic = "result-topic";
+
+    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+
+    // first read returns the seeded entry, second read finds no further output
+    OutputVerifier.compareKeyValue(
+        testDriver.readOutput(outputTopic, stringDeserializer, longDeserializer), "a", 21L);
+    Assert.assertNull(testDriver.readOutput(outputTopic, stringDeserializer, longDeserializer));
+}
+
+/**
+ * Pipes value 1 for key "a", which is smaller than the seeded 21, and expects both the
+ * store entry and the single output record to still carry 21.
+ */
+@Test
+public void shouldNotUpdateStoreForSmallerValue() {
+    final String outputTopic = "result-topic";
+    final long seededMax = 21L;
+
+    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+
+    Assert.assertThat(store.get("a"), equalTo(seededMax));
+    OutputVerifier.compareKeyValue(
+        testDriver.readOutput(outputTopic, stringDeserializer, longDeserializer), "a", seededMax);
+    Assert.assertNull(testDriver.readOutput(outputTopic, stringDeserializer, longDeserializer));
+}
+
+/**
+ * Pipes value 42 for key "a", which is larger than the seeded 21, and expects the store
+ * entry and the single output record to carry the new maximum 42.
+ * (Renamed from "shouldNotUpdate...": the assertions verify that the store IS updated.)
+ */
+@Test
+public void shouldUpdateStoreForLargerValue() {
+    testDriver.pipeInput(recordFactory.create("input-topic", "a", 42L, 9999L));
+    Assert.assertThat(store.get("a"), equalTo(42L));
+    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 42L);
+    // exactly one output record is expected
+    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+}
+
+/**
+ * Pipes a record for the previously unseen key "b" and expects it to be stored, with the
+ * output containing the seeded entry ("a", 21) followed by ("b", 21) and nothing else.
+ */
+@Test
+public void shouldUpdateStoreForNewKey() {
+    final String outputTopic = "result-topic";
+    final long expectedValue = 21L;
+
+    testDriver.pipeInput(recordFactory.create("input-topic", "b", 21L, 9999L));
+
+    Assert.assertThat(store.get("b"), equalTo(expectedValue));
+    OutputVerifier.compareKeyValue(
+        testDriver.readOutput(outputTopic, stringDeserializer, longDeserializer), "a", expectedValue);
+    OutputVerifier.compareKeyValue(
+        testDriver.readOutput(outputTopic, stringDeserializer, longDeserializer), "b", expectedValue);
+    Assert.assertNull(testDriver.readOutput(outputTopic, stringDeserializer, longDeserializer));
+}
+
+/**
+ * Verifies event-time punctuation: output appears for the first record (timestamp 9999),
+ * no output appears for a second record with the same timestamp, and output appears again
+ * once a record with timestamp 10000 is piped.
+ * (Method name fixed: "EvenTime" was a typo for "EventTime".)
+ */
+@Test
+public void shouldPunctuateIfEventTimeAdvances() {
+    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+
+    // same timestamp again: no new output expected
+    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+
+    // timestamp advances to 10000: the store content is expected on the output again
+    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 10000L));
+    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+}
+
+/**
+ * Advances the driver's wall-clock time by 60000 without piping any input and expects
+ * exactly one output record, the seeded entry ("a", 21).
+ */
+@Test
+public void shouldPunctuateIfWallClockTimeAdvances() {
+    final String outputTopic = "result-topic";
+
+    testDriver.advanceWallClockTime(60000);
+
+    OutputVerifier.compareKeyValue(
+        testDriver.readOutput(outputTopic, stringDeserializer, longDeserializer), "a", 21L);
+    Assert.assertNull(testDriver.readOutput(outputTopic, stringDeserializer, longDeserializer));
+}
+
+/** {@link ProcessorSupplier} whose {@code get()} returns a new {@link CustomMaxAggregator} on every call. */
+public class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long> {
+ @Override
+ public Processor<String, Long> get() {
+ return new CustomMaxAggregator();
+ }
+}
+
+/**
+ * Processor that keeps the maximum value seen per key in the "aggStore" key-value store.
+ * {@code process()} only updates the store and forwards nothing; the complete store content
+ * is forwarded downstream whenever one of the two punctuations scheduled in {@code init()} fires.
+ */
+public class CustomMaxAggregator implements Processor<String, Long> {
+    ProcessorContext context;
+    private KeyValueStore<String, Long> store;
+
+    /**
+     * Remembers the context, schedules a wall-clock punctuation (interval 60000) and a
+     * stream-time punctuation (interval 10000) that both flush the store, and looks up "aggStore".
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void init(ProcessorContext context) {
+        this.context = context;
+        context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
+            @Override
+            public void punctuate(long timestamp) {
+                flushStore();
+            }
+        });
+        context.schedule(10000, PunctuationType.STREAM_TIME, new Punctuator() {
+            @Override
+            public void punctuate(long timestamp) {
+                flushStore();
+            }
+        });
+        // unchecked cast: getStateStore() does not expose the store's key/value types
+        store = (KeyValueStore<String, Long>) context.getStateStore("aggStore");
+    }
+
+    /** Stores {@code value} for {@code key} if the key is new or the value exceeds the stored one. */
+    @Override
+    public void process(String key, Long value) {
+        Long oldValue = store.get(key);
+        if (oldValue == null || value > oldValue) {
+            store.put(key, value);
+        }
+    }
+
+    /** Forwards every entry currently in the store downstream. */
+    private void flushStore() {
+        // the iterator returned by all() holds resources and must be closed after use;
+        // try-with-resources closes it even if forward() throws
+        try (KeyValueIterator<String, Long> it = store.all()) {
+            while (it.hasNext()) {
+                KeyValue<String, Long> next = it.next();
+                context.forward(next.key, next.value);
+            }
+        }
+    }
+
+    @Override
+    public void punctuate(long timestamp) {} // deprecated; not used
+
+    @Override
+    public void close() {}
+}
+ </pre>
+ <div class="pagination">
+ <a href="/{{version}}/documentation/streams/developer-guide/datatypes" class="pagination__btn pagination__btn__prev">Previous</a>
+ <a href="/{{version}}/documentation/streams/developer-guide/interactive-queries" class="pagination__btn pagination__btn__next">Next</a>
+ </div>
+</script>
+
+<!--#include virtual="../../../includes/_header.htm" -->
+<!--#include virtual="../../../includes/_top.htm" -->
+<div class="content documentation documentation--current">
+ <!--#include virtual="../../../includes/_nav.htm" -->
+ <div class="right">
+ <!--#include virtual="../../../includes/_docs_banner.htm" -->
+ <ul class="breadcrumbs">
+ <li><a href="/documentation">Documentation</a></li>
+ <li><a href="/documentation/streams">Kafka Streams</a></li>
+ <li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li>
+ </ul>
+ <div class="p-content"></div>
+ </div>
+</div>
+<!--#include virtual="../../../includes/_footer.htm" -->
+<script>
+ $(function() {
+ // Show selected style on nav item
+ $('.b-nav__streams').addClass('selected');
+
+ //sticky secondary nav
+ var $navbar = $(".sub-nav-sticky"),
+ y_pos = $navbar.offset().top,
+ height = $navbar.height();
+
+ $(window).scroll(function() {
+ var scrollTop = $(window).scrollTop();
+
+ if (scrollTop > y_pos - height) {
+ $navbar.addClass("navbar-fixed")
+ } else if (scrollTop <= y_pos) {
+ $navbar.removeClass("navbar-fixed")
+ }
+ });
+
+ // Display docs subnav items
+ $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
+ });
+</script>
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.