Posted to commits@kafka.apache.org by bb...@apache.org on 2019/11/14 00:04:36 UTC
[kafka] branch 2.4 updated: KAFKA-9072: Add Topology naming to the dev guide (#7629)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 124b5ec KAFKA-9072: Add Topology naming to the dev guide (#7629)
124b5ec is described below
commit 124b5ecb259dbb267dc6dd2a6d0bcec4bfa5b1d0
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Wed Nov 13 10:54:47 2019 -0500
KAFKA-9072: Add Topology naming to the dev guide (#7629)
Reviewers: Jim Galasyn <ji...@confluent.io>, Matthias J. Sax <mj...@apache.org>, Sophie Blee-Goldman <so...@confluent.io>
---
docs/streams/developer-guide/dsl-api.html | 6 +-
.../developer-guide/dsl-topology-naming.html | 370 +++++++++++++++++++++
2 files changed, 375 insertions(+), 1 deletion(-)
diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index 5be029e..e54d449 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -67,6 +67,7 @@
<li><a class="reference internal" href="#applying-processors-and-transformers-processor-api-integration" id="id24">Applying processors and transformers (Processor API integration)</a></li>
</ul>
</li>
+ <li><a class="reference internal" href="#naming-a-streams-app" id="id33">Naming Operators in a Streams DSL application</a></li>
<li><a class="reference internal" href="#controlling-emit-rate" id="id32">Controlling KTable update rate</a></li>
<li><a class="reference internal" href="#writing-streams-back-to-kafka" id="id25">Writing streams back to Kafka</a></li>
<li><a class="reference internal" href="#testing-a-streams-app" id="id26">Testing a Streams application</a></li>
@@ -3515,7 +3516,10 @@ grouped
</div>
</div>
</div>
-
+ <div class="section" id="naming-a-streams-app">
+ <h2><a class="toc-backref" href="#id33">Naming Operators in a Streams DSL application</a><a class="headerlink" href="#naming-a-streams-app" title="Permalink to this headline"></a></h2>
+ <p>Kafka Streams allows you to <a class="reference internal" href="dsl-topology-naming.html">name processors</a> created via the Streams DSL.</p>
+ </div>
<div class="section" id="controlling-emit-rate">
<span id="streams-developer-guide-dsl-suppression"></span><h2><a class="toc-backref" href="#id32">Controlling KTable emit rate</a><a class="headerlink" href="#controlling-emit-rate" title="Permalink to this headline"></a></h2>
<p>A KTable is logically a continuously updated table.
diff --git a/docs/streams/developer-guide/dsl-topology-naming.html b/docs/streams/developer-guide/dsl-topology-naming.html
new file mode 100644
index 0000000..e0c1e1f
--- /dev/null
+++ b/docs/streams/developer-guide/dsl-topology-naming.html
@@ -0,0 +1,370 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script><!--#include virtual="../../js/templateData.js" --></script>
+
+<script id="content-template" type="text/x-handlebars-template">
+ <!-- h1>Developer Guide for Kafka Streams</h1 -->
+ <div class="sub-nav-sticky">
+ <div class="sticky-top">
+ <!-- div style="height:35px">
+ <a href="/{{version}}/documentation/streams/">Introduction</a>
+ <a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
+ <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
+ <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
+ <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
+ </div -->
+ </div>
+ </div>
+
+ <div class="section" id="naming">
+ <span id="streams-developer-guide-dsl-topology-naming"></span>
+ <h1>Naming Operators in a Kafka Streams DSL Application<a class="headerlink" href="#naming" title="Permalink to this headline"></a></h1>
+
+ <p>
+ You can now give names to processors when using the Kafka Streams DSL.
+ In the Processor API (PAPI), there are <code>Processors</code> and <code>State Stores</code>, and
+ you are required to name each one explicitly.
+ </p>
+ <p>
+ At the DSL layer, there are operators. A single DSL operator may
+ compile down to multiple <code>Processors</code> and <code>State Stores</code> and,
+ if required, <code>repartition topics</code>. With the Kafka Streams
+ DSL, all of these names are generated for you. There is a relationship between
+ the generated processor names, state store names (and hence changelog topic names), and repartition
+ topic names. Note that the names of state stores and changelog/repartition topics
+ are "stateful," because they identify data that persists across application restarts, while processor names are "stateless."
+ </p>
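+ <p>
+ Here is a concrete view of the stateful vs. stateless distinction. Kafka Streams builds internal topic names from the
+ <code>application.id</code> and a store or operator name. A state store's changelog topic is named
+ <code>&lt;application.id&gt;-&lt;name&gt;-changelog</code>, and a repartition topic is named
+ <code>&lt;application.id&gt;-&lt;name&gt;-repartition</code>. The plain-Java sketch below only reproduces that
+ string pattern. It calls no Kafka API, the <code>InternalTopicNames</code> class is hypothetical, and the
+ application ID <code>purchase-app</code> is made up for illustration.
+ </p>

```java
/**
 * Hypothetical helper (not part of Kafka Streams) that mirrors how Kafka Streams
 * names internal topics: "<application.id>-<name>-changelog" for a state store's
 * changelog and "<application.id>-<name>-repartition" for a repartition topic.
 */
final class InternalTopicNames {

    private InternalTopicNames() { }

    /** Changelog topic backing the state store named {@code storeName}. */
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    /** Repartition topic created for the operation named {@code name}. */
    static String repartitionTopic(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    public static void main(String[] args) {
        String appId = "purchase-app"; // hypothetical application.id
        // A generated store name embeds a position-dependent counter...
        System.out.println(changelogTopic(appId, "KSTREAM-AGGREGATE-STATE-STORE-0000000001"));
        // ...while a user-supplied store name does not.
        System.out.println(changelogTopic(appId, "Purchase_count_store"));
    }
}
```

+ <p>
+ Running it prints <code>purchase-app-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog</code> and
+ <code>purchase-app-Purchase_count_store-changelog</code>. A generated store name carries its position-dependent
+ suffix into the changelog topic name, so anything that renumbers the store also renames the topic.
+ </p>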
+ <p>
+ This distinction
+ of stateful vs. stateless names has important implications when updating your topology.
+ While the internal naming makes creating
+ a topology with the DSL much more straightforward,
+ there are a couple of trade-offs. The first trade-off is what we could
+ consider a readability issue. The other,
+ more severe, trade-off is that names can shift because of the relationship between each
+ DSL operator and the generated <code>Processors</code>, <code>State Stores</code>, changelog
+ topics, and repartition topics.
+ </p>
+
+
+ <h2>Readability Issues</h2>
+
+ <p>
+ By saying there is a readability trade-off, we are referring to viewing a description of the topology.
+ When you render the string description of your topology via the <code>Topology#describe()</code>
+ method, you can see what each processor is, but you don't have any context for its business purpose.
+ For example, consider the following simple topology:
+
+ <br/>
+ <pre>
+ KStream&lt;String, String&gt; stream = builder.stream("input");
+ stream.filter((k, v) -> !v.equals("invalid_txn"))
+ .mapValues((v) -> v.substring(0, 6))
+ .to("output");
+ </pre>
+
+ </p>
+
+ <p>
+ Running <code>Topology#describe()</code> yields this string:
+
+ <pre>
+ Topologies:
+ Sub-topology: 0
+ Source: KSTREAM-SOURCE-0000000000 (topics: [input])
+ --> KSTREAM-FILTER-0000000001
+ Processor: KSTREAM-FILTER-0000000001 (stores: [])
+ --> KSTREAM-MAPVALUES-0000000002
+ <-- KSTREAM-SOURCE-0000000000
+ Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
+ --> KSTREAM-SINK-0000000003
+ <-- KSTREAM-FILTER-0000000001
+ Sink: KSTREAM-SINK-0000000003 (topic: output)
+ <-- KSTREAM-MAPVALUES-0000000002
+ </pre>
+
+ From this report, you can see what the different operators are, but what is the broader context here?
+ For example, consider <code>KSTREAM-FILTER-0000000001</code>: we can see that it's a
+ filter operation, which means that records that don't match the given predicate are dropped. But what is
+ the meaning of the predicate? Additionally, you can see the topic names of the source and sink nodes,
+ but what if the topics aren't named in a meaningful way? Then you're left to guess the
+ business purpose behind these topics.
+ </p>
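+ <p>
+ One way to spot this readability gap is to scan the <code>Topology#describe()</code> output for node names that
+ still follow the generated pattern. The <code>GeneratedNameScanner</code> class below is a hypothetical helper,
+ not a Kafka Streams API. It works only on the description string (here, the output shown above), and its
+ regular expressions are assumptions based on that output format.
+ </p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: lists node names in a Topology#describe() string that look auto-generated. */
final class GeneratedNameScanner {

    // Matches lines such as "Processor: KSTREAM-FILTER-0000000001 (stores: [])" and captures the node name.
    private static final Pattern NODE_LINE =
            Pattern.compile("^[ \\t]*(?:Source|Processor|Sink):[ \\t]+(\\S+)", Pattern.MULTILINE);

    // Assumed shape of a generated name: KSTREAM or KTABLE, an upper-case operator name, a 10-digit suffix.
    private static final Pattern GENERATED =
            Pattern.compile("K(?:STREAM|TABLE)-[A-Z-]+-\\d{10}");

    private GeneratedNameScanner() { }

    /** Returns the generated-looking node names, in the order they appear in the description. */
    static List<String> generatedNodeNames(String description) {
        List<String> result = new ArrayList<>();
        Matcher m = NODE_LINE.matcher(description);
        while (m.find()) {
            String name = m.group(1);
            if (GENERATED.matcher(name).matches()) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String description = String.join("\n",
                "Topologies:",
                "   Sub-topology: 0",
                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])",
                "      --> KSTREAM-FILTER-0000000001",
                "    Processor: KSTREAM-FILTER-0000000001 (stores: [])",
                "      --> KSTREAM-MAPVALUES-0000000002",
                "      <-- KSTREAM-SOURCE-0000000000",
                "    Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])",
                "      --> KSTREAM-SINK-0000000003",
                "      <-- KSTREAM-FILTER-0000000001",
                "    Sink: KSTREAM-SINK-0000000003 (topic: output)",
                "      <-- KSTREAM-MAPVALUES-0000000002");
        // Every node in this unnamed topology is reported.
        System.out.println(generatedNodeNames(description));
    }
}
```

+ <p>
+ For the named topology later in this section, the same scan would return an empty list.
+ </p>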
+ <p>
+ Also notice the numbering here: the source node is suffixed with <code>0000000000</code>,
+ indicating it's the first processor in the topology.
+ The filter is suffixed with <code>0000000001</code>, indicating it's the second processor in
+ the topology. In Kafka Streams, there are now overloaded methods for
+ both <code>KStream</code> and <code>KTable</code> that accept
+ a new <code>Named</code> parameter. By using the <code>Named</code> class, DSL users can
+ provide meaningful names to the processors in their topology. Source and sink nodes are named
+ through <code>Consumed.as()</code> and <code>Produced.as()</code>.
+ </p>
+ <p>
+ Now let's take a look at your topology with all the processors named:
+ <pre>
+ KStream&lt;String, String&gt; stream =
+ builder.stream("input", Consumed.as("Customer_transactions_input_topic"));
+ stream.filter((k, v) -> !v.equals("invalid_txn"), Named.as("filter_out_invalid_txns"))
+ .mapValues((v) -> v.substring(0, 6), Named.as("Map_values_to_first_6_characters"))
+ .to("output", Produced.as("Mapped_transactions_output_topic"));
+ </pre>
+
+ <pre>
+ Topologies:
+ Sub-topology: 0
+ Source: Customer_transactions_input_topic (topics: [input])
+ --> filter_out_invalid_txns
+ Processor: filter_out_invalid_txns (stores: [])
+ --> Map_values_to_first_6_characters
+ <-- Customer_transactions_input_topic
+ Processor: Map_values_to_first_6_characters (stores: [])
+ --> Mapped_transactions_output_topic
+ <-- filter_out_invalid_txns
+ Sink: Mapped_transactions_output_topic (topic: output)
+ <-- Map_values_to_first_6_characters
+ </pre>
+
+ Now you can look at the topology description and easily understand what role each processor
+ plays in the topology. But there's another reason for naming your processor nodes: your application
+ may have stateful components that remain between restarts of your Kafka Streams application, such as
+ state stores, changelog topics, and repartition topics.
+ </p>
+
+ <h2>Changing Names</h2>
+ <p>
+ Generated names are numbered according to where they are built in the topology.
+ The name generation strategy is
+ <code>KSTREAM|KTABLE-&lt;operator name&gt;-&lt;number suffix&gt;</code>. The number is a
+ globally incrementing number that represents the operator's order in the topology; generated
+ state store names draw from the same counter, as the next example shows.
+ The generated number is prefixed with a varying number of "0"s to create a
+ string that is consistently 10 characters long.
+ This means that if you add, remove, or reorder operations, the position of the
+ processor shifts, which shifts the name of the processor. Since <strong>most</strong> processors exist
+ in memory only, this name shifting presents no issue for many topologies. But the name
+ shifting does have implications for topologies with stateful operators or repartition topics.
+
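+ The numbering rule can be sketched in plain Java. This is a hypothetical model, not Kafka Streams' internal
+ code. The <code>GeneratedNames</code> class only captures the shared counter and the zero-padded,
+ 10-digit suffix described above.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model (not Kafka Streams' internal code) of DSL name generation:
 * every generated name takes the next value of one shared counter, zero-padded to 10 digits.
 */
final class GeneratedNames {

    private int counter = 0;

    /** Returns e.g. "KSTREAM-FILTER-0000000001" and advances the shared counter. */
    String next(String prefix) {
        return String.format("%s-%010d", prefix, counter++);
    }

    /** Generates names for the given operator prefixes, in topology order. */
    static List<String> build(String... prefixes) {
        GeneratedNames names = new GeneratedNames();
        List<String> result = new ArrayList<>();
        for (String prefix : prefixes) {
            result.add(names.next(prefix));
        }
        return result;
    }

    public static void main(String[] args) {
        // source -> mapValues -> sink
        System.out.println(build("KSTREAM-SOURCE", "KSTREAM-MAPVALUES", "KSTREAM-SINK"));
        // Adding a filter after the source shifts the suffix of every later name.
        System.out.println(build("KSTREAM-SOURCE", "KSTREAM-FILTER", "KSTREAM-MAPVALUES", "KSTREAM-SINK"));
    }
}
```

+ Running <code>main</code> shows the mapValues processor renamed from <code>KSTREAM-MAPVALUES-0000000001</code>
+ to <code>KSTREAM-MAPVALUES-0000000002</code> once a filter is inserted ahead of it.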
+ Here's a different topology with some state:
+ <pre>
+ KStream&lt;String, String&gt; stream = builder.stream("input");
+ stream.groupByKey()
+ .count()
+ .toStream()
+ .to("output");
+ </pre>
+ Describing this topology yields the following:
+ <pre>
+ Topologies:
+ Sub-topology: 0
+ Source: KSTREAM-SOURCE-0000000000 (topics: [input])
+ --> KSTREAM-AGGREGATE-0000000002
+ Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])
+ --> KTABLE-TOSTREAM-0000000003
+ <-- KSTREAM-SOURCE-0000000000
+ Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
+ --> KSTREAM-SINK-0000000004
+ <-- KSTREAM-AGGREGATE-0000000002
+ Sink: KSTREAM-SINK-0000000004 (topic: output)
+ <-- KTABLE-TOSTREAM-0000000003
+ </pre>
+ </p>
+ <p>
+ You can see from the topology description above that the state store is named
+ <code>KSTREAM-AGGREGATE-STATE-STORE-0000000001</code>. Here's what happens when you
+ add a filter to keep some of the records out of the aggregation:
+ <pre>
+ KStream&lt;String, String&gt; stream = builder.stream("input");
+ stream.filter((k, v) -> v != null && v.length() >= 6)
+ .groupByKey()
+ .count()
+ .toStream()
+ .to("output");
+ </pre>
+
+ And the corresponding topology:
+ <pre>
+ Topologies:
+ Sub-topology: 0
+ Source: KSTREAM-SOURCE-0000000000 (topics: [input])
+ --> KSTREAM-FILTER-0000000001
+ Processor: KSTREAM-FILTER-0000000001 (stores: [])
+ --> KSTREAM-AGGREGATE-0000000003
+ <-- KSTREAM-SOURCE-0000000000
+ Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])
+ --> KTABLE-TOSTREAM-0000000004
+ <-- KSTREAM-FILTER-0000000001
+ Processor: KTABLE-TOSTREAM-0000000004 (stores: [])
+ --> KSTREAM-SINK-0000000005
+ <-- KSTREAM-AGGREGATE-0000000003
+ Sink: KSTREAM-SINK-0000000005 (topic: output)
+ <-- KTABLE-TOSTREAM-0000000004
+ </pre>
+ </p>
+ <p>
+ Notice that since you've added an operation <em>before</em> the <code>count</code> operation, the state
+ store name (and with it the changelog topic name) has changed from <code>KSTREAM-AGGREGATE-STATE-STORE-0000000001</code>
+ to <code>KSTREAM-AGGREGATE-STATE-STORE-0000000002</code>. This name change means you can't
+ do a rolling re-deployment of your updated topology. Also, you must use the
+ <a href="/{{version}}/documentation/streams/developer-guide/app-reset-tool">Streams Reset Tool</a>
+ to recalculate the aggregations, because on start-up the application uses a new changelog topic,
+ which contains no data.
+
+ Fortunately, there's an easy solution to remedy this situation. Give the
+ state store a user-defined name instead of relying on the generated one,
+ so you don't have to worry about topology changes shifting the name of the state store.
+
+ You can name repartition topics with the <code>Joined</code>,
+ <code>StreamJoined</code>, and <code>Grouped</code> classes, and
+ name state stores and changelog topics with <code>Materialized</code>.
+ It's worth reiterating the importance of naming these DSL topology operations.
+
+ Here's how your DSL code looks when you give your state store a specific name:
+ <pre>
+ KStream&lt;String, String&gt; stream = builder.stream("input");
+ stream.filter((k, v) -> v != null && v.length() >= 6)
+ .groupByKey()
+ .count(Materialized.as("Purchase_count_store"))
+ .toStream()
+ .to("output");
+ </pre>
+
+ And here's the corresponding topology:
+
+ <pre>
+ Topologies:
+ Sub-topology: 0
+ Source: KSTREAM-SOURCE-0000000000 (topics: [input])
+ --> KSTREAM-FILTER-0000000001
+ Processor: KSTREAM-FILTER-0000000001 (stores: [])
+ --> KSTREAM-AGGREGATE-0000000002
+ <-- KSTREAM-SOURCE-0000000000
+ Processor: KSTREAM-AGGREGATE-0000000002 (stores: [Purchase_count_store])
+ --> KTABLE-TOSTREAM-0000000003
+ <-- KSTREAM-FILTER-0000000001
+ Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
+ --> KSTREAM-SINK-0000000004
+ <-- KSTREAM-AGGREGATE-0000000002
+ Sink: KSTREAM-SINK-0000000004 (topic: output)
+ <-- KTABLE-TOSTREAM-0000000003
+ </pre>
+ </p>
+ <p>
+ Now, even though you've added processors before your state store, the store name
+ (<code>Purchase_count_store</code>) and its changelog topic name don't change. This makes your topology
+ more robust and resilient to changes made by adding or removing processors.
+ </p>
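+ <p>
+ The effect of naming the store can also be modeled with a counter. The sketch below is hypothetical (the
+ <code>StoreNaming</code> class is not part of Kafka Streams). It only mirrors what the three topology
+ descriptions above show. <code>count()</code> generates a store name, consuming one counter value, only when
+ no name is supplied, and the aggregate processor then takes the next value.
+ </p>

```java
/**
 * Hypothetical model of the count() naming behavior visible in the topology
 * descriptions above; this is not Kafka Streams' internal code.
 */
final class StoreNaming {

    /** Names assigned to the count() step: its state store and its aggregate processor. */
    static final class CountNames {
        final String storeName;
        final String aggregateName;

        CountNames(String storeName, String aggregateName) {
            this.storeName = storeName;
            this.aggregateName = aggregateName;
        }
    }

    private StoreNaming() { }

    /**
     * Models source -> [filter] -> groupByKey().count(...).
     * A null userStoreName stands for count() without Materialized.as(...).
     */
    static CountNames build(boolean withFilter, String userStoreName) {
        int counter = 0;
        counter++;                      // source consumes 0000000000
        if (withFilter) {
            counter++;                  // filter consumes 0000000001
        }
        String storeName;
        if (userStoreName == null) {
            // No name supplied: the store takes the next counter value.
            storeName = String.format("KSTREAM-AGGREGATE-STATE-STORE-%010d", counter++);
        } else {
            // User-supplied name: no counter value is consumed for the store.
            storeName = userStoreName;
        }
        // The aggregate processor always gets a generated name.
        String aggregateName = String.format("KSTREAM-AGGREGATE-%010d", counter++);
        return new CountNames(storeName, aggregateName);
    }

    public static void main(String[] args) {
        System.out.println(build(false, null).storeName);
        System.out.println(build(true, null).storeName);
        System.out.println(build(true, "Purchase_count_store").storeName);
    }
}
```

+ <p>
+ The three calls in <code>main</code> print <code>KSTREAM-AGGREGATE-STATE-STORE-0000000001</code>,
+ <code>KSTREAM-AGGREGATE-STATE-STORE-0000000002</code>, and <code>Purchase_count_store</code>. The aggregate
+ processor name still depends on whether the filter is present, which is harmless because processor names are stateless.
+ </p>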
+
+ <h2>Conclusion</h2>
+
+ <p>
+ It's a good practice to name your processing nodes when using the DSL, and it's even
+ more important to do this when your application has "stateful" components, such as repartition
+ topics and state stores (and the accompanying changelog topics).
+ </p>
+ <p>
+ Here are a couple of points to remember when naming your DSL topology:
+ <ol>
+ <li>
+ If you have an <em>existing topology</em> and you <em>haven't</em> named your
+ state stores (and changelog topics) and repartition topics, we recommend that you
+ do so. But this is a topology-breaking change, so you'll need to shut down all
+ application instances, make the changes, and run the
+ <a href="/{{version}}/documentation/streams/developer-guide/app-reset-tool">Streams Reset Tool</a>.
+ Although this may be inconvenient at first, it's worth the effort to protect your application from
+ unexpected errors due to topology changes.
+ </li>
+ <li>
+ If you have a <em>new topology</em>, make sure you name the persistent parts of your topology:
+ state stores (changelog topics) and repartition topics. This way, when you deploy your
+ application, you're protected from topology changes that otherwise would break your Kafka Streams application.
+ If you don't want to add names to stateless processors at first, that's fine, as you can
+ always go back and add the names later.
+ </li>
+ </ol>
+
+ Here's a quick reference on naming the critical parts of
+ your Kafka Streams application to prevent topology name changes from breaking your application:
+
+ <table>
+ <tr>
+ <th>Operation</th><th>Naming Class</th>
+ </tr>
+ <tr>
+ <td>Aggregation repartition topics</td><td>Grouped</td>
+ </tr>
+ <tr>
+ <td>KStream-KStream Join repartition topics</td><td>StreamJoined</td>
+ </tr>
+ <tr>
+ <td>KStream-KTable Join repartition topic</td><td>Joined</td>
+ </tr>
+ <tr>
+ <td>KStream-KStream Join state stores</td><td>StreamJoined</td>
+ </tr>
+ <tr>
+ <td>State Stores (for aggregations and KTable-KTable joins)</td><td>Materialized</td>
+ </tr>
+ <tr>
+ <td>Processor names for stateless Stream/Table operations</td><td>Named</td>
+ </tr>
+ </table>
+ </p>
+</div>
+
+</script>
+
+<!--#include virtual="../../../includes/_header.htm" -->
+<!--#include virtual="../../../includes/_top.htm" -->
+<div class="content documentation documentation--current">
+ <!--#include virtual="../../../includes/_nav.htm" -->
+ <div class="right">
+ <!--#include virtual="../../../includes/_docs_banner.htm" -->
+ <ul class="breadcrumbs">
+ <li><a href="/documentation">Documentation</a></li>
+ <li><a href="/documentation/streams">Kafka Streams</a></li>
+ <li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li>
+ </ul>
+ <div class="p-content"></div>
+ </div>
+</div>
+<!--#include virtual="../../../includes/_footer.htm" -->
+<script>
+ $(function () {
+ // Show selected style on nav item
+ $('.b-nav__streams').addClass('selected');
+
+ //sticky secondary nav
+ var $navbar = $(".sub-nav-sticky"),
+ y_pos = $navbar.offset().top,
+ height = $navbar.height();
+
+ $(window).scroll(function () {
+ var scrollTop = $(window).scrollTop();
+
+ if (scrollTop > y_pos - height) {
+ $navbar.addClass("navbar-fixed")
+ } else if (scrollTop <= y_pos) {
+ $navbar.removeClass("navbar-fixed")
+ }
+ });
+
+ // Display docs subnav items
+ $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
+ });
+</script>
+
+
+
+