Posted to commits@kafka.apache.org by bb...@apache.org on 2019/11/14 00:04:36 UTC

[kafka] branch 2.4 updated: KAFKA-9072: Add Topology naming to the dev guide (#7629)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 124b5ec  KAFKA-9072: Add Topology naming to the dev guide (#7629)
124b5ec is described below

commit 124b5ecb259dbb267dc6dd2a6d0bcec4bfa5b1d0
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Wed Nov 13 10:54:47 2019 -0500

    KAFKA-9072: Add Topology naming to the dev guide (#7629)
    
    Reviewers: Jim Galasyn <ji...@confluent.io>, Matthias J. Sax <mj...@apache.org>, Sophie Blee-Goldman <so...@confluent.io>
---
 docs/streams/developer-guide/dsl-api.html          |   6 +-
 .../developer-guide/dsl-topology-naming.html       | 370 +++++++++++++++++++++
 2 files changed, 375 insertions(+), 1 deletion(-)

diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index 5be029e..e54d449 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -67,6 +67,7 @@
                     <li><a class="reference internal" href="#applying-processors-and-transformers-processor-api-integration" id="id24">Applying processors and transformers (Processor API integration)</a></li>
                 </ul>
                 </li>
+                <li><a class="reference internal" href="#naming-a-streams-app" id="id33">Naming Operators in a Streams DSL application</a></li>
                 <li><a class="reference internal" href="#controlling-emit-rate" id="id32">Controlling KTable update rate</a></li>
                 <li><a class="reference internal" href="#writing-streams-back-to-kafka" id="id25">Writing streams back to Kafka</a></li>
                 <li><a class="reference internal" href="#testing-a-streams-app" id="id26">Testing a Streams application</a></li>
@@ -3515,7 +3516,10 @@ grouped
                 </div>
             </div>
         </div>
-
+        <div class="section" id="naming-a-streams-app">
+            <a class="headerlink" href="#naming-a-streams-app" title="Permalink to this headline"><h2><a class="toc-backref" href="#id33">Naming Operators in a Streams DSL application</a></h2></a>
+            Kafka Streams allows you to <a class="reference internal" href="dsl-topology-naming.html">name processors</a> created via the Streams DSL.
+        </div>
 	<div class="section" id="controlling-emit-rate">
             <span id="streams-developer-guide-dsl-suppression"></span><h2><a class="toc-backref" href="#id32">Controlling KTable emit rate</a><a class="headerlink" href="#controlling-emit-rate" title="Permalink to this headline"></a></h2>
 	    <p>A KTable is logically a continuously updated table.
diff --git a/docs/streams/developer-guide/dsl-topology-naming.html b/docs/streams/developer-guide/dsl-topology-naming.html
new file mode 100644
index 0000000..e0c1e1f
--- /dev/null
+++ b/docs/streams/developer-guide/dsl-topology-naming.html
@@ -0,0 +1,370 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script><!--#include virtual="../../js/templateData.js" --></script>
+
+<script id="content-template" type="text/x-handlebars-template">
+    <!-- h1>Developer Guide for Kafka Streams</h1 -->
+    <div class="sub-nav-sticky">
+        <div class="sticky-top">
+            <!-- div style="height:35px">
+              <a href="/{{version}}/documentation/streams/">Introduction</a>
+              <a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
+              <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
+              <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
+              <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
+            </div -->
+        </div>
+    </div>
+
+	<div class="section" id="naming">
+        <span id="streams-developer-guide-dsl-topology-naming"></span>
+        <h1>Naming Operators in a Kafka Streams DSL Application<a class="headerlink" href="#naming" title="Permalink to this headline"></a></h1>
+
+        <p>
+		   You can now give names to processors when using the Kafka Streams DSL.
+		   In the Processor API (PAPI), there are <code>Processors</code> and <code>State Stores</code>, and
+		   you are required to explicitly name each one.
+	    </p>
+		<p>
+		   At the DSL layer, there are operators.  A single DSL operator may
+		   compile down to multiple <code>Processors</code> and <code>State Stores</code>, and,
+		   if required, <code>repartition topics</code>. But with the Kafka Streams
+		   DSL, all these names are generated for you. There is a relationship between
+		   the generated processor names, state store names (hence changelog topic names), and repartition
+		   topic names. Note that the names of state stores and changelog/repartition topics
+		   are "stateful" while processor names are "stateless".
+	    </p>
+		<p>
+			This distinction
+		   of stateful vs. stateless names has important implications when updating your topology.
+		   While the internal naming makes creating
+		   a topology with the DSL much more straightforward,
+		   there are a couple of trade-offs.  The first trade-off is what we could
+		   consider a readability issue. The other
+		   more severe trade-off is the shifting of names due to the relationship between the
+		   DSL operator and the generated <code>Processors</code>, <code>State Stores</code>, changelog
+		   topics, and repartition topics.
+	   </p>
+
+
+		<h2>Readability Issues</h2>
+
+		<p>
+		 	By saying there is a readability trade-off, we are referring to viewing a description of the topology.
+		 	When you render the string description of your topology via the <code>Topology#describe()</code>
+		 	method, you can see what the processor is, but you don't have any context for its business purpose.
+		 	For example, consider the following simple topology:
+
+			<br/>
+		<pre>
+		KStream&lt;String,String&gt; stream = builder.stream("input");
+		stream.filter((k,v) -> !v.equals("invalid_txn"))
+		      .mapValues((v) -> v.substring(0,5))
+		      .to("output");
+	    </pre>
+
+		</p>
+
+		<p>
+		Running <code>Topology#describe()</code> yields this string:
+
+		<pre>
+		Topologies:
+		   Sub-topology: 0
+		    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
+		      --> KSTREAM-FILTER-0000000001
+		    Processor: KSTREAM-FILTER-0000000001 (stores: [])
+		      --> KSTREAM-MAPVALUES-0000000002
+		      <-- KSTREAM-SOURCE-0000000000
+		    Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
+		      --> KSTREAM-SINK-0000000003
+		      <-- KSTREAM-FILTER-0000000001
+		    Sink: KSTREAM-SINK-0000000003 (topic: output)
+		      <-- KSTREAM-MAPVALUES-0000000002
+		 </pre>
+
+		 From this report, you can see what the different operators are, but what is the broader context here?
+		 For example, consider <code>KSTREAM-FILTER-0000000001</code>: you can see that it's a
+		 filter operation, which means that records that don't match the given predicate are dropped.  But what is
+		 the meaning of the predicate?  Additionally, you can see the topic names of the source and sink nodes,
+		 but what if the topics aren't named in a meaningful way?  Then you're left to guess the
+		 business purpose behind these topics.
+		</p>
+		<p>
+		 Also notice the numbering here: the source node is suffixed with <code>0000000000</code>
+		 indicating it's the first processor in the topology.  
+		 The filter is suffixed with <code>0000000001</code>, indicating it's the second processor in
+		 the topology.   In Kafka Streams, there are now overloaded methods for
+		 both <code>KStream</code> and <code>KTable</code> that accept
+		 a new parameter <code>Named</code>. By using the <code>Named</code> class DSL users can
+		 provide meaningful names to the processors in their topology.
+		</p>
+		<p>
+		 Now let's take a look at your topology with all the processors named:
+		<pre>
+		KStream&lt;String,String&gt; stream =
+		builder.stream("input", Consumed.as("Customer_transactions_input_topic"));
+		stream.filter((k,v) -> !v.equals("invalid_txn"), Named.as("filter_out_invalid_txns"))
+		      .mapValues((v) -> v.substring(0,5), Named.as("Map_values_to_first_5_characters"))
+		      .to("output", Produced.as("Mapped_transactions_output_topic"));
+	     </pre>
+
+		 <pre>
+		 Topologies:
+		   Sub-topology: 0
+		    Source: Customer_transactions_input_topic (topics: [input])
+		      --> filter_out_invalid_txns
+		    Processor: filter_out_invalid_txns (stores: [])
+		      --> Map_values_to_first_5_characters
+		      <-- Customer_transactions_input_topic
+		    Processor: Map_values_to_first_5_characters (stores: [])
+		      --> Mapped_transactions_output_topic
+		      <-- filter_out_invalid_txns
+		    Sink: Mapped_transactions_output_topic (topic: output)
+		      <-- Map_values_to_first_5_characters
+		 </pre>
+
+		Now you can look at the topology description and easily understand what role each processor
+		plays in the topology. But there's another reason for naming your processor nodes: some parts of
+		a topology persist between restarts of your Kafka Streams application, namely
+		state stores, changelog topics, and repartition topics.
+		</p>
+
+		<h2>Changing Names</h2>
+		<p>
+		 Generated names are numbered according to where they are built in the topology.
+		 The name generation strategy is
+		 <code>KSTREAM|KTABLE-&lt;operator name&gt;-&lt;number suffix&gt;</code>. The number is a
+		 globally incrementing number that represents the operator's order in the topology.
+		 The generated number is prefixed with a varying number of "0"s to create a
+		 string that is consistently 10 characters long.
+		 This means that if you add/remove or shift the order of operations, the position of the
+		 processor shifts, which shifts the name of the processor.  Since <strong>most</strong> processors exist
+		 in memory only, this name shifting presents no issue for many topologies.  But the name
+		 shifting does have implications for topologies with stateful operators or repartition topics. 
+
+		 Here's a different topology with some state:
+		 <pre>
+		 KStream&lt;String,String&gt; stream = builder.stream("input");
+		 stream.groupByKey()
+		       .count()
+		       .toStream()
+		       .to("output");
+		 </pre>
+		This topology description yields the following:
+		 <pre>
+			 Topologies:
+			   Sub-topology: 0
+			    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
+			     --> KSTREAM-AGGREGATE-0000000002
+			    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])
+			     --> KTABLE-TOSTREAM-0000000003
+			     <-- KSTREAM-SOURCE-0000000000
+			    Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
+			     --> KSTREAM-SINK-0000000004
+			     <-- KSTREAM-AGGREGATE-0000000002
+			    Sink: KSTREAM-SINK-0000000004 (topic: output)
+			     <-- KTABLE-TOSTREAM-0000000003
+		 </pre>
+		 </p>
+		 <p>
+		  You can see from the topology description above that the state store is named
+		  <code>KSTREAM-AGGREGATE-STATE-STORE-0000000001</code>.  Here's what happens when you
+		  add a filter to keep some of the records out of the aggregation:
+		  <pre>
+		   KStream&lt;String,String&gt; stream = builder.stream("input");
+		   stream.filter((k, v) -> v != null && v.length() >= 6)
+		         .groupByKey()
+		         .count()
+		         .toStream()
+		         .to("output");
+		  </pre>
+
+		  And the corresponding topology:
+		  <pre>
+			  Topologies:
+			    Sub-topology: 0
+			     Source: KSTREAM-SOURCE-0000000000 (topics: [input])
+			      --> KSTREAM-FILTER-0000000001
+			     Processor: KSTREAM-FILTER-0000000001 (stores: [])
+			       --> KSTREAM-AGGREGATE-0000000003
+			       <-- KSTREAM-SOURCE-0000000000
+			     Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])
+			       --> KTABLE-TOSTREAM-0000000004
+			       <-- KSTREAM-FILTER-0000000001
+			     Processor: KTABLE-TOSTREAM-0000000004 (stores: [])
+			       --> KSTREAM-SINK-0000000005
+			       <-- KSTREAM-AGGREGATE-0000000003
+			      Sink: KSTREAM-SINK-0000000005 (topic: output)
+			       <-- KTABLE-TOSTREAM-0000000004
+		  </pre>
+		 </p>
+		<p>
+		Notice that since you've added an operation <em>before</em> the <code>count</code> operation, the state
+		  store (and the changelog topic) names have changed.  This name change means you can't
+		  do a rolling re-deployment of your updated topology.  Also, you must use the
+		  <a href="/{{version}}/documentation/streams/developer-guide/app-reset-tool">Streams Reset Tool</a> 
+		  to re-calculate the aggregations, because the changelog topic has changed on start-up and the
+		  new changelog topic contains no data.
+
+		  Fortunately, there's an easy solution to remedy this situation.  Give the
+		  state store a user-defined name instead of relying on the generated one,
+		  so you don't have to worry about topology changes shifting the name of the state store.
+
+		  You've had the ability to name repartition topics with the <code>Joined</code>,
+		  <code>StreamJoined</code>, and <code>Grouped</code> classes, and
+		  name state stores and changelog topics with <code>Materialized</code>.
+		  But it's worth reiterating the importance of naming these DSL topology operations.
+
+		  Here's how your DSL code looks now, giving a specific name to your state store:
+		  <pre>
+		  KStream&lt;String,String&gt; stream = builder.stream("input");
+		  stream.filter((k, v) -> v != null && v.length() >= 6)
+		        .groupByKey()
+		        .count(Materialized.as("Purchase_count_store"))
+		        .toStream()
+		        .to("output");
+		  </pre>
+
+		  And here's the corresponding topology description:
+
+		  <pre>
+		  Topologies:
+		   Sub-topology: 0
+		    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
+		      --> KSTREAM-FILTER-0000000001
+		    Processor: KSTREAM-FILTER-0000000001 (stores: [])
+		      --> KSTREAM-AGGREGATE-0000000002
+		      <-- KSTREAM-SOURCE-0000000000
+		    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [Purchase_count_store])
+		      --> KTABLE-TOSTREAM-0000000003
+		      <-- KSTREAM-FILTER-0000000001
+		    Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
+		      --> KSTREAM-SINK-0000000004
+		      <-- KSTREAM-AGGREGATE-0000000002
+		    Sink: KSTREAM-SINK-0000000004 (topic: output)
+		      <-- KTABLE-TOSTREAM-0000000003
+		  </pre>
+		</p>
+		<p>
+		  Now, even though you've added processors before your state store, the store name and its changelog
+		  topic name don't change.  This makes your topology more robust and resilient to changes made by
+		  adding or removing processors.
+		</p>
+
+		<h2>Conclusion</h2>
+
+		 It's a good practice to name your processing nodes when using the DSL, and it's even
+		 more important to do this when you have "stateful" parts in
+		your application, such as repartition
+		 topics and state stores (and the accompanying changelog topics).
+        <p>
+		 Here are a couple of points to remember when naming your DSL topology:
+		<ol>
+			<li>
+				If you have an <em>existing topology</em> and you <em>haven't</em> named your
+				state stores (and changelog topics) and repartition topics, we recommend that you
+				do so.  But this will be a topology-breaking change, so you'll need to shut down all
+				application instances, make the changes, and run the
+				<a href="/{{version}}/documentation/streams/developer-guide/app-reset-tool">Streams Reset Tool</a>.
+				Although this may be inconvenient at first, it's worth the effort to protect your application from
+				unexpected errors due to topology changes.
+			</li>
+			<li>
+				If you have a <em>new topology</em>, make sure you name the persistent parts of your topology:
+				state stores (changelog topics) and repartition topics. This way, when you deploy your
+				application, you're protected from topology changes that otherwise would break your Kafka Streams application.
+				If you don't want to add names to stateless processors at first, that's fine as you can
+				always go back and add the names later.
+			</li>
+		</ol>
+
+		  Here's a quick reference on naming the critical parts of
+		  your Kafka Streams application to prevent topology name changes from breaking your application:
+
+		  <table>
+		      <tr>
+		          <th>Operation</th><th>Naming Class</th>
+		      </tr>
+		      <tr>
+		          <td>Aggregation repartition topics</td><td>Grouped</td>
+		      </tr>
+		      <tr>
+		          <td>KStream-KStream Join repartition topics</td><td>StreamJoined</td>
+		      </tr>
+			  <tr>
+				  <td>KStream-KTable Join repartition topic</td><td>Joined</td>
+			  </tr>
+		      <tr>
+		          <td>KStream-KStream Join state stores</td><td>StreamJoined</td>
+		      </tr>
+		      <tr>
+		          <td>State Stores (for aggregations and KTable-KTable joins)</td><td>Materialized</td>
+		      </tr>
+		      <tr>
+		          <td>Stream/Table non-stateful operations</td><td>Named</td>
+		      </tr>
+		  </table>
+	</p>
+</div>
+
+</script>
+
+<!--#include virtual="../../../includes/_header.htm" -->
+<!--#include virtual="../../../includes/_top.htm" -->
+<div class="content documentation documentation--current">
+	<!--#include virtual="../../../includes/_nav.htm" -->
+	<div class="right">
+		<!--#include virtual="../../../includes/_docs_banner.htm" -->
+		<ul class="breadcrumbs">
+			<li><a href="/documentation">Documentation</a></li>
+			<li><a href="/documentation/streams">Kafka Streams</a></li>
+			<li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li>
+		</ul>
+		<div class="p-content"></div>
+	</div>
+</div>
+<!--#include virtual="../../../includes/_footer.htm" -->
+<script>
+	$(function () {
+		// Show selected style on nav item
+		$('.b-nav__streams').addClass('selected');
+
+		//sticky secondary nav
+		var $navbar = $(".sub-nav-sticky"),
+				y_pos = $navbar.offset().top,
+				height = $navbar.height();
+
+		$(window).scroll(function () {
+			var scrollTop = $(window).scrollTop();
+
+			if (scrollTop > y_pos - height) {
+				$navbar.addClass("navbar-fixed")
+			} else if (scrollTop <= y_pos) {
+				$navbar.removeClass("navbar-fixed")
+			}
+		});
+
+		// Display docs subnav items
+		$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
+	});
+</script>
+
+
+
+