Posted to commits@kafka.apache.org by mj...@apache.org on 2020/03/25 23:21:59 UTC
[kafka] branch trunk updated: DOCS-3625: Add section to config topic: parameters controlled by Kafka Streams (#8268)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0f48446 DOCS-3625: Add section to config topic: parameters controlled by Kafka Streams (#8268)
0f48446 is described below
commit 0f48446690e42b78a9a6b8c6a9bbab9f01d84cb1
Author: Jim Galasyn <ji...@confluent.io>
AuthorDate: Wed Mar 25 16:21:27 2020 -0700
DOCS-3625: Add section to config topic: parameters controlled by Kafka Streams (#8268)
Reviewer: Matthias J. Sax <ma...@confluent.io>
---
docs/streams/architecture.html | 5 +
docs/streams/developer-guide/config-streams.html | 172 ++++++++++++++---------
2 files changed, 111 insertions(+), 66 deletions(-)
diff --git a/docs/streams/architecture.html b/docs/streams/architecture.html
index 7efd7ea..75d6d3d 100644
--- a/docs/streams/architecture.html
+++ b/docs/streams/architecture.html
@@ -80,6 +80,11 @@
tasks will be automatically restarted on other instances and continue to consume from the same stream partitions.
</p>
+ <p><b>NOTE:</b> Topic partitions are assigned to tasks, and tasks are assigned to threads over all instances, in a best-effort attempt
+ to trade off load balancing and stickiness of stateful tasks. For this assignment, Kafka Streams uses the
+ <a href="https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java">StreamsPartitionAssignor</a>
+ class and doesn't let you change to a different assignor. If you try to use a different assignor, Kafka Streams ignores it.</p>
+
<p>
The following diagram shows two tasks each assigned with one partition of the input streams.
</p>
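The note above about the fixed assignor can be sketched in code. This is a minimal, hypothetical snippet using only plain `java.util.Properties` (the app id and broker address are made up, and the override itself happens inside the Kafka Streams runtime, not in this snippet):

```java
import java.util.Properties;

public class AssignorNoteSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");   // hypothetical app id
        props.put("bootstrap.servers", "broker:9092");   // hypothetical broker

        // Even if you try to pick a different consumer assignor here, Kafka
        // Streams replaces partition.assignment.strategy with its own
        // StreamsPartitionAssignor when it creates its internal consumers.
        props.put("consumer.partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.RoundRobinAssignor"); // ignored by Streams

        System.out.println(props.getProperty("consumer.partition.assignment.strategy"));
    }
}
```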
diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html
index 479e8a1..dc0164a 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -269,6 +269,11 @@
<td colspan="2">The amount of time in milliseconds, before a request is retried. This applies if the <code class="docutils literal"><span class="pre">retries</span></code> parameter is configured to be greater than 0. </td>
<td>100</td>
</tr>
+ <tr class="row-even"><td>rocksdb.config.setter</td>
+ <td>Medium</td>
+ <td colspan="2">The RocksDB configuration.</td>
+ <td></td>
+ </tr>
<tr class="row-odd"><td>state.cleanup.delay.ms</td>
<td>Low</td>
<td colspan="2">The amount of time in milliseconds to wait before deleting state when a partition has migrated.</td>
@@ -474,6 +479,57 @@
</dl>
</div></blockquote>
</div>
+ <div class="section" id="rocksdb-config-setter">
+ <span id="streams-developer-guide-rocksdb-config"></span><h4><a class="toc-backref" href="#id20">rocksdb.config.setter</a><a class="headerlink" href="#rocksdb-config-setter" title="Permalink to this headline"></a></h4>
+ <blockquote>
+ <div><p>The RocksDB configuration. Kafka Streams uses RocksDB as the default storage engine for persistent stores. To change the default
+ configuration for RocksDB, you can implement <code class="docutils literal"><span class="pre">RocksDBConfigSetter</span></code> and provide your custom class via <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/RocksDBConfigSetter.html">rocksdb.config.setter</a>.</p>
+ <p>Here is an example that adjusts the memory size consumed by RocksDB.</p>
+ <div class="highlight-java"><div class="highlight"><pre><span></span> <span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">CustomRocksDBConfig</span> <span class="kd">implements</span> <span class="n">RocksDBConfigSetter</span> <span class="o">{</span>
+ <span class="c1">// This object should be a member variable so it can be closed in RocksDBConfigSetter#close.</span>
+ <span class="kd">private</span> <span class="n">org.rocksdb.Cache</span> <span class="n">cache</span> <span class="o">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="na">rocksdb</span><span class="o">.</span><span class="na">LRUCache</span><span class="o">(</span><span class="mi">16</span> <span class="o">*</span> <span class="mi">1024L</span> <span class="o">*</span> <span class="mi">1024L</span><span class="o">);</span>
+
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="kt">void</span> <span class="nf">setConfig</span><span class="o">(</span><span class="kd">final</span> <span class="n">String</span> <span class="n">storeName</span><span class="o">,</span> <span class="kd">final</span> <span class="n">Options</span> <span class="n">options</span><span class="o">,</span> <span class="kd">final</span> <span class="n">Map</span><span class="o"><</span><span class="n">String</span><span clas [...]
+ <span class="c1">// See #1 below.</span>
+ <span class="n">BlockBasedTableConfig</span> <span class="n">tableConfig</span> <span class="o">=</span> <span class="k">(BlockBasedTableConfig)</span> <span class="n">options</span><span><span class="o">.</span><span class="na">tableFormatConfig</span><span class="o">();</span>
+ <span class="n">tableConfig</span><span class="o">.</span><span class="na">setBlockCache</span><span class="o">(</span><span class="mi">cache</span></span><span class="o">);</span>
+ <span class="c1">// See #2 below.</span>
+ <span class="n">tableConfig</span><span class="o">.</span><span class="na">setBlockSize</span><span class="o">(</span><span class="mi">16</span> <span class="o">*</span> <span class="mi">1024L</span><span class="o">);</span>
+ <span class="c1">// See #3 below.</span>
+ <span class="n">tableConfig</span><span class="o">.</span><span class="na">setCacheIndexAndFilterBlocks</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span>
+ <span class="n">options</span><span class="o">.</span><span class="na">setTableFormatConfig</span><span class="o">(</span><span class="n">tableConfig</span><span class="o">);</span>
+ <span class="c1">// See #4 below.</span>
+ <span class="n">options</span><span class="o">.</span><span class="na">setMaxWriteBufferNumber</span><span class="o">(</span><span class="mi">2</span><span class="o">);</span>
+ <span class="o">}</span>
+
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="kt">void</span> <span class="nf">close</span><span class="o">(</span><span class="kd">final</span> <span class="n">String</span> <span class="n">storeName</span><span class="o">,</span> <span class="kd">final</span> <span class="n">Options</span> <span class="n">options</span><span class="o">)</span> <span class="o">{</span>
+ <span class="c1">// See #5 below.</span>
+ <span class="n">cache</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
+ <span class="o">}</span>
+ <span class="o">}</span>
+
+ <span class="n">Properties</span> <span class="n">streamsSettings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
+ <span class="n">streamsConfig</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">ROCKSDB_CONFIG_SETTER_CLASS_CONFIG</span><span class="o">,</span> <span class="n">CustomRocksDBConfig</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
+ </pre></div>
+ </div>
+ <dl class="docutils">
+ <dt>Notes for example:</dt>
+ <dd><ol class="first last arabic simple">
+ <li><code class="docutils literal"><span class="pre">BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();</span></code> Get a reference to the existing table config rather than create a new one, so you don't accidentally overwrite defaults such as the <code class="docutils literal"><span class="pre">BloomFilter</span></code>, which is an important optimization.
+ <li><code class="docutils literal"><span class="pre">tableConfig.setBlockSize(16</span> <span class="pre">*</span> <span class="pre">1024L);</span></code> Modify the default <a class="reference external" href="https://github.com/apache/kafka/blob/2.3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L79">block size</a> per these instructions from the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Memory- [...]
+ <li><code class="docutils literal"><span class="pre">tableConfig.setCacheIndexAndFilterBlocks(true);</span></code> Do not let the index and filter blocks grow unbounded. For more information, see the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks">RocksDB GitHub</a>.</li>
+ <li><code class="docutils literal"><span class="pre">options.setMaxWriteBufferNumber(2);</span></code> See the advanced options in the <a class="reference external" href="https://github.com/facebook/rocksdb/blob/8dee8cad9ee6b70fd6e1a5989a8156650a70c04f/include/rocksdb/advanced_options.h#L103">RocksDB GitHub</a>.</li>
+ <li><code class="docutils literal"><span class="pre">cache.close();</span></code> To avoid memory leaks, you must close any objects you constructed that extend org.rocksdb.RocksObject. See <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#memory-management">RocksJava docs</a> for more details.</li>
+ </ol>
+ </dd>
+ </dl>
+ </div></blockquote>
+ </div>
+ </div>
+ </blockquote>
+ </div>
<div class="section" id="state-dir">
<h4><a class="toc-backref" href="#id14">state.dir</a><a class="headerlink" href="#state-dir" title="Permalink to this headline"></a></h4>
<blockquote>
@@ -657,21 +713,9 @@
</thead>
<tbody valign="top">
<tr class="row-even"><td>auto.offset.reset</td>
- <td>Global Consumer</td>
- <td>none (cannot be changed)</td>
- </tr>
- <tr class="row-even"><td>auto.offset.reset</td>
- <td>Restore Consumer</td>
- <td>none (cannot be changed)</td>
- </tr>
- <tr class="row-even"><td>auto.offset.reset</td>
<td>Consumer</td>
<td>earliest</td>
</tr>
- <tr class="row-odd"><td>enable.auto.commit</td>
- <td>Consumer</td>
- <td>false</td>
- </tr>
<tr class="row-even"><td>linger.ms</td>
<td>Producer</td>
<td>100</td>
@@ -684,13 +728,59 @@
<td>Consumer</td>
<td>1000</td>
</tr>
- <tr class="row-odd"><td>rocksdb.config.setter</td>
- <td>Consumer</td>
- <td> </td>
- </tr>
</tbody>
</table>
</div>
+ <div class="section" id="parameters-controlled-by-kafka-streams">
+ <h3><a class="toc-backref" href="#id26">Parameters controlled by Kafka Streams</a><a class="headerlink" href="#parameters-controlled-by-kafka-streams" title="Permalink to this headline"></a></h3>
+ <p>Kafka Streams assigns the following configuration parameters. If you try to change
+ <code class="docutils literal"><span class="pre">allow.auto.create.topics</span></code>, your value
+ is ignored and has no effect in a Kafka Streams application. You can set the other parameters;
+ Kafka Streams simply uses different default values for them than a plain
+ <code class="docutils literal"><span class="pre">KafkaConsumer</span></code>.</p>
+ <p>Kafka Streams uses the <code class="docutils literal"><span class="pre">client.id</span></code>
+ parameter to compute derived client IDs for internal clients. If you don't set
+ <code class="docutils literal"><span class="pre">client.id</span></code>, Kafka Streams sets it to
+ <code class="docutils literal"><span class="pre"><application.id>-<random-UUID></span></code>.</p>
+ <table border="1" class="non-scrolling-table docutils">
+ <colgroup>
+ <col width="50%">
+ <col width="19%">
+ <col width="31%">
+ </colgroup>
+ <thead valign="bottom">
+ <tr class="row-odd"><th class="head">Parameter Name</th>
+ <th class="head">Corresponding Client</th>
+ <th class="head">Streams Default</th>
+ </tr>
+ </thead>
+ <tbody valign="top">
+ <tr class="row-odd"><td>allow.auto.create.topics</td>
+ <td>Consumer</td>
+ <td>false</td>
+ </tr>
+ <tr class="row-even"><td>auto.offset.reset</td>
+ <td>Consumer</td>
+ <td>earliest</td>
+ </tr>
+ <tr class="row-odd"><td>linger.ms</td>
+ <td>Producer</td>
+ <td>100</td>
+ </tr>
+ <tr class="row-even"><td>max.poll.interval.ms</td>
+ <td>Consumer</td>
+ <td>300000</td>
+ </tr>
+ <tr class="row-odd"><td>max.poll.records</td>
+ <td>Consumer</td>
+ <td>1000</td>
+ </tr>
+ <tr class="row-even"><td>retries</td>
+ <td>Producer</td>
+ <td>10</td>
+ </tr>
+ </tbody>
+ </table>
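The table above can be exercised without a broker. Here is a minimal sketch using only plain `java.util.Properties` (the app id and broker address are hypothetical; the `consumer.` prefix is the standard Streams way to target consumer-level configs, mirroring `StreamsConfig.consumerPrefix(...)`):

```java
import java.util.Properties;

public class StreamsDefaultsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");   // hypothetical app id
        props.put("bootstrap.servers", "broker:9092");   // hypothetical broker

        // Tunable: override a Streams default. max.poll.records defaults to
        // 1000 under Kafka Streams (vs. 500 in a plain KafkaConsumer).
        props.put("consumer.max.poll.records", "2000");

        // Controlled: Kafka Streams always sets allow.auto.create.topics=false
        // on its consumers, so a user-supplied value is ignored at runtime.
        props.put("consumer.allow.auto.create.topics", "true"); // no effect

        System.out.println(props.getProperty("consumer.max.poll.records"));
    }
}
```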
<div class="section" id="enable-auto-commit">
<span id="streams-developer-guide-consumer-auto-commit"></span><h4><a class="toc-backref" href="#id19">enable.auto.commit</a><a class="headerlink" href="#enable-auto-commit" title="Permalink to this headline"></a></h4>
<blockquote>
@@ -698,56 +788,6 @@
value to <code class="docutils literal"><span class="pre">false</span></code>. Consumers will only commit explicitly via <em>commitSync</em> calls when the Kafka Streams library or a user decides
to commit the current processing state.</div></blockquote>
</div>
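Since auto-commit cannot be re-enabled, the knob you do control is how often Kafka Streams itself commits, via `commit.interval.ms`. A minimal sketch (plain `java.util.Properties`; the app id and broker address are hypothetical):

```java
import java.util.Properties;

public class CommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");   // hypothetical app id
        props.put("bootstrap.servers", "broker:9092");   // hypothetical broker

        // enable.auto.commit is pinned to false by Kafka Streams; offsets are
        // committed by the library via commitSync. Tune the cadence instead:
        props.put("commit.interval.ms", "10000");        // default is 30000 ms

        System.out.println(props.getProperty("commit.interval.ms"));
    }
}
```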
- <div class="section" id="rocksdb-config-setter">
- <span id="streams-developer-guide-rocksdb-config"></span><h4><a class="toc-backref" href="#id20">rocksdb.config.setter</a><a class="headerlink" href="#rocksdb-config-setter" title="Permalink to this headline"></a></h4>
- <blockquote>
- <div><p>The RocksDB configuration. Kafka Streams uses RocksDB as the default storage engine for persistent stores. To change the default
- configuration for RocksDB, implement <code class="docutils literal"><span class="pre">RocksDBConfigSetter</span></code> and provide your custom class via <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/RocksDBConfigSetter.html">rocksdb.config.setter</a>.</p>
- <p>Here is an example that adjusts the memory size consumed by RocksDB.</p>
- <div class="highlight-java"><div class="highlight"><pre><span></span> <span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">CustomRocksDBConfig</span> <span class="kd">implements</span> <span class="n">RocksDBConfigSetter</span> <span class="o">{</span>
-
- <span class="c1">// This object should be a member variable so it can be closed in RocksDBConfigSetter#close.</span>
- <span class="kd">private</span> <span class="n">org.rocksdb.Cache</span> <span class="n">cache</span> <span class="o">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="na">rocksdb</span><span class="o">.</span><span class="na">LRUCache</span><span class="o">(</span><span class="mi">16</span> <span class="o">*</span> <span class="mi">1024L</span> <span class="o">*</span> <span class="mi">1024L</span><span class="o">);</span>
-
- <span class="nd">@Override</span>
- <span class="kd">public</span> <span class="kt">void</span> <span class="nf">setConfig</span><span class="o">(</span><span class="kd">final</span> <span class="n">String</span> <span class="n">storeName</span><span class="o">,</span> <span class="kd">final</span> <span class="n">Options</span> <span class="n">options</span><span class="o">,</span> <span class="kd">final</span> <span class="n">Map</span><span class="o"><</span><span class="n">String</span><span class="o">,</span [...]
- <span class="c1">// See #1 below.</span>
- <span class="n">BlockBasedTableConfig</span> <span class="n">tableConfig</span> <span class="o">=</span> <span class="k">(BlockBasedTableConfig)</span> <span class="n">options</span><span><span class="o">.</span><span class="na">tableFormatConfig</span><span class="o">();</span>
- <span class="n">tableConfig</span><span class="o">.</span><span class="na">setBlockCache</span><span class="o">(</span><span class="mi">cache</span></span><span class="o">);</span>
- <span class="c1">// See #2 below.</span>
- <span class="n">tableConfig</span><span class="o">.</span><span class="na">setBlockSize</span><span class="o">(</span><span class="mi">16</span> <span class="o">*</span> <span class="mi">1024L</span><span class="o">);</span>
- <span class="c1">// See #3 below.</span>
- <span class="n">tableConfig</span><span class="o">.</span><span class="na">setCacheIndexAndFilterBlocks</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span>
- <span class="n">options</span><span class="o">.</span><span class="na">setTableFormatConfig</span><span class="o">(</span><span class="n">tableConfig</span><span class="o">);</span>
- <span class="c1">// See #4 below.</span>
- <span class="n">options</span><span class="o">.</span><span class="na">setMaxWriteBufferNumber</span><span class="o">(</span><span class="mi">2</span><span class="o">);</span>
- <span class="o">}</span>
-
- <span class="nd">@Override</span>
- <span class="kd">public</span> <span class="kt">void</span> <span class="nf">close</span><span class="o">(</span><span class="kd">final</span> <span class="n">String</span> <span class="n">storeName</span><span class="o">,</span> <span class="kd">final</span> <span class="n">Options</span> <span class="n">options</span><span class="o">)</span> <span class="o">{</span>
- <span class="c1">// See #5 below.</span>
- <span class="n">cache</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
- <span class="o">}</span>
- <span class="o">}</span>
-
-<span class="n">Properties</span> <span class="n">streamsSettings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
-<span class="n">streamsConfig</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">ROCKSDB_CONFIG_SETTER_CLASS_CONFIG</span><span class="o">,</span> <span class="n">CustomRocksDBConfig</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
-</pre></div>
- </div>
- <dl class="docutils">
- <dt>Notes for example:</dt>
- <dd><ol class="first last arabic simple">
- <li><code class="docutils literal"><span class="pre">BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();</span></code> Get a reference to the existing table config rather than create a new one, so you don't accidentally overwrite defaults such as the <code class="docutils literal"><span class="pre">BloomFilter</span></code>, which is an important optimization.
- <li><code class="docutils literal"><span class="pre">tableConfig.setBlockSize(16</span> <span class="pre">*</span> <span class="pre">1024L);</span></code> Modify the default <a class="reference external" href="https://github.com/apache/kafka/blob/2.3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L79">block size</a> per these instructions from the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Memory-usage- [...]
- <li><code class="docutils literal"><span class="pre">tableConfig.setCacheIndexAndFilterBlocks(true);</span></code> Do not let the index and filter blocks grow unbounded. For more information, see the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks">RocksDB GitHub</a>.</li>
- <li><code class="docutils literal"><span class="pre">options.setMaxWriteBufferNumber(2);</span></code> See the advanced options in the <a class="reference external" href="https://github.com/facebook/rocksdb/blob/8dee8cad9ee6b70fd6e1a5989a8156650a70c04f/include/rocksdb/advanced_options.h#L103">RocksDB GitHub</a>.</li>
- <li><code class="docutils literal"><span class="pre">cache.close();</span></code> To avoid memory leaks, you must close any objects you constructed that extend org.rocksdb.RocksObject. See <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#memory-management">RocksJava docs</a> for more details.</li>
- </ol>
- </dd>
- </dl>
- </div></blockquote>
- </div>
- </div>
<div class="section" id="recommended-configuration-parameters-for-resiliency">
<h3><a class="toc-backref" href="#id21">Recommended configuration parameters for resiliency</a><a class="headerlink" href="#recommended-configuration-parameters-for-resiliency" title="Permalink to this headline"></a></h3>
<p>There are several Kafka and Kafka Streams configuration options that need to be configured explicitly for resiliency in the face of broker failures:</p>
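As an illustration of the kind of settings this covers, here is a sketch using plain `java.util.Properties` (the app id and broker address are hypothetical; a matching broker-side `min.insync.replicas` is assumed to be configured separately):

```java
import java.util.Properties;

public class ResiliencySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");   // hypothetical app id
        props.put("bootstrap.servers", "broker:9092");   // hypothetical broker

        // Replicate Kafka Streams' internal changelog/repartition topics so a
        // single broker failure does not lose state-recovery data.
        props.put("replication.factor", "3");

        // Require all in-sync replicas to acknowledge writes from the internal
        // producer (the "producer." prefix targets producer-level configs).
        props.put("producer.acks", "all");

        props.stringPropertyNames().stream().sorted()
             .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```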