Posted to commits@kafka.apache.org by mj...@apache.org on 2020/03/25 23:21:59 UTC

[kafka] branch trunk updated: DOCS-3625: Add section to config topic: parameters controlled by Kafka Streams (#8268)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0f48446  DOCS-3625: Add section to config topic: parameters controlled by Kafka Streams (#8268)
0f48446 is described below

commit 0f48446690e42b78a9a6b8c6a9bbab9f01d84cb1
Author: Jim Galasyn <ji...@confluent.io>
AuthorDate: Wed Mar 25 16:21:27 2020 -0700

    DOCS-3625: Add section to config topic: parameters controlled by Kafka Streams (#8268)
    
    Reviewer: Matthias J. Sax <ma...@confluent.io>
---
 docs/streams/architecture.html                   |   5 +
 docs/streams/developer-guide/config-streams.html | 172 ++++++++++++++---------
 2 files changed, 111 insertions(+), 66 deletions(-)

diff --git a/docs/streams/architecture.html b/docs/streams/architecture.html
index 7efd7ea..75d6d3d 100644
--- a/docs/streams/architecture.html
+++ b/docs/streams/architecture.html
@@ -80,6 +80,11 @@
         tasks will be automatically restarted on other instances and continue to consume from the same stream partitions.
     </p>
 
+    <p><b>NOTE:</b> Topic partitions are assigned to tasks, and tasks are assigned to threads across all instances, in a best-effort attempt
+        to trade off load balancing and stickiness of stateful tasks. For this assignment, Kafka Streams uses the
+        <a href="https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java">StreamsPartitionAssignor</a>
+        class and doesn't let you change to a different assignor. If you try to use a different assignor, Kafka Streams ignores it.</p>
+
     <p>
         The following diagram shows two tasks each assigned with one partition of the input streams.
     </p>
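For illustration, a hedged sketch of the kind of override the note above describes. The `consumer.`-prefixed key below is how Streams-embedded consumer configs are usually passed; per the note, Kafka Streams would ignore this particular override and keep using its internal StreamsPartitionAssignor.

```properties
# Hypothetical user config for a Kafka Streams application.
# Setting a custom partition assignor on the embedded consumer has no effect:
# Kafka Streams always uses StreamsPartitionAssignor for task placement.
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
```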
diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html
index 479e8a1..dc0164a 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -269,6 +269,11 @@
               <td colspan="2">The amount of time in milliseconds, before a request is retried. This applies if the <code class="docutils literal"><span class="pre">retries</span></code> parameter is configured to be greater than 0. </td>
               <td>100</td>
           </tr>
+          <tr class="row-even"><td>rocksdb.config.setter</td>
+            <td>Medium</td>
+            <td colspan="2">The RocksDB configuration.</td>
+            <td></td>
+          </tr>
           <tr class="row-odd"><td>state.cleanup.delay.ms</td>
             <td>Low</td>
             <td colspan="2">The amount of time in milliseconds to wait before deleting state when a partition has migrated.</td>
@@ -474,6 +479,57 @@
               </dl>
             </div></blockquote>
         </div>
+        <div class="section" id="rocksdb-config-setter">
+          <span id="streams-developer-guide-rocksdb-config"></span><h4><a class="toc-backref" href="#id20">rocksdb.config.setter</a><a class="headerlink" href="#rocksdb-config-setter" title="Permalink to this headline"></a></h4>
+          <blockquote>
+            <div><p>The RocksDB configuration. Kafka Streams uses RocksDB as the default storage engine for persistent stores. To change the default
+              configuration for RocksDB, you can implement <code class="docutils literal"><span class="pre">RocksDBConfigSetter</span></code> and provide your custom class via <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/RocksDBConfigSetter.html">rocksdb.config.setter</a>.</p>
+              <p>Here is an example that adjusts the memory size consumed by RocksDB.</p>
+              <div class="highlight-java"><div class="highlight"><pre><span></span>    <span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">CustomRocksDBConfig</span> <span class="kd">implements</span> <span class="n">RocksDBConfigSetter</span> <span class="o">{</span>
+                    <span class="c1">// This object should be a member variable so it can be closed in RocksDBConfigSetter#close.</span>
+                    <span class="kd">private</span> <span class="n">org.rocksdb.Cache</span> <span class="n">cache</span> <span class="o">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="na">rocksdb</span><span class="o">.</span><span class="na">LRUCache</span><span class="o">(</span><span class="mi">16</span> <span class="o">*</span> <span class="mi">1024L</span> <span class="o">*</span> <span class="mi">1024L</span><span class="o">);</span>
+
+                    <span class="nd">@Override</span>
+                    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">setConfig</span><span class="o">(</span><span class="kd">final</span> <span class="n">String</span> <span class="n">storeName</span><span class="o">,</span> <span class="kd">final</span> <span class="n">Options</span> <span class="n">options</span><span class="o">,</span> <span class="kd">final</span> <span class="n">Map</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Object</span><span class="o">&gt;</span> <span class="n">configs</span><span class="o">)</span> <span class="o">{</span>
+                      <span class="c1">// See #1 below.</span>
+                      <span class="n">BlockBasedTableConfig</span> <span class="n">tableConfig</span> <span class="o">=</span> <span class="k">(BlockBasedTableConfig)</span> <span class="n">options</span><span class="o">.</span><span class="na">tableFormatConfig</span><span class="o">();</span>
+                      <span class="n">tableConfig</span><span class="o">.</span><span class="na">setBlockCache</span><span class="o">(</span><span class="n">cache</span><span class="o">);</span>
+                      <span class="c1">// See #2 below.</span>
+                      <span class="n">tableConfig</span><span class="o">.</span><span class="na">setBlockSize</span><span class="o">(</span><span class="mi">16</span> <span class="o">*</span> <span class="mi">1024L</span><span class="o">);</span>
+                      <span class="c1">// See #3 below.</span>
+                      <span class="n">tableConfig</span><span class="o">.</span><span class="na">setCacheIndexAndFilterBlocks</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span>
+                      <span class="n">options</span><span class="o">.</span><span class="na">setTableFormatConfig</span><span class="o">(</span><span class="n">tableConfig</span><span class="o">);</span>
+                      <span class="c1">// See #4 below.</span>
+                      <span class="n">options</span><span class="o">.</span><span class="na">setMaxWriteBufferNumber</span><span class="o">(</span><span class="mi">2</span><span class="o">);</span>
+                    <span class="o">}</span>
+
+                    <span class="nd">@Override</span>
+                    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">close</span><span class="o">(</span><span class="kd">final</span> <span class="n">String</span> <span class="n">storeName</span><span class="o">,</span> <span class="kd">final</span> <span class="n">Options</span> <span class="n">options</span><span class="o">)</span> <span class="o">{</span>
+                      <span class="c1">// See #5 below.</span>
+                      <span class="n">cache</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
+                    <span class="o">}</span>
+                    <span class="o">}</span>
+
+                    <span class="n">Properties</span> <span class="n">streamsSettings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
+                    <span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">ROCKSDB_CONFIG_SETTER_CLASS_CONFIG</span><span class="o">,</span> <span class="n">CustomRocksDBConfig</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
+                    </pre></div>
+                    </div>
+                    <dl class="docutils">
+                      <dt>Notes for example:</dt>
+                      <dd><ol class="first last arabic simple">
+                        <li><code class="docutils literal"><span class="pre">BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();</span></code> Get a reference to the existing table config rather than create a new one, so you don't accidentally overwrite defaults such as the <code class="docutils literal"><span class="pre">BloomFilter</span></code>, which is an important optimization.</li>
+                        <li><code class="docutils literal"><span class="pre">tableConfig.setBlockSize(16</span> <span class="pre">*</span> <span class="pre">1024L);</span></code> Modify the default <a class="reference external" href="https://github.com/apache/kafka/blob/2.3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L79">block size</a> per these instructions from the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks">RocksDB GitHub</a>.</li>
+                        <li><code class="docutils literal"><span class="pre">tableConfig.setCacheIndexAndFilterBlocks(true);</span></code> Do not let the index and filter blocks grow unbounded. For more information, see the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks">RocksDB GitHub</a>.</li>
+                        <li><code class="docutils literal"><span class="pre">options.setMaxWriteBufferNumber(2);</span></code> See the advanced options in the <a class="reference external" href="https://github.com/facebook/rocksdb/blob/8dee8cad9ee6b70fd6e1a5989a8156650a70c04f/include/rocksdb/advanced_options.h#L103">RocksDB GitHub</a>.</li>
+                        <li><code class="docutils literal"><span class="pre">cache.close();</span></code> To avoid memory leaks, you must close any objects you constructed that extend <code class="docutils literal"><span class="pre">org.rocksdb.RocksObject</span></code>. See the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#memory-management">RocksJava docs</a> for more details.</li>
+                      </ol>
+                      </dd>
+                    </dl>
+                  </div></blockquote>
+              </div>
         <div class="section" id="state-dir">
           <h4><a class="toc-backref" href="#id14">state.dir</a><a class="headerlink" href="#state-dir" title="Permalink to this headline"></a></h4>
           <blockquote>
@@ -657,21 +713,9 @@
             </thead>
             <tbody valign="top">
             <tr class="row-even"><td>auto.offset.reset</td>
-                <td>Global Consumer</td>
-                <td>none (cannot be changed)</td>
-            </tr>
-            <tr class="row-even"><td>auto.offset.reset</td>
-                <td>Restore Consumer</td>
-                <td>none (cannot be changed)</td>
-            </tr>
-            <tr class="row-even"><td>auto.offset.reset</td>
               <td>Consumer</td>
               <td>earliest</td>
             </tr>
-            <tr class="row-odd"><td>enable.auto.commit</td>
-              <td>Consumer</td>
-              <td>false</td>
-            </tr>
             <tr class="row-even"><td>linger.ms</td>
               <td>Producer</td>
               <td>100</td>
@@ -684,13 +728,59 @@
               <td>Consumer</td>
               <td>1000</td>
             </tr>
-            <tr class="row-odd"><td>rocksdb.config.setter</td>
-              <td>Consumer</td>
-              <td>&nbsp;</td>
-            </tr>
             </tbody>
           </table>
         </div>
+        <div class="section" id="parameters-controlled-by-kafka-streams">
+          <h3><a class="toc-backref" href="#id26">Parameters controlled by Kafka Streams</a><a class="headerlink" href="#parameters-controlled-by-kafka-streams" title="Permalink to this headline"></a></h3>
+          <p>Kafka Streams assigns the following configuration parameters. If you try to change
+            <code class="docutils literal"><span class="pre">allow.auto.create.topics</span></code>, your value
+            is ignored and setting it has no effect in a Kafka Streams application. You can set the other parameters,
+            but Kafka Streams sets them to different default values than a plain
+            <code class="docutils literal"><span class="pre">KafkaConsumer</span></code>.</p>
+          <p>Kafka Streams uses the <code class="docutils literal"><span class="pre">client.id</span></code>
+            parameter to compute derived client IDs for internal clients. If you don't set
+            <code class="docutils literal"><span class="pre">client.id</span></code>, Kafka Streams sets it to
+            <code class="docutils literal"><span class="pre">&lt;application.id&gt;-&lt;random-UUID&gt;</span></code>.</p>
+            <table border="1" class="non-scrolling-table docutils">
+              <colgroup>
+              <col width="50%">
+              <col width="19%">
+              <col width="31%">
+              </colgroup>
+              <thead valign="bottom">
+              <tr class="row-odd"><th class="head">Parameter Name</th>
+              <th class="head">Corresponding Client</th>
+              <th class="head">Streams Default</th>
+              </tr>
+              </thead>
+              <tbody valign="top">
+              <tr class="row-odd"><td>allow.auto.create.topics</td>
+              <td>Consumer</td>
+              <td>false</td>
+              </tr>
+              <tr class="row-even"><td>auto.offset.reset</td>
+              <td>Consumer</td>
+              <td>earliest</td>
+              </tr>
+              <tr class="row-odd"><td>linger.ms</td>
+              <td>Producer</td>
+              <td>100</td>
+              </tr>
+              <tr class="row-even"><td>max.poll.interval.ms</td>
+              <td>Consumer</td>
+              <td>300000</td>
+              </tr>
+              <tr class="row-odd"><td>max.poll.records</td>
+              <td>Consumer</td>
+              <td>1000</td>
+              </tr>
+              <tr class="row-even"><td>retries</td>
+              <td>Producer</td>
+              <td>10</td>
+              </tr>
+              </tbody>
+              </table>
+        </div>
         <div class="section" id="enable-auto-commit">
           <span id="streams-developer-guide-consumer-auto-commit"></span><h4><a class="toc-backref" href="#id19">enable.auto.commit</a><a class="headerlink" href="#enable-auto-commit" title="Permalink to this headline"></a></h4>
           <blockquote>
@@ -698,56 +788,6 @@
               value to <code class="docutils literal"><span class="pre">false</span></code>.  Consumers will only commit explicitly via <em>commitSync</em> calls when the Kafka Streams library or a user decides
               to commit the current processing state.</div></blockquote>
         </div>
-        <div class="section" id="rocksdb-config-setter">
-          <span id="streams-developer-guide-rocksdb-config"></span><h4><a class="toc-backref" href="#id20">rocksdb.config.setter</a><a class="headerlink" href="#rocksdb-config-setter" title="Permalink to this headline"></a></h4>
-          <blockquote>
-            <div><p>The RocksDB configuration. Kafka Streams uses RocksDB as the default storage engine for persistent stores. To change the default
-              configuration for RocksDB, implement <code class="docutils literal"><span class="pre">RocksDBConfigSetter</span></code> and provide your custom class via <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/RocksDBConfigSetter.html">rocksdb.config.setter</a>.</p>
-              <p>Here is an example that adjusts the memory size consumed by RocksDB.</p>
-              <div class="highlight-java"><div class="highlight"><pre><span></span>    <span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">CustomRocksDBConfig</span> <span class="kd">implements</span> <span class="n">RocksDBConfigSetter</span> <span class="o">{</span>
-
-       <span class="c1">// This object should be a member variable so it can be closed in RocksDBConfigSetter#close.</span>
-       <span class="kd">private</span> <span class="n">org.rocksdb.Cache</span> <span class="n">cache</span> <span class="o">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="na">rocksdb</span><span class="o">.</span><span class="na">LRUCache</span><span class="o">(</span><span class="mi">16</span> <span class="o">*</span> <span class="mi">1024L</span> <span class="o">*</span> <span class="mi">1024L</span><span class="o">);</span>
-
-       <span class="nd">@Override</span>
-       <span class="kd">public</span> <span class="kt">void</span> <span class="nf">setConfig</span><span class="o">(</span><span class="kd">final</span> <span class="n">String</span> <span class="n">storeName</span><span class="o">,</span> <span class="kd">final</span> <span class="n">Options</span> <span class="n">options</span><span class="o">,</span> <span class="kd">final</span> <span class="n">Map</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Object</span><span class="o">&gt;</span> <span class="n">configs</span><span class="o">)</span> <span class="o">{</span>
-         <span class="c1">// See #1 below.</span>
-         <span class="n">BlockBasedTableConfig</span> <span class="n">tableConfig</span> <span class="o">=</span> <span class="k">(BlockBasedTableConfig)</span> <span class="n">options</span><span><span class="o">.</span><span class="na">tableFormatConfig</span><span class="o">();</span>
-         <span class="n">tableConfig</span><span class="o">.</span><span class="na">setBlockCache</span><span class="o">(</span><span class="mi">cache</span></span><span class="o">);</span>
-         <span class="c1">// See #2 below.</span>
-         <span class="n">tableConfig</span><span class="o">.</span><span class="na">setBlockSize</span><span class="o">(</span><span class="mi">16</span> <span class="o">*</span> <span class="mi">1024L</span><span class="o">);</span>
-         <span class="c1">// See #3 below.</span>
-         <span class="n">tableConfig</span><span class="o">.</span><span class="na">setCacheIndexAndFilterBlocks</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span>
-         <span class="n">options</span><span class="o">.</span><span class="na">setTableFormatConfig</span><span class="o">(</span><span class="n">tableConfig</span><span class="o">);</span>
-         <span class="c1">// See #4 below.</span>
-         <span class="n">options</span><span class="o">.</span><span class="na">setMaxWriteBufferNumber</span><span class="o">(</span><span class="mi">2</span><span class="o">);</span>
-       <span class="o">}</span>
-
-       <span class="nd">@Override</span>
-       <span class="kd">public</span> <span class="kt">void</span> <span class="nf">close</span><span class="o">(</span><span class="kd">final</span> <span class="n">String</span> <span class="n">storeName</span><span class="o">,</span> <span class="kd">final</span> <span class="n">Options</span> <span class="n">options</span><span class="o">)</span> <span class="o">{</span>
-         <span class="c1">// See #5 below.</span>
-         <span class="n">cache</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
-       <span class="o">}</span>
-    <span class="o">}</span>
-
-<span class="n">Properties</span> <span class="n">streamsSettings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
-<span class="n">streamsConfig</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">ROCKSDB_CONFIG_SETTER_CLASS_CONFIG</span><span class="o">,</span> <span class="n">CustomRocksDBConfig</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
-</pre></div>
-              </div>
-              <dl class="docutils">
-                <dt>Notes for example:</dt>
-                <dd><ol class="first last arabic simple">
-                  <li><code class="docutils literal"><span class="pre">BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();</span></code> Get a reference to the existing table config rather than create a new one, so you don't accidentally overwrite defaults such as the <code class="docutils literal"><span class="pre">BloomFilter</span></code>, which is an important optimization.
-                  <li><code class="docutils literal"><span class="pre">tableConfig.setBlockSize(16</span> <span class="pre">*</span> <span class="pre">1024L);</span></code> Modify the default <a class="reference external" href="https://github.com/apache/kafka/blob/2.3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L79">block size</a> per these instructions from the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks">RocksDB GitHub</a>.</li>
-                  <li><code class="docutils literal"><span class="pre">tableConfig.setCacheIndexAndFilterBlocks(true);</span></code> Do not let the index and filter blocks grow unbounded. For more information, see the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks">RocksDB GitHub</a>.</li>
-                  <li><code class="docutils literal"><span class="pre">options.setMaxWriteBufferNumber(2);</span></code> See the advanced options in the <a class="reference external" href="https://github.com/facebook/rocksdb/blob/8dee8cad9ee6b70fd6e1a5989a8156650a70c04f/include/rocksdb/advanced_options.h#L103">RocksDB GitHub</a>.</li>
-                  <li><code class="docutils literal"><span class="pre">cache.close();</span></code> To avoid memory leaks, you must close any objects you constructed that extend org.rocksdb.RocksObject. See  <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#memory-management">RocksJava docs</a> for more details.</li>
-                </ol>
-                </dd>
-              </dl>
-            </div></blockquote>
-        </div>
-      </div>
       <div class="section" id="recommended-configuration-parameters-for-resiliency">
         <h3><a class="toc-backref" href="#id21">Recommended configuration parameters for resiliency</a><a class="headerlink" href="#recommended-configuration-parameters-for-resiliency" title="Permalink to this headline"></a></h3>
         <p>There are several Kafka and Kafka Streams configuration options that need to be configured explicitly for resiliency in face of broker failures:</p>
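The config-streams.html addition above states that, when `client.id` is unset, Kafka Streams derives it as `<application.id>-<random-UUID>`. A minimal standalone sketch of that derivation rule follows; `deriveClientId` is a hypothetical helper written for illustration, not a Kafka Streams API.

```java
import java.util.UUID;

public class ClientIdExample {

    // Sketch of the fallback rule described in the docs: a user-provided
    // client.id wins; otherwise derive "<application.id>-<random-UUID>".
    public static String deriveClientId(String configuredClientId, String applicationId) {
        if (configuredClientId != null && !configuredClientId.isEmpty()) {
            return configuredClientId;
        }
        return applicationId + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        // No client.id configured: a derived ID with a random UUID suffix.
        System.out.println(deriveClientId(null, "wordcount-app"));
        // Explicit client.id configured: used as-is.
        System.out.println(deriveClientId("my-client", "wordcount-app"));
    }
}
```

Internal clients (producer, consumer, restore consumer) then append their own suffixes to this base ID, which is why the docs call them "derived client IDs".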