You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/02/08 21:37:14 UTC

[kafka] branch 1.1 updated: KAFKA-3625: add docs for Kafka Streams test-utils (follow up) (#4493)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new ee7cacc  KAFKA-3625: add docs for Kafka Streams test-utils (follow up) (#4493)
ee7cacc is described below

commit ee7cacc811f6db889bbe7a62bceb608e8a1af6d3
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Thu Feb 8 13:35:57 2018 -0800

    KAFKA-3625: add docs for Kafka Streams test-utils (follow up) (#4493)
    
    Adds web page docs for KIP-247
    
    Author: Matthias J. Sax <ma...@confluent.io>
    
    Reviewers: Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Joel Hamill <jo...@confluent.io>, Damian Guy <da...@confluent.io>
---
 .../streams/developer-guide/testing.html           |  19 ++
 docs/streams/developer-guide/datatypes.html        |   2 +-
 docs/streams/developer-guide/index.html            |   1 +
 .../developer-guide/interactive-queries.html       |   2 +-
 docs/streams/developer-guide/testing.html          | 315 +++++++++++++++++++++
 5 files changed, 337 insertions(+), 2 deletions(-)

diff --git a/docs/documentation/streams/developer-guide/testing.html b/docs/documentation/streams/developer-guide/testing.html
new file mode 100644
index 0000000..4753e66
--- /dev/null
+++ b/docs/documentation/streams/developer-guide/testing.html
@@ -0,0 +1,19 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- should always link the latest release's documentation -->
+<!--#include virtual="../../../streams/developer-guide/testing.html" -->
diff --git a/docs/streams/developer-guide/datatypes.html b/docs/streams/developer-guide/datatypes.html
index a86ea3e..51bd585 100644
--- a/docs/streams/developer-guide/datatypes.html
+++ b/docs/streams/developer-guide/datatypes.html
@@ -178,7 +178,7 @@
               </div>
   <div class="pagination">
     <a href="/{{version}}/documentation/streams/developer-guide/processor-api" class="pagination__btn pagination__btn__prev">Previous</a>
-    <a href="/{{version}}/documentation/streams/developer-guide/interactive-queries" class="pagination__btn pagination__btn__next">Next</a>
+    <a href="/{{version}}/documentation/streams/developer-guide/testing" class="pagination__btn pagination__btn__next">Next</a>
   </div>
 </script>
 
diff --git a/docs/streams/developer-guide/index.html b/docs/streams/developer-guide/index.html
index 443ad7d..095ec82 100644
--- a/docs/streams/developer-guide/index.html
+++ b/docs/streams/developer-guide/index.html
@@ -44,6 +44,7 @@
 <li class="toctree-l1"><a class="reference internal" href="dsl-api.html">Streams DSL</a></li>
 <li class="toctree-l1"><a class="reference internal" href="processor-api.html">Processor API</a></li>
 <li class="toctree-l1"><a class="reference internal" href="datatypes.html">Data Types and Serialization</a></li>
+<li class="toctree-l1"><a class="reference internal" href="testing.html">Testing a Streams Application</a></li>
 <li class="toctree-l1"><a class="reference internal" href="interactive-queries.html">Interactive Queries</a></li>
 <li class="toctree-l1"><a class="reference internal" href="memory-mgmt.html">Memory Management</a></li>
 <li class="toctree-l1"><a class="reference internal" href="running-app.html">Running Streams Applications</a></li>
diff --git a/docs/streams/developer-guide/interactive-queries.html b/docs/streams/developer-guide/interactive-queries.html
index 4358774..4675d8a 100644
--- a/docs/streams/developer-guide/interactive-queries.html
+++ b/docs/streams/developer-guide/interactive-queries.html
@@ -484,7 +484,7 @@ interactive queries</span></p>
                </div>
               </div>
               <div class="pagination">
-                <a href="/{{version}}/documentation/streams/developer-guide/datatypes" class="pagination__btn pagination__btn__prev">Previous</a>
+                <a href="/{{version}}/documentation/streams/developer-guide/testing" class="pagination__btn pagination__btn__prev">Previous</a>
                 <a href="/{{version}}/documentation/streams/developer-guide/memory-mgmt" class="pagination__btn pagination__btn__next">Next</a>
               </div>
                 </script>
diff --git a/docs/streams/developer-guide/testing.html b/docs/streams/developer-guide/testing.html
new file mode 100644
index 0000000..e6886a1
--- /dev/null
+++ b/docs/streams/developer-guide/testing.html
@@ -0,0 +1,315 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script><!--#include virtual="../../js/templateData.js" --></script>
+
+<script id="content-template" type="text/x-handlebars-template">
+  <!-- h1>Developer Guide for Kafka Streams</h1 -->
+  <div class="sub-nav-sticky">
+    <div class="sticky-top">
+      <!-- div style="height:35px">
+        <a href="/{{version}}/documentation/streams/">Introduction</a>
+        <a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
+        <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
+        <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
+        <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
+      </div -->
+    </div>
+  </div>
+
+  <div class="section" id="testing">
+    <span id="streams-developer-guide-testing"></span><h1>Testing a Streams Application<a class="headerlink" href="#testing" title="Permalink to this headline"></a></h1>
+    <p>
+      To test a Kafka Streams application, Kafka provides a test-utils artifact that can be added as a regular dependency to your test code base.
+      Example <code>pom.xml</code> snippet when using Maven:
+    </p>
+    <pre>
+&lt;dependency&gt;
+    &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
+    &lt;artifactId&gt;kafka-streams-test-utils&lt;/artifactId&gt;
+    &lt;version&gt;{{fullDotVersion}}&lt;/version&gt;
+    &lt;scope&gt;test&lt;/scope&gt;
+&lt;/dependency&gt;
+    </pre>
+    <p>
+    The test-utils package provides a <code>TopologyTestDriver</code> that can be used to pipe data through a <code>Topology</code> that is either assembled manually
+    using the Processor API or via the DSL using <code>StreamsBuilder</code>.
+    The test driver simulates the library runtime that continuously fetches records from input topics and processes them by traversing the topology.
+    You can use the test driver to verify that your specified processor topology computes the correct result with the manually piped-in data records.
+    The test driver captures the result records and allows you to query its embedded state stores.</p>
+    <pre>
+// Processor API
+Topology topology = new Topology();
+topology.addSource("sourceProcessor", "input-topic");
+topology.addProcessor("processor", ..., "sourceProcessor");
+topology.addSink("sinkProcessor", "output-topic", "processor");
+// or
+// using DSL
+StreamsBuilder builder = new StreamsBuilder();
+builder.stream("input-topic").filter(...).to("output-topic");
+Topology topology = builder.build();
+
+// setup test driver
+Properties config = new Properties();
+config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
+config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);
+    </pre>
+    <p>
+    The test driver accepts <code>ConsumerRecord</code>s with key and value type <code>byte[]</code>.
+    Because <code>byte[]</code> types can be problematic, you can use the <code>ConsumerRecordFactory</code> to generate those records
+    by providing regular Java types for keys and values and the corresponding serializers.
+    </p>
+    <pre>
+ConsumerRecordFactory&lt;String, Long&gt; factory = new ConsumerRecordFactory&lt;&gt;("input-topic", new StringSerializer(), new LongSerializer());
+testDriver.pipeInput(factory.create("key", 42L));
+    </pre>
+    <p>
+    To verify the output, the test driver produces <code>ProducerRecord</code>s with key and value type <code>byte[]</code>.
+    To get typed results, specify the corresponding deserializers when reading an output record from the driver.</p>
+    <pre>
+ProducerRecord&lt;String, Long&gt; outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new LongDeserializer());
+    </pre>
+    <p>
+    For result verification, you can use <code>OutputVerifier</code>.
+    It offers helper methods to compare only certain parts of the result record:
+    for example, you might only care about the key and value, but not the timestamp of the result record.
+    </p>
+    <pre>
+OutputVerifier.compareKeyValue(outputRecord, "key", 42L); // throws AssertionError if key or value does not match
+    </pre>
+    <p>
+    <code>TopologyTestDriver</code> supports punctuations, too.
+    Event-time punctuations are triggered automatically based on the processed records' timestamps.
+    Wall-clock-time punctuations can also be triggered by advancing the test driver's wall-clock-time (the driver mocks wall-clock-time internally to give users control over it).
+    </p>
+    <pre>
+testDriver.advanceWallClockTime(20L);
+    </pre>
+    </div>
+    <p>
+    Additionally, you can access state stores via the test driver before or after a test.
+    Accessing stores before a test is useful to pre-populate a store with some initial values.
+    After data has been processed, expected updates to the store can be verified.
+    </p>
+    <pre>
+KeyValueStore&lt;String, Long&gt; store = testDriver.getKeyValueStore("store-name");
+    </pre>
+    <p>
+    Note that you should always close the test driver at the end to make sure all resources are released properly.
+    </p>
+    <pre>
+testDriver.close();
+    </pre>
+
+    <h2>Example</h2>
+    <p>
+    The following example demonstrates how to use the test driver and helper classes.
+    The example creates a topology that computes the maximum value per key using a key-value store.
+    While processing, no output is generated; only the store is updated.
+    Output is sent downstream only by the event-time and wall-clock-time punctuations.
+    </p>
+    <pre>
+private TopologyTestDriver testDriver;
+private KeyValueStore&lt;String, Long&gt; store;
+
+private StringDeserializer stringDeserializer = new StringDeserializer();
+private LongDeserializer longDeserializer = new LongDeserializer();
+private ConsumerRecordFactory&lt;String, Long&gt; recordFactory = new ConsumerRecordFactory&lt;&gt;(new StringSerializer(), new LongSerializer());
+
+@Before
+public void setup() {
+    Topology topology = new Topology();
+    topology.addSource("sourceProcessor", "input-topic");
+    topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
+    topology.addStateStore(
+        Stores.keyValueStoreBuilder(
+            Stores.inMemoryKeyValueStore("aggStore"),
+            Serdes.String(),
+            Serdes.Long()).withLoggingDisabled(), // need to disable logging to allow store pre-populating
+        "aggregator");
+    topology.addSink("sinkProcessor", "result-topic", "aggregator");
+
+    // setup test driver
+    Properties config = new Properties();
+    config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "maxAggregation");
+    config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+    config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+    config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
+    testDriver = new TopologyTestDriver(topology, config);
+
+    // pre-populate store
+    store = testDriver.getKeyValueStore("aggStore");
+    store.put("a", 21L);
+}
+
+@After
+public void tearDown() {
+    testDriver.close();
+}
+
+@Test
+public void shouldFlushStoreForFirstInput() {
+    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+}
+
+@Test
+public void shouldNotUpdateStoreForSmallerValue() {
+    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+    Assert.assertThat(store.get("a"), equalTo(21L));
+    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+}
+
+@Test
+public void shouldUpdateStoreForLargerValue() {
+    testDriver.pipeInput(recordFactory.create("input-topic", "a", 42L, 9999L));
+    Assert.assertThat(store.get("a"), equalTo(42L));
+    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 42L);
+    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+}
+
+@Test
+public void shouldUpdateStoreForNewKey() {
+    testDriver.pipeInput(recordFactory.create("input-topic", "b", 21L, 9999L));
+    Assert.assertThat(store.get("b"), equalTo(21L));
+    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "b", 21L);
+    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+}
+
+@Test
+public void shouldPunctuateIfEventTimeAdvances() {
+    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+
+    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+
+    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 10000L));
+    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+}
+
+@Test
+public void shouldPunctuateIfWallClockTimeAdvances() {
+    testDriver.advanceWallClockTime(60000);
+    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+}
+
+public class CustomMaxAggregatorSupplier implements ProcessorSupplier&lt;String, Long&gt; {
+    @Override
+    public Processor&lt;String, Long&gt; get() {
+        return new CustomMaxAggregator();
+    }
+}
+
+public class CustomMaxAggregator implements Processor&lt;String, Long&gt; {
+    private ProcessorContext context;
+    private KeyValueStore&lt;String, Long&gt; store;
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void init(ProcessorContext context) {
+        this.context = context;
+        context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
+            @Override
+            public void punctuate(long timestamp) {
+                flushStore();
+            }
+        });
+        context.schedule(10000, PunctuationType.STREAM_TIME, new Punctuator() {
+            @Override
+            public void punctuate(long timestamp) {
+                flushStore();
+            }
+        });
+        store = (KeyValueStore&lt;String, Long&gt;) context.getStateStore("aggStore");
+    }
+
+    @Override
+    public void process(String key, Long value) {
+        Long oldValue = store.get(key);
+        if (oldValue == null || value &gt; oldValue) {
+            store.put(key, value);
+        }
+    }
+
+    private void flushStore() {
+        try (KeyValueIterator&lt;String, Long&gt; it = store.all()) { // store iterators must be closed to release resources
+            while (it.hasNext()) {
+                KeyValue&lt;String, Long&gt; next = it.next();
+                context.forward(next.key, next.value);
+            }
+        }
+    }
+    @Override
+    public void punctuate(long timestamp) {} // deprecated; not used
+
+    @Override
+    public void close() {}
+}
+    </pre>
+
+  <div class="pagination">
+    <a href="/{{version}}/documentation/streams/developer-guide/datatypes" class="pagination__btn pagination__btn__prev">Previous</a>
+    <a href="/{{version}}/documentation/streams/developer-guide/interactive-queries" class="pagination__btn pagination__btn__next">Next</a>
+  </div>
+</script>
+
+<!--#include virtual="../../../includes/_header.htm" -->
+<!--#include virtual="../../../includes/_top.htm" -->
+<div class="content documentation documentation--current">
+  <!--#include virtual="../../../includes/_nav.htm" -->
+  <div class="right">
+    <!--#include virtual="../../../includes/_docs_banner.htm" -->
+    <ul class="breadcrumbs">
+      <li><a href="/documentation">Documentation</a></li>
+      <li><a href="/documentation/streams">Kafka Streams</a></li>
+      <li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li>
+    </ul>
+    <div class="p-content"></div>
+  </div>
+</div>
+<!--#include virtual="../../../includes/_footer.htm" -->
+<script>
+    $(function() {
+        // Show selected style on nav item
+        $('.b-nav__streams').addClass('selected');
+
+        //sticky secondary nav
+        var $navbar = $(".sub-nav-sticky"),
+            y_pos = $navbar.offset().top,
+            height = $navbar.height();
+
+        $(window).scroll(function() {
+            var scrollTop = $(window).scrollTop();
+
+            if (scrollTop > y_pos - height) {
+                $navbar.addClass("navbar-fixed")
+            } else if (scrollTop <= y_pos) {
+                $navbar.removeClass("navbar-fixed")
+            }
+        });
+
+        // Display docs subnav items
+        $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
+    });
+</script>

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.