Posted to commits@kafka.apache.org by bb...@apache.org on 2020/11/04 13:34:31 UTC

[kafka] branch 2.7 updated: KAFKA-10679: [Streams] migrate kafka-site updated docs to kafka/docs (#9554)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 729401f  KAFKA-10679: [Streams] migrate kafka-site updated docs to kafka/docs (#9554)
729401f is described below

commit 729401fa8e30fff432d14a7cb0d6c08980c6499b
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Wed Nov 4 08:30:10 2020 -0500

    KAFKA-10679: [Streams] migrate kafka-site updated docs to kafka/docs (#9554)
    
    During the AK website upgrade, changes made to kafka-site weren't migrated back to kafka/docs.

    This PR is an attempt at porting the streams changes to kafka/docs.

    The bulk of the changes in this PR are cosmetic.
    
    For testing:

    - Reviewed the PR diffs
    - Rendered the changes locally
    
    Reviewers: John Roesler <jo...@confluent.io>
---
 .../developer-guide/dsl-topology-naming.html       |   2 +-
 docs/streams/architecture.html                     |  12 +-
 docs/streams/core-concepts.html                    |  18 +-
 docs/streams/developer-guide/app-reset-tool.html   |  12 +-
 docs/streams/developer-guide/config-streams.html   | 412 ++++++++++-----------
 docs/streams/developer-guide/datatypes.html        |  18 +-
 docs/streams/developer-guide/dsl-api.html          | 241 +++++-------
 .../developer-guide/dsl-topology-naming.html       |  66 ++--
 docs/streams/developer-guide/index.html            |   4 +-
 .../developer-guide/interactive-queries.html       |  41 +-
 docs/streams/developer-guide/manage-topics.html    |   4 +-
 docs/streams/developer-guide/memory-mgmt.html      |  24 +-
 docs/streams/developer-guide/processor-api.html    |  34 +-
 docs/streams/developer-guide/running-app.html      |  11 +-
 docs/streams/developer-guide/security.html         |  17 +-
 docs/streams/developer-guide/testing.html          |  92 ++---
 docs/streams/developer-guide/write-streams.html    |  30 +-
 docs/streams/index.html                            |  32 +-
 docs/streams/quickstart.html                       | 106 ++----
 docs/streams/tutorial.html                         | 174 +++------
 docs/streams/upgrade-guide.html                    |  33 +-
 21 files changed, 555 insertions(+), 828 deletions(-)

diff --git a/docs/documentation/streams/developer-guide/dsl-topology-naming.html b/docs/documentation/streams/developer-guide/dsl-topology-naming.html
index db5eee3..9f42a04 100644
--- a/docs/documentation/streams/developer-guide/dsl-topology-naming.html
+++ b/docs/documentation/streams/developer-guide/dsl-topology-naming.html
@@ -16,4 +16,4 @@
 -->
 
 <!-- should always link the latest release's documentation -->
-<!--#include virtual="../../../streams/dsl-topology-naming.html" -->
+<!--#include virtual="../../../streams/developer-guide/dsl-topology-naming.html" -->
diff --git a/docs/streams/architecture.html b/docs/streams/architecture.html
index 43de9e7..67594c2 100644
--- a/docs/streams/architecture.html
+++ b/docs/streams/architecture.html
@@ -41,7 +41,7 @@
     </p>
     <img class="centered" src="/{{version}}/images/streams-architecture-overview.jpg" style="width:750px">
 
-    <h3><a id="streams_architecture_tasks" href="#streams_architecture_tasks">Stream Partitions and Tasks</a></h3>
+    <h3 class="anchor-heading"><a id="streams_architecture_tasks" class="anchor-link"></a><a href="#streams_architecture_tasks">Stream Partitions and Tasks</a></h3>
 
     <p>
         The messaging layer of Kafka partitions data for storing and transporting it. Kafka Streams partitions data for processing it.
@@ -91,7 +91,7 @@
     <img class="centered" src="/{{version}}/images/streams-architecture-tasks.jpg" style="width:400px">
     <br>
 
-    <h3><a id="streams_architecture_threads" href="#streams_architecture_threads">Threading Model</a></h3>
+    <h3 class="anchor-heading"><a id="streams_architecture_threads" class="anchor-link"></a><a href="#streams_architecture_threads">Threading Model</a></h3>
 
     <p>
         Kafka Streams allows the user to configure the number of <b>threads</b> that the library can use to parallelize processing within an application instance.
@@ -112,7 +112,7 @@
     </p>
     <br>
 
-    <h3><a id="streams_architecture_state" href="#streams_architecture_state">Local State Stores</a></h3>
+    <h3 class="anchor-heading"><a id="streams_architecture_state" class="anchor-link"></a><a href="#streams_architecture_state">Local State Stores</a></h3>
 
     <p>
         Kafka Streams provides so-called <b>state stores</b>, which can be used by stream processing applications to store and query data,
@@ -131,7 +131,7 @@
     <img class="centered" src="/{{version}}/images/streams-architecture-states.jpg" style="width:400px">
     <br>
 
-    <h3><a id="streams_architecture_recovery" href="#streams_architecture_recovery">Fault Tolerance</a></h3>
+    <h3 class="anchor-heading"><a id="streams_architecture_recovery" class="anchor-link"></a><a href="#streams_architecture_recovery">Fault Tolerance</a></h3>
 
     <p>
         Kafka Streams builds on fault-tolerance capabilities integrated natively within Kafka. Kafka partitions are highly available and replicated; so when stream data is persisted to Kafka it is available
@@ -165,10 +165,10 @@
 
 <!--#include virtual="../../includes/_header.htm" -->
 <!--#include virtual="../../includes/_top.htm" -->
-<div class="content documentation documentation--current">
+<div class="content documentation ">
     <!--#include virtual="../../includes/_nav.htm" -->
     <div class="right">
-        <!--#include virtual="../../includes/_docs_banner.htm" -->
+        <!--//#include virtual="../../includes/_docs_banner.htm" -->
         <ul class="breadcrumbs">
             <li><a href="/documentation">Documentation</a></li>
             <li><a href="/documentation/streams">Kafka Streams</a></li>
diff --git a/docs/streams/core-concepts.html b/docs/streams/core-concepts.html
index cdb9fc1..ddb37ea 100644
--- a/docs/streams/core-concepts.html
+++ b/docs/streams/core-concepts.html
@@ -58,7 +58,7 @@
         We first summarize the key concepts of Kafka Streams.
     </p>
 
-    <h3><a id="streams_topology" href="#streams_topology">Stream Processing Topology</a></h3>
+    <h3 class="anchor-heading"><a id="streams_topology" class="anchor-link"></a><a href="#streams_topology">Stream Processing Topology</a></h3>
 
     <ul>
         <li>A <b>stream</b> is the most important abstraction provided by Kafka Streams: it represents an unbounded, continuously updating data set. A stream is an ordered, replayable, and fault-tolerant sequence of immutable data records, where a <b>data record</b> is defined as a key-value pair.</li>
@@ -88,7 +88,7 @@
         At runtime, the logical topology is instantiated and replicated inside the application for parallel processing (see <a href="/{{version}}/documentation/streams/architecture#streams_architecture_tasks"><b>Stream Partitions and Tasks</b></a> for details).
     </p>
 
-    <h3><a id="streams_time" href="#streams_time">Time</a></h3>
+    <h3 class="anchor-heading"><a id="streams_time" class="anchor-link"></a><a href="#streams_time">Time</a></h3>
 
     <p>
         A critical aspect in stream processing is the notion of <b>time</b>, and how it is modeled and integrated.
@@ -157,7 +157,7 @@
 
     </p>
 
-    <h3><a id="streams_concepts_duality" href="#streams-concepts-duality">Duality of Streams and Tables</a></h3>
+    <h3 class="anchor-heading"><a id="streams_concepts_duality" class="anchor-link"></a><a href="#streams_concepts_duality">Duality of Streams and Tables</a></h3>
     <p>
         When implementing stream processing use cases in practice, you typically need both <strong>streams</strong> and also <strong>databases</strong>.
         An example use case that is very common in practice is an e-commerce application that enriches an incoming <em>stream</em> of customer
@@ -176,9 +176,9 @@
       or to run <a id="streams-developer-guide-interactive-queries" href="/{{version}}/documentation/streams/developer-guide/interactive-queries#interactive-queries">interactive queries</a>
       against your application's latest processing results. And, beyond its internal usage, the Kafka Streams API
       also allows developers to exploit this duality in their own applications.
-    </p>
+  </p>
 
-    <p>
+  <p>
       Before we discuss concepts such as <a id="streams-developer-guide-dsl-aggregating" href="/{{version}}/documentation/streams/developer-guide/dsl-api#aggregating">aggregations</a>
       in Kafka Streams, we must first introduce <strong>tables</strong> in more detail, and talk about the aforementioned stream-table duality.
       Essentially, this duality means that a stream can be viewed as a table, and a table can be viewed as a stream. Kafka's log compaction feature, for example, exploits this duality.
@@ -274,7 +274,7 @@
     </p>
     <br>
 
-    <h2><a id="streams_processing_guarantee" href="#streams_processing_guarantee">Processing Guarantees</a></h2>
+    <h2 class="anchor-heading"><a id="streams_processing_guarantee" class="anchor-link"></a><a href="#streams_processing_guarantee">Processing Guarantees</a></h2>
 
     <p>
        In stream processing, one of the most frequently asked questions is "does my stream processing system guarantee that each record is processed once and only once, even if some failures are encountered in the middle of processing?"
@@ -304,7 +304,7 @@
         For more information, see the <a href="/{{version}}/documentation/streams/developer-guide/config-streams.html">Kafka Streams Configs</a> section.
     </p>
 
-    <h3><a id="streams_out_of_ordering" href="#streams_out_of_ordering">Out-of-Order Handling</a></h3>
+    <h3 class="anchor-heading"><a id="streams_out_of_ordering" class="anchor-link"></a><a href="#streams_out_of_ordering">Out-of-Order Handling</a></h3>
 
     <p>
        Besides the guarantee that each record will be processed exactly-once, another issue that many stream processing applications will face is how to
@@ -343,10 +343,10 @@
 
 <!--#include virtual="../../includes/_header.htm" -->
 <!--#include virtual="../../includes/_top.htm" -->
-<div class="content documentation documentation--current">
+<div class="content documentation ">
     <!--#include virtual="../../includes/_nav.htm" -->
     <div class="right">
-        <!--#include virtual="../../includes/_docs_banner.htm" -->
+        <!--//#include virtual="../../includes/_docs_banner.htm" -->
         <ul class="breadcrumbs">
             <li><a href="/documentation">Documentation</a></li>
             <li><a href="/documentation/streams">Kafka Streams</a></li>
diff --git a/docs/streams/developer-guide/app-reset-tool.html b/docs/streams/developer-guide/app-reset-tool.html
index e875a23..1b90af0 100644
--- a/docs/streams/developer-guide/app-reset-tool.html
+++ b/docs/streams/developer-guide/app-reset-tool.html
@@ -77,11 +77,10 @@
         <div class="section" id="step-1-run-the-application-reset-tool">
             <h2>Step 1: Run the application reset tool<a class="headerlink" href="#step-1-run-the-application-reset-tool" title="Permalink to this headline"></a></h2>
             <p>Invoke the application reset tool from the command line</p>
-            <div class="highlight-bash"><div class="highlight"><pre><span></span>&lt;path-to-kafka&gt;/bin/kafka-streams-application-reset
-</pre></div>
+            <div class="highlight-bash"><div class="highlight"><pre><span></span><code>&lt;path-to-kafka&gt;/bin/kafka-streams-application-reset</code></pre></div>
             </div>
             <p>The tool accepts the following parameters:</p>
-            <div class="highlight-bash"><div class="highlight"><pre><span></span>Option <span class="o">(</span>* <span class="o">=</span> required<span class="o">)</span>                 Description
+            <div class="highlight-bash"><div class="highlight"><pre><span>Option</span><code> <span class="o">(</span>* <span class="o">=</span> required<span class="o">)</span>                 Description
 ---------------------                 -----------
 * --application-id &lt;String: id&gt;       The Kafka Streams application ID
                                         <span class="o">(</span>application.id<span class="o">)</span>.
@@ -120,8 +119,7 @@
                                         directly.
 --force                               Force removing members of the consumer group
                                       (intended to remove left-over members if
-                                      long session timeout was configured).
-</pre></div>
+                                      long session timeout was configured).</code></pre></div>
             </div>
             <p>Consider the following as reset-offset scenarios for <code>input-topics</code>:</p>
             <ul>
@@ -161,10 +159,10 @@
 
 <!--#include virtual="../../../includes/_header.htm" -->
 <!--#include virtual="../../../includes/_top.htm" -->
-<div class="content documentation documentation--current">
+<div class="content documentation ">
   <!--#include virtual="../../../includes/_nav.htm" -->
   <div class="right">
-    <!--#include virtual="../../../includes/_docs_banner.htm" -->
+    <!--//#include virtual="../../../includes/_docs_banner.htm" -->
     <ul class="breadcrumbs">
       <li><a href="/documentation">Documentation</a></li>
       <li><a href="/documentation/streams">Kafka Streams</a></li>
diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html
index a1acc96..6395644 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -47,8 +47,7 @@
 <span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">APPLICATION_ID_CONFIG</span><span class="o">,</span> <span class="s">&quot;my-first-streams-application&quot;</span><span class="o">);</span>
 <span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">BOOTSTRAP_SERVERS_CONFIG</span><span class="o">,</span> <span class="s">&quot;kafka-broker1:9092&quot;</span><span class="o">);</span>
 <span class="c1">// Any further settings</span>
-<span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(...</span> <span class="o">,</span> <span class="o">...);</span>
-</pre></div>
+<span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(...</span> <span class="o">,</span> <span class="o">...);</span></code></pre></div>
         </div>
       </li>
     </ol>
@@ -355,9 +354,9 @@
           <blockquote>
             <div>
               <p>
-              The maximum acceptable lag (total number of offsets to catch up from the changelog) for an instance to be considered caught-up and able to receive an active task. Streams will only assign
-              stateful active tasks to instances whose state stores are within the acceptable recovery lag, if any exist, and assign warmup replicas to restore state in the background for instances
-              that are not yet caught up. Should correspond to a recovery time of well under a minute for a given workload. Must be at least 0.
+                The maximum acceptable lag (total number of offsets to catch up from the changelog) for an instance to be considered caught-up and able to receive an active task. Streams will only assign
+                stateful active tasks to instances whose state stores are within the acceptable recovery lag, if any exist, and assign warmup replicas to restore state in the background for instances
+                that are not yet caught up. Should correspond to a recovery time of well under a minute for a given workload. Must be at least 0.
               </p>
               <p>
                 Note: if you set this to <code>Long.MAX_VALUE</code> it effectively disables the warmup replicas and task high availability, allowing Streams to immediately produce a balanced
@@ -390,8 +389,7 @@
                 The drawback of this approach is that "manual" writes are side effects that are invisible to the Kafka Streams runtime library,
                 so they do not benefit from the end-to-end processing guarantees of the Streams API:</p>
 
-              <pre class="brush: java;">
-              public class SendToDeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {
+              <pre class="line-numbers"><code class="language-java">              public class SendToDeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {
                   KafkaProducer&lt;byte[], byte[]&gt; dlqProducer;
                   String dlqTopic;
 
@@ -415,8 +413,7 @@
                       dlqProducer = .. // get a producer from the configs map
                       dlqTopic = .. // get the topic name from the configs map
                   }
-              }
-              </pre>
+              }</code></pre>
 
             </div></blockquote>
         </div>
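
A minimal self-contained sketch of such a handler follows; the handle() body and the "dlq.topic" config key are assumptions for illustration, since the class body is abbreviated in the hunk above:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class SendToDeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {
        private KafkaProducer<byte[], byte[]> dlqProducer;
        private String dlqTopic;

        @Override
        public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                     final ConsumerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
            // Forward the raw bytes to the dead letter queue, then keep processing.
            dlqProducer.send(new ProducerRecord<>(dlqTopic, record.key(), record.value()));
            return DeserializationHandlerResponse.CONTINUE;
        }

        @Override
        public void configure(final Map<String, ?> configs) {
            // "dlq.topic" is a hypothetical config key, and reusing the Streams config
            // map to build the producer is just one way to obtain one.
            dlqTopic = (String) configs.get("dlq.topic");
            dlqProducer = new KafkaProducer<>(new HashMap<>(configs),
                new ByteArraySerializer(), new ByteArraySerializer());
        }
    }
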
@@ -427,10 +424,10 @@
               such as attempting to produce a record that is too large. By default, Kafka provides and uses the <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.html">DefaultProductionExceptionHandler</a>
               that always fails when these exceptions occur.</p>
 
-            <p>Each exception handler can return a <code>FAIL</code> or <code>CONTINUE</code> depending on the record and the exception thrown. Returning <code>FAIL</code> will signal that Streams should shut down and <code>CONTINUE</code> will signal that Streams
-            should ignore the issue and continue processing. If you want to provide an exception handler that always ignores records that are too large, you could implement something like the following:</p>
+              <p>Each exception handler can return a <code>FAIL</code> or <code>CONTINUE</code> depending on the record and the exception thrown. Returning <code>FAIL</code> will signal that Streams should shut down and <code>CONTINUE</code> will signal that Streams
+                should ignore the issue and continue processing. If you want to provide an exception handler that always ignores records that are too large, you could implement something like the following:</p>
 
-            <pre class="brush: java;">
+            <pre class="line-numbers"><code class="language-java">
             import java.util.Properties;
             import org.apache.kafka.streams.StreamsConfig;
             import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -455,7 +452,7 @@
             // other various kafka streams settings, e.g. bootstrap servers, application id, etc
 
             settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
-                         IgnoreRecordTooLargeHandler.class);</pre></div>
+                         IgnoreRecordTooLargeHandler.class);</code></pre></div>
           </blockquote>
         </div>
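
The hunk above shows only the imports and the registration call; a sketch of the handler class itself, consistent with the surrounding description, would be:

    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.RecordTooLargeException;
    import org.apache.kafka.streams.errors.ProductionExceptionHandler;

    public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
        @Override
        public void configure(final Map<String, ?> config) {}

        @Override
        public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                         final Exception exception) {
            // Skip over-sized records; fail on everything else.
            if (exception instanceof RecordTooLargeException) {
                return ProductionExceptionHandlerResponse.CONTINUE;
            }
            return ProductionExceptionHandlerResponse.FAIL;
        }
    }
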
         <div class="section" id="timestamp-extractor">
@@ -569,7 +566,7 @@
                 <li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <code class="docutils literal"><span class="pre">StreamsBuilder#stream()</span></code> and <code class="docutils literal"><span class="pre">KStream#to()</span></code> methods).</li>
                 <li>Whenever data is read from or written to a <em>state store</em>.</li>
               </ul>
-                <p>This is discussed in more detail in <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data types and serialization</span></a>.</p>
+              <p>This is discussed in more detail in <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data types and serialization</span></a>.</p>
             </div></blockquote>
         </div>
         <div class="section" id="default-windowed-key-serde-inner">
@@ -581,7 +578,7 @@
                   <li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <code class="docutils literal"><span class="pre">StreamsBuilder#stream()</span></code> and <code class="docutils literal"><span class="pre">KStream#to()</span></code> methods).</li>
                   <li>Whenever data is read from or written to a <em>state store</em>.</li>
                 </ul>
-                  <p>This is discussed in more detail in <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data types and serialization</span></a>.</p>
+                <p>This is discussed in more detail in <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data types and serialization</span></a>.</p>
                 </div>
             </div></blockquote>
         </div>
@@ -629,11 +626,11 @@
             </div>
           </blockquote>
         </div>
-          <div class="admonition note">
-              <p class="first admonition-title">Note</p>
-              <p class="last">If you enable <cite>n</cite> standby tasks, you need to provision <cite>n+1</cite> <code class="docutils literal"><span class="pre">KafkaStreams</span></code>
-              instances.</p>
-          </div>
+        <div class="admonition note">
+          <p class="first admonition-title">Note</p>
+          <p class="last">If you enable <cite>n</cite> standby tasks, you need to provision <cite>n+1</cite> <code class="docutils literal"><span class="pre">KafkaStreams</span></code>
+            instances.</p>
+        </div>
         <div class="section" id="num-stream-threads">
           <h4><a class="toc-backref" href="#id11">num.stream.threads</a><a class="headerlink" href="#num-stream-threads" title="Permalink to this headline"></a></h4>
           <blockquote>
@@ -664,22 +661,22 @@
           <span id="streams-developer-guide-processing-guarantee"></span><h4><a class="toc-backref" href="#id25">processing.guarantee</a><a class="headerlink" href="#processing-guarantee" title="Permalink to this headline"></a></h4>
           <blockquote>
             <div>The processing guarantee that should be used.
-                 Possible values are <code class="docutils literal"><span class="pre">"at_least_once"</span></code> (default),
-                 <code class="docutils literal"><span class="pre">"exactly_once"</span></code>,
-                 and <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code>.
-                 Using <code class="docutils literal"><span class="pre">"exactly_once"</span></code> requires broker
-                 version 0.11.0 or newer, while using <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code>
-                 requires broker version 2.5 or newer.
-                 Note that if exactly-once processing is enabled, the default for parameter
-                 <code class="docutils literal"><span class="pre">commit.interval.ms</span></code> changes to 100ms.
-                 Additionally, consumers are configured with <code class="docutils literal"><span class="pre">isolation.level="read_committed"</span></code>
-                 and producers are configured with <code class="docutils literal"><span class="pre">enable.idempotence=true</span></code> per default.
-                 Note that by default exactly-once processing requires a cluster of at least three brokers what is the recommended setting for production.
-                 For development, you can change this configuration by adjusting broker setting
-                 <code class="docutils literal"><span class="pre">transaction.state.log.replication.factor</span></code>
-                 and <code class="docutils literal"><span class="pre">transaction.state.log.min.isr</span></code>
-                 to the number of brokers you want to use.
-                 For more details see <a href="../core-concepts#streams_processing_guarantee">Processing Guarantees</a>.
+              Possible values are <code class="docutils literal"><span class="pre">"at_least_once"</span></code> (default),
+              <code class="docutils literal"><span class="pre">"exactly_once"</span></code>,
+              and <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code>.
+              Using <code class="docutils literal"><span class="pre">"exactly_once"</span></code> requires broker
+              version 0.11.0 or newer, while using <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code>
+              requires broker version 2.5 or newer.
+              Note that if exactly-once processing is enabled, the default for parameter
+              <code class="docutils literal"><span class="pre">commit.interval.ms</span></code> changes to 100ms.
+              Additionally, consumers are configured with <code class="docutils literal"><span class="pre">isolation.level="read_committed"</span></code>
+              and producers are configured with <code class="docutils literal"><span class="pre">enable.idempotence=true</span></code> by default.
+              Note that by default exactly-once processing requires a cluster of at least three brokers, which is the recommended setting for production.
+              For development, you can change this configuration by adjusting broker setting
+              <code class="docutils literal"><span class="pre">transaction.state.log.replication.factor</span></code>
+              and <code class="docutils literal"><span class="pre">transaction.state.log.min.isr</span></code>
+              to the number of brokers you want to use.
+              For more details see <a href="../core-concepts#streams_processing_guarantee">Processing Guarantees</a>.
             </div></blockquote>
         </div>
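
As a quick illustration of the guarantees described above, the config can be set via the StreamsConfig constants (broker version requirements as noted):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    final Properties settings = new Properties();
    // Requires brokers on 0.11.0 or newer.
    settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    // Or, against brokers on 2.5 or newer:
    // settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);
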
         <div class="section" id="replication-factor">
@@ -729,79 +726,79 @@
                     <span class="n">Properties</span> <span class="n">streamsSettings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
                     <span class="n">streamsConfig</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">ROCKSDB_CONFIG_SETTER_CLASS_CONFIG</span><span class="o">,</span> <span class="n">CustomRocksDBConfig</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
                     </pre></div>
-                    </div>
-                    <dl class="docutils">
-                      <dt>Notes for example:</dt>
-                      <dd><ol class="first last arabic simple">
-                        <li><code class="docutils literal"><span class="pre">BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();</span></code> Get a reference to the existing table config rather than create a new one, so you don't accidentally overwrite defaults such as the <code class="docutils literal"><span class="pre">BloomFilter</span></code>, which is an important optimization.
-                        <li><code class="docutils literal"><span class="pre">tableConfig.setBlockSize(16</span> <span class="pre">*</span> <span class="pre">1024L);</span></code> Modify the default <a class="reference external" href="https://github.com/apache/kafka/blob/2.3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L79">block size</a> per these instructions from the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Memory- [...]
-                        <li><code class="docutils literal"><span class="pre">tableConfig.setCacheIndexAndFilterBlocks(true);</span></code> Do not let the index and filter blocks grow unbounded. For more information, see the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks">RocksDB GitHub</a>.</li>
-                        <li><code class="docutils literal"><span class="pre">options.setMaxWriteBufferNumber(2);</span></code> See the advanced options in the <a class="reference external" href="https://github.com/facebook/rocksdb/blob/8dee8cad9ee6b70fd6e1a5989a8156650a70c04f/include/rocksdb/advanced_options.h#L103">RocksDB GitHub</a>.</li>
-                        <li><code class="docutils literal"><span class="pre">cache.close();</span></code> To avoid memory leaks, you must close any objects you constructed that extend org.rocksdb.RocksObject. See  <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#memory-management">RocksJava docs</a> for more details.</li>
-                      </ol>
-                      </dd>
-                    </dl>
-                  </div></blockquote>
               </div>
-            </div>
-          </blockquote>
-        </div>
-        <div class="section" id="state-dir">
-          <h4><a class="toc-backref" href="#id14">state.dir</a><a class="headerlink" href="#state-dir" title="Permalink to this headline"></a></h4>
-          <blockquote>
-            <div>The state directory. Kafka Streams persists local states under the state directory. Each application has a subdirectory on its hosting
-              machine that is located under the state directory. The name of the subdirectory is the application ID. The state stores associated
-              with the application are created under this subdirectory. When running multiple instances of the same application on a single machine,
-              this path must be unique for each such instance.</div>
-          </blockquote>
-        </div>
-        <div class="section" id="topology-optimization">
-          <h4><a class="toc-backref" href="#id31">topology.optimization</a><a class="headerlink" href="#topology-optimization" title="Permalink to this headline"></a></h4>
-          <blockquote>
-            <div>
-              <p>
-                You can tell Streams to apply topology optimizations by setting this config. The optimizations are currently all or none and disabled by default.
-                These optimizations include moving/reducing repartition topics and reusing the source topic as the changelog for source KTables. It is recommended to enable this.
-              </p>
-              <p>
-                Note that as of 2.3, you need to do two things to enable optimizations. In addition to setting this config to <code>StreamsConfig.OPTIMIZE</code>, you'll need to pass in your
-                configuration properties when building your topology by using the overloaded <code>StreamsBuilder.build(Properties)</code> method.
-                For example <code>KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties)</code>.
-              </p>
-          </div></blockquote>
-        </div>
-        <div class="section" id="upgrade-from">
-          <h4><a class="toc-backref" href="#id14">upgrade.from</a><a class="headerlink" href="#upgrade-from" title="Permalink to this headline"></a></h4>
-          <blockquote>
-            <div>
-              The version you are upgrading from. It is important to set this config when performing a rolling upgrade to certain versions, as described in the upgrade guide.
-              You should set this config to the appropriate version before bouncing your instances and upgrading them to the newer version. Once everyone is on the
-              newer version, you should remove this config and do a second rolling bounce. It is only necessary to set this config and follow the two-bounce upgrade path
-              when upgrading from below version 2.0, or when upgrading to 2.4+ from any version lower than 2.4.
-            </div>
-          </blockquote>
+              <dl class="docutils">
+                <dt>Notes for example:</dt>
+                <dd><ol class="first last arabic simple">
+                  <li><code class="docutils literal"><span class="pre">BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();</span></code> Get a reference to the existing table config rather than create a new one, so you don't accidentally overwrite defaults such as the <code class="docutils literal"><span class="pre">BloomFilter</span></code>, which is an important optimization.
+                  <li><code class="docutils literal"><span class="pre">tableConfig.setBlockSize(16</span> <span class="pre">*</span> <span class="pre">1024L);</span></code> Modify the default <a class="reference external" href="https://github.com/apache/kafka/blob/2.3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L79">block size</a> per these instructions from the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Memory-usage- [...]
+                  <li><code class="docutils literal"><span class="pre">tableConfig.setCacheIndexAndFilterBlocks(true);</span></code> Do not let the index and filter blocks grow unbounded. For more information, see the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks">RocksDB GitHub</a>.</li>
+                  <li><code class="docutils literal"><span class="pre">options.setMaxWriteBufferNumber(2);</span></code> See the advanced options in the <a class="reference external" href="https://github.com/facebook/rocksdb/blob/8dee8cad9ee6b70fd6e1a5989a8156650a70c04f/include/rocksdb/advanced_options.h#L103">RocksDB GitHub</a>.</li>
+                  <li><code class="docutils literal"><span class="pre">cache.close();</span></code> To avoid memory leaks, you must close any objects you constructed that extend org.rocksdb.RocksObject. See  <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#memory-management">RocksJava docs</a> for more details.</li>
+                </ol>
+                </dd>
+              </dl>
+            </div></blockquote>
         </div>
       </div>
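
The CustomRocksDBConfig class referenced in this hunk is not shown here; a minimal sketch consistent with the notes above (the 16 MB cache size is an arbitrary illustration) might look like:

    import java.util.Map;
    import org.apache.kafka.streams.state.RocksDBConfigSetter;
    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.Cache;
    import org.rocksdb.LRUCache;
    import org.rocksdb.Options;

    public class CustomRocksDBConfig implements RocksDBConfigSetter {
        // A shared cache whose memory can be bounded and released in close().
        private final Cache cache = new LRUCache(16 * 1024L * 1024L);

        @Override
        public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
            // Reuse the existing table config so defaults such as the BloomFilter survive.
            final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
            tableConfig.setBlockCache(cache);
            tableConfig.setBlockSize(16 * 1024L);
            tableConfig.setCacheIndexAndFilterBlocks(true);
            options.setTableFormatConfig(tableConfig);
            options.setMaxWriteBufferNumber(2);
        }

        @Override
        public void close(final String storeName, final Options options) {
            // Anything extending org.rocksdb.RocksObject must be closed to avoid leaks.
            cache.close();
        }
    }
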
-      <div class="section" id="kafka-consumers-and-producer-configuration-parameters">
-        <h3><a class="toc-backref" href="#id16">Kafka consumers, producer and admin client configuration parameters</a><a class="headerlink" href="#kafka-consumers-and-producer-configuration-parameters" title="Permalink to this headline"></a></h3>
-        <p>You can specify parameters for the Kafka <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/package-summary.html">consumers</a>, <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/clients/producer/package-summary.html">producers</a>,
-            and <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/kafka/clients/admin/package-summary.html">admin client</a> that are used internally.
-            The consumer, producer and admin client settings are defined by specifying parameters in a <code class="docutils literal"><span class="pre">StreamsConfig</span></code> instance.</p>
-        <p>In this example, the Kafka <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#SESSION_TIMEOUT_MS_CONFIG">consumer session timeout</a> is configured to be 60000 milliseconds in the Streams settings:</p>
-        <div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Properties</span> <span class="n">streamsSettings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
+      </blockquote>
+    </div>
+    <div class="section" id="state-dir">
+      <h4><a class="toc-backref" href="#id14">state.dir</a><a class="headerlink" href="#state-dir" title="Permalink to this headline"></a></h4>
+      <blockquote>
+        <div>The state directory. Kafka Streams persists local states under the state directory. Each application has a subdirectory on its hosting
+          machine that is located under the state directory. The name of the subdirectory is the application ID. The state stores associated
+          with the application are created under this subdirectory. When running multiple instances of the same application on a single machine,
+          this path must be unique for each such instance.</div>
+      </blockquote>
+    </div>
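
A short sketch, with a hypothetical path, of keeping the state directory unique per instance on a shared machine:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    final Properties settings = new Properties();
    // Each instance of the same application on this host needs its own directory.
    settings.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams/instance-1");
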
+    <div class="section" id="topology-optimization">
+      <h4><a class="toc-backref" href="#id31">topology.optimization</a><a class="headerlink" href="#topology-optimization" title="Permalink to this headline"></a></h4>
+      <blockquote>
+        <div>
+          <p>
+            You can tell Streams to apply topology optimizations by setting this config. The optimizations are currently all or none and disabled by default.
+            These optimizations include moving/reducing repartition topics and reusing the source topic as the changelog for source KTables. It is recommended to enable this.
+          </p>
+          <p>
+            Note that as of 2.3, you need to do two things to enable optimizations. In addition to setting this config to <code>StreamsConfig.OPTIMIZE</code>, you'll need to pass in your
+            configuration properties when building your topology by using the overloaded <code>StreamsBuilder.build(Properties)</code> method.
+            For example <code>KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties)</code>.
+          </p>
+        </div></blockquote>
+    </div>
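
A sketch of the two steps described above, setting the config and passing the properties to build():

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    final Properties props = new Properties();
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
    // ... application.id, bootstrap.servers, and other required settings ...

    final StreamsBuilder builder = new StreamsBuilder();
    // ... define the topology ...

    // Passing the properties to build() is what actually applies the optimizations.
    final KafkaStreams streams = new KafkaStreams(builder.build(props), props);
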
+    <div class="section" id="upgrade-from">
+      <h4><a class="toc-backref" href="#id14">upgrade.from</a><a class="headerlink" href="#upgrade-from" title="Permalink to this headline"></a></h4>
+      <blockquote>
+        <div>
+          The version you are upgrading from. It is important to set this config when performing a rolling upgrade to certain versions, as described in the upgrade guide.
+          You should set this config to the appropriate version before bouncing your instances and upgrading them to the newer version. Once everyone is on the
+          newer version, you should remove this config and do a second rolling bounce. It is only necessary to set this config and follow the two-bounce upgrade path
+          when upgrading from below version 2.0, or when upgrading to 2.4+ from any version lower than 2.4.
+        </div>
+      </blockquote>
+    </div>
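
A sketch of the first rolling bounce for a hypothetical upgrade from 2.3:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    final Properties props = new Properties();
    // First rolling bounce: new binaries, but still announcing the old version.
    props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "2.3");
    // Second rolling bounce: remove this setting once every instance runs the new version.
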
+  </div>
+  <div class="section" id="kafka-consumers-and-producer-configuration-parameters">
+    <h3><a class="toc-backref" href="#id16">Kafka consumers, producer and admin client configuration parameters</a><a class="headerlink" href="#kafka-consumers-and-producer-configuration-parameters" title="Permalink to this headline"></a></h3>
+    <p>You can specify parameters for the Kafka <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/package-summary.html">consumers</a>, <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/clients/producer/package-summary.html">producers</a>,
+      and <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/clients/admin/package-summary.html">admin client</a> that are used internally.
+      The consumer, producer and admin client settings are defined by specifying parameters in a <code class="docutils literal"><span class="pre">StreamsConfig</span></code> instance.</p>
+    <p>In this example, the Kafka <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#SESSION_TIMEOUT_MS_CONFIG">consumer session timeout</a> is configured to be 60000 milliseconds in the Streams settings:</p>
+    <div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Properties</span> <span class="n">streamsSettings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
 <span class="c1">// Example of a &quot;normal&quot; setting for Kafka Streams</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">BOOTSTRAP_SERVERS_CONFIG</span><span class="o">,</span> <span class="s">&quot;kafka-broker-01:9092&quot;</span><span class="o">);</span>
 <span class="c1">// Customize the Kafka consumer settings of your Streams application</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">ConsumerConfig</span><span class="o">.</span><span class="na">SESSION_TIMEOUT_MS_CONFIG</span><span class="o">,</span> <span class="mi">60000</span><span class="o">);</span>
 </pre></div>
-        </div>
-        <div class="section" id="naming">
-          <h4><a class="toc-backref" href="#id17">Naming</a><a class="headerlink" href="#naming" title="Permalink to this headline"></a></h4>
-          <p>Some consumer, producer and admin client configuration parameters use the same parameter name, and Kafka Streams library itself also uses some parameters that share the same name with its embedded client. For example, <code class="docutils literal"><span class="pre">send.buffer.bytes</span></code> and
-              <code class="docutils literal"><span class="pre">receive.buffer.bytes</span></code> are used to configure TCP buffers; <code class="docutils literal"><span class="pre">request.timeout.ms</span></code> and <code class="docutils literal"><span class="pre">retry.backoff.ms</span></code> control retries for client request;
-              <code class="docutils literal"><span class="pre">retries</span></code> are used to configure how many retries are allowed when handling retriable errors from broker request responses.
-              You can avoid duplicate names by prefix parameter names with <code class="docutils literal"><span class="pre">consumer.</span></code>, <code class="docutils literal"><span class="pre">producer.</span></code>, or <code class="docutils literal"><span class="pre">admin.</span></code> (e.g., <code class="docutils literal"><span class="pre">consumer.send.buffer.bytes</span></code> and <code class="docutils literal"><span class="pre">producer.send.buffer.bytes</span></code>).</p>
-          <div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Properties</span> <span class="n">streamsSettings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
+    </div>
+    <div class="section" id="naming">
+      <h4><a class="toc-backref" href="#id17">Naming</a><a class="headerlink" href="#naming" title="Permalink to this headline"></a></h4>
+      <p>Some consumer, producer and admin client configuration parameters use the same parameter name, and the Kafka Streams library itself also uses some parameters that share the same name with its embedded clients. For example, <code class="docutils literal"><span class="pre">send.buffer.bytes</span></code> and
+        <code class="docutils literal"><span class="pre">receive.buffer.bytes</span></code> are used to configure TCP buffers; <code class="docutils literal"><span class="pre">request.timeout.ms</span></code> and <code class="docutils literal"><span class="pre">retry.backoff.ms</span></code> control retries for client requests;
+        <code class="docutils literal"><span class="pre">retries</span></code> is used to configure how many retries are allowed when handling retriable errors from broker request responses.
+        You can avoid duplicate names by prefixing parameter names with <code class="docutils literal"><span class="pre">consumer.</span></code>, <code class="docutils literal"><span class="pre">producer.</span></code>, or <code class="docutils literal"><span class="pre">admin.</span></code> (e.g., <code class="docutils literal"><span class="pre">consumer.send.buffer.bytes</span></code> and <code class="docutils literal"><span class="pre">producer.send.buffer.bytes</span></code>).</p>
+      <div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Properties</span> <span class="n">streamsSettings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
 <span class="c1">// same value for consumer, producer, and admin client</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">,</span> <span class="s">&quot;value&quot;</span><span class="o">);</span>
 <span class="c1">// different values for consumer and producer</span>
@@ -813,14 +810,14 @@
 <span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">producerPrefix</span><span class="o">(</span><span class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">),</span> <span class="s">&quot;producer-value&quot;</span><span class="o">);</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">adminClientPrefix</span><span class="o">(</span><span class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">),</span> <span class="s">&quot;admin-value&quot;</span><span class="o">);</span>
 </pre></div>
-          <p>You could further separate consumer configuration by adding different prefixes:</p>
-          <ul class="simple">
-            <li><code class="docutils literal"><span class="pre">main.consumer.</span></code> for main consumer which is the default consumer of stream source.</li>
-            <li><code class="docutils literal"><span class="pre">restore.consumer.</span></code> for restore consumer which is in charge of state store recovery.</li>
-            <li><code class="docutils literal"><span class="pre">global.consumer.</span></code> for global consumer which is used in global KTable construction.</li>
-          </ul>
-          <p>For example, if you only want to set restore consumer config without touching other consumers' settings, you could simply use <code class="docutils literal"><span class="pre">restore.consumer.</span></code> to set the config.</p>
-          <div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Properties</span> <span class="n">streamsSettings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
+        <p>You could further separate consumer configuration by adding different prefixes:</p>
+        <ul class="simple">
+          <li><code class="docutils literal"><span class="pre">main.consumer.</span></code> for the main consumer, which is the default consumer of stream sources.</li>
+          <li><code class="docutils literal"><span class="pre">restore.consumer.</span></code> for the restore consumer, which is in charge of state store recovery.</li>
+          <li><code class="docutils literal"><span class="pre">global.consumer.</span></code> for the global consumer, which is used in global KTable construction.</li>
+        </ul>
+        <p>For example, if you only want to set the restore consumer config without touching other consumers' settings, you could simply use <code class="docutils literal"><span class="pre">restore.consumer.</span></code> to set the config.</p>
+        <div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Properties</span> <span class="n">streamsSettings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
 <span class="c1">// same config value for all consumer types</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">&quot;consumer.PARAMETER_NAME&quot;</span><span class="o">,</span> <span class="s">&quot;general-consumer-value&quot;</span><span class="o">);</span>
 <span class="c1">// set a different restore consumer config. This would make restore consumer take restore-consumer-value,</span>
@@ -829,103 +826,103 @@
 <span class="c1">// alternatively, you can use</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">restoreConsumerPrefix</span><span class="o">(</span><span class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">),</span> <span class="s">&quot;restore-consumer-value&quot;</span><span class="o">);</span>
 </pre></div>
-          </div>
-          <p> Same applied to <code class="docutils literal"><span class="pre">main.consumer.</span></code> and <code class="docutils literal"><span class="pre">main.consumer.</span></code>, if you only want to specify one consumer type config.</p>
-          <p> Additionally, to configure the internal repartition/changelog topics, you could use the <code class="docutils literal"><span class="pre">topic.</span></code> prefix, followed by any of the standard topic configs.</p>
-            <div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Properties</span> <span class="n">streamsSettings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
+        </div>
+        <p>The same applies to <code class="docutils literal"><span class="pre">main.consumer.</span></code> and <code class="docutils literal"><span class="pre">global.consumer.</span></code>, if you only want to specify the config for one consumer type.</p>
+        <p> Additionally, to configure the internal repartition/changelog topics, you could use the <code class="docutils literal"><span class="pre">topic.</span></code> prefix, followed by any of the standard topic configs.</p>
+        <div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Properties</span> <span class="n">streamsSettings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
 <span class="c1">// Override default for both changelog and repartition topics</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">&quot;topic.PARAMETER_NAME&quot;</span><span class="o">,</span> <span class="s">&quot;topic-value&quot;</span><span class="o">);</span>
 <span class="c1">// alternatively, you can use</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">topicPrefix</span><span class="o">(</span><span class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">),</span> <span class="s">&quot;topic-value&quot;</span><span class="o">);</span>
 </pre></div>
-            </div>
-          </div>
-        </div>
-        <div class="section" id="default-values">
-          <h4><a class="toc-backref" href="#id18">Default Values</a><a class="headerlink" href="#default-values" title="Permalink to this headline"></a></h4>
-          <p>Kafka Streams uses different default values for some of the underlying client configs, which are summarized below. For detailed descriptions
-            of these configs, see <a class="reference external" href="http://kafka.apache.org/0100/documentation.html#producerconfigs">Producer Configs</a>
-            and <a class="reference external" href="http://kafka.apache.org/0100/documentation.html#newconsumerconfigs">Consumer Configs</a>.</p>
-          <table border="1" class="non-scrolling-table docutils">
-            <thead valign="bottom">
-            <tr class="row-odd"><th class="head">Parameter Name</th>
-              <th class="head">Corresponding Client</th>
-              <th class="head">Streams Default</th>
-            </tr>
-            </thead>
-            <tbody valign="top">
-            <tr class="row-even"><td>auto.offset.reset</td>
-              <td>Consumer</td>
-              <td>earliest</td>
-            </tr>
-            <tr class="row-even"><td>linger.ms</td>
-              <td>Producer</td>
-              <td>100</td>
-            </tr>
-            <tr class="row-odd"><td>max.poll.interval.ms</td>
-              <td>Consumer</td>
-              <td>Integer.MAX_VALUE</td>
-            </tr>
-            <tr class="row-even"><td>max.poll.records</td>
-              <td>Consumer</td>
-              <td>1000</td>
-            </tr>
-            </tbody>
-          </table>
-        </div>
-        <div class="section" id="parameters-controlled-by-kafka-streams">
-          <h3><a class="toc-backref" href="#id26">Parameters controlled by Kafka Streams</a><a class="headerlink" href="#parameters-controlled-by-kafka-streams" title="Permalink to this headline"></a></h3>
-          <p>Kafka Streams assigns the following configuration parameters. If you try to change
-            <code class="docutils literal"><span class="pre">allow.auto.create.topics</span></code>, your value
-            is ignored and setting it has no effect in a Kafka Streams application. You can set the other parameters.
-            Kafka Streams sets them to different default values than a plain
-            <code class="docutils literal"><span class="pre">KafkaConsumer</span></code>.
-          <p>Kafka Streams uses the <code class="docutils literal"><span class="pre">client.id</span></code>
-            parameter to compute derived client IDs for internal clients. If you don't set
-            <code class="docutils literal"><span class="pre">client.id</span></code>, Kafka Streams sets it to
-            <code class="docutils literal"><span class="pre">&lt;application.id&gt;-&lt;random-UUID&gt;</span></code>.
-            <table border="1" class="non-scrolling-table docutils">
-              <colgroup>
-              <col width="50%">
-              <col width="19%">
-              <col width="31%">
-              </colgroup>
-              <thead valign="bottom">
-              <tr class="row-odd"><th class="head">Parameter Name</th>
-              <th class="head">Corresponding Client</th>
-              <th class="head">Streams Default</th>
-              </tr>
-              </thead>
-              <tbody valign="top">
-              <tr class="row-odd"><td>allow.auto.create.topics</td>
-              <td>Consumer</td>
-              <td>false</td>
-              </tr>
-              <tr class="row-even"><td>auto.offset.reset</td>
-              <td>Consumer</td>
-              <td>earliest</td>
-              </tr>
-              <tr class="row-odd"><td>linger.ms</td>
-              <td>Producer</td>
-              <td>100</td>
-              </tr>
-              <tr class="row-even"><td>max.poll.interval.ms</td>
-              <td>Consumer</td>
-              <td>300000</td>
-              </tr>
-              <tr class="row-odd"><td>max.poll.records</td>
-              <td>Consumer</td>
-              <td>1000</td>
-              </tr>
-              </tbody>
-              </table>
-        <div class="section" id="enable-auto-commit">
-          <span id="streams-developer-guide-consumer-auto-commit"></span><h4><a class="toc-backref" href="#id19">enable.auto.commit</a><a class="headerlink" href="#enable-auto-commit" title="Permalink to this headline"></a></h4>
-          <blockquote>
-            <div>The consumer auto commit. To guarantee at-least-once processing semantics and turn off auto commits, Kafka Streams overrides this consumer config
-              value to <code class="docutils literal"><span class="pre">false</span></code>.  Consumers will only commit explicitly via <em>commitSync</em> calls when the Kafka Streams library or a user decides
-              to commit the current processing state.</div></blockquote>
         </div>
+      </div>
+    </div>
+    <div class="section" id="default-values">
+      <h4><a class="toc-backref" href="#id18">Default Values</a><a class="headerlink" href="#default-values" title="Permalink to this headline"></a></h4>
+      <p>Kafka Streams uses different default values for some of the underlying client configs, which are summarized below. For detailed descriptions
+        of these configs, see <a class="reference external" href="/{{version}}/documentation.html#producerconfigs">Producer Configs</a>
+        and <a class="reference external" href="/{{version}}/documentation.html#consumerconfigs">Consumer Configs</a>.</p>
+      <table border="1" class="non-scrolling-table docutils">
+        <thead valign="bottom">
+        <tr class="row-odd"><th class="head">Parameter Name</th>
+          <th class="head">Corresponding Client</th>
+          <th class="head">Streams Default</th>
+        </tr>
+        </thead>
+        <tbody valign="top">
+        <tr class="row-even"><td>auto.offset.reset</td>
+          <td>Consumer</td>
+          <td>earliest</td>
+        </tr>
+        <tr class="row-even"><td>linger.ms</td>
+          <td>Producer</td>
+          <td>100</td>
+        </tr>
+        <tr class="row-odd"><td>max.poll.interval.ms</td>
+          <td>Consumer</td>
+          <td>Integer.MAX_VALUE</td>
+        </tr>
+        <tr class="row-even"><td>max.poll.records</td>
+          <td>Consumer</td>
+          <td>1000</td>
+        </tr>
+        </tbody>
+      </table>
+    </div>
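For reference on the table above: an override of one of these tuned defaults can be scoped to a single embedded client with the consumerPrefix/producerPrefix helpers (an unprefixed setting applies to every client that recognizes it). A minimal sketch, with illustrative values that are assumptions rather than recommendations:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Applies only to the embedded consumer.
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 500);
// Applies only to the embedded producer.
props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 50);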
+    <div class="section" id="parameters-controlled-by-kafka-streams">
+      <h3><a class="toc-backref" href="#id26">Parameters controlled by Kafka Streams</a><a class="headerlink" href="#parameters-controlled-by-kafka-streams" title="Permalink to this headline"></a></h3>
+      <p>Kafka Streams assigns the following configuration parameters. If you try to change
+        <code class="docutils literal"><span class="pre">allow.auto.create.topics</span></code>, your value
+        is ignored and has no effect in a Kafka Streams application. You can set the other parameters;
+        Kafka Streams sets them to different default values than a plain
+        <code class="docutils literal"><span class="pre">KafkaConsumer</span></code>.</p>
+      <p>Kafka Streams uses the <code class="docutils literal"><span class="pre">client.id</span></code>
+        parameter to compute derived client IDs for internal clients. If you don't set
+        <code class="docutils literal"><span class="pre">client.id</span></code>, Kafka Streams sets it to
+        <code class="docutils literal"><span class="pre">&lt;application.id&gt;-&lt;random-UUID&gt;</span></code>.</p>
+      <table border="1" class="non-scrolling-table docutils">
+        <colgroup>
+          <col width="50%">
+          <col width="19%">
+          <col width="31%">
+        </colgroup>
+        <thead valign="bottom">
+        <tr class="row-odd"><th class="head">Parameter Name</th>
+          <th class="head">Corresponding Client</th>
+          <th class="head">Streams Default</th>
+        </tr>
+        </thead>
+        <tbody valign="top">
+        <tr class="row-odd"><td>allow.auto.create.topics</td>
+          <td>Consumer</td>
+          <td>false</td>
+        </tr>
+        <tr class="row-even"><td>auto.offset.reset</td>
+          <td>Consumer</td>
+          <td>earliest</td>
+        </tr>
+        <tr class="row-odd"><td>linger.ms</td>
+          <td>Producer</td>
+          <td>100</td>
+        </tr>
+        <tr class="row-even"><td>max.poll.interval.ms</td>
+          <td>Consumer</td>
+          <td>300000</td>
+        </tr>
+        <tr class="row-odd"><td>max.poll.records</td>
+          <td>Consumer</td>
+          <td>1000</td>
+        </tr>
+        </tbody>
+      </table>
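A short sketch of the client.id behavior described above; the application ID and explicit client ID are assumed example values:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
// Optional: if this line is omitted, Streams falls back to
// "<application.id>-<random-UUID>" and derives the internal clients' IDs from it.
props.put(StreamsConfig.CLIENT_ID_CONFIG, "wordcount-client");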
+      <div class="section" id="enable-auto-commit">
+        <span id="streams-developer-guide-consumer-auto-commit"></span><h4><a class="toc-backref" href="#id19">enable.auto.commit</a><a class="headerlink" href="#enable-auto-commit" title="Permalink to this headline"></a></h4>
+        <blockquote>
+          <div>The consumer auto-commit setting. To guarantee at-least-once processing semantics, Kafka Streams turns off auto commits by overriding this consumer config
+            value to <code class="docutils literal"><span class="pre">false</span></code>. Consumers will commit only explicitly, via <em>commitSync</em> calls, when the Kafka Streams library or a user decides
+            to commit the current processing state.</div></blockquote>
+      </div>
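Because auto commit is forced off, the commit cadence of a Streams application is governed by commit.interval.ms instead. A minimal sketch (30 seconds is an illustrative value):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Streams itself commits processed offsets on this interval,
// rather than relying on consumer auto-commit.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000);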
       <div class="section" id="recommended-configuration-parameters-for-resiliency">
         <h3><a class="toc-backref" href="#id21">Recommended configuration parameters for resiliency</a><a class="headerlink" href="#recommended-configuration-parameters-for-resiliency" title="Permalink to this headline"></a></h3>
        <p>There are several Kafka and Kafka Streams configuration options that need to be configured explicitly for resiliency in the face of broker failures:</p>
@@ -978,13 +975,12 @@
           <div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Properties</span> <span class="n">streamsSettings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">REPLICATION_FACTOR_CONFIG</span><span class="o">,</span> <span class="mi">3</span><span class="o">);</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">topicPrefix</span><span class="o">(</span><span class="n">TopicConfig</span><span class="o">.</span><span class="na">MIN_IN_SYNC_REPLICAS_CONFIG</span><span class="o">),</span> <span class="mi">2</span><span class="o">);</span>
-<span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">producerPrefix</span><span class="o">(</span><span class="n">ProducerConfig</span><span class="o">.</span><span class="na">ACKS_CONFIG</span><span class="o">),</span> <span class="s">&quot;all&quot;</span><span class="o">);</span>
-</pre></div>
+<span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">producerPrefix</span><span class="o">(</span><span class="n">ProducerConfig</span><span class="o">.</span><span class="na">ACKS_CONFIG</span><span class="o">),</span> <span class="s">&quot;all&quot;</span><span class="o">);</span></code></pre></div>
           </div>
-</div>
-</div>
-</div>
-</div>
+        </div>
+      </div>
+    </div>
+  </div>
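Taken together, the three settings in the snippet above mean that writes to the application's internal topics must be acknowledged by all in-sync replicas; with a replication factor of 3 and min.insync.replicas of 2, the application can keep writing through the loss of a single broker.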
 
 
                </div>
@@ -997,10 +993,10 @@
 
                 <!--#include virtual="../../../includes/_header.htm" -->
                 <!--#include virtual="../../../includes/_top.htm" -->
-                    <div class="content documentation documentation--current">
+                    <div class="content documentation">
                     <!--#include virtual="../../../includes/_nav.htm" -->
                     <div class="right">
-                    <!--#include virtual="../../../includes/_docs_banner.htm" -->
+                    <!--//#include virtual="../../../includes/_docs_banner.htm" -->
                     <ul class="breadcrumbs">
                     <li><a href="/documentation">Documentation</a></li>
                     <li><a href="/documentation/streams">Kafka Streams</a></li>
diff --git a/docs/streams/developer-guide/datatypes.html b/docs/streams/developer-guide/datatypes.html
index 95aa43a..e6930bc 100644
--- a/docs/streams/developer-guide/datatypes.html
+++ b/docs/streams/developer-guide/datatypes.html
@@ -62,8 +62,7 @@
 <span class="c1">// Default serde for keys of data records (here: built-in serde for String type)</span>
 <span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">DEFAULT_KEY_SERDE_CLASS_CONFIG</span><span class="o">,</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">().</span><span class="na">getClass</span><span class="o">().</span><span class="na">getName</span><span class="o">());</span>
 <span class="c1">// Default serde for values of data records (here: built-in serde for Long type)</span>
-<span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">DEFAULT_VALUE_SERDE_CLASS_CONFIG</span><span class="o">,</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">().</span><span class="na">getClass</span><span class="o">().</span><span class="na">getName</span><span class="o">());</span>
-</pre></div>
+<span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">DEFAULT_VALUE_SERDE_CLASS_CONFIG</span><span class="o">,</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">().</span><span class="na">getClass</span><span class="o">().</span><span class="na">getName</span><span class="o">());</span></code></pre></div>
       </div>
     </div>
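To make the fallback behavior concrete, a sketch contrasting a source that relies on the configured default serdes with one that names its serdes explicitly; the topic name "user-counts" is an assumed example:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
// Uses the default serdes from the configuration.
KStream<String, Long> viaDefaults = builder.stream("user-counts");
// Names the serdes explicitly, for this source only.
KStream<String, Long> viaExplicit =
    builder.stream("user-counts", Consumed.with(Serdes.String(), Serdes.Long()));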
     <div class="section" id="overriding-default-serdes">
@@ -78,8 +77,7 @@
 <span class="c1">// The stream userCountByRegion has type `String` for record keys (for region)</span>
 <span class="c1">// and type `Long` for record values (for user counts).</span>
 <span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">userCountByRegion</span> <span class="o">=</span> <span class="o">...;</span>
-<span class="n">userCountByRegion</span><span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="s">&quot;RegionCountsTopic&quot;</span><span class="o">,</span> <span class="n">Produced</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">stringSerde</span><span class="o">,</span> <span class="n">longSerde</span><span class="o">));</span>
-</pre></div>
+<span class="n">userCountByRegion</span><span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="s">&quot;RegionCountsTopic&quot;</span><span class="o">,</span> <span class="n">Produced</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">stringSerde</span><span class="o">,</span> <span class="n">longSerde</span><span class="o">));</span></code></pre></div>
       </div>
       <p>If you want to override serdes selectively, i.e., keep the defaults for some fields, then don&#8217;t specify the serde whenever you want to leverage the default settings:</p>
       <div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serde</span><span class="o">;</span>
@@ -89,8 +87,7 @@
 <span class="c1">// but override the default serializer for record values (here: userCount as Long).</span>
 <span class="kd">final</span> <span class="n">Serde</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">longSerde</span> <span class="o">=</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">();</span>
 <span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">userCountByRegion</span> <span class="o">=</span> <span class="o">...;</span>
-<span class="n">userCountByRegion</span><span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="s">&quot;RegionCountsTopic&quot;</span><span class="o">,</span> <span class="n">Produced</span><span class="o">.</span><span class="na">valueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">()));</span>
-</pre></div>
+<span class="n">userCountByRegion</span><span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="s">&quot;RegionCountsTopic&quot;</span><span class="o">,</span> <span class="n">Produced</span><span class="o">.</span><span class="na">valueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">()));</span></code></pre></div>
       </div>
       <p>If some of your incoming records are corrupted or ill-formatted, they will cause the deserializer class to report an error.
         Since 1.0.x, we have introduced a <code>DeserializationExceptionHandler</code> interface which allows
@@ -104,12 +101,11 @@
         <h3>Primitive and basic types<a class="headerlink" href="#primitive-and-basic-types" title="Permalink to this headline"></a></h3>
         <p>Apache Kafka includes several built-in serde implementations for Java primitives and basic types such as <code class="docutils literal"><span class="pre">byte[]</span></code> in
           its <code class="docutils literal"><span class="pre">kafka-clients</span></code> Maven artifact:</p>
-        <div class="highlight-xml"><div class="highlight"><pre><span></span><span class="nt">&lt;dependency&gt;</span>
+        <div class="highlight-xml"><div class="highlight"><pre><code><span class="nt">&lt;dependency&gt;</span>
     <span class="nt">&lt;groupId&gt;</span>org.apache.kafka<span class="nt">&lt;/groupId&gt;</span>
     <span class="nt">&lt;artifactId&gt;</span>kafka-clients<span class="nt">&lt;/artifactId&gt;</span>
     <span class="nt">&lt;version&gt;</span>{{fullDotVersion}}<span class="nt">&lt;/version&gt;</span>
-<span class="nt">&lt;/dependency&gt;</span>
-</pre></div>
+<span class="nt">&lt;/dependency&gt;</span></code></pre></div>
         </div>
         <p>This artifact provides the following serde implementations under the package <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization">org.apache.kafka.common.serialization</a>, which you can leverage when, e.g., defining default serializers in your Streams configuration.</p>
         <table border="1" class="docutils">
@@ -200,10 +196,10 @@
 
 <!--#include virtual="../../../includes/_header.htm" -->
 <!--#include virtual="../../../includes/_top.htm" -->
-<div class="content documentation documentation--current">
+<div class="content documentation ">
   <!--#include virtual="../../../includes/_nav.htm" -->
   <div class="right">
-    <!--#include virtual="../../../includes/_docs_banner.htm" -->
+    <!--//#include virtual="../../../includes/_docs_banner.htm" -->
     <ul class="breadcrumbs">
       <li><a href="/documentation">Documentation</a></li>
       <li><a href="/documentation/streams">Kafka Streams</a></li>
diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index 2eac368..33648f3 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -110,7 +110,7 @@
         </div>
 
         <div class="section" id="dsl-core-constructs-overview">
-            <h4><a id="streams_concepts_kstream" href="#streams_concepts_kstream">KStream</a></h4>
+            <h4 class="anchor-heading"><a id="streams_concepts_kstream" class="anchor-link"></a><a href="#streams_concepts_kstream">KStream</a></h4>
 
             <p>
                 Only the <strong>Kafka Streams DSL</strong> has the notion of a <code>KStream</code>.
@@ -133,7 +133,7 @@
                 which would return <code>3</code> for <code>alice</code>.
             </p>
 
-            <h4><a id="streams_concepts_ktable" href="#streams_concepts_ktable">KTable</a></h4>
+            <h4 class="anchor-heading"><a id="streams_concepts_ktable" class="anchor-link"></a><a href="#streams_concepts_ktable">KTable</a></h4>
 
             <p>
                 Only the <strong>Kafka Streams DSL</strong> has the notion of a <code>KTable</code>.
@@ -172,7 +172,7 @@
                 KTable also provides an ability to look up <em>current</em> values of data records by keys. This table-lookup functionality is available through <strong>join operations</strong> (see also <strong>Joining</strong> in the Developer Guide) as well as through <strong>Interactive Queries</strong>.
             </p>
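To make the table-lookup point concrete, a sketch of reading a KTable's current value by key through Interactive Queries; the store name "word-counts" and the already-running KafkaStreams instance are assumptions for illustration:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

KafkaStreams streams = ...; // assumed to be started already

ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType("word-counts", QueryableStoreTypes.keyValueStore()));
Long current = store.get("alice"); // latest value for the key, or null if absent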
 
-            <h4><a id="streams_concepts_globalktable" href="#streams_concepts_globalktable">GlobalKTable</a></h4>
+            <h4 class="anchor-heading"><a id="streams_concepts_globalktable" class="anchor-link"></a><a href="#streams_concepts_globalktable">GlobalKTable</a></h4>
 
             <p>Only the <strong>Kafka Streams DSL</strong> has the notion of a <strong>GlobalKTable</strong>.</p>
 
@@ -242,7 +242,7 @@
                         <p>In the case of a KStream, the local KStream instance of every application instance will
                             be populated with data from only <strong>a subset</strong> of the partitions of the input topic.  Collectively, across
                             all application instances, all input topic partitions are read and processed.</p>
-                        <div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span>
+                        <div class="highlight-java"><div class="highlight"><pre><code><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span>
 <span class="kn">import</span> <span class="nn">org.apache.kafka.streams.StreamsBuilder</span><span class="o">;</span>
 <span class="kn">import</span> <span class="nn">org.apache.kafka.streams.kstream.KStream</span><span class="o">;</span>
 
@@ -253,8 +253,7 @@
     <span class="n">Consumed</span><span class="o">.</span><span class="na">with</span><span class="o">(</span>
       <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key serde */</span>
       <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">()</span>   <span class="cm">/* value serde */</span>
-    <span class="o">);</span>
-</pre></div>
+    <span class="o">);</span></code></pre></div>
                         </div>
                         <p>If you do not specify SerDes explicitly, the default SerDes from the
                             <a class="reference internal" href="config-streams.html#streams-developer-guide-configuration"><span class="std std-ref">configuration</span></a> are used.</p>
@@ -304,7 +303,7 @@
                             <a class="reference internal" href="../architecture.html#streams_architecture_state"><span class="std std-ref">state store</span></a> that backs the table).  This is required for
                             supporting <a class="reference internal" href="interactive-queries.html#streams-developer-guide-interactive-queries"><span class="std std-ref">interactive queries</span></a> against the table. When a
                            name is not provided, the table will not be queryable and an internal name will be provided for the state store.</p>
-                        <div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span>
+                        <div class="highlight-java"><div class="highlight"><pre><code><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span>
 <span class="kn">import</span> <span class="nn">org.apache.kafka.streams.StreamsBuilder</span><span class="o">;</span>
 <span class="kn">import</span> <span class="nn">org.apache.kafka.streams.kstream.GlobalKTable</span><span class="o">;</span>
 
@@ -316,8 +315,7 @@
       <span class="s">&quot;word-counts-global-store&quot;</span> <span class="cm">/* table/store name */</span><span class="o">)</span>
       <span class="o">.</span><span class="na">withKeySerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span> <span class="cm">/* key serde */</span>
       <span class="o">.</span><span class="na">withValueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">())</span> <span class="cm">/* value serde */</span>
-    <span class="o">);</span>
-</pre></div>
+    <span class="o">);</span></code></pre></div>
                         </div>
                         <p>You <strong>must specify SerDes explicitly</strong> if the key or value types of the records in the Kafka input
                             topics do not match the configured default SerDes. For information about configuring default SerDes, available
@@ -384,8 +382,7 @@
 <span class="c1">// KStream branches[1] contains all records whose keys start with &quot;B&quot;</span>
 <span class="c1">// KStream branches[2] contains all other records</span>
 
-<span class="c1">// Java 7 example: cf. `filter` for how to create `Predicate` instances</span>
-</pre></div>
+<span class="c1">// Java 7 example: cf. `filter` for how to create `Predicate` instances</span></code></pre></div>
                             </div>
                         </td>
                     </tr>
@@ -411,8 +408,7 @@
       <span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">test</span><span class="o">(</span><span class="n">String</span> <span class="n">key</span><span class="o">,</span> <span class="n">Long</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
         <span class="k">return</span> <span class="n">value</span> <span class="o">&gt;</span> <span class="mi">0</span><span class="o">;</span>
       <span class="o">}</span>
-    <span class="o">});</span>
-</pre></div>
+    <span class="o">});</span></code></pre></div>
                             </div>
                         </td>
                     </tr>
@@ -425,7 +421,7 @@
                         <td><p class="first">Evaluates a boolean function for each element and drops those for which the function returns true.
                             (<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#filterNot-org.apache.kafka.streams.kstream.Predicate-">KStream details</a>,
                             <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KTable.html#filterNot-org.apache.kafka.streams.kstream.Predicate-">KTable details</a>)</p>
-                            <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+                            <div class="last highlight-java"><div class="highlight"><pre><code><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
 
 <span class="c1">// An inverse filter that discards any negative numbers or zero</span>
 <span class="c1">// Java 8+ example, using lambda expressions</span>
@@ -438,8 +434,7 @@
       <span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">test</span><span class="o">(</span><span class="n">String</span> <span class="n">key</span><span class="o">,</span> <span class="n">Long</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
         <span class="k">return</span> <span class="n">value</span> <span class="o">&lt;=</span> <span class="mi">0</span><span class="o">;</span>
       <span class="o">}</span>
-    <span class="o">});</span>
-</pre></div>
+    <span class="o">});</span></code></pre></div>
                             </div>
                         </td>
                     </tr>
@@ -467,8 +462,7 @@
     <span class="o">}</span>
   <span class="o">);</span>
 
-<span class="c1">// Java 7 example: cf. `map` for how to create `KeyValueMapper` instances</span>
-</pre></div>
+<span class="c1">// Java 7 example: cf. `map` for how to create `KeyValueMapper` instances</span></code></pre></div>
                             </div>
                         </td>
                     </tr>
@@ -486,8 +480,7 @@
 <span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">sentences</span> <span class="o">=</span> <span class="o">...;</span>
 <span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="n">sentences</span><span class="o">.</span><span class="na">flatMapValues</span><span class="o">(</span><span class="n">value</span> <span class="o">-&gt;</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class [...]
 
-<span class="c1">// Java 7 example: cf. `mapValues` for how to create `ValueMapper` instances</span>
-</pre></div>
+<span class="c1">// Java 7 example: cf. `mapValues` for how to create `ValueMapper` instances</span></code></pre></div>
                             </div>
                         </td>
                     </tr>
@@ -504,7 +497,7 @@
                                 <em>further processing</em> of the input data (unlike <code class="docutils literal"><span class="pre">peek</span></code>, which is not a terminal operation).</p>
                             <p><strong>Note on processing guarantees:</strong> Any side effects of an action (such as writing to external systems) are not
                                 trackable by Kafka, which means they will typically not benefit from  Kafka&#8217;s processing guarantees.</p>
-                            <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+                            <div class="last highlight-java"><div class="highlight"><pre><code><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
 
 <span class="c1">// Print the contents of the KStream to the local console.</span>
 <span class="c1">// Java 8+ example, using lambda expressions</span>
@@ -517,8 +510,7 @@
       <span class="kd">public</span> <span class="kt">void</span> <span class="nf">apply</span><span class="o">(</span><span class="n">String</span> <span class="n">key</span><span class="o">,</span> <span class="n">Long</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
         <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="n">key</span> <span class="o">+</span> <span class="s">&quot; =&gt; &quot;</span> <span class="o">+</span> <span class="n">value</span><span class="o">);</span>
       <span class="o">}</span>
-    <span class="o">});</span>
-</pre></div>
+    <span class="o">});</span></code></pre></div>
                             </div>
                         </td>
                     </tr>
@@ -559,8 +551,7 @@
     <span class="n">Grouped</span><span class="o">.</span><span class="na">with</span><span class="o">(</span>
       <span class="n">Serdes</span><span class="o">.</span><span class="na">ByteArray</span><span class="o">(),</span> <span class="cm">/* key */</span>
       <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span>     <span class="cm">/* value */</span>
-  <span class="o">);</span>
-</pre></div>
+  <span class="o">);</span></code></pre></div>
                             </div>
                         </td>
                     </tr>
@@ -639,8 +630,7 @@
     <span class="n">Grouped</span><span class="o">.</span><span class="na">with</span><span class="o">(</span>
       <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key (note: type was modified) */</span>
       <span class="n">Serdes</span><span class="o">.</span><span class="na">Integer</span><span class="o">())</span> <span class="cm">/* value (note: type was modified) */</span>
-  <span class="o">);</span>
-                            </pre></div>
+  <span class="o">);</span></code></pre></div>
                             </div>
                         </td>
                     </tr>
@@ -667,8 +657,7 @@
 
 <span class="n">KTable</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">table</span> <span class="o">=</span> <span class="n">cogroupedStream</span><span class="o">.</span><span class="na">aggregate</span><span class="o">(initializer);</span>
 
-<span class="n">KTable</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">table2</span> <span class="o">=</span> <span class="n">cogroupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(TimeWindows.duration(500ms))</span>.</span><span class="na">aggregate</span><span class="o">(initializer);</span>
-</pre></div>
+<span class="n">KTable</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">table2</span> <span class="o">=</span> <span class="n">cogroupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(TimeWindows.duration(500ms))</span>.</span><span class="na">aggregate</span><span class="o">(initializer);</span></code></pre></div>
                             </div>
                         </td>
                     </tr>
@@ -697,8 +686,7 @@
       <span class="kd">public</span> <span class="n">KeyValue</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="nf">apply</span><span class="o">(</span><span class="kt">byte</span><span class="o">[]</span> <span class="n">key</span><span class="o">,</span> <span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
         <span class="k">return</span> <span class="k">new</span> <span class="n">KeyValue</span><span class="o">&lt;&gt;(</span><span class="n">value</span><span class="o">.</span><span class="na">toLowerCase</span><span class="o">(),</span> <span class="n">value</span><span class="o">.</span><span class="na">length</span><span class="o">());</span>
       <span class="o">}</span>
-    <span class="o">});</span>
-</pre></div>
+    <span class="o">});</span></code></pre></div>
                             </div>
                         </td>
                     </tr>
@@ -726,8 +714,7 @@
       <span class="kd">public</span> <span class="n">String</span> <span class="nf">apply</span><span class="o">(</span><span class="n">String</span> <span class="n">s</span><span class="o">)</span> <span class="o">{</span>
         <span class="k">return</span> <span class="n">s</span><span class="o">.</span><span class="na">toUpperCase</span><span class="o">();</span>
       <span class="o">}</span>
-    <span class="o">});</span>
-</pre></div>
+    <span class="o">});</span></code></pre></div>
                             </div>
                         </td>
                     </tr>
@@ -743,13 +730,11 @@
                            from different streams in the merged stream. Relative order is preserved within each input stream, though (i.e., records within the same input stream are processed in order).</p>
                             <div class="last highlight-java">
                               <div class="highlight">
-                                <pre>
-<span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">stream1</span> <span class="o">=</span> <span class="o">...;</span>
+                                <pre class="line-numbers"><code class="language-text"><span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">stream1</span> <span class="o">=</span> <span class="o">...;</span>
 
 <span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">stream2</span> <span class="o">=</span> <span class="o">...;</span>
 
-<span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">merged</span> <span class="o">=</span> <span class="n">stream1</span><span class="o">.</span><span class="na">merge</span><span class="o">(</span><span class="n">stream2</span><span class="o">);</span>
-                                </pre>
+<span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">merged</span> <span class="o">=</span> <span class="n">stream1</span><span class="o">.</span><span class="na">merge</span><span class="o">(</span><span class="n">stream2</span><span class="o">);</span></code></pre>
                               </div>
                             </div>
                         </td>
@@ -780,8 +765,7 @@
       <span class="kd">public</span> <span class="kt">void</span> <span class="nf">apply</span><span class="o">(</span><span class="kt">byte</span><span class="o">[]</span> <span class="n">key</span><span class="o">,</span> <span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
         <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">&quot;key=&quot;</span> <span class="o">+</span> <span class="n">key</span> <span class="o">+</span> <span class="s">&quot;, value=&quot;</span> <span class="o">+</span> <span class="n">value</span><span class="o">);</span>
       <span class="o">}</span>
-    <span class="o">});</span>
-</pre></div>
+    <span class="o">});</span></code></pre></div>
                             </div>
                         </td>
                     </tr>
@@ -800,8 +784,7 @@
 <span class="n">stream</span><span class="o">.</span><span class="na">print</span><span class="o">();</span>
 
 <span class="c1">// print to file with a custom label</span>
-<span class="n">stream</span><span class="o">.</span><span class="na">print</span><span class="o">(</span><span class="n">Printed</span><span class="o">.</span><span class="na">toFile</span><span class="o">(</span><span class="s">&quot;streams.out&quot;</span><span class="o">).</span><span class="na">withLabel</span><span class="o">(</span><span class="s">&quot;streams&quot;</span><span class="o">));</span>
-</pre></div>
+<span class="n">stream</span><span class="o">.</span><span class="na">print</span><span class="o">(</span><span class="n">Printed</span><span class="o">.</span><span class="na">toFile</span><span class="o">(</span><span class="s">&quot;streams.out&quot;</span><span class="o">).</span><span class="na">withLabel</span><span class="o">(</span><span class="s">&quot;streams&quot;</span><span class="o">));</span></code></pre></div>
                             </div>
                         </td>
                     </tr>
@@ -828,8 +811,7 @@
       <span class="kd">public</span> <span class="n">String</span> <span class="nf">apply</span><span class="o">(</span><span class="kt">byte</span><span class="o">[]</span> <span class="n">key</span><span class="o">,</span> <span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
         <span class="k">return</span> <span class="n">value</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">)[</span><span class="mi">0</span><span class="o">];</span>
       <span class="o">}</span>
-    <span class="o">});</span>
-</pre></div>
+    <span class="o">});</span></code></pre></div>
                             </div>
                         </td>
                     </tr>
@@ -844,8 +826,7 @@
 
 <span class="c1">// Also, a variant of `toStream` exists that allows you</span>
 <span class="c1">// to select a new key for the resulting stream.</span>
-<span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="n">table</span><span class="o">.</span><span class="na">toStream</span><span class="o">();</span>
-</pre></div>
+<span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="n">table</span><span class="o">.</span><span class="na">toStream</span><span class="o">();</span></code></pre></div>
                             </div>
                         </td>
                     </tr>
@@ -858,8 +839,7 @@
                             (<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#toTable--">details</a>)</p>
                             <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
 
-<span class="n">KTable</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">table</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">toTable</span><span class="o">();</span>
-</pre></div>
+<span class="n">KTable</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">table</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">toTable</span><span class="o">();</span></code></pre></div>
                             </div>
                         </td>
                     </tr>
@@ -878,7 +858,7 @@
                             <code><span class="pre">repartition()</span></code> operation always triggers repartitioning of the stream, as a result it can be used with embedded Processor API methods (like <code><span class="pre">transform()</span></code> et al.) that do not trigger auto repartitioning when key changing operation is performed beforehand.
 
                             <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">... ;</span>
-<span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">repartitionedStream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">repartition</span><span class="o">(</span><span class="n">Repartitioned</span><span class="o">.</span><span class="na">numberOfPartitions</span><span class="o">(</span><span class="s">1 [...]
+<span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">repartitionedStream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">repartition</span><span class="o">(</span><span class="n">Repartitioned</span><span class="o">.</span><span class="na">numberOfPartitions</span><span class="o">(</span><span class="s">1 [...]
                             </div>
                         </td>
                     </tr>
@@ -925,8 +905,7 @@
     <span class="c1">// `KTable&lt;String, Long&gt;` (word -&gt; count).</span>
     <span class="o">.</span><span class="na">count</span><span class="o">()</span>
     <span class="c1">// Convert the `KTable&lt;String, Long&gt;` into a `KStream&lt;String, Long&gt;`.</span>
-    <span class="o">.</span><span class="na">toStream</span><span class="o">();</span>
-</pre></div>
+    <span class="o">.</span><span class="na">toStream</span><span class="o">();</span></code></pre></div>
                 </div>
                 <p>WordCount example in Java 7:</p>
                 <div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Code below is equivalent to the previous Java 8+ example above.</span>
@@ -946,8 +925,7 @@
         <span class="o">}</span>
     <span class="o">})</span>
     <span class="o">.</span><span class="na">count</span><span class="o">()</span>
-    <span class="o">.</span><span class="na">toStream</span><span class="o">();</span>
-</pre></div>
+    <span class="o">.</span><span class="na">toStream</span><span class="o">();</span></code></pre></div>
                 </div>
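As a follow-on usage sketch, the KStream<String, Long> produced by toStream() can be written straight to an output topic; the variable name wordCounts and the topic "word-counts-output" are assumptions, not part of the original example:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Produced;

// wordCounts is the KStream<String, Long> from the example above.
wordCounts.to("word-counts-output", Produced.with(Serdes.String(), Serdes.Long()));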
                 <div class="section" id="aggregating">
                     <span id="streams-developer-guide-dsl-aggregating"></span><h4><a class="toc-backref" href="#id12">Aggregating</a><a class="headerlink" href="#aggregating" title="Permalink to this headline"></a></h4>
@@ -1046,8 +1024,7 @@
       <span class="o">}</span>
     <span class="o">},</span>
     <span class="n">Materialized</span><span class="o">.</span><span class="na">as</span><span class="o">(</span><span class="s">&quot;aggregated-stream-store&quot;</span><span class="o">)</span>
-        <span class="o">.</span><span class="na">withValueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">());</span>
-</pre></div>
+        <span class="o">.</span><span class="na">withValueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">());</span></code></pre></div>
                                 </div>
                                 <p>Detailed behavior of <code class="docutils literal"><span class="pre">KGroupedStream</span></code>:</p>
                                 <ul class="simple">
@@ -1162,8 +1139,7 @@
             <span class="o">}</span>
         <span class="o">},</span>
         <span class="n">Materialized</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">SessionStore</span><span class="o">&lt;</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]&gt;&gt;</span><span class="n">as</span><span class="o">(</span><span class="s">&quot;sessionized-aggregated-stream-store&quot;</span><span class="o">)</span>
-          <span class="o">.</span><span class="na">withValueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">()));</span>
-</pre></div>
+          <span class="o">.</span><span class="na">withValueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">()));</span></code></pre></div>
                                 </div>
                                 <p>Detailed behavior:</p>
                                 <ul class="simple">
@@ -1195,8 +1171,7 @@
 <span class="n">KTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">aggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">count</span><span class="o">();</span>
 
 <span class="c1">// Counting a KGroupedTable</span>
-<span class="n">KTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">aggregatedTable</span> <span class="o">=</span> <span class="n">groupedTable</span><span class="o">.</span><span class="na">count</span><span class="o">();</span>
-</pre></div>
+<span class="n">KTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">aggregatedTable</span> <span class="o">=</span> <span class="n">groupedTable</span><span class="o">.</span><span class="na">count</span><span class="o">();</span></code></pre></div>
                                 </div>
                                 <p>Detailed behavior for <code class="docutils literal"><span class="pre">KGroupedStream</span></code>:</p>
                                 <ul class="simple">
@@ -1237,8 +1212,7 @@
 <span class="c1">// Counting a KGroupedStream with session-based windowing (here: with 5-minute inactivity gaps)</span>
 <span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">aggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span>
     <span class="n">SessionWindows</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">)))</span> <span class="cm">/* session window */</span>
-    <span class="o">.</span><span class="na">count</span><span class="o">();</span>
-</pre></div>
+    <span class="o">.</span><span class="na">count</span><span class="o">();</span></code></pre></div>
                                 </div>
                                 <p>Detailed behavior:</p>
                                 <ul class="last simple">
@@ -1300,8 +1274,7 @@
       <span class="kd">public</span> <span class="n">Long</span> <span class="nf">apply</span><span class="o">(</span><span class="n">Long</span> <span class="n">aggValue</span><span class="o">,</span> <span class="n">Long</span> <span class="n">oldValue</span><span class="o">)</span> <span class="o">{</span>
         <span class="k">return</span> <span class="n">aggValue</span> <span class="o">-</span> <span class="n">oldValue</span><span class="o">;</span>
       <span class="o">}</span>
-    <span class="o">});</span>
-</pre></div>
+    <span class="o">});</span></code></pre></div>
                                 </div>
                                 <p>Detailed behavior for <code class="docutils literal"><span class="pre">KGroupedStream</span></code>:</p>
                                 <ul class="simple">
@@ -1393,8 +1366,7 @@
       <span class="kd">public</span> <span class="n">Long</span> <span class="nf">apply</span><span class="o">(</span><span class="n">Long</span> <span class="n">aggValue</span><span class="o">,</span> <span class="n">Long</span> <span class="n">newValue</span><span class="o">)</span> <span class="o">{</span>
         <span class="k">return</span> <span class="n">aggValue</span> <span class="o">+</span> <span class="n">newValue</span><span class="o">;</span>
       <span class="o">}</span>
-    <span class="o">});</span>
-</pre></div>
+    <span class="o">});</span></code></pre></div>
                                 </div>
                                 <p>Detailed behavior:</p>
                                 <ul class="simple">
@@ -1426,8 +1398,7 @@
     <span class="o">(</span><span class="n">aggKey</span><span class="o">,</span> <span class="n">newValue</span><span class="o">,</span> <span class="n">aggValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">aggValue</span> <span class="o">+</span> <span class="n">newValue</span><span class="o">,</span> <span class="cm">/* adder */</span>
     <span class="n">Materialized</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]&gt;</span><span class="n">as</span><span class="o">(</span><span class="s">&quot;aggregated-stream-store&quot;</span> <span class="cm">/* state store name * [...]
       <span class="o">.</span><span class="na">withKeySerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span> <span class="cm">/* key serde */</span>
-      <span class="o">.</span><span class="na">withValueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Integer</span><span class="o">());</span> <span class="cm">/* serde for aggregate value */</span>
-</pre></div>
+      <span class="o">.</span><span class="na">withValueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Integer</span><span class="o">());</span> <span class="cm">/* serde for aggregate value */</span></code></pre></div>
                     </div>
                     <div class="admonition note">
                         <p><b>Note</b></p>
@@ -1548,8 +1519,7 @@
     <span class="o">(</span><span class="n">aggKey</span><span class="o">,</span> <span class="n">oldValue</span><span class="o">,</span> <span class="n">aggValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">aggValue</span> <span class="o">-</span> <span class="n">oldValue</span><span class="o">,</span> <span class="cm">/* subtractor */</span>
     <span class="n">Materialized</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]&gt;</span><span class="n">as</span><span class="o">(</span><span class="s">&quot;aggregated-table-store&quot;</span> <span class="cm">/* state store name */ [...]
       <span class="o">.</span><span class="na">withKeySerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span> <span class="cm">/* key serde */</span>
-      <span class="o">.</span><span class="na">withValueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Integer</span><span class="o">());</span> <span class="cm">/* serde for aggregate value */</span>
-</pre></div>
+      <span class="o">.</span><span class="na">withValueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Integer</span><span class="o">());</span> <span class="cm">/* serde for aggregate value */</span></code></pre></div>
                     </div>
                     <div class="admonition note">
                         <p><b>Note</b></p>
@@ -1815,15 +1785,14 @@
                             produce a join output <em>for each</em> matching record on the other side, and there can be <em>multiple</em> such matching records
                             in a given join window (cf. the row with timestamp 15 in the join semantics table below, for example).</p>
                         <p>Join output records are effectively created as follows, leveraging the user-supplied <code class="docutils literal"><span class="pre">ValueJoiner</span></code>:</p>
-                        <div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">KeyValue</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">LV</span><span class="o">&gt;</span> <span class="n">leftRecord</span> <span class="o">=</span> <span class="o">...;</span>
+                        <div class="highlight-java"><div class="highlight"><pre><code><span></span><span class="n">KeyValue</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">LV</span><span class="o">&gt;</span> <span class="n">leftRecord</span> <span class="o">=</span> <span class="o">...;</span>
 <span class="n">KeyValue</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">RV</span><span class="o">&gt;</span> <span class="n">rightRecord</span> <span class="o">=</span> <span class="o">...;</span>
 <span class="n">ValueJoiner</span><span class="o">&lt;</span><span class="n">LV</span><span class="o">,</span> <span class="n">RV</span><span class="o">,</span> <span class="n">JV</span><span class="o">&gt;</span> <span class="n">joiner</span> <span class="o">=</span> <span class="o">...;</span>
 
 <span class="n">KeyValue</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">JV</span><span class="o">&gt;</span> <span class="n">joinOutputRecord</span> <span class="o">=</span> <span class="n">KeyValue</span><span class="o">.</span><span class="na">pair</span><span class="o">(</span>
     <span class="n">leftRecord</span><span class="o">.</span><span class="na">key</span><span class="o">,</span> <span class="cm">/* by definition, leftRecord.key == rightRecord.key */</span>
     <span class="n">joiner</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">leftRecord</span><span class="o">.</span><span class="na">value</span><span class="o">,</span> <span class="n">rightRecord</span><span class="o">.</span><span class="na">value</span><span class="o">)</span>
-  <span class="o">);</span>
-</pre></div>
+  <span class="o">);</span></code></pre></div>
                         </div>
                         <table border="1" class="non-scrolling-table width-100-percent docutils">
                             <colgroup>
@@ -1875,8 +1844,7 @@
       <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key */</span>
       <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">(),</span>   <span class="cm">/* left value */</span>
       <span class="n">Serdes</span><span class="o">.</span><span class="na">Double</span><span class="o">())</span>  <span class="cm">/* right value */</span>
-  <span class="o">);</span>
-</pre></div>
+  <span class="o">);</span></code></pre></div>
                                     </div>
                                     <p>Detailed behavior:</p>
                                     <ul>
@@ -1934,8 +1902,7 @@
       <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key */</span>
       <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">(),</span>   <span class="cm">/* left value */</span>
       <span class="n">Serdes</span><span class="o">.</span><span class="na">Double</span><span class="o">())</span>  <span class="cm">/* right value */</span>
-  <span class="o">);</span>
-</pre></div>
+  <span class="o">);</span></code></pre></div>
                                     </div>
                                     <p>Detailed behavior:</p>
                                     <ul>
@@ -1996,8 +1963,7 @@
       <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key */</span>
       <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">(),</span>   <span class="cm">/* left value */</span>
       <span class="n">Serdes</span><span class="o">.</span><span class="na">Double</span><span class="o">())</span>  <span class="cm">/* right value */</span>
-  <span class="o">);</span>
-</pre></div>
+  <span class="o">);</span></code></pre></div>
                                     </div>
                                     <p>Detailed behavior:</p>
                                     <ul>
@@ -2162,8 +2128,7 @@
 <span class="n">KeyValue</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">JV</span><span class="o">&gt;</span> <span class="n">joinOutputRecord</span> <span class="o">=</span> <span class="n">KeyValue</span><span class="o">.</span><span class="na">pair</span><span class="o">(</span>
     <span class="n">leftRecord</span><span class="o">.</span><span class="na">key</span><span class="o">,</span> <span class="cm">/* by definition, leftRecord.key == rightRecord.key */</span>
     <span class="n">joiner</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">leftRecord</span><span class="o">.</span><span class="na">value</span><span class="o">,</span> <span class="n">rightRecord</span><span class="o">.</span><span class="na">value</span><span class="o">)</span>
-  <span class="o">);</span>
-</pre></div>
+  <span class="o">);</span></code></pre></div>
                         </div>
                         <table border="1" class="non-scrolling-table width-100-percent docutils">
                             <colgroup>
@@ -2201,8 +2166,7 @@
       <span class="kd">public</span> <span class="n">String</span> <span class="nf">apply</span><span class="o">(</span><span class="n">Long</span> <span class="n">leftValue</span><span class="o">,</span> <span class="n">Double</span> <span class="n">rightValue</span><span class="o">)</span> <span class="o">{</span>
         <span class="k">return</span> <span class="s">&quot;left=&quot;</span> <span class="o">+</span> <span class="n">leftValue</span> <span class="o">+</span> <span class="s">&quot;, right=&quot;</span> <span class="o">+</span> <span class="n">rightValue</span><span class="o">;</span>
       <span class="o">}</span>
-    <span class="o">});</span>
-</pre></div>
+    <span class="o">});</span></code></pre></div>
                                     </div>
                                     <p>Detailed behavior:</p>
                                     <ul>
@@ -2247,8 +2211,7 @@
       <span class="kd">public</span> <span class="n">String</span> <span class="nf">apply</span><span class="o">(</span><span class="n">Long</span> <span class="n">leftValue</span><span class="o">,</span> <span class="n">Double</span> <span class="n">rightValue</span><span class="o">)</span> <span class="o">{</span>
         <span class="k">return</span> <span class="s">&quot;left=&quot;</span> <span class="o">+</span> <span class="n">leftValue</span> <span class="o">+</span> <span class="s">&quot;, right=&quot;</span> <span class="o">+</span> <span class="n">rightValue</span><span class="o">;</span>
       <span class="o">}</span>
-    <span class="o">});</span>
-</pre></div>
+    <span class="o">});</span></code></pre></div>
                                     </div>
                                     <p>Detailed behavior:</p>
                                     <ul>
@@ -2296,8 +2259,7 @@
       <span class="kd">public</span> <span class="n">String</span> <span class="nf">apply</span><span class="o">(</span><span class="n">Long</span> <span class="n">leftValue</span><span class="o">,</span> <span class="n">Double</span> <span class="n">rightValue</span><span class="o">)</span> <span class="o">{</span>
         <span class="k">return</span> <span class="s">&quot;left=&quot;</span> <span class="o">+</span> <span class="n">leftValue</span> <span class="o">+</span> <span class="s">&quot;, right=&quot;</span> <span class="o">+</span> <span class="n">rightValue</span><span class="o">;</span>
       <span class="o">}</span>
-    <span class="o">});</span>
-</pre></div>
+    <span class="o">});</span></code></pre></div>
                                     </div>
                                     <p>Detailed behavior:</p>
                                     <ul>
@@ -2463,11 +2425,11 @@
                       <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key
                           Join</a></h5>
                       <p>KTable-KTable foreign-key joins are always <em>non-windowed</em>
-                        joins. Foreign-key joins are analogous to joins in SQL. As a rough example:
+                        joins. Foreign-key joins are analogous to joins in SQL. As a rough example: 
                         <div><code class="docutils literal"><span class="pre">
-                        SELECT ... FROM {this KTable}
-                        JOIN {other KTable}
-                        ON {other.key} = {result of foreignKeyExtractor(this.value)} ...
+                        SELECT ... FROM {this KTable} 
+                        JOIN {other KTable} 
+                        ON {other.key} = {result of foreignKeyExtractor(this.value)} ... 
                         </span></code></div>
                         The output of the operation is a new KTable containing the join result.
                         </p>
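A minimal sketch of such a foreign-key join, assuming a left table whose Long value doubles as the foreign key into a Double-valued right table (the types and joiner mirror the example shown further below):

    KTable<String, Long> left = ...;
    KTable<Long, Double> right = ...;

    // Derive the right-hand key from the left-hand value.
    Function<Long, Long> foreignKeyExtractor = leftValue -> leftValue;

    KTable<String, String> joined = left.join(right,
        foreignKeyExtractor,
        (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */);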
@@ -2477,7 +2439,7 @@
                           internal" href="#streams_concepts_ktable"><span class="std
                             std-ref">table duals</span></a>. A foreign-key extractor
                         function is applied to the left record, with a new intermediate
-                        record created and is used to lookup and join with the corresponding
+                        record created, which is then used to look up and join with the corresponding
                         primary key on the right hand side table.
                         The result is a new KTable that represents the changelog stream
                         of the join operation.</p>
@@ -2516,8 +2478,7 @@
                 <span class="n">KTable</span><span class="o">&lt;Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">right</span> <span class="o">=</span> <span class="o">...;<br>//This </span><span class="o"><span class="o"><span class="n">foreignKeyExtractor</span></span> simply uses the left-value to map to the right-key.<br></span><span class="o"><span class="n">Function</span><span class="o">&lt;Long</span><span class="o">,</s [...]
                 <span class="n">KTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">joined</span> <span class="o">=</span> <span class="n">left</span><span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">right</span><span class="o">,</span><br>    <span class="o"><span class="n">foreignKeyExtractor,</span></span>
                     <span class="o">(</span><span class="n">leftValue</span><span class="o">,</span> <span class="n">rightValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="s">"left="</span> <span class="o">+</span> <span class="n">leftValue</span> <span class="o">+</span> <span class="s">", right="</span> <span class="o">+</span> <span class="n">rightValue</span> <span class="cm">/* ValueJoiner */</span>
-                  <span class="o">);</span>
-                </pre>
+                  <span class="o">);</span></code></pre>
                                 </div>
                               </div>
                               <p>Detailed behavior:</p>
@@ -2525,7 +2486,7 @@
                                 <li>
                                   <p class="first">The join is <em>key-based</em>, i.e.
                                     with the join predicate: </p>
-                                  <pre><code class="docutils literal"><span class="pre">foreignKeyExtractor.apply(leftRecord.value)</span> <span class="pre">==</span> <span class="pre">rightRecord.key</span></code></pre>
+                                  <pre><code class="docutils literal"><span class="pre">foreignKeyExtractor.apply(leftRecord.value)</span> <span class="pre">==</span> <span class="pre">rightRecord.key</span></code></code></pre>
                                 </li>
                                 <li>
                                   <p class="first">The join will be triggered under the
@@ -2576,8 +2537,7 @@
                 <span class="n">KTable</span><span class="o">&lt;Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">right</span> <span class="o">=</span> <span class="o">...;<br>//This </span><span class="o"><span class="o"><span class="n">foreignKeyExtractor</span></span> simply uses the left-value to map to the right-key.<br></span><span class="o"><span class="n">Function</span><span class="o">&lt;Long</span><span class="o">,</s [...]
                 <span class="n">KTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">joined</span> <span class="o">=</span> <span class="n">left</span><span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">right</span><span class="o">,</span><br>    <span class="o"><span class="n">foreignKeyExtractor,</span></span>
                     <span class="o">(</span><span class="n">leftValue</span><span class="o">,</span> <span class="n">rightValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="s">"left="</span> <span class="o">+</span> <span class="n">leftValue</span> <span class="o">+</span> <span class="s">", right="</span> <span class="o">+</span> <span class="n">rightValue</span> <span class="cm">/* ValueJoiner */</span>
-                  <span class="o">);</span>
-                </pre>
+                  <span class="o">);</span></code></pre>
                                 </div>
                               </div>
                               <p>Detailed behavior:</p>
@@ -2585,7 +2545,7 @@
                                 <li>
                                   <p class="first">The join is <em>key-based</em>, i.e.
                                     with the join predicate: </p>
-                                  <pre><code class="docutils literal"><span class="pre">foreignKeyExtractor.apply(leftRecord.value)</span> <span class="pre">==</span> <span class="pre">rightRecord.key</span></code></pre>
+                                  <pre><code class="docutils literal"><span class="pre">foreignKeyExtractor.apply(leftRecord.value)</span> <span class="pre">==</span> <span class="pre">rightRecord.key</span></code></code></pre>
                                 </li>
                                 <li>
                                   <p class="first">The join will be triggered under the
@@ -2774,8 +2734,7 @@
 <span class="n">KeyValue</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">JV</span><span class="o">&gt;</span> <span class="n">joinOutputRecord</span> <span class="o">=</span> <span class="n">KeyValue</span><span class="o">.</span><span class="na">pair</span><span class="o">(</span>
     <span class="n">leftRecord</span><span class="o">.</span><span class="na">key</span><span class="o">,</span> <span class="cm">/* by definition, leftRecord.key == rightRecord.key */</span>
     <span class="n">joiner</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">leftRecord</span><span class="o">.</span><span class="na">value</span><span class="o">,</span> <span class="n">rightRecord</span><span class="o">.</span><span class="na">value</span><span class="o">)</span>
-  <span class="o">);</span>
-</pre></div>
+  <span class="o">);</span></code></pre></div>
                         </div>
                         <table border="1" class="non-scrolling-table width-100-percent docutils">
                             <colgroup>
@@ -2819,8 +2778,7 @@
     <span class="o">},</span>
     <span class="n">Joined</span><span class="o">.</span><span class="na">keySerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span> <span class="cm">/* key */</span>
       <span class="o">.</span><span class="na">withValueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">())</span> <span class="cm">/* left value */</span>
-  <span class="o">);</span>
-</pre></div>
+  <span class="o">);</span></code></pre></div>
                                     </div>
                                     <p>Detailed behavior:</p>
                                     <ul>
@@ -2872,8 +2830,7 @@
     <span class="o">},</span>
     <span class="n">Joined</span><span class="o">.</span><span class="na">keySerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span> <span class="cm">/* key */</span>
       <span class="o">.</span><span class="na">withValueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">())</span> <span class="cm">/* left value */</span>
-  <span class="o">);</span>
-</pre></div>
+  <span class="o">);</span></code></pre></div>
                                     </div>
                                     <p>Detailed behavior:</p>
                                     <ul>
@@ -3045,8 +3002,7 @@
 <span class="n">KeyValue</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">JV</span><span class="o">&gt;</span> <span class="n">joinOutputRecord</span> <span class="o">=</span> <span class="n">KeyValue</span><span class="o">.</span><span class="na">pair</span><span class="o">(</span>
     <span class="n">leftRecord</span><span class="o">.</span><span class="na">key</span><span class="o">,</span> <span class="cm">/* by definition, leftRecord.key == rightRecord.key */</span>
     <span class="n">joiner</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">leftRecord</span><span class="o">.</span><span class="na">value</span><span class="o">,</span> <span class="n">rightRecord</span><span class="o">.</span><span class="na">value</span><span class="o">)</span>
-  <span class="o">);</span>
-</pre></div>
+  <span class="o">);</span></code></pre></div>
                         </div>
                         <table border="1" class="non-scrolling-table width-100-percent docutils">
                             <colgroup>
@@ -3070,7 +3026,7 @@
                                     <p>The <code class="docutils literal"><span class="pre">GlobalKTable</span></code> is fully bootstrapped upon (re)start of a <code class="docutils literal"><span class="pre">KafkaStreams</span></code> instance, which means the table is fully populated with all the data in the underlying topic that is
                                         available at the time of the startup.  The actual data processing begins only once the bootstrapping has completed.</p>
                                     <p><strong>Causes data re-partitioning of the stream if and only if the stream was marked for re-partitioning.</strong></p>
-                                    <div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">left</span> <span class="o">=</span> <span class="o">...;</span>
+                                    <div class="highlight-java"><div class="highlight"><pre><code><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">left</span> <span class="o">=</span> <span class="o">...;</span>
 <span class="n">GlobalKTable</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">right</span> <span class="o">=</span> <span class="o">...;</span>
 
 <span class="c1">// Java 8+ example, using lambda expressions</span>
@@ -3092,8 +3048,7 @@
       <span class="kd">public</span> <span class="n">String</span> <span class="nf">apply</span><span class="o">(</span><span class="n">Long</span> <span class="n">leftValue</span><span class="o">,</span> <span class="n">Double</span> <span class="n">rightValue</span><span class="o">)</span> <span class="o">{</span>
         <span class="k">return</span> <span class="s">&quot;left=&quot;</span> <span class="o">+</span> <span class="n">leftValue</span> <span class="o">+</span> <span class="s">&quot;, right=&quot;</span> <span class="o">+</span> <span class="n">rightValue</span><span class="o">;</span>
       <span class="o">}</span>
-    <span class="o">});</span>
-</pre></div>
+    <span class="o">});</span></code></pre></div>
                                     </div>
                                     <p>Detailed behavior:</p>
                                     <ul class="last">
@@ -3146,8 +3101,7 @@
       <span class="kd">public</span> <span class="n">String</span> <span class="nf">apply</span><span class="o">(</span><span class="n">Long</span> <span class="n">leftValue</span><span class="o">,</span> <span class="n">Double</span> <span class="n">rightValue</span><span class="o">)</span> <span class="o">{</span>
         <span class="k">return</span> <span class="s">&quot;left=&quot;</span> <span class="o">+</span> <span class="n">leftValue</span> <span class="o">+</span> <span class="s">&quot;, right=&quot;</span> <span class="o">+</span> <span class="n">rightValue</span><span class="o">;</span>
       <span class="o">}</span>
-    <span class="o">});</span>
-</pre></div>
+    <span class="o">});</span></code></pre></div>
                                     </div>
                                     <p>Detailed behavior:</p>
                                     <ul class="last">
@@ -3248,8 +3202,7 @@
 <span class="c1">// The window&#39;s name -- the string parameter -- is used to e.g. name the backing state store.</span>
 <span class="kt">Duration</span> <span class="n">windowSizeMs</span> <span class="o">=</span> <span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">);</span>
 <span class="kt">Duration</span> <span class="n">advanceMs</span> <span class="o">=</span>    <span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span>
-<span class="n">TimeWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">windowSizeMs</span><span class="o">).</span><span class="na">advanceBy</span><span class="o">(</span><span class="n">advanceMs</span><span class="o">);</span>
-</pre></div>
+<span class="n">TimeWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">windowSizeMs</span><span class="o">).</span><span class="na">advanceBy</span><span class="o">(</span><span class="n">advanceMs</span><span class="o">);</span></code></pre></div>
                         </div>
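As a hypothetical usage of the hopping window defined above (the stream name and value type are illustrative; windowSizeMs and advanceMs are reused from the snippet):

    KStream<String, String> stream = ...;
    // Count events per key for every 5-minute window, advancing by 1 minute.
    KTable<Windowed<String>, Long> counts = stream
        .groupByKey()
        .windowedBy(TimeWindows.of(windowSizeMs).advanceBy(advanceMs))
        .count();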
                         <div class="figure align-center" id="id4">
                             <img class="centered" src="/{{version}}/images/streams-time-windows-hopping.png">
@@ -3299,8 +3252,7 @@ become t=300,000).</span></p>
 <span class="n">TimeWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">windowSizeMs</span><span class="o">).</span><span class="na">grace</span><span class="o">(</span><span class="n">gracePeriodMs</span><span class="o">);</span>
 
 <span class="c1">// The above is equivalent to the following code:</span>
-<span class="n">TimeWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">windowSizeMs</span><span class="o">).</span><span class="na">advanceBy</span><span class="o">(</span><span class="n">windowSizeMs</span><span class="o">).</span><span class="na">grace</span><span class="o">(</span><span class="n">gracePeriodMs</span><span class="o">);</span>
-</pre></div>
+<span class="n">TimeWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">windowSizeMs</span><span class="o">).</span><span class="na">advanceBy</span><span class="o">(</span><span class="n">windowSizeMs</span><span class="o">).</span><span class="na">grace</span><span class="o">(</span><span class="n">gracePeriodMs</span><span class="o">);</span></code></pre></div>
                         </div>
                     </div>
                     <div class="section" id="sliding-time-windows">
@@ -3354,8 +3306,7 @@ become t=300,000).</span></p>
 <span class="kn">import</span> <span class="nn">org.apache.kafka.streams.kstream.SessionWindows</span><span class="o">;</span>
 
 <span class="c1">// A session window with an inactivity gap of 5 minutes.</span>
-<span class="n">SessionWindows</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">));</span>
-</pre></div>
+<span class="n">SessionWindows</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">));</span></code></pre></div>
                         </div>
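As a hypothetical usage of the session window defined above (the stream name and value type are illustrative):

    KStream<String, String> stream = ...;
    // Count events per key and session, where a session closes after 5 minutes of inactivity.
    KTable<Windowed<String>, Long> sessionCounts = stream
        .groupByKey()
        .windowedBy(SessionWindows.with(Duration.ofMinutes(5)))
        .count();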
                         <p>Given the previous session window example, here&#8217;s what would happen on an input stream of six records.
                             When the first three records arrive (upper part of the diagram below), we&#8217;d have three sessions (see lower part)
@@ -3401,16 +3352,14 @@ t=5 (blue), which lead to a merge of sessions and an extension of a session, res
 			    </p>
 			    <p>For example:</p>
 			    <div class="highlight-java"><div class="highlight">
-<pre>
-KGroupedStream&lt;UserId, Event&gt; grouped = ...;
+<pre class="line-numbers"><code class="language-text">KGroupedStream&lt;UserId, Event&gt; grouped = ...;
 grouped
     .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
     .count()
     .suppress(Suppressed.untilWindowCloses(unbounded()))
     .filter((windowedUserId, count) -&gt; count &lt; 3)
     .toStream()
-    .foreach((windowedUserId, count) -&gt; sendAlert(windowedUserId.window(), windowedUserId.key(), count));
-</pre>
+    .foreach((windowedUserId, count) -&gt; sendAlert(windowedUserId.window(), windowedUserId.key(), count));</code></pre>
 			    </div></div>
 			    <p>The key parts of this program are:
 			    <dl>
@@ -3567,8 +3516,7 @@ grouped
     <span class="c1">// Any code for clean up would go here.  This processor instance will not be used again after this call.</span>
   <span class="o">}</span>
 
-<span class="o">}</span>
-</pre></div>
+<span class="o">}</span></code></pre></div>
                 </div>
                 <div class="admonition tip">
                     <p><b>Tip</b></p>
@@ -3595,11 +3543,10 @@ grouped
          <span class="o">.</span><span class="na">filter</span><span class="o">((</span><span class="n">PageId</span> <span class="n">pageId</span><span class="o">,</span> <span class="n">Long</span> <span class="n">viewCount</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">viewCount</span> <span class="o">==</span> <span class="mi">1000</span><span class="o">)</span>
          <span class="c1">// PopularPageEmailAlert is your custom processor that implements the</span>
          <span class="c1">// `Processor` interface, see further down below.</span>
-         <span class="o">.</span><span class="na">process</span><span class="o">(()</span> <span class="o">-&gt;</span> <span class="k">new</span> <span class="n">PopularPageEmailAlert</span><span class="o">(</span><span class="s">&quot;alerts@yourcompany.com&quot;</span><span class="o">));</span>
-</pre></div>
+         <span class="o">.</span><span class="na">process</span><span class="o">(()</span> <span class="o">-&gt;</span> <span class="k">new</span> <span class="n">PopularPageEmailAlert</span><span class="o">(</span><span class="s">&quot;alerts@yourcompany.com&quot;</span><span class="o">));</span></code></pre></div>
                 </div>
                 <p>In Java 7:</p>
-                <div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Send an email notification when the view count of a page reaches one thousand.</span>
+                <div class="highlight-java"><div class="highlight"><pre><code><span></span><span class="c1">// Send an email notification when the view count of a page reaches one thousand.</span>
 <span class="n">pageViews</span><span class="o">.</span><span class="na">groupByKey</span><span class="o">().</span>
          <span class="o">.</span><span class="na">count</span><span class="o">()</span>
          <span class="o">.</span><span class="na">filter</span><span class="o">(</span>
@@ -3615,8 +3562,7 @@ grouped
                <span class="c1">// the `Processor` interface, see further down below.</span>
                <span class="k">return</span> <span class="k">new</span> <span class="n">PopularPageEmailAlert</span><span class="o">(</span><span class="s">&quot;alerts@yourcompany.com&quot;</span><span class="o">);</span>
              <span class="o">}</span>
-           <span class="o">});</span>
-</pre></div>
+           <span class="o">});</span></code></pre></div>
                 </div>
             </div>
         </div>
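A hypothetical skeleton of the PopularPageEmailAlert processor referenced above (the PageId key type and the alerting logic are placeholders; only the callbacks come from the Processor interface):

    public class PopularPageEmailAlert implements Processor<PageId, Long> {
      private final String emailAddress;

      public PopularPageEmailAlert(final String emailAddress) {
        this.emailAddress = emailAddress;
      }

      @Override
      public void init(final ProcessorContext context) {
        // e.g. look up state stores or schedule punctuations here
      }

      @Override
      public void process(final PageId pageId, final Long viewCount) {
        // send the alert email to `emailAddress` here
      }

      @Override
      public void close() {
        // clean up; this instance will not be used again after this call
      }
    }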
@@ -3642,14 +3588,12 @@ grouped
 	    <p>For example:
 	    </p>
 	    <div class="highlight-java"><div class="highlight">
-<pre>
-KGroupedTable&lt;String, String&gt; groupedTable = ...;
+<pre class="line-numbers"><code class="language-text">KGroupedTable&lt;String, String&gt; groupedTable = ...;
 groupedTable
     .count()
     .suppress(untilTimeLimit(ofMinutes(5), maxBytes(1_000_000L).emitEarlyWhenFull()))
     .toStream()
-    .foreach((key, count) -&gt; updateCountsDatabase(key, count));
-</pre>
+    .foreach((key, count) -&gt; updateCountsDatabase(key, count));</code></pre>
 	    </div></div>
 	    <p>This configuration ensures that <code>updateCountsDatabase</code> gets events for each <code>key</code> no more than once every 5 minutes.
 	       Note that the latest state for each key has to be buffered in memory for that 5-minute period.
@@ -3703,7 +3647,7 @@ groupedTable
                             how output records are distributed across the partitions of the output topic.</p>
                         <p>Another variant of <code class="docutils literal"><span class="pre">to</span></code> exists that enables you to dynamically choose which topic to send to for each record via a <code class="docutils literal"><span class="pre">TopicNameExtractor</span></code>
                             instance.</p>
-                        <div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+                        <div class="highlight-java"><div class="highlight"><pre><code><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
 <span class="n">KTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">table</span> <span class="o">=</span> <span class="o">...;</span>
 
 
@@ -3716,8 +3660,7 @@ groupedTable
 
 <span class="c1">// Write the stream to the output topic, using explicit key and value serdes,</span>
 <span class="c1">// (thus overriding the defaults in the config properties).</span>
-<span class="n">stream</span><span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="s">&quot;my-stream-output-topic&quot;</span><span class="o">,</span> <span class="n">Produced</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span cla [...]
-</pre></div>
+<span class="n">stream</span><span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="s">&quot;my-stream-output-topic&quot;</span><span class="o">,</span> <span class="n">Produced</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span cla [...]
                         </div>
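As a rough sketch of the TopicNameExtractor variant mentioned above (the routing scheme shown is hypothetical):

    // Route each record to a topic name derived from its key.
    stream.to(
        (key, value, recordContext) -> "my-stream-output-" + key, /* TopicNameExtractor */
        Produced.with(Serdes.String(), Serdes.Long()));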
                         <p><strong>Causes data re-partitioning if any of the following conditions is true:</strong></p>
                         <ol class="last arabic simple">
@@ -3783,25 +3726,20 @@ groupedTable
               <li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.serialization.Serdes</span></code>: Module that contains all primitive SerDes that can be imported as implicits and a helper to create custom SerDes.</li>
             </ul>
             <p>The library is cross-built with Scala 2.12 and 2.13. To reference the library compiled against Scala {{scalaVersion}}, add the following to your maven <code>pom.xml</code>:</p>
-            <pre class="brush: xml;">
-              &lt;dependency&gt;
+            <pre class="line-numbers"><code class="language-xml">              &lt;dependency&gt;
                 &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
                 &lt;artifactId&gt;kafka-streams-scala_{{scalaVersion}}&lt;/artifactId&gt;
                 &lt;version&gt;{{fullDotVersion}}&lt;/version&gt;
-              &lt;/dependency&gt;
-            </pre>
+              &lt;/dependency&gt;</code></pre>
             <p>To use the library compiled against Scala 2.12 replace the <code class="docutils literal"><span class="pre">artifactId</span></code> with <code class="docutils literal"><span class="pre">kafka-streams-scala_2.12</span></code>.</p>
             <p>When using SBT, you can reference the correct library using the following:</p>
-            <pre class="brush: scala;">
-              libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "{{fullDotVersion}}"
-            </pre>
+            <pre class="line-numbers"><code class="language-scala">              libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "{{fullDotVersion}}"</code></pre>
             <div class="section" id="scala-dsl-sample-usage">
               <span id="streams-developer-guide-dsl-sample-usage"></span><h3><a class="toc-backref" href="#id28">Sample Usage</a><a class="headerlink" href="#scala-dsl-sample-usage" title="Permalink to this headline"></a></h3>
               <p>The library works by wrapping the original Java abstractions of Kafka Streams within a Scala wrapper object and then using implicit conversions between them. All the Scala abstractions are named identically as the corresponding Java abstraction, but they reside in a different package of the library e.g. the Scala class <code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.StreamsBuilder</span></code> is a wrapper around <code class="docutils lit [...]
               <p>Here's an example of the classic WordCount program that uses the Scala <code class="docutils literal"><span class="pre">StreamsBuilder</span></code> that builds an instance of <code class="docutils literal"><span class="pre">KStream</span></code> which is a wrapper around Java <code class="docutils literal"><span class="pre">KStream</span></code>. Then we reify to a table and get a <code class="docutils literal"><span class="pre">KTable</span></code>, which, again is a w [...]
               <p>The net result is that the following code is structured just like using the Java API, but with Scala and with far fewer type annotations compared to using the Java API directly from Scala. The difference in type annotation usage is more obvious when given an example.  Below is an example WordCount implementation that will be used to demonstrate the differences between the Scala and Java API.</p>
-              <pre class="brush: scala;">
-import java.time.Duration
+              <pre class="line-numbers"><code class="language-scala">import java.time.Duration
 import java.util.Properties
 
 import org.apache.kafka.streams.kstream.Materialized
@@ -3834,8 +3772,7 @@ object WordCountApplication extends App {
   sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
   }
-}
-              </pre>
+}</code></pre>
               <p>In the above code snippet, we don't have to provide any SerDes, <code class="docutils literal"><span class="pre">Grouped</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> explicitly. They will also not be dependent on any SerDes specified in the config.  <strong>In fact all SerDes specified  [...]
             </div>
             <div class="section" id="scala-dsl-implicit-serdes">
@@ -3844,8 +3781,7 @@ object WordCountApplication extends App {
               <p>The library uses the power of <a href="https://docs.scala-lang.org/tour/implicit-parameters.html">Scala implicit parameters</a> to alleviate this concern. As a user you can provide implicit SerDes or implicit values of <code class="docutils literal"><span class="pre">Grouped</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Repartitioned</span></code>, <code class="docutils literal"><s [...]
               <p>The library also bundles all implicit SerDes of the commonly used primitive types in a Scala module - so just import the module vals and have all SerDes in scope.  A similar strategy of modular implicits can be adopted for any user-defined SerDes as well (User-defined SerDes are discussed in the next section).</p>
               <p>Here's an example:</p>
-              <pre class="brush: scala;">
-// DefaultSerdes brings into scope implicit SerDes (mostly for primitives)
+              <pre class="line-numbers"><code class="language-scala">// DefaultSerdes brings into scope implicit SerDes (mostly for primitives)
 // that will set up all Grouped, Produced, Consumed and Joined instances.
 // So all APIs below that accept Grouped, Produced, Consumed or Joined will
 // get these instances automatically
@@ -3867,8 +3803,7 @@ val clicksPerRegion: KTable[String, Long] =
     .groupByKey
     .reduce(_ + _)
 
-clicksPerRegion.toStream.to(outputTopic)
-              </pre>
+clicksPerRegion.toStream.to(outputTopic)</code></pre>
               <p>Quite a few things are going on in the above code snippet that may warrant a few lines of elaboration:</p>
               <ol>
                 <li>The code snippet does not depend on any config defined SerDes. In fact any SerDes defined as part of the config will be ignored.</li>
@@ -3880,8 +3815,7 @@ clicksPerRegion.toStream.to(outputTopic)
             <div class="section" id="scala-dsl-user-defined-serdes">
               <span id="streams-developer-guide-dsl-scala-dsl-user-defined-serdes"></span><h3><a class="toc-backref" href="#id30">User-Defined SerDes</a><a class="headerlink" href="#scala-dsl-user-defined-serdes" title="Permalink to this headline"></a></h3>
               <p>When the default primitive SerDes are not enough and we need to define custom SerDes, the usage is exactly the same as above. Just define the implicit SerDes and start building the stream transformation. Here's an example with <code class="docutils literal"><span class="pre">AvroSerde</span></code>:</p>
-              <pre class="brush: scala;">
-// domain object as a case class
+              <pre class="line-numbers"><code class="language-scala">// domain object as a case class
 case class UserClicks(clicks: Long)
 
 // An implicit Serde implementation for the values we want to
@@ -3912,8 +3846,7 @@ val clicksPerRegion: KTable[String, Long] =
    .reduce(_ + _)
 
 // Write the (continuously updating) results to the output topic.
-clicksPerRegion.toStream.to(outputTopic)
-              </pre>
+clicksPerRegion.toStream.to(outputTopic)</code></pre>
               <p>A complete example of user-defined SerDes can be found in a test class within the library.</p>
             </div>
         </div>
@@ -3930,10 +3863,10 @@ clicksPerRegion.toStream.to(outputTopic)
 
 <!--#include virtual="../../../includes/_header.htm" -->
 <!--#include virtual="../../../includes/_top.htm" -->
-<div class="content documentation documentation--current">
+<div class="content documentation ">
   <!--#include virtual="../../../includes/_nav.htm" -->
   <div class="right">
-    <!--#include virtual="../../../includes/_docs_banner.htm" -->
+    <!--//#include virtual="../../../includes/_docs_banner.htm" -->
     <ul class="breadcrumbs">
       <li><a href="/documentation">Documentation</a></li>
       <li><a href="/documentation/streams">Kafka Streams</a></li>
diff --git a/docs/streams/developer-guide/dsl-topology-naming.html b/docs/streams/developer-guide/dsl-topology-naming.html
index e0c1e1f..a3b2a53 100644
--- a/docs/streams/developer-guide/dsl-topology-naming.html
+++ b/docs/streams/developer-guide/dsl-topology-naming.html
@@ -18,7 +18,7 @@
 <script><!--#include virtual="../../js/templateData.js" --></script>
 
 <script id="content-template" type="text/x-handlebars-template">
-    <!-- h1>Developer Guide for Kafka Streams</h1 -->
+    <h1>Developer Guide for Kafka Streams</h1>
     <div class="sub-nav-sticky">
         <div class="sticky-top">
             <!-- div style="height:35px">
@@ -71,20 +71,17 @@
 		 	For example, consider the following simple topology:
 
 			<br/>
-		<pre>
-		KStream&lt;String,String&gt; stream = builder.stream("input");
+		<pre class="line-numbers"><code class="language-text">		KStream&lt;String,String&gt; stream = builder.stream("input");
 		stream.filter((k,v) -> !v.equals("invalid_txn"))
 		      .mapValues((v) -> v.substring(0,5))
-		      .to("output")
-	    </pre>
+		      .to("output")</code></pre>
 
 		</p>
 
 		<p>
 		Running <code>Topology#describe()</code> yields this string:
 
-		<pre>
-		Topologies:
+		<pre class="line-numbers"><code class="language-text">		Topologies:
 		   Sub-topology: 0
 		    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
 		      --> KSTREAM-FILTER-0000000001
@@ -95,8 +92,7 @@
 		      --> KSTREAM-SINK-0000000003
 		      <-- KSTREAM-FILTER-0000000001
 		    Sink: KSTREAM-SINK-0000000003 (topic: output)
-		      <-- KSTREAM-MAPVALUES-0000000002
-		 </pre>
+		      <-- KSTREAM-MAPVALUES-0000000002</code></pre>
 
 		 From this report, you can see what the different operators are, but what is the broader context here?
 		 For example, consider <code>KSTREAM-FILTER-0000000001</code>: we can see that it's a
@@ -116,16 +112,13 @@
 		</p>
 		<p>
 		 Now let's take a look at your topology with all the processors named:
-		<pre>
-		KStream&lt;String,String&gt; stream =
+		<pre class="line-numbers"><code class="language-text">		KStream&lt;String,String&gt; stream =
 		builder.stream("input", Consumed.as("Customer_transactions_input_topic"));
 		stream.filter((k,v) -> !v.equals("invalid_txn"), Named.as("filter_out_invalid_txns"))
 		      .mapValues((v) -> v.substring(0,5), Named.as("Map_values_to_first_6_characters"))
-		      .to("output", Produced.as("Mapped_transactions_output_topic"));
-	     </pre>
+		      .to("output", Produced.as("Mapped_transactions_output_topic"));</code></pre>
 
-		 <pre>
-		 Topologies:
+		 <pre class="line-numbers"><code class="language-text">		 Topologies:
 		   Sub-topology: 0
 		    Source: Customer_transactions_input_topic (topics: [input])
 		      --> filter_out_invalid_txns
@@ -136,8 +129,7 @@
 		      --> Mapped_transactions_output_topic
 		      <-- filter_out_invalid_txns
 		    Sink: Mapped_transactions_output_topic (topic: output)
-		      <-- Map_values_to_first_6_characters
-		 </pre>
+		      <-- Map_values_to_first_6_characters</code></pre>
 
 		Now you can look at the topology description and easily understand what role each processor
 		plays in the topology. But there's another reason for naming your processor nodes when you
@@ -159,16 +151,13 @@
 		 shifting does have implications for topologies with stateful operators or repartition topics. 
 
 		 Here's a different topology with some state:
-		 <pre>
-		 KStream&lt;String,String&gt; stream = builder.stream("input");
+		 <pre class="line-numbers"><code class="language-text">		 KStream&lt;String,String&gt; stream = builder.stream("input");
 		 stream.groupByKey()
 		       .count()
 		       .toStream()
-		       .to("output");
-		 </pre>
+		       .to("output");</code></pre>
 		This topology description yields the following:
-		 <pre>
-			 Topologies:
+		 <pre class="line-numbers"><code class="language-text">			 Topologies:
 			   Sub-topology: 0
 			    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
 			     --> KSTREAM-AGGREGATE-0000000002
@@ -179,25 +168,21 @@
 			     --> KSTREAM-SINK-0000000004
 			     <-- KSTREAM-AGGREGATE-0000000002
 			    Sink: KSTREAM-SINK-0000000004 (topic: output)
-			     <-- KTABLE-TOSTREAM-0000000003
-		 </pre>
+			     <-- KTABLE-TOSTREAM-0000000003</code></pre>
 		 </p>
 		 <p>
 		  You can see from the topology description above that the state store is named
 		  <code>KSTREAM-AGGREGATE-STATE-STORE-0000000002</code>.  Here's what happens when you
 		  add a filter to keep some of the records out of the aggregation:
-		  <pre>
-		   KStream&lt;String,String&gt; stream = builder.stream("input");
+		  <pre class="line-numbers"><code class="language-text">		   KStream&lt;String,String&gt; stream = builder.stream("input");
 		   stream.filter((k,v)-> v !=null && v.length() >= 6 )
 		         .groupByKey()
 		         .count()
 		         .toStream()
-		         .to("output");
-		  </pre>
+		         .to("output");</code></pre>
 
 		  And the corresponding topology:
-		  <pre>
-			  Topologies:
+		  <pre class="line-numbers"><code class="language-text">			  Topologies:
 			    Sub-topology: 0
 			     Source: KSTREAM-SOURCE-0000000000 (topics: [input])
 			      --> KSTREAM-FILTER-0000000001
@@ -211,8 +196,7 @@
 			       --> KSTREAM-SINK-0000000005
 			       <-- KSTREAM-AGGREGATE-0000000003
 			      Sink: KSTREAM-SINK-0000000005 (topic: output)
-			       <-- KTABLE-TOSTREAM-0000000004
-		  </pre>
+			       <-- KTABLE-TOSTREAM-0000000004</code></pre>
 		 </p>
 		<p>
 		Notice that since you've added an operation <em>before</em> the <code>count</code> operation, the state
@@ -232,19 +216,16 @@
 		  But it's worth reiterating the importance of naming these DSL topology operations.
 
 		  Here's how your DSL code looks now giving a specific name to your state store:
-		  <pre>
-		  KStream&lt;String,String&gt; stream = builder.stream("input");
+		  <pre class="line-numbers"><code class="language-text">		  KStream&lt;String,String&gt; stream = builder.stream("input");
 		  stream.filter((k, v) -> v != null && v.length() >= 6)
 		        .groupByKey()
 		        .count(Materialized.as("Purchase_count_store"))
 		        .toStream()
-		        .to("output");
-		  </pre>
+		        .to("output");</code></pre>
 
 		  And here's the topology 
 
-		  <pre>
-		  Topologies:
+		  <pre class="line-numbers"><code class="language-text">		  Topologies:
 		   Sub-topology: 0
 		    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
 		      --> KSTREAM-FILTER-0000000001
@@ -258,8 +239,7 @@
 		      --> KSTREAM-SINK-0000000004
 		      <-- KSTREAM-AGGREGATE-0000000002
 		    Sink: KSTREAM-SINK-0000000004 (topic: output)
-		      <-- KTABLE-TOSTREAM-0000000003
-		  </pre>
+		      <-- KTABLE-TOSTREAM-0000000003</code></pre>
 		</p>
 		<p>
 		  Now, even though you've added processors before your state store, the store name and its changelog
@@ -327,10 +307,10 @@
 
 <!--#include virtual="../../../includes/_header.htm" -->
 <!--#include virtual="../../../includes/_top.htm" -->
-<div class="content documentation documentation--current">
+<div class="content documentation ">
 	<!--#include virtual="../../../includes/_nav.htm" -->
 	<div class="right">
-		<!--#include virtual="../../../includes/_docs_banner.htm" -->
+		<!--//#include virtual="../../../includes/_docs_banner.htm" -->
 		<ul class="breadcrumbs">
 			<li><a href="/documentation">Documentation</a></li>
 			<li><a href="/documentation/streams">Kafka Streams</a></li>
diff --git a/docs/streams/developer-guide/index.html b/docs/streams/developer-guide/index.html
index eb96f7d..19f638e 100644
--- a/docs/streams/developer-guide/index.html
+++ b/docs/streams/developer-guide/index.html
@@ -68,10 +68,10 @@
 
 <!--#include virtual="../../../includes/_header.htm" -->
 <!--#include virtual="../../../includes/_top.htm" -->
-<div class="content documentation documentation--current">
+<div class="content documentation ">
     <!--#include virtual="../../../includes/_nav.htm" -->
     <div class="right">
-        <!--#include virtual="../../../includes/_docs_banner.htm" -->
+        <!--//#include virtual="../../../includes/_docs_banner.htm" -->
         <ul class="breadcrumbs">
             <li><a href="/documentation">Documentation</a></li>
             <li><a href="/documentation/streams">Kafka Streams</a></li>
diff --git a/docs/streams/developer-guide/interactive-queries.html b/docs/streams/developer-guide/interactive-queries.html
index d6cfc33..ad890e0 100644
--- a/docs/streams/developer-guide/interactive-queries.html
+++ b/docs/streams/developer-guide/interactive-queries.html
@@ -143,8 +143,7 @@
 
 <span class="c1">// Start an instance of the topology</span>
 <span class="n">KafkaStreams</span> <span class="n">streams</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KafkaStreams</span><span class="o">(</span><span class="n">builder</span><span class="o">,</span> <span class="n">props</span><span class="o">);</span>
-<span class="n">streams</span><span class="o">.</span><span class="na">start</span><span class="o">();</span>
-</pre></div>
+<span class="n">streams</span><span class="o">.</span><span class="na">start</span><span class="o">();</span></code></pre></div>
                 </div>
                 <p>After the application has started, you can get access to &#8220;CountsKeyValueStore&#8221; and then query it via the <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java">ReadOnlyKeyValueStore</a> API:</p>
                 <div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Get the key-value store CountsKeyValueStore</span>
@@ -166,8 +165,7 @@
 <span class="k">while</span> <span class="o">(</span><span class="n">range</span><span class="o">.</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span>
   <span class="n">KeyValue</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">next</span> <span class="o">=</span> <span class="n">range</span><span class="o">.</span><span class="na">next</span><span class="o">();</span>
   <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">&quot;count for &quot;</span> <span class="o">+</span> <span class="n">next</span><span class="o">.</span><span class="na">key</span> <span class="o">+</span> <span class="s">&quot;: &quot;</span> <span class="o">+</span> <span class="n">next</span><span class="o">.</span><span class="na">value</span><span class=" [...]
-<span class="o">}</span>
-</pre></div>
+<span class="o">}</span></code></pre></div>
                 </div>
                 <p>You can also materialize the results of stateless operators by using the overloaded methods that take a <code class="docutils literal"><span class="pre">queryableStoreName</span></code>
                     as shown in the example below:</p>
@@ -182,8 +180,7 @@
 
 <span class="c1">// do not materialize the result of filtering corresponding to even numbers</span>
 <span class="c1">// this means that these results will not be materialized and cannot be queried.</span>
-<span class="n">KTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">oddCounts</span> <span class="o">=</span> <span class="n">numberLines</span><span class="o">.</span><span class="na">filter</span><span class="o">((</span><span class="n">region</span><span class="o">,</span> <span class="n">count</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">( [...]
-</pre></div>
+<span class="n">KTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">oddCounts</span> <span class="o">=</span> <span class="n">numberLines</span><span class="o">.</span><span class="na">filter</span><span class="o">((</span><span class="n">region</span><span class="o">,</span> <span class="n">count</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">( [...]
                 </div>
             </div>
             <div class="section" id="querying-local-window-stores">
@@ -203,8 +200,7 @@
 
 <span class="c1">// Create a window state store named &quot;CountsWindowStore&quot; that contains the word counts for every minute</span>
 <span class="n">groupedByWord</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span><span class="n">TimeWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(<span class="n">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">60</span><span class="o">)))</span>
-  <span class="o">.</span><span class="na">count</span><span class="o">(</span><span class="n">Materialized</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">WindowStore</span><span class="o">&lt;</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]&gt;</span><span class="n">as</span><span class="o">(</span><span class="s">&quot;Co [...]
-</pre></div>
+  <span class="o">.</span><span class="na">count</span><span class="o">(</span><span class="n">Materialized</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">WindowStore</span><span class="o">&lt;</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]&gt;</span><span class="n">as</span><span class="o">(</span><span class="s">&quot;Co [...]
                 </div>
                 <p>After the application has started, you can get access to &#8220;CountsWindowStore&#8221; and then query it via the <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java">ReadOnlyWindowStore</a> API:</p>
                 <div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Get the window store named &quot;CountsWindowStore&quot;</span>
@@ -220,8 +216,7 @@
   <span class="n">KeyValue</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">next</span> <span class="o">=</span> <span class="n">iterator</span><span class="o">.</span><span class="na">next</span><span class="o">();</span>
   <span class="kt">long</span> <span class="n">windowTimestamp</span> <span class="o">=</span> <span class="n">next</span><span class="o">.</span><span class="na">key</span><span class="o">;</span>
   <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">&quot;Count of &#39;world&#39; @ time &quot;</span> <span class="o">+</span> <span class="n">windowTimestamp</span> <span class="o">+</span> <span class="s">&quot; is &quot;</span> <span class="o">+</span> <span class="n">next</span><span class="o">.</span><span class="na">value</span><span class="o">);</span>
-<span class="o">}</span>
-</pre></div>
+<span class="o">}</span></code></pre></div>
                 </div>
             </div>
             <div class="section" id="querying-local-custom-state-stores">
@@ -238,7 +233,7 @@
                     <li>It is recommended that you provide an interface that restricts access to read-only operations. This prevents users of this API from mutating the state of your running Kafka Streams application out-of-band.</li>
                 </ul>
                 <p>The class/interface hierarchy for your custom store might look something like:</p>
-                <div class="highlight-java"><div class="highlight"><pre><span></span><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyCustomStore</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span><span class="n">V</span><span class="o">&gt;</span> <span class="kd">implements</span> <span class="n">StateStore</span><span class="o">,</span> <span class="n">MyWriteableCustomStore</span><span class="o">&lt;</span><span class="n">K [...]
+                <div class="highlight-java"><div class="highlight"><pre><code><span></span><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyCustomStore</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span><span class="n">V</span><span class="o">&gt;</span> <span class="kd">implements</span> <span class="n">StateStore</span><span class="o">,</span> <span class="n">MyWriteableCustomStore</span><span class="o">&lt;</span><span class [...]
   <span class="c1">// implementation of the actual store</span>
 <span class="o">}</span>
 
@@ -254,8 +249,7 @@
 
 <span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyCustomStoreBuilder</span> <span class="kd">implements</span> <span class="n">StoreBuilder</span> <span class="o">{</span>
   <span class="c1">// implementation of the supplier for MyCustomStore</span>
-<span class="o">}</span>
-</pre></div>
+<span class="o">}</span></code></pre></div>
                 </div>
                 <p>To make this store queryable you must:</p>
                 <ul class="simple">
@@ -274,8 +268,7 @@
       <span class="k">return</span> <span class="k">new</span> <span class="n">MyCustomStoreTypeWrapper</span><span class="o">(</span><span class="n">storeProvider</span><span class="o">,</span> <span class="n">storeName</span><span class="o">,</span> <span class="k">this</span><span class="o">);</span>
   <span class="o">}</span>
 
-<span class="o">}</span>
-</pre></div>
+<span class="o">}</span></code></pre></div>
                 </div>
                 <p>A wrapper class is required because each instance of a Kafka Streams application may run multiple stream tasks and manage
                     multiple local instances of a particular state store.  The wrapper class hides this complexity and lets you query a &#8220;logical&#8221;
@@ -286,7 +279,7 @@
                     <code class="docutils literal"><span class="pre">StateStoreProvider#stores(String</span> <span class="pre">storeName,</span> <span class="pre">QueryableStoreType&lt;T&gt;</span> <span class="pre">queryableStoreType)</span></code> returns a <code class="docutils literal"><span class="pre">List</span></code> of state
                     stores with the given storeName and of the type as defined by <code class="docutils literal"><span class="pre">queryableStoreType</span></code>.</p>
                <p>An example implementation of the wrapper follows (Java 8+):</p>
-                <div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// We strongly recommended implementing a read-only interface</span>
+                <div class="highlight-java"><div class="highlight"><pre><code><span></span><span class="c1">// We strongly recommended implementing a read-only interface</span>
 <span class="c1">// to restrict usage of the store to safe read operations!</span>
 <span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyCustomStoreTypeWrapper</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span><span class="n">V</span><span class="o">&gt;</span> <span class="kd">implements</span> <span class="n">MyReadableCustomStore</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span><span class="n">V</span><span class="o">&gt;</span> <span class="o">{</span>
 
@@ -312,8 +305,7 @@
     <span class="k">return</span> <span class="n">value</span><span class="o">.</span><span class="na">orElse</span><span class="o">(</span><span class="kc">null</span><span class="o">);</span>
   <span class="o">}</span>
 
-<span class="o">}</span>
-</pre></div>
+<span class="o">}</span></code></pre></div>
                 </div>
                 <p>You can now find and query your custom store:</p>
                 <div class="highlight-java"><div class="highlight"><pre><span></span>
@@ -335,8 +327,7 @@
 <span class="c1">// Get access to the custom store</span>
 <span class="n">MyReadableCustomStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">store</span> <span class="o">=</span> <span class="n">streams</span><span class="o">.</span><span class="na">store</span><span class="o">(</span><span class="s">&quot;the-custom-store&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="n">MyCustomStoreType</span><span c [...]
 <span class="c1">// Query the store</span>
-<span class="n">String</span> <span class="n">value</span> <span class="o">=</span> <span class="n">store</span><span class="o">.</span><span class="na">read</span><span class="o">(</span><span class="s">&quot;key&quot;</span><span class="o">);</span>
-</pre></div>
+<span class="n">String</span> <span class="n">value</span> <span class="o">=</span> <span class="n">store</span><span class="o">.</span><span class="na">read</span><span class="o">(</span><span class="s">&quot;key&quot;</span><span class="o">);</span></code></pre></div>
                 </div>
             </div>
         </div>
@@ -410,8 +401,7 @@ interactive queries</span></p>
 <span class="c1">// fictitious, but we provide end-to-end demo applications (such as KafkaMusicExample)</span>
 <span class="c1">// that showcase how to implement such a service to get you started.</span>
 <span class="n">MyRPCService</span> <span class="n">rpcService</span> <span class="o">=</span> <span class="o">...;</span>
-<span class="n">rpcService</span><span class="o">.</span><span class="na">listenAt</span><span class="o">(</span><span class="n">rpcEndpoint</span><span class="o">);</span>
-</pre></div>
+<span class="n">rpcService</span><span class="o">.</span><span class="na">listenAt</span><span class="o">(</span><span class="n">rpcEndpoint</span><span class="o">);</span></code></pre></div>
                 </div>
             </div>
             <div class="section" id="discovering-and-accessing-application-instances-and-their-local-state-stores">
@@ -460,8 +450,7 @@ interactive queries</span></p>
         <span class="k">return</span> <span class="n">http</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="n">url</span><span class="o">);</span>
     <span class="o">})</span>
     <span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">s</span> <span class="o">-&gt;</span> <span class="n">s</span> <span class="o">!=</span> <span class="kc">null</span><span class="o">)</span>
-    <span class="o">.</span><span class="na">findFirst</span><span class="o">();</span>
-</pre></div>
+    <span class="o">.</span><span class="na">findFirst</span><span class="o">();</span></code></pre></div>
                 </div>
                 <p>At this point the full state of the application is interactively queryable:</p>
                 <ul class="simple">
@@ -490,10 +479,10 @@ interactive queries</span></p>
 
                 <!--#include virtual="../../../includes/_header.htm" -->
                 <!--#include virtual="../../../includes/_top.htm" -->
-                    <div class="content documentation documentation--current">
+                    <div class="content documentation ">
                     <!--#include virtual="../../../includes/_nav.htm" -->
                     <div class="right">
-                    <!--#include virtual="../../../includes/_docs_banner.htm" -->
+                    <!--//#include virtual="../../../includes/_docs_banner.htm" -->
                     <ul class="breadcrumbs">
                     <li><a href="/documentation">Documentation</a></li>
                     <li><a href="/documentation/streams">Kafka Streams</a></li>
diff --git a/docs/streams/developer-guide/manage-topics.html b/docs/streams/developer-guide/manage-topics.html
index 1dd7cfc..d65e375 100644
--- a/docs/streams/developer-guide/manage-topics.html
+++ b/docs/streams/developer-guide/manage-topics.html
@@ -89,10 +89,10 @@
 
 <!--#include virtual="../../../includes/_header.htm" -->
 <!--#include virtual="../../../includes/_top.htm" -->
-<div class="content documentation documentation--current">
+<div class="content documentation ">
   <!--#include virtual="../../../includes/_nav.htm" -->
   <div class="right">
-    <!--#include virtual="../../../includes/_docs_banner.htm" -->
+    <!--//#include virtual="../../../includes/_docs_banner.htm" -->
     <ul class="breadcrumbs">
       <li><a href="/documentation">Documentation</a></li>
       <li><a href="/documentation/streams">Kafka Streams</a></li>
diff --git a/docs/streams/developer-guide/memory-mgmt.html b/docs/streams/developer-guide/memory-mgmt.html
index d4c2bef..0070705 100644
--- a/docs/streams/developer-guide/memory-mgmt.html
+++ b/docs/streams/developer-guide/memory-mgmt.html
@@ -82,8 +82,7 @@
         processing topology:</p>
       <div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Enable record cache of size 10 MB.</span>
 <span class="n">Properties</span> <span class="n">props</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
-<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">CACHE_MAX_BYTES_BUFFERING_CONFIG</span><span class="o">,</span> <span class="mi">10</span> <span class="o">*</span> <span class="mi">1024</span> <span class="o">*</span> <span class="mi">1024L</span><span class="o">);</span>
-</pre></div>
+<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">CACHE_MAX_BYTES_BUFFERING_CONFIG</span><span class="o">,</span> <span class="mi">10</span> <span class="o">*</span> <span class="mi">1024</span> <span class="o">*</span> <span class="mi">1024L</span><span class="o">);</span></code></pre></div>
       </div>
       <p>This parameter controls the number of bytes allocated for caching. Specifically, for a processor topology instance with
         <code class="docutils literal"><span class="pre">T</span></code> threads and <code class="docutils literal"><span class="pre">C</span></code> bytes allocated for caching, each thread will have an even <code class="docutils literal"><span class="pre">C/T</span></code> bytes to construct its own
@@ -107,8 +106,7 @@
           <blockquote>
             <div><div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Disable record cache</span>
 <span class="n">Properties</span> <span class="n">props</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
-<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">CACHE_MAX_BYTES_BUFFERING_CONFIG</span><span class="o">,</span> <span class="mi">0</span><span class="o">);</span>
-</pre></div>
+<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">CACHE_MAX_BYTES_BUFFERING_CONFIG</span><span class="o">,</span> <span class="mi">0</span><span class="o">);</span></code></pre></div>
             </div>
               <p>Turning off caching might result in high write traffic for the underlying RocksDB store.
                 With default settings caching is enabled within Kafka Streams but RocksDB caching is disabled.
@@ -123,8 +121,7 @@
 <span class="c1">// Enable record cache of size 10 MB.</span>
 <span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">CACHE_MAX_BYTES_BUFFERING_CONFIG</span><span class="o">,</span> <span class="mi">10</span> <span class="o">*</span> <span class="mi">1024</span> <span class="o">*</span> <span class="mi">1024L</span><span class="o">);</span>
 <span class="c1">// Set commit interval to 1 second.</span>
-<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">COMMIT_INTERVAL_MS_CONFIG</span><span class="o">,</span> <span class="mi">1000</span><span class="o">);</span>
-</pre></div>
+<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">COMMIT_INTERVAL_MS_CONFIG</span><span class="o">,</span> <span class="mi">1000</span><span class="o">);</span></code></pre></div>
             </div>
             </div></blockquote>
         </li>
@@ -159,13 +156,12 @@
       <p>Following from the example first shown in section <a class="reference internal" href="processor-api.html#streams-developer-guide-state-store"><span class="std std-ref">State Stores</span></a>, to disable caching, you can
        add the <code class="docutils literal"><span class="pre">withCachingDisabled</span></code> call (note that caches are enabled by default; however, there is an explicit <code class="docutils literal"><span class="pre">withCachingEnabled</span></code>
         call).</p>
-      <div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">StoreBuilder</span> <span class="n">countStoreBuilder</span> <span class="o">=</span>
+      <div class="highlight-java"><div class="highlight"><pre><code><span></span><span class="n">StoreBuilder</span> <span class="n">countStoreBuilder</span> <span class="o">=</span>
   <span class="n">Stores</span><span class="o">.</span><span class="na">keyValueStoreBuilder</span><span class="o">(</span>
     <span class="n">Stores</span><span class="o">.</span><span class="na">persistentKeyValueStore</span><span class="o">(</span><span class="s">&quot;Counts&quot;</span><span class="o">),</span>
     <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span>
     <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">())</span>
-  <span class="o">.</span><span class="na">withCachingEnabled</span><span class="o">()</span>
-</pre></div>
+  <span class="o">.</span><span class="na">withCachingEnabled</span><span class="o">()</span></code></pre></div>
       </div>
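+      <p>The same toggle is available in the DSL through <code class="docutils literal"><span class="pre">Materialized</span></code>. A brief sketch
+        (it assumes a grouped stream; the store name is illustrative):</p>
+      <pre class="line-numbers"><code class="language-java">KTable&lt;String, Long&gt; counts = groupedStream
+  .count(Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("Counts")
+    .withCachingDisabled()); // opt out of caching for this store only</code></pre>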
     </div>
     <div class="section" id="rocksdb">
@@ -207,9 +203,9 @@
     <span class="o">}</span>
       </div>
         <sup id="fn1">1. INDEX_FILTER_BLOCK_RATIO can be used to set a fraction of the block cache to set aside for "high priority" (aka index and filter) blocks, preventing them from being evicted by data blocks. See the full signature of the <a class="reference external" href="https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/LRUCache.java#L72">LRUCache constructor</a>.
-                        NOTE: the boolean parameter in the cache constructor lets you control whether the cache should enforce a strict memory limit by failing the read or iteration in the rare cases where it might go larger than its capacity. Due to a
-                        <a class="reference external" href="https://github.com/facebook/rocksdb/issues/6247">bug in RocksDB</a>, this option cannot be used
-                        if the write buffer memory is also counted against the cache. If you set this to true, you should NOT pass the cache in to the <code>WriteBufferManager</code> and just control the write buffer and cache memory separately.</sup>
+          NOTE: the boolean parameter in the cache constructor lets you control whether the cache should enforce a strict memory limit by failing the read or iteration in the rare cases where it might exceed its capacity. Due to a
+          <a class="reference external" href="https://github.com/facebook/rocksdb/issues/6247">bug in RocksDB</a>, this option cannot be used
+          if the write buffer memory is also counted against the cache. If you set this to true, you should NOT pass the cache into the <code>WriteBufferManager</code>; instead, control the write buffer and cache memory separately.</sup>
         <br>
         <sup id="fn2">2. This must be set in order for INDEX_FILTER_BLOCK_RATIO to take effect (see footnote 1) as described in the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks">RocksDB docs</a></sup>
         <br>
@@ -253,10 +249,10 @@
 
 <!--#include virtual="../../../includes/_header.htm" -->
 <!--#include virtual="../../../includes/_top.htm" -->
-<div class="content documentation documentation--current">
+<div class="content documentation ">
   <!--#include virtual="../../../includes/_nav.htm" -->
   <div class="right">
-    <!--#include virtual="../../../includes/_docs_banner.htm" -->
+    <!--//#include virtual="../../../includes/_docs_banner.htm" -->
     <ul class="breadcrumbs">
       <li><a href="/documentation">Documentation</a></li>
       <li><a href="/documentation/streams">Kafka Streams</a></li>
diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html
index d314c7d..a5c1335 100644
--- a/docs/streams/developer-guide/processor-api.html
+++ b/docs/streams/developer-guide/processor-api.html
@@ -158,8 +158,7 @@
       <span class="c1">// Note: Do not close any StateStores as these are managed by the library</span>
   <span class="o">}</span>
 
-<span class="o">}</span>
-</pre></div>
+<span class="o">}</span></code></pre></div>
             </div>
             <div class="admonition note">
                 <p><b>Note</b></p>
@@ -246,8 +245,7 @@
     <span class="n">Stores</span><span class="o">.</span><span class="na">persistentKeyValueStore</span><span class="o">(</span><span class="s">&quot;persistent-counts&quot;</span><span class="o">),</span>
     <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span>
     <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">());</span>
-<span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">countStore</span> <span class="o">=</span> <span class="n">countStoreSupplier</span><span class="o">.</span><span class="na">build</span><span class="o">();</span>
-</pre></div>
+<span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">countStore</span> <span class="o">=</span> <span class="n">countStoreSupplier</span><span class="o">.</span><span class="na">build</span><span class="o">();</span></code></pre></div>
                             </div>
                         </td>
                     </tr>
@@ -281,8 +279,7 @@
     <span class="n">Stores</span><span class="o">.</span><span class="na">inMemoryKeyValueStore</span><span class="o">(</span><span class="s">&quot;inmemory-counts&quot;</span><span class="o">),</span>
     <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span>
     <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">());</span>
-<span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">countStore</span> <span class="o">=</span> <span class="n">countStoreSupplier</span><span class="o">.</span><span class="na">build</span><span class="o">();</span>
-</pre></div>
+<span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">countStore</span> <span class="o">=</span> <span class="n">countStoreSupplier</span><span class="o">.</span><span class="na">build</span><span class="o">();</span></code></pre></div>
                             </div>
                         </td>
                     </tr>
@@ -327,8 +324,7 @@
   <span class="n">Stores</span><span class="o">.</span><span class="na">persistentKeyValueStore</span><span class="o">(</span><span class="s">&quot;Counts&quot;</span><span class="o">),</span>
     <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span>
     <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">())</span>
-  <span class="o">.</span><span class="na">withLoggingDisabled</span><span class="o">();</span> <span class="c1">// disable backing up the store to a changelog topic</span>
-</pre></div>
+  <span class="o">.</span><span class="na">withLoggingDisabled</span><span class="o">();</span> <span class="c1">// disable backing up the store to a changelog topic</span></code></pre></div>
                 </div>
                 <div class="admonition attention">
                     <p class="first admonition-title">Attention</p>
@@ -348,8 +344,7 @@
   <span class="n">Stores</span><span class="o">.</span><span class="na">persistentKeyValueStore</span><span class="o">(</span><span class="s">&quot;Counts&quot;</span><span class="o">),</span>
     <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span>
     <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">())</span>
-  <span class="o">.</span><span class="na">withLoggingEnabled</span><span class="o">(</span><span class="n">changlogConfig</span><span class="o">);</span> <span class="c1">// enable changelogging, with custom changelog settings</span>
-</pre></div>
+  <span class="o">.</span><span class="na">withLoggingEnabled</span><span class="o">(</span><span class="n">changlogConfig</span><span class="o">);</span> <span class="c1">// enable changelogging, with custom changelog settings</span></code></pre></div>
                 </div>
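+                <p>As a side note, the <code class="docutils literal"><span class="pre">changelogConfig</span></code> map passed to <code class="docutils literal"><span class="pre">withLoggingEnabled</span></code> simply holds
+                    topic-level overrides for the store's changelog topic. A minimal sketch (the <code>min.insync.replicas</code> override is illustrative):</p>
+                <pre class="line-numbers"><code class="language-java">Map&lt;String, String&gt; changelogConfig = new HashMap&lt;&gt;();
+// Any topic-level config can be overridden for the changelog topic.
+changelogConfig.put("min.insync.replicas", "1");</code></pre>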
             </div>
             <div class="section" id="timestamped-state-stores">
@@ -398,8 +393,7 @@
 
     <span class="c1">// add a header to the elements</span>
     <span class="n">context()</span><span class="o">.</span><span class="na">headers</span><span class="o">()</span><span class="o">.</span><span class="na">add</span><span class="o">.</span><span class="o">(</span><span class="s">&quot;key&quot;</span><span class="o">,</span> <span class="s">&quot;key&quot;</span>
-<span class="o">}</span>
-</pre></div>
+<span class="o">}</span></code></pre></div>
         </div>
         <div class="section" id="connecting-processors-and-state-stores">
             <h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a><a class="headerlink" href="#connecting-processors-and-state-stores" title="Permalink to this headline"></a></h2>
@@ -409,8 +403,7 @@
                 to generate input data streams into the topology, and sink processors with the specified Kafka topics to generate
                 output data streams out of the topology.</p>
             <p>Here is an example implementation:</p>
-            <pre class="brush: java">
-                Topology builder = new Topology();
+            <pre class="line-numbers"><code class="language-java">                Topology builder = new Topology();
                 // add the source processor node that takes Kafka topic "source-topic" as input
                 builder.addSource("Source", "source-topic")
                     // add the WordCountProcessor node which takes the source processor as its upstream processor
@@ -419,8 +412,7 @@
                     .addStateStore(countStoreBuilder, "Process")
                     // add the sink processor node that takes Kafka topic "sink-topic" as output
                     // and the WordCountProcessor node as its upstream processor
-                    .addSink("Sink", "sink-topic", "Process");
-            </pre>
+                    .addSink("Sink", "sink-topic", "Process");</code></pre>
             <p>Here is a quick explanation of this example:</p>
             <ul class="simple">
                 <li>A source processor node named <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> is added to the topology using the <code class="docutils literal"><span class="pre">addSource</span></code> method, with one Kafka topic
@@ -437,8 +429,7 @@
                 This can be done by implementing <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
                 instead of calling <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code>, like this:
             </p>
-            <pre class="brush: java">
-                Topology builder = new Topology();
+            <pre class="line-numbers"><code class="language-java">                Topology builder = new Topology();
                 // add the source processor node that takes Kafka "source-topic" as input
                 builder.addSource("Source", "source-topic")
                     // add the WordCountProcessor node which takes the source processor as its upstream processor.
@@ -453,8 +444,7 @@
                     }, "Source")
                     // add the sink processor node that takes Kafka topic "sink-topic" as output
                     // and the WordCountProcessor node as its upstream processor
-                    .addSink("Sink", "sink-topic", "Process");
-            </pre>
+                    .addSink("Sink", "sink-topic", "Process");</code></pre>
            <p>This allows a processor to "own" state stores, effectively encapsulating their usage from the user wiring the topology.
                 Multiple processors that share a state store may provide the same store with this technique, as long as the <code class="docutils literal"><span class="pre">StoreBuilder</span></code> is the same <code class="docutils literal"><span class="pre">instance</span></code>.</p>
             <p>In these topologies, the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> stream processor node is considered a downstream processor of the <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> node, and an
@@ -483,10 +473,10 @@
 
                 <!--#include virtual="../../../includes/_header.htm" -->
                 <!--#include virtual="../../../includes/_top.htm" -->
-                    <div class="content documentation documentation--current">
+                    <div class="content documentation ">
                     <!--#include virtual="../../../includes/_nav.htm" -->
                     <div class="right">
-                    <!--#include virtual="../../../includes/_docs_banner.htm" -->
+                    <!--//#include virtual="../../../includes/_docs_banner.htm" -->
                     <ul class="breadcrumbs">
                     <li><a href="/documentation">Documentation</a></li>
                     <li><a href="/documentation/streams">Kafka Streams</a></li>
diff --git a/docs/streams/developer-guide/running-app.html b/docs/streams/developer-guide/running-app.html
index fccb5090..87ee8f0 100644
--- a/docs/streams/developer-guide/running-app.html
+++ b/docs/streams/developer-guide/running-app.html
@@ -51,10 +51,9 @@
           <div class="section" id="starting-a-kafka-streams-application">
               <span id="streams-developer-guide-execution-starting"></span><h2><a class="toc-backref" href="#id3">Starting a Kafka Streams application</a><a class="headerlink" href="#starting-a-kafka-streams-application" title="Permalink to this headline"></a></h2>
               <p>You can package your Java application as a fat JAR file and then start the application like this:</p>
-              <div class="highlight-bash"><div class="highlight"><pre><span></span><span class="c1"># Start the application in class `com.example.MyStreamsApp`</span>
+              <div class="highlight-bash"><div class="highlight"><pre><code><span></span><span class="c1"># Start the application in class `com.example.MyStreamsApp`</span>
 <span class="c1"># from the fat JAR named `path-to-app-fatjar.jar`.</span>
-$ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp
-</pre></div>
+$ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp</code></pre></div>
               </div>
              <p>When you start your application, you are launching a Kafka Streams instance of your application. You can run multiple
                   instances of your application. A common scenario is that there are multiple instances of your application running in
@@ -116,7 +115,7 @@ $ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp
                       <a class="reference internal" href="config-streams.html#acceptable-recovery-lag"><span class="std std-ref"><code>acceptable.recovery.lag</code></span></a>, if one exists. This means that
                       most of the time, a task migration will <b>not</b> result in downtime for that task. It will remain active on the instance that's already caught up, while the instance that it's being
                       migrated to works on restoring the state. Streams will <a class="reference internal" href="config-streams.html#probing-rebalance-interval-ms"><span class="std std-ref">regularly probe</span></a> for warmup tasks that have finished restoring and transition them to active tasks when ready.
-                   </p>
+                  </p>
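+                  <p>
+                      As a minimal sketch, these availability-related settings can be tuned on your <code class="docutils literal"><span class="pre">Properties</span></code> instance
+                      (the values shown below are illustrative, not recommendations):
+                  </p>
+                  <pre class="line-numbers"><code class="language-java">Properties props = new Properties();
+// An instance within this many records of the changelog end is considered caught up.
+props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 10000L);
+// Maximum number of extra "warmup" task replicas used to restore state before a migration.
+props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 2);
+// How often to probe for warmup tasks that have finished restoring.
+props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 10 * 60 * 1000L);</code></pre>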
                   <p>
                       Note that the one exception to this task availability is when none of the instances have a caught-up version of that task. In that case, we have no choice but to assign the active
                       task to an instance that is not caught up and will have to block further processing on restoration of the task's state from the changelog. If high availability is important
@@ -151,10 +150,10 @@ $ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp
 
                 <!--#include virtual="../../../includes/_header.htm" -->
                 <!--#include virtual="../../../includes/_top.htm" -->
-                    <div class="content documentation documentation--current">
+                    <div class="content documentation ">
                     <!--#include virtual="../../../includes/_nav.htm" -->
                     <div class="right">
-                    <!--#include virtual="../../../includes/_docs_banner.htm" -->
+                    <!--//#include virtual="../../../includes/_docs_banner.htm" -->
                     <ul class="breadcrumbs">
                     <li><a href="/documentation">Documentation</a></li>
                     <li><a href="/documentation/streams">Kafka Streams</a></li>
diff --git a/docs/streams/developer-guide/security.html b/docs/streams/developer-guide/security.html
index bd05fc5..05de079 100644
--- a/docs/streams/developer-guide/security.html
+++ b/docs/streams/developer-guide/security.html
@@ -98,15 +98,14 @@
                 then you must also include these SSL certificates in the correct locations within the Docker image.</p>
             <p>The snippet below shows the settings to enable client authentication and SSL encryption for data-in-transit between your
                 Kafka Streams application and the Kafka cluster it is reading and writing from:</p>
-            <div class="highlight-bash"><div class="highlight"><pre><span></span><span class="c1"># Essential security settings to enable client authentication and SSL encryption</span>
+            <div class="highlight-bash"><div class="highlight"><pre><code><span></span><span class="c1"># Essential security settings to enable client authentication and SSL encryption</span>
 bootstrap.servers<span class="o">=</span>kafka.example.com:9093
 security.protocol<span class="o">=</span>SSL
 ssl.truststore.location<span class="o">=</span>/etc/security/tls/kafka.client.truststore.jks
 ssl.truststore.password<span class="o">=</span>test1234
 ssl.keystore.location<span class="o">=</span>/etc/security/tls/kafka.client.keystore.jks
 ssl.keystore.password<span class="o">=</span>test1234
-ssl.key.password<span class="o">=</span>test1234
-</pre></div>
+ssl.key.password<span class="o">=</span>test1234</code></pre></div>
             </div>
             <p>Configure these settings in the application for your <code class="docutils literal"><span class="pre">Properties</span></code> instance. These settings will encrypt any
                 data-in-transit that is being read from or written to Kafka, and your application will authenticate itself against the
@@ -127,20 +126,18 @@ ssl.key.password<span class="o">=</span>test1234
 <span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">SslConfigs</span><span class="o">.</span><span class="na">SSL_TRUSTSTORE_PASSWORD_CONFIG</span><span class="o">,</span> <span class="s">&quot;test1234&quot;</span><span class="o">);</span>
 <span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">SslConfigs</span><span class="o">.</span><span class="na">SSL_KEYSTORE_LOCATION_CONFIG</span><span class="o">,</span> <span class="s">&quot;/etc/security/tls/kafka.client.keystore.jks&quot;</span><span class="o">);</span>
 <span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">SslConfigs</span><span class="o">.</span><span class="na">SSL_KEYSTORE_PASSWORD_CONFIG</span><span class="o">,</span> <span class="s">&quot;test1234&quot;</span><span class="o">);</span>
-<span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">SslConfigs</span><span class="o">.</span><span class="na">SSL_KEY_PASSWORD_CONFIG</span><span class="o">,</span> <span class="s">&quot;test1234&quot;</span><span class="o">);</span>
-</pre></div>
+<span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">SslConfigs</span><span class="o">.</span><span class="na">SSL_KEY_PASSWORD_CONFIG</span><span class="o">,</span> <span class="s">&quot;test1234&quot;</span><span class="o">);</span></code></pre></div>
             </div>
             <p>If you incorrectly configure a security setting in your application, it will fail at runtime, typically right after you
                 start it.  For example, if you enter an incorrect password for the <code class="docutils literal"><span class="pre">ssl.keystore.password</span></code> setting, an error message
                 similar to this would be logged and then the application would terminate:</p>
-            <div class="highlight-bash"><div class="highlight"><pre><span></span><span class="c1"># Misconfigured ssl.keystore.password</span>
+            <div class="highlight-bash"><div class="highlight"><pre><code><span></span><span class="c1"># Misconfigured ssl.keystore.password</span>
 Exception in thread <span class="s2">&quot;main&quot;</span> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
 <span class="o">[</span>...snip...<span class="o">]</span>
 Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException:
    java.io.IOException: Keystore was tampered with, or password was incorrect
 <span class="o">[</span>...snip...<span class="o">]</span>
-Caused by: java.security.UnrecoverableKeyException: Password verification failed
-</pre></div>
+Caused by: java.security.UnrecoverableKeyException: Password verification failed</code></pre></div>
             </div>
             <p>Monitor your Kafka Streams application log files for such error messages to spot any misconfigured applications quickly.</p>
 </div>
@@ -157,10 +154,10 @@ Caused by: java.security.UnrecoverableKeyException: Password verification failed
 
                 <!--#include virtual="../../../includes/_header.htm" -->
                 <!--#include virtual="../../../includes/_top.htm" -->
-                    <div class="content documentation documentation--current">
+                    <div class="content documentation ">
                     <!--#include virtual="../../../includes/_nav.htm" -->
                     <div class="right">
-                    <!--#include virtual="../../../includes/_docs_banner.htm" -->
+                    <!--//#include virtual="../../../includes/_docs_banner.htm" -->
                     <ul class="breadcrumbs">
                     <li><a href="/documentation">Documentation</a></li>
                     <li><a href="/documentation/streams">Kafka Streams</a></li>
diff --git a/docs/streams/developer-guide/testing.html b/docs/streams/developer-guide/testing.html
index 5be2255..79ae6e1 100644
--- a/docs/streams/developer-guide/testing.html
+++ b/docs/streams/developer-guide/testing.html
@@ -51,14 +51,12 @@
                To test a Kafka Streams application, Kafka provides a test-utils artifact that can be added as a regular
                 dependency to your test code base. Example <code>pom.xml</code> snippet when using Maven:
             </p>
-            <pre>
-&lt;dependency&gt;
+            <pre class="line-numbers"><code class="language-text">&lt;dependency&gt;
     &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
     &lt;artifactId&gt;kafka-streams-test-utils&lt;/artifactId&gt;
     &lt;version&gt;{{fullDotVersion}}&lt;/version&gt;
     &lt;scope&gt;test&lt;/scope&gt;
-&lt;/dependency&gt;
-    </pre>
+&lt;/dependency&gt;</code></pre>
         </div>
         <div class="section" id="testing-topologytestdriver">
             <h2><a class="toc-backref" href="#testing-topologytestdriver" title="Permalink to this headline">Testing a
@@ -73,8 +71,7 @@
                 You can use the test driver to verify that your specified processor topology computes the correct result
                with manually piped-in data records.
                The test driver captures the result records and allows you to query its embedded state stores.
-            <pre>
-// Processor API
+            <pre class="line-numbers"><code class="language-text">// Processor API
 Topology topology = new Topology();
 topology.addSource("sourceProcessor", "input-topic");
 topology.addProcessor("processor", ..., "sourceProcessor");
@@ -89,16 +86,13 @@ Topology topology = builder.build();
 Properties props = new Properties();
 props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
-TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
-        </pre>
+TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);</code></pre>
             <p>
                With the test driver you can create a <code>TestInputTopic</code>, giving the topic name and the corresponding serializers.
                <code>TestInputTopic</code> provides various methods to pipe new message values, keys and values, or lists of KeyValue objects.
             </p>
-            <pre>
-TestInputTopic&lt;String, Long&gt; inputTopic = testDriver.createInputTopic("input-topic", stringSerde.serializer(), longSerde.serializer());
-inputTopic.pipeInput("key", 42L);
-        </pre>
+            <pre class="line-numbers"><code class="language-text">TestInputTopic&lt;String, Long&gt; inputTopic = testDriver.createInputTopic("input-topic", stringSerde.serializer(), longSerde.serializer());
+inputTopic.pipeInput("key", 42L);</code></pre>
             <p>
                 To verify the output, you can use <code>TestOutputTopic</code>
                 where you configure the topic and the corresponding deserializers during initialization.
@@ -106,34 +100,26 @@ inputTopic.pipeInput("key", 42L);
                For example, you can validate the returned <code>KeyValue</code> with standard assertions
                 if you only care about the key and value, but not the timestamp of the result record.
             </p>
-            <pre>
-TestOutputTopic&lt;String, Long&gt; outputTopic = testDriver.createOutputTopic("result-topic", stringSerde.deserializer(), longSerde.deserializer());
-assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue&lt;&gt;("key", 42L)));
-        </pre>
+            <pre class="line-numbers"><code class="language-text">TestOutputTopic&lt;String, Long&gt; outputTopic = testDriver.createOutputTopic("output-topic", stringSerde.deserializer(), longSerde.deserializer());
+assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue&lt;&gt;("key", 42L)));</code></pre>
             <p>
                 <code>TopologyTestDriver</code> supports punctuations, too.
                 Event-time punctuations are triggered automatically based on the processed records' timestamps.
                 Wall-clock-time punctuations can also be triggered by advancing the test driver's wall-clock-time (the
                 driver mocks wall-clock-time internally to give users control over it).
             </p>
-            <pre>
-testDriver.advanceWallClockTime(Duration.ofSeconds(20));
-        </pre>
+            <pre class="line-numbers"><code class="language-text">testDriver.advanceWallClockTime(Duration.ofSeconds(20));</code></pre>
             <p>
                 Additionally, you can access state stores via the test driver before or after a test.
                 Accessing stores before a test is useful to pre-populate a store with some initial values.
                After the data has been processed, expected updates to the store can be verified.
             </p>
-            <pre>
-KeyValueStore store = testDriver.getKeyValueStore("store-name");
-        </pre>
+            <pre class="line-numbers"><code class="language-text">KeyValueStore store = testDriver.getKeyValueStore("store-name");</code></pre>
             <p>
                Note that you should always close the test driver at the end to make sure all resources are released
                properly.
             </p>
-            <pre>
-testDriver.close();
-        </pre>
+            <pre class="line-numbers"><code class="language-text">testDriver.close();</code></pre>
 
             <h3>Example</h3>
             <p>
@@ -142,8 +128,7 @@ testDriver.close();
                While processing, no output is generated; only the store is updated.
                 Output is only sent downstream based on event-time and wall-clock punctuations.
             </p>
-            <pre>
-private TopologyTestDriver testDriver;
+            <pre class="line-numbers"><code class="language-text">private TopologyTestDriver testDriver;
 private TestInputTopic&lt;String, Long&gt; inputTopic;
 private TestOutputTopic&lt;String, Long&gt; outputTopic;
 private KeyValueStore&lt;String, Long&gt; store;
@@ -277,8 +262,7 @@ public class CustomMaxAggregator implements Processor&lt;String, Long&gt; {
 
     @Override
     public void close() {}
-}
-        </pre>
+}</code></pre>
         </div>
         <div class="section" id="unit-testing-processors">
             <h2>
@@ -296,28 +280,23 @@ public class CustomMaxAggregator implements Processor&lt;String, Long&gt; {
             <b>Construction</b>
             <p>
                 To begin with, instantiate your processor and initialize it with the mock context:
-            <pre>
-final Processor processorUnderTest = ...;
+            <pre class="line-numbers"><code class="language-text">final Processor processorUnderTest = ...;
 final MockProcessorContext context = new MockProcessorContext();
-processorUnderTest.init(context);
-                </pre>
+processorUnderTest.init(context);</code></pre>
             If you need to pass configuration to your processor or set the default serdes, you can create the mock with
             config:
-            <pre>
-final Properties props = new Properties();
+            <pre class="line-numbers"><code class="language-text">final Properties props = new Properties();
 props.put(StreamsConfig.APPLICATION_ID_CONFIG, "unit-test");
 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
 props.put("some.other.config", "some config value");
-final MockProcessorContext context = new MockProcessorContext(props);
-                </pre>
+final MockProcessorContext context = new MockProcessorContext(props);</code></pre>
             </p>
             <b>Captured data</b>
             <p>
                 The mock will capture any values that your processor forwards. You can make assertions on them:
-            <pre>
-processorUnderTest.process("key", "value");
+            <pre class="line-numbers"><code class="language-text">processorUnderTest.process("key", "value");
 
 final Iterator&lt;CapturedForward&gt; forwarded = context.forwarded().iterator();
 assertEquals(forwarded.next().keyValue(), new KeyValue&lt;&gt;(..., ...));
@@ -326,34 +305,27 @@ assertFalse(forwarded.hasNext());
 // you can reset forwards to clear the captured data. This may be helpful in constructing longer scenarios.
 context.resetForwards();
 
-assertEquals(context.forwarded().size(), 0);
-            </pre>
+assertEquals(context.forwarded().size(), 0);</code></pre>
             If your processor forwards to specific child processors, you can query the context for captured data by
             child name:
-            <pre>
-final List&lt;CapturedForward&gt; captures = context.forwarded("childProcessorName");
-            </pre>
+            <pre class="line-numbers"><code class="language-text">final List&lt;CapturedForward&gt; captures = context.forwarded("childProcessorName");</code></pre>
             The mock also captures whether your processor has called <code>commit()</code> on the context:
-            <pre>
-assertTrue(context.committed());
+            <pre class="line-numbers"><code class="language-text">assertTrue(context.committed());
 
 // commit captures can also be reset.
 context.resetCommit();
 
-assertFalse(context.committed());
-            </pre>
+assertFalse(context.committed());</code></pre>
             </p>
             <b>Setting record metadata</b>
             <p>
                 In case your processor logic depends on the record metadata (topic, partition, offset, or timestamp),
                 you can set them on the context, either all together or individually:
-            <pre>
-context.setRecordMetadata("topicName", /*partition*/ 0, /*offset*/ 0L, /*timestamp*/ 0L);
+            <pre class="line-numbers"><code class="language-text">context.setRecordMetadata("topicName", /*partition*/ 0, /*offset*/ 0L, /*timestamp*/ 0L);
 context.setTopic("topicName");
 context.setPartition(0);
 context.setOffset(0L);
-context.setTimestamp(0L);
-                </pre>
+context.setTimestamp(0L);</code></pre>
            Once these are set, the context will continue returning the same values until you set new ones.
             </p>
             <b>State stores</b>
@@ -362,8 +334,7 @@ context.setTimestamp(0L);
                 You're encouraged to use a simple in-memory store of the appropriate type (KeyValue, Windowed, or
                 Session), since the mock context does <i>not</i> manage changelogs, state directories, etc.
             </p>
-            <pre>
-final KeyValueStore&lt;String, Integer&gt; store =
+            <pre class="line-numbers"><code class="language-text">final KeyValueStore&lt;String, Integer&gt; store =
     Stores.keyValueStoreBuilder(
             Stores.inMemoryKeyValueStore("myStore"),
             Serdes.String(),
@@ -372,21 +343,18 @@ final KeyValueStore&lt;String, Integer&gt; store =
         .withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
         .build();
 store.init(context, store);
-context.register(store, /*deprecated parameter*/ false, /*parameter unused in mock*/ null);
-            </pre>
+context.register(store, /*deprecated parameter*/ false, /*parameter unused in mock*/ null);</code></pre>
             <b>Verifying punctuators</b>
             <p>
                 Processors can schedule punctuators to handle periodic tasks.
                 The mock context does <i>not</i> automatically execute punctuators, but it does capture them to
                 allow you to unit test them as well:
-            <pre>
-final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0);
+            <pre class="line-numbers"><code class="language-text">final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0);
 final long interval = capturedPunctuator.getIntervalMs();
 final PunctuationType type = capturedPunctuator.getType();
 final boolean cancelled = capturedPunctuator.cancelled();
 final Punctuator punctuator = capturedPunctuator.getPunctuator();
-punctuator.punctuate(/*timestamp*/ 0L);
-                </pre>
+punctuator.punctuate(/*timestamp*/ 0L);</code></pre>
             If you need to write tests involving automatic firing of scheduled punctuators, we recommend creating a
             simple topology with your processor and using the <a href="testing.html#testing-topologytestdriver"><code>TopologyTestDriver</code></a>.
             </p>
@@ -400,10 +368,10 @@ punctuator.punctuate(/*timestamp*/ 0L);
 
 <!--#include virtual="../../../includes/_header.htm" -->
 <!--#include virtual="../../../includes/_top.htm" -->
-<div class="content documentation documentation--current">
+<div class="content documentation ">
     <!--#include virtual="../../../includes/_nav.htm" -->
     <div class="right">
-        <!--#include virtual="../../../includes/_docs_banner.htm" -->
+        <!--//#include virtual="../../../includes/_docs_banner.htm" -->
         <ul class="breadcrumbs">
             <li><a href="/documentation">Documentation</a></li>
             <li><a href="/documentation/streams">Kafka Streams</a></li>
diff --git a/docs/streams/developer-guide/write-streams.html b/docs/streams/developer-guide/write-streams.html
index ed74d72..720b0c3 100644
--- a/docs/streams/developer-guide/write-streams.html
+++ b/docs/streams/developer-guide/write-streams.html
@@ -90,8 +90,7 @@
               <p class="last">See the section <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data Types and Serialization</span></a> for more information about Serializers/Deserializers.</p>
           </div>
           <p>Example <code class="docutils literal"><span class="pre">pom.xml</span></code> snippet when using Maven:</p>
-          <pre class="brush: xml;">
-<dependency>
+          <pre class="line-numbers"><code class="language-xml"><dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka-streams</artifactId>
     <version>{{fullDotVersion}}</version>
@@ -106,8 +105,7 @@
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka-streams-scala_{{scalaVersion}}</artifactId>
     <version>{{fullDotVersion}}</version>
-</dependency>
-          </pre>
+</dependency></code></pre>
       </div>
     <div class="section" id="using-kafka-streams-within-your-application-code">
       <h2>Using Kafka Streams within your application code<a class="headerlink" href="#using-kafka-streams-within-your-application-code" title="Permalink to this headline"></a></h2>
@@ -143,21 +141,19 @@
 <span class="c1">// and so on.</span>
 <span class="n">Properties</span> <span class="n">props</span> <span class="o">=</span> <span class="o">...;</span>
 
-<span class="n">KafkaStreams</span> <span class="n">streams</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KafkaStreams</span><span class="o">(</span><span class="n">topology</span><span class="o">,</span> <span class="n">props</span><span class="o">);</span>
-</pre></div>
+<span class="n">KafkaStreams</span> <span class="n">streams</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KafkaStreams</span><span class="o">(</span><span class="n">topology</span><span class="o">,</span> <span class="n">props</span><span class="o">);</span></code></pre></div>
       </div>
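+      <p>As a minimal sketch, only two settings are strictly required in the <code class="docutils literal"><span class="pre">Properties</span></code> instance
+        (the values shown are illustrative):</p>
+      <pre class="line-numbers"><code class="language-java">Properties props = new Properties();
+// Unique identifier of the application; also used as the default consumer group id.
+props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
+// Kafka cluster to connect to.
+props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
+
+KafkaStreams streams = new KafkaStreams(topology, props);</code></pre>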
       <p>At this point, internal structures are initialized, but the processing is not started yet.
         You have to explicitly start the Kafka Streams thread by calling the <code class="docutils literal"><span class="pre">KafkaStreams#start()</span></code> method:</p>
       <div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Start the Kafka Streams threads</span>
-<span class="n">streams</span><span class="o">.</span><span class="na">start</span><span class="o">();</span>
-</pre></div>
+<span class="n">streams</span><span class="o">.</span><span class="na">start</span><span class="o">();</span></code></pre></div>
       </div>
       <p>If there are other instances of this stream processing application running elsewhere (e.g., on another machine), Kafka
         Streams transparently re-assigns tasks from the existing instances to the new instance that you just started.
         For more information, see <a class="reference internal" href="../architecture.html#streams_architecture_tasks"><span class="std std-ref">Stream Partitions and Tasks</span></a> and <a class="reference internal" href="../architecture.html#streams_architecture_threads"><span class="std std-ref">Threading Model</span></a>.</p>
      <p>To catch any unexpected exceptions, you can set a <code class="docutils literal"><span class="pre">java.lang.Thread.UncaughtExceptionHandler</span></code> before you start the
         application.  This handler is called whenever a stream thread is terminated by an unexpected exception:</p>
-      <div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Java 8+, using lambda expressions</span>
+      <div class="highlight-java"><div class="highlight"><pre><code><span></span><span class="c1">// Java 8+, using lambda expressions</span>
 <span class="n">streams</span><span class="o">.</span><span class="na">setUncaughtExceptionHandler</span><span class="o">((</span><span class="n">Thread</span> <span class="n">thread</span><span class="o">,</span> <span class="n">Throwable</span> <span class="n">throwable</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">{</span>
   <span class="c1">// here you should examine the throwable/exception and perform an appropriate action!</span>
 <span class="o">});</span>
@@ -168,13 +164,11 @@
   <span class="kd">public</span> <span class="kt">void</span> <span class="nf">uncaughtException</span><span class="o">(</span><span class="n">Thread</span> <span class="n">thread</span><span class="o">,</span> <span class="n">Throwable</span> <span class="n">throwable</span><span class="o">)</span> <span class="o">{</span>
     <span class="c1">// here you should examine the throwable/exception and perform an appropriate action!</span>
   <span class="o">}</span>
-<span class="o">});</span>
-</pre></div>
+<span class="o">});</span></code></pre></div>
       </div>
       <p>To stop the application instance, call the <code class="docutils literal"><span class="pre">KafkaStreams#close()</span></code> method:</p>
       <div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Stop the Kafka Streams threads</span>
-<span class="n">streams</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
-</pre></div>
+<span class="n">streams</span><span class="o">.</span><span class="na">close</span><span class="o">();</span></code></pre></div>
       </div>
      <p>To allow your application to shut down gracefully in response to SIGTERM, it is recommended that you add a shutdown hook
         and call <code class="docutils literal"><span class="pre">KafkaStreams#close</span></code>.</p>
@@ -183,8 +177,7 @@
           <blockquote>
             <div><div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Add shutdown hook to stop the Kafka Streams threads.</span>
 <span class="c1">// You can optionally provide a timeout to `close`.</span>
-<span class="n">Runtime</span><span class="o">.</span><span class="na">getRuntime</span><span class="o">().</span><span class="na">addShutdownHook</span><span class="o">(</span><span class="k">new</span> <span class="n">Thread</span><span class="o">(</span><span class="n">streams</span><span class="o">::</span><span class="n">close</span><span class="o">));</span>
-</pre></div>
+<span class="n">Runtime</span><span class="o">.</span><span class="na">getRuntime</span><span class="o">().</span><span class="na">addShutdownHook</span><span class="o">(</span><span class="k">new</span> <span class="n">Thread</span><span class="o">(</span><span class="n">streams</span><span class="o">::</span><span class="n">close</span><span class="o">));</span></code></pre></div>
             </div>
             </div></blockquote>
         </li>
@@ -197,8 +190,7 @@
   <span class="kd">public</span> <span class="kt">void</span> <span class="nf">run</span><span class="o">()</span> <span class="o">{</span>
       <span class="n">streams</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
   <span class="o">}</span>
-<span class="o">}));</span>
-</pre></div>
+<span class="o">}));</span></code></pre></div>
             </div>
             </div></blockquote>
         </li>
@@ -224,10 +216,10 @@
 
 <!--#include virtual="../../../includes/_header.htm" -->
 <!--#include virtual="../../../includes/_top.htm" -->
-<div class="content documentation documentation--current">
+<div class="content documentation ">
   <!--#include virtual="../../../includes/_nav.htm" -->
   <div class="right">
-    <!--#include virtual="../../../includes/_docs_banner.htm" -->
+    <!--//#include virtual="../../../includes/_docs_banner.htm" -->
     <ul class="breadcrumbs">
       <li><a href="/documentation">Documentation</a></li>
       <li><a href="/documentation/streams">Kafka Streams</a></li>
diff --git a/docs/streams/index.html b/docs/streams/index.html
index 6872297..3d84bbf 100644
--- a/docs/streams/index.html
+++ b/docs/streams/index.html
@@ -48,16 +48,16 @@
                 <h3>TOUR OF THE STREAMS API</h3>
                 <div class="video__list">
                    <p class="video__item video_list_1 active" onclick="$('.video__item').removeClass('active'); $(this).addClass('active');$('.yt_series').hide();$('.video_1').show();">
-                       <span class="number">1</span><span class="video__text">Intro to Streams</span>
+                       <span class="video-number">1</span><span class="video__text">Intro to Streams</span>
                    </p>
                    <p class="video__item video_list_2" onclick="$('.video__item').removeClass('active'); $(this).addClass('active');$('.yt_series').hide();$('.video_2').show();">
-                       <span class="number">2</span><span class="video__text">Creating a Streams Application</span>
+                       <span class="video-number">2</span><span class="video__text">Creating a Streams Application</span>
                    </p>
                    <p class="video__item video_list_3" onclick="$('.video__item').removeClass('active'); $(this).addClass('active');$('.yt_series').hide();$('.video_3').show();">
-                       <span class="number">3</span><span class="video__text">Transforming Data Pt. 1</span>
+                       <span class="video-number">3</span><span class="video__text">Transforming Data Pt. 1</span>
                    </p>
                    <p class="video__item video_list_4" onclick="$('.video__item').removeClass('active'); $(this).addClass('active');$('.yt_series').hide();$('.video_4').show();">
-                      <span class="number">4</span><span class="video__text">Transforming Data Pt. 11</span>
+                      <span class="video-number">4</span><span class="video__text">Transforming Data Pt. 2</span>
                    </p>
                 </div>
             </div>
@@ -103,7 +103,7 @@
                <p class="grid__item__customer__description extra__space">As the leading online fashion retailer in Europe, Zalando uses Kafka as an ESB (Enterprise Service Bus), which helps us in transitioning from a monolithic to a micro services architecture. Using Kafka for processing
                  <a href="https://www.confluent.io/blog/ranking-websites-real-time-apache-kafkas-streams-api/" target='blank'> event streams</a> enables our technical team to do near-real time business intelligence.
                </p>
-           </div>
+            </div>
            </div>
            <div class="customer__grid">
              <div class="customer__item  streams_logo_grid streams__line__grid">
@@ -154,8 +154,7 @@
            </div>
 
            <div class="code-example__snippet b-java-8 selected">
-               <pre class="brush: java;">
-                   import org.apache.kafka.common.serialization.Serdes;
+               <pre class="line-numbers"><code class="language-java">                   import org.apache.kafka.common.serialization.Serdes;
                    import org.apache.kafka.common.utils.Bytes;
                    import org.apache.kafka.streams.KafkaStreams;
                    import org.apache.kafka.streams.StreamsBuilder;
@@ -190,13 +189,11 @@
                            streams.start();
                        }
 
-                   }
-               </pre>
+                   }</code></pre>
            </div>
 
            <div class="code-example__snippet b-java-7">
-               <pre class="brush: java;">
-                   import org.apache.kafka.common.serialization.Serdes;
+               <pre class="line-numbers"><code class="language-java">                   import org.apache.kafka.common.serialization.Serdes;
                    import org.apache.kafka.common.utils.Bytes;
                    import org.apache.kafka.streams.KafkaStreams;
                    import org.apache.kafka.streams.StreamsBuilder;
@@ -245,13 +242,11 @@
                            streams.start();
                        }
 
-                   }
-               </pre>
+                   }</code></pre>
            </div>
 
            <div class="code-example__snippet b-scala">
-               <pre class="brush: scala;">
-import java.util.Properties
+               <pre class="line-numbers"><code class="language-scala">import java.util.Properties
 import java.util.concurrent.TimeUnit
 
 import org.apache.kafka.streams.kstream.Materialized
@@ -284,8 +279,7 @@ object WordCountApplication extends App {
   sys.ShutdownHookThread {
      streams.close(10, TimeUnit.SECONDS)
   }
-}
-               </pre>
+}</code></pre>
            </div>
        </div>
 
@@ -297,10 +291,10 @@ object WordCountApplication extends App {
 </script>
 <!--#include virtual="../../includes/_header.htm" -->
 <!--#include virtual="../../includes/_top.htm" -->
-<div class="content documentation documentation--current">
+<div class="content documentation">
   <!--#include virtual="../../includes/_nav.htm" -->
   <div class="right">
-    <!--#include virtual="../../includes/_docs_banner.htm" -->
+    <!--//#include virtual="../../includes/_docs_banner.htm" -->
     <ul class="breadcrumbs">
       <li><a href="/documentation">Documentation</a>
       <li><a href="/documentation/streams">Kafka Streams</a></li>
diff --git a/docs/streams/quickstart.html b/docs/streams/quickstart.html
index a6781f6..2cc48ef 100644
--- a/docs/streams/quickstart.html
+++ b/docs/streams/quickstart.html
@@ -48,8 +48,7 @@
 This quickstart example will demonstrate how to run a streaming application coded in this library. Here is the gist
 of the <code><a href="https://github.com/apache/kafka/blob/{{dotVersion}}/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java">WordCountDemo</a></code> example code (converted to use Java 8 lambda expressions for easy reading).
 </p>
-<pre class="brush: java;">
-// Serializers/deserializers (serde) for String and Long types
+<pre class="line-numbers"><code class="language-java">// Serializers/deserializers (serde) for String and Long types
 final Serde&lt;String&gt; stringSerde = Serdes.String();
 final Serde&lt;Long&gt; longSerde = Serdes.Long();
 
@@ -72,8 +71,7 @@ KTable&lt;String, Long&gt; wordCounts = textLines
     .count();
 
 // Store the running counts as a changelog stream to the output topic.
-wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
-</pre>
+wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));</code></pre>
 
 <p>
 It implements the WordCount
@@ -94,10 +92,8 @@ because it cannot know when it has processed "all" the input data.
 <a href="https://www.apache.org/dyn/closer.cgi?path=/kafka/{{fullDotVersion}}/kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz" title="Kafka downloads">Download</a> the {{fullDotVersion}} release and un-tar it.
 Note that there are multiple downloadable Scala versions and we choose to use the recommended version ({{scalaVersion}}) here:
 
-<pre class="brush: bash;">
-&gt; tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
-&gt; cd kafka_{{scalaVersion}}-{{fullDotVersion}}
-</pre>
+<pre class="line-numbers"><code class="language-bash">&gt; tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
+&gt; cd kafka_{{scalaVersion}}-{{fullDotVersion}}</code></pre>
 
 <h4><a id="quickstart_streams_startserver" href="#quickstart_streams_startserver">Step 2: Start the Kafka server</a></h4>
 
@@ -105,79 +101,63 @@ Note that there are multiple downloadable Scala versions and we choose to use th
 Kafka uses <a href="https://zookeeper.apache.org/">ZooKeeper</a> so you need to first start a ZooKeeper server if you don't already have one. You can use the convenience script packaged with Kafka to get a quick-and-dirty single-node ZooKeeper instance.
 </p>
 
-<pre class="brush: bash;">
-&gt; bin/zookeeper-server-start.sh config/zookeeper.properties
+<pre class="line-numbers"><code class="language-bash">&gt; bin/zookeeper-server-start.sh config/zookeeper.properties
 [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
-...
-</pre>
+...</code></pre>
 
 <p>Now start the Kafka server:</p>
-<pre class="brush: bash;">
-&gt; bin/kafka-server-start.sh config/server.properties
+<pre class="line-numbers"><code class="language-bash">&gt; bin/kafka-server-start.sh config/server.properties
 [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
 [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
-...
-</pre>
+...</code></pre>
 
 
 <h4><a id="quickstart_streams_prepare" href="#quickstart_streams_prepare">Step 3: Prepare input topic and start Kafka producer</a></h4>
 
 <!--
 
-<pre class="brush: bash;">
-&gt; echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
-</pre>
+<pre class="line-numbers"><code class="language-bash">&gt; echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt</code></pre>
 Or on Windows:
-<pre class="brush: bash;">
-&gt; echo all streams lead to kafka> file-input.txt
+<pre class="line-numbers"><code class="language-bash">&gt; echo all streams lead to kafka> file-input.txt
 &gt; echo hello kafka streams>> file-input.txt
-&gt; echo|set /p=join kafka summit>> file-input.txt
-</pre>
+&gt; echo|set /p=join kafka summit>> file-input.txt</code></pre>
 
 -->
 
 Next, we create the input topic named <b>streams-plaintext-input</b> and the output topic named <b>streams-wordcount-output</b>:
 
-<pre class="brush: bash;">
-&gt; bin/kafka-topics.sh --create \
+<pre class="line-numbers"><code class="language-bash">&gt; bin/kafka-topics.sh --create \
     --bootstrap-server localhost:9092 \
     --replication-factor 1 \
     --partitions 1 \
     --topic streams-plaintext-input
-Created topic "streams-plaintext-input".
-</pre>
+Created topic "streams-plaintext-input".</code></pre>
 
 Note: we create the output topic with compaction enabled because the output stream is a changelog stream
 (cf. <a href="#anchor-changelog-output">explanation of application output</a> below).
 
-<pre class="brush: bash;">
-&gt; bin/kafka-topics.sh --create \
+<pre class="line-numbers"><code class="language-bash">&gt; bin/kafka-topics.sh --create \
     --bootstrap-server localhost:9092 \
     --replication-factor 1 \
     --partitions 1 \
     --topic streams-wordcount-output \
     --config cleanup.policy=compact
-Created topic "streams-wordcount-output".
-</pre>
+Created topic "streams-wordcount-output".</code></pre>
 
 The created topic can be described with the same <b>kafka-topics</b> tool:
 
-<pre class="brush: bash;">
-&gt; bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
+<pre class="line-numbers"><code class="language-bash">&gt; bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
 
 Topic:streams-wordcount-output	PartitionCount:1	ReplicationFactor:1	Configs:cleanup.policy=compact,segment.bytes=1073741824
 	Topic: streams-wordcount-output	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
 Topic:streams-plaintext-input	PartitionCount:1	ReplicationFactor:1	Configs:segment.bytes=1073741824
-	Topic: streams-plaintext-input	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
-</pre>
+	Topic: streams-plaintext-input	Partition: 0	Leader: 0	Replicas: 0	Isr: 0</code></pre>
 
 <h4><a id="quickstart_streams_start" href="#quickstart_streams_start">Step 4: Start the Wordcount Application</a></h4>
 
 The following command starts the WordCount demo application:
 
-<pre class="brush: bash;">
-&gt; bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
-</pre>
+<pre class="line-numbers"><code class="language-bash">&gt; bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo</code></pre>
 
 <p>
 The demo application will read from the input topic <b>streams-plaintext-input</b>, perform the computations of the WordCount algorithm on each of the read messages,
@@ -187,22 +167,18 @@ Hence there won't be any STDOUT output except log entries as the results are wri
 
 Now we can start the console producer in a separate terminal to write some input data to this topic:
 
-<pre class="brush: bash;">
-&gt; bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
-</pre>
+<pre class="line-numbers"><code class="language-bash">&gt; bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input</code></pre>
 
 and inspect the output of the WordCount demo application by reading from its output topic with the console consumer in a separate terminal:
 
-<pre class="brush: bash;">
-&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
+<pre class="line-numbers"><code class="language-bash">&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
     --topic streams-wordcount-output \
     --from-beginning \
     --formatter kafka.tools.DefaultMessageFormatter \
     --property print.key=true \
     --property print.value=true \
     --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
-    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
-</pre>
+    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer</code></pre>
 
 
 <h4><a id="quickstart_streams_process" href="#quickstart_streams_process">Step 5: Process some data</a></h4>
@@ -211,17 +187,14 @@ Now let's write some message with the console producer into the input topic <b>s
 This will send a new message to the input topic, where the message key is null and the message value is the string-encoded text line that you just entered
 (in practice, input data for applications will typically be streaming continuously into Kafka, rather than being manually entered as we do in this quickstart):
 
-<pre class="brush: bash;">
-&gt; bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
-all streams lead to kafka
-</pre>
+<pre class="line-numbers"><code class="language-bash">&gt; bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
+all streams lead to kafka</code></pre>
 
 <p>
 This message will be processed by the WordCount application and the following output data will be written to the <b>streams-wordcount-output</b> topic and printed by the console consumer:
 </p>
 
-<pre class="brush: bash;">
-&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
+<pre class="line-numbers"><code class="language-bash">&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
     --topic streams-wordcount-output \
     --from-beginning \
     --formatter kafka.tools.DefaultMessageFormatter \
@@ -234,8 +207,7 @@ all	    1
 streams	1
 lead	1
 to	    1
-kafka	1
-</pre>
+kafka	1</code></pre>
 
 <p>
 Here, the first column is the Kafka message key in <code>java.lang.String</code> format and represents a word that is being counted, and the second column is the message value in <code>java.lang.Long</code> format, representing the word's latest count.
@@ -245,16 +217,13 @@ Now let's continue writing one more message with the console producer into the i
 Enter the text line "hello kafka streams" and hit &lt;RETURN&gt;.
 Your terminal should look as follows:
 
-<pre class="brush: bash;">
-&gt; bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
+<pre class="line-numbers"><code class="language-bash">&gt; bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
 all streams lead to kafka
-hello kafka streams
-</pre>
+hello kafka streams</code></pre>
 
 In your other terminal in which the console consumer is running, you will observe that the WordCount application wrote new output data:
 
-<pre class="brush: bash;">
-&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
+<pre class="line-numbers"><code class="language-bash">&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
     --topic streams-wordcount-output \
     --from-beginning \
     --formatter kafka.tools.DefaultMessageFormatter \
@@ -270,26 +239,22 @@ to	    1
 kafka	1
 hello	1
 kafka	2
-streams	2
-</pre>
+streams	2</code></pre>
 
 Here the last printed lines <b>kafka 2</b> and <b>streams 2</b> indicate updates to the keys <b>kafka</b> and <b>streams</b> whose counts have been incremented from <b>1</b> to <b>2</b>.
 Whenever you write further input messages to the input topic, you will observe new messages being added to the <b>streams-wordcount-output</b> topic,
 representing the most recent word counts as computed by the WordCount application.
 Let's enter one final input text line "join kafka summit" and hit &lt;RETURN&gt; in the console producer to the input topic <b>streams-plaintext-input</b> before we wrap up this quickstart:
 
-<pre class="brush: bash;">
-&gt; bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
+<pre class="line-numbers"><code class="language-bash">&gt; bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
 all streams lead to kafka
 hello kafka streams
-join kafka summit
-</pre>
+join kafka summit</code></pre>
 
 <a name="anchor-changelog-output"></a>
 The <b>streams-wordcount-output</b> topic will subsequently show the corresponding updated word counts (see last three lines):
 
-<pre class="brush: bash;">
-&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
+<pre class="line-numbers"><code class="language-bash">&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
     --topic streams-wordcount-output \
     --from-beginning \
     --formatter kafka.tools.DefaultMessageFormatter \
@@ -308,8 +273,7 @@ kafka	2
 streams	2
 join	1
 kafka	3
-summit	1
-</pre>
+summit	1</code></pre>
 
 As one can see, the output of the WordCount application is actually a continuous stream of updates, where each output record (i.e. each line in the original output above) is
 an updated count of a single word, i.e. a record key such as "kafka". For multiple records with the same key, each later record is an update of the previous one.
@@ -352,10 +316,10 @@ Looking beyond the scope of this concrete example, what Kafka Streams is doing h
 
 <!--#include virtual="../../includes/_header.htm" -->
 <!--#include virtual="../../includes/_top.htm" -->
-<div class="content documentation documentation--current">
+<div class="content documentation">
     <!--#include virtual="../../includes/_nav.htm" -->
     <div class="right">
-        <!--#include virtual="../../includes/_docs_banner.htm" -->
+        <!--//#include virtual="../../includes/_docs_banner.htm" -->
         <ul class="breadcrumbs">
             <li><a href="/documentation">Documentation</a></li>
             <li><a href="/documentation/streams">Kafka Streams</a></li>
diff --git a/docs/streams/tutorial.html b/docs/streams/tutorial.html
index eeb90d6..71915cf 100644
--- a/docs/streams/tutorial.html
+++ b/docs/streams/tutorial.html
@@ -36,30 +36,27 @@
         It is highly recommended that you read the <a href="/{{version}}/documentation/streams/quickstart">quickstart</a> first to learn how to run a Streams application written in Kafka Streams, if you have not done so already.
     </p>
 
-    <h4><a id="tutorial_maven_setup" href="#tutorial_maven_setup">Setting up a Maven Project</a></h4>
+    <h4 class="anchor-heading"><a id="tutorial_maven_setup" class="anchor-link"></a><a href="#tutorial_maven_setup">Setting up a Maven Project</a></h4>
 
     <p>
         We are going to use a Kafka Streams Maven Archetype for creating a Streams project structure with the following command:
     </p>
 
-    <pre class="brush: bash;">
-        mvn archetype:generate \
+    <pre class="line-numbers"><code class="language-bash">        mvn archetype:generate \
             -DarchetypeGroupId=org.apache.kafka \
             -DarchetypeArtifactId=streams-quickstart-java \
             -DarchetypeVersion={{fullDotVersion}} \
             -DgroupId=streams.examples \
             -DartifactId=streams.examples \
             -Dversion=0.1 \
-            -Dpackage=myapps
-    </pre>
+            -Dpackage=myapps</code></pre>
 
     <p>
         You can use different values for the <code>groupId</code>, <code>artifactId</code> and <code>package</code> parameters if you like.
         Assuming the above parameter values are used, this command will create a project structure that looks like this:
     </p>
 
-    <pre class="brush: bash;">
-        &gt; tree streams.examples
+    <pre class="line-numbers"><code class="language-bash">        &gt; tree streams.examples
         streams-quickstart
         |-- pom.xml
         |-- src
@@ -70,8 +67,7 @@
                 |       |-- Pipe.java
                 |       |-- WordCount.java
                 |-- resources
-                    |-- log4j.properties
-    </pre>
+                    |-- log4j.properties</code></pre>
 
     <p>
         The <code>pom.xml</code> file included in the project already has the Streams dependency defined.
@@ -83,26 +79,22 @@
         Since we are going to start writing such programs from scratch, we can now delete these examples:
     </p>
 
-    <pre class="brush: bash;">
-        &gt; cd streams-quickstart
-        &gt; rm src/main/java/myapps/*.java
-    </pre>
+    <pre class="line-numbers"><code class="language-bash">        &gt; cd streams-quickstart
+        &gt; rm src/main/java/myapps/*.java</code></pre>
 
     <h4><a id="tutorial_code_pipe" href="#tutorial_code_pipe">Writing a first Streams application: Pipe</a></h4>
 
     It's coding time now! Feel free to open your favorite IDE and import this Maven project, or simply open a text editor and create a Java file under <code>src/main/java/myapps</code>.
     Let's name it <code>Pipe.java</code>:
 
-    <pre class="brush: java;">
-        package myapps;
+    <pre class="line-numbers"><code class="language-java">        package myapps;
 
         public class Pipe {
 
             public static void main(String[] args) throws Exception {
 
             }
-        }
-    </pre>
+        }</code></pre>
 
     <p>
         We are going to fill in the <code>main</code> function to write this pipe program. Note that we will not list the import statements as we go since IDEs can usually add them automatically.
@@ -115,20 +107,16 @@
         and <code>StreamsConfig.APPLICATION_ID_CONFIG</code>, which gives the unique identifier of your Streams application to distinguish it from other applications talking to the same Kafka cluster:
     </p>
 
-    <pre class="brush: java;">
-        Properties props = new Properties();
+    <pre class="line-numbers"><code class="language-java">        Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // assuming that the Kafka broker this application is talking to runs on local machine with port 9092
-    </pre>
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // assuming that the Kafka broker this application is talking to runs on local machine with port 9092</code></pre>
 
     <p>
         In addition, you can customize other configurations in the same map, for example, default serialization and deserialization libraries for the record key-value pairs:
     </p>
 
-    <pre class="brush: java;">
-        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-    </pre>
+    <pre class="line-numbers"><code class="language-java">        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());</code></pre>
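+
+    <p>
+        These defaults apply wherever no explicit serde is given. As a sketch (the <code>Consumed</code> helper is part of the DSL;
+        the topology builder it is used with is introduced just below), a per-source override would look like this:
+    </p>
+    <pre class="line-numbers"><code class="language-java">        // Sketch: override the default serdes for a single source stream
+        KStream&lt;String, String&gt; source =
+            builder.stream("streams-plaintext-input", Consumed.with(Serdes.String(), Serdes.String()));</code></pre>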
 
     <p>
         For a full list of configurations of Kafka Streams please refer to this <a href="/{{version}}/documentation/#streamsconfigs">table</a>.
@@ -140,17 +128,13 @@
         We can use a topology builder to construct such a topology,
     </p>
 
-    <pre class="brush: java;">
-        final StreamsBuilder builder = new StreamsBuilder();
-    </pre>
+    <pre class="line-numbers"><code class="language-java">        final StreamsBuilder builder = new StreamsBuilder();</code></pre>
 
     <p>
         And then create a source stream from a Kafka topic named <code>streams-plaintext-input</code> using this topology builder:
     </p>
 
-    <pre class="brush: java;">
-        KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
-    </pre>
+    <pre class="line-numbers"><code class="language-java">        KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");</code></pre>
 
     <p>
         Now we get a <code>KStream</code> that is continuously generating records from its source Kafka topic <code>streams-plaintext-input</code>.
@@ -158,48 +142,38 @@
         The simplest thing we can do with this stream is to write it into another Kafka topic, say it's named <code>streams-pipe-output</code>:
     </p>
 
-    <pre class="brush: java;">
-        source.to("streams-pipe-output");
-    </pre>
+    <pre class="line-numbers"><code class="language-java">        source.to("streams-pipe-output");</code></pre>
 
     <p>
         Note that we can also concatenate the above two lines into a single line as:
     </p>
 
-    <pre class="brush: java;">
-        builder.stream("streams-plaintext-input").to("streams-pipe-output");
-    </pre>
+    <pre class="line-numbers"><code class="language-java">        builder.stream("streams-plaintext-input").to("streams-pipe-output");</code></pre>
 
     <p>
         We can inspect what kind of <code>topology</code> is created from this builder by doing the following:
     </p>
 
-    <pre class="brush: java;">
-        final Topology topology = builder.build();
-    </pre>
+    <pre class="line-numbers"><code class="language-java">        final Topology topology = builder.build();</code></pre>
 
     <p>
         And print its description to standard output as:
     </p>
 
-    <pre class="brush: java;">
-        System.out.println(topology.describe());
-    </pre>
+    <pre class="line-numbers"><code class="language-java">        System.out.println(topology.describe());</code></pre>
 
     <p>
         If we just stop here, compile and run the program, it will output the following information:
     </p>
 
-    <pre class="brush: bash;">
-        &gt; mvn clean package
+    <pre class="line-numbers"><code class="language-bash">        &gt; mvn clean package
         &gt; mvn exec:java -Dexec.mainClass=myapps.Pipe
         Sub-topologies:
           Sub-topology: 0
             Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-SINK-0000000001
             Sink: KSTREAM-SINK-0000000001(topic: streams-pipe-output) <-- KSTREAM-SOURCE-0000000000
         Global Stores:
-          none
-    </pre>
+          none</code></pre>
 
     <p>
         As shown above, the constructed topology has two processor nodes, a source node <code>KSTREAM-SOURCE-0000000000</code> and a sink node <code>KSTREAM-SINK-0000000001</code>.
@@ -215,9 +189,7 @@
         we can now construct the Streams client with the two components we have just created above: the configuration map specified in a <code>java.util.Properties</code> instance and the <code>Topology</code> object.
     </p>
 
-    <pre class="brush: java;">
-        final KafkaStreams streams = new KafkaStreams(topology, props);
-    </pre>
+    <pre class="line-numbers"><code class="language-java">        final KafkaStreams streams = new KafkaStreams(topology, props);</code></pre>
 
     <p>
         By calling its <code>start()</code> function we can trigger the execution of this client.
@@ -225,8 +197,7 @@
         We can, for example, add a shutdown hook with a countdown latch to capture a user interrupt and close the client upon terminating this program:
     </p>
 
-    <pre class="brush: java;">
-        final CountDownLatch latch = new CountDownLatch(1);
+    <pre class="line-numbers"><code class="language-java">        final CountDownLatch latch = new CountDownLatch(1);
 
         // attach shutdown handler to catch control-c
         Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@@ -243,15 +214,13 @@
         } catch (Throwable e) {
             System.exit(1);
         }
-        System.exit(0);
-    </pre>
+        System.exit(0);</code></pre>
 
     <p>
         The complete code so far looks like this:
     </p>
 
-    <pre class="brush: java;">
-        package myapps;
+    <pre class="line-numbers"><code class="language-java">        package myapps;
 
         import org.apache.kafka.common.serialization.Serdes;
         import org.apache.kafka.streams.KafkaStreams;
@@ -297,8 +266,7 @@
                 }
                 System.exit(0);
             }
-        }
-    </pre>
+        }</code></pre>
 
     <p>
         If you already have the Kafka broker up and running at <code>localhost:9092</code>,
@@ -306,10 +274,8 @@
         you can run this code in your IDE or on the command line, using Maven:
     </p>
 
-    <pre class="brush: brush;">
-        &gt; mvn clean package
-        &gt; mvn exec:java -Dexec.mainClass=myapps.Pipe
-    </pre>
+    <pre class="line-numbers"><code class="language-bash">        &gt; mvn clean package
+        &gt; mvn exec:java -Dexec.mainClass=myapps.Pipe</code></pre>
 
     <p>
         For detailed instructions on how to run a Streams application and observe its computing results,
@@ -325,39 +291,33 @@
         We can create another program by first copying the existing <code>Pipe.java</code> class:
     </p>
 
-    <pre class="brush: brush;">
-        &gt; cp src/main/java/myapps/Pipe.java src/main/java/myapps/LineSplit.java
-    </pre>
+    <pre class="line-numbers"><code class="language-bash">        &gt; cp src/main/java/myapps/Pipe.java src/main/java/myapps/LineSplit.java</code></pre>
 
     <p>
         Then change its class name as well as the application id config to distinguish it from the original program:
     </p>
 
-    <pre class="brush: java;">
-        public class LineSplit {
+    <pre class="line-numbers"><code class="language-java">        public class LineSplit {
 
             public static void main(String[] args) throws Exception {
                 Properties props = new Properties();
                 props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
                 // ...
             }
-        }
-    </pre>
+        }</code></pre>
 
     <p>
         Since each record of the source stream is a <code>String</code>-typed key-value pair,
         let's treat the value string as a text line and split it into words with a <code>flatMapValues</code> operator:
     </p>
 
-    <pre class="brush: java;">
-        KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
+    <pre class="line-numbers"><code class="language-java">        KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
         KStream&lt;String, String&gt; words = source.flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
                     @Override
                     public Iterable&lt;String&gt; apply(String value) {
                         return Arrays.asList(value.split("\\W+"));
                     }
-                });
-    </pre>
+                });</code></pre>
 
     <p>
         The operator will take the <code>source</code> stream as its input, and generate a new stream named <code>words</code>
@@ -367,28 +327,23 @@
         Note that if you are using JDK 8, you can use a lambda expression to simplify the above code:
     </p>
 
-    <pre class="brush: java;">
-        KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
-        KStream&lt;String, String&gt; words = source.flatMapValues(value -> Arrays.asList(value.split("\\W+")));
-    </pre>
+    <pre class="line-numbers"><code class="language-java">        KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
+        KStream&lt;String, String&gt; words = source.flatMapValues(value -> Arrays.asList(value.split("\\W+")));</code></pre>
 
     <p>
         And finally we can write the word stream back into another Kafka topic, say <code>streams-linesplit-output</code>.
         Again, these two steps can be concatenated as follows (assuming a lambda expression is used):
     </p>
 
-    <pre class="brush: java;">
-        KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
+    <pre class="line-numbers"><code class="language-java">        KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
         source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
-              .to("streams-linesplit-output");
-    </pre>
+              .to("streams-linesplit-output");</code></pre>
 
     <p>
         If we now describe this augmented topology as <code>System.out.println(topology.describe())</code>, we will get the following:
     </p>
 
-    <pre class="brush: bash;">
-        &gt; mvn clean package
+    <pre class="line-numbers"><code class="language-bash">        &gt; mvn clean package
         &gt; mvn exec:java -Dexec.mainClass=myapps.LineSplit
         Sub-topologies:
           Sub-topology: 0
@@ -396,8 +351,7 @@
             Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-SINK-0000000002 <-- KSTREAM-SOURCE-0000000000
             Sink: KSTREAM-SINK-0000000002(topic: streams-linesplit-output) <-- KSTREAM-FLATMAPVALUES-0000000001
           Global Stores:
-            none
-    </pre>
+            none</code></pre>
 
     <p>
         As we can see above, a new processor node <code>KSTREAM-FLATMAPVALUES-0000000001</code> is injected into the topology between the original source and sink nodes.
@@ -411,8 +365,7 @@
         The complete code looks like this (assuming a lambda expression is used):
     </p>
 
-    <pre class="brush: java;">
-        package myapps;
+    <pre class="line-numbers"><code class="language-java">        package myapps;
 
         import org.apache.kafka.common.serialization.Serdes;
         import org.apache.kafka.streams.KafkaStreams;
@@ -446,8 +399,7 @@
 
                 // ... same as Pipe.java above
             }
-        }
-    </pre>
+        }</code></pre>
 
     <h4><a id="tutorial_code_wordcount" href="#tutorial_code_wordcount">Writing a third Streams application: Wordcount</a></h4>
 
@@ -456,37 +408,32 @@
         Following similar steps let's create another program based on the <code>LineSplit.java</code> class:
     </p>
 
-    <pre class="brush: java;">
-        public class WordCount {
+    <pre class="line-numbers"><code class="language-java">        public class WordCount {
 
             public static void main(String[] args) throws Exception {
                 Properties props = new Properties();
                 props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
                 // ...
             }
-        }
-    </pre>
+        }</code></pre>
 
     <p>
         In order to count the words, we can first modify the <code>flatMapValues</code> operator to treat all words as lower case:
     </p>
 
-    <pre class="brush: java;">
-        source.flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
+    <pre class="line-numbers"><code class="language-java">        source.flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
                     @Override
                     public Iterable&lt;String&gt; apply(String value) {
                         return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
                     }
-                });
-    </pre>
+                });</code></pre>
 
     <p>
         In order to do the counting aggregation, we have to first specify that we want to key the stream on the value string, i.e. the lower-cased word, with a <code>groupBy</code> operator.
         This operator generates a new grouped stream, which can then be aggregated by a <code>count</code> operator, which generates a running count for each of the grouped keys:
     </p>
 
-    <pre class="brush: java;">
-        KTable&lt;String, Long&gt; counts =
+    <pre class="line-numbers"><code class="language-java">        KTable&lt;String, Long&gt; counts =
         source.flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
                     @Override
                     public Iterable&lt;String&gt; apply(String value) {
@@ -501,8 +448,7 @@
                 })
               // Materialize the result into a KeyValueStore named "counts-store".
               // The Materialized store is always of type &lt;Bytes, byte[]&gt; as this is the format of the inner most store.
-              .count(Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt; as("counts-store"));
-    </pre>
+              .count(Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt; as("counts-store"));</code></pre>
 
     <p>
         Note that the <code>count</code> operator has a <code>Materialized</code> parameter that specifies that the
@@ -517,9 +463,7 @@
         We need to provide overridden serialization methods for <code>Long</code> types, otherwise a runtime exception will be thrown:
     </p>
 
-    <pre class="brush: java;">
-        counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
-    </pre>
+    <pre class="line-numbers"><code class="language-java">        counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));</code></pre>
 
     <p>
         Note that in order to read the changelog stream from topic <code>streams-wordcount-output</code>,
@@ -528,21 +472,18 @@
         Assuming lambda expressions from JDK 8 can be used, the above code can be simplified as:
     </p>
 
-    <pre class="brush: java;">
-        KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
+    <pre class="line-numbers"><code class="language-java">        KStream&lt;String, String&gt; source = builder.stream("streams-plaintext-input");
         source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
               .groupBy((key, value) -> value)
               .count(Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("counts-store"))
               .toStream()
-              .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
-    </pre>
+              .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));</code></pre>
 
     <p>
         If we again describe this augmented topology as <code>System.out.println(topology.describe())</code>, we will get the following:
     </p>
 
-    <pre class="brush: bash;">
-        &gt; mvn clean package
+    <pre class="line-numbers"><code class="language-bash">        &gt; mvn clean package
         &gt; mvn exec:java -Dexec.mainClass=myapps.WordCount
         Sub-topologies:
           Sub-topology: 0
@@ -557,8 +498,7 @@
             Processor: KTABLE-TOSTREAM-0000000007(stores: []) --> KSTREAM-SINK-0000000008 <-- KSTREAM-AGGREGATE-0000000003
             Sink: KSTREAM-SINK-0000000008(topic: streams-wordcount-output) <-- KTABLE-TOSTREAM-0000000007
         Global Stores:
-          none
-    </pre>
+          none</code></pre>
 
     <p>
         As we can see above, the topology now contains two disconnected sub-topologies.
@@ -577,8 +517,7 @@
         The complete code looks like this (assuming a lambda expression is used):
     </p>
 
-    <pre class="brush: java;">
-        package myapps;
+    <pre class="line-numbers"><code class="language-java">        package myapps;
 
         import org.apache.kafka.common.serialization.Serdes;
         import org.apache.kafka.streams.KafkaStreams;
@@ -616,8 +555,7 @@
 
                 // ... same as Pipe.java above
             }
-        }
-    </pre>
+        }</code></pre>
 
     <div class="pagination">
         <a href="/{{version}}/documentation/streams/quickstart" class="pagination__btn pagination__btn__prev">Previous</a>
@@ -629,10 +567,10 @@
 
 <!--#include virtual="../../includes/_header.htm" -->
 <!--#include virtual="../../includes/_top.htm" -->
-<div class="content documentation documentation--current">
+<div class="content documentation">
     <!--#include virtual="../../includes/_nav.htm" -->
     <div class="right">
-        <!--#include virtual="../../includes/_docs_banner.htm" -->
+        <!--//#include virtual="../../includes/_docs_banner.htm" -->
         <ul class="breadcrumbs">
             <li><a href="/documentation">Documentation</a></li>
             <li><a href="/documentation/streams">Kafka Streams</a></li>
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 699a8ab..758ea74 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -126,7 +126,6 @@
         new value <code>"exactly_once_beta"</code>.
         Note that you need brokers with version 2.5 or newer to use this feature.
     </p>
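+    <p>
+        As a minimal sketch, enabling the new guarantee is a one-line config change
+        (constants as defined in <code>StreamsConfig</code>):
+    </p>
+    <pre class="line-numbers"><code class="language-java">Properties props = new Properties();
+// requires brokers on version 2.5 or newer
+props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);</code></pre>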
-
     <p>
         For more highly available stateful applications, we've modified the task assignment algorithm to delay the movement of stateful active tasks to instances
         that aren't yet caught up with that task's state. Instead, to migrate a task from one instance to another (eg when scaling out),
@@ -135,12 +134,10 @@
         tasks to their new owners in the background. Check out <a href="https://cwiki.apache.org/confluence/x/0i4lBg">KIP-441</a>
         for full details, including several new configs for control over this new feature.
     </p>
-
     <p>
         New end-to-end latency metrics have been added. These task-level metrics will be logged at the INFO level and report the min and max end-to-end latency of a record at the beginning/source node(s)
         and end/terminal node(s) of a task. See <a href="https://cwiki.apache.org/confluence/x/gBkRCQ">KIP-613</a> for more information.
     </p>
-
     <p>
         As of 2.6.0 Kafka Streams deprecates <code>KStream.through()</code> in favor of the new <code>KStream.repartition()</code> operator
         (as per <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint">KIP-221</a>).
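+    <p>
+        A minimal sketch of migrating away from <code>through()</code> (the topic name, serdes, and partition count below are illustrative):
+    </p>
+    <pre class="line-numbers"><code class="language-java">KStream&lt;String, String&gt; stream = builder.stream("input-topic"); // hypothetical topic
+// instead of the deprecated stream.through("user-managed-topic"), let Streams manage the repartition topic:
+KStream&lt;String, String&gt; repartitioned = stream.repartition(
+    Repartitioned.with(Serdes.String(), Serdes.String()).withNumberOfPartitions(4));</code></pre>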
@@ -160,7 +157,7 @@
         as per <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-571%3A+Add+option+to+force+remove+members+in+StreamsResetter">KIP-571</a>.
     </p>
 
-    <h3><a id="streams_api_changes_250" href="#streams_api_changes_250">Streams API changes in 2.5.0</a></h3>
+    <h3 class="anchor-heading"><a id="streams_api_changes_250" class="anchor-link"></a><a href="#streams_api_changes_250">Streams API changes in 2.5.0</a></h3>
     <p>
         We add a new <code>cogroup()</code> operator (via <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup">KIP-150</a>)
         that allows aggregating multiple streams in a single operation.
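+    <p>
+        A minimal sketch of cogrouping two grouped streams (the stream names and summing logic are assumed for illustration):
+    </p>
+    <pre class="line-numbers"><code class="language-java">// clicks and views are hypothetical KGroupedStream&lt;String, Long&gt; instances
+KTable&lt;String, Long&gt; combined = clicks
+    .cogroup((key, value, aggregate) -> aggregate + value)
+    .cogroup(views, (key, value, aggregate) -> aggregate + value)
+    .aggregate(() -> 0L);</code></pre>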
@@ -186,7 +183,7 @@
         <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance">KIP-535</a> respectively.
     </p>
 
-    <h3><a id="streams_api_changes_240" href="#streams_api_changes_240">Streams API changes in 2.4.0</a></h3>
+    <h3 class="anchor-heading"><a id="streams_api_changes_240" class="anchor-link"></a><a href="#streams_api_changes_240">Streams API changes in 2.4.0</a></h3>
     <p>     
          As of 2.4.0 Kafka Streams offers a KTable-KTable foreign-key join (as per <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable">KIP-213</a>). 
          This joiner allows for records to be joined between two KTables with different keys. 
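+    <p>
+        A minimal sketch (the <code>Order</code> and <code>Customer</code> types, field names, and topics are hypothetical):
+    </p>
+    <pre class="line-numbers"><code class="language-java">KTable&lt;String, Order&gt; orders = builder.table("orders");           // keyed by orderId
+KTable&lt;String, Customer&gt; customers = builder.table("customers");  // keyed by customerId
+KTable&lt;String, String&gt; enriched = orders.join(
+    customers,
+    order -> order.customerId,                     // foreign-key extractor
+    (order, customer) -> customer.name + " bought " + order.item);</code></pre>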
@@ -277,7 +274,7 @@
         Hence, you will need to reset your application to upgrade it.
     
 
-    <h3><a id="streams_api_changes_230" href="#streams_api_changes_230">Streams API changes in 2.3.0</a></h3>
+    <h3 class="anchor-heading"><a id="streams_api_changes_230" class="anchor-link"></a><a href="#streams_api_changes_230">Streams API changes in 2.3.0</a></h3>
 
     <p>Version 2.3.0 adds the Suppress operator to the <code>kafka-streams-scala</code> KTable API.</p>
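+    <p>
+        As a sketch, the operator mirrors the existing Java DSL (the windowed table below is hypothetical):
+    </p>
+    <pre class="line-numbers"><code class="language-java">// windowedCounts is a hypothetical KTable&lt;Windowed&lt;String&gt;, Long&gt;
+KTable&lt;Windowed&lt;String&gt;, Long&gt; finalCounts = windowedCounts.suppress(
+    Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));</code></pre>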
 
@@ -346,13 +343,13 @@
         For more details please read <a href="https://issues.apache.org/jira/browse/KAFKA-8215">KAFKA-8215</a>.
     </p>
 
-    <h3><a id="streams_notable_changes_221" href="#streams_api_changes_221">Notable changes in Kafka Streams 2.2.1</a></h3>
+    <h3 class="anchor-heading"><a id="streams_notable_changes_221" class="anchor-link"></a><a href="#streams_notable_changes_221">Notable changes in Kafka Streams 2.2.1</a></h3>
     <p>
         As of Kafka Streams 2.2.1, message format 0.11 or higher is required;
         this implies that brokers must be on version 0.11.0 or higher.
     </p>
 
-    <h3><a id="streams_api_changes_220" href="#streams_api_changes_220">Streams API changes in 2.2.0</a></h3>
+    <h3 class="anchor-heading"><a id="streams_api_changes_220" class="anchor-link"></a><a href="#streams_api_changes_220">Streams API changes in 2.2.0</a></h3>
     <p>
         We've simplified the <code>KafkaStreams#state</code> transition diagram for the startup phase a bit in 2.2.0: in older versions the state would transition from <code>CREATED</code> to <code>RUNNING</code>, and then to <code>REBALANCING</code> to get the first
         stream task assignment, and then back to <code>RUNNING</code>; starting in 2.2.0 it transitions from <code>CREATED</code> directly to <code>REBALANCING</code> and then to <code>RUNNING</code>.
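+    <p>
+        A small sketch of observing these transitions via a state listener (the logging is illustrative):
+    </p>
+    <pre class="line-numbers"><code class="language-java">streams.setStateListener((newState, oldState) ->
+    System.out.println("Streams state changed from " + oldState + " to " + newState));</code></pre>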
@@ -369,7 +366,7 @@
         used in a try-with-resource statement. For a full list of public interfaces that get impacted please read <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-376%3A+Implement+AutoClosable+on+appropriate+classes+that+want+to+be+used+in+a+try-with-resource+statement">KIP-376</a>.
     </p>
 
-    <h3><a id="streams_api_changes_210" href="#streams_api_changes_210">Streams API changes in 2.1.0</a></h3>
+    <h3 class="anchor-heading"><a id="streams_api_changes_210" class="anchor-link"></a><a href="#streams_api_changes_210">Streams API changes in 2.1.0</a></h3>
     <p>
         We updated the <code>TopologyDescription</code> API to allow for better runtime checking.
         Users are encouraged to use <code>#topicSet()</code> and <code>#topicPattern()</code> accordingly on <code>TopologyDescription.Source</code> nodes,
@@ -469,7 +466,7 @@
         different stream instances in one application.
     </p>
 
-    <h3><a id="streams_api_changes_200" href="#streams_api_changes_200">Streams API changes in 2.0.0</a></h3>
+    <h3 class="anchor-heading"><a id="streams_api_changes_200" class="anchor-link"></a><a href="#streams_api_changes_200">Streams API changes in 2.0.0</a></h3>
     <p>
         In 2.0.0 we have added a few new APIs on the <code>ReadOnlyWindowStore</code> interface (for details please read <a href="#streams_api_changes_200">Streams API changes</a> below).
         If you have customized window store implementations that extend the <code>ReadOnlyWindowStore</code> interface, you need to make code changes.
@@ -605,7 +602,7 @@
         <li><code>StreamsConfig#ZOOKEEPER_CONNECT_CONFIG</code> is removed as we no longer need the ZooKeeper dependency in Streams (it has been deprecated since 0.10.2.0). </li>
     </ul>
 
-    <h3><a id="streams_api_changes_110" href="#streams_api_changes_110">Streams API changes in 1.1.0</a></h3>
+    <h3 class="anchor-heading"><a id="streams_api_changes_110" class="anchor-link"></a><a href="#streams_api_changes_110">Streams API changes in 1.1.0</a></h3>
     <p>
         We have added support for methods in <code>ReadOnlyWindowStore</code> which allow querying <code>WindowStore</code>s without providing keys.
         Users who have customized window store implementations of the above interface need to update their code to implement the newly added method as well.
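+    <p>
+        A sketch of key-less iteration over such a store (the store name and types are assumed for illustration):
+    </p>
+    <pre class="line-numbers"><code class="language-java">ReadOnlyWindowStore&lt;String, Long&gt; store =
+    streams.store("counts-store", QueryableStoreTypes.windowStore());
+try (KeyValueIterator&lt;Windowed&lt;String&gt;, Long&gt; iterator = store.all()) {
+    while (iterator.hasNext()) {
+        KeyValue&lt;Windowed&lt;String&gt;, Long&gt; entry = iterator.next();
+        // entry.key.key() is the record key, entry.key.window() the time window
+    }
+}</code></pre>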
@@ -663,7 +660,7 @@
         <li> added options to specify input topics offsets to reset according to <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application">KIP-171</a></li>
     </ul>
 
-    <h3><a id="streams_api_changes_100" href="#streams_api_changes_100">Streams API changes in 1.0.0</a></h3>
+    <h3 class="anchor-heading"><a id="streams_api_changes_100" class="anchor-link"></a><a href="#streams_api_changes_100">Streams API changes in 1.0.0</a></h3>
 
     <p>
         With 1.0 a major API refactoring was accomplished and the new API is cleaner and easier to use.
@@ -797,7 +794,7 @@
         If you already use <code>StateStoreSupplier</code> or <code>Materialized</code> to provide configs for changelogs, then they will take precedence over those supplied in the config.
     </p>
 
-    <h3><a id="streams_api_changes_0110" href="#streams_api_changes_0110">Streams API changes in 0.11.0.0</a></h3>
+    <h3 class="anchor-heading"><a id="streams_api_changes_0110" class="anchor-link"></a><a href="#streams_api_changes_0110">Streams API changes in 0.11.0.0</a></h3>
 
     <p> Updates in <code>StreamsConfig</code>: </p>
     <ul>
@@ -866,7 +863,7 @@
     </ul>
     <p> <code>[client.Id]</code> is either set via Streams configuration parameter <code>client.id</code> or defaults to <code>[application.id]-[processId]</code> (<code>[processId]</code> is a random UUID). </p>
 
-    <h3><a id="streams_api_changes_01021" href="#streams_api_changes_01021">Notable changes in 0.10.2.1</a></h3>
+    <h3 class="anchor-heading"><a id="streams_api_changes_01021" class="anchor-link"></a><a href="#streams_api_changes_01021">Notable changes in 0.10.2.1</a></h3>
 
     <p>
         Parameter updates in <code>StreamsConfig</code>:
@@ -875,7 +872,7 @@
         <li> The default config values of embedded producer's <code>retries</code> and consumer's <code>max.poll.interval.ms</code> have been changed to improve the resiliency of a Kafka Streams application </li>
     </ul>
 
-    <h3><a id="streams_api_changes_0102" href="#streams_api_changes_0102">Streams API changes in 0.10.2.0</a></h3>
+    <h3 class="anchor-heading"><a id="streams_api_changes_0102" class="anchor-link"></a><a href="#streams_api_changes_0102">Streams API changes in 0.10.2.0</a></h3>
 
     <p>
         New methods in <code>KafkaStreams</code>:
@@ -946,7 +943,7 @@
 
     <p> Relaxed type constraints of many DSL interfaces, classes, and methods (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-100+-+Relax+Type+constraints+in+Kafka+Streams+API">KIP-100</a>). </p>
 
-    <h3><a id="streams_api_changes_0101" href="#streams_api_changes_0101">Streams API changes in 0.10.1.0</a></h3>
+    <h3 class="anchor-heading"><a id="streams_api_changes_0101" class="anchor-link"></a><a href="#streams_api_changes_0101">Streams API changes in 0.10.1.0</a></h3>
 
     <p> Stream grouping and aggregation split into two methods: </p>
     <ul>
@@ -989,10 +986,10 @@
 
 <!--#include virtual="../../includes/_header.htm" -->
 <!--#include virtual="../../includes/_top.htm" -->
-<div class="content documentation documentation--current">
+<div class="content documentation">
     <!--#include virtual="../../includes/_nav.htm" -->
     <div class="right">
-        <!--#include virtual="../../includes/_docs_banner.htm" -->
+        <!--//#include virtual="../../includes/_docs_banner.htm" -->
         <ul class="breadcrumbs">
             <li><a href="/documentation">Documentation</a></li>
             <li><a href="/documentation/streams">Kafka Streams</a></li>