Posted to commits@kafka.apache.org by mj...@apache.org on 2020/09/22 18:45:37 UTC

[kafka] branch 2.6 updated (23f9890 -> 1498979)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a change to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from 23f9890  KAFKA-10401; Ensure `currentStateTimeStamp` is set correctly by group coordinator (#9202)
     new e64c209  KAFKA-9161: add docs for KIP-441 and KIP-613 and other configs that need fixing (#9027)
     new 1498979  MINOR: Streams docs fixes (#9308)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/streams/architecture.html                     |   4 +-
 docs/streams/developer-guide/config-streams.html   | 360 ++++++++++++++-------
 docs/streams/developer-guide/memory-mgmt.html      |   5 +-
 docs/streams/developer-guide/running-app.html      |  12 +
 docs/streams/upgrade-guide.html                    |  17 +-
 .../org/apache/kafka/streams/StreamsConfig.java    |  56 ++--
 6 files changed, 312 insertions(+), 142 deletions(-)


[kafka] 01/02: KAFKA-9161: add docs for KIP-441 and KIP-613 and other configs that need fixing (#9027)

Posted by mj...@apache.org.

mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit e64c209567fa3e27ac1b8d2bc50b72b39860ea06
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Mon Jul 20 14:27:28 2020 -0700

    KAFKA-9161: add docs for KIP-441 and KIP-613 and other configs that need fixing (#9027)
    
    Add docs for KIP-441 and KIP-613.
    Fixed some miscellaneous unrelated issues in the docs:
    * Adds some missing configs to the Streams config docs: max.task.idle.ms, topology.optimization, default.windowed.key.serde.inner.class, and default.windowed.value.serde.inner.class
    * Defines the previously-undefined default windowed serde class configs, including choosing a default (null) and giving them a doc string, so they should now show up in the auto-generated general Kafka config docs
    * Adds a note to warn users about the rocksDB bug that prevents setting a strict capacity limit and counting write buffer memory against the block cache
    
    Reviewers: Bruno Cadonna <br...@confluent.io>, John Roesler <vv...@apache.org>
---
 docs/streams/architecture.html                     |   4 +-
 docs/streams/developer-guide/config-streams.html   | 358 ++++++++++++++-------
 docs/streams/developer-guide/memory-mgmt.html      |   5 +-
 docs/streams/developer-guide/running-app.html      |  12 +
 docs/streams/upgrade-guide.html                    |  17 +-
 .../org/apache/kafka/streams/StreamsConfig.java    |  56 ++--
 6 files changed, 312 insertions(+), 140 deletions(-)
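
Note: the KIP-441 settings documented by this commit could be set roughly as in the
sketch below. This is only an illustration, not part of the commit. The class name
HighAvailabilityConfigSketch and the helper buildConfig() are hypothetical, plain
string keys are used so the example has no Kafka dependency, and the values are
simply the defaults listed in the config table in this diff.

```java
import java.util.Properties;

// Hypothetical sketch: collects the task high-availability settings
// described in this commit, using their documented default values.
public class HighAvailabilityConfigSketch {

    static Properties buildConfig() {
        final Properties props = new Properties();
        // Instances whose changelog lag is at most this many offsets count as caught-up.
        props.setProperty("acceptable.recovery.lag", "10000");
        // Upper bound on the number of warmup replicas assigned at once.
        props.setProperty("max.warmup.replicas", "2");
        // Maximum wait before a probing rebalance checks warmup progress (10 minutes).
        props.setProperty("probing.rebalance.interval.ms", "600000");
        // Do not idle while waiting for all input partitions to contain data.
        props.setProperty("max.task.idle.ms", "0");
        return props;
    }

    public static void main(final String[] args) {
        // Print each key=value pair; iteration order is not guaranteed.
        buildConfig().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

A real application would pass such a Properties object, together with its other
settings, to the KafkaStreams constructor.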

diff --git a/docs/streams/architecture.html b/docs/streams/architecture.html
index 3ef9dfe..43de9e7 100644
--- a/docs/streams/architecture.html
+++ b/docs/streams/architecture.html
@@ -151,8 +151,10 @@
     <p>
         Note that the cost of task (re)initialization typically depends primarily on the time for restoring the state by replaying the state stores' associated changelog topics.
         To minimize this restoration time, users can configure their applications to have <b>standby replicas</b> of local states (i.e. fully replicated copies of the state).
-        When a task migration happens, Kafka Streams then attempts to assign a task to an application instance where such a standby replica already exists in order to minimize
+        When a task migration happens, Kafka Streams will assign a task to an application instance where such a standby replica already exists in order to minimize
         the task (re)initialization cost. See <code>num.standby.replicas</code> in the <a href="/{{version}}/documentation/#streamsconfigs"><b>Kafka Streams Configs</b></a> section.
+        Starting in 2.6, Kafka Streams will guarantee that a task is only ever assigned to an instance with a fully caught-up local copy of the state, if such an instance
+        exists. Standby tasks will increase the likelihood that a caught-up instance exists in the case of a failure.
     </p>
 
     <div class="pagination">
diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html
index a64dd77..8388476 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -63,24 +63,31 @@
           </ul>
           </li>
           <li><a class="reference internal" href="#optional-configuration-parameters" id="id6">Optional configuration parameters</a><ul>
+            <li><a class="reference internal" href="#acceptable-recovery-lag" id="id27">acceptable.recovery.lag</a></li>
             <li><a class="reference internal" href="#default-deserialization-exception-handler" id="id7">default.deserialization.exception.handler</a></li>
-            <li><a class="reference internal" href="#default-production-exception-handler" id="id24">default.production.exception.handler</a></li>
             <li><a class="reference internal" href="#default-key-serde" id="id8">default.key.serde</a></li>
+            <li><a class="reference internal" href="#default-production-exception-handler" id="id24">default.production.exception.handler</a></li>
+            <li><a class="reference internal" href="#timestamp-extractor" id="id15">default.timestamp.extractor</a></li>
             <li><a class="reference internal" href="#default-value-serde" id="id9">default.value.serde</a></li>
+            <li><a class="reference internal" href="#default-windowed-key-serde-inner" id="id32">default.windowed.key.serde.inner</a></li>
+            <li><a class="reference internal" href="#default-windowed-value-serde-inner" id="id33">default.windowed.value.serde.inner</a></li>
+            <li><a class="reference internal" href="#max-task-idle-ms" id="id28">max.task.idle.ms</a></li>
+            <li><a class="reference internal" href="#max-warmup-replicas" id="id29">max.warmup.replicas</a></li>
             <li><a class="reference internal" href="#num-standby-replicas" id="id10">num.standby.replicas</a></li>
             <li><a class="reference internal" href="#num-stream-threads" id="id11">num.stream.threads</a></li>
             <li><a class="reference internal" href="#partition-grouper" id="id12">partition.grouper</a></li>
+            <li><a class="reference internal" href="#probing-rebalance-interval-ms" id="id30">probing.rebalance.interval.ms</a></li>
             <li><a class="reference internal" href="#processing-guarantee" id="id25">processing.guarantee</a></li>
             <li><a class="reference internal" href="#replication-factor" id="id13">replication.factor</a></li>
+            <li><a class="reference internal" href="#rocksdb-config-setter" id="id20">rocksdb.config.setter</a></li>
             <li><a class="reference internal" href="#state-dir" id="id14">state.dir</a></li>
-            <li><a class="reference internal" href="#timestamp-extractor" id="id15">timestamp.extractor</a></li>
+            <li><a class="reference internal" href="#topology-optimization" id="id31">topology.optimization</a></li>
           </ul>
           </li>
           <li><a class="reference internal" href="#kafka-consumers-and-producer-configuration-parameters" id="id16">Kafka consumers and producer configuration parameters</a><ul>
             <li><a class="reference internal" href="#naming" id="id17">Naming</a></li>
             <li><a class="reference internal" href="#default-values" id="id18">Default Values</a></li>
             <li><a class="reference internal" href="#enable-auto-commit" id="id19">enable.auto.commit</a></li>
-            <li><a class="reference internal" href="#rocksdb-config-setter" id="id20">rocksdb.config.setter</a></li>
           </ul>
           </li>
           <li><a class="reference internal" href="#recommended-configuration-parameters-for-resiliency" id="id21">Recommended configuration parameters for resiliency</a><ul>
@@ -166,6 +173,11 @@
           </tr>
           </thead>
           <tbody valign="top">
+          <tr class="row-odd"><td>acceptable.recovery.lag</td>
+            <td>Medium</td>
+            <td colspan="2">The maximum acceptable lag (number of offsets to catch up) for an instance to be considered caught-up and ready for the active task.</td>
+            <td>10000</td>
+          </tr>
           <tr class="row-even"><td>application.server</td>
             <td>Low</td>
             <td colspan="2">A host:port pair pointing to an embedded user defined endpoint that can be used for discovering the locations of
@@ -199,16 +211,47 @@
             <td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">DeserializationExceptionHandler</span></code> interface.</td>
             <td><code class="docutils literal"><span class="pre">LogAndContinueExceptionHandler</span></code></td>
           </tr>
-          <tr class="row-even"><td>default.production.exception.handler</td>
+          <tr class="row-even"><td>default.key.serde</td>
+            <td>Medium</td>
+            <td colspan="2">Default serializer/deserializer class for record keys, implements the <code class="docutils literal"><span class="pre">Serde</span></code> interface (see also default.value.serde).</td>
+            <td><code class="docutils literal"><span class="pre">Serdes.ByteArray().getClass().getName()</span></code></td>
+          </tr>
+          <tr class="row-odd"><td>default.production.exception.handler</td>
             <td>Medium</td>
             <td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">ProductionExceptionHandler</span></code> interface.</td>
             <td><code class="docutils literal"><span class="pre">DefaultProductionExceptionHandler</span></code></td>
           </tr>
-          <tr class="row-odd"><td>key.serde</td>
+          <tr class="row-even"><td>default.timestamp.extractor</td>
+            <td>Medium</td>
+            <td colspan="2">Timestamp extractor class that implements the <code class="docutils literal"><span class="pre">TimestampExtractor</span></code> interface.</td>
+            <td>See <a class="reference internal" href="#streams-developer-guide-timestamp-extractor"><span class="std std-ref">Timestamp Extractor</span></a></td>
+          </tr>
+          <tr class="row-odd"><td>default.value.serde</td>
             <td>Medium</td>
-            <td colspan="2">Default serializer/deserializer class for record keys, implements the <code class="docutils literal"><span class="pre">Serde</span></code> interface (see also value.serde).</td>
+            <td colspan="2">Default serializer/deserializer class for record values, implements the <code class="docutils literal"><span class="pre">Serde</span></code> interface (see also default.key.serde).</td>
             <td><code class="docutils literal"><span class="pre">Serdes.ByteArray().getClass().getName()</span></code></td>
           </tr>
+          <tr class="row-even"><td>default.windowed.key.serde.inner</td>
+            <td>Medium</td>
+            <td colspan="2">Default serializer/deserializer for the inner class of windowed keys, implementing the <code class="docutils literal"><span class="pre">Serde</span></code> interface.</td>
+            <td>null</td>
+          </tr>
+          <tr class="row-odd"><td>default.windowed.value.serde.inner</td>
+            <td>Medium</td>
+            <td colspan="2">Default serializer/deserializer for the inner class of windowed values, implementing the <code class="docutils literal"><span class="pre">Serde</span></code> interface.</td>
+            <td>null</td>
+          </tr>
+          <tr class="row-even"><td>max.task.idle.ms</td>
+            <td>Medium</td>
+            <td colspan="2">Maximum amount of time a stream task will stay idle while waiting for all partitions to contain data and avoid potential out-of-order record
+              processing across multiple input streams.</td>
+            <td>0 milliseconds</td>
+          </tr>
+          <tr class="row-odd"><td>max.warmup.replicas</td>
+            <td>Medium</td>
+            <td colspan="2">The maximum number of warmup replicas (extra standbys beyond the configured num.standby.replicas) that can be assigned at once.</td>
+            <td>2</td>
+          </tr>
           <tr class="row-even"><td>metric.reporters</td>
             <td>Low</td>
             <td colspan="2">A list of classes to use as metrics reporters.</td>
@@ -244,10 +287,15 @@
             <td colspan="2">Partition grouper class that implements the <code class="docutils literal"><span class="pre">PartitionGrouper</span></code> interface.</td>
             <td>See <a class="reference internal" href="#streams-developer-guide-partition-grouper"><span class="std std-ref">Partition Grouper</span></a></td>
           </tr>
+          <tr class="row-odd"><td>probing.rebalance.interval.ms</td>
+            <td>Low</td>
+            <td colspan="2">The maximum time to wait before triggering a rebalance to probe for warmup replicas that have sufficiently caught up.</td>
+            <td>600000 milliseconds (10 minutes)</td>
+          </tr>
           <tr class="row-even"><td>processing.guarantee</td>
             <td>Medium</td>
             <td colspan="2">The processing mode. Can be either <code class="docutils literal"><span class="pre">"at_least_once"</span></code> (default),
-              <code class="docutils literal"><span class="pre">"exactly_once"</span></code>, or <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code>.
+              <code class="docutils literal"><span class="pre">"exactly_once"</span></code>, or <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code>.</td>
             <td>See <a class="reference internal" href="#streams-developer-guide-processing-guarantedd"><span class="std std-ref">Processing Guarantee</span></a></td>
           </tr>
           <tr class="row-odd"><td>poll.ms</td>
@@ -260,7 +308,7 @@
             <td colspan="2">The replication factor for changelog topics and repartition topics created by the application.</td>
             <td>1</td>
           </tr>
-          <tr class="row-even"><td>retries</td>
+          <tr class="row-odd"><td>retries</td>
               <td>Medium</td>
               <td colspan="2">The number of retries for broker requests that return a retryable error. </td>
               <td>0</td>
@@ -270,36 +318,31 @@
               <td colspan="2">The amount of time in milliseconds, before a request is retried. This applies if the <code class="docutils literal"><span class="pre">retries</span></code> parameter is configured to be greater than 0. </td>
               <td>100</td>
           </tr>
-          <tr class="row-even"><td>rocksdb.config.setter</td>
+          <tr class="row-odd"><td>rocksdb.config.setter</td>
             <td>Medium</td>
             <td colspan="2">The RocksDB configuration.</td>
             <td></td>
           </tr>
-          <tr class="row-odd"><td>state.cleanup.delay.ms</td>
+          <tr class="row-even"><td>state.cleanup.delay.ms</td>
             <td>Low</td>
             <td colspan="2">The amount of time in milliseconds to wait before deleting state when a partition has migrated.</td>
             <td>600000 milliseconds</td>
           </tr>
-          <tr class="row-even"><td>state.dir</td>
+          <tr class="row-odd"><td>state.dir</td>
             <td>High</td>
             <td colspan="2">Directory location for state stores.</td>
             <td><code class="docutils literal"><span class="pre">/tmp/kafka-streams</span></code></td>
           </tr>
-          <tr class="row-odd"><td>timestamp.extractor</td>
+          <tr class="row-even"><td>topology.optimization</td>
             <td>Medium</td>
-            <td colspan="2">Timestamp extractor class that implements the <code class="docutils literal"><span class="pre">TimestampExtractor</span></code> interface.</td>
-            <td>See <a class="reference internal" href="#streams-developer-guide-timestamp-extractor"><span class="std std-ref">Timestamp Extractor</span></a></td>
+            <td colspan="2">A configuration telling Kafka Streams whether it should optimize the topology.</td>
+            <td>none</td>
           </tr>
-          <tr class="row-even"><td>upgrade.from</td>
+          <tr class="row-odd"><td>upgrade.from</td>
             <td>Medium</td>
             <td colspan="2">The version you are upgrading from during a rolling upgrade.</td>
             <td>See <a class="reference internal" href="#streams-developer-guide-upgrade-from"><span class="std std-ref">Upgrade From</span></a></td>
           </tr>
-          <tr class="row-odd"><td>value.serde</td>
-            <td>Medium</td>
-            <td colspan="2">Default serializer/deserializer class for record values, implements the <code class="docutils literal"><span class="pre">Serde</span></code> interface (see also key.serde).</td>
-            <td><code class="docutils literal"><span class="pre">Serdes.ByteArray().getClass().getName()</span></code></td>
-          </tr>
           <tr class="row-even"><td>windowstore.changelog.additional.retention.ms</td>
             <td>Low</td>
             <td colspan="2">Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td>
@@ -307,6 +350,22 @@
           </tr>
           </tbody>
         </table>
+        <div class="section" id="acceptable-recovery-lag">
+          <h4><a class="toc-backref" href="#id27">acceptable.recovery.lag</a><a class="headerlink" href="#acceptable-recovery-lag" title="Permalink to this headline"></a></h4>
+          <blockquote>
+            <div>
+              <p>
+              The maximum acceptable lag (total number of offsets to catch up from the changelog) for an instance to be considered caught-up and able to receive an active task. Streams will only assign
+              stateful active tasks to instances whose state stores are within the acceptable recovery lag, if any exist, and assign warmup replicas to restore state in the background for instances
+              that are not yet caught up. Should correspond to a recovery time of well under a minute for a given workload. Must be at least 0.
+              </p>
+              <p>
+                Note: if you set this to <code>Long.MAX_VALUE</code> it effectively disables the warmup replicas and task high availability, allowing Streams to immediately produce a balanced
+                assignment and migrate tasks to a new instance without first warming them up.
+              </p>
+            </div>
+          </blockquote>
+        </div>
         <div class="section" id="default-deserialization-exception-handler">
           <span id="streams-developer-guide-deh"></span><h4><a class="toc-backref" href="#id7">default.deserialization.exception.handler</a><a class="headerlink" href="#default-deserialization-exception-handler" title="Permalink to this headline"></a></h4>
           <blockquote>
@@ -399,6 +458,95 @@
                          IgnoreRecordTooLargeHandler.class);</pre></div>
           </blockquote>
         </div>
+        <div class="section" id="timestamp-extractor">
+          <span id="streams-developer-guide-timestamp-extractor"></span><h4><a class="toc-backref" href="#id15">default.timestamp.extractor</a><a class="headerlink" href="#timestamp-extractor" title="Permalink to this headline"></a></h4>
+          <blockquote>
+            <div><p>A timestamp extractor pulls a timestamp from an instance of <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html">ConsumerRecord</a>.
+              Timestamps are used to control the progress of streams.</p>
+              <p>The default extractor is
+                <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.html">FailOnInvalidTimestamp</a>.
+                This extractor retrieves built-in timestamps that are automatically embedded into Kafka messages by the Kafka producer
+                client since
+                <a class="reference external" href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message">Kafka version 0.10</a>.
+                Depending on the setting of Kafka&#8217;s server-side <code class="docutils literal"><span class="pre">log.message.timestamp.type</span></code> broker and <code class="docutils literal"><span class="pre">message.timestamp.type</span></code> topic parameters,
+                this extractor provides you with:</p>
+              <ul class="simple">
+                <li><strong>event-time</strong> processing semantics if <code class="docutils literal"><span class="pre">log.message.timestamp.type</span></code> is set to <code class="docutils literal"><span class="pre">CreateTime</span></code> aka &#8220;producer time&#8221;
+                  (which is the default).  This represents the time when a Kafka producer sent the original message.  If you use Kafka&#8217;s
+                  official producer client, the timestamp represents milliseconds since the epoch.</li>
+                <li><strong>ingestion-time</strong> processing semantics if <code class="docutils literal"><span class="pre">log.message.timestamp.type</span></code> is set to <code class="docutils literal"><span class="pre">LogAppendTime</span></code> aka &#8220;broker
+                  time&#8221;.  This represents the time when the Kafka broker received the original message, in milliseconds since the epoch.</li>
+              </ul>
+              <p>The <code class="docutils literal"><span class="pre">FailOnInvalidTimestamp</span></code> extractor throws an exception if a record contains an invalid (i.e. negative) built-in
+                timestamp, because Kafka Streams would not process this record but silently drop it.  Invalid built-in timestamps can
+                occur for various reasons: for example, if you consume a topic that is written to by pre-0.10 Kafka producer clients
+                or by third-party producer clients that don&#8217;t support the new Kafka 0.10 message format yet;  another situation where
+                this may happen is after upgrading your Kafka cluster from <code class="docutils literal"><span class="pre">0.9</span></code> to <code class="docutils literal"><span class="pre">0.10</span></code>, where all the data that was generated
+                with <code class="docutils literal"><span class="pre">0.9</span></code> does not include the <code class="docutils literal"><span class="pre">0.10</span></code> message timestamps.</p>
+              <p>If you have data with invalid timestamps and want to process it, then there are two alternative extractors available.
+                Both work on built-in timestamps, but handle invalid timestamps differently.</p>
+              <ul class="simple">
+                <li><a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.html">LogAndSkipOnInvalidTimestamp</a>:
+                  This extractor logs a warning message and returns the invalid timestamp to Kafka Streams, which will not process but
+                  silently drop the record.
+                  This log-and-skip strategy allows Kafka Streams to make progress instead of failing if there are records with an
+                  invalid built-in timestamp in your input data.</li>
+                <li><a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/processor/UsePartitionTimeOnInvalidTimestamp.html">UsePartitionTimeOnInvalidTimestamp</a>.
+                  This extractor returns the record&#8217;s built-in timestamp if it is valid (i.e. not negative).  If the record does not
+                  have a valid built-in timestamp, the extractor returns the previously extracted valid timestamp from a record of the
+                  same topic partition as the current record as a timestamp estimation.  If no timestamp can be estimated, it
+                  throws an exception.</li>
+              </ul>
+              <p>Another built-in extractor is
+                <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/processor/WallclockTimestampExtractor.html">WallclockTimestampExtractor</a>.
+                This extractor does not actually &#8220;extract&#8221; a timestamp from the consumed record but rather returns the current time in
+                milliseconds from the system clock (think: <code class="docutils literal"><span class="pre">System.currentTimeMillis()</span></code>), which effectively means Streams will operate
+                on the basis of the so-called <strong>processing-time</strong> of events.</p>
+              <p>You can also provide your own timestamp extractors, for instance to retrieve timestamps embedded in the payload of
+                messages.  If you cannot extract a valid timestamp, you can either throw an exception, return a negative timestamp, or
+                estimate a timestamp.  Returning a negative timestamp will result in data loss &#8211; the corresponding record will not be
+                processed but silently dropped.  If you want to estimate a new timestamp, you can use the value provided via
+                <code class="docutils literal"><span class="pre">previousTimestamp</span></code> (i.e., a Kafka Streams timestamp estimation).  Here is an example of a custom
+                <code class="docutils literal"><span class="pre">TimestampExtractor</span></code> implementation:</p>
+              <div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.clients.consumer.ConsumerRecord</span><span class="o">;</span>
+<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.processor.TimestampExtractor</span><span class="o">;</span>
+
+<span class="c1">// Extracts the embedded timestamp of a record (giving you &quot;event-time&quot; semantics).</span>
+<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyEventTimeExtractor</span> <span class="kd">implements</span> <span class="n">TimestampExtractor</span> <span class="o">{</span>
+
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="kt">long</span> <span class="nf">extract</span><span class="o">(</span><span class="kd">final</span> <span class="n">ConsumerRecord</span><span class="o">&lt;</span><span class="n">Object</span><span class="o">,</span> <span class="n">Object</span><span class="o">&gt;</span> <span class="n">record</span><span class="o">,</span> <span class="kd">final</span> <span class="kt">long</span> <span class="n">previousTimestamp</span><span class="o">) [...]
+    <span class="c1">// `Foo` is your own custom class, which we assume has a method that returns</span>
+    <span class="c1">// the embedded timestamp (milliseconds since midnight, January 1, 1970 UTC).</span>
+    <span class="kt">long</span> <span class="n">timestamp</span> <span class="o">=</span> <span class="o">-</span><span class="mi">1</span><span class="o">;</span>
+    <span class="kd">final</span> <span class="n">Foo</span> <span class="n">myPojo</span> <span class="o">=</span> <span class="o">(</span><span class="n">Foo</span><span class="o">)</span> <span class="n">record</span><span class="o">.</span><span class="na">value</span><span class="o">();</span>
+    <span class="k">if</span> <span class="o">(</span><span class="n">myPojo</span> <span class="o">!=</span> <span class="kc">null</span><span class="o">)</span> <span class="o">{</span>
+      <span class="n">timestamp</span> <span class="o">=</span> <span class="n">myPojo</span><span class="o">.</span><span class="na">getTimestampInMillis</span><span class="o">();</span>
+    <span class="o">}</span>
+    <span class="k">if</span> <span class="o">(</span><span class="n">timestamp</span> <span class="o">&lt;</span> <span class="mi">0</span><span class="o">)</span> <span class="o">{</span>
+      <span class="c1">// Invalid timestamp!  Attempt to estimate a new timestamp,</span>
+      <span class="c1">// otherwise fall back to wall-clock time (processing-time).</span>
+      <span class="k">if</span> <span class="o">(</span><span class="n">previousTimestamp</span> <span class="o">&gt;=</span> <span class="mi">0</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="n">previousTimestamp</span><span class="o">;</span>
+      <span class="o">}</span> <span class="k">else</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="n">System</span><span class="o">.</span><span class="na">currentTimeMillis</span><span class="o">();</span>
+      <span class="o">}</span>
+    <span class="o">}</span>
+    <span class="k">return</span> <span class="n">timestamp</span><span class="o">;</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+</pre></div>
+              </div>
+              <p>You would then define the custom timestamp extractor in your Streams configuration as follows:</p>
+              <div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.util.Properties</span><span class="o">;</span>
+<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.StreamsConfig</span><span class="o">;</span>
+
+<span class="n">Properties</span> <span class="n">streamsConfiguration</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
+<span class="n">streamsConfiguration</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG</span><span class="o">,</span> <span class="n">MyEventTimeExtractor</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
+</pre></div>
+              </div>
+            </div></blockquote>
+        </div>
         <div class="section" id="default-key-serde">
           <h4><a class="toc-backref" href="#id8">default.key.serde</a><a class="headerlink" href="#default-key-serde" title="Permalink to this headline"></a></h4>
           <blockquote>
@@ -425,6 +573,52 @@
               <p>This is discussed in more detail in <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data types and serialization</span></a>.</p>
             </div></blockquote>
         </div>
+        <div class="section" id="default-windowed-key-serde-inner">
+          <h4><a class="toc-backref" href="#id32">default.windowed.key.serde.inner</a><a class="headerlink" href="#default-windowed-key-serde-inner" title="Permalink to this headline"></a></h4>
+          <blockquote>
+            <div><p>The default Serializer/Deserializer class for the inner class of windowed keys. Serialization and deserialization in Kafka Streams happens
+              whenever data needs to be materialized, for example:</p>
+              <blockquote>
+                <div><ul class="simple">
+                  <li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <code class="docutils literal"><span class="pre">StreamsBuilder#stream()</span></code> and <code class="docutils literal"><span class="pre">KStream#to()</span></code> methods).</li>
+                  <li>Whenever data is read from or written to a <em>state store</em>.</li>
+                </ul>
+                  <p>This is discussed in more detail in <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data types and serialization</span></a>.</p>
+                </div></blockquote>
+            </div></blockquote>
+        </div>
+        <div class="section" id="default-windowed-value-serde-inner">
+          <h4><a class="toc-backref" href="#id33">default.windowed.value.serde.inner</a><a class="headerlink" href="#default-windowed-value-serde-inner" title="Permalink to this headline"></a></h4>
+          <blockquote>
+            <div><p>The default Serializer/Deserializer class for the inner class of windowed values. Serialization and deserialization in Kafka Streams
+              happens whenever data needs to be materialized, for example:</p>
+              <ul class="simple">
+                <li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <code class="docutils literal"><span class="pre">StreamsBuilder#stream()</span></code> and <code class="docutils literal"><span class="pre">KStream#to()</span></code> methods).</li>
+                <li>Whenever data is read from or written to a <em>state store</em>.</li>
+              </ul>
+              <p>This is discussed in more detail in <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data types and serialization</span></a>.</p>
+            </div></blockquote>
+        </div>
+        <div class="section" id="max-task-idle-ms">
+          <span id="streams-developer-guide-max-task-idle-ms"></span><h4><a class="toc-backref" href="#id28">max.task.idle.ms</a><a class="headerlink" href="#max-task-idle-ms" title="Permalink to this headline"></a></h4>
+          <blockquote>
+            <div>
+              The maximum amount of time a task will idle without processing data when waiting for all of its input partition buffers to contain records. This can help avoid potential out-of-order
+              processing when the task has multiple input streams, as in a join, for example. Setting this to a nonzero value may increase latency but will improve time synchronization.
+            </div>
+          </blockquote>
+        </div>
+        <div class="section" id="max-warmup-replicas">
+          <span id="streams-developer-guide-max-warmup-replicas"></span><h4><a class="toc-backref" href="#id29">max.warmup.replicas</a><a class="headerlink" href="#max-warmup-replicas" title="Permalink to this headline"></a></h4>
+          <blockquote>
+            <div>
+              The maximum number of warmup replicas (extra standbys beyond the configured num.standby.replicas) that can be assigned at once for the purpose of keeping
+              the task available on one instance while it is warming up on another instance it has been reassigned to. Used to throttle how much extra broker
+              traffic and cluster state can be used for high availability. Increasing this will allow Streams to warm up more tasks at once, speeding up the time
+              for the reassigned warmups to restore sufficient state for them to be transitioned to active tasks. Must be at least 1.
+            </div>
+          </blockquote>
+        </div>
         <div class="section" id="num-standby-replicas">
           <span id="streams-developer-guide-standby-replicas"></span><h4><a class="toc-backref" href="#id10">num.standby.replicas</a><a class="headerlink" href="#num-standby-replicas" title="Permalink to this headline"></a></h4>
           <blockquote>
@@ -433,13 +627,15 @@
               Standby replicas are used to minimize the latency of task failover.  A task that was previously running on a failed instance is
               preferred to restart on an instance that has standby replicas so that the local state store restoration process from its
               changelog can be minimized.  Details about how Kafka Streams makes use of the standby replicas to minimize the cost of
-              resuming tasks on failover can be found in the <a class="reference internal" href="../architecture.html#streams_architecture_state"><span class="std std-ref">State</span></a> section.</div></blockquote>
+              resuming tasks on failover can be found in the <a class="reference internal" href="../architecture.html#streams_architecture_state"><span class="std std-ref">State</span></a> section.
             </div>
-            <div class="admonition note">
+          </blockquote>
+        </div>
+          <div class="admonition note">
               <p class="first admonition-title">Note</p>
               <p class="last">If you enable <cite>n</cite> standby tasks, you need to provision <cite>n+1</cite> <code class="docutils literal"><span class="pre">KafkaStreams</span></code>
               instances.</p>
-              </div>
+          </div>
         <div class="section" id="num-stream-threads">
           <h4><a class="toc-backref" href="#id11">num.stream.threads</a><a class="headerlink" href="#num-stream-threads" title="Permalink to this headline"></a></h4>
           <blockquote>
@@ -449,10 +645,22 @@
         <div class="section" id="partition-grouper">
           <span id="streams-developer-guide-partition-grouper"></span><h4><a class="toc-backref" href="#id12">partition.grouper</a><a class="headerlink" href="#partition-grouper" title="Permalink to this headline"></a></h4>
           <blockquote>
-            <div>A partition grouper creates a list of stream tasks from the partitions of source topics, where each created task is assigned with a group of source topic partitions.
+            <div>
+              <b>[DEPRECATED]</b> A partition grouper creates a list of stream tasks from the partitions of source topics, where each created task is assigned with a group of source topic partitions.
               The default implementation provided by Kafka Streams is <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/processor/DefaultPartitionGrouper.html">DefaultPartitionGrouper</a>.
               It assigns each task with one partition for each of the source topic partitions. The generated number of tasks equals the largest
-              number of partitions among the input topics. Usually an application does not need to customize the partition grouper.</div></blockquote>
+              number of partitions among the input topics. Usually an application does not need to customize the partition grouper.
+            </div>
+          </blockquote>
+        </div>
+        <div class="section" id="probing-rebalance-interval-ms">
+          <h4><a class="toc-backref" href="#id30">probing.rebalance.interval.ms</a><a class="headerlink" href="#probing-rebalance-interval-ms" title="Permalink to this headline"></a></h4>
+          <blockquote>
+            <div>
+              The maximum time to wait before triggering a rebalance to probe for warmup replicas that have restored enough to be considered caught up. Streams will only assign stateful active tasks to
+              instances that are caught up and within the <a class="reference internal" href="#acceptable-recovery-lag"><span class="std std-ref">acceptable.recovery.lag</span></a>, if any exist. Probing rebalances are used to query the latest total lag of warmup replicas and transition
+              them to active tasks if ready. They will continue to be triggered as long as there are warmup tasks, and until the assignment is balanced. Must be at least 1 minute.
+            </div></blockquote>
         </div>
         <div class="section" id="processing-guarantee">
           <span id="streams-developer-guide-processing-guarantee"></span><h4><a class="toc-backref" href="#id25">processing.guarantee</a><a class="headerlink" href="#processing-guarantee" title="Permalink to this headline"></a></h4>
@@ -549,94 +757,20 @@
               this path must be unique for each such instance.</div>
           </blockquote>
         </div>
-        <div class="section" id="timestamp-extractor">
-          <span id="streams-developer-guide-timestamp-extractor"></span><h4><a class="toc-backref" href="#id15">timestamp.extractor</a><a class="headerlink" href="#timestamp-extractor" title="Permalink to this headline"></a></h4>
+        <div class="section" id="topology-optimization">
+          <h4><a class="toc-backref" href="#id31">topology.optimization</a><a class="headerlink" href="#topology-optimization" title="Permalink to this headline"></a></h4>
           <blockquote>
-            <div><p>A timestamp extractor pulls a timestamp from an instance of <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html">ConsumerRecord</a>.
-              Timestamps are used to control the progress of streams.</p>
-              <p>The default extractor is
-                <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.html">FailOnInvalidTimestamp</a>.
-                This extractor retrieves built-in timestamps that are automatically embedded into Kafka messages by the Kafka producer
-                client since
-                <a class="reference external" href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message">Kafka version 0.10</a>.
-                Depending on the setting of Kafka&#8217;s server-side <code class="docutils literal"><span class="pre">log.message.timestamp.type</span></code> broker and <code class="docutils literal"><span class="pre">message.timestamp.type</span></code> topic parameters,
-                this extractor provides you with:</p>
-              <ul class="simple">
-                <li><strong>event-time</strong> processing semantics if <code class="docutils literal"><span class="pre">log.message.timestamp.type</span></code> is set to <code class="docutils literal"><span class="pre">CreateTime</span></code> aka &#8220;producer time&#8221;
-                  (which is the default).  This represents the time when a Kafka producer sent the original message.  If you use Kafka&#8217;s
-                  official producer client, the timestamp represents milliseconds since the epoch.</li>
-                <li><strong>ingestion-time</strong> processing semantics if <code class="docutils literal"><span class="pre">log.message.timestamp.type</span></code> is set to <code class="docutils literal"><span class="pre">LogAppendTime</span></code> aka &#8220;broker
-                  time&#8221;.  This represents the time when the Kafka broker received the original message, in milliseconds since the epoch.</li>
-              </ul>
-              <p>The <code class="docutils literal"><span class="pre">FailOnInvalidTimestamp</span></code> extractor throws an exception if a record contains an invalid (i.e. negative) built-in
-                timestamp, because Kafka Streams would not process this record but silently drop it.  Invalid built-in timestamps can
-                occur for various reasons:  if for example, you consume a topic that is written to by pre-0.10 Kafka producer clients
-                or by third-party producer clients that don&#8217;t support the new Kafka 0.10 message format yet;  another situation where
-                this may happen is after upgrading your Kafka cluster from <code class="docutils literal"><span class="pre">0.9</span></code> to <code class="docutils literal"><span class="pre">0.10</span></code>, where all the data that was generated
-                with <code class="docutils literal"><span class="pre">0.9</span></code> does not include the <code class="docutils literal"><span class="pre">0.10</span></code> message timestamps.</p>
-              <p>If you have data with invalid timestamps and want to process it, then there are two alternative extractors available.
-                Both work on built-in timestamps, but handle invalid timestamps differently.</p>
-              <ul class="simple">
-                <li><a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.html">LogAndSkipOnInvalidTimestamp</a>:
-                  This extractor logs a warn message and returns the invalid timestamp to Kafka Streams, which will not process but
-                  silently drop the record.
-                  This log-and-skip strategy allows Kafka Streams to make progress instead of failing if there are records with an
-                  invalid built-in timestamp in your input data.</li>
-                <li><a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/processor/UsePartitionTimeOnInvalidTimestamp.html">UsePartitionTimeOnInvalidTimestamp</a>.
-                  This extractor returns the record&#8217;s built-in timestamp if it is valid (i.e. not negative).  If the record does not
-                  have a valid built-in timestamps, the extractor returns the previously extracted valid timestamp from a record of the
-                  same topic partition as the current record as a timestamp estimation.  In case that no timestamp can be estimated, it
-                  throws an exception.</li>
-              </ul>
-              <p>Another built-in extractor is
-                <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/processor/WallclockTimestampExtractor.html">WallclockTimestampExtractor</a>.
-                This extractor does not actually &#8220;extract&#8221; a timestamp from the consumed record but rather returns the current time in
-                milliseconds from the system clock (think: <code class="docutils literal"><span class="pre">System.currentTimeMillis()</span></code>), which effectively means Streams will operate
-                on the basis of the so-called <strong>processing-time</strong> of events.</p>
-              <p>You can also provide your own timestamp extractors, for instance to retrieve timestamps embedded in the payload of
-                messages.  If you cannot extract a valid timestamp, you can either throw an exception, return a negative timestamp, or
-                estimate a timestamp.  Returning a negative timestamp will result in data loss &#8211; the corresponding record will not be
-                processed but silently dropped.  If you want to estimate a new timestamp, you can use the value provided via
-                <code class="docutils literal"><span class="pre">previousTimestamp</span></code> (i.e., a Kafka Streams timestamp estimation).  Here is an example of a custom
-                <code class="docutils literal"><span class="pre">TimestampExtractor</span></code> implementation:</p>
-              <div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.clients.consumer.ConsumerRecord</span><span class="o">;</span>
-<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.processor.TimestampExtractor</span><span class="o">;</span>
-
-<span class="c1">// Extracts the embedded timestamp of a record (giving you &quot;event-time&quot; semantics).</span>
-<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyEventTimeExtractor</span> <span class="kd">implements</span> <span class="n">TimestampExtractor</span> <span class="o">{</span>
-
-  <span class="nd">@Override</span>
-  <span class="kd">public</span> <span class="kt">long</span> <span class="nf">extract</span><span class="o">(</span><span class="kd">final</span> <span class="n">ConsumerRecord</span><span class="o">&lt;</span><span class="n">Object</span><span class="o">,</span> <span class="n">Object</span><span class="o">&gt;</span> <span class="n">record</span><span class="o">,</span> <span class="kd">final</span> <span class="kt">long</span> <span class="n">previousTimestamp</span><span class="o">) [...]
-    <span class="c1">// `Foo` is your own custom class, which we assume has a method that returns</span>
-    <span class="c1">// the embedded timestamp (milliseconds since midnight, January 1, 1970 UTC).</span>
-    <span class="kt">long</span> <span class="n">timestamp</span> <span class="o">=</span> <span class="o">-</span><span class="mi">1</span><span class="o">;</span>
-    <span class="kd">final</span> <span class="n">Foo</span> <span class="n">myPojo</span> <span class="o">=</span> <span class="o">(</span><span class="n">Foo</span><span class="o">)</span> <span class="n">record</span><span class="o">.</span><span class="na">value</span><span class="o">();</span>
-    <span class="k">if</span> <span class="o">(</span><span class="n">myPojo</span> <span class="o">!=</span> <span class="kc">null</span><span class="o">)</span> <span class="o">{</span>
-      <span class="n">timestamp</span> <span class="o">=</span> <span class="n">myPojo</span><span class="o">.</span><span class="na">getTimestampInMillis</span><span class="o">();</span>
-    <span class="o">}</span>
-    <span class="k">if</span> <span class="o">(</span><span class="n">timestamp</span> <span class="o">&lt;</span> <span class="mi">0</span><span class="o">)</span> <span class="o">{</span>
-      <span class="c1">// Invalid timestamp!  Attempt to estimate a new timestamp,</span>
-      <span class="c1">// otherwise fall back to wall-clock time (processing-time).</span>
-      <span class="k">if</span> <span class="o">(</span><span class="n">previousTimestamp</span> <span class="o">&gt;=</span> <span class="mi">0</span><span class="o">)</span> <span class="o">{</span>
-        <span class="k">return</span> <span class="n">previousTimestamp</span><span class="o">;</span>
-      <span class="o">}</span> <span class="k">else</span> <span class="o">{</span>
-        <span class="k">return</span> <span class="n">System</span><span class="o">.</span><span class="na">currentTimeMillis</span><span class="o">();</span>
-      <span class="o">}</span>
-    <span class="o">}</span>
-  <span class="o">}</span>
-
-<span class="o">}</span>
-</pre></div>
-              </div>
-              <p>You would then define the custom timestamp extractor in your Streams configuration as follows:</p>
-              <div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.util.Properties</span><span class="o">;</span>
-<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.StreamsConfig</span><span class="o">;</span>
-
-<span class="n">Properties</span> <span class="n">streamsConfiguration</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
-<span class="n">streamsConfiguration</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG</span><span class="o">,</span> <span class="n">MyEventTimeExtractor</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
-</pre></div>
-              </div>
-            </div></blockquote>
+            <div>
+              <p>
+                You can tell Streams to apply topology optimizations by setting this config. The optimizations are currently all or none and disabled by default.
+                These optimizations include moving/reducing repartition topics and reusing the source topic as the changelog for source KTables. It is recommended to enable this.
+              </p>
+              <p>
+                Note that as of 2.3, you need to do two things to enable optimizations. In addition to setting this config to <code>StreamsConfig.OPTIMIZE</code>, you'll need to pass in your
+                configuration properties when building your topology by using the overloaded <code>StreamsBuilder.build(Properties)</code> method.
+                For example <code>KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties)</code>.
+              </p>
+          </div></blockquote>
         </div>
         <div class="section" id="upgrade-from">
           <h4><a class="toc-backref" href="#id14">upgrade.from</a><a class="headerlink" href="#upgrade-from" title="Permalink to this headline"></a></h4>
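A minimal sketch of how the configs documented in this change might be set together. It uses only java.util.Properties and the literal config names, so it runs without the Kafka client on the classpath. The values are illustrative assumptions, not recommendations, and the class name WarmupConfigSketch is hypothetical.

```java
import java.util.Properties;

// Hypothetical helper: collects the Streams configs discussed above in one place.
final class WarmupConfigSketch {

    static Properties build() {
        Properties props = new Properties();
        // Wait up to 500 ms (assumed value) for all input partition buffers to contain records.
        props.setProperty("max.task.idle.ms", "500");
        // Throttle how many warmup replicas may be assigned at once (must be at least 1).
        props.setProperty("max.warmup.replicas", "2");
        // Probe for caught-up warmup replicas every 10 minutes (must be at least 1 minute).
        props.setProperty("probing.rebalance.interval.ms", "600000");
        // Lag below which an instance is treated as caught up (assumed value).
        props.setProperty("acceptable.recovery.lag", "10000");
        // Keep one standby so that a caught-up copy of each stateful task is likely to exist.
        props.setProperty("num.standby.replicas", "1");
        // "all" is the value of StreamsConfig.OPTIMIZE.
        props.setProperty("topology.optimization", "all");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In an application you would normally use the StreamsConfig constants instead of string literals and, as the topology.optimization text notes, pass the same properties to both StreamsBuilder.build(Properties) and the KafkaStreams constructor.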
diff --git a/docs/streams/developer-guide/memory-mgmt.html b/docs/streams/developer-guide/memory-mgmt.html
index 5f1158f..d4c2bef 100644
--- a/docs/streams/developer-guide/memory-mgmt.html
+++ b/docs/streams/developer-guide/memory-mgmt.html
@@ -206,7 +206,10 @@
        <span class="o">}</span>
     <span class="o">}</span>
       </div>
-        <sup id="fn1">1. INDEX_FILTER_BLOCK_RATIO can be used to set a fraction of the block cache to set aside for "high priority" (aka index and filter) blocks, preventing them from being evicted by data blocks. See the full signature of the <a class="reference external" href="https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/LRUCache.java#L72">LRUCache constructor</a>. </sup>
+        <sup id="fn1">1. INDEX_FILTER_BLOCK_RATIO can be used to set a fraction of the block cache to set aside for "high priority" (aka index and filter) blocks, preventing them from being evicted by data blocks. See the full signature of the <a class="reference external" href="https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/LRUCache.java#L72">LRUCache constructor</a>.
+                        NOTE: the boolean parameter in the cache constructor lets you control whether the cache should enforce a strict memory limit by failing the read or iteration in the rare cases where it might grow beyond its capacity. Due to a
+                        <a class="reference external" href="https://github.com/facebook/rocksdb/issues/6247">bug in RocksDB</a>, this option cannot be used
+                        if the write buffer memory is also counted against the cache. If you set this to true, you should NOT pass the cache in to the <code>WriteBufferManager</code> and just control the write buffer and cache memory separately.</sup>
         <br>
         <sup id="fn2">2. This must be set in order for INDEX_FILTER_BLOCK_RATIO to take effect (see footnote 1) as described in the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks">RocksDB docs</a></sup>
         <br>
diff --git a/docs/streams/developer-guide/running-app.html b/docs/streams/developer-guide/running-app.html
index b2c4fc5..fccb5090 100644
--- a/docs/streams/developer-guide/running-app.html
+++ b/docs/streams/developer-guide/running-app.html
@@ -110,6 +110,18 @@ $ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp
                       <li>If a local state store exists, the changelog is replayed from the previously checkpointed offset. The changes are applied and the state is restored to the most recent snapshot. This method takes less time because it is applying a smaller portion of the changelog.</li>
                   </ul>
                   <p>For more information, see <a class="reference internal" href="config-streams.html#num-standby-replicas"><span class="std std-ref">Standby Replicas</span></a>.</p>
+                  <p>
+                      As of version 2.6, Streams does most of a task's restoration in the background through warmup replicas. These are assigned to instances that need to restore a lot of state for a task.
+                      A stateful active task will only be assigned to an instance once its state is within the configured
+                      <a class="reference internal" href="config-streams.html#acceptable-recovery-lag"><span class="std std-ref"><code>acceptable.recovery.lag</code></span></a>, if one exists. This means that
+                      most of the time, a task migration will <b>not</b> result in downtime for that task. It will remain active on the instance that's already caught up, while the instance that it's being
+                      migrated to works on restoring the state. Streams will <a class="reference internal" href="config-streams.html#probing-rebalance-interval-ms"><span class="std std-ref">regularly probe</span></a> for warmup tasks that have finished restoring and transition them to active tasks when ready.
+                   </p>
+                  <p>
+                      Note that the one exception to this task availability is when none of the instances has a caught-up version of that task. In that case, Streams has no choice but to assign the active
+                      task to an instance that is not caught up, and further processing is blocked until the task's state has been restored from the changelog. If high availability is important
+                      for your application, we highly recommend enabling standbys.
+                  </p>
               </div>
               <div class="section" id="determining-how-many-application-instances-to-run">
                   <h3><a class="toc-backref" href="#id8">Determining how many application instances to run</a><a class="headerlink" href="#determining-how-many-application-instances-to-run" title="Permalink to this headline"></a></h3>
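The availability rule described in the added paragraphs can be illustrated with a toy model. This is only a sketch under stated assumptions, not the actual Streams assignor: the class and method names are hypothetical, the per-instance lag map is invented input, and treating a lag equal to acceptable.recovery.lag as caught up is an assumption.

```java
import java.util.Map;
import java.util.Optional;

// Toy model (hypothetical): decides whether any instance is caught up on a task.
final class CaughtUpSketch {

    /** Assumption: an instance counts as caught up when its lag does not exceed the limit. */
    static boolean isCaughtUp(long lag, long acceptableRecoveryLag) {
        return lag <= acceptableRecoveryLag;
    }

    /** Returns some caught-up instance for the task, or empty if none exists. */
    static Optional<String> findCaughtUp(Map<String, Long> lagByInstance, long acceptableRecoveryLag) {
        return lagByInstance.entrySet().stream()
                .filter(entry -> isCaughtUp(entry.getValue(), acceptableRecoveryLag))
                .map(Map.Entry::getKey)
                .findFirst();
    }

    public static void main(String[] args) {
        Map<String, Long> lags = Map.of("instance-a", 50_000L, "instance-b", 100L);
        // With a limit of 10000, only instance-b qualifies, so the task can stay active there
        // while instance-a restores state as a warmup replica.
        System.out.println(findCaughtUp(lags, 10_000L));
    }
}
```

When findCaughtUp returns empty, the model corresponds to the exception noted above: the active task has to go to an instance that is not caught up, and processing waits for restoration.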
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 98e0c93..4694451 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -42,7 +42,7 @@
     <ul>
         <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to the version from which it is being upgrade.</li>
         <li> bounce each instance of your application once </li>
-        <li> prepare your newly deployed {{fullDotVersion}} application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.mode</code> </li>
+        <li> prepare your newly deployed {{fullDotVersion}} application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.from</code> </li>
         <li> bounce each instance of your application once more to complete the upgrade </li>
     </ul>
     <p> As an alternative, an offline upgrade is also possible. Upgrading from any versions as old as 0.10.0.x to {{fullDotVersion}} in offline mode require the following steps: </p>
@@ -94,6 +94,21 @@
         new value <code>"exactly_once_beta"</code>.
         Note that you need brokers with version 2.5 or newer to use this feature.
     </p>
+
+    <p>
+        For more highly available stateful applications, we've modified the task assignment algorithm to delay the movement of stateful active tasks to instances
+        that aren't yet caught up with that task's state. Instead, to migrate a task from one instance to another (e.g., when scaling out),
+        Streams will assign a warmup replica to the target instance so it can begin restoring the state while the active task stays available on an instance
+        that already had the task. The instances warming up tasks will communicate their progress to the group so that, once ready, Streams can move active
+        tasks to their new owners in the background. Check out <a href="https://cwiki.apache.org/confluence/x/0i4lBg">KIP-441</a>
+        for full details, including several new configs for control over this new feature.
+    </p>
+
+    <p>
+        New end-to-end latency metrics have been added. These task-level metrics will be logged at the INFO level and report the min and max end-to-end latency of a record at the beginning/source node(s)
+        and end/terminal node(s) of a task. See <a href="https://cwiki.apache.org/confluence/x/gBkRCQ">KIP-613</a> for more information.
+    </p>
+
     <p>
         As of 2.6.0 Kafka Streams deprecates <code>KStream.through()</code> in favor of the new <code>KStream.repartition()</code> operator
         (as per <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint">KIP-221</a>).
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index f9e75fe..71db2e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -360,40 +360,31 @@ public class StreamsConfig extends AbstractConfig {
         " (Note, if <code>processing.guarantee</code> is set to <code>" + EXACTLY_ONCE + "</code>, the default value is <code>" + EOS_DEFAULT_COMMIT_INTERVAL_MS + "</code>," +
         " otherwise the default value is <code>" + DEFAULT_COMMIT_INTERVAL_MS + "</code>.";
 
-    /** {@code max.task.idle.ms} */
-    public static final String MAX_TASK_IDLE_MS_CONFIG = "max.task.idle.ms";
-    private static final String MAX_TASK_IDLE_MS_DOC = "Maximum amount of time a stream task will stay idle when not all of its partition buffers contain records," +
-        " to avoid potential out-of-order record processing across multiple input streams.";
-
     /** {@code connections.max.idle.ms} */
     @SuppressWarnings("WeakerAccess")
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
 
-    /**
-     * {@code default.deserialization.exception.handler}
-     */
+    /** {@code default.deserialization.exception.handler} */
     @SuppressWarnings("WeakerAccess")
     public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler";
     private static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface.";
 
-    /**
-     * {@code default.production.exception.handler}
-     */
+    /** {@code default.production.exception.handler} */
     @SuppressWarnings("WeakerAccess")
     public static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.production.exception.handler";
     private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProductionExceptionHandler</code> interface.";
 
-    /**
-     * {@code default.windowed.key.serde.inner}
-     */
+    /** {@code default.windowed.key.serde.inner} */
     @SuppressWarnings("WeakerAccess")
     public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS = "default.windowed.key.serde.inner";
+    private static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS_DOC = "Default serializer / deserializer for the inner class of a windowed key. Must implement the " +
+        "<code>org.apache.kafka.common.serialization.Serde</code> interface.";
 
-    /**
-     * {@code default.windowed.value.serde.inner}
-     */
+    /** {@code default.windowed.value.serde.inner} */
     @SuppressWarnings("WeakerAccess")
     public static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS = "default.windowed.value.serde.inner";
+    private static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS_DOC = "Default serializer / deserializer for the inner class of a windowed value. Must implement the " +
+        "<code>org.apache.kafka.common.serialization.Serde</code> interface.";
 
     /** {@code default key.serde} */
     @SuppressWarnings("WeakerAccess")
@@ -414,6 +405,11 @@ public class StreamsConfig extends AbstractConfig {
     public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "default.timestamp.extractor";
     private static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp extractor class that implements the <code>org.apache.kafka.streams.processor.TimestampExtractor</code> interface.";
 
+    /** {@code max.task.idle.ms} */
+    public static final String MAX_TASK_IDLE_MS_CONFIG = "max.task.idle.ms";
+    private static final String MAX_TASK_IDLE_MS_DOC = "Maximum amount of time a stream task will stay idle when not all of its partition buffers contain records," +
+        " to avoid potential out-of-order record processing across multiple input streams.";
+
     /** {@code max.warmup.replicas} */
     public static final String MAX_WARMUP_REPLICAS_CONFIG = "max.warmup.replicas";
     private static final String MAX_WARMUP_REPLICAS_DOC = "The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once for the purpose of keeping " +
@@ -632,16 +628,16 @@ public class StreamsConfig extends AbstractConfig {
                     Serdes.ByteArraySerde.class.getName(),
                     Importance.MEDIUM,
                     DEFAULT_VALUE_SERDE_CLASS_DOC)
-            .define(NUM_STANDBY_REPLICAS_CONFIG,
-                    Type.INT,
-                    0,
+            .define(DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS,
+                    Type.CLASS,
+                    null,
                     Importance.MEDIUM,
-                    NUM_STANDBY_REPLICAS_DOC)
-            .define(NUM_STREAM_THREADS_CONFIG,
-                    Type.INT,
-                    1,
+                    DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS_DOC)
+            .define(DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS,
+                    Type.CLASS,
+                    null,
                     Importance.MEDIUM,
-                    NUM_STREAM_THREADS_DOC)
+                    DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS_DOC)
             .define(MAX_TASK_IDLE_MS_CONFIG,
                     Type.LONG,
                     0L,
@@ -653,6 +649,16 @@ public class StreamsConfig extends AbstractConfig {
                     atLeast(1),
                     Importance.MEDIUM,
                     MAX_WARMUP_REPLICAS_DOC)
+            .define(NUM_STANDBY_REPLICAS_CONFIG,
+                    Type.INT,
+                    0,
+                    Importance.MEDIUM,
+                    NUM_STANDBY_REPLICAS_DOC)
+            .define(NUM_STREAM_THREADS_CONFIG,
+                    Type.INT,
+                    1,
+                    Importance.MEDIUM,
+                    NUM_STREAM_THREADS_DOC)
             .define(PROCESSING_GUARANTEE_CONFIG,
                     Type.STRING,
                     AT_LEAST_ONCE,
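
For readers who want to see how the configs newly registered in this commit might be set by an application, here is a minimal sketch. It uses plain string keys with `java.util.Properties` so that it runs without the Kafka client on the classpath; real code would more likely use the `StreamsConfig` constants shown in the diff above. The class name `StreamsConfigSketch`, the application id, the broker address, and all chosen values are assumptions made for illustration only. Per the diff, the defaults are `null` for both inner serdes, `0L` for `max.task.idle.ms`, `0` for the standby replicas and `1` for the stream threads.

```java
import java.util.Properties;

// Illustrative sketch only: the class name and every value below are hypothetical.
final class StreamsConfigSketch {

    static Properties buildProps() {
        Properties props = new Properties();

        // Required by any Streams application; both values are placeholders.
        props.put("application.id", "windowed-example-app");
        props.put("bootstrap.servers", "localhost:9092");

        // Inner serdes for windowed keys and values (default: null).
        // Each must name a class implementing org.apache.kafka.common.serialization.Serde.
        props.put("default.windowed.key.serde.inner",
                  "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.windowed.value.serde.inner",
                  "org.apache.kafka.common.serialization.Serdes$LongSerde");

        // Let a task wait up to 500 ms for all partition buffers to fill (default: 0).
        props.put("max.task.idle.ms", "500");

        // The two definitions this commit moves within the ConfigDef (defaults: 0 and 1).
        props.put("num.standby.replicas", "1");
        props.put("num.stream.threads", "2");

        return props;
    }

    public static void main(String[] args) {
        // Print each configured key/value pair.
        buildProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

In an actual application the resulting `Properties` object would be passed to the `KafkaStreams` constructor together with the topology.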


[kafka] 02/02: MINOR: Streams docs fixes (#9308)

Posted by mj...@apache.org.

mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 149897976214a769a35cf87eecf9c02221f8d597
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Sat Sep 19 17:25:47 2020 -0700

    MINOR: Streams docs fixes (#9308)
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 docs/streams/developer-guide/config-streams.html | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html
index 8388476..a1674b0 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -552,13 +552,12 @@
           <blockquote>
             <div><p>The default Serializer/Deserializer class for record keys. Serialization and deserialization in Kafka Streams happens
               whenever data needs to be materialized, for example:</p>
-              <blockquote>
                 <div><ul class="simple">
                   <li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <code class="docutils literal"><span class="pre">StreamsBuilder#stream()</span></code> and <code class="docutils literal"><span class="pre">KStream#to()</span></code> methods).</li>
                   <li>Whenever data is read from or written to a <em>state store</em>.</li>
                 </ul>
                   <p>This is discussed in more detail in <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data types and serialization</span></a>.</p>
-                </div></blockquote>
+                </div>
             </div></blockquote>
         </div>
         <div class="section" id="default-value-serde">
@@ -570,7 +569,7 @@
                 <li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <code class="docutils literal"><span class="pre">StreamsBuilder#stream()</span></code> and <code class="docutils literal"><span class="pre">KStream#to()</span></code> methods).</li>
                 <li>Whenever data is read from or written to a <em>state store</em>.</li>
               </ul>
-              <p>This is discussed in more detail in <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data types and serialization</span></a>.</p>
+                <p>This is discussed in more detail in <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data types and serialization</span></a>.</p>
             </div></blockquote>
         </div>
         <div class="section" id="default-windowed-key-serde-inner">
@@ -578,13 +577,12 @@
           <blockquote>
             <div><p>The default Serializer/Deserializer class for the inner class of windowed keys. Serialization and deserialization in Kafka Streams happens
               whenever data needs to be materialized, for example:</p>
-              <blockquote>
                 <div><ul class="simple">
                   <li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <code class="docutils literal"><span class="pre">StreamsBuilder#stream()</span></code> and <code class="docutils literal"><span class="pre">KStream#to()</span></code> methods).</li>
                   <li>Whenever data is read from or written to a <em>state store</em>.</li>
                 </ul>
                   <p>This is discussed in more detail in <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data types and serialization</span></a>.</p>
-                </div></blockquote>
+                </div>
             </div></blockquote>
         </div>
         <div class="section" id="default-windowed-value-serde-inner">
@@ -654,7 +652,7 @@
           </blockquote>
         </div>
         <div class="section" id="probing-rebalance-interval-ms">
-          <h4><a class="toc-backref" href="#id30">probing-rebalance.interval.ms</a><a class="headerlink" href="#probing-rebalance-interval-ms" title="Permalink to this headline"></a></h4>
+          <h4><a class="toc-backref" href="#id30">probing.rebalance.interval.ms</a><a class="headerlink" href="#probing-rebalance-interval-ms" title="Permalink to this headline"></a></h4>
           <blockquote>
             <div>
               The maximum time to wait before triggering a rebalance to probe for warmup replicas that have restored enough to be considered caught up. Streams will only assign stateful active tasks to