You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2020/11/03 18:46:20 UTC

[kafka] branch 2.7 updated: KAFKA-10679: Migrate upgrade changes from site to kafka/docs (#9551)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 7f081ba  KAFKA-10679: Migrate upgrade changes from site to kafka/docs (#9551)
7f081ba is described below

commit 7f081ba748091e541e8a3eef3338922ac02d860e
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Tue Nov 3 13:40:44 2020 -0500

    KAFKA-10679: Migrate upgrade changes from site to kafka/docs (#9551)
    
    During the AK website upgrade, changes made to kafka-site weren't migrated back to kafka-docs.
    
    This PR is an initial attempt at porting the changes to kafka/docs, but it does not include the streams changes. Those will come in a separate PR.
    
    For the most part, the bulk of the changes in the PR are cosmetic. Only the introduction.html has substantial changes, but it's a direct port from the live documentation.
    
    For testing:
    
    I reviewed the PR diffs
    Rendered the changes locally
    
    Reviewers: Matthias J. Sax <mj...@apache.org>
---
 docs/api.html                  |  40 ++--
 docs/configuration.html        |  74 +++----
 docs/design.html               |  76 ++++---
 docs/implementation.html       |  98 ++++-----
 docs/introduction.html         | 339 +++++++++++++++--------------
 docs/migration.html            |   4 +-
 docs/ops.html                  | 392 +++++++++++++--------------------
 docs/protocol.html             |  42 ++--
 docs/quickstart-docker.html    | 204 +++++++++++++++++
 docs/quickstart-zookeeper.html | 277 +++++++++++++++++++++++
 docs/quickstart.html           | 300 -------------------------
 docs/security.html             | 483 ++++++++++++++++-------------------------
 docs/uses.html                 |  14 +-
 13 files changed, 1140 insertions(+), 1203 deletions(-)

diff --git a/docs/api.html b/docs/api.html
index b6ab1fa..94d5f3e 100644
--- a/docs/api.html
+++ b/docs/api.html
@@ -26,7 +26,7 @@
 
 	Kafka exposes all its functionality over a language independent protocol which has clients available in many programming languages. However only the Java clients are maintained as part of the main Kafka project, the others are available as independent open source projects. A list of non-Java clients is available <a href="https://cwiki.apache.org/confluence/display/KAFKA/Clients">here</a>.
 
-	<h3><a id="producerapi" href="#producerapi">2.1 Producer API</a></h3>
+	<h3 class="anchor-heading"><a id="producerapi" class="anchor-link"></a><a href="#producerapi">2.1 Producer API</a></h3>
 
 	The Producer API allows applications to send streams of data to topics in the Kafka cluster.
 	<p>
@@ -35,15 +35,13 @@
 	<p>
 	To use the producer, you can use the following maven dependency:
 
-	<pre class="brush: xml;">
-		&lt;dependency&gt;
+	<pre class="line-numbers"><code class="language-xml">		&lt;dependency&gt;
 			&lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
 			&lt;artifactId&gt;kafka-clients&lt;/artifactId&gt;
 			&lt;version&gt;{{fullDotVersion}}&lt;/version&gt;
-		&lt;/dependency&gt;
-	</pre>
+		&lt;/dependency&gt;</code></pre>
 
-	<h3><a id="consumerapi" href="#consumerapi">2.2 Consumer API</a></h3>
+	<h3 class="anchor-heading"><a id="consumerapi" class="anchor-link"></a><a href="#consumerapi">2.2 Consumer API</a></h3>
 
 	The Consumer API allows applications to read streams of data from topics in the Kafka cluster.
 	<p>
@@ -51,15 +49,13 @@
 	<a href="/{{version}}/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html" title="Kafka {{dotVersion}} Javadoc">javadocs</a>.
 	<p>
 	To use the consumer, you can use the following maven dependency:
-	<pre class="brush: xml;">
-		&lt;dependency&gt;
+	<pre class="line-numbers"><code class="language-xml">		&lt;dependency&gt;
 			&lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
 			&lt;artifactId&gt;kafka-clients&lt;/artifactId&gt;
 			&lt;version&gt;{{fullDotVersion}}&lt;/version&gt;
-		&lt;/dependency&gt;
-	</pre>
+		&lt;/dependency&gt;</code></pre>
 
-	<h3><a id="streamsapi" href="#streamsapi">2.3 Streams API</a></h3>
+	<h3 class="anchor-heading"><a id="streamsapi" class="anchor-link"></a><a href="#streamsapi">2.3 Streams API</a></h3>
 
 	The <a href="#streamsapi">Streams</a> API allows transforming streams of data from input topics to output topics.
 	<p>
@@ -70,28 +66,24 @@
 	<p>
 	To use Kafka Streams you can use the following maven dependency:
 
-	<pre class="brush: xml;">
-		&lt;dependency&gt;
+	<pre class="line-numbers"><code class="language-xml">		&lt;dependency&gt;
 			&lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
 			&lt;artifactId&gt;kafka-streams&lt;/artifactId&gt;
 			&lt;version&gt;{{fullDotVersion}}&lt;/version&gt;
-		&lt;/dependency&gt;
-	</pre>
+		&lt;/dependency&gt;</code></pre>
 
 	<p>
 	When using Scala you may optionally include the <code>kafka-streams-scala</code> library.  Additional documentation on using the Kafka Streams DSL for Scala is available <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html#scala-dsl">in the developer guide</a>.
 	<p>
 	To use Kafka Streams DSL for Scala for Scala {{scalaVersion}} you can use the following maven dependency:
 
-	<pre class="brush: xml;">
-		&lt;dependency&gt;
+	<pre class="line-numbers"><code class="language-xml">		&lt;dependency&gt;
 			&lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
 			&lt;artifactId&gt;kafka-streams-scala_{{scalaVersion}}&lt;/artifactId&gt;
 			&lt;version&gt;{{fullDotVersion}}&lt;/version&gt;
-		&lt;/dependency&gt;
-	</pre>
+		&lt;/dependency&gt;</code></pre>
 
-	<h3><a id="connectapi" href="#connectapi">2.4 Connect API</a></h3>
+	<h3 class="anchor-heading"><a id="connectapi" class="anchor-link"></a><a href="#connectapi">2.4 Connect API</a></h3>
 
 	The Connect API allows implementing connectors that continually pull from some source data system into Kafka or push from Kafka into some sink data system.
 	<p>
@@ -100,18 +92,16 @@
 	Those who want to implement custom connectors can see the <a href="/{{version}}/javadoc/index.html?org/apache/kafka/connect" title="Kafka {{dotVersion}} Javadoc">javadoc</a>.
 	<p>
 
-	<h3><a id="adminapi" href="#adminapi">2.5 Admin API</a></h3>
+	<h3 class="anchor-heading"><a id="adminapi" class="anchor-link"></a><a href="#adminapi">2.5 Admin API</a></h3>
 
 	The Admin API supports managing and inspecting topics, brokers, acls, and other Kafka objects.
 	<p>
 	To use the Admin API, add the following Maven dependency:
-	<pre class="brush: xml;">
-		&lt;dependency&gt;
+	<pre class="line-numbers"><code class="language-xml">		&lt;dependency&gt;
 			&lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
 			&lt;artifactId&gt;kafka-clients&lt;/artifactId&gt;
 			&lt;version&gt;{{fullDotVersion}}&lt;/version&gt;
-		&lt;/dependency&gt;
-	</pre>
+		&lt;/dependency&gt;</code></pre>
 	For more information about the Admin APIs, see the <a href="/{{version}}/javadoc/index.html?org/apache/kafka/clients/admin/Admin.html" title="Kafka {{dotVersion}} Javadoc">javadoc</a>.
 	<p>
 
diff --git a/docs/configuration.html b/docs/configuration.html
index 9e913a2..34340a9 100644
--- a/docs/configuration.html
+++ b/docs/configuration.html
@@ -18,7 +18,7 @@
 <script id="configuration-template" type="text/x-handlebars-template">
   Kafka uses key-value pairs in the <a href="http://en.wikipedia.org/wiki/.properties">property file format</a> for configuration. These values can be supplied either from a file or programmatically.
 
-  <h3><a id="brokerconfigs" href="#brokerconfigs">3.1 Broker Configs</a></h3>
+  <h3 class="anchor-heading"><a id="brokerconfigs" class="anchor-link"></a><a href="#brokerconfigs">3.1 Broker Configs</a></h3>
 
   The essential configurations are the following:
   <ul>
@@ -33,7 +33,7 @@
 
   <p>More details about broker configuration can be found in the scala class <code>kafka.server.KafkaConfig</code>.</p>
 
-  <h4><a id="dynamicbrokerconfigs" href="#dynamicbrokerconfigs">3.1.1 Updating Broker Configs</a></h4>
+  <h4 class="anchor-heading"><a id="dynamicbrokerconfigs" class="anchor-link"></a><a href="#dynamicbrokerconfigs">3.1.1 Updating Broker Configs</a></h4>
   From Kafka version 1.1 onwards, some of the broker configs can be updated without restarting the broker. See the
   <code>Dynamic Update Mode</code> column in <a href="#brokerconfigs">Broker Configs</a> for the update mode of each broker config.
   <ul>
@@ -43,31 +43,21 @@
   </ul>
 
   To alter the current broker configs for broker id 0 (for example, the number of log cleaner threads):
-  <pre class="brush: bash;">
-  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2</code></pre>
 
   To describe the current dynamic broker configs for broker id 0:
-  <pre class="brush: bash;">
-  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe</code></pre>
 
   To delete a config override and revert to the statically configured or default value for broker id 0 (for example,
   the number of log cleaner threads):
-  <pre class="brush: bash;">
-  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --delete-config log.cleaner.threads
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --delete-config log.cleaner.threads</code></pre>
 
   Some configs may be configured as a cluster-wide default to maintain consistent values across the whole cluster.  All brokers
   in the cluster will process the cluster default update. For example, to update log cleaner threads on all brokers:
-  <pre class="brush: bash;">
-  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.threads=2
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.threads=2</code></pre>
 
   To describe the currently configured dynamic cluster-wide default configs:
-  <pre class="brush: bash;">
-  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe</code></pre>
 
   All configs that are configurable at cluster level may also be configured at per-broker level (e.g. for testing).
   If a config value is defined at different levels, the following order of precedence is used:
@@ -99,10 +89,8 @@
   encoder configs will not be persisted in ZooKeeper. For example, to store SSL key password for listener <code>INTERNAL</code>
   on broker 0:
 
-  <pre class="brush: bash;">
-  &gt; bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --entity-type brokers --entity-name 0 --alter --add-config
-    'listener.name.internal.ssl.key.password=key-password,password.encoder.secret=secret,password.encoder.iterations=8192'
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --entity-type brokers --entity-name 0 --alter --add-config
+    'listener.name.internal.ssl.key.password=key-password,password.encoder.secret=secret,password.encoder.iterations=8192'</code></pre>
 
   The configuration <code>listener.name.internal.ssl.key.password</code> will be persisted in ZooKeeper in encrypted
   form using the provided encoder configs. The encoder secret and iterations are not persisted in ZooKeeper.
@@ -174,10 +162,8 @@
   In Kafka version 1.1.x, changes to <code>unclean.leader.election.enable</code> take effect only when a new controller is elected.
   Controller re-election may be forced by running:
 
-  <pre class="brush: bash;">
-  &gt; bin/zookeeper-shell.sh localhost
-  rmr /controller
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/zookeeper-shell.sh localhost
+  rmr /controller</code></pre>
 
   <h5>Updating Log Cleaner Configs</h5>
   Log cleaner configs may be updated dynamically at cluster-default level used by all brokers. The changes take effect
@@ -231,61 +217,53 @@
   Inter-broker listener must be configured using the static broker configuration <code>inter.broker.listener.name</code>
   or <code>inter.broker.security.protocol</code>.
 
-  <h3><a id="topicconfigs" href="#topicconfigs">3.2 Topic-Level Configs</a></h3>
+  <h3 class="anchor-heading"><a id="topicconfigs" class="anchor-link"></a><a href="#topicconfigs">3.2 Topic-Level Configs</a></h3>
 
   Configurations pertinent to topics have both a server default as well an optional per-topic override. If no per-topic configuration is given the server default is used. The override can be set at topic creation time by giving one or more <code>--config</code> options. This example creates a topic named <i>my-topic</i> with a custom max message size and flush rate:
-  <pre class="brush: bash;">
-  &gt; bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 1 \
-      --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 1 \
+      --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1</code></pre>
   Overrides can also be changed or set later using the alter configs command. This example updates the max message size for <i>my-topic</i>:
-  <pre class="brush: bash;">
-  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic
-      --alter --add-config max.message.bytes=128000
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic
+      --alter --add-config max.message.bytes=128000</code></pre>
 
   To check overrides set on the topic you can do
-  <pre class="brush: bash;">
-  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe</code></pre>
 
   To remove an override you can do
-  <pre class="brush: bash;">
-  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092  --entity-type topics --entity-name my-topic
-      --alter --delete-config max.message.bytes
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092  --entity-type topics --entity-name my-topic
+      --alter --delete-config max.message.bytes</code></pre>
 
   The following are the topic-level configurations. The server's default configuration for this property is given under the Server Default Property heading. A given server default config value only applies to a topic if it does not have an explicit topic config override.
 
   <!--#include virtual="generated/topic_config.html" -->
 
-  <h3><a id="producerconfigs" href="#producerconfigs">3.3 Producer Configs</a></h3>
+  <h3 class="anchor-heading"><a id="producerconfigs" class="anchor-link"></a><a href="#producerconfigs">3.3 Producer Configs</a></h3>
 
   Below is the configuration of the producer:
   <!--#include virtual="generated/producer_config.html" -->
 
-  <h3><a id="consumerconfigs" href="#consumerconfigs">3.4 Consumer Configs</a></h3>
+  <h3 class="anchor-heading"><a id="consumerconfigs" class="anchor-link"></a><a href="#consumerconfigs">3.4 Consumer Configs</a></h3>
 
   Below is the configuration for the consumer:
   <!--#include virtual="generated/consumer_config.html" -->
 
-  <h3><a id="connectconfigs" href="#connectconfigs">3.5 Kafka Connect Configs</a></h3>
+  <h3 class="anchor-heading"><a id="connectconfigs" class="anchor-link"></a><a href="#connectconfigs">3.5 Kafka Connect Configs</a></h3>
   Below is the configuration of the Kafka Connect framework.
   <!--#include virtual="generated/connect_config.html" -->
 
-  <h4><a id="sourceconnectconfigs" href="#sourceconnectconfigs">3.5.1 Source Connector Configs</a></h4>
+  <h4 class="anchor-heading"><a id="sourceconnectconfigs" class="anchor-link"></a><a href="#sourceconnectconfigs">3.5.1 Source Connector Configs</a></h4>
   Below is the configuration of a source connector.
   <!--#include virtual="generated/source_connector_config.html" -->
 
-  <h4><a id="sinkconnectconfigs" href="#sinkconnectconfigs">3.5.2 Sink Connector Configs</a></h4>
+  <h4 class="anchor-heading"><a id="sinkconnectconfigs" class="anchor-link"></a><a href="#sinkconnectconfigs">3.5.2 Sink Connector Configs</a></h4>
   Below is the configuration of a sink connector.
   <!--#include virtual="generated/sink_connector_config.html" -->
 
-  <h3><a id="streamsconfigs" href="#streamsconfigs">3.6 Kafka Streams Configs</a></h3>
+  <h3 class="anchor-heading"><a id="streamsconfigs" class="anchor-link"></a><a href="#streamsconfigs">3.6 Kafka Streams Configs</a></h3>
   Below is the configuration of the Kafka Streams client library.
   <!--#include virtual="generated/streams_config.html" -->
 
-  <h3><a id="adminclientconfigs" href="#adminclientconfigs">3.7 Admin Configs</a></h3>
+  <h3 class="anchor-heading"><a id="adminclientconfigs" class="anchor-link"></a><a href="#adminclientconfigs">3.7 Admin Configs</a></h3>
   Below is the configuration of the Kafka Admin client library.
   <!--#include virtual="generated/admin_client_config.html" -->
 </script>
diff --git a/docs/design.html b/docs/design.html
index 082e87e..9e08b5f 100644
--- a/docs/design.html
+++ b/docs/design.html
@@ -16,7 +16,7 @@
 -->
 
 <script id="design-template" type="text/x-handlebars-template">
-    <h3><a id="majordesignelements" href="#majordesignelements">4.1 Motivation</a></h3>
+    <h3 class="anchor-heading"><a id="majordesignelements" class="anchor-link"></a><a href="#majordesignelements">4.1 Motivation</a></h3>
     <p>
     We designed Kafka to be able to act as a unified platform for handling all the real-time data feeds <a href="#introduction">a large company might have</a>. To do this we had to think through a fairly broad set of use cases.
     <p>
@@ -32,7 +32,7 @@
     <p>
     Supporting these uses led us to a design with a number of unique elements, more akin to a database log than a traditional messaging system. We will outline some elements of the design in the following sections.
 
-    <h3><a id="persistence" href="#persistence">4.2 Persistence</a></h3>
+    <h3 class="anchor-heading"><a id="persistence" class="anchor-link"></a><a href="#persistence">4.2 Persistence</a></h3>
     <h4><a id="design_filesystem" href="#design_filesystem">Don't fear the filesystem!</a></h4>
     <p>
     Kafka relies heavily on the filesystem for storing and caching messages. There is a general perception that "disks are slow" which makes people skeptical that a persistent structure can offer competitive performance.
@@ -66,7 +66,7 @@
     <p>
     This style of pagecache-centric design is described in an <a href="http://varnish-cache.org/wiki/ArchitectNotes">article</a> on the design of Varnish here (along with a healthy dose of arrogance).
 
-    <h4><a id="design_constanttime" href="#design_constanttime">Constant Time Suffices</a></h4>
+    <h4 class="anchor-heading"><a id="design_constanttime" class="anchor-link"></a><a href="#design_constanttime">Constant Time Suffices</a></h4>
     <p>
     The persistent data structure used in messaging systems are often a per-consumer queue with an associated BTree or other general-purpose random access data structures to maintain metadata about messages.
     BTrees are the most versatile data structure available, and make it possible to support a wide variety of transactional and non-transactional semantics in the messaging system.
@@ -82,7 +82,7 @@
     Having access to virtually unlimited disk space without any performance penalty means that we can provide some features not usually found in a messaging system. For example, in Kafka, instead of attempting to
     delete messages as soon as they are consumed, we can retain messages for a relatively long period (say a week). This leads to a great deal of flexibility for consumers, as we will describe.
 
-    <h3><a id="maximizingefficiency" href="#maximizingefficiency">4.3 Efficiency</a></h3>
+    <h3 class="anchor-heading"><a id="maximizingefficiency" class="anchor-link"></a><a href="#maximizingefficiency">4.3 Efficiency</a></h3>
     <p>
     We have put significant effort into efficiency. One of our primary use cases is handling web activity data, which is very high volume: each page view may generate dozens of writes. Furthermore, we assume each
     message published is read by at least one consumer (often many), hence we strive to make consumption as cheap as possible.
@@ -127,7 +127,7 @@
     <p>
     For more background on the sendfile and zero-copy support in Java, see this <a href="https://developer.ibm.com/articles/j-zerocopy/">article</a>.
 
-    <h4><a id="design_compression" href="#design_compression">End-to-end Batch Compression</a></h4>
+    <h4 class="anchor-heading"><a id="design_compression" class="anchor-link"></a><a href="#design_compression">End-to-end Batch Compression</a></h4>
     <p>
     In some cases the bottleneck is actually not CPU or disk but network bandwidth. This is particularly true for a data pipeline that needs to send messages between data centers over a wide-area network. Of course,
     the user can always compress its messages one at a time without any support needed from Kafka, but this can lead to very poor compression ratios as much of the redundancy is due to repetition between messages of
@@ -138,9 +138,9 @@
     <p>
     Kafka supports GZIP, Snappy, LZ4 and ZStandard compression protocols. More details on compression can be found <a href="https://cwiki.apache.org/confluence/display/KAFKA/Compression">here</a>.
 
-    <h3><a id="theproducer" href="#theproducer">4.4 The Producer</a></h3>
+    <h3 class="anchor-heading"><a id="theproducer" class="anchor-link"></a><a href="#theproducer">4.4 The Producer</a></h3>
 
-    <h4><a id="design_loadbalancing" href="#design_loadbalancing">Load balancing</a></h4>
+    <h4 class="anchor-heading"><a id="design_loadbalancing" class="anchor-link"></a><a href="#design_loadbalancing">Load balancing</a></h4>
     <p>
     The producer sends data directly to the broker that is the leader for the partition without any intervening routing tier. To help the producer do this all Kafka nodes can answer a request for metadata about which
     servers are alive and where the leaders for the partitions of a topic are at any given time to allow the producer to appropriately direct its requests.
@@ -150,7 +150,7 @@
     chosen was a user id then all data for a given user would be sent to the same partition. This in turn will allow consumers to make locality assumptions about their consumption. This style of partitioning is explicitly
     designed to allow locality-sensitive processing in consumers.
 
-    <h4><a id="design_asyncsend" href="#design_asyncsend">Asynchronous send</a></h4>
+    <h4 class="anchor-heading"><a id="design_asyncsend" class="anchor-link"></a><a href="#design_asyncsend">Asynchronous send</a></h4>
     <p>
     Batching is one of the big drivers of efficiency, and to enable batching the Kafka producer will attempt to accumulate data in memory and to send out larger batches in a single request. The batching can be configured
     to accumulate no more than a fixed number of messages and to wait no longer than some fixed latency bound (say 64k or 10 ms). This allows the accumulation of more bytes to send, and few larger I/O operations on the
@@ -159,12 +159,12 @@
     Details on <a href="#producerconfigs">configuration</a> and the <a href="http://kafka.apache.org/082/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html">api</a> for the producer can be found
     elsewhere in the documentation.
 
-    <h3><a id="theconsumer" href="#theconsumer">4.5 The Consumer</a></h3>
+    <h3 class="anchor-heading"><a id="theconsumer" class="anchor-link"></a><a href="#theconsumer">4.5 The Consumer</a></h3>
 
     The Kafka consumer works by issuing "fetch" requests to the brokers leading the partitions it wants to consume. The consumer specifies its offset in the log with each request and receives back a chunk of log
     beginning from that position. The consumer thus has significant control over this position and can rewind it to re-consume data if need be.
 
-    <h4><a id="design_pull" href="#design_pull">Push vs. pull</a></h4>
+    <h4 class="anchor-heading"><a id="design_pull" class="anchor-link"></a><a href="#design_pull">Push vs. pull</a></h4>
     <p>
     An initial question we considered is whether consumers should pull data from brokers or brokers should push data to the consumer. In this respect Kafka follows a more traditional design, shared by most messaging
     systems, where data is pushed to the broker from the producer and pulled from the broker by the consumer. Some logging-centric systems, such as <a href="http://github.com/facebook/scribe">Scribe</a> and
@@ -187,7 +187,7 @@
     scale led us to feel that involving thousands of disks in the system across many applications would not actually make things more reliable and would be a nightmare to operate. And in practice we have found that we
     can run a pipeline with strong SLAs at large scale without a need for producer persistence.
 
-    <h4><a id="design_consumerposition" href="#design_consumerposition">Consumer Position</a></h4>
+    <h4 class="anchor-heading"><a id="design_consumerposition" class="anchor-link"></a><a href="#design_consumerposition">Consumer Position</a></h4>
     Keeping track of <i>what</i> has been consumed is, surprisingly, one of the key performance points of a messaging system.
     <p>
     Most messaging systems keep metadata about what messages have been consumed on the broker. That is, as a message is handed out to a consumer, the broker either records that fact locally immediately or it may wait
@@ -208,7 +208,7 @@
     There is a side benefit of this decision. A consumer can deliberately <i>rewind</i> back to an old offset and re-consume data. This violates the common contract of a queue, but turns out to be an essential feature
     for many consumers. For example, if the consumer code has a bug and is discovered after some messages are consumed, the consumer can re-consume those messages once the bug is fixed.
 
-    <h4><a id="design_offlineload" href="#design_offlineload">Offline Data Load</a></h4>
+    <h4 class="anchor-heading"><a id="design_offlineload" class="anchor-link"></a><a href="#design_offlineload">Offline Data Load</a></h4>
 
     Scalable persistence allows for the possibility of consumers that only periodically consume such as batch data loads that periodically bulk-load data into an offline system such as Hadoop or a relational data
     warehouse.
@@ -216,7 +216,7 @@
     In the case of Hadoop we parallelize the data load by splitting the load over individual map tasks, one for each node/topic/partition combination, allowing full parallelism in the loading. Hadoop provides the task
     management, and tasks which fail can restart without danger of duplicate data&mdash;they simply restart from their original position.
 
-    <h4><a id="static_membership" href="#static_membership">Static Membership</a></h4>
+    <h4 class="anchor-heading"><a id="static_membership" class="anchor-link"></a><a href="#static_membership">Static Membership</a></h4>
     Static membership aims to improve the availability of stream applications, consumer groups and other applications built on top of the group rebalance protocol.
     The rebalance protocol relies on the group coordinator to allocate entity ids to group members. These generated ids are ephemeral and will change when members restart and rejoin.
     For consumer based apps, this "dynamic membership" can cause a large percentage of tasks re-assigned to different instances during administrative operations
@@ -238,7 +238,7 @@
     For more details, see
     <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances">KIP-345</a>
 
-    <h3><a id="semantics" href="#semantics">4.6 Message Delivery Semantics</a></h3>
+    <h3 class="anchor-heading"><a id="semantics" class="anchor-link"></a><a href="#semantics">4.6 Message Delivery Semantics</a></h3>
     <p>
     Now that we understand a little about how producers and consumers work, let's discuss the semantic guarantees Kafka provides between producer and consumer. Clearly there are multiple possible message delivery
     guarantees that could be provided:
@@ -303,7 +303,7 @@
     offset which makes implementing this feasible (see also <a href="https://kafka.apache.org/documentation/#connect">Kafka Connect</a>). Otherwise, Kafka guarantees at-least-once delivery by default, and allows
     the user to implement at-most-once delivery by disabling retries on the producer and committing offsets in the consumer prior to processing a batch of messages.
 
-    <h3><a id="replication" href="#replication">4.7 Replication</a></h3>
+    <h3 class="anchor-heading"><a id="replication" class="anchor-link"></a><a href="#replication">4.7 Replication</a></h3>
     <p>
     Kafka replicates the log for each topic's partitions across a configurable number of servers (you can set this replication factor on a topic-by-topic basis). This allows automatic failover to these replicas when a
     server in the cluster fails so messages remain available in the presence of failures.
@@ -413,7 +413,7 @@
     your data or violate consistency by taking what remains on an existing server as your new source of truth.
 
 
-    <h4><a id="design_ha" href="#design_ha">Availability and Durability Guarantees</a></h4>
+    <h4 class="anchor-heading"><a id="design_ha" class="anchor-link"></a><a href="#design_ha">Availability and Durability Guarantees</a></h4>
 
     When writing to Kafka, producers can choose whether they wait for the message to be acknowledged by 0,1 or all (-1) replicas.
     Note that "acknowledgement by all replicas" does not guarantee that the full set of assigned replicas have received the message. By default, when acks=all, acknowledgement happens as soon as all the current in-sync
@@ -432,7 +432,7 @@
     </ol>
 
 
-    <h4><a id="design_replicamanagment" href="#design_replicamanagment">Replica Management</a></h4>
+    <h4 class="anchor-heading"><a id="design_replicamanagment" class="anchor-link"></a><a href="#design_replicamanagment">Replica Management</a></h4>
 
     The above discussion on replicated logs really covers only a single log, i.e. one topic partition. However a Kafka cluster will manage hundreds or thousands of these partitions. We attempt to balance partitions
     within a cluster in a round-robin fashion to avoid clustering all partitions for high-volume topics on a small number of nodes. Likewise we try to balance leadership so that each node is the leader for a proportional
@@ -443,7 +443,7 @@
     affected partitions in a failed broker. The result is that we are able to batch together many of the required leadership change notifications which makes the election process far cheaper and faster for a large number
     of partitions. If the controller fails, one of the surviving brokers will become the new controller.
 
-    <h3><a id="compaction" href="#compaction">4.8 Log Compaction</a></h3>
+    <h3 class="anchor-heading"><a id="compaction" class="anchor-link"></a><a href="#compaction">4.8 Log Compaction</a></h3>
 
     Log compaction ensures that Kafka will always retain at least the last known value for each message key within the log of data for a single topic partition.  It addresses use cases and scenarios such as restoring
     state after application crashes or system failure, or reloading caches after application restarts during operational maintenance. Let's dive into these use cases in more detail and then describe how compaction works.
@@ -453,8 +453,7 @@
     <p>
     Let's discuss a concrete example of such a stream. Say we have a topic containing user email addresses; every time a user updates their email address we send a message to this topic using their user id as the
     primary key. Now say we send the following messages over some time period for a user with id 123, each message corresponding to a change in email address (messages for other ids are omitted):
-    <pre class="brush: text;">
-        123 => bill@microsoft.com
+    <pre class="line-numbers"><code class="language-text"><code>       123 => bill@microsoft.com
                 .
                 .
                 .
@@ -462,8 +461,7 @@
                 .
                 .
                 .
-        123 => bill@gmail.com
-    </pre>
+        123 => bill@gmail.com</code></pre>
     Log compaction gives us a more granular retention mechanism so that we are guaranteed to retain at least the last update for each primary key (e.g. <code>bill@gmail.com</code>). By doing this we guarantee that the
     log contains a full snapshot of the final value for every key not just keys that changed recently. This means downstream consumers can restore their own state off this topic without us having to retain a complete
     log of all changes.
@@ -497,7 +495,7 @@
     Unlike most log-structured storage systems Kafka is built for subscription and organizes data for fast linear reads and writes. Unlike Databus, Kafka acts as a source-of-truth store so it is useful even in
     situations where the upstream data source would not otherwise be replayable.
 
-    <h4><a id="design_compactionbasics" href="#design_compactionbasics">Log Compaction Basics</a></h4>
+    <h4 class="anchor-heading"><a id="design_compactionbasics" class="anchor-link"></a><a href="#design_compactionbasics">Log Compaction Basics</a></h4>
 
     Here is a high-level picture that shows the logical structure of a Kafka log with the offset for each message.
     <p>
@@ -517,7 +515,10 @@
     <p>
     <img class="centered" src="/{{version}}/images/log_compaction.png">
     <p>
-    <h4><a id="design_compactionguarantees" href="#design_compactionguarantees">What guarantees does log compaction provide?</a></h4>
+    <h4 class="anchor-heading">
+        <a class="anchor-link" id="design_compactionguarantees" href="#design_compactionguarantees"></a>
+        <a href="#design_compactionguarantees">What guarantees does log compaction provide</a>?
+    </h4>
 
     Log compaction guarantees the following:
     <ol>
@@ -531,7 +532,7 @@
     concurrently with reads, it is possible for a consumer to miss delete markers if it lags by more than <code>delete.retention.ms</code>.
     </ol>
 
-    <h4><a id="design_compactiondetails" href="#design_compactiondetails">Log Compaction Details</a></h4>
+    <h4 class="anchor-heading"><a id="design_compactiondetails" class="anchor-link"></a><a href="#design_compactiondetails">Log Compaction Details</a></h4>
 
     Log compaction is handled by the log cleaner, a pool of background threads that recopy log segment files, removing records whose key appears in the head of the log. Each compactor thread works as follows:
     <ol>
@@ -543,11 +544,11 @@
     (assuming 1k messages).
     </ol>
     <p>
-    <h4><a id="design_compactionconfig" href="#design_compactionconfig">Configuring The Log Cleaner</a></h4>
+    <h4 class="anchor-heading"><a id="design_compactionconfig" class="anchor-link"></a><a href="#design_compactionconfig">Configuring The Log Cleaner</a></h4>
 
     The log cleaner is enabled by default. This will start the pool of cleaner threads.
     To enable log cleaning on a particular topic, add the log-specific property
-    <pre class="brush: text;"> log.cleanup.policy=compact</pre>
+    <pre class="language-text"><code> log.cleanup.policy=compact</code></pre>
 
     The <code>log.cleanup.policy</code> property is a broker configuration setting defined
     in the broker's <code>server.properties</code> file; it affects all of the topics
@@ -555,13 +556,13 @@
     <a href="/documentation.html#brokerconfigs">here</a>.
 
     The log cleaner can be configured to retain a minimum amount of the uncompacted "head" of the log. This is enabled by setting the compaction time lag.
-    <pre class="brush: text;">  log.cleaner.min.compaction.lag.ms</pre>
+    <pre class="language-text"><code>  log.cleaner.min.compaction.lag.ms</code></pre>
 
     This can be used to prevent messages newer than a minimum message age from being subject to compaction. If not set, all log segments are eligible for compaction except for the last segment, i.e. the one currently
     being written to. The active segment will not be compacted even if all of its messages are older than the minimum compaction time lag.
 
     The log cleaner can be configured to ensure a maximum delay after which the uncompacted "head" of the log becomes eligible for log compaction.
-    <pre class="brush: text;">  log.cleaner.max.compaction.lag.ms</pre>
+    <pre class="language-text"><code> log.cleaner.max.compaction.lag.ms</code></pre>
 
     This can be used to prevent log with low produce rate from remaining ineligible for compaction for an unbounded duration. If not set, logs that do not exceed min.cleanable.dirty.ratio are not compacted.
     Note that this compaction deadline is not a hard guarantee since it is still subjected to the availability of log cleaner threads and the actual compaction time.
@@ -570,7 +571,7 @@
     <p>
     Further cleaner configurations are described <a href="/documentation.html#brokerconfigs">here</a>.
 
-    <h3><a id="design_quotas" href="#design_quotas">4.9 Quotas</a></h3>
+    <h3 class="anchor-heading"><a id="design_quotas" class="anchor-link"></a><a href="#design_quotas">4.9 Quotas</a></h3>
     <p>
     Kafka cluster has the ability to enforce quotas on requests to control the broker resources used by clients. Two types
     of client quotas can be enforced by Kafka brokers for each group of clients sharing a quota:
@@ -580,14 +581,17 @@
     </ol>
     </p>
 
-    <h4><a id="design_quotasnecessary" href="#design_quotasnecessary">Why are quotas necessary?</a></h4>
+    <h4 class="anchor-heading">
+        <a class="anchor-link" id="design_quotasnecessary" href="#design_quotasnecessary"></a>
+        <a href="#design_quotasnecessary">Why are quotas necessary</a>?
+    </h4>
     <p>
     It is possible for producers and consumers to produce/consume very high volumes of data or generate requests at a very high
     rate and thus monopolize broker resources, cause network saturation and generally DOS other clients and the brokers themselves.
     Having quotas protects against these issues and is all the more important in large multi-tenant clusters where a small set of badly behaved clients can degrade user experience for the well behaved ones.
     In fact, when running Kafka as a service this even makes it possible to enforce API limits according to an agreed upon contract.
     </p>
-    <h4><a id="design_quotasgroups" href="#design_quotasgroups">Client groups</a></h4>
+    <h4 class="anchor-heading"><a id="design_quotasgroups" class="anchor-link"></a><a href="#design_quotasgroups">Client groups</a></h4>
         The identity of Kafka clients is the user principal which represents an authenticated user in a secure cluster. In a cluster that supports unauthenticated clients, user principal is a grouping of unauthenticated
         users
         chosen by the broker using a configurable <code>PrincipalBuilder</code>. Client-id is a logical grouping of clients with a meaningful name chosen by the client application. The tuple (user, client-id) defines
@@ -596,7 +600,7 @@
         Quotas can be applied to (user, client-id), user or client-id groups. For a given connection, the most specific quota matching the connection is applied. All connections of a quota group share the quota configured for the group.
         For example, if (user="test-user", client-id="test-client") has a produce quota of 10MB/sec, this is shared across all producer instances of user "test-user" with the client-id "test-client".
     </p>
-    <h4><a id="design_quotasconfig" href="#design_quotasconfig">Quota Configuration</a></h4>
+    <h4 class="anchor-heading"><a id="design_quotasconfig" class="anchor-link"></a><a href="#design_quotasconfig">Quota Configuration</a></h4>
     <p>
         Quota configuration may be defined for (user, client-id), user and client-id groups. It is possible to override the default quota at any of the quota levels that needs a higher (or even lower) quota.
         The mechanism is similar to the per-topic log config overrides.
@@ -620,14 +624,14 @@
         Broker properties (quota.producer.default, quota.consumer.default) can also be used to set defaults of network bandwidth quotas for client-id groups. These properties are being deprecated and will be removed in a later release.
         Default quotas for client-id can be set in Zookeeper similar to the other quota overrides and defaults.
     </p>
-    <h4><a id="design_quotasbandwidth" href="#design_quotasbandwidth">Network Bandwidth Quotas</a></h4>
+    <h4 class="anchor-heading"><a id="design_quotasbandwidth" class="anchor-link"></a><a href="#design_quotasbandwidth">Network Bandwidth Quotas</a></h4>
     <p>
         Network bandwidth quotas are defined as the byte rate threshold for each group of clients sharing a quota.
         By default, each unique client group receives a fixed quota in bytes/sec as configured by the cluster.
         This quota is defined on a per-broker basis. Each group of clients can publish/fetch a maximum of X bytes/sec
         per broker before clients are throttled.
     </p>
-    <h4><a id="design_quotascpu" href="#design_quotascpu">Request Rate Quotas</a></h4>
+    <h4 class="anchor-heading"><a id="design_quotascpu" class="anchor-link"></a><a href="#design_quotascpu">Request Rate Quotas</a></h4>
     <p>
         Request rate quotas are defined as the percentage of time a client can utilize on request handler I/O
         threads and network threads of each broker within a quota window. A quota of <tt>n%</tt> represents
@@ -637,7 +641,7 @@
         on the number of cores available on the broker host, request rate quotas represent the total percentage of CPU
         that may be used by each group of clients sharing the quota.
     </p>
-    <h4><a id="design_quotasenforcement" href="#design_quotasenforcement">Enforcement</a></h4>
+    <h4 class="anchor-heading"><a id="design_quotasenforcement" class="anchor-link"></a><a href="#design_quotasenforcement">Enforcement</a></h4>
     <p>
         By default, each unique client group receives a fixed quota as configured by the cluster.
         This quota is defined on a per-broker basis. Each client can utilize this quota per broker before it gets throttled. We decided that defining these quotas per broker is much better than
diff --git a/docs/implementation.html b/docs/implementation.html
index c28c33b..8c89546 100644
--- a/docs/implementation.html
+++ b/docs/implementation.html
@@ -16,24 +16,23 @@
 -->
 
 <script id="implementation-template" type="text/x-handlebars-template">
-    <h3><a id="networklayer" href="#networklayer">5.1 Network Layer</a></h3>
+    <h3 class="anchor-heading"><a id="networklayer" class="anchor-link"></a><a href="#networklayer">5.1 Network Layer</a></h3>
     <p>
     The network layer is a fairly straight-forward NIO server, and will not be described in great detail. The sendfile implementation is done by giving the <code>MessageSet</code> interface a <code>writeTo</code> method. This allows the file-backed message set to use the more efficient <code>transferTo</code> implementation instead of an in-process buffered write. The threading model is a single acceptor thread and <i>N</i> processor threads which handle a fixed number of connections eac [...]
     </p>
-    <h3><a id="messages" href="#messages">5.2 Messages</a></h3>
+    <h3 class="anchor-heading"><a id="messages" class="anchor-link"></a><a href="#messages">5.2 Messages</a></h3>
     <p>
     Messages consist of a variable-length header, a variable-length opaque key byte array and a variable-length opaque value byte array. The format of the header is described in the following section.
     Leaving the key and value opaque is the right decision: there is a great deal of progress being made on serialization libraries right now, and any particular choice is unlikely to be right for all uses. Needless to say a particular application using Kafka would likely mandate a particular serialization type as part of its usage. The <code>RecordBatch</code> interface is simply an iterator over messages with specialized methods for bulk reading and writing to an NIO <code>Channel</code>.</p>
 
-    <h3><a id="messageformat" href="#messageformat">5.3 Message Format</a></h3>
+    <h3 class="anchor-heading"><a id="messageformat" class="anchor-link"></a><a href="#messageformat">5.3 Message Format</a></h3>
     <p>
     Messages (aka Records) are always written in batches. The technical term for a batch of messages is a record batch, and a record batch contains one or more records. In the degenerate case, we could have a record batch containing a single record.
     Record batches and records have their own headers. The format of each is described below. </p>
 
-    <h4><a id="recordbatch" href="#recordbatch">5.3.1 Record Batch</a></h4>
+    <h4 class="anchor-heading"><a id="recordbatch" class="anchor-link"></a><a href="#recordbatch">5.3.1 Record Batch</a></h4>
 	<p> The following is the on-disk format of a RecordBatch. </p>
-	<p><pre class="brush: java;">
-		baseOffset: int64
+	<p><pre class="line-numbers"><code class="language-java">		baseOffset: int64
 		batchLength: int32
 		partitionLeaderEpoch: int32
 		magic: int8 (current magic value is 2)
@@ -55,8 +54,7 @@
 		producerId: int64
 		producerEpoch: int16
 		baseSequence: int32
-		records: [Record]
-	</pre></p>
+		records: [Record]</code></pre></p>
     <p> Note that when compression is enabled, the compressed record data is serialized directly following the count of the number of records. </p>
 
     <p>The CRC covers the data from the attributes to the end of the batch (i.e. all the bytes that follow the CRC). It is located after the magic byte, which
@@ -70,19 +68,16 @@
     it is possible to have empty batches in the log when all the records in the batch are cleaned but batch is still retained in order to preserve a producer's last sequence number. One oddity here is that the firstTimestamp
     field is not preserved during compaction, so it will change if the first record in the batch is compacted away.</p>
 
-    <h5><a id="controlbatch" href="#controlbatch">5.3.1.1 Control Batches</a></h5>
+    <h5 class="anchor-heading"><a id="controlbatch" class="anchor-link"></a><a href="#controlbatch">5.3.1.1 Control Batches</a></h5>
     <p>A control batch contains a single record called the control record. Control records should not be passed on to applications. Instead, they are used by consumers to filter out aborted transactional messages.</p>
     <p> The key of a control record conforms to the following schema: </p>
-    <p><pre class="brush: java">
-       version: int16 (current version is 0)
-       type: int16 (0 indicates an abort marker, 1 indicates a commit)
-    </pre></p>
+    <p><pre class="line-numbers"><code class="language-java">       version: int16 (current version is 0)
+       type: int16 (0 indicates an abort marker, 1 indicates a commit)</code></pre></p>
     <p>The schema for the value of a control record is dependent on the type. The value is opaque to clients.</p>
 
-    <h4><a id="record" href="#record">5.3.2 Record</a></h4>
+    <h4 class="anchor-heading"><a id="record" class="anchor-link"></a><a href="#record">5.3.2 Record</a></h4>
 	<p>Record level headers were introduced in Kafka 0.11.0. The on-disk format of a record with Headers is delineated below. </p>
-	<p><pre class="brush: java;">
-		length: varint
+	<p><pre class="line-numbers"><code class="language-java">		length: varint
 		attributes: int8
 			bit 0~7: unused
 		timestampDelta: varint
@@ -91,27 +86,23 @@
 		key: byte[]
 		valueLen: varint
 		value: byte[]
-		Headers => [Header]
-	</pre></p>
-	<h5><a id="recordheader" href="#recordheader">5.3.2.1 Record Header</a></h5>
-	<p><pre class="brush: java;">
-		headerKeyLength: varint
+		Headers => [Header]</code></pre></p>
+	<h5 class="anchor-heading"><a id="recordheader" class="anchor-link"></a><a href="#recordheader">5.3.2.1 Record Header</a></h5>
+	<p><pre class="line-numbers"><code class="language-java">		headerKeyLength: varint
 		headerKey: String
 		headerValueLength: varint
-		Value: byte[]
-	</pre></p>
+		Value: byte[]</code></pre></p>
     <p>We use the same varint encoding as Protobuf. More information on the latter can be found <a href="https://developers.google.com/protocol-buffers/docs/encoding#varints">here</a>. The count of headers in a record
     is also encoded as a varint.</p>
 
-    <h4><a id="messageset" href="#messageset">5.3.3 Old Message Format</a></h4>
+    <h4 class="anchor-heading"><a id="messageset" class="anchor-link"></a><a href="#messageset">5.3.3 Old Message Format</a></h4>
     <p>
         Prior to Kafka 0.11, messages were transferred and stored in <i>message sets</i>. In a message set, each message has its own metadata. Note that although message sets are represented as an array,
         they are not preceded by an int32 array size like other array elements in the protocol.
     </p>
 
     <b>Message Set:</b><br>
-    <p><pre class="brush: java;">
-    MessageSet (Version: 0) => [offset message_size message]
+    <p><pre class="line-numbers"><code class="language-java">    MessageSet (Version: 0) => [offset message_size message]
         offset => INT64
         message_size => INT32
         message => crc magic_byte attributes key value
@@ -124,10 +115,8 @@
                     2: snappy
                 bit 3~7: unused
             key => BYTES
-            value => BYTES
-    </pre></p>
-    <p><pre class="brush: java;">
-    MessageSet (Version: 1) => [offset message_size message]
+            value => BYTES</code></pre></p>
+    <p><pre class="line-numbers"><code class="language-java">    MessageSet (Version: 1) => [offset message_size message]
         offset => INT64
         message_size => INT32
         message => crc magic_byte attributes timestamp key value
@@ -145,8 +134,7 @@
                 bit 4~7: unused
             timestamp => INT64
             key => BYTES
-            value => BYTES
-    </pre></p>
+            value => BYTES</code></pre></p>
     <p>
         In versions prior to Kafka 0.10, the only supported message format version (which is indicated in the magic value) was 0. Message format version 1 was introduced with timestamp support in version 0.10.
     <ul>
@@ -170,7 +158,7 @@
 
     <p>The crc field contains the CRC32 (and not CRC-32C) of the subsequent message bytes (i.e. from magic byte to the value).</p>
 
-    <h3><a id="log" href="#log">5.4 Log</a></h3>
+    <h3 class="anchor-heading"><a id="log" class="anchor-link"></a><a href="#log">5.4 Log</a></h3>
     <p>
     A log for a topic named "my_topic" with two partitions consists of two directories (namely <code>my_topic_0</code> and <code>my_topic_1</code>) populated with data files containing the messages for that topic. The format of the log files is a sequence of "log entries""; each log entry is a 4 byte integer <i>N</i> storing the message length which is followed by the <i>N</i> message bytes. Each message is uniquely identified by a 64-bit integer <i>offset</i> giving the byte position of [...]
     </p>
@@ -181,11 +169,11 @@
     The use of the message offset as the message id is unusual. Our original idea was to use a GUID generated by the producer, and maintain a mapping from GUID to offset on each broker. But since a consumer must maintain an ID for each server, the global uniqueness of the GUID provides no value. Furthermore, the complexity of maintaining the mapping from a random id to an offset requires a heavy weight index structure which must be synchronized with disk, essentially requiring a full per [...]
     </p>
     <img class="centered" src="/{{version}}/images/kafka_log.png">
-    <h4><a id="impl_writes" href="#impl_writes">Writes</a></h4>
+    <h4 class="anchor-heading"><a id="impl_writes" class="anchor-link"></a><a href="#impl_writes">Writes</a></h4>
     <p>
     The log allows serial appends which always go to the last file. This file is rolled over to a fresh file when it reaches a configurable size (say 1GB). The log takes two configuration parameters: <i>M</i>, which gives the number of messages to write before forcing the OS to flush the file to disk, and <i>S</i>, which gives a number of seconds after which a flush is forced. This gives a durability guarantee of losing at most <i>M</i> messages or <i>S</i> seconds of data in the event o [...]
     </p>
-    <h4><a id="impl_reads" href="#impl_reads">Reads</a></h4>
+    <h4 class="anchor-heading"><a id="impl_reads" class="anchor-link"></a><a href="#impl_reads">Reads</a></h4>
     <p>
     Reads are done by giving the 64-bit logical offset of a message and an <i>S</i>-byte max chunk size. This will return an iterator over the messages contained in the <i>S</i>-byte buffer. <i>S</i> is intended to be larger than any single message, but in the event of an abnormally large message, the read can be retried multiple times, each time doubling the buffer size, until the message is read successfully. A maximum message and buffer size can be specified to make the server reject  [...]
     </p>
@@ -198,26 +186,22 @@
 
     <p> The following is the format of the results sent to the consumer.
 
-    <pre class="brush: text;">
-    MessageSetSend (fetch result)
+    <pre class="line-numbers"><code class="language-text">    MessageSetSend (fetch result)
 
     total length     : 4 bytes
     error code       : 2 bytes
     message 1        : x bytes
     ...
-    message n        : x bytes
-    </pre>
+    message n        : x bytes</code></pre>
 
-    <pre class="brush: text;">
-    MultiMessageSetSend (multiFetch result)
+    <pre class="line-numbers"><code class="language-text">    MultiMessageSetSend (multiFetch result)
 
     total length       : 4 bytes
     error code         : 2 bytes
     messageSetSend 1
     ...
-    messageSetSend n
-    </pre>
-    <h4><a id="impl_deletes" href="#impl_deletes">Deletes</a></h4>
+    messageSetSend n</code></pre>
+    <h4 class="anchor-heading"><a id="impl_deletes" class="anchor-link"></a><a href="#impl_deletes">Deletes</a></h4>
     <p>
     Data is deleted one log segment at a time. The log manager applies two metrics to identify segments which are
         eligible for deletion: time and size. For time-based policies, the record timestamps are considered, with the
@@ -229,7 +213,7 @@
         style segment list implementation that provides consistent views to allow a binary search to proceed on an
         immutable static snapshot view of the log segments while deletes are progressing.
     </p>
-    <h4><a id="impl_guarantees" href="#impl_guarantees">Guarantees</a></h4>
+    <h4 class="anchor-heading"><a id="impl_guarantees" class="anchor-link"></a><a href="#impl_guarantees">Guarantees</a></h4>
     <p>
     The log provides a configuration parameter <i>M</i> which controls the maximum number of messages that are written before forcing a flush to disk. On startup a log recovery process is run that iterates over all messages in the newest log segment and verifies that each message entry is valid. A message entry is valid if the sum of its size and offset are less than the length of the file AND the CRC32 of the message payload matches the CRC stored with the message. In the event corrupti [...]
     </p>
@@ -237,8 +221,8 @@
     Note that two kinds of corruption must be handled: truncation in which an unwritten block is lost due to a crash, and corruption in which a nonsense block is ADDED to the file. The reason for this is that in general the OS makes no guarantee of the write order between the file inode and the actual block data so in addition to losing written data the file can gain nonsense data if the inode is updated with a new size but a crash occurs before the block containing that data is written. [...]
     </p>
 
-    <h3><a id="distributionimpl" href="#distributionimpl">5.5 Distribution</a></h3>
-    <h4><a id="impl_offsettracking" href="#impl_offsettracking">Consumer Offset Tracking</a></h4>
+    <h3 class="anchor-heading"><a id="distributionimpl" class="anchor-link"></a><a href="#distributionimpl">5.5 Distribution</a></h3>
+    <h4 class="anchor-heading"><a id="impl_offsettracking" class="anchor-link"></a><a href="#impl_offsettracking">Consumer Offset Tracking</a></h4>
     <p>
     Kafka consumer tracks the maximum offset it has consumed in each partition and has the capability to commit offsets so
         that it can resume from those offsets in the event of a restart. Kafka provides the option to store all the offsets for
@@ -265,36 +249,32 @@
         CoordinatorLoadInProgressException and the consumer may retry the OffsetFetchRequest after backing off.
     </p>
 
-    <h4><a id="impl_zookeeper" href="#impl_zookeeper">ZooKeeper Directories</a></h4>
+    <h4 class="anchor-heading"><a id="impl_zookeeper" class="anchor-link"></a><a href="#impl_zookeeper">ZooKeeper Directories</a></h4>
     <p>
     The following gives the ZooKeeper structures and algorithms used for co-ordination between consumers and brokers.
     </p>
 
-    <h4><a id="impl_zknotation" href="#impl_zknotation">Notation</a></h4>
+    <h4 class="anchor-heading"><a id="impl_zknotation" class="anchor-link"></a><a href="#impl_zknotation">Notation</a></h4>
     <p>
     When an element in a path is denoted <code>[xyz]</code>, that means that the value of xyz is not fixed and there is in fact a ZooKeeper znode for each possible value of xyz. For example <code>/topics/[topic]</code> would be a directory named /topics containing a sub-directory for each topic name. Numerical ranges are also given such as <code>[0...5]</code> to indicate the subdirectories 0, 1, 2, 3, 4. An arrow <code>-></code> is used to indicate the contents of a znode. For example < [...]
     </p>
 
-    <h4><a id="impl_zkbroker" href="#impl_zkbroker">Broker Node Registry</a></h4>
-    <pre class="brush: json;">
-    /brokers/ids/[0...N] --> {"jmx_port":...,"timestamp":...,"endpoints":[...],"host":...,"version":...,"port":...} (ephemeral node)
-    </pre>
+    <h4 class="anchor-heading"><a id="impl_zkbroker" class="anchor-link"></a><a href="#impl_zkbroker">Broker Node Registry</a></h4>
+    <pre class="line-numbers"><code class="language-json">    /brokers/ids/[0...N] --> {"jmx_port":...,"timestamp":...,"endpoints":[...],"host":...,"version":...,"port":...} (ephemeral node)</code></pre>
     <p>
     This is a list of all present broker nodes, each of which provides a unique logical broker id which identifies it to consumers (which must be given as part of its configuration). On startup, a broker node registers itself by creating a znode with the logical broker id under /brokers/ids. The purpose of the logical broker id is to allow a broker to be moved to a different physical machine without affecting consumers. An attempt to register a broker id that is already in use (say becau [...]
     </p>
     <p>
     Since the broker registers itself in ZooKeeper using ephemeral znodes, this registration is dynamic and will disappear if the broker is shutdown or dies (thus notifying consumers it is no longer available).
     </p>
-    <h4><a id="impl_zktopic" href="#impl_zktopic">Broker Topic Registry</a></h4>
-    <pre class="brush: json;">
-    /brokers/topics/[topic]/partitions/[0...N]/state --> {"controller_epoch":...,"leader":...,"version":...,"leader_epoch":...,"isr":[...]} (ephemeral node)
-    </pre>
+    <h4 class="anchor-heading"><a id="impl_zktopic" class="anchor-link"></a><a href="#impl_zktopic">Broker Topic Registry</a></h4>
+    <pre class="line-numbers"><code class="language-json">    /brokers/topics/[topic]/partitions/[0...N]/state --> {"controller_epoch":...,"leader":...,"version":...,"leader_epoch":...,"isr":[...]} (ephemeral node)</code></pre>
 
     <p>
     Each broker registers itself under the topics it maintains and stores the number of partitions for that topic.
     </p>
 
-    <h4><a id="impl_clusterid" href="#impl_clusterid">Cluster Id</a></h4>
+    <h4 class="anchor-heading"><a id="impl_clusterid" class="anchor-link"></a><a href="#impl_clusterid">Cluster Id</a></h4>
 
     <p>
         The cluster id is a unique and immutable identifier assigned to a Kafka cluster. The cluster id can have a maximum of 22 characters and the allowed characters are defined by the regular expression [a-zA-Z0-9_\-]+, which corresponds to the characters used by the URL-safe Base64 variant with no padding. Conceptually, it is auto-generated when a cluster is started for the first time.
@@ -303,7 +283,7 @@
         Implementation-wise, it is generated when a broker with version 0.10.1 or later is successfully started for the first time. The broker tries to get the cluster id from the <code>/cluster/id</code> znode during startup. If the znode does not exist, the broker generates a new cluster id and creates the znode with this cluster id.
     </p>
 
-    <h4><a id="impl_brokerregistration" href="#impl_brokerregistration">Broker node registration</a></h4>
+    <h4 class="anchor-heading"><a id="impl_brokerregistration" class="anchor-link"></a><a href="#impl_brokerregistration">Broker node registration</a></h4>
 
     <p>
     The broker nodes are basically independent, so they only publish information about what they have. When a broker joins, it registers itself under the broker node registry directory and writes information about its host name and port. The broker also register the list of existing topics and their logical partitions in the broker topic registry. New topics are registered dynamically when they are created on the broker.
diff --git a/docs/introduction.html b/docs/introduction.html
index b10da72..da79386 100644
--- a/docs/introduction.html
+++ b/docs/introduction.html
@@ -18,198 +18,203 @@
 <script><!--#include virtual="js/templateData.js" --></script>
 
 <script id="introduction-template" type="text/x-handlebars-template">
-  <h3> Apache Kafka&reg; is <i>a distributed streaming platform</i>. What exactly does that mean?</h3>
-  <p>A streaming platform has three key capabilities:</p>
-  <ul>
-    <li>Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
-    <li>Store streams of records in a fault-tolerant durable way.
-    <li>Process streams of records as they occur.
-  </ul>
-  <p>Kafka is generally used for two broad classes of applications:</p>
-  <ul>
-    <li>Building real-time streaming data pipelines that reliably get data between systems or applications
-    <li>Building real-time streaming applications that transform or react to the streams of data
-  </ul>
-  <p>To understand how Kafka does these things, let's dive in and explore Kafka's capabilities from the bottom up.</p>
-  <p>First a few concepts:</p>
-  <ul>
-    <li>Kafka runs as a cluster on one or more servers that can span multiple datacenters.
-      <li>The Kafka cluster stores streams of <i>records</i> in categories called <i>topics</i>.
-    <li>Each record consists of a key, a value, and a timestamp.
-  </ul>
-  <p>Kafka has five core APIs:</p>
-  <div style="overflow: hidden;">
-      <ul style="float: left; width: 40%;">
-      <li>The <a href="/documentation.html#producerapi">Producer API</a> allows an application to publish a stream of records to one or more Kafka topics.
-      <li>The <a href="/documentation.html#consumerapi">Consumer API</a> allows an application to subscribe to one or more topics and process the stream of records produced to them.
-    <li>The <a href="/documentation/streams">Streams API</a> allows an application to act as a <i>stream processor</i>, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
-    <li>The <a href="/documentation.html#connect">Connector API</a> allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
-    <li>The <a href="/documentation.html#adminapi">Admin API</a> allows managing and inspecting topics, brokers and other Kafka objects.
-  </ul>
-      <img src="/{{version}}/images/kafka-apis.png" style="float: right; width: 50%;">
-      </div>
-  <p>
-  In Kafka the communication between the clients and the servers is done with a simple, high-performance, language agnostic <a href="https://kafka.apache.org/protocol.html">TCP protocol</a>. This protocol is versioned and maintains backwards compatibility with older versions. We provide a Java client for Kafka, but clients are available in <a href="https://cwiki.apache.org/confluence/display/KAFKA/Clients">many languages</a>.</p>
-
-  <h4><a id="intro_topics" href="#intro_topics">Topics and Logs</a></h4>
-  <p>Let's first dive into the core abstraction Kafka provides for a stream of records&mdash;the topic.</p>
-  <p>A topic is a category or feed name to which records are published. Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one, or many consumers that subscribe to the data written to it.</p>
-  <p> For each topic, the Kafka cluster maintains a partitioned log that looks like this: </p>
-  <img class="centered" src="/{{version}}/images/log_anatomy.png">
-
-  <p> Each partition is an ordered, immutable sequence of records that is continually appended to&mdash;a structured commit log. The records in the partitions are each assigned a sequential id number called the <i>offset</i> that uniquely identifies each record within the partition.
-  </p>
+  <h4 class="anchor-heading">
+    <a class="anchor-link" id="intro_streaming" href="#intro_streaming"></a>
+    <a href="#intro_streaming">What is event streaming?</a>
+  </h4>
   <p>
-  The Kafka cluster durably persists all published records&mdash;whether or not they have been consumed&mdash;using a configurable retention period. For example, if the retention policy is set to two days, then for the two days after a record is published, it is available for consumption, after which it will be discarded to free up space. Kafka's performance is effectively constant with respect to data size so storing data for a long time is not a problem.
+    Event streaming is the digital equivalent of the human body's central nervous system. It is the
+    technological foundation for the 'always-on' world where businesses are increasingly software-defined 
+    and automated, and where the user of software is more software.
   </p>
-  <img class="centered" src="/{{version}}/images/log_consumer.png" style="width:400px">
   <p>
-  In fact, the only metadata retained on a per-consumer basis is the offset or position of that consumer in the log. This offset is controlled by the consumer: normally a consumer will advance its offset linearly as it reads records, but, in fact, since the position is controlled by the consumer it can consume records in any order it likes. For example a consumer can reset to an older offset to reprocess data from the past or skip ahead to the most recent record and start consuming from "now".
+    Technically speaking, event streaming is the practice of capturing data in real-time from event sources
+    like databases, sensors, mobile devices, cloud services, and software applications in the form of streams
+    of events; storing these event streams durably for later retrieval; manipulating, processing, and reacting
+    to the event streams in real-time as well as retrospectively; and routing the event streams to different
+    destination technologies as needed. Event streaming thus ensures a continuous flow and interpretation of
+    data so that the right information is at the right place, at the right time.
   </p>
-  <p>
-  This combination of features means that Kafka consumers are very cheap&mdash;they can come and go without much impact on the cluster or on other consumers. For example, you can use our command line tools to "tail" the contents of any topic without changing what is consumed by any existing consumers.
-  </p>
-  <p>
-  The partitions in the log serve several purposes. First, they allow the log to scale beyond a size that will fit on a single server. Each individual partition must fit on the servers that host it, but a topic may have many partitions so it can handle an arbitrary amount of data. Second they act as the unit of parallelism&mdash;more on that in a bit.
-  </p>
-
-  <h4><a id="intro_distribution" href="#intro_distribution">Distribution</a></h4>
 
+  <h4 class="anchor-heading">
+    <a class="anchor-link" id="intro_usage" href="#intro_usage"></a>
+    <a href="#intro_usage">What can I use event streaming for?</a>
+  </h4>
   <p>
-  The partitions of the log are distributed over the servers in the Kafka cluster with each server handling data and requests for a share of the partitions. Each partition is replicated across a configurable number of servers for fault tolerance.
-  </p>
-  <p>
-  Each partition has one server which acts as the "leader" and zero or more servers which act as "followers". The leader handles all read and write requests for the partition while the followers passively replicate the leader. If the leader fails, one of the followers will automatically become the new leader. Each server acts as a leader for some of its partitions and a follower for others so load is well balanced within the cluster.
-  </p>
-
-  <h4><a id="intro_geo-replication" href="#intro_geo-replication">Geo-Replication</a></h4>
-
-  <p>Kafka MirrorMaker provides geo-replication support for your clusters. With MirrorMaker, messages are replicated across multiple datacenters or cloud regions. You can use this in active/passive scenarios for backup and recovery; or in active/active scenarios to place data closer to your users, or support data locality requirements. </p>
-
-  <h4><a id="intro_producers" href="#intro_producers">Producers</a></h4>
-  <p>
-  Producers publish data to the topics of their choice. The producer is responsible for choosing which record to assign to which partition within the topic. This can be done in a round-robin fashion simply to balance load or it can be done according to some semantic partition function (say based on some key in the record). More on the use of partitioning in a second!
-  </p>
-
-  <h4><a id="intro_consumers" href="#intro_consumers">Consumers</a></h4>
-
-  <p>
-  Consumers label themselves with a <i>consumer group</i> name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.
-  </p>
-  <p>
-  If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances.</p>
-  <p>
-  If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.
-  </p>
-  <img class="centered" src="/{{version}}/images/consumer-groups.png">
-  <p>
-    A two server Kafka cluster hosting four partitions (P0-P3) with two consumer groups. Consumer group A has two consumer instances and group B has four.
+    Event streaming is applied to a <a href="/powered-by">wide variety of use cases</a>
+    across a plethora of industries and organizations. Its many examples include:
   </p>
+  <ul>
+    <li>
+      To process payments and financial transactions in real-time, such as in stock exchanges, banks, and insurances.
+    </li>
+    <li>
+      To track and monitor cars, trucks, fleets, and shipments in real-time, such as in logistics and the automotive industry.
+    </li>
+    <li>
+      To continuously capture and analyze sensor data from IoT devices or other equipment, such as in factories and wind parks.
+    </li>
+    <li>
+      To collect and immediately react to customer interactions and orders, such as in retail, the hotel and travel industry, and mobile applications.
+    </li>
+    <li>
+      To monitor patients in hospital care and predict changes in condition to ensure timely treatment in emergencies.
+    </li>
+    <li>
+      To connect, store, and make available data produced by different divisions of a company.
+    </li>
+    <li>
+      To serve as the foundation for data platforms, event-driven architectures, and microservices.
+    </li>
+  </ul>
 
-  <p>
-  More commonly, however, we have found that topics have a small number of consumer groups, one for each "logical subscriber". Each group is composed of many consumer instances for scalability and fault tolerance. This is nothing more than publish-subscribe semantics where the subscriber is a cluster of consumers instead of a single process.
-  </p>
-  <p>
-  The way consumption is implemented in Kafka is by dividing up the partitions in the log over the consumer instances so that each instance is the exclusive consumer of a "fair share" of partitions at any point in time. This process of maintaining membership in the group is handled by the Kafka protocol dynamically. If new instances join the group they will take over some partitions from other members of the group; if an instance dies, its partitions will be distributed to the remaining  [...]
-  </p>
-  <p>
-  Kafka only provides a total order over records <i>within</i> a partition, not between different partitions in a topic. Per-partition ordering combined with the ability to partition data by key is sufficient for most applications. However, if you require a total order over records this can be achieved with a topic that has only one partition, though this will mean only one consumer process per consumer group.
-  </p>
-  <h4><a id="intro_multi-tenancy" href="#intro_multi-tenancy">Multi-tenancy</a></h4>
-  <p>You can deploy Kafka as a multi-tenant solution. Multi-tenancy is enabled by configuring which topics can produce or consume data. There is also operations support for quotas.  Administrators can define and enforce quotas on requests to control the broker resources that are used by clients.  For more information, see the <a href="https://kafka.apache.org/documentation/#security">security documentation</a>. </p>
-  <h4><a id="intro_guarantees" href="#intro_guarantees">Guarantees</a></h4>
-  <p>
-  At a high-level Kafka gives the following guarantees:
+  <h4 class="anchor-heading">
+    <a class="anchor-link" id="intro_platform" href="#intro_platform"></a>
+    <a href="#intro_platform">Apache Kafka&reg; is an event streaming platform. What does that mean?</a>
+  </h4>
+  <p>
+    Kafka combines three key capabilities so you can implement
+    <a href="/powered-by">your use cases</a>
+    for event streaming end-to-end with a single battle-tested solution:
+  </p>
+  <ol>
+    <li>
+      To <strong>publish</strong> (write) and <strong>subscribe to</strong> (read) streams of events, including continuous import/export of
+      your data from other systems.
+    </li>
+    <li>
+      To <strong>store</strong> streams of events durably and reliably for as long as you want.
+    </li>
+    <li>
+      To <strong>process</strong> streams of events as they occur or retrospectively.
+    </li>
+  </ol>
+  <p>
+    And all this functionality is provided in a distributed, highly scalable, elastic, fault-tolerant, and
+    secure manner. Kafka can be deployed on bare-metal hardware, virtual machines, and containers, and on-premises
+    as well as in the cloud. You can choose between self-managing your Kafka environments and using fully managed
+    services offered by a variety of vendors.
+  </p>
+
+  <h4 class="anchor-heading">
+    <a class="anchor-link" id="intro_nutshell" href="#intro_nutshell"></a>
+    <a href="#intro_nutshell">How does Kafka work in a nutshell?</a>
+  </h4>
+  <p>
+    Kafka is a distributed system consisting of <strong>servers</strong> and <strong>clients</strong> that
+    communicate via a high-performance <a href="/protocol.html">TCP network protocol</a>.
+    It can be deployed on bare-metal hardware, virtual machines, and containers in on-premise as well as cloud
+    environments.
+  </p>
+  <p>
+    <strong>Servers</strong>: Kafka is run as a cluster of one or more servers that can span multiple datacenters
+    or cloud regions. Some of these servers form the storage layer, called the brokers. Other servers run
+    <a href="/documentation/#connect">Kafka Connect</a> to continuously import and export
+    data as event streams to integrate Kafka with your existing systems such as relational databases as well as
+    other Kafka clusters. To let you implement mission-critical use cases, a Kafka cluster is highly scalable
+    and fault-tolerant: if any of its servers fails, the other servers will take over their work to ensure
+    continuous operations without any data loss.
+  </p>
+  <p>
+    <strong>Clients</strong>: They allow you to write distributed applications and microservices that read, write,
+    and process streams of events in parallel, at scale, and in a fault-tolerant manner even in the case of network
+    problems or machine failures. Kafka ships with some such clients included, which are augmented by
+    <a href="https://cwiki.apache.org/confluence/display/KAFKA/Clients">dozens of clients</a> provided by the Kafka
+    community: clients are available for Java and Scala including the higher-level
+    <a href="/documentation/streams/">Kafka Streams</a> library, for Go, Python, C/C++, and
+    many other programming languages as well as REST APIs.
+  </p>
+
+  <h4 class="anchor-heading">
+    <a class="anchor-link" id="intro_concepts_and_terms" href="#intro_concepts_and_terms"></a>
+    <a href="#intro_concepts_and_terms">Main Concepts and Terminology</a>
+  </h4>
+  <p>
+    An <strong>event</strong> records the fact that "something happened" in the world or in your business. It is also called record or message in the documentation. When you read or write data to Kafka, you do this in the form of events. Conceptually, an event has a key, value, timestamp, and optional metadata headers. Here's an example event:
   </p>
   <ul>
-    <li>Messages sent by a producer to a particular topic partition will be appended in the order they are sent. That is, if a record M1 is sent by the same producer as a record M2, and M1 is sent first, then M1 will have a lower offset than M2 and appear earlier in the log.
-    <li>A consumer instance sees records in the order they are stored in the log.
-    <li>For a topic with replication factor N, we will tolerate up to N-1 server failures without losing any records committed to the log.
+    <li>
+      Event key: "Alice"
+    </li>
+    <li>
+      Event value: "Made a payment of $200 to Bob"
+    </li>
+    <li>
+      Event timestamp: "Jun. 25, 2020 at 2:06 p.m."
+    </li>
   </ul>
   <p>
-  More details on these guarantees are given in the design section of the documentation.
-  </p>
-  <h4><a id="kafka_mq" href="#kafka_mq">Kafka as a Messaging System</a></h4>
-  <p>
-  How does Kafka's notion of streams compare to a traditional enterprise messaging system?
+    <strong>Producers</strong> are those client applications that publish (write) events to Kafka, and <strong>consumers</strong> are those that subscribe to (read and process) these events. In Kafka, producers and consumers are fully decoupled and agnostic of each other, which is a key design element to achieve the high scalability that Kafka is known for. For example, producers never need to wait for consumers. Kafka provides various <a href="/documentation/#intro_guarantees">guarantee [...]
   </p>
   <p>
-  Messaging traditionally has two models: <a href="http://en.wikipedia.org/wiki/Message_queue">queuing</a> and <a href="http://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern">publish-subscribe</a>. In a queue, a pool of consumers may read from a server and each record goes to one of them; in publish-subscribe the record is broadcast to all consumers. Each of these two models has a strength and a weakness. The strength of queuing is that it allows you to divide up the processing  [...]
+    Events are organized and durably stored in <strong>topics</strong>. Very simplified, a topic is similar to a folder in a filesystem, and the events are the files in that folder. An example topic name could be "payments". Topics in Kafka are always multi-producer and multi-subscriber: a topic can have zero, one, or many producers that write events to it, as well as zero, one, or many consumers that subscribe to these events. Events in a topic can be read as often as needed—unlike trad [...]
   </p>
   <p>
-  The consumer group concept in Kafka generalizes these two concepts. As with a queue the consumer group allows you to divide up processing over a collection of processes (the members of the consumer group). As with publish-subscribe, Kafka allows you to broadcast messages to multiple consumer groups.
+    Topics are <strong>partitioned</strong>, meaning a topic is spread over a number of "buckets" located on different Kafka brokers. This distributed placement of your data is very important for scalability because it allows client applications to both read and write the data from/to many brokers at the same time. When a new event is published to a topic, it is actually appended to one of the topic's partitions. Events with the same event key (e.g., a customer or vehicle ID) are written [...]
   </p>
+  <figure class="figure">
+    <img src="/images/streams-and-tables-p1_p4.png" class="figure-image" />
+    <figcaption class="figure-caption">
+      Figure: This example topic has four partitions P1–P4. Two different producer clients are publishing,
+      independently from each other, new events to the topic by writing events over the network to the topic's
+      partitions. Events with the same key (denoted by their color in the figure) are written to the same
+      partition. Note that both producers can write to the same partition if appropriate.
+    </figcaption>
+  </figure>
   <p>
-  The advantage of Kafka's model is that every topic has both these properties&mdash;it can scale processing and is also multi-subscriber&mdash;there is no need to choose one or the other.
+    To make your data fault-tolerant and highly-available, every topic can be <strong>replicated</strong>, even across geo-regions or datacenters, so that there are always multiple brokers that have a copy of the data just in case things go wrong, you want to do maintenance on the brokers, and so on. A common production setting is a replication factor of 3, i.e., there will always be three copies of your data. This replication is performed at the level of topic-partitions.
   </p>
   <p>
-  Kafka has stronger ordering guarantees than a traditional messaging system, too.
-  </p>
-  <p>
-  A traditional queue retains records in-order on the server, and if multiple consumers consume from the queue then the server hands out records in the order they are stored. However, although the server hands out records in order, the records are delivered asynchronously to consumers, so they may arrive out of order on different consumers. This effectively means the ordering of the records is lost in the presence of parallel consumption. Messaging systems often work around this by havin [...]
+    This primer should be sufficient for an introduction. The <a href="/documentation/#design">Design</a> section of the documentation explains Kafka's various concepts in full detail, if you are interested.
   </p>
+
+  <h4 class="anchor-heading">
+    <a class="anchor-link" id="intro_apis" href="#intro_apis"></a>
+    <a href="#intro_apis">Kafka APIs</a>
+  </h4>
   <p>
-  Kafka does it better. By having a notion of parallelism&mdash;the partition&mdash;within the topics, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes. This is achieved by assigning the partitions in the topic to the consumers in the consumer group so that each partition is consumed by exactly one consumer in the group. By doing this we ensure that the consumer is the only reader of that partition and consumes the data in order. Sinc [...]
+    In addition to command line tooling for management and administration tasks, Kafka has five core APIs for Java and Scala:
   </p>
+  <ul>
+    <li>
+      The <a href="/documentation.html#adminapi">Admin API</a> to manage and inspect topics, brokers, and other Kafka objects.
+    </li>
+    <li>
+      The <a href="/documentation.html#producerapi">Producer API</a> to publish (write) a stream of events to one or more Kafka topics.
+    </li>
+    <li>
+      The <a href="/documentation.html#consumerapi">Consumer API</a> to subscribe to (read) one or more topics and to process the stream of events produced to them.
+    </li>
+    <li>
+      The <a href="/documentation/streams">Kafka Streams API</a> to implement stream processing applications and microservices. It provides higher-level functions to process event streams, including transformations, stateful operations like aggregations and joins, windowing, processing based on event-time, and more. Input is read from one or more topics in order to generate output to one or more topics, effectively transforming the input streams to output streams.
+    </li>
+    <li>
+      The <a href="/documentation.html#connect">Kafka Connect API</a> to build and run reusable data import/export connectors that consume (read) or produce (write) streams of events from and to external systems and applications so they can integrate with Kafka. For example, a connector to a relational database like PostgreSQL might capture every change to a set of tables. However, in practice, you typically don't need to implement your own connectors because the Kafka community already  [...]
+    </li>
+  </ul>
 
-  <h4 id="kafka_storage">Kafka as a Storage System</h4>
+  <!-- TODO: add new section once supporting page is written -->
 
-  <p>
-  Any message queue that allows publishing messages decoupled from consuming them is effectively acting as a storage system for the in-flight messages. What is different about Kafka is that it is a very good storage system.
-  </p>
-  <p>
-  Data written to Kafka is written to disk and replicated for fault-tolerance. Kafka allows producers to wait on acknowledgement so that a write isn't considered complete until it is fully replicated and guaranteed to persist even if the server written to fails.
-  </p>
-  <p>
-  The disk structures Kafka uses scale well&mdash;Kafka will perform the same whether you have 50 KB or 50 TB of persistent data on the server.
-  </p>
-  <p>
-  As a result of taking storage seriously and allowing the clients to control their read position, you can think of Kafka as a kind of special purpose distributed filesystem dedicated to high-performance, low-latency commit log storage, replication, and propagation.
-  </p>
-  <p>
-  For details about the Kafka's commit log storage and replication design, please read <a href="https://kafka.apache.org/documentation/#design">this</a> page.
-  </p>
-  <h4>Kafka for Stream Processing</h4>
-  <p>
-  It isn't enough to just read, write, and store streams of data, the purpose is to enable real-time processing of streams.
-  </p>
-  <p>
-  In Kafka a stream processor is anything that takes continual streams of  data from input topics, performs some processing on this input, and produces continual streams of data to output topics.
-  </p>
-  <p>
-  For example, a retail application might take in input streams of sales and shipments, and output a stream of reorders and price adjustments computed off this data.
-  </p>
-  <p>
-  It is possible to do simple processing directly using the producer and consumer APIs. However for more complex transformations Kafka provides a fully integrated <a href="/documentation/streams">Streams API</a>. This allows building applications that do non-trivial processing that compute aggregations off of streams or join streams together.
-  </p>
-  <p>
-  This facility helps solve the hard problems this type of application faces: handling out-of-order data, reprocessing input as code changes, performing stateful computations, etc.
-  </p>
-  <p>
-  The streams API builds on the core primitives Kafka provides: it uses the producer and consumer APIs for input, uses Kafka for stateful storage, and uses the same group mechanism for fault tolerance among the stream processor instances.
-  </p>
-  <h4>Putting the Pieces Together</h4>
-  <p>
-  This combination of messaging, storage, and stream processing may seem unusual but it is essential to Kafka's role as a streaming platform.
-  </p>
-  <p>
-  A distributed file system like HDFS allows storing static files for batch processing. Effectively a system like this allows storing and processing <i>historical</i> data from the past.
-  </p>
-  <p>
-  A traditional enterprise messaging system allows processing future messages that will arrive after you subscribe. Applications built in this way process future data as it arrives.
-  </p>
-  <p>
-  Kafka combines both of these capabilities, and the combination is critical both for Kafka usage as a platform for streaming applications as well as for streaming data pipelines.
-  </p>
-  <p>
-  By combining storage and low-latency subscriptions, streaming applications can treat both past and future data the same way. That is a single application can process historical, stored data but rather than ending when it reaches the last record it can keep processing as future data arrives. This is a generalized notion of stream processing that subsumes batch processing as well as message-driven applications.
-  </p>
-  <p>
-  Likewise for streaming data pipelines the combination of subscription to real-time events make it possible to use Kafka for very low-latency pipelines; but the ability to store data reliably make it possible to use it for critical data where the delivery of data must be guaranteed or for integration with offline systems that load data only periodically or may go down for extended periods of time for maintenance. The stream processing facilities make it possible to transform data as it  [...]
-  </p>
-  <p>
-  For more information on the guarantees, APIs, and capabilities Kafka provides see the rest of the <a href="/documentation.html">documentation</a>.
-  </p>
+  <h4 class="anchor-heading">
+    <a class="anchor-link" id="intro_more" href="#intro_more"></a>
+    <a href="#intro_more">Where to go from here</a>
+  </h4>
+  <ul>
+    <li>
+      To get hands-on experience with Kafka, follow the <a href="/quickstart">Quickstart</a>.
+    </li>
+    <li>
+      To understand Kafka in more detail, read the <a href="/documentation/">Documentation</a>.
+      You also have your choice of <a href="/books-and-papers">Kafka books and academic papers</a>.
+    </li>
+    <li>
+      Browse through the <a href="/powered-by">Use Cases</a> to learn how other users in our world-wide community are getting value out of Kafka.
+    </li>
+    <li>
+      Join a <a href="/events">local Kafka meetup group</a> and
+      <a href="https://kafka-summit.org/past-events/">watch talks from Kafka Summit</a>, the main conference of the Kafka community.
+    </li>
+  </ul>
 </script>
 
 <div class="p-introduction"></div>
diff --git a/docs/migration.html b/docs/migration.html
index 08a6271..95fc87f 100644
--- a/docs/migration.html
+++ b/docs/migration.html
@@ -16,11 +16,11 @@
 -->
 
 <!--#include virtual="../includes/_header.htm" -->
-<h2><a id="migration" href="#migration">Migrating from 0.7.x to 0.8</a></h2>
+<h2 class="anchor-heading"><a id="migration" class="anchor-link"></a><a href="#migration">Migrating from 0.7.x to 0.8</a></h2>
 
 0.8 is our first (and hopefully last) release with a non-backwards-compatible wire protocol, ZooKeeper     layout, and on-disk data format. This was a chance for us to clean up a lot of cruft and start fresh. This means performing a no-downtime upgrade is more painful than normal&mdash;you cannot just swap in the new code in-place.
 
-<h3><a id="migration_steps" href="#migration_steps">Migration Steps</a></h3>
+<h3 class="anchor-heading"><a id="migration_steps" class="anchor-link"></a><a href="#migration_steps">Migration Steps</a></h3>
 
 <ol>
     <li>Setup a new cluster running 0.8.
diff --git a/docs/ops.html b/docs/ops.html
index 322bf39..b84c980 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -18,19 +18,17 @@
 
   Here is some information on actually running Kafka as a production system based on usage and experience at LinkedIn. Please send us any additional tips you know of.
 
-  <h3><a id="basic_ops" href="#basic_ops">6.1 Basic Kafka Operations</a></h3>
+  <h3 class="anchor-heading"><a id="basic_ops" class="anchor-link"></a><a href="#basic_ops">6.1 Basic Kafka Operations</a></h3>
 
   This section will review the most common operations you will perform on your Kafka cluster. All of the tools reviewed in this section are available under the <code>bin/</code> directory of the Kafka distribution and each tool will print details on all possible commandline options if it is run with no arguments.
 
-  <h4><a id="basic_ops_add_topic" href="#basic_ops_add_topic">Adding and removing topics</a></h4>
+  <h4 class="anchor-heading"><a id="basic_ops_add_topic" class="anchor-link"></a><a href="#basic_ops_add_topic">Adding and removing topics</a></h4>
 
   You have the option of either adding topics manually or having them be created automatically when data is first published to a non-existent topic. If topics are auto-created then you may want to tune the default <a href="#topicconfigs">topic configurations</a> used for auto-created topics.
   <p>
   Topics are added and modified using the topic tool:
-  <pre class="brush: bash;">
-  &gt; bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name \
-        --partitions 20 --replication-factor 3 --config x=y
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name \
+        --partitions 20 --replication-factor 3 --config x=y</code></pre>
   The replication factor controls how many servers will replicate each message that is written. If you have a replication factor of 3 then up to 2 servers can fail before you will lose access to your data. We recommend you use a replication factor of 2 or 3 so that you can transparently bounce machines without interrupting data consumption.
   <p>
   The partition count controls how many logs the topic will be sharded into. There are several impacts of the partition count. First each partition must fit entirely on a single server. So if you have 20 partitions the full data set (and read and write load) will be handled by no more than 20 servers (not counting replicas). Finally the partition count impacts the maximum parallelism of your consumers. This is discussed in greater detail in the <a href="#intro_consumers">concepts section</a>.
@@ -39,35 +37,27 @@
   <p>
   The configurations added on the command line override the default settings the server has for things like the length of time data should be retained. The complete set of per-topic configurations is documented <a href="#topicconfigs">here</a>.
 
-  <h4><a id="basic_ops_modify_topic" href="#basic_ops_modify_topic">Modifying topics</a></h4>
+  <h4 class="anchor-heading"><a id="basic_ops_modify_topic" class="anchor-link"></a><a href="#basic_ops_modify_topic">Modifying topics</a></h4>
 
   You can change the configuration or partitioning of a topic using the same topic tool.
   <p>
   To add partitions you can do
-  <pre class="brush: bash;">
-  &gt; bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \
-        --partitions 40
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \
+        --partitions 40</code></pre>
   Be aware that one use case for partitions is to semantically partition data, and adding partitions doesn't change the partitioning of existing data so this may disturb consumers if they rely on that partition. That is if data is partitioned by <code>hash(key) % number_of_partitions</code> then this partitioning will potentially be shuffled by adding partitions but Kafka will not attempt to automatically redistribute data in any way.
   <p>
   To add configs:
-  <pre class="brush: bash;">
-  &gt; bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y</code></pre>
   To remove a config:
-  <pre class="brush: bash;">
-  &gt; bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x</code></pre>
   And finally deleting a topic:
-  <pre class="brush: bash;">
-  &gt; bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name</code></pre>
   <p>
   Kafka does not currently support reducing the number of partitions for a topic.
   <p>
   Instructions for changing the replication factor of a topic can be found <a href="#basic_ops_increase_replication_factor">here</a>.
 
-  <h4><a id="basic_ops_restarting" href="#basic_ops_restarting">Graceful shutdown</a></h4>
+  <h4 class="anchor-heading"><a id="basic_ops_restarting" class="anchor-link"></a><a href="#basic_ops_restarting">Graceful shutdown</a></h4>
 
   The Kafka cluster will automatically detect any broker shutdown or failure and elect new leaders for the partitions on that machine. This will occur whether a server fails or it is brought down intentionally for maintenance or configuration changes. For the latter cases Kafka supports a more graceful mechanism for stopping a server than just killing it.
 
@@ -78,37 +68,31 @@
   </ol>
 
   Syncing the logs will happen automatically whenever the server is stopped other than by a hard kill, but the controlled leadership migration requires using a special setting:
-  <pre class="brush: text;">
-      controlled.shutdown.enable=true
-  </pre>
+  <pre class="line-numbers"><code class="language-text">      controlled.shutdown.enable=true</code></pre>
   Note that controlled shutdown will only succeed if <i>all</i> the partitions hosted on the broker have replicas (i.e. the replication factor is greater than 1 <i>and</i> at least one of these replicas is alive). This is generally what you want since shutting down the last replica would make that topic partition unavailable.
 
-  <h4><a id="basic_ops_leader_balancing" href="#basic_ops_leader_balancing">Balancing leadership</a></h4>
+  <h4 class="anchor-heading"><a id="basic_ops_leader_balancing" class="anchor-link"></a><a href="#basic_ops_leader_balancing">Balancing leadership</a></h4>
 
   Whenever a broker stops or crashes, leadership for that broker's partitions transfers to other replicas. When the broker is restarted it will only be a follower for all its partitions, meaning it will not be used for client reads and writes.
   <p>
   To avoid this imbalance, Kafka has a notion of preferred replicas. If the list of replicas for a partition is 1,5,9 then node 1 is preferred as the leader to either node 5 or 9 because it is earlier in the replica list. By default the Kafka cluster will try to restore leadership to the restored replicas.  This behaviour is configured with:
 
-  <pre class="brush: text;">
-      auto.leader.rebalance.enable=true
-  </pre>
+  <pre class="line-numbers"><code class="language-text">      auto.leader.rebalance.enable=true</code></pre>
     You can also set this to false, but you will then need to manually restore leadership to the restored replicas by running the command:
-  <pre class="brush: bash;">
-  &gt; bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port</code></pre>
 
-  <h4><a id="basic_ops_racks" href="#basic_ops_racks">Balancing Replicas Across Racks</a></h4>
+  <h4 class="anchor-heading"><a id="basic_ops_racks" class="anchor-link"></a><a href="#basic_ops_racks">Balancing Replicas Across Racks</a></h4>
   The rack awareness feature spreads replicas of the same partition across different racks. This extends the guarantees Kafka provides for broker-failure to cover rack-failure, limiting the risk of data loss should all the brokers on a rack fail at once. The feature can also be applied to other broker groupings such as availability zones in EC2.
   <p></p>
   You can specify that a broker belongs to a particular rack by adding a property to the broker config:
-  <pre class="brush: text;">   broker.rack=my-rack-id</pre>
+  <pre class="language-text"><code class="language-text">  broker.rack=my-rack-id</code></pre>
   When a topic is <a href="#basic_ops_add_topic">created</a>, <a href="#basic_ops_modify_topic">modified</a> or replicas are <a href="#basic_ops_cluster_expansion">redistributed</a>, the rack constraint will be honoured, ensuring replicas span as many racks as they can (a partition will span min(#racks, replication-factor) different racks).
   <p></p>
   The algorithm used to assign replicas to brokers ensures that the number of leaders per broker will be constant, regardless of how brokers are distributed across racks. This ensures balanced throughput.
   <p></p>
   However if racks are assigned different numbers of brokers, the assignment of replicas will not be even. Racks with fewer brokers will get more replicas, meaning they will use more storage and put more resources into replication. Hence it is sensible to configure an equal number of brokers per rack.
 
-  <h4><a id="basic_ops_mirror_maker" href="#basic_ops_mirror_maker">Mirroring data between clusters</a></h4>
+  <h4 class="anchor-heading"><a id="basic_ops_mirror_maker" class="anchor-link"></a><a href="#basic_ops_mirror_maker">Mirroring data between clusters</a></h4>
 
   We refer to the process of replicating data <i>between</i> Kafka clusters "mirroring" to avoid confusion with the replication that happens amongst the nodes in a single cluster. Kafka comes with a tool for mirroring data between Kafka clusters. The tool consumes from a source cluster and produces to a destination cluster.
 
@@ -121,42 +105,35 @@
   The source and destination clusters are completely independent entities: they can have different numbers of partitions and the offsets will not be the same. For this reason the mirror cluster is not really intended as a fault-tolerance mechanism (as the consumer position will be different); for that we recommend using normal in-cluster replication. The mirror maker process will, however, retain and use the message key for partitioning so order is preserved on a per-key basis.
   <p>
   Here is an example showing how to mirror a single topic (named <i>my-topic</i>) from an input cluster:
-  <pre class="brush: bash;">
-  &gt; bin/kafka-mirror-maker.sh
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/kafka-mirror-maker.sh
         --consumer.config consumer.properties
-        --producer.config producer.properties --whitelist my-topic
-  </pre>
+        --producer.config producer.properties --whitelist my-topic</code></pre>
   Note that we specify the list of topics with the <code>--whitelist</code> option. This option allows any regular expression using <a href="http://docs.oracle.com/javase/7/docs/api/java/util/regex/Pattern.html">Java-style regular expressions</a>. So you could mirror two topics named <i>A</i> and <i>B</i> using <code>--whitelist 'A|B'</code>. Or you could mirror <i>all</i> topics using <code>--whitelist '*'</code>. Make sure to quote any regular expression to ensure the shell doesn't try [...]
 
   Combining mirroring with the configuration <code>auto.create.topics.enable=true</code> makes it possible to have a replica cluster that will automatically create and replicate all data in a source cluster even as new topics are added.
 
-  <h4><a id="basic_ops_consumer_lag" href="#basic_ops_consumer_lag">Checking consumer position</a></h4>
+  <h4 class="anchor-heading"><a id="basic_ops_consumer_lag" class="anchor-link"></a><a href="#basic_ops_consumer_lag">Checking consumer position</a></h4>
   Sometimes it's useful to see the position of your consumers. We have a tool that will show the position of all consumers in a consumer group as well as how far behind the end of the log they are. To run this tool on a consumer group named <i>my-group</i> consuming a topic named <i>my-topic</i> would look like this:
-  <pre class="brush: bash;">
-  &gt; bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
 
   TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
   my-topic                       0          2               4               2          consumer-1-029af89c-873c-4751-a720-cefd41a669d6   /127.0.0.1                     consumer-1
   my-topic                       1          2               3               1          consumer-1-029af89c-873c-4751-a720-cefd41a669d6   /127.0.0.1                     consumer-1
-  my-topic                       2          2               3               1          consumer-2-42c1abd4-e3b2-425d-a8bb-e1ea49b29bb2   /127.0.0.1                     consumer-2
-  </pre>
+  my-topic                       2          2               3               1          consumer-2-42c1abd4-e3b2-425d-a8bb-e1ea49b29bb2   /127.0.0.1                     consumer-2</code></pre>
 
-  <h4><a id="basic_ops_consumer_group" href="#basic_ops_consumer_group">Managing Consumer Groups</a></h4>
+  <h4 class="anchor-heading"><a id="basic_ops_consumer_group" class="anchor-link"></a><a href="#basic_ops_consumer_group">Managing Consumer Groups</a></h4>
 
   With the ConsumerGroupCommand tool, we can list, describe, or delete the consumer groups. The consumer group can be deleted manually, or automatically when the last committed offset for that group expires. Manual deletion works only if the group does not have any active members.
 
   For example, to list all consumer groups across all topics:
 
-  <pre class="brush: bash;">
-  &gt; bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
 
-  test-consumer-group
-  </pre>
+  test-consumer-group</code></pre>
 
   To view offsets, as mentioned earlier, we "describe" the consumer group like this:
 
-  <pre class="brush: bash;">
-  &gt; bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
 
   TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                    HOST            CLIENT-ID
   topic3          0          241019          395308          154289          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
@@ -164,50 +141,41 @@
   topic3          1          241018          398817          157799          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
   topic1          0          854144          855809          1665            consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1
   topic2          0          460537          803290          342753          consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1
-  topic3          2          243655          398812          155157          consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4
-  </pre>
+  topic3          2          243655          398812          155157          consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4</code></pre>
 
   There are a number of additional "describe" options that can be used to provide more detailed information about a consumer group:
   <ul>
     <li>--members: This option provides the list of all active members in the consumer group.
-      <pre class="brush: bash;">
-      &gt; bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members
+      <pre class="line-numbers"><code class="language-bash">      &gt; bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members
 
       CONSUMER-ID                                    HOST            CLIENT-ID       #PARTITIONS
       consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1       2
       consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4       1
       consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2       3
-      consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3       0
-      </pre>
+      consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3       0</code></pre>
     </li>
     <li>--members --verbose: On top of the information reported by the "--members" options above, this option also provides the partitions assigned to each member.
-      <pre class="brush: bash;">
-      &gt; bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose
+      <pre class="line-numbers"><code class="language-bash">      &gt; bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose
 
       CONSUMER-ID                                    HOST            CLIENT-ID       #PARTITIONS     ASSIGNMENT
       consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1       2               topic1(0), topic2(0)
       consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4       1               topic3(2)
       consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2       3               topic2(1), topic3(0,1)
-      consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3       0               -
-      </pre>
+      consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3       0               -</code></pre>
     </li>
     <li>--offsets: This is the default describe option and provides the same output as the "--describe" option.</li>
     <li>--state: This option provides useful group-level information.
-      <pre class="brush: bash;">
-      &gt; bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state
+      <pre class="line-numbers"><code class="language-bash">      &gt; bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state
 
       COORDINATOR (ID)          ASSIGNMENT-STRATEGY       STATE                #MEMBERS
-      localhost:9092 (0)        range                     Stable               4
-      </pre>
+      localhost:9092 (0)        range                     Stable               4</code></pre>
     </li>
   </ul>
 
   To manually delete one or multiple consumer groups, the "--delete" option can be used:
-  <pre class="brush: bash;">
-  &gt; bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group
 
-  Deletion of requested consumer groups ('my-group', 'my-other-group') was successful.
-  </pre>
+  Deletion of requested consumer groups ('my-group', 'my-other-group') was successful.</code></pre>
 
   <p>
   To reset offsets of a consumer group, "--reset-offsets" option can be used.
@@ -263,23 +231,19 @@
   <p>
   For example, to reset offsets of a consumer group to the latest offset:
 
-  <pre class="brush: bash;">
-  &gt; bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest
 
   TOPIC                          PARTITION  NEW-OFFSET
-  topic1                         0          0
-  </pre>
+  topic1                         0          0</code></pre>
 
   <p>
 
   If you are using the old high-level consumer and storing the group metadata in ZooKeeper (i.e. <code>offsets.storage=zookeeper</code>), pass
   <code>--zookeeper</code> instead of <code>--bootstrap-server</code>:
 
-  <pre class="brush: bash;">
-  &gt; bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  &gt; bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list</code></pre>
 
-  <h4><a id="basic_ops_cluster_expansion" href="#basic_ops_cluster_expansion">Expanding your cluster</a></h4>
+  <h4 class="anchor-heading"><a id="basic_ops_cluster_expansion" class="anchor-link"></a><a href="#basic_ops_cluster_expansion">Expanding your cluster</a></h4>
 
   Adding servers to a Kafka cluster is easy, just assign them a unique broker id and start up Kafka on your new servers. However these new servers will not automatically be assigned any data partitions, so unless partitions are moved to them they won't be doing any work until new topics are created. So usually when you add machines to your cluster you will want to migrate some existing data to these machines.
   <p>
@@ -293,22 +257,19 @@
   <li>--execute: In this mode, the tool kicks off the reassignment of partitions based on the user provided reassignment plan. (using the --reassignment-json-file option). This can either be a custom reassignment plan hand crafted by the admin or provided by using the --generate option</li>
   <li>--verify: In this mode, the tool verifies the status of the reassignment for all partitions listed during the last --execute. The status can be either of successfully completed, failed or in progress</li>
   </ul>
-  <h5><a id="basic_ops_automigrate" href="#basic_ops_automigrate">Automatically migrating data to new machines</a></h5>
+  <h5 class="anchor-heading"><a id="basic_ops_automigrate" class="anchor-link"></a><a href="#basic_ops_automigrate">Automatically migrating data to new machines</a></h5>
   The partition reassignment tool can be used to move some topics off of the current set of brokers to the newly added brokers. This is typically useful while expanding an existing cluster since it is easier to move entire topics to the new set of brokers, than moving one partition at a time. When used to do this, the user should provide a list of topics that should be moved to the new set of brokers and a target list of new brokers. The tool then evenly distributes all partitions for th [...]
   <p>
   For instance, the following example will move all partitions for topics foo1,foo2 to the new set of brokers 5,6. At the end of this move, all partitions for topics foo1 and foo2 will <i>only</i> exist on brokers 5,6.
   <p>
   Since the tool accepts the input list of topics as a json file, you first need to identify the topics you want to move and create the json file as follows:
-  <pre class="brush: bash;">
-  > cat topics-to-move.json
+  <pre class="line-numbers"><code class="language-bash">  > cat topics-to-move.json
   {"topics": [{"topic": "foo1"},
               {"topic": "foo2"}],
   "version":1
-  }
-  </pre>
+  }</code></pre>
   Once the json file is ready, use the partition reassignment tool to generate a candidate assignment:
-  <pre class="brush: bash;">
-  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
+  <pre class="line-numbers"><code class="language-bash">  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
   Current partition replica assignment
 
   {"version":1,
@@ -329,12 +290,10 @@
                 {"topic":"foo2","partition":0,"replicas":[5,6]},
                 {"topic":"foo1","partition":1,"replicas":[5,6]},
                 {"topic":"foo2","partition":1,"replicas":[5,6]}]
-  }
-  </pre>
+  }</code></pre>
   <p>
   The tool generates a candidate assignment that will move all partitions from topics foo1,foo2 to brokers 5,6. Note, however, that at this point, the partition movement has not started, it merely tells you the current assignment and the proposed new assignment. The current assignment should be saved in case you want to rollback to it. The new assignment should be saved in a json file (e.g. expand-cluster-reassignment.json) to be input to the tool with the --execute option as follows:
-  <pre class="brush: bash;">
-  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --execute
+  <pre class="line-numbers"><code class="language-bash">  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --execute
   Current partition replica assignment
 
   {"version":1,
@@ -355,34 +314,28 @@
                 {"topic":"foo2","partition":0,"replicas":[5,6]},
                 {"topic":"foo1","partition":1,"replicas":[5,6]},
                 {"topic":"foo2","partition":1,"replicas":[5,6]}]
-  }
-  </pre>
+  }</code></pre>
   <p>
   Finally, the --verify option can be used with the tool to check the status of the partition reassignment. Note that the same expand-cluster-reassignment.json (used with the --execute option) should be used with the --verify option:
-  <pre class="brush: bash;">
-  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --verify
+  <pre class="line-numbers"><code class="language-bash">  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --verify
   Status of partition reassignment:
   Reassignment of partition [foo1,0] completed successfully
   Reassignment of partition [foo1,1] is in progress
   Reassignment of partition [foo1,2] is in progress
   Reassignment of partition [foo2,0] completed successfully
   Reassignment of partition [foo2,1] completed successfully
-  Reassignment of partition [foo2,2] completed successfully
-  </pre>
+  Reassignment of partition [foo2,2] completed successfully</code></pre>
 
-  <h5><a id="basic_ops_partitionassignment" href="#basic_ops_partitionassignment">Custom partition assignment and migration</a></h5>
+  <h5 class="anchor-heading"><a id="basic_ops_partitionassignment" class="anchor-link"></a><a href="#basic_ops_partitionassignment">Custom partition assignment and migration</a></h5>
   The partition reassignment tool can also be used to selectively move replicas of a partition to a specific set of brokers. When used in this manner, it is assumed that the user knows the reassignment plan and does not require the tool to generate a candidate reassignment, effectively skipping the --generate step and moving straight to the --execute step
   <p>
   For instance, the following example moves partition 0 of topic foo1 to brokers 5,6 and partition 1 of topic foo2 to brokers 2,3:
   <p>
   The first step is to hand craft the custom reassignment plan in a json file:
-  <pre class="brush: bash;">
-  > cat custom-reassignment.json
-  {"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  > cat custom-reassignment.json
+  {"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}</code></pre>
   Then, use the json file with the --execute option to start the reassignment process:
-  <pre class="brush: bash;">
-  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --execute
+  <pre class="line-numbers"><code class="language-bash">  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --execute
   Current partition replica assignment
 
   {"version":1,
@@ -395,34 +348,28 @@
   {"version":1,
   "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
                 {"topic":"foo2","partition":1,"replicas":[2,3]}]
-  }
-  </pre>
+  }</code></pre>
   <p>
   The --verify option can be used with the tool to check the status of the partition reassignment. Note that the same custom-reassignment.json (used with the --execute option) should be used with the --verify option:
-  <pre class="brush: bash;">
-  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --verify
+  <pre class="line-numbers"><code class="language-bash">  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --verify
   Status of partition reassignment:
   Reassignment of partition [foo1,0] completed successfully
-  Reassignment of partition [foo2,1] completed successfully
-  </pre>
+  Reassignment of partition [foo2,1] completed successfully</code></pre>
 
-  <h4><a id="basic_ops_decommissioning_brokers" href="#basic_ops_decommissioning_brokers">Decommissioning brokers</a></h4>
+  <h4 class="anchor-heading"><a id="basic_ops_decommissioning_brokers" class="anchor-link"></a><a href="#basic_ops_decommissioning_brokers">Decommissioning brokers</a></h4>
   The partition reassignment tool does not have the ability to automatically generate a reassignment plan for decommissioning brokers yet. As such, the admin has to come up with a reassignment plan to move the replica for all partitions hosted on the broker to be decommissioned, to the rest of the brokers. This can be relatively tedious as the reassignment needs to ensure that all the replicas are not moved from the decommissioned broker to only one other broker. To make this process eff [...]
 
-  <h4><a id="basic_ops_increase_replication_factor" href="#basic_ops_increase_replication_factor">Increasing replication factor</a></h4>
+  <h4 class="anchor-heading"><a id="basic_ops_increase_replication_factor" class="anchor-link"></a><a href="#basic_ops_increase_replication_factor">Increasing replication factor</a></h4>
   Increasing the replication factor of an existing partition is easy. Just specify the extra replicas in the custom reassignment json file and use it with the --execute option to increase the replication factor of the specified partitions.
   <p>
   For instance, the following example increases the replication factor of partition 0 of topic foo from 1 to 3. Before increasing the replication factor, the partition's only replica existed on broker 5. As part of increasing the replication factor, we will add more replicas on brokers 6 and 7.
   <p>
   The first step is to hand craft the custom reassignment plan in a json file:
-  <pre class="brush: bash;">
-  > cat increase-replication-factor.json
+  <pre class="line-numbers"><code class="language-bash">  > cat increase-replication-factor.json
   {"version":1,
-  "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
-  </pre>
+  "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}</code></pre>
   Then, use the json file with the --execute option to start the reassignment process:
-  <pre class="brush: bash;">
-  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute
+  <pre class="line-numbers"><code class="language-bash">  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute
   Current partition replica assignment
 
   {"version":1,
@@ -431,37 +378,31 @@
   Save this to use as the --reassignment-json-file option during rollback
   Successfully started reassignment of partitions
   {"version":1,
-  "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
-  </pre>
+  "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}</code></pre>
   <p>
   The --verify option can be used with the tool to check the status of the partition reassignment. Note that the same increase-replication-factor.json (used with the --execute option) should be used with the --verify option:
-  <pre class="brush: bash;">
-  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --verify
+  <pre class="line-numbers"><code class="language-bash">  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --verify
   Status of partition reassignment:
-  Reassignment of partition [foo,0] completed successfully
-  </pre>
+  Reassignment of partition [foo,0] completed successfully</code></pre>
   You can also verify the increase in replication factor with the kafka-topics tool:
-  <pre class="brush: bash;">
-  > bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic foo --describe
+  <pre class="line-numbers"><code class="language-bash">  > bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic foo --describe
   Topic:foo	PartitionCount:1	ReplicationFactor:3	Configs:
-    Topic: foo	Partition: 0	Leader: 5	Replicas: 5,6,7	Isr: 5,6,7
-  </pre>
+    Topic: foo	Partition: 0	Leader: 5	Replicas: 5,6,7	Isr: 5,6,7</code></pre>
 
-  <h4><a id="rep-throttle" href="#rep-throttle">Limiting Bandwidth Usage during Data Migration</a></h4>
+  <h4 class="anchor-heading"><a id="rep-throttle" class="anchor-link"></a><a href="#rep-throttle">Limiting Bandwidth Usage during Data Migration</a></h4>
   Kafka lets you apply a throttle to replication traffic, setting an upper bound on the bandwidth used to move replicas from machine to machine. This is useful when rebalancing a cluster, bootstrapping a new broker or adding or removing brokers, as it limits the impact these data-intensive operations will have on users.
   <p></p>
   There are two interfaces that can be used to engage a throttle. The simplest, and safest, is to apply a throttle when invoking the kafka-reassign-partitions.sh, but kafka-configs.sh can also be used to view and alter the throttle values directly.
   <p></p>
   So for example, if you were to execute a rebalance, with the below command, it would move partitions at no more than 50MB/s.
-  <pre class="brush: bash;">$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute --reassignment-json-file bigger-cluster.json --throttle 50000000</pre>
+  <pre class="language-bash">$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute --reassignment-json-file bigger-cluster.json --throttle 50000000</code></pre>
   When you execute this script you will see the throttle engage:
-  <pre class="brush: bash;">
-  The throttle limit was set to 50000000 B/s
-  Successfully started reassignment of partitions.</pre>
+  <pre class="line-numbers"><code class="language-bash">  The throttle limit was set to 50000000 B/s
+  Successfully started reassignment of partitions.</code></pre>
   <p>Should you wish to alter the throttle, during a rebalance, say to increase the throughput so it completes quicker, you can do this by re-running the execute command passing the same reassignment-json-file:</p>
-  <pre class="brush: bash;">$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092  --execute --reassignment-json-file bigger-cluster.json --throttle 700000000
+  <pre class="language-bash">$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092  --execute --reassignment-json-file bigger-cluster.json --throttle 700000000
   There is an existing assignment running.
-  The throttle limit was set to 700000000 B/s</pre>
+  The throttle limit was set to 700000000 B/s</code></pre>
 
   <p>Once the rebalance completes the administrator can check the status of the rebalance using the --verify option.
       If the rebalance has completed, the throttle will be removed via the --verify command. It is important that
@@ -469,28 +410,23 @@
       the --verify option. Failure to do so could cause regular replication traffic to be throttled. </p>
   <p>When the --verify option is executed, and the reassignment has completed, the script will confirm that the throttle was removed:</p>
 
-  <pre class="brush: bash;">
-  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092  --verify --reassignment-json-file bigger-cluster.json
+  <pre class="line-numbers"><code class="language-bash">  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092  --verify --reassignment-json-file bigger-cluster.json
   Status of partition reassignment:
   Reassignment of partition [my-topic,1] completed successfully
   Reassignment of partition [mytopic,0] completed successfully
-  Throttle was removed.</pre>
+  Throttle was removed.</code></pre>
 
   <p>The administrator can also validate the assigned configs using the kafka-configs.sh. There are two pairs of throttle
       configuration used to manage the throttling process. First pair refers to the throttle value itself. This is configured, at a broker
       level, using the dynamic properties: </p>
 
-  <pre class="brush: text;">
-    leader.replication.throttled.rate
-    follower.replication.throttled.rate
-  </pre>
+  <pre class="line-numbers"><code class="language-text">    leader.replication.throttled.rate
+    follower.replication.throttled.rate</code></pre>
 
   <p>Then there is the configuration pair of enumerated sets of throttled replicas: </p>
 
-  <pre class="brush: text;">
-    leader.replication.throttled.replicas
-    follower.replication.throttled.replicas
-  </pre>
+  <pre class="line-numbers"><code class="language-text">    leader.replication.throttled.replicas
+    follower.replication.throttled.replicas</code></pre>
 
   <p>Which are configured per topic. </p>
 
@@ -498,20 +434,18 @@
 
   <p>To view the throttle limit configuration:</p>
 
-  <pre class="brush: bash;">
-  > bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type brokers
+  <pre class="line-numbers"><code class="language-bash">  > bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type brokers
   Configs for brokers '2' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
-  Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000</pre>
+  Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000</code></pre>
 
   <p>This shows the throttle applied to both leader and follower side of the replication protocol. By default both sides
       are assigned the same throttled throughput value. </p>
 
   <p>To view the list of throttled replicas:</p>
 
-  <pre class="brush: bash;">
-  > bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type topics
+  <pre class="line-numbers"><code class="language-bash">  > bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type topics
   Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
-      follower.replication.throttled.replicas=1:101,0:102</pre>
+      follower.replication.throttled.replicas=1:101,0:102</code></pre>
 
   <p>Here we see the leader throttle is applied to partition 1 on broker 102 and partition 0 on broker 101. Likewise the
       follower throttle is applied to partition 1 on
@@ -538,98 +472,74 @@
   <p><i>(2) Ensuring Progress:</i></p>
   <p>If the throttle is set too low, in comparison to the incoming write rate, it is possible for replication to not
       make progress. This occurs when:</p>
-  <pre>max(BytesInPerSec) > throttle</pre>
+  <pre>max(BytesInPerSec) > throttle</code></pre>
   <p>
       Where BytesInPerSec is the metric that monitors the write throughput of producers into each broker. </p>
   <p>The administrator can monitor whether replication is making progress, during the rebalance, using the metric:</p>
 
-  <pre>kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)</pre>
+  <pre>kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)</code></pre>
 
   <p>The lag should constantly decrease during replication.  If the metric does not decrease the administrator should
       increase the
       throttle throughput as described above. </p>
 
 
-  <h4><a id="quotas" href="#quotas">Setting quotas</a></h4>
+  <h4 class="anchor-heading"><a id="quotas" class="anchor-link"></a><a href="#quotas">Setting quotas</a></h4>
   Quotas overrides and defaults may be configured at (user, client-id), user or client-id levels as described <a href="#design_quotas">here</a>.
   By default, clients receive an unlimited quota.
 
   It is possible to set custom quotas for each (user, client-id), user or client-id group.
   <p>
   Configure custom quota for (user=user1, client-id=clientA):
-  <pre class="brush: bash;">
-  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
-  Updated config for entity: user-principal 'user1', client-id 'clientA'.
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
+  Updated config for entity: user-principal 'user1', client-id 'clientA'.</code></pre>
 
   Configure custom quota for user=user1:
-  <pre class="brush: bash;">
-  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1
-  Updated config for entity: user-principal 'user1'.
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1
+  Updated config for entity: user-principal 'user1'.</code></pre>
 
   Configure custom quota for client-id=clientA:
-  <pre class="brush: bash;">
-  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-name clientA
-  Updated config for entity: client-id 'clientA'.
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-name clientA
+  Updated config for entity: client-id 'clientA'.</code></pre>
 
   It is possible to set default quotas for each (user, client-id), user or client-id group by specifying <i>--entity-default</i> option instead of <i>--entity-name</i>.
   <p>
   Configure default client-id quota for user=userA:
-  <pre class="brush: bash;">
-  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-default
-  Updated config for entity: user-principal 'user1', default client-id.
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-default
+  Updated config for entity: user-principal 'user1', default client-id.</code></pre>
 
   Configure default quota for user:
-  <pre class="brush: bash;">
-  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-default
-  Updated config for entity: default user-principal.
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-default
+  Updated config for entity: default user-principal.</code></pre>
 
   Configure default quota for client-id:
-  <pre class="brush: bash;">
-  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-default
-  Updated config for entity: default client-id.
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-default
+  Updated config for entity: default client-id.</code></pre>
 
   Here's how to describe the quota for a given (user, client-id):
-  <pre class="brush: bash;">
-  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
-  Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
+  Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200</code></pre>
   Describe quota for a given user:
-  <pre class="brush: bash;">
-  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1
-  Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1
+  Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200</code></pre>
   Describe quota for a given client-id:
-  <pre class="brush: bash;">
-  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type clients --entity-name clientA
-  Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
-  </pre>
+  <pre class="line-numbers"><code class="language-bash">  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type clients --entity-name clientA
+  Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200</code></pre>
   If entity name is not specified, all entities of the specified type are described. For example, describe all users:
-  <pre class="brush: bash;">
-  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users
+  <pre class="line-numbers"><code class="language-bash">  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users
   Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
-  Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
-  </pre>
+  Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200</code></pre>
   Similarly for (user, client):
-  <pre class="brush: bash;">
-  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-type clients
+  <pre class="line-numbers"><code class="language-bash">  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-type clients
   Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
-  Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
-  </pre>
+  Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200</code></pre>
   <p>
   It is possible to set default quotas that apply to all client-ids by setting these configs on the brokers. These properties are applied only if quota overrides or defaults are not configured in Zookeeper. By default, each client-id receives an unlimited quota. The following sets the default quota per producer and consumer client-id to 10MB/sec.
-  <pre class="brush: text;">
-    quota.producer.default=10485760
-    quota.consumer.default=10485760
-  </pre>
+  <pre class="line-numbers"><code class="language-text">    quota.producer.default=10485760
+    quota.consumer.default=10485760</code></pre>
   Note that these properties are being deprecated and may be removed in a future release. Defaults configured using kafka-configs.sh take precedence over these properties.
 
-  <h3><a id="datacenters" href="#datacenters">6.2 Datacenters</a></h3>
+  <h3 class="anchor-heading"><a id="datacenters" class="anchor-link"></a><a href="#datacenters">6.2 Datacenters</a></h3>
 
   Some deployments will need to manage a data pipeline that spans multiple datacenters. Our recommended approach to this is to deploy a local Kafka cluster in each datacenter with application instances in each datacenter interacting only with their local cluster and mirroring between clusters (see the documentation on the <a href="#basic_ops_mirror_maker">mirror maker tool</a> for how to do this).
   <p>
@@ -643,9 +553,9 @@
   <p>
   It is generally <i>not</i> advisable to run a <i>single</i> Kafka cluster that spans multiple datacenters over a high-latency link. This will incur very high replication latency both for Kafka writes and ZooKeeper writes, and neither Kafka nor ZooKeeper will remain available in all locations if the network between locations is unavailable.
 
-  <h3><a id="config" href="#config">6.3 Kafka Configuration</a></h3>
+  <h3 class="anchor-heading"><a id="config" class="anchor-link"></a><a href="#config">6.3 Kafka Configuration</a></h3>
 
-  <h4><a id="clientconfig" href="#clientconfig">Important Client Configurations</a></h4>
+  <h4 class="anchor-heading"><a id="clientconfig" class="anchor-link"></a><a href="#clientconfig">Important Client Configurations</a></h4>
 
   The most important producer configurations are:
   <ul>
@@ -657,10 +567,9 @@
   <p>
   All configurations are documented in the <a href="#configuration">configuration</a> section.
   <p>
-  <h4><a id="prodconfig" href="#prodconfig">A Production Server Config</a></h4>
+  <h4 class="anchor-heading"><a id="prodconfig" class="anchor-link"></a><a href="#prodconfig">A Production Server Config</a></h4>
   Here is an example production server configuration:
-  <pre class="brush: text;">
-  # ZooKeeper
+  <pre class="line-numbers"><code class="language-text">  # ZooKeeper
   zookeeper.connect=[list of ZooKeeper servers]
 
   # Log configuration
@@ -673,12 +582,11 @@
   listeners=[list of listeners]
   auto.create.topics.enable=false
   min.insync.replicas=2
-  queued.max.requests=[number of concurrent requests]
-  </pre>
+  queued.max.requests=[number of concurrent requests]</code></pre>
 
   Our client configuration varies a fair amount between different use cases.
 
-  <h3><a id="java" href="#java">6.4 Java Version</a></h3>
+  <h3 class="anchor-heading"><a id="java" class="anchor-link"></a><a href="#java">6.4 Java Version</a></h3>
 
   Java 8 and Java 11 are supported. Java 11 performs significantly better if TLS is enabled, so it is highly recommended (it also includes a number of other
   performance improvements: G1GC, CRC32C, Compact Strings, Thread-Local Handshakes and more).
@@ -687,11 +595,9 @@
 
   Typical arguments for running Kafka with OpenJDK-based Java implementations (including Oracle JDK) are:
   
-  <pre class="brush: text;">
-  -Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC
+  <pre class="line-numbers"><code class="language-text">  -Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC
   -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
-  -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -XX:+ExplicitGCInvokesConcurrent
-  </pre>
+  -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -XX:+ExplicitGCInvokesConcurrent</code></pre>
 
   For reference, here are the stats for one of LinkedIn's busiest clusters (at peak) that uses said Java arguments:
   <ul>
@@ -703,14 +609,14 @@
 
   All of the brokers in that cluster have a 90% GC pause time of about 21ms with less than 1 young GC per second.
 
-  <h3><a id="hwandos" href="#hwandos">6.5 Hardware and OS</a></h3>
+  <h3 class="anchor-heading"><a id="hwandos" class="anchor-link"></a><a href="#hwandos">6.5 Hardware and OS</a></h3>
   We are using dual quad-core Intel Xeon machines with 24GB of memory.
   <p>
   You need sufficient memory to buffer active readers and writers. You can do a back-of-the-envelope estimate of memory needs by assuming you want to be able to buffer for 30 seconds and compute your memory need as write_throughput*30.
   <p>
   The disk throughput is important. We have 8x7200 rpm SATA drives. In general disk throughput is the performance bottleneck, and more disks is better. Depending on how you configure flush behavior you may or may not benefit from more expensive disks (if you force flush often then higher RPM SAS drives may be better).
 
-  <h4><a id="os" href="#os">OS</a></h4>
+  <h4 class="anchor-heading"><a id="os" class="anchor-link"></a><a href="#os">OS</a></h4>
   Kafka should run well on any unix system and has been tested on Linux and Solaris.
   <p>
   We have seen a few issues running on Windows and Windows is not currently a well supported platform though we would be happy to change that.
@@ -723,7 +629,7 @@
   </ul>
   <p>
 
-  <h4><a id="diskandfs" href="#diskandfs">Disks and Filesystem</a></h4>
+  <h4 class="anchor-heading"><a id="diskandfs" class="anchor-link"></a><a href="#diskandfs">Disks and Filesystem</a></h4>
   We recommend using multiple drives to get good throughput and not sharing the same drives used for Kafka data with application logs or other OS filesystem activity to ensure good latency. You can either RAID these drives together into a single volume or format and mount each drive as its own directory. Since Kafka has replication the redundancy provided by RAID can also be provided at the application level. This choice has several tradeoffs.
   <p>
   If you configure multiple data directories partitions will be assigned round-robin to data directories. Each partition will be entirely in one of the data directories. If data is not well balanced among partitions this can lead to load imbalance between disks.
@@ -732,7 +638,7 @@
   <p>
   Another potential benefit of RAID is the ability to tolerate disk failures. However our experience has been that rebuilding the RAID array is so I/O intensive that it effectively disables the server, so this does not provide much real availability improvement.
 
-  <h4><a id="appvsosflush" href="#appvsosflush">Application vs. OS Flush Management</a></h4>
+  <h4 class="anchor-heading"><a id="appvsosflush" class="anchor-link"></a><a href="#appvsosflush">Application vs. OS Flush Management</a></h4>
   Kafka always immediately writes all data to the filesystem and supports the ability to configure the flush policy that controls when data is forced out of the OS cache and onto disk using the flush. This flush policy can be controlled to force data to disk after a period of time or after a certain number of messages has been written. There are several choices in this configuration.
   <p>
   Kafka must eventually call fsync to know that data was flushed. When recovering from a crash for any log segment not known to be fsync'd Kafka will check the integrity of each message by checking its CRC and also rebuild the accompanying offset index file as part of the recovery process executed on startup.
@@ -745,7 +651,7 @@
   <p>
   In general you don't need to do any low-level tuning of the filesystem, but in the next few sections we will go over some of this in case it is useful.
 
-  <h4><a id="linuxflush" href="#linuxflush">Understanding Linux OS Flush Behavior</a></h4>
+  <h4 class="anchor-heading"><a id="linuxflush" class="anchor-link"></a><a href="#linuxflush">Understanding Linux OS Flush Behavior</a></h4>
 
   In Linux, data written to the filesystem is maintained in <a href="http://en.wikipedia.org/wiki/Page_cache">pagecache</a> until it must be written out to disk (due to an application-level fsync or the OS's own flush policy). The flushing of data is done by a set of background threads called pdflush (or in post 2.6.32 kernels "flusher threads").
   <p>
@@ -754,7 +660,7 @@
   When Pdflush cannot keep up with the rate of data being written it will eventually cause the writing process to block incurring latency in the writes to slow down the accumulation of data.
   <p>
   You can see the current state of OS memory usage by doing
-  <pre class="brush: bash;"> &gt; cat /proc/meminfo </pre>
+  <pre class="language-bash"> &gt; cat /proc/meminfo </code></pre>
   The meaning of these values are described in the link above.
   <p>
   Using pagecache has several advantages over an in-process cache for storing data that will be written out to disk:
@@ -764,21 +670,21 @@
     <li>It automatically uses all the free memory on the machine
   </ul>
 
-  <h4><a id="filesystems" href="#filesystems">Filesystem Selection</a></h4>
+  <h4 class="anchor-heading"><a id="filesystems" class="anchor-link"></a><a href="#filesystems">Filesystem Selection</a></h4>
   <p>Kafka uses regular files on disk, and as such it has no hard dependency on a specific filesystem. The two filesystems which have the most usage, however, are EXT4 and XFS. Historically, EXT4 has had more usage, but recent improvements to the XFS filesystem have shown it to have better performance characteristics for Kafka's workload with no compromise in stability.</p>
   <p>Comparison testing was performed on a cluster with significant message loads, using a variety of filesystem creation and mount options. The primary metric in Kafka that was monitored was the "Request Local Time", indicating the amount of time append operations were taking. XFS resulted in much better local times (160ms vs. 250ms+ for the best EXT4 configuration), as well as lower average wait times. The XFS performance also showed less variability in disk performance.</p>
-  <h5><a id="generalfs" href="#generalfs">General Filesystem Notes</a></h5>
+  <h5 class="anchor-heading"><a id="generalfs" class="anchor-link"></a><a href="#generalfs">General Filesystem Notes</a></h5>
   For any filesystem used for data directories, on Linux systems, the following options are recommended to be used at mount time:
   <ul>
     <li>noatime: This option disables updating of a file's atime (last access time) attribute when the file is read. This can eliminate a significant number of filesystem writes, especially in the case of bootstrapping consumers. Kafka does not rely on the atime attributes at all, so it is safe to disable this.</li>
   </ul>
-  <h5><a id="xfs" href="#xfs">XFS Notes</a></h5>
+  <h5 class="anchor-heading"><a id="xfs" class="anchor-link"></a><a href="#xfs">XFS Notes</a></h5>
   The XFS filesystem has a significant amount of auto-tuning in place, so it does not require any change in the default settings, either at filesystem creation time or at mount. The only tuning parameters worth considering are:
   <ul>
     <li>largeio: This affects the preferred I/O size reported by the stat call. While this can allow for higher performance on larger disk writes, in practice it had minimal or no effect on performance.</li>
     <li>nobarrier: For underlying devices that have battery-backed cache, this option can provide a little more performance by disabling periodic write flushes. However, if the underlying device is well-behaved, it will report to the filesystem that it does not require flushes, and this option will have no effect.</li>
   </ul>
-  <h5><a id="ext4" href="#ext4">EXT4 Notes</a></h5>
+  <h5 class="anchor-heading"><a id="ext4" class="anchor-link"></a><a href="#ext4">EXT4 Notes</a></h5>
   EXT4 is a serviceable choice of filesystem for the Kafka data directories, however getting the most performance out of it will require adjusting several mount options. In addition, these options are generally unsafe in a failure scenario, and will result in much more data loss and corruption. For a single broker failure, this is not much of a concern as the disk can be wiped and the replicas rebuilt from the cluster. In a multiple-failure scenario, such as a power outage, this can mean [...]
   <ul>
     <li>data=writeback: Ext4 defaults to data=ordered which puts a strong order on some writes. Kafka does not require this ordering as it does very paranoid data recovery on all unflushed log. This setting removes the ordering constraint and seems to significantly reduce latency.
@@ -788,7 +694,7 @@
     <li>delalloc: Delayed allocation means that the filesystem avoid allocating any blocks until the physical write occurs. This allows ext4 to allocate a large extent instead of smaller pages and helps ensure the data is written sequentially. This feature is great for throughput. It does seem to involve some locking in the filesystem which adds a bit of latency variance.
   </ul>
 
-  <h3><a id="monitoring" href="#monitoring">6.6 Monitoring</a></h3>
+  <h3 class="anchor-heading"><a id="monitoring" class="anchor-link"></a><a href="#monitoring">6.6 Monitoring</a></h3>
 
   Kafka uses Yammer Metrics for metrics reporting in the server. The Java clients use Kafka Metrics, a built-in metrics registry that minimizes transitive dependencies pulled into client applications. Both expose metrics via JMX and can be configured to report stats using pluggable stats reporters to hook up to your monitoring system.
   <p>
@@ -797,7 +703,7 @@
   <p>
   The easiest way to see the available metrics is to fire up jconsole and point it at a running kafka client or server; this will allow browsing all metrics with JMX.
 
-  <h4><a id="remote_jmx" href="#remote_jmx">Security Considerations for Remote Monitoring using JMX</a></h4>
+  <h4 class="anchor-heading"><a id="remote_jmx" class="anchor-link"></a><a href="#remote_jmx">Security Considerations for Remote Monitoring using JMX</a></h4>
   Apache Kafka disables remote JMX by default. You can enable remote monitoring using JMX by setting the environment variable
   <code>JMX_PORT</code> for processes started using the CLI or standard Java system properties to enable remote JMX programmatically.
   You must enable security when enabling remote JMX in production scenarios to ensure that unauthorized users cannot monitor or
@@ -1394,7 +1300,7 @@
     </tbody>
   </table>
 
-  <h4><a id="producer_monitoring" href="#producer_monitoring">Producer monitoring</a></h4>
+  <h4 class="anchor-heading"><a id="producer_monitoring" class="anchor-link"></a><a href="#producer_monitoring">Producer monitoring</a></h4>
 
   The following metrics are available on producer instances.
 
@@ -1427,12 +1333,12 @@
 
   </tbody></table>
 
-  <h5><a id="producer_sender_monitoring" href="#producer_sender_monitoring">Producer Sender Metrics</a></h5>
+  <h5 class="anchor-heading"><a id="producer_sender_monitoring" class="anchor-link"></a><a href="#producer_sender_monitoring">Producer Sender Metrics</a></h5>
 
   <!--#include virtual="generated/producer_metrics.html" -->
 
 
-  <h4><a id="consumer_monitoring" href="#consumer_monitoring">consumer monitoring</a></h4>
+  <h4 class="anchor-heading"><a id="consumer_monitoring" class="anchor-link"></a><a href="#consumer_monitoring">consumer monitoring</a></h4>
 
   The following metrics are available on consumer instances.
 
@@ -1466,7 +1372,7 @@
     </tbody>
   </table>
 
-  <h5><a id="consumer_group_monitoring" href="#consumer_group_monitoring">Consumer Group Metrics</a></h5>
+  <h5 class="anchor-heading"><a id="consumer_group_monitoring" class="anchor-link"></a><a href="#consumer_group_monitoring">Consumer Group Metrics</a></h5>
   <table class="data-table">
     <tbody>
       <tr>
@@ -1632,18 +1538,18 @@
     </tbody>
   </table>
 
-  <h5><a id="consumer_fetch_monitoring" href="#consumer_fetch_monitoring">Consumer Fetch Metrics</a></h5>
+  <h5 class="anchor-heading"><a id="consumer_fetch_monitoring" class="anchor-link"></a><a href="#consumer_fetch_monitoring">Consumer Fetch Metrics</a></h5>
 
   <!--#include virtual="generated/consumer_metrics.html" -->
 
-  <h4><a id="connect_monitoring" href="#connect_monitoring">Connect Monitoring</a></h4>
+  <h4 class="anchor-heading"><a id="connect_monitoring" class="anchor-link"></a><a href="#connect_monitoring">Connect Monitoring</a></h4>
 
   A Connect worker process contains all the producer and consumer metrics as well as metrics specific to Connect.
   The worker process itself has a number of metrics, while each connector and task have additional metrics.
 
   <!--#include virtual="generated/connect_metrics.html" -->
 
-  <h4><a id="kafka_streams_monitoring" href="#kafka_streams_monitoring">Streams Monitoring</a></h4>
+  <h4 class="anchor-heading"><a id="kafka_streams_monitoring" class="anchor-link"></a><a href="#kafka_streams_monitoring">Streams Monitoring</a></h4>
 
   A Kafka Streams instance contains all the producer and consumer metrics as well as additional metrics specific to Streams.
   By default Kafka Streams has metrics with two recording levels: <code>debug</code> and <code>info</code>.
@@ -1658,9 +1564,9 @@
   Use the following configuration option to specify which metrics
   you want collected:
 
-<pre>metrics.recording.level="info"</pre>
+<pre><code>metrics.recording.level="info"</code></pre>
 
-<h5><a id="kafka_streams_client_monitoring" href="#kafka_streams_client_monitoring">Client Metrics</a></h5>
+<h5 class="anchor-heading"><a id="kafka_streams_client_monitoring" class="anchor-link"></a><a href="#kafka_streams_client_monitoring">Client Metrics</a></h5>
 All of the following metrics have a recording level of <code>info</code>:
 <table class="data-table">
   <tbody>
@@ -1697,7 +1603,7 @@ All of the following metrics have a recording level of <code>info</code>:
   </tbody>
 </table>
 
-<h5><a id="kafka_streams_thread_monitoring" href="#kafka_streams_thread_monitoring">Thread Metrics</a></h5>
+<h5 class="anchor-heading"><a id="kafka_streams_thread_monitoring" class="anchor-link"></a><a href="#kafka_streams_thread_monitoring">Thread Metrics</a></h5>
 All of the following metrics have a recording level of <code>info</code>:
 <table class="data-table">
     <tbody>
@@ -1809,7 +1715,7 @@ All of the following metrics have a recording level of <code>info</code>:
  </tbody>
 </table>
 
-<h5><a id="kafka_streams_task_monitoring" href="#kafka_streams_task_monitoring">Task Metrics</a></h5>
+<h5 class="anchor-heading"><a id="kafka_streams_task_monitoring" class="anchor-link"></a><a href="#kafka_streams_task_monitoring">Task Metrics</a></h5>
 All of the following metrics have a recording level of <code>debug</code>, except for metrics
 dropped-records-rate and dropped-records-total which have a recording level of <code>info</code>:
  <table class="data-table">
@@ -1892,7 +1798,7 @@ dropped-records-rate and dropped-records-total which have a recording level of <
  </tbody>
 </table>
 
- <h5><a id="kafka_streams_node_monitoring" href="#kafka_streams_node_monitoring">Processor Node Metrics</a></h5>
+ <h5 class="anchor-heading"><a id="kafka_streams_node_monitoring" class="anchor-link"></a><a href="#kafka_streams_node_monitoring">Processor Node Metrics</a></h5>
  The following metrics are only available on certain types of nodes, i.e., process-rate and process-total are
  only available for source processor nodes and suppression-emit-rate and suppression-emit-total are only available
  for suppression operation nodes. All of the metrics have a recording level of <code>debug</code>:
@@ -1926,7 +1832,7 @@ dropped-records-rate and dropped-records-total which have a recording level of <
  </tbody>
  </table>
 
- <h5><a id="kafka_streams_store_monitoring" href="#kafka_streams_store_monitoring">State Store Metrics</a></h5>
+ <h5 class="anchor-heading"><a id="kafka_streams_store_monitoring" class="anchor-link"></a><a href="#kafka_streams_store_monitoring">State Store Metrics</a></h5>
  All of the following metrics have a recording level of <code>debug</code>. Note that the <code>store-scope</code> value is specified in <code>StoreSupplier#metricsScope()</code> for user's customized
  state stores; for built-in state stores, currently we have:
   <ul>
@@ -2106,7 +2012,7 @@ dropped-records-rate and dropped-records-total which have a recording level of <
     </tbody>
  </table>
 
-  <h5><a id="kafka_streams_rocksdb_monitoring" href="#kafka_streams_rocksdb_monitoring">RocksDB Metrics</a></h5>
+  <h5 class="anchor-heading"><a id="kafka_streams_rocksdb_monitoring" class="anchor-link"></a><a href="#kafka_streams_rocksdb_monitoring">RocksDB Metrics</a></h5>
   All of the following metrics have a recording level of <code>debug</code>.
   The metrics are collected every minute from the RocksDB state stores.
   If a state store consists of multiple RocksDB instances as it is the case for aggregations over time and session windows,
@@ -2208,7 +2114,7 @@ dropped-records-rate and dropped-records-total which have a recording level of <
     </tbody>
   </table>
 
-  <h5><a id="kafka_streams_cache_monitoring" href="#kafka_streams_cache_monitoring">Record Cache Metrics</a></h5>
+  <h5 class="anchor-heading"><a id="kafka_streams_cache_monitoring" class="anchor-link"></a><a href="#kafka_streams_cache_monitoring">Record Cache Metrics</a></h5>
   All of the following metrics have a recording level of <code>debug</code>:
 
   <table class="data-table">
@@ -2236,18 +2142,18 @@ dropped-records-rate and dropped-records-total which have a recording level of <
     </tbody>
  </table>
 
-  <h4><a id="others_monitoring" href="#others_monitoring">Others</a></h4>
+  <h4 class="anchor-heading"><a id="others_monitoring" class="anchor-link"></a><a href="#others_monitoring">Others</a></h4>
 
   We recommend monitoring GC time and other stats and various server stats such as CPU utilization, I/O service time, etc.
 
   On the client side, we recommend monitoring the message/byte rate (global and per topic), request rate/size/time, and on the consumer side, max lag in messages among all partitions and min fetch request rate. For a consumer to keep up, max lag needs to be less than a threshold and min fetch rate needs to be larger than 0.
 
-  <h3><a id="zk" href="#zk">6.7 ZooKeeper</a></h3>
+  <h3 class="anchor-heading"><a id="zk" class="anchor-link"></a><a href="#zk">6.7 ZooKeeper</a></h3>
 
-  <h4><a id="zkversion" href="#zkversion">Stable version</a></h4>
+  <h4 class="anchor-heading"><a id="zkversion" class="anchor-link"></a><a href="#zkversion">Stable version</a></h4>
   The current stable branch is 3.5. Kafka is regularly updated to include the latest release in the 3.5 series.
 
-  <h4><a id="zkops" href="#zkops">Operationalizing ZooKeeper</a></h4>
+  <h4 class="anchor-heading"><a id="zkops" class="anchor-link"></a><a href="#zkops">Operationalizing ZooKeeper</a></h4>
   Operationally, we do the following for a healthy ZooKeeper installation:
   <ul>
     <li>Redundancy in the physical/hardware/network layout: try not to put them all in the same rack, decent (but don't go nuts) hardware, try to keep redundant power and network paths, etc. A typical ZooKeeper ensemble has 5 or 7 servers, which tolerates 2 and 3 servers down, respectively. If you have a small deployment, then using 3 servers is acceptable, but keep in mind that you'll only be able to tolerate 1 server down in this case. </li>
diff --git a/docs/protocol.html b/docs/protocol.html
index 5759aa3..29811a2 100644
--- a/docs/protocol.html
+++ b/docs/protocol.html
@@ -59,9 +59,9 @@
     <li><a href="#protocol_philosophy">Some Common Philosophical Questions</a></li>
 </ul>
 
-<h4><a id="protocol_preliminaries" href="#protocol_preliminaries">Preliminaries</a></h4>
+<h4 class="anchor-heading"><a id="protocol_preliminaries" class="anchor-link"></a><a href="#protocol_preliminaries">Preliminaries</a></h4>
 
-<h5><a id="protocol_network" href="#protocol_network">Network</a></h5>
+<h5 class="anchor-heading"><a id="protocol_network" class="anchor-link"></a><a href="#protocol_network">Network</a></h5>
 
 <p>Kafka uses a binary protocol over TCP. The protocol defines all APIs as request response message pairs. All messages are size delimited and are made up of the following primitive types.</p>
 
@@ -73,7 +73,7 @@
 
 <p>The server has a configurable maximum limit on request size and any request that exceeds this limit will result in the socket being disconnected.</p>
 
-<h5><a id="protocol_partitioning" href="#protocol_partitioning">Partitioning and bootstrapping</a></h5>
+<h5 class="anchor-heading"><a id="protocol_partitioning" class="anchor-link"></a><a href="#protocol_partitioning">Partitioning and bootstrapping</a></h5>
 
 <p>Kafka is a partitioned system so not all servers have the complete data set. Instead recall that topics are split into a pre-defined number of partitions, P, and each partition is replicated with some replication factor, N. Topic partitions themselves are just ordered "commit logs" numbered 0, 1, ..., P-1.</p>
 
@@ -92,7 +92,7 @@
     <li>If we get an appropriate error, refresh the metadata and try again.</li>
 </ol>
 
-<h5><a id="protocol_partitioning_strategies" href="#protocol_partitioning_strategies">Partitioning Strategies</a></h5>
+<h5 class="anchor-heading"><a id="protocol_partitioning_strategies" class="anchor-link"></a><a href="#protocol_partitioning_strategies">Partitioning Strategies</a></h5>
 
 <p>As mentioned above the assignment of messages to partitions is something the producing client controls. That said, how should this functionality be exposed to the end-user?</p>
 
@@ -108,13 +108,13 @@
 
 <p>Semantic partitioning means using some key in the message to assign messages to partitions. For example if you were processing a click message stream you might want to partition the stream by the user id so that all data for a particular user would go to a single consumer. To accomplish this the client can take a key associated with the message and use some hash of this key to choose the partition to which to deliver the message.</p>
 
-<h5><a id="protocol_batching" href="#protocol_batching">Batching</a></h5>
+<h5 class="anchor-heading"><a id="protocol_batching" class="anchor-link"></a><a href="#protocol_batching">Batching</a></h5>
 
 <p>Our APIs encourage batching small things together for efficiency. We have found this is a very significant performance win. Both our API to send messages and our API to fetch messages always work with a sequence of messages not a single message to encourage this. A clever client can make use of this and support an "asynchronous" mode in which it batches together messages sent individually and sends them in larger clumps. We go even further with this and allow the batching across multi [...]
 
 <p>The client implementer can choose to ignore this and send everything one at a time if they like.</p>
 
-<h5><a id="protocol_compatibility" href="#protocol_compatibility">Compatibility</a></h5>
+<h5 class="anchor-heading"><a id="protocol_compatibility" class="anchor-link"></a><a href="#protocol_compatibility">Compatibility</a></h5>
 
 <p>Kafka has a "bidirectional" client compatibility policy.  In other words, new clients can talk to old servers, and old clients can talk to new servers.  This allows users to upgrade either clients or servers without experiencing any downtime.
 
@@ -128,7 +128,7 @@
 
 <p>Note that <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields">KIP-482 tagged fields</a> can be added to a request without incrementing the version number.  This offers an additional way of evolving the message schema without breaking compatibility.  Tagged fields do not take up any space when the field is not set.  Therefore, if a field is rarely used, it is more efficient to make it a tagged field than to put [...]
 
-<h5><a id="api_versions" href="#api_versions">Retrieving Supported API versions</a></h5>
+<h5 class="anchor-heading"><a id="api_versions" class="anchor-link"></a><a href="#api_versions">Retrieving Supported API versions</a></h5>
 <p>In order to work against multiple broker versions, clients need to know what versions of various APIs a
     broker supports. The broker exposes this information since 0.10.0.0 as described in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-35+-+Retrieving+protocol+version">KIP-35</a>.
     Clients should use the supported API versions information to choose the highest API version supported by both client and broker. If no such version
@@ -151,7 +151,7 @@
         upgraded/downgraded in the mean time.</li>
 </ol>
 
-<h5><a id="sasl_handshake" href="#sasl_handshake">SASL Authentication Sequence</a></h5>
+<h5 class="anchor-heading"><a id="sasl_handshake" class="anchor-link"></a><a href="#sasl_handshake">SASL Authentication Sequence</a></h5>
 <p>The following sequence is used for SASL authentication:
 <ol>
   <li>Kafka <code>ApiVersionsRequest</code> may be sent by the client to obtain the version ranges of requests supported by the broker. This is optional.</li>
@@ -167,50 +167,48 @@
 Kafka request. SASL/GSSAPI authentication is performed starting with this packet, skipping the first two steps above.</p>
 
 
-<h4><a id="protocol_details" href="#protocol_details">The Protocol</a></h4>
+<h4 class="anchor-heading"><a id="protocol_details" class="anchor-link"></a><a href="#protocol_details">The Protocol</a></h4>
 
-<h5><a id="protocol_types" href="#protocol_types">Protocol Primitive Types</a></h5>
+<h5 class="anchor-heading"><a id="protocol_types" class="anchor-link"></a><a href="#protocol_types">Protocol Primitive Types</a></h5>
 
 <p>The protocol is built out of the following primitive types.</p>
 <!--#include virtual="generated/protocol_types.html" -->
 
-<h5><a id="protocol_grammar" href="#protocol_grammar">Notes on reading the request format grammars</a></h5>
+<h5 class="anchor-heading"><a id="protocol_grammar" class="anchor-link"></a><a href="#protocol_grammar">Notes on reading the request format grammars</a></h5>
 
 <p>The <a href="https://en.wikipedia.org/wiki/Backus%E2%80%93Naur_Form">BNF</a>s below give an exact context free grammar for the request and response binary format. The BNF is intentionally not compact in order to give human-readable name. As always in a BNF a sequence of productions indicates concatenation. When there are multiple possible productions these are separated with '|' and may be enclosed in parenthesis for grouping. The top-level definition is always given first and subsequ [...]
 
-<h5><a id="protocol_common" href="#protocol_common">Common Request and Response Structure</a></h5>
+<h5 class="anchor-heading"><a id="protocol_common" class="anchor-link"></a><a href="#protocol_common">Common Request and Response Structure</a></h5>
 
 <p>All requests and responses originate from the following grammar which will be incrementally describe through the rest of this document:</p>
 
-<pre>
-RequestOrResponse => Size (RequestMessage | ResponseMessage)
-  Size => int32
-</pre>
+<pre class="line-numbers"><code class="language-text">RequestOrResponse => Size (RequestMessage | ResponseMessage)
+  Size => int32</code></pre>
 
 <table class="data-table"><tbody>
 <tr><th>Field</th><th>Description</th></tr>
 <tr><td>message_size</td><td>The message_size field gives the size of the subsequent request or response message in bytes. The client can read requests by first reading this 4 byte size as an integer N, and then reading and parsing the subsequent N bytes of the request.</td></tr>
 </table>
 
-<h5><a id="protocol_recordbatch" href="#protocol_recordbatch">Record Batch</a></h5>
+<h5 class="anchor-heading"><a id="protocol_recordbatch" class="anchor-link"></a><a href="#protocol_recordbatch">Record Batch</a></h5>
 <p>A description of the record batch format can be found <a href="/documentation/#recordbatch">here</a>.</p>
 
-<h4><a id="protocol_constants" href="#protocol_constants">Constants</a></h4>
+<h4 class="anchor-heading"><a id="protocol_constants" class="anchor-link"></a><a href="#protocol_constants">Constants</a></h4>
 
-<h5><a id="protocol_error_codes" href="#protocol_error_codes">Error Codes</a></h5>
+<h5 class="anchor-heading"><a id="protocol_error_codes" class="anchor-link"></a><a href="#protocol_error_codes">Error Codes</a></h5>
 <p>We use numeric codes to indicate what problem occurred on the server. These can be translated by the client into exceptions or whatever the appropriate error handling mechanism in the client language. Here is a table of the error codes currently in use:</p>
 <!--#include virtual="generated/protocol_errors.html" -->
 
-<h5><a id="protocol_api_keys" href="#protocol_api_keys">Api Keys</a></h5>
+<h5 class="anchor-heading"><a id="protocol_api_keys" class="anchor-link"></a><a href="#protocol_api_keys">Api Keys</a></h5>
 <p>The following are the numeric codes that the ApiKey in the request can take for each of the below request types.</p>
 <!--#include virtual="generated/protocol_api_keys.html" -->
 
-<h4><a id="protocol_messages" href="#protocol_messages">The Messages</a></h4>
+<h4 class="anchor-heading"><a id="protocol_messages" class="anchor-link"></a><a href="#protocol_messages">The Messages</a></h4>
 
 <p>This section gives details on each of the individual API Messages, their usage, their binary format, and the meaning of their fields.</p>
 <!--#include virtual="generated/protocol_messages.html" -->
 
-<h4><a id="protocol_philosophy" href="#protocol_philosophy">Some Common Philosophical Questions</a></h4>
+<h4 class="anchor-heading"><a id="protocol_philosophy" class="anchor-link"></a><a href="#protocol_philosophy">Some Common Philosophical Questions</a></h4>
 
 <p>Some people have asked why we don't use HTTP. There are a number of reasons, the best is that client implementors can make use of some of the more advanced TCP features--the ability to multiplex requests, the ability to simultaneously poll many connections, etc. We have also found HTTP libraries in many languages to be surprisingly shabby.</p>
 
diff --git a/docs/quickstart-docker.html b/docs/quickstart-docker.html
new file mode 100644
index 0000000..d8816ba
--- /dev/null
+++ b/docs/quickstart-docker.html
@@ -0,0 +1,204 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script><!--#include virtual="js/templateData.js" --></script>
+
+<script id="quickstart-docker-template" type="text/x-handlebars-template">
+<div class="quickstart-step">
+<h4 class="anchor-heading">
+    <a class="anchor-link" id="step-1-get-kafka" href="#step-1-get-kafka"></a>
+    <a href="#step-1-get-kafka">Step 1: Get Kafka</a>
+</h4>
+
+<p>
+    This docker-compose file will run everything for you via <a href="https://www.docker.com/" rel="nofollow">Docker</a>.
+    Copy and paste it into a file named <code>docker-compose.yml</code> on your local filesystem.
+</p>
+<pre class="line-numbers"><code class="language-bash">---
+    version: '2'
+    
+    services:
+      broker:
+        image: apache-kafka/broker:2.5.0
+        hostname: kafka-broker
+        container_name: kafka-broker
+    
+    # ...rest omitted...</code></pre>
+</div>
+
+<div class="quickstart-step">
+<h4 class="anchor-heading">
+    <a class="anchor-link" id="step-2-start-kafka" href="#step-2-start-kafka"></a>
+    <a href="#step-2-start-kafka">Step 2: Start the Kafka environment</a>
+</h4>
+
+<p>
+    From the directory containing the <code>docker-compose.yml</code> file created in the previous step, run this
+    command in order to start all services in the correct order:
+</p>
+<pre class="line-numbers"><code class="language-bash">$ docker-compose up</code></pre>
+<p>
+    Once all services have successfully launched, you will have a basic Kafka environment running and ready to use.
+</p>
+</div>
+
+<div class="quickstart-step">
+<h4 class="anchor-heading">
+    <a class="anchor-link" id="step-3-create-a-topic" href="#step-3-create-a-topic"></a>
+    <a href="#step-3-create-a-topic">Step 3: Create a topic to store your events</a>
+</h4>
+<p>Kafka is a distributed <em>event streaming platform</em> that lets you read, write, store, and process
+<a href="/documentation/#messages" rel="nofollow"><em>events</em></a> (also called <em>records</em> or <em>messages</em> in the documentation)
+across many machines.
+Example events are payment transactions, geolocation updates from mobile phones, shipping orders, sensor measurements
+from IoT devices or medical equipment, and much more.
+These events are organized and stored in <a href="/documentation/#intro_topics" rel="nofollow"><em>topics</em></a>.
+Very simplified, a topic is similar to a folder in a filesystem, and the events are the files in that folder.</p>
+<p>So before you can write your first events, you must create a topic:</p>
+<pre class="line-numbers"><code class="language-bash">$ docker exec -it kafka-broker kafka-topics.sh --create --topic quickstart-events</code></pre>
+<p>All of Kafka's command line tools have additional options: run the <code>kafka-topics.sh</code> command without any
+arguments to display usage information.
+For example, it can also show you
+<a href="/documentation/#intro_topics" rel="nofollow">details such as the partition count</a> of the new topic:</p>
+<pre class="line-numbers"><code class="language-bash">$ docker exec -it kafka-broker kafka-topics.sh --describe --topic quickstart-events
+    Topic:quickstart-events  PartitionCount:1    ReplicationFactor:1 Configs:
+    Topic: quickstart-events Partition: 0    Leader: 0   Replicas: 0 Isr: 0</code></pre>
+</div>
+
+<div class="quickstart-step">
+<h4 class="anchor-heading">
+    <a class="anchor-link" id="step-4-write-events" href="#step-4-write-events"></a>
+    <a href="#step-4-write-events">Step 4: Write some events into the topic</a>
+</h4>
+<p>A Kafka client communicates with the Kafka brokers via the network for writing (or reading) events.
+Once received, the brokers will store the events in a durable and fault-tolerant manner for as long as you
+need—even forever.</p>
+<p>Run the console producer client to write a few events into your topic.
+By default, each line you enter will result in a separate event being written to the topic.</p>
+<pre class="line-numbers"><code class="language-bash">$ docker exec -it kafka-broker kafka-console-producer.sh --topic quickstart-events
+This is my first event
+This is my second event</code></pre>
+<p>You can stop the producer client with <code>Ctrl-C</code> at any time.</p>
+</div>
+
+<div class="quickstart-step">
+<h4 class="anchor-heading">
+    <a class="anchor-link" id="step-5-read-the-events" href="#step-5-read-the-events"></a>
+    <a href="#step-5-read-the-events">Step 5: Read the events</a>
+</h4>
+<p>Open another terminal session and run the console consumer client to read the events you just created:</p>
+<pre class="line-numbers"><code class="language-bash">$ docker exec -it kafka-broker kafka-console-consumer.sh --topic quickstart-events --from-beginning
+This is my first event
+This is my second event</code></pre>
+<p>You can stop the consumer client with <code>Ctrl-C</code> at any time.</p>
+<p>Feel free to experiment: for example, switch back to your producer terminal (previous step) to write
+additional events, and see how the events immediately show up in your consumer terminal.</p>
+<p>Because events are durably stored in Kafka, they can be read as many times and by as many consumers as you want.
+You can easily verify this by opening yet another terminal session and re-running the previous command again.</p>
+
+</div>
+
+<div class="quickstart-step">
+<h4 class="anchor-heading">
+    <a class="anchor-link" id="step-5-read-the-events" href="#step-5-read-the-events"></a>
+    <a href="#step-5-read-the-events">Step 6: Import/export your data as streams of events with Kafka Connect</a>
+</h4>
+<p>You probably have lots of data in existing systems like relational databases or traditional messaging systems, along
+with many applications that already use these systems.
+<a href="/documentation/#connect" rel="nofollow">Kafka Connect</a> allows you to continuously ingest data from external
+systems into Kafka, and vice versa.  It is thus
+very easy to integrate existing systems with Kafka. To make this process even easier, there are hundreds of such
+connectors readily available.</p>
+<p>Take a look at the <a href="/documentation/#connect" rel="nofollow">Kafka Connect section</a> in the documentation to
+learn more about how to continuously import/export your data into and out of Kafka.</p>
+
+</div>
+
+<div class="quickstart-step">
+<h4 class="anchor-heading">
+    <a class="anchor-link" id="step-7-process-events" href="#step-7-process-events"></a>
+    <a href="#step-7-process-events">Step 7: Process your events with Kafka Streams</a>
+</h4>
+
+<p>Once your data is stored in Kafka as events, you can process the data with the
+<a href="/documentation/streams" rel="nofollow">Kafka Streams</a> client library for Java/Scala.
+It allows you to implement mission-critical real-time applications and microservices, where the input and/or output data
+is stored in Kafka topics.  Kafka Streams combines the simplicity of writing and deploying standard Java and Scala
+applications on the client side with the benefits of Kafka's server-side cluster technology to make these applications
+highly scalable, elastic, fault-tolerant, and distributed. The library supports exactly-once processing, stateful
+operations and aggregations, windowing, joins, processing based on event-time, and much more.</p>
+<p>To give you a first taste, here's how one would implement the popular <code>WordCount</code> algorithm:</p>
+<pre class="line-numbers"><code class="language-java">KStream<String, String> textLines = builder.stream("quickstart-events");
+
+KTable<String, Long> wordCounts = textLines
+            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
+            .groupBy((keyIgnored, word) -> word)
+            .count();
+
+wordCounts.toStream().to("output-topic"), Produced.with(Serdes.String(), Serdes.Long()));</code></pre>
+<p>The <a href="/25/documentation/streams/quickstart" rel="nofollow">Kafka Streams demo</a> and the
+<a href="/25/documentation/streams/tutorial" rel="nofollow">app development tutorial</a> demonstrate how to code and run
+such a streaming application from start to finish.</p>
+
+</div>
+
+<div class="quickstart-step">
+<h4 class="anchor-heading">
+    <a class="anchor-link" id="step-8-terminate" href="#step-8-terminate"></a>
+    <a href="#step-8-terminate">Step 8: Terminate the Kafka environment</a>
+</h4>
+<p>Now that you reached the end of the quickstart, feel free to tear down the Kafka environment—or continue playing around.</p>
+<p>Run the following command to tear down the environment, which also deletes any events you have created along the way:</p>
+<pre class="line-numbers"><code class="language-bash">$ docker-compose down</code></pre>
+
+</div>
+
+<div class="quickstart-step">
+<h4 class="anchor-heading">
+    <a class="anchor-link" id="quickstart_kafkacongrats" href="#quickstart_kafkacongrats"></a>
+    <a href="#quickstart_kafkacongrats">Congratulations!</a>
+  </h4>
+  
+  <p>You have successfully finished the Apache Kafka quickstart.<div>
+  
+  <p>To learn more, we suggest the following next steps:</p>
+  
+  <ul>
+      <li>
+          Read through the brief <a href="/intro">Introduction</a> to learn how Kafka works at a high level, its
+          main concepts, and how it compares to other technologies. To understand Kafka in more detail, head over to the
+          <a href="/documentation/">Documentation</a>.
+      </li>
+      <li>
+          Browse through the <a href="/powered-by">Use Cases</a> to learn how other users in our world-wide
+          community are getting value out of Kafka.
+      </li>
+      <!--
+      <li>
+          Learn how _Kafka compares to other technologies_ [note to design team: this new page is not yet written] you might be familiar with.
+      </li>
+      -->
+      <li>
+          Join a <a href="/events">local Kafka meetup group</a> and
+          <a href="https://kafka-summit.org/past-events/">watch talks from Kafka Summit</a>,
+          the main conference of the Kafka community.
+      </li>
+  </ul>
+</div>
+</script>
+
+<div class="p-quickstart-docker"></div>
diff --git a/docs/quickstart-zookeeper.html b/docs/quickstart-zookeeper.html
new file mode 100644
index 0000000..32478d6
--- /dev/null
+++ b/docs/quickstart-zookeeper.html
@@ -0,0 +1,277 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script>
+  <!--#include virtual="js/templateData.js" -->
+</script>
+
+<script id="quickstart-template" type="text/x-handlebars-template">
+
+      <div class="quickstart-step">
+      <h4 class="anchor-heading">
+          <a class="anchor-link" id="quickstart_download" href="#quickstart_download"></a>
+          <a href="#quickstart_download">Step 1: Get Kafka</a>
+      </h4>
+
+      <p>
+          <a href="https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.13-2.6.0.tgz">Download</a>
+          the latest Kafka release and extract it:
+      </p>
+
+<pre class="line-numbers"><code class="language-bash">$ tar -xzf kafka_2.13-2.6.0.tgz
+$ cd kafka_2.13-2.6.0</code></pre>
+  </div>
+
+  <div class="quickstart-step">
+      <h4 class="anchor-heading">
+          <a class="anchor-link" id="quickstart_startserver" href="#quickstart_startserver"></a>
+          <a href="#quickstart_startserver">Step 2: Start the Kafka environment</a>
+      </h4>
+
+      <p class="note">
+        NOTE: Your local environment must have Java 8+ installed.
+      </p>
+
+      <p>
+          Run the following commands in order to start all services in the correct order:
+      </p>
+
+<pre class="line-numbers"><code class="language-bash"># Start the ZooKeeper service
+# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
+$ bin/zookeeper-server-start.sh config/zookeeper.properties</code></pre>
+
+      <p>
+          Open another terminal session and run:
+      </p>
+
+<pre class="line-numbers"><code class="language-bash"># Start the Kafka broker service
+$ bin/kafka-server-start.sh config/server.properties</code></pre>
+
+      <p>
+          Once all services have successfully launched, you will have a basic Kafka environment running and ready to use.
+      </p>
+  </div>
+
+  <div class="quickstart-step">
+      <h4 class="anchor-heading">
+          <a class="anchor-link" id="quickstart_createtopic" href="#quickstart_createtopic"></a>
+          <a href="#quickstart_createtopic">Step 3: Create a topic to store your events</a>
+      </h4>
+
+      <p>
+          Kafka is a distributed <em>event streaming platform</em> that lets you read, write, store, and process
+          <a href="/documentation/#messages"><em>events</em></a> (also called <em>records</em> or
+          <em>messages</em> in the documentation)
+          across many machines.
+      </p>
+
+      <p>
+          Example events are payment transactions, geolocation updates from mobile phones, shipping orders, sensor measurements
+          from IoT devices or medical equipment, and much more. These events are organized and stored in
+          <a href="/documentation/#intro_topics"><em>topics</em></a>.
+          Very simplified, a topic is similar to a folder in a filesystem, and the events are the files in that folder.
+      </p>
+
+      <p>
+          So before you can write your first events, you must create a topic.  Open another terminal session and run:
+      </p>
+
+<pre class="line-numbers"><code class="language-bash">$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092</code></pre>
+
+      <p>
+          All of Kafka's command line tools have additional options: run the <code>kafka-topics.sh</code> command without any
+          arguments to display usage information. For example, it can also show you
+          <a href="/documentation/#intro_topics">details such as the partition count</a>
+          of the new topic:
+      </p>
+
+<pre class="line-numbers"><code class="language-bash">$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
+Topic:quickstart-events  PartitionCount:1    ReplicationFactor:1 Configs:
+    Topic: quickstart-events Partition: 0    Leader: 0   Replicas: 0 Isr: 0</code></pre>
+  </div>
+
+  <div class="quickstart-step">
+      <h4 class="anchor-heading">
+          <a class="anchor-link" id="quickstart_send" href="#quickstart_send"></a>
+          <a href="#quickstart_send">Step 4: Write some events into the topic</a>
+      </h4>
+
+      <p>
+          A Kafka client communicates with the Kafka brokers via the network for writing (or reading) events.
+          Once received, the brokers will store the events in a durable and fault-tolerant manner for as long as you
+          need—even forever.
+      </p>
+
+      <p>
+          Run the console producer client to write a few events into your topic.
+          By default, each line you enter will result in a separate event being written to the topic.
+      </p>
+
+<pre class="line-numbers"><code class="language-bash">$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
+This is my first event
+This is my second event</code></pre>
+
+      <p>
+          You can stop the producer client with <code>Ctrl-C</code> at any time.
+      </p>
+  </div>
+
+  <div class="quickstart-step">
+      <h4 class="anchor-heading">
+          <a class="anchor-link" id="quickstart_consume" href="#quickstart_consume"></a>
+          <a href="#quickstart_consume">Step 5: Read the events</a>
+      </h4>
+
+  <p>Open another terminal session and run the console consumer client to read the events you just created:</p>
+
+<pre class="line-numbers"><code class="language-bash">$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
+This is my first event
+This is my second event</code></pre>
+
+  <p>You can stop the consumer client with <code>Ctrl-C</code> at any time.</p>
+
+  <p>Feel free to experiment: for example, switch back to your producer terminal (previous step) to write
+  additional events, and see how the events immediately show up in your consumer terminal.</p>
+
+  <p>Because events are durably stored in Kafka, they can be read as many times and by as many consumers as you want.
+  You can easily verify this by opening yet another terminal session and re-running the previous command again.</p>
+  </div>
+
+  <div class="quickstart-step">
+  <h4 class="anchor-heading">
+      <a class="anchor-link" id="quickstart_kafkaconnect" href="#quickstart_kafkaconnect"></a>
+      <a href="#quickstart_kafkaconnect">Step 6: Import/export your data as streams of events with Kafka Connect</a>
+  </h4>
+
+  <p>
+      You probably have lots of data in existing systems like relational databases or traditional messaging systems,
+      along with many applications that already use these systems.
+      <a href="/documentation/#connect">Kafka Connect</a> allows you to continuously ingest
+      data from external systems into Kafka, and vice versa.  It is thus very easy to integrate existing systems with
+      Kafka. To make this process even easier, there are hundreds of such connectors readily available.
+  </p>
+
+  <p>Take a look at the <a href="/documentation/#connect">Kafka Connect section</a>
+  learn more about how to continuously import/export your data into and out of Kafka.</p>
+
+  </div>
+
+  <div class="quickstart-step">
+  <h4 class="anchor-heading">
+      <a class="anchor-link" id="quickstart_kafkastreams" href="#quickstart_kafkastreams"></a>
+      <a href="#quickstart_kafkastreams">Step 7: Process your events with Kafka Streams</a>
+  </h4>
+
+  <p>
+      Once your data is stored in Kafka as events, you can process the data with the
+      <a href="/documentation/streams">Kafka Streams</a> client library for Java/Scala.
+      It allows you to implement mission-critical real-time applications and microservices, where the input
+      and/or output data is stored in Kafka topics.  Kafka Streams combines the simplicity of writing and deploying
+      standard Java and Scala applications on the client side with the benefits of Kafka's server-side cluster
+      technology to make these applications highly scalable, elastic, fault-tolerant, and distributed. The library
+      supports exactly-once processing, stateful operations and aggregations, windowing, joins, processing based
+      on event-time, and much more.
+  </p>
+
+  <p>To give you a first taste, here's how one would implement the popular <code>WordCount</code> algorithm:</p>
+
+<pre class="line-numbers"><code class="language-bash">KStream&lt;String, String&gt; textLines = builder.stream("quickstart-events");
+
+KTable&lt;String, Long&gt; wordCounts = textLines
+            .flatMapValues(line -&gt; Arrays.asList(line.toLowerCase().split(" ")))
+            .groupBy((keyIgnored, word) -&gt; word)
+            .count();
+
+wordCounts.toStream().to("output-topic"), Produced.with(Serdes.String(), Serdes.Long()));</code></pre>
+
+  <p>
+      The <a href="/25/documentation/streams/quickstart">Kafka Streams demo</a>
+      and the <a href="/25/documentation/streams/tutorial">app development tutorial</a> 
+      demonstrate how to code and run such a streaming application from start to finish.
+  </p>
+
+  </div>
+
+  <div class="quickstart-step">
+  <h4 class="anchor-heading">
+      <a class="anchor-link" id="quickstart_kafkaterminate" href="#quickstart_kafkaterminate"></a>
+      <a href="#quickstart_kafkaterminate">Step 8: Terminate the Kafka environment</a>
+  </h4>
+
+  <p>
+      Now that you reached the end of the quickstart, feel free to tear down the Kafka environment—or
+      continue playing around.
+  </p>
+
+  <ol>
+      <li>
+          Stop the producer and consumer clients with <code>Ctrl-C</code>, if you haven't done so already.
+      </li>
+      <li>
+          Stop the Kafka broker with <code>Ctrl-C</code>.
+      </li>
+      <li>
+          Lastly, stop the ZooKeeper server with <code>Ctrl-C</code>.
+      </li>
+  </ol>
+
+  <p>
+      If you also want to delete any data of your local Kafka environment including any events you have created
+      along the way, run the command:
+  </p>
+
+<pre class="line-numbers"><code class="language-bash">$ rm -rf /tmp/kafka-logs /tmp/zookeeper</code></pre>
+
+  </div>
+
+  <div class="quickstart-step">
+  <h4 class="anchor-heading">
+      <a class="anchor-link" id="quickstart_kafkacongrats" href="#quickstart_kafkacongrats"></a>
+      <a href="#quickstart_kafkacongrats">Congratulations!</a>
+    </h4>
+
+    <p>You have successfully finished the Apache Kafka quickstart.<div>
+
+    <p>To learn more, we suggest the following next steps:</p>
+
+    <ul>
+        <li>
+            Read through the brief <a href="/intro">Introduction</a>
+            to learn how Kafka works at a high level, its main concepts, and how it compares to other
+            technologies. To understand Kafka in more detail, head over to the
+            <a href="/documentation/">Documentation</a>.
+        </li>
+        <li>
+            Browse through the <a href="/powered-by">Use Cases</a> to learn how 
+            other users in our world-wide community are getting value out of Kafka.
+        </li>
+        <!--
+        <li>
+            Learn how _Kafka compares to other technologies_ you might be familiar with.
+            [note to design team: this new page is not yet written] 
+        </li>
+        -->
+        <li>
+            Join a <a href="/events">local Kafka meetup group</a> and
+            <a href="https://kafka-summit.org/past-events/">watch talks from Kafka Summit</a>,
+            the main conference of the Kafka community.
+        </li>
+    </ul>
+  </div>
+</script>
+
+<div class="p-quickstart"></div>
diff --git a/docs/quickstart.html b/docs/quickstart.html
deleted file mode 100644
index 0a56e00..0000000
--- a/docs/quickstart.html
+++ /dev/null
@@ -1,300 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<script><!--#include virtual="js/templateData.js" --></script>
-
-<script id="quickstart-template" type="text/x-handlebars-template">
-<p>
-This tutorial assumes you are starting fresh and have no existing Kafka or ZooKeeper data.
-Since Kafka console scripts are different for Unix-based and Windows platforms, on Windows platforms use <code>bin\windows\</code> instead of <code>bin/</code>, and change the script extension to <code>.bat</code>.
-</p>
-
-<h4><a id="quickstart_download" href="#quickstart_download">Step 1: Download the code</a></h4>
-
-<a href="https://www.apache.org/dyn/closer.cgi?path=/kafka/{{fullDotVersion}}/kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz" title="Kafka downloads">Download</a> the {{fullDotVersion}} release and un-tar it.
-
-<pre class="brush: bash;">
-&gt; tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
-&gt; cd kafka_{{scalaVersion}}-{{fullDotVersion}}
-</pre>
-
-<h4><a id="quickstart_startserver" href="#quickstart_startserver">Step 2: Start the server</a></h4>
-
-<p>
-Kafka uses <a href="https://zookeeper.apache.org/">ZooKeeper</a> so you need to first start a ZooKeeper server if you don't already have one. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node ZooKeeper instance.
-</p>
-
-<pre class="brush: bash;">
-&gt; bin/zookeeper-server-start.sh config/zookeeper.properties
-[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
-...
-</pre>
-
-<p>Now start the Kafka server:</p>
-<pre class="brush: bash;">
-&gt; bin/kafka-server-start.sh config/server.properties
-[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
-[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
-...
-</pre>
-
-<h4><a id="quickstart_createtopic" href="#quickstart_createtopic">Step 3: Create a topic</a></h4>
-
-<p>Let's create a topic named "test" with a single partition and only one replica:</p>
-<pre class="brush: bash;">
-&gt; bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
-</pre>
-
-<p>We can now see that topic if we run the list topic command:</p>
-<pre class="brush: bash;">
-&gt; bin/kafka-topics.sh --list --bootstrap-server localhost:9092
-test
-</pre>
-<p>Alternatively, instead of manually creating topics you can also configure your brokers to auto-create topics when a non-existent topic is published to.</p>
-
-<h4><a id="quickstart_send" href="#quickstart_send">Step 4: Send some messages</a></h4>
-
-<p>Kafka comes with a command line client that will take input from a file or from standard input and send it out as messages to the Kafka cluster. By default, each line will be sent as a separate message.</p>
-<p>
-Run the producer and then type a few messages into the console to send to the server.</p>
-
-<pre class="brush: bash;">
-&gt; bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
-This is a message
-This is another message
-</pre>
-
-<h4><a id="quickstart_consume" href="#quickstart_consume">Step 5: Start a consumer</a></h4>
-
-<p>Kafka also has a command line consumer that will dump out messages to standard output.</p>
-
-<pre class="brush: bash;">
-&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
-This is a message
-This is another message
-</pre>
-<p>
-If you have each of the above commands running in a different terminal then you should now be able to type messages into the producer terminal and see them appear in the consumer terminal.
-</p>
-<p>
-All of the command line tools have additional options; running the command with no arguments will display usage information documenting them in more detail.
-</p>
-
-<h4><a id="quickstart_multibroker" href="#quickstart_multibroker">Step 6: Setting up a multi-broker cluster</a></h4>
-
-<p>So far we have been running against a single broker, but that's no fun. For Kafka, a single broker is just a cluster of size one, so nothing much changes other than starting a few more broker instances. But just to get feel for it, let's expand our cluster to three nodes (still all on our local machine).</p>
-<p>
-First we make a config file for each of the brokers (on Windows use the <code>copy</code> command instead):
-</p>
-<pre class="brush: bash;">
-&gt; cp config/server.properties config/server-1.properties
-&gt; cp config/server.properties config/server-2.properties
-</pre>
-
-<p>
-Now edit these new files and set the following properties:
-</p>
-<pre class="brush: text;">
-
-config/server-1.properties:
-    broker.id=1
-    listeners=PLAINTEXT://:9093
-    log.dirs=/tmp/kafka-logs-1
-
-config/server-2.properties:
-    broker.id=2
-    listeners=PLAINTEXT://:9094
-    log.dirs=/tmp/kafka-logs-2
-</pre>
-<p>The <code>broker.id</code> property is the unique and permanent name of each node in the cluster. We have to override the port and log directory only because we are running these all on the same machine and we want to keep the brokers from all trying to register on the same port or overwrite each other's data.</p>
-<p>
-We already have Zookeeper and our single node started, so we just need to start the two new nodes:
-</p>
-<pre class="brush: bash;">
-&gt; bin/kafka-server-start.sh config/server-1.properties &amp;
-...
-&gt; bin/kafka-server-start.sh config/server-2.properties &amp;
-...
-</pre>
-
-<p>Now create a new topic with a replication factor of three:</p>
-<pre class="brush: bash;">
-&gt; bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
-</pre>
-
-<p>Okay but now that we have a cluster how can we know which broker is doing what? To see that run the "describe topics" command:</p>
-<pre class="brush: bash;">
-&gt; bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
-Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
-	Topic: my-replicated-topic	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0
-</pre>
-<p>Here is an explanation of output. The first line gives a summary of all the partitions, each additional line gives information about one partition. Since we have only one partition for this topic there is only one line.</p>
-<ul>
-  <li>"leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
-  <li>"replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
-  <li>"isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
-</ul>
-<p>Note that in my example node 1 is the leader for the only partition of the topic.</p>
-<p>
-We can run the same command on the original topic we created to see where it is:
-</p>
-<pre class="brush: bash;">
-&gt; bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
-Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
-	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
-</pre>
-<p>So there is no surprise there&mdash;the original topic has no replicas and is on server 0, the only server in our cluster when we created it.</p>
-<p>
-Let's publish a few messages to our new topic:
-</p>
-<pre class="brush: bash;">
-&gt; bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-replicated-topic
-...
-my test message 1
-my test message 2
-^C
-</pre>
-<p>Now let's consume these messages:</p>
-<pre class="brush: bash;">
-&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
-...
-my test message 1
-my test message 2
-^C
-</pre>
-
-<p>Now let's test out fault-tolerance. Broker 1 was acting as the leader so let's kill it:</p>
-<pre class="brush: bash;">
-&gt; ps aux | grep server-1.properties
-7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
-&gt; kill -9 7564
-</pre>
-
-On Windows use:
-<pre class="brush: bash;">
-&gt; wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid
-ProcessId
-6016
-&gt; taskkill /pid 6016 /f
-</pre>
-
-<p>Leadership has switched to one of the followers and node 1 is no longer in the in-sync replica set:</p>
-
-<pre class="brush: bash;">
-&gt; bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
-Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
-	Topic: my-replicated-topic	Partition: 0	Leader: 2	Replicas: 1,2,0	Isr: 2,0
-</pre>
-<p>But the messages are still available for consumption even though the leader that took the writes originally is down:</p>
-<pre class="brush: bash;">
-&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
-...
-my test message 1
-my test message 2
-^C
-</pre>
-
-
-<h4><a id="quickstart_kafkaconnect" href="#quickstart_kafkaconnect">Step 7: Use Kafka Connect to import/export data</a></h4>
-
-<p>Reading data from the console and writing it back to the console is a convenient place to start, but you'll probably want
-to use data from other sources or export data from Kafka to other systems. For many systems, instead of writing custom
-integration code you can use Kafka Connect to import or export data.</p>
-
-<p>Kafka Connect is a tool included with Kafka that imports and exports data to Kafka. It is an extensible tool that runs
-<i>connectors</i>, which implement the custom logic for interacting with an external system. In this quickstart we'll see
-how to run Kafka Connect with simple connectors that import data from a file to a Kafka topic and export data from a
-Kafka topic to a file.</p>
-
-<p>First, we'll start by creating some seed data to test with:</p>
-
-<pre class="brush: bash;">
-&gt; echo -e "foo\nbar" > test.txt
-</pre>
-Or on Windows:
-<pre class="brush: bash;">
-&gt; echo foo> test.txt
-&gt; echo bar>> test.txt
-</pre>
-
-<p>Next, we'll start two connectors running in <i>standalone</i> mode, which means they run in a single, local, dedicated
-process. We provide three configuration files as parameters. The first is always the configuration for the Kafka Connect
-process, containing common configuration such as the Kafka brokers to connect to and the serialization format for data.
-The remaining configuration files each specify a connector to create. These files include a unique connector name, the connector
-class to instantiate, and any other configuration required by the connector.</p>
-
-<pre class="brush: bash;">
-&gt; bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
-</pre>
-
-<p>
-These sample configuration files, included with Kafka, use the default local cluster configuration you started earlier
-and create two connectors: the first is a source connector that reads lines from an input file and produces each to a Kafka topic
-and the second is a sink connector that reads messages from a Kafka topic and produces each as a line in an output file.
-</p>
-
-<p>
-During startup you'll see a number of log messages, including some indicating that the connectors are being instantiated.
-Once the Kafka Connect process has started, the source connector should start reading lines from <code>test.txt</code> and
-producing them to the topic <code>connect-test</code>, and the sink connector should start reading messages from the topic <code>connect-test</code>
-and write them to the file <code>test.sink.txt</code>. We can verify the data has been delivered through the entire pipeline
-by examining the contents of the output file:
-</p>
-
-
-<pre class="brush: bash;">
-&gt; more test.sink.txt
-foo
-bar
-</pre>
-
-<p>
-Note that the data is being stored in the Kafka topic <code>connect-test</code>, so we can also run a console consumer to see the
-data in the topic (or use custom consumer code to process it):
-</p>
-
-
-<pre class="brush: bash;">
-&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
-{"schema":{"type":"string","optional":false},"payload":"foo"}
-{"schema":{"type":"string","optional":false},"payload":"bar"}
-...
-</pre>
-
-<p>The connectors continue to process data, so we can add data to the file and see it move through the pipeline:</p>
-
-<pre class="brush: bash;">
-&gt; echo Another line>> test.txt
-</pre>
-
-<p>You should see the line appear in the console consumer output and in the sink file.</p>
-
-<h4><a id="quickstart_kafkastreams" href="#quickstart_kafkastreams">Step 8: Use Kafka Streams to process data</a></h4>
-
-<p>
-  Kafka Streams is a client library for building mission-critical real-time applications and microservices,
-  where the input and/or output data is stored in Kafka clusters. Kafka Streams combines the simplicity of
-  writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka's
-  server-side cluster technology to make these applications highly scalable, elastic, fault-tolerant, distributed,
-  and much more. This <a href="/{{version}}/documentation/streams/quickstart">quickstart example</a> will demonstrate how
-  to run a streaming application coded in this library. 
-</p>
-
-
-</script>
-
-<div class="p-quickstart"></div>
diff --git a/docs/security.html b/docs/security.html
index f70e13f..ed88776 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -16,7 +16,7 @@
 -->
 
 <script id="security-template" type="text/x-handlebars-template">
-    <h3><a id="security_overview" href="#security_overview">7.1 Security Overview</a></h3>
+    <h3 class="anchor-heading"><a id="security_overview" class="anchor-link"></a><a href="#security_overview">7.1 Security Overview</a></h3>
     In release 0.9.0.0, the Kafka community added a number of features that, used either separately or together, increases security in a Kafka cluster. The following security measures are currently supported:
     <ol>
         <li>Authentication of connections to brokers from clients (producers and consumers), other brokers and tools, using either SSL or SASL. Kafka supports the following SASL mechanisms:
@@ -36,20 +36,18 @@
 
     The guides below explain how to configure and use the security features in both clients and brokers.
 
-    <h3><a id="security_ssl" href="#security_ssl">7.2 Encryption and Authentication using SSL</a></h3>
+    <h3 class="anchor-heading"><a id="security_ssl" class="anchor-link"></a><a href="#security_ssl">7.2 Encryption and Authentication using SSL</a></h3>
     Apache Kafka allows clients to use SSL for encryption of traffic as well as authentication. By default, SSL is disabled but can be turned on if needed.
     The following paragraphs explain in detail how to set up your own PKI infrastructure, use it to create certificates and configure Kafka to use these.
 
     <ol>
-        <li><h4><a id="security_ssl_key" href="#security_ssl_key">Generate SSL key and certificate for each Kafka broker</a></h4>
+        <li><h4 class="anchor-heading"><a id="security_ssl_key" class="anchor-link"></a><a href="#security_ssl_key">Generate SSL key and certificate for each Kafka broker</a></h4>
             The first step of deploying one or more brokers with SSL support is to generate a public/private keypair for every server.
             Since Kafka expects all keys and certificates to be stored in keystores we will use Java's keytool command for this task.
             The tool supports two different keystore formats, the Java specific jks format which has been deprecated by now, as well as PKCS12.
             PKCS12 is the default format as of Java version 9, to ensure this format is being used regardless of the Java version in use all following
             commands explicitly specify the PKCS12 format.
-            <pre class="brush: bash;">
-                keytool -keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg RSA -storetype pkcs12
-            </pre>
+            <pre class="line-numbers"><code class="language-bash">                keytool -keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg RSA -storetype pkcs12</code></pre>
             You need to specify two parameters in the above command:
             <ol>
                 <li>keystorefile: the keystore file that stores the keys (and later the certificate) for this broker. The keystore file contains the private
@@ -65,9 +63,7 @@
             authentication purposes.<br>
             To generate certificate signing requests run the following command for all server keystores created so far.
 
-            <pre class="brush: bash;">
-                keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}
-            </pre>
+            <pre class="line-numbers"><code class="language-bash">                keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}</code></pre>
             This command assumes that you want to add hostname information to the certificate, if this is not the case, you can omit the extension parameter <code>-ext SAN=DNS:{FQDN},IP:{IPADDRESS1}</code>. Please see below for more information on this.
 
             <h5>Host Name Verification</h5>
@@ -80,9 +76,7 @@
             Server host name verification may be disabled by setting <code>ssl.endpoint.identification.algorithm</code> to an empty string.<br>
             For dynamically configured broker listeners, hostname verification may be disabled using <code>kafka-configs.sh</code>:<br>
 
-            <pre class="brush: text;">
-                bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="
-            </pre>
+            <pre class="line-numbers"><code class="language-text">                bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="</code></pre>
 
             <p><b>Note:</b></p>
             Normally there is no good reason to disable hostname verification apart from being the quickest way to "just get it to work" followed
@@ -105,12 +99,10 @@
 
 
             To add a SAN field append the following argument <code> -ext SAN=DNS:{FQDN},IP:{IPADDRESS} </code> to the keytool command:
-            <pre class="brush: bash;">
-                keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}
-            </pre>
+            <pre class="line-numbers"><code class="language-bash">                keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}</code></pre>
         </li>
 
-        <li><h4><a id="security_ssl_ca" href="#security_ssl_ca">Creating your own CA</a></h4>
+        <li><h4 class="anchor-heading"><a id="security_ssl_ca" class="anchor-link"></a><a href="#security_ssl_ca">Creating your own CA</a></h4>
             After this step each machine in the cluster has a public/private key pair which can already be used to encrypt traffic and a certificate
             signing request, which is the basis for creating a certificate. To add authentication capabilities this signing request needs to be signed
             by a trusted authority, which will be created in this step.
@@ -131,8 +123,7 @@
             CA keypair.<br>
 
             Save the following listing into a file called openssl-ca.cnf and adjust the values for validity and common attributes as necessary.
-            <pre class="brush: bash">
-HOME            = .
+            <pre class="line-numbers"><code class="language-bash">HOME            = .
 RANDFILE        = $ENV::HOME/.rnd
 
 ####################################################################
@@ -212,39 +203,30 @@ emailAddress           = optional
 subjectKeyIdentifier   = hash
 authorityKeyIdentifier = keyid,issuer
 basicConstraints       = CA:FALSE
-keyUsage               = digitalSignature, keyEncipherment
-            </pre>
+keyUsage               = digitalSignature, keyEncipherment</code></pre>
 
             Then create a database and serial number file, these will be used to keep track of which certificates were signed with this CA. Both of
             these are simply text files that reside in the same directory as your CA keys.
 
-            <pre class="brush: bash;">
-                echo 01 > serial.txt
-                touch index.txt
-            </pre>
+            <pre class="line-numbers"><code class="language-bash">                echo 01 > serial.txt
+                touch index.txt</code></pre>
 
             With these steps done you are now ready to generate your CA that will be used to sign certificates later.
 
-            <pre class="brush: bash;">
-            openssl req -x509 -config openssl-ca.cnf -newkey rsa:4096 -sha256 -nodes -out cacert.pem -outform PEM
-            </pre>
+            <pre class="line-numbers"><code class="language-bash">            openssl req -x509 -config openssl-ca.cnf -newkey rsa:4096 -sha256 -nodes -out cacert.pem -outform PEM</code></pre>
 
             The CA is simply a public/private key pair and certificate that is signed by itself, and is only intended to sign other certificates.<br>
             This keypair should be kept very safe, if someone gains access to it, they can create and sign certificates that will be trusted by your
             infrastructure, which means they will be able to impersonate anybody when connecting to any service that trusts this CA.<br>
 
             The next step is to add the generated CA to the **clients' truststore** so that the clients can trust this CA:
-            <pre class="brush: bash;">
-                keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
-            </pre>
+            <pre class="line-numbers"><code class="language-bash">                keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert</code></pre>
 
             <b>Note:</b>
             If you configure the Kafka brokers to require client authentication by setting ssl.client.auth to be "requested" or "required" in the
             <a href="#brokerconfigs">Kafka brokers config</a> then you must provide a truststore for the Kafka brokers as well and it should have
             all the CA certificates that clients' keys were signed by.
-            <pre class="brush: bash;">
-                keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
-            </pre>
+            <pre class="line-numbers"><code class="language-bash">                keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert</code></pre>
 
             In contrast to the keystore in step 1 that stores each machine's own identity, the truststore of a client stores all the certificates
             that the client should trust. Importing a certificate into one's truststore also means trusting all certificates that are signed by that
@@ -253,17 +235,13 @@ keyUsage               = digitalSignature, keyEncipherment
             in the cluster with a single CA, and have all machines share the same truststore that trusts the CA. That way all machines can authenticate all
             other machines.
         </li>
-        <li><h4><a id="security_ssl_signing" href="#security_ssl_signing">Signing the certificate</a></h4>
+        <li><h4 class="anchor-heading"><a id="security_ssl_signing" class="anchor-link"></a><a href="#security_ssl_signing">Signing the certificate</a></h4>
             Then sign it with the CA:
-            <pre class="brush: bash;">
-                openssl ca -config openssl-ca.cnf -policy signing_policy -extensions signing_req -out {server certificate} -infiles {certificate signing request}
-           </pre>
+            <pre class="line-numbers"><code class="language-bash">                openssl ca -config openssl-ca.cnf -policy signing_policy -extensions signing_req -out {server certificate} -infiles {certificate signing request}</code></pre>
 
             Finally, you need to import both the certificate of the CA and the signed certificate into the keystore:
-            <pre class="brush: bash;">
-                keytool -keystore {keystore} -alias CARoot -import -file {CA certificate}
-                keytool -keystore {keystore} -alias localhost -import -file cert-signed
-            </pre>
+            <pre class="line-numbers"><code class="language-bash">                keytool -keystore {keystore} -alias CARoot -import -file {CA certificate}
+                keytool -keystore {keystore} -alias localhost -import -file cert-signed</code></pre>
 
             The definitions of the parameters are the following:
             <ol>
@@ -291,7 +269,7 @@ keyUsage               = digitalSignature, keyEncipherment
             should be provided in <code>ssl.truststore.certificates</code>. Since PEM is typically stored as multi-line base-64 strings, the configuration value
             can be included in Kafka configuration as multi-line strings with lines terminating in backslash ('\') for line continuation.
 
-            <p>Store password configs <code>ssl.keystore.password</code> and <code>ssl.truststore.password</code> are not used for PEM. 
+            <p>Store password configs <code>ssl.keystore.password</code> and <code>ssl.truststore.password</code> are not used for PEM.
             If private key is encrypted using a password, the key password must be provided in <code>ssl.key.password</code>. Private keys may be provided
             in unencrypted form without a password when PEM is specified directly in the config value. In production deployments, configs should be encrypted or
             externalized using password protection feature in Kafka in this case. Note that the default SSL engine factory has limited capabilities for decryption
@@ -299,7 +277,7 @@ keyUsage               = digitalSignature, keyEncipherment
             custom <code>SslEngineFactory</code> to support a wider range of encrypted private keys.</p>
 
         </li>
-        <li><h4><a id="security_ssl_production" href="#security_ssl_production">Common Pitfalls in Production</a></h4>
+        <li><h4 class="anchor-heading"><a id="security_ssl_production" class="anchor-link"></a><a href="#security_ssl_production">Common Pitfalls in Production</a></h4>
             The above paragraphs show the process to create your own CA and use it to sign certificates for your cluster.
             While very useful for sandbox, dev, test, and similar systems, this is usually not the correct process to create certificates for a production
             cluster in a corporate environment.
@@ -332,28 +310,24 @@ keyUsage               = digitalSignature, keyEncipherment
                     harder for a malicious party to obtain certificates with potentially misleading or fraudulent values.
                     It is adviseable to double check signed certificates, whether these contain all requested SAN fields to enable proper hostname verification.
                     The following command can be used to print certificate details to the console, which should be compared with what was originally requested:
-                    <pre class="brush: bash;">
-                        openssl x509 -in certificate.crt -text -noout
-                    </pre>
+                    <pre class="line-numbers"><code class="language-bash">                        openssl x509 -in certificate.crt -text -noout</code></pre>
                 </li>
             </ol>
         </li>
-        <li><h4><a id="security_configbroker" href="#security_configbroker">Configuring Kafka Brokers</a></h4>
+        <li><h4 class="anchor-heading"><a id="security_configbroker" class="anchor-link"></a><a href="#security_configbroker">Configuring Kafka Brokers</a></h4>
             Kafka Brokers support listening for connections on multiple ports.
             We need to configure the following property in server.properties, which must have one or more comma-separated values:
-            <pre>listeners</pre>
+            <pre>listeners</code></pre>
 
             If SSL is not enabled for inter-broker communication (see below for how to enable it), both PLAINTEXT and SSL ports will be necessary.
-            <pre class="brush: text;">
-            listeners=PLAINTEXT://host.name:port,SSL://host.name:port</pre>
+            <pre class="line-numbers"><code class="language-text">            listeners=PLAINTEXT://host.name:port,SSL://host.name:port</code></pre>
 
             Following SSL configs are needed on the broker side
-            <pre class="brush: text;">
-            ssl.keystore.location=/var/private/ssl/server.keystore.jks
+            <pre class="line-numbers"><code class="language-text">            ssl.keystore.location=/var/private/ssl/server.keystore.jks
             ssl.keystore.password=test1234
             ssl.key.password=test1234
             ssl.truststore.location=/var/private/ssl/server.truststore.jks
-            ssl.truststore.password=test1234</pre>
+            ssl.truststore.password=test1234</code></pre>
 
             Note: ssl.truststore.password is technically optional but highly recommended. If a password is not set access to the truststore is still available, but integrity checking is disabled.
 
@@ -367,8 +341,7 @@ keyUsage               = digitalSignature, keyEncipherment
                 <li>ssl.secure.random.implementation=SHA1PRNG</li>
             </ol>
             If you want to enable SSL for inter-broker communication, add the following to the server.properties file (it defaults to PLAINTEXT)
-            <pre>
-            security.inter.broker.protocol=SSL</pre>
+            <pre class="line-numbers"><code class="language-text">            security.inter.broker.protocol=SSL</code></pre>
 
             <p>
             Due to import regulations in some countries, the Oracle implementation limits the strength of cryptographic algorithms available by default. If stronger algorithms are needed (for example, AES with 256-bit keys), the <a href="http://www.oracle.com/technetwork/java/javase/downloads/index.html">JCE Unlimited Strength Jurisdiction Policy Files</a> must be obtained and installed in the JDK/JRE. See the
@@ -384,35 +357,31 @@ keyUsage               = digitalSignature, keyEncipherment
             </p>
 
             Once you start the broker you should be able to see in the server.log
-            <pre>
-            with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)</pre>
+            <pre class="line-numbers"><code class="language-text">            with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)</code></pre>
 
             To check quickly if  the server keystore and truststore are setup properly you can run the following command
-            <pre>openssl s_client -debug -connect localhost:9093 -tls1</pre> (Note: TLSv1 should be listed under ssl.enabled.protocols)<br>
+            <pre>openssl s_client -debug -connect localhost:9093 -tls1</code></pre> (Note: TLSv1 should be listed under ssl.enabled.protocols)<br>
             In the output of this command you should see server's certificate:
-            <pre>
-            -----BEGIN CERTIFICATE-----
+            <pre class="line-numbers"><code class="language-text">            -----BEGIN CERTIFICATE-----
             {variable sized random bytes}
             -----END CERTIFICATE-----
             subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
-            issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com</pre>
+            issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com</code></pre>
             If the certificate does not show up or if there are any other error messages then your keystore is not setup properly.</li>
 
-        <li><h4><a id="security_configclients" href="#security_configclients">Configuring Kafka Clients</a></h4>
+        <li><h4 class="anchor-heading"><a id="security_configclients" class="anchor-link"></a><a href="#security_configclients">Configuring Kafka Clients</a></h4>
             SSL is supported only for the new Kafka Producer and Consumer, the older API is not supported. The configs for SSL will be the same for both producer and consumer.<br>
             If client authentication is not required in the broker, then the following is a minimal configuration example:
-            <pre class="brush: text;">
-            security.protocol=SSL
+            <pre class="line-numbers"><code class="language-text">            security.protocol=SSL
             ssl.truststore.location=/var/private/ssl/client.truststore.jks
-            ssl.truststore.password=test1234</pre>
+            ssl.truststore.password=test1234</code></pre>
 
             Note: ssl.truststore.password is technically optional but highly recommended. If a password is not set access to the truststore is still available, but integrity checking is disabled.
 
             If client authentication is required, then a keystore must be created like in step 1 and the following must also be configured:
-            <pre class="brush: text;">
-            ssl.keystore.location=/var/private/ssl/client.keystore.jks
+            <pre class="line-numbers"><code class="language-text">            ssl.keystore.location=/var/private/ssl/client.keystore.jks
             ssl.keystore.password=test1234
-            ssl.key.password=test1234</pre>
+            ssl.key.password=test1234</code></pre>
 
             Other configuration settings that may also be needed depending on our requirements and the broker configuration:
                 <ol>
@@ -424,15 +393,14 @@ keyUsage               = digitalSignature, keyEncipherment
                 </ol>
     <br>
             Examples using console-producer and console-consumer:
-            <pre class="brush: bash;">
-            kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --producer.config client-ssl.properties
-            kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties</pre>
+            <pre class="line-numbers"><code class="language-bash">            kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --producer.config client-ssl.properties
+            kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties</code></pre>
         </li>
     </ol>
-    <h3><a id="security_sasl" href="#security_sasl">7.3 Authentication using SASL</a></h3>
+    <h3 class="anchor-heading"><a id="security_sasl" class="anchor-link"></a><a href="#security_sasl">7.3 Authentication using SASL</a></h3>
 
     <ol>
-    <li><h4><a id="security_sasl_jaasconfig" href="#security_sasl_jaasconfig">JAAS configuration</a></h4>
+    <li><h4 class="anchor-heading"><a id="security_sasl_jaasconfig" class="anchor-link"></a><a href="#security_sasl_jaasconfig">JAAS configuration</a></h4>
     <p>Kafka uses the Java Authentication and Authorization Service
     (<a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/jaas/JAASRefGuide.html">JAAS</a>)
     for SASL configuration.</p>
@@ -466,15 +434,14 @@ keyUsage               = digitalSignature, keyEncipherment
             login module may be specified in the config value. If multiple mechanisms are configured on a
             listener, configs must be provided for each mechanism using the listener and mechanism prefix.
             For example,
-                    <pre class="brush: text;">
-        listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
+                    <pre class="line-numbers"><code class="language-text">        listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
             username="admin" \
             password="admin-secret";
         listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
             username="admin" \
             password="admin-secret" \
             user_admin="admin-secret" \
-            user_alice="alice-secret";</pre>
+            user_alice="alice-secret";</code></pre>
 
             If JAAS configuration is defined at different levels, the order of precedence used is:
             <ul>
@@ -513,7 +480,7 @@ keyUsage               = digitalSignature, keyEncipherment
                 <a href="#security_sasl_scram_clientconfig">SCRAM</a> or
                 <a href="#security_sasl_oauthbearer_clientconfig">OAUTHBEARER</a> for example configurations.</p></li>
 
-                <li><h6><a id="security_client_staticjaas" href="#security_client_staticjaas">JAAS configuration using static config file</a></h6>
+                <li><h6 class="anchor-heading"><a id="security_client_staticjaas" class="anchor-link"></a><a href="#security_client_staticjaas">JAAS configuration using static config file</a></h6>
                 To configure SASL authentication on the clients using static JAAS config file:
                 <ol>
                 <li>Add a JAAS config file with a client login section named <tt>KafkaClient</tt>. Configure
@@ -524,17 +491,16 @@ keyUsage               = digitalSignature, keyEncipherment
                     <a href="#security_sasl_oauthbearer_clientconfig">OAUTHBEARER</a>.
                     For example, <a href="#security_sasl_gssapi_clientconfig">GSSAPI</a>
                     credentials may be configured as:
-                    <pre class="brush: text;">
-        KafkaClient {
+                    <pre class="line-numbers"><code class="language-text">        KafkaClient {
         com.sun.security.auth.module.Krb5LoginModule required
         useKeyTab=true
         storeKey=true
         keyTab="/etc/security/keytabs/kafka_client.keytab"
         principal="kafka-client-1@EXAMPLE.COM";
-    };</pre>
+    };</code></pre>
                 </li>
                 <li>Pass the JAAS config file location as JVM parameter to each client JVM. For example:
-                    <pre class="brush: bash;">    -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf</pre></li>
+                    <pre class="language-bash">    -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf</code></pre></li>
                 </ol>
                 </li>
             </ol>
@@ -566,11 +532,11 @@ keyUsage               = digitalSignature, keyEncipherment
             <li>Configure a SASL port in server.properties, by adding at least one of
                 SASL_PLAINTEXT or SASL_SSL to the <i>listeners</i> parameter, which
                 contains one or more comma-separated values:
-                <pre>    listeners=SASL_PLAINTEXT://host.name:port</pre>
+                <pre>    listeners=SASL_PLAINTEXT://host.name:port</code></pre>
                 If you are only configuring a SASL port (or if you want
                 the Kafka brokers to authenticate each other using SASL) then make sure
                 you set the same SASL protocol for inter-broker communication:
-                <pre>    security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)</pre></li>
+                <pre>    security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)</code></pre></li>
             <li>Select one or more  <a href="#security_sasl_mechanism">supported mechanisms</a>
                 to enable in the broker and follow the steps to configure SASL for the mechanism.
                 To enable multiple mechanisms in the broker, follow the steps
@@ -590,23 +556,21 @@ keyUsage               = digitalSignature, keyEncipherment
     </li>
     <li><h4><a id="security_sasl_kerberos" href="#security_sasl_kerberos">Authentication using SASL/Kerberos</a></h4>
         <ol>
-        <li><h5><a id="security_sasl_kerberos_prereq" href="#security_sasl_kerberos_prereq">Prerequisites</a></h5>
+        <li><h5 class="anchor-heading"><a id="security_sasl_kerberos_prereq" class="anchor-link"></a><a href="#security_sasl_kerberos_prereq">Prerequisites</a></h5>
         <ol>
             <li><b>Kerberos</b><br>
             If your organization is already using a Kerberos server (for example, by using Active Directory), there is no need to install a new server just for Kafka. Otherwise you will need to install one, your Linux vendor likely has packages for Kerberos and a short guide on how to install and configure it (<a href="https://help.ubuntu.com/community/Kerberos">Ubuntu</a>, <a href="https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Managing_Smart_Cards/install [...]
             <li><b>Create Kerberos Principals</b><br>
             If you are using the organization's Kerberos or Active Directory server, ask your Kerberos administrator for a principal for each Kafka broker in your cluster and for every operating system user that will access Kafka with Kerberos authentication (via clients and tools).</br>
             If you have installed your own Kerberos, you will need to create these principals yourself using the following commands:
-                <pre class="brush: bash;">
-        sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}'
-        sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"</pre></li>
+                <pre class="line-numbers"><code class="language-bash">        sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}'
+        sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"</code></pre></li>
             <li><b>Make sure all hosts can be reachable using hostnames</b> - it is a Kerberos requirement that all your hosts can be resolved with their FQDNs.</li>
         </ol>
-        <li><h5><a id="security_sasl_kerberos_brokerconfig" href="#security_sasl_kerberos_brokerconfig">Configuring Kafka Brokers</a></h5>
+        <li><h5 class="anchor-heading"><a id="security_sasl_kerberos_brokerconfig" class="anchor-link"></a><a href="#security_sasl_kerberos_brokerconfig">Configuring Kafka Brokers</a></h5>
         <ol>
             <li>Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example (note that each broker should have its own keytab):
-            <pre class="brush: text;">
-        KafkaServer {
+            <pre class="line-numbers"><code class="language-text">        KafkaServer {
             com.sun.security.auth.module.Krb5LoginModule required
             useKeyTab=true
             storeKey=true
@@ -621,27 +585,26 @@ keyUsage               = digitalSignature, keyEncipherment
         storeKey=true
         keyTab="/etc/security/keytabs/kafka_server.keytab"
         principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
-        };</pre>
+        };</code></pre>
 
             </li>
             <tt>KafkaServer</tt> section in the JAAS file tells the broker which principal to use and the location of the keytab where this principal is stored. It
             allows the broker to login using the keytab specified in this section. See <a href="#security_jaas_broker">notes</a> for more details on Zookeeper SASL configuration.
             <li>Pass the JAAS and optionally the krb5 file locations as JVM parameters to each Kafka broker (see <a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html">here</a> for more details):
                 <pre>    -Djava.security.krb5.conf=/etc/kafka/krb5.conf
-        -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</pre>
+        -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</code></pre>
             </li>
             <li>Make sure the keytabs configured in the JAAS file are readable by the operating system user who is starting kafka broker.</li>
             <li>Configure SASL port and SASL mechanisms in server.properties as described <a href="#security_sasl_brokerconfig">here</a>. For example:
             <pre>    listeners=SASL_PLAINTEXT://host.name:port
         security.inter.broker.protocol=SASL_PLAINTEXT
         sasl.mechanism.inter.broker.protocol=GSSAPI
-        sasl.enabled.mechanisms=GSSAPI
-            </pre>
+        sasl.enabled.mechanisms=GSSAPI</code></pre>
             </li>We must also configure the service name in server.properties, which should match the principal name of the kafka brokers. In the above example, principal is "kafka/kafka1.hostname.com@EXAMPLE.com", so:
-            <pre>    sasl.kerberos.service.name=kafka</pre>
+            <pre>    sasl.kerberos.service.name=kafka</code></pre>
 
         </ol></li>
-        <li><h5><a id="security_sasl_kerberos_clientconfig" href="#security_sasl_kerberos_clientconfig">Configuring Kafka Clients</a></h5>
+        <li><h5 class="anchor-heading"><a id="security_sasl_kerberos_clientconfig" class="anchor-link"></a><a href="#security_sasl_kerberos_clientconfig">Configuring Kafka Clients</a></h5>
             To configure SASL authentication on the clients:
             <ol>
                 <li>
@@ -652,30 +615,27 @@ keyUsage               = digitalSignature, keyEncipherment
                     The property <code>sasl.jaas.config</code> in producer.properties or consumer.properties describes
                     how clients like producer and consumer can connect to the Kafka Broker. The following is an example
                     configuration for a client using a keytab (recommended for long-running processes):
-                <pre>
-    sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
+                <pre class="line-numbers"><code class="language-text">    sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
         useKeyTab=true \
         storeKey=true  \
         keyTab="/etc/security/keytabs/kafka_client.keytab" \
-        principal="kafka-client-1@EXAMPLE.COM";</pre>
+        principal="kafka-client-1@EXAMPLE.COM";</code></pre>
 
                    For command-line utilities like kafka-console-consumer or kafka-console-producer, kinit can be used
                    along with "useTicketCache=true" as in:
-                <pre>
-    sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
-        useTicketCache=true;</pre>
+                <pre class="line-numbers"><code class="language-text">    sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
+        useTicketCache=true;</code></pre>
 
                    JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers
                    as described <a href="#security_client_staticjaas">here</a>. Clients use the login section named
                    <tt>KafkaClient</tt>. This option allows only one user for all client connections from a JVM.</li>
                 <li>Make sure the keytabs configured in the JAAS configuration are readable by the operating system user who is starting kafka client.</li>
                 <li>Optionally pass the krb5 file locations as JVM parameters to each client JVM (see <a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html">here</a> for more details):
-                <pre>    -Djava.security.krb5.conf=/etc/kafka/krb5.conf</pre></li>
+                <pre>    -Djava.security.krb5.conf=/etc/kafka/krb5.conf</code></pre></li>
                 <li>Configure the following properties in producer.properties or consumer.properties:
-                <pre>
-    security.protocol=SASL_PLAINTEXT (or SASL_SSL)
+                <pre class="line-numbers"><code class="language-text">    security.protocol=SASL_PLAINTEXT (or SASL_SSL)
     sasl.mechanism=GSSAPI
-    sasl.kerberos.service.name=kafka</pre></li>
+    sasl.kerberos.service.name=kafka</code></pre></li>
             </ol>
         </li>
         </ol>
@@ -686,42 +646,40 @@ keyUsage               = digitalSignature, keyEncipherment
         Kafka supports a default implementation for SASL/PLAIN which can be extended for production use as described <a href="#security_sasl_plain_production">here</a>.</p>
         The username is used as the authenticated <code>Principal</code> for configuration of ACLs etc.
         <ol>
-        <li><h5><a id="security_sasl_plain_brokerconfig" href="#security_sasl_plain_brokerconfig">Configuring Kafka Brokers</a></h5>
+        <li><h5 class="anchor-heading"><a id="security_sasl_plain_brokerconfig" class="anchor-link"></a><a href="#security_sasl_plain_brokerconfig">Configuring Kafka Brokers</a></h5>
             <ol>
             <li>Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example:
-                <pre class="brush: text;">
-        KafkaServer {
+                <pre class="line-numbers"><code class="language-text">        KafkaServer {
             org.apache.kafka.common.security.plain.PlainLoginModule required
             username="admin"
             password="admin-secret"
             user_admin="admin-secret"
             user_alice="alice-secret";
-        };</pre>
+        };</code></pre>
                 This configuration defines two users (<i>admin</i> and <i>alice</i>). The properties <tt>username</tt> and <tt>password</tt>
                 in the <tt>KafkaServer</tt> section are used by the broker to initiate connections to other brokers. In this example,
                 <i>admin</i> is the user for inter-broker communication. The set of properties <tt>user_<i>userName</i></tt> defines
                 the passwords for all users that connect to the broker and the broker validates all client connections including
                 those from other brokers using these properties.</li>
             <li>Pass the JAAS config file location as JVM parameter to each Kafka broker:
-                <pre>    -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</pre></li>
+                <pre>    -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</code></pre></li>
             <li>Configure SASL port and SASL mechanisms in server.properties as described <a href="#security_sasl_brokerconfig">here</a>. For example:
                 <pre>    listeners=SASL_SSL://host.name:port
         security.inter.broker.protocol=SASL_SSL
         sasl.mechanism.inter.broker.protocol=PLAIN
-        sasl.enabled.mechanisms=PLAIN</pre></li>
+        sasl.enabled.mechanisms=PLAIN</code></pre></li>
             </ol>
         </li>
 
-        <li><h5><a id="security_sasl_plain_clientconfig" href="#security_sasl_plain_clientconfig">Configuring Kafka Clients</a></h5>
+        <li><h5 class="anchor-heading"><a id="security_sasl_plain_clientconfig" class="anchor-link"></a><a href="#security_sasl_plain_clientconfig">Configuring Kafka Clients</a></h5>
             To configure SASL authentication on the clients:
             <ol>
             <li>Configure the JAAS configuration property for each client in producer.properties or consumer.properties.
                 The login module describes how the clients like producer and consumer can connect to the Kafka Broker.
                 The following is an example configuration for a client for the PLAIN mechanism:
-                <pre class="brush: text;">
-    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+                <pre class="line-numbers"><code class="language-text">    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
         username="alice" \
-        password="alice-secret";</pre>
+        password="alice-secret";</code></pre>
                 <p>The options <tt>username</tt> and <tt>password</tt> are used by clients to configure
                 the user for client connections. In this example, clients connect to the broker as user <i>alice</i>.
                 Different clients within a JVM may connect as different users by specifying different user names
@@ -731,9 +689,8 @@ keyUsage               = digitalSignature, keyEncipherment
                 as described <a href="#security_client_staticjaas">here</a>. Clients use the login section named
                 <tt>KafkaClient</tt>. This option allows only one user for all client connections from a JVM.</p></li>
             <li>Configure the following properties in producer.properties or consumer.properties:
-                <pre>
-    security.protocol=SASL_SSL
-    sasl.mechanism=PLAIN</pre></li>
+                <pre class="line-numbers"><code class="language-text">    security.protocol=SASL_SSL
+    sasl.mechanism=PLAIN</code></pre></li>
             </ol>
         </li>
         <li><h5><a id="security_sasl_plain_production" href="#security_sasl_plain_production">Use of SASL/PLAIN in production</a></h5>
@@ -762,65 +719,54 @@ keyUsage               = digitalSignature, keyEncipherment
         is on a private network. Refer to <a href="#security_sasl_scram_security">Security Considerations</a>
         for more details.</p>
         <ol>
-        <li><h5><a id="security_sasl_scram_credentials" href="#security_sasl_scram_credentials">Creating SCRAM Credentials</a></h5>
+        <li><h5 class="anchor-heading"><a id="security_sasl_scram_credentials" class="anchor-link"></a><a href="#security_sasl_scram_credentials">Creating SCRAM Credentials</a></h5>
         <p>The SCRAM implementation in Kafka uses Zookeeper as credential store. Credentials can be created in
         Zookeeper using <tt>kafka-configs.sh</tt>. For each SCRAM mechanism enabled, credentials must be created
         by adding a config with the mechanism name. Credentials for inter-broker communication must be created
         before Kafka brokers are started. Client credentials may be created and updated dynamically and updated
         credentials will be used to authenticate new connections.</p>
         <p>Create SCRAM credentials for user <i>alice</i> with password <i>alice-secret</i>:
-        <pre class="brush: bash;">
-    > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice
-        </pre>
+        <pre class="line-numbers"><code class="language-bash">    > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice</code></pre>
         <p>The default iteration count of 4096 is used if iterations are not specified. A random salt is created
         and the SCRAM identity consisting of salt, iterations, StoredKey and ServerKey are stored in Zookeeper.
         See <a href="https://tools.ietf.org/html/rfc5802">RFC 5802</a> for details on SCRAM identity and the individual fields.
         <p>The following examples also require a user <i>admin</i> for inter-broker communication which can be created using:
-        <pre class="brush: bash;">
-    > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin
-        </pre>
+        <pre class="line-numbers"><code class="language-bash">    > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin</code></pre>
         <p>Existing credentials may be listed using the <i>--describe</i> option:
-        <pre class="brush: bash;">
-    > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --describe --entity-type users --entity-name alice
-        </pre>
+        <pre class="line-numbers"><code class="language-bash">    > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --describe --entity-type users --entity-name alice</code></pre>
         <p>Credentials may be deleted for one or more SCRAM mechanisms using the <i>--alter --delete-config</i> option:
-        <pre class="brush: bash;">
-    > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice
-        </pre>
+        <pre class="line-numbers"><code class="language-bash">    > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice</code></pre>
         </li>
-        <li><h5><a id="security_sasl_scram_brokerconfig" href="#security_sasl_scram_brokerconfig">Configuring Kafka Brokers</a></h5>
+        <li><h5 class="anchor-heading"><a id="security_sasl_scram_brokerconfig" class="anchor-link"></a><a href="#security_sasl_scram_brokerconfig">Configuring Kafka Brokers</a></h5>
             <ol>
             <li>Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example:
-                <pre>
-    KafkaServer {
+                <pre class="line-numbers"><code class="language-text">    KafkaServer {
         org.apache.kafka.common.security.scram.ScramLoginModule required
         username="admin"
         password="admin-secret";
-    };</pre>
+    };</code></pre>
                 The properties <tt>username</tt> and <tt>password</tt> in the <tt>KafkaServer</tt> section are used by
                 the broker to initiate connections to other brokers. In this example, <i>admin</i> is the user for
                 inter-broker communication.</li>
             <li>Pass the JAAS config file location as JVM parameter to each Kafka broker:
-                <pre>    -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</pre></li>
-            <li>Configure SASL port and SASL mechanisms in server.properties as described <a href="#security_sasl_brokerconfig">here</a>.</pre> For example:
-                <pre>
-    listeners=SASL_SSL://host.name:port
+                <pre>    -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</code></pre></li>
+            <li>Configure SASL port and SASL mechanisms in server.properties as described <a href="#security_sasl_brokerconfig">here</a>.</code></pre> For example:
+                <pre class="line-numbers"><code class="language-text">    listeners=SASL_SSL://host.name:port
     security.inter.broker.protocol=SASL_SSL
     sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 (or SCRAM-SHA-512)
-    sasl.enabled.mechanisms=SCRAM-SHA-256 (or SCRAM-SHA-512)</pre></li>
+    sasl.enabled.mechanisms=SCRAM-SHA-256 (or SCRAM-SHA-512)</code></pre></li>
             </ol>
         </li>
 
-        <li><h5><a id="security_sasl_scram_clientconfig" href="#security_sasl_scram_clientconfig">Configuring Kafka Clients</a></h5>
+        <li><h5 class="anchor-heading"><a id="security_sasl_scram_clientconfig" class="anchor-link"></a><a href="#security_sasl_scram_clientconfig">Configuring Kafka Clients</a></h5>
             To configure SASL authentication on the clients:
             <ol>
             <li>Configure the JAAS configuration property for each client in producer.properties or consumer.properties.
                 The login module describes how the clients like producer and consumer can connect to the Kafka Broker.
                 The following is an example configuration for a client for the SCRAM mechanisms:
-                <pre class="brush: text;">
-   sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
+                <pre class="line-numbers"><code class="language-text">   sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
         username="alice" \
-        password="alice-secret";</pre>
+        password="alice-secret";</code></pre>
 
                 <p>The options <tt>username</tt> and <tt>password</tt> are used by clients to configure
                 the user for client connections. In this example, clients connect to the broker as user <i>alice</i>.
@@ -831,9 +777,8 @@ keyUsage               = digitalSignature, keyEncipherment
                 as described <a href="#security_client_staticjaas">here</a>. Clients use the login section named
                 <tt>KafkaClient</tt>. This option allows only one user for all client connections from a JVM.</p></li>
             <li>Configure the following properties in producer.properties or consumer.properties:
-                <pre>
-    security.protocol=SASL_SSL
-    sasl.mechanism=SCRAM-SHA-256 (or SCRAM-SHA-512)</pre></li>
+                <pre class="line-numbers"><code class="language-text">    security.protocol=SASL_SSL
+    sasl.mechanism=SCRAM-SHA-256 (or SCRAM-SHA-512)</code></pre></li>
             </ol>
         </li>
         <li><h5><a id="security_sasl_scram_security" href="#security_sasl_scram_security">Security Considerations for SASL/SCRAM</a></h5>
@@ -863,37 +808,34 @@ keyUsage               = digitalSignature, keyEncipherment
         and is only suitable for use in non-production Kafka installations. Refer to <a href="#security_sasl_oauthbearer_security">Security Considerations</a>
         for more details.</p>
         <ol>
-        <li><h5><a id="security_sasl_oauthbearer_brokerconfig" href="#security_sasl_oauthbearer_brokerconfig">Configuring Kafka Brokers</a></h5>
+        <li><h5 class="anchor-heading"><a id="security_sasl_oauthbearer_brokerconfig" class="anchor-link"></a><a href="#security_sasl_oauthbearer_brokerconfig">Configuring Kafka Brokers</a></h5>
             <ol>
             <li>Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example:
-                <pre>
-    KafkaServer {
+                <pre class="line-numbers"><code class="language-text">    KafkaServer {
         org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
         unsecuredLoginStringClaim_sub="admin";
-    };</pre>
+    };</code></pre>
                 The property <tt>unsecuredLoginStringClaim_sub</tt> in the <tt>KafkaServer</tt> section is used by
                 the broker when it initiates connections to other brokers. In this example, <i>admin</i> will appear in the
                 subject (<tt>sub</tt>) claim and will be the user for inter-broker communication.</li>
             <li>Pass the JAAS config file location as JVM parameter to each Kafka broker:
-                <pre>    -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</pre></li>
-            <li>Configure SASL port and SASL mechanisms in server.properties as described <a href="#security_sasl_brokerconfig">here</a>.</pre> For example:
-                <pre>
-    listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
+                <pre>    -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</code></pre></li>
+            <li>Configure SASL port and SASL mechanisms in server.properties as described <a href="#security_sasl_brokerconfig">here</a>.</code></pre> For example:
+                <pre class="line-numbers"><code class="language-text">    listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
     security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
     sasl.mechanism.inter.broker.protocol=OAUTHBEARER
-    sasl.enabled.mechanisms=OAUTHBEARER</pre></li>
+    sasl.enabled.mechanisms=OAUTHBEARER</code></pre></li>
             </ol>
         </li>
 
-        <li><h5><a id="security_sasl_oauthbearer_clientconfig" href="#security_sasl_oauthbearer_clientconfig">Configuring Kafka Clients</a></h5>
+        <li><h5 class="anchor-heading"><a id="security_sasl_oauthbearer_clientconfig" class="anchor-link"></a><a href="#security_sasl_oauthbearer_clientconfig">Configuring Kafka Clients</a></h5>
             To configure SASL authentication on the clients:
             <ol>
 	    <li>Configure the JAAS configuration property for each client in producer.properties or consumer.properties.
                 The login module describes how the clients like producer and consumer can connect to the Kafka Broker.
 	        The following is an example configuration for a client for the OAUTHBEARER mechanisms:
-                <pre class="brush: text;">
-   sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
-        unsecuredLoginStringClaim_sub="alice";</pre>
+                <pre class="line-numbers"><code class="language-text">   sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
+        unsecuredLoginStringClaim_sub="alice";</code></pre>
 
                 <p>The option <tt>unsecuredLoginStringClaim_sub</tt> is used by clients to configure
                 the subject (<tt>sub</tt>) claim, which determines the user for client connections.
@@ -905,9 +847,8 @@ keyUsage               = digitalSignature, keyEncipherment
                 as described <a href="#security_client_staticjaas">here</a>. Clients use the login section named
                 <tt>KafkaClient</tt>. This option allows only one user for all client connections from a JVM.</p></li>
             <li>Configure the following properties in producer.properties or consumer.properties:
-                <pre>
-    security.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
-    sasl.mechanism=OAUTHBEARER</pre></li>
+                <pre class="line-numbers"><code class="language-text">    security.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
+    sasl.mechanism=OAUTHBEARER</code></pre></li>
              <li>The default implementation of SASL/OAUTHBEARER depends on the jackson-databind library.
                  Since it's an optional dependency, users have to configure it as a dependency via their build tool.</li>
             </ol>
@@ -1062,11 +1003,10 @@ keyUsage               = digitalSignature, keyEncipherment
         </ol>
     </li>
 
-    <li><h4><a id="security_sasl_multimechanism" href="#security_sasl_multimechanism">Enabling multiple SASL mechanisms in a broker</a></h4>
+    <li><h4 class="anchor-heading"><a id="security_sasl_multimechanism" class="anchor-link"></a><a href="#security_sasl_multimechanism">Enabling multiple SASL mechanisms in a broker</a></h4>
         <ol>
         <li>Specify configuration for the login modules of all enabled mechanisms in the <tt>KafkaServer</tt> section of the JAAS config file. For example:
-            <pre>
-        KafkaServer {
+            <pre class="line-numbers"><code class="language-text">        KafkaServer {
             com.sun.security.auth.module.Krb5LoginModule required
             useKeyTab=true
             storeKey=true
@@ -1078,19 +1018,18 @@ keyUsage               = digitalSignature, keyEncipherment
             password="admin-secret"
             user_admin="admin-secret"
             user_alice="alice-secret";
-        };</pre></li>
-        <li>Enable the SASL mechanisms in server.properties: <pre>    sasl.enabled.mechanisms=GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER</pre></li>
+        };</code></pre></li>
+        <li>Enable the SASL mechanisms in server.properties: <pre>    sasl.enabled.mechanisms=GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER</code></pre></li>
         <li>Specify the SASL security protocol and mechanism for inter-broker communication in server.properties if required:
-            <pre>
-    security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
-    sasl.mechanism.inter.broker.protocol=GSSAPI (or one of the other enabled mechanisms)</pre></li>
+            <pre class="line-numbers"><code class="language-text">    security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
+    sasl.mechanism.inter.broker.protocol=GSSAPI (or one of the other enabled mechanisms)</code></pre></li>
         <li>Follow the mechanism-specific steps in <a href="#security_sasl_kerberos_brokerconfig">GSSAPI (Kerberos)</a>,
             <a href="#security_sasl_plain_brokerconfig">PLAIN</a>,
             <a href="#security_sasl_scram_brokerconfig">SCRAM</a> and <a href="#security_sasl_oauthbearer_brokerconfig">OAUTHBEARER</a>
             to configure SASL for the enabled mechanisms.</li>
         </ol>
     </li>
-    <li><h4><a id="saslmechanism_rolling_upgrade" href="#saslmechanism_rolling_upgrade">Modifying SASL mechanism in a Running Cluster</a></h4>
+    <li><h4 class="anchor-heading"><a id="saslmechanism_rolling_upgrade" class="anchor-link"></a><a href="#saslmechanism_rolling_upgrade">Modifying SASL mechanism in a Running Cluster</a></h4>
         <p>SASL mechanism can be modified in a running cluster using the following sequence:</p>
         <ol>
         <li>Enable new SASL mechanism by adding the mechanism to <tt>sasl.enabled.mechanisms</tt> in server.properties for each broker. Update JAAS config file to include both
@@ -1103,7 +1042,7 @@ keyUsage               = digitalSignature, keyEncipherment
         </ol>
     </li>
 
-    <li><h4><a id="security_delegation_token" href="#security_delegation_token">Authentication using Delegation Tokens</a></h4>
+    <li><h4 class="anchor-heading"><a id="security_delegation_token" class="anchor-link"></a><a href="#security_delegation_token">Authentication using Delegation Tokens</a></h4>
         <p>Delegation token based authentication is a lightweight authentication mechanism to complement existing SASL/SSL
             methods. Delegation tokens are shared secrets between kafka brokers and clients. Delegation tokens will help processing
             frameworks to distribute the workload to available workers in a secure environment without the added cost of distributing
@@ -1119,7 +1058,7 @@ keyUsage               = digitalSignature, keyEncipherment
         </ol>
 
         <ol>
-        <li><h5><a id="security_token_management" href="#security_token_management">Token Management</a></h5>
+        <li><h5 class="anchor-heading"><a id="security_token_management" class="anchor-link"></a><a href="#security_token_management">Token Management</a></h5>
         <p> A master key/secret is used to generate and verify delegation tokens. This is supplied using config
             option <tt>delegation.token.master.key</tt>. Same secret key must be configured across all the brokers.
             If the secret is not set or set to empty string, brokers will disable the delegation token authentication.</p>
@@ -1136,29 +1075,21 @@ keyUsage               = digitalSignature, keyEncipherment
             is beyond the max life time, it will be deleted from all broker caches as well as from zookeeper.</p>
         </li>
 
-        <li><h5><a id="security_sasl_create_tokens" href="#security_sasl_create_tokens">Creating Delegation Tokens</a></h5>
+        <li><h5 class="anchor-heading"><a id="security_sasl_create_tokens" class="anchor-link"></a><a href="#security_sasl_create_tokens">Creating Delegation Tokens</a></h5>
         <p>Tokens can be created by using Admin APIs or using <tt>kafka-delegation-tokens.sh</tt> script.
             Delegation token requests (create/renew/expire/describe) should be issued only on SASL or SSL authenticated channels.
             Tokens can not be requests if the initial authentication is done through delegation token.
             <tt>kafka-delegation-tokens.sh</tt> script examples are given below.</p>
         <p>Create a delegation token:
-        <pre class="brush: bash;">
-    > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create   --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1
-        </pre>
+        <pre class="line-numbers"><code class="language-bash">    > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create   --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1</code></pre>
         <p>Renew a delegation token:
-        <pre class="brush: bash;">
-    > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --renew    --renew-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK
-        </pre>
+        <pre class="line-numbers"><code class="language-bash">    > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --renew    --renew-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK</code></pre>
         <p>Expire a delegation token:
-        <pre class="brush: bash;">
-    > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --expire   --expiry-time-period -1   --command-config client.properties  --hmac ABCDEFGHIJK
-        </pre>
+        <pre class="line-numbers"><code class="language-bash">    > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --expire   --expiry-time-period -1   --command-config client.properties  --hmac ABCDEFGHIJK</code></pre>
         <p>Existing tokens can be described using the --describe option:
-        <pre class="brush: bash;">
-    > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --describe --command-config client.properties  --owner-principal User:user1
-        </pre>
+        <pre class="line-numbers"><code class="language-bash">    > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --describe --command-config client.properties  --owner-principal User:user1</code></pre>
         </li>
-        <li><h5><a id="security_token_authentication" href="#security_token_authentication">Token Authentication</a></h5>
+        <li><h5 class="anchor-heading"><a id="security_token_authentication" class="anchor-link"></a><a href="#security_token_authentication">Token Authentication</a></h5>
             <p>Delegation token authentication piggybacks on the current SASL/SCRAM authentication mechanism. We must enable
                 SASL/SCRAM mechanism on Kafka cluster as described in <a href="#security_sasl_scram">here</a>.</p>
 
@@ -1167,11 +1098,10 @@ keyUsage               = digitalSignature, keyEncipherment
                     <li>Configure the JAAS configuration property for each client in producer.properties or consumer.properties.
                 The login module describes how the clients like producer and consumer can connect to the Kafka Broker.
                 The following is an example configuration for a client for the token authentication:
-                <pre class="brush: text;">
-   sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
+                <pre class="line-numbers"><code class="language-text">   sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
         username="tokenID123" \
         password="lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA==" \
-        tokenauth="true";</pre>
+        tokenauth="true";</code></pre>
 
                 <p>The options <tt>username</tt> and <tt>password</tt> are used by clients to configure the token id and
                     token HMAC. And the option <tt>tokenauth</tt> is used to indicate the server about token authentication.
@@ -1196,7 +1126,7 @@ keyUsage               = digitalSignature, keyEncipherment
             <p>We intend to automate this in a future Kafka release.</p>
         </li>
 
-        <li><h5><a id="security_token_notes" href="#security_token_notes">Notes on Delegation Tokens</a></h5>
+        <li><h5 class="anchor-heading"><a id="security_token_notes" class="anchor-link"></a><a href="#security_token_notes">Notes on Delegation Tokens</a></h5>
             <ul>
             <li>Currently, we only allow a user to create delegation token for that user only. Owner/Renewers can renew or expire tokens.
                 Owner/renewers can always describe their own tokens. To describe others tokens, we need to add DESCRIBE permission on Token Resource.</li>
@@ -1206,15 +1136,15 @@ keyUsage               = digitalSignature, keyEncipherment
     </li>
     </ol>
 
-    <h3><a id="security_authz" href="#security_authz">7.4 Authorization and ACLs</a></h3>
+    <h3 class="anchor-heading"><a id="security_authz" class="anchor-link"></a><a href="#security_authz">7.4 Authorization and ACLs</a></h3>
     Kafka ships with a pluggable Authorizer and an out-of-box authorizer implementation that uses zookeeper to store all the acls. The Authorizer is configured by setting <tt>authorizer.class.name</tt> in server.properties. To enable the out of the box implementation use:
-    <pre>authorizer.class.name=kafka.security.authorizer.AclAuthorizer</pre>
+    <pre>authorizer.class.name=kafka.security.authorizer.AclAuthorizer</code></pre>
     Kafka acls are defined in the general format of "Principal P is [Allowed/Denied] Operation O From Host H on any Resource R matching ResourcePattern RP". You can read more about the acl structure in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface">KIP-11</a> and resource patterns in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs">KIP-290</a>. In order to add, remove or list acls you can use th [...]
-    <pre>allow.everyone.if.no.acl.found=true</pre>
+    <pre>allow.everyone.if.no.acl.found=true</code></pre>
     One can also add super users in server.properties like the following (note that the delimiter is semicolon since SSL user names may contain comma). Default PrincipalType string "User" is case sensitive.
-    <pre>super.users=User:Bob;User:Alice</pre>
+    <pre>super.users=User:Bob;User:Alice</code></pre>
 
-    <h5><a id="security_authz_ssl" href="#security_authz_ssl">Customizing SSL User Name</a></h5>
+    <h5 class="anchor-heading"><a id="security_authz_ssl" class="anchor-link"></a><a href="#security_authz_ssl">Customizing SSL User Name</a></h5>
 
     By default, the SSL user name will be of the form "CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown". One can change that by setting <code>ssl.principal.mapping.rules</code> to a customized rule in server.properties.
     This config allows a list of rules for mapping X.500 distinguished name to short name. The rules are evaluated in order and the first rule that matches a distinguished name is used to map it to a short name. Any later rules in the list are ignored.
@@ -1223,43 +1153,37 @@ keyUsage               = digitalSignature, keyEncipherment
     string representation of the X.500 certificate distinguished name. If the distinguished name matches the pattern, then the replacement command will be run over the name.
     This also supports lowercase/uppercase options, to force the translated result to be all lower/uppercase case. This is done by adding a "/L" or "/U' to the end of the rule.
 
-    <pre>
-        RULE:pattern/replacement/
-        RULE:pattern/replacement/[LU]
-    </pre>
+    <pre class="line-numbers"><code class="language-text">        RULE:pattern/replacement/
+        RULE:pattern/replacement/[LU]</code></pre>
 
     Example <code>ssl.principal.mapping.rules</code> values are:
-    <pre>
-        RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
+    <pre class="line-numbers"><code class="language-text">        RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
         RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
         RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,
-        DEFAULT
-    </pre>
+        DEFAULT</code></pre>
 
     Above rules translate distinguished name "CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to "serviceuser"
     and "CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to "adminuser@admin".
 
     <br>For advanced use cases, one can customize the name by setting a customized PrincipalBuilder in server.properties like the following.
-    <pre>principal.builder.class=CustomizedPrincipalBuilderClass</pre>
+    <pre>principal.builder.class=CustomizedPrincipalBuilderClass</code></pre>
 
-    <h5><a id="security_authz_sasl" href="#security_authz_sasl">Customizing SASL User Name</a></h5>
+    <h5 class="anchor-heading"><a id="security_authz_sasl" class="anchor-link"></a><a href="#security_authz_sasl">Customizing SASL User Name</a></h5>
 
     By default, the SASL user name will be the primary part of the Kerberos principal. One can change that by setting <code>sasl.kerberos.principal.to.local.rules</code> to a customized rule in server.properties.
     The format of <code>sasl.kerberos.principal.to.local.rules</code> is a list where each rule works in the same way as the auth_to_local in <a href="http://web.mit.edu/Kerberos/krb5-latest/doc/admin/conf_files/krb5_conf.html">Kerberos configuration file (krb5.conf)</a>. This also support additional lowercase/uppercase rule, to force the translated result to be all lowercase/uppercase. This is done by adding a "/L" or "/U" to the end of the rule. check below formats for syntax.
     Each rules starts with RULE: and contains an expression as the following formats. See the kerberos documentation for more details.
-    <pre>
-        RULE:[n:string](regexp)s/pattern/replacement/
+    <pre class="line-numbers"><code class="language-text">        RULE:[n:string](regexp)s/pattern/replacement/
         RULE:[n:string](regexp)s/pattern/replacement/g
         RULE:[n:string](regexp)s/pattern/replacement//L
         RULE:[n:string](regexp)s/pattern/replacement/g/L
         RULE:[n:string](regexp)s/pattern/replacement//U
-        RULE:[n:string](regexp)s/pattern/replacement/g/U
-    </pre>
+        RULE:[n:string](regexp)s/pattern/replacement/g/U</code></pre>
 
     An example of adding a rule to properly translate user@MYDOMAIN.COM to user while also keeping the default rule in place is:
-    <pre>sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT</pre>
+    <pre>sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT</code></pre>
 
-    <h4><a id="security_authz_cli" href="#security_authz_cli">Command Line Interface</a></h4>
+    <h4 class="anchor-heading"><a id="security_authz_cli" class="anchor-link"></a><a href="#security_authz_cli">Command Line Interface</a></h4>
     Kafka Authorization management CLI can be found under bin directory with all the other CLIs. The CLI script is called <b>kafka-acls.sh</b>. Following lists all the options that the script supports:
     <p></p>
     <table class="data-table">
@@ -1447,45 +1371,45 @@ keyUsage               = digitalSignature, keyEncipherment
         </tr>
     </tbody></table>
 
-    <h4><a id="security_authz_examples" href="#security_authz_examples">Examples</a></h4>
+    <h4 class="anchor-heading"><a id="security_authz_examples" class="anchor-link"></a><a href="#security_authz_examples">Examples</a></h4>
     <ul>
         <li><b>Adding Acls</b><br>
     Suppose you want to add an acl "Principals User:Bob and User:Alice are allowed to perform Operation Read and Write on Topic Test-Topic from IP 198.51.100.0 and IP 198.51.100.1". You can do that by executing the CLI with following options:
-            <pre class="brush: bash;">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic</pre>
+            <pre class="language-bash">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic</code></pre>
             By default, all principals that don't have an explicit acl that allows access for an operation to a resource are denied. In rare cases where an allow acl is defined that allows access to all but some principal we will have to use the --deny-principal and --deny-host option. For example, if we want to allow all users to Read from Test-topic but only deny User:BadBob from IP 198.51.100.3 we can do so using following commands:
-            <pre class="brush: bash;">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic</pre>
+            <pre class="language-bash">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic</code></pre>
             Note that <code>--allow-host</code> and <code>--deny-host</code> only support IP addresses (hostnames are not supported).
             Above examples add acls to a topic by specifying --topic [topic-name] as the resource pattern option. Similarly user can add acls to cluster by specifying --cluster and to a consumer group by specifying --group [group-name].
             You can add acls on any resource of a certain type, e.g. suppose you wanted to add an acl "Principal User:Peter is allowed to produce to any Topic from IP 198.51.200.0"
             You can do that by using the wildcard resource '*', e.g. by executing the CLI with following options:
-            <pre class="brush: bash;">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Peter --allow-host 198.51.200.1 --producer --topic *</pre>
+            <pre class="language-bash">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Peter --allow-host 198.51.200.1 --producer --topic *</code></pre>
             You can add acls on prefixed resource patterns, e.g. suppose you want to add an acl "Principal User:Jane is allowed to produce to any Topic whose name starts with 'Test-' from any host".
             You can do that by executing the CLI with following options:
-            <pre class="brush: bash;">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Jane --producer --topic Test- --resource-pattern-type prefixed</pre>
+            <pre class="language-bash">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Jane --producer --topic Test- --resource-pattern-type prefixed</code></pre>
             Note, --resource-pattern-type defaults to 'literal', which only affects resources with the exact same name or, in the case of the wildcard resource name '*', a resource with any name.</li>
 
         <li><b>Removing Acls</b><br>
                 Removing acls is pretty much the same. The only difference is instead of --add option users will have to specify --remove option. To remove the acls added by the first example above we can execute the CLI with following options:
-            <pre class="brush: bash;"> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic </pre>
+            <pre class="language-bash"> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic </code></pre>
             If you want to remove the acl added to the prefixed resource pattern above we can execute the CLI with following options:
-            <pre class="brush: bash;"> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Jane --producer --topic Test- --resource-pattern-type Prefixed</pre></li>
+            <pre class="language-bash"> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Jane --producer --topic Test- --resource-pattern-type Prefixed</code></pre></li>
 
         <li><b>List Acls</b><br>
                 We can list acls for any resource by specifying the --list option with the resource. To list all acls on the literal resource pattern Test-topic, we can execute the CLI with following options:
-                <pre class="brush: bash;">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic</pre>
+                <pre class="language-bash">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic</code></pre>
                 However, this will only return the acls that have been added to this exact resource pattern. Other acls can exist that affect access to the topic,
                 e.g. any acls on the topic wildcard '*', or any acls on prefixed resource patterns. Acls on the wildcard resource pattern can be queried explicitly:
-                <pre class="brush: bash;">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic *</pre>
+                <pre class="language-bash">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic *</code></pre>
                 However, it is not necessarily possible to explicitly query for acls on prefixed resource patterns that match Test-topic as the name of such patterns may not be known.
                 We can list <i>all</i> acls affecting Test-topic by using '--resource-pattern-type match', e.g.
-                <pre class="brush: bash;">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic --resource-pattern-type match</pre>
+                <pre class="language-bash">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic --resource-pattern-type match</code></pre>
                 This will list acls on all matching literal, wildcard and prefixed resource patterns.</li>
 
         <li><b>Adding or removing a principal as producer or consumer</b><br>
                 The most common use case for acl management are adding/removing a principal as producer or consumer so we added convenience options to handle these cases. In order to add User:Bob as a producer of  Test-topic we can execute the following command:
-            <pre class="brush: bash;"> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic</pre>
+            <pre class="language-bash"> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic</code></pre>
                 Similarly to add Alice as a consumer of Test-topic with consumer group Group-1 we just have to pass --consumer option:
-            <pre class="brush: bash;"> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1 </pre>
+            <pre class="language-bash"> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1 </code></pre>
                 Note that for consumer option we must also specify the consumer group.
                 In order to remove a principal from producer or consumer role we just need to pass --remove option. </li>
 
@@ -1493,18 +1417,17 @@ keyUsage               = digitalSignature, keyEncipherment
             Users having Alter permission on ClusterResource can use Admin API for ACL management. kafka-acls.sh script supports AdminClient API to manage ACLs without interacting with zookeeper/authorizer directly.
             All the above examples can be executed by using <b>--bootstrap-server</b> option. For example:
 
-            <pre class="brush: bash;">
-            bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --producer --topic Test-topic
+            <pre class="line-numbers"><code class="language-bash">            bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --producer --topic Test-topic
             bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1
-            bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --list --topic Test-topic</pre></li>
+            bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --list --topic Test-topic</code></pre></li>
 
     </ul>
 
-    <h4><a id="security_authz_primitives" href="#security_authz_primitives">Authorization Primitives</a></h4>
+    <h4 class="anchor-heading"><a id="security_authz_primitives" class="anchor-link"></a><a href="#security_authz_primitives">Authorization Primitives</a></h4>
     <p>Protocol calls are usually performing some operations on certain resources in Kafka. It is required to know the
         operations and resources to set up effective protection. In this section we'll list these operations and
         resources, then list the combination of these with the protocols to see the valid scenarios.</p>
-    <h5><a id="operations_in_kafka" href="#operations_in_kafka">Operations in Kafka</a></h5>
+    <h5 class="anchor-heading"><a id="operations_in_kafka" class="anchor-link"></a><a href="#operations_in_kafka">Operations in Kafka</a></h5>
     <p>There are a few operation primitives that can be used to build up privileges. These can be matched up with
         certain resources to allow specific protocol calls for a given user. These are:</p>
     <ul>
@@ -1520,7 +1443,7 @@ keyUsage               = digitalSignature, keyEncipherment
         <li>IdempotentWrite</li>
         <li>All</li>
     </ul>
-    <h5><a id="resources_in_kafka" href="#resources_in_kafka">Resources in Kafka</a></h5>
+    <h5 class="anchor-heading"><a id="resources_in_kafka" class="anchor-link"></a><a href="#resources_in_kafka">Resources in Kafka</a></h5>
     <p>The operations above can be applied on certain resources which are described below.</p>
     <ul>
         <li><b>Topic:</b> this simply represents a Topic. All protocol calls that are acting on topics (such as reading,
@@ -1540,7 +1463,7 @@ keyUsage               = digitalSignature, keyEncipherment
             <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-48+Delegation+token+support+for+Kafka#KIP-48DelegationtokensupportforKafka-DescribeDelegationTokenRequest">KIP-48</a>
             and the related upstream documentation at <a href="#security_delegation_token">Authentication using Delegation Tokens</a>.</li>
     </ul>
-    <h5><a id="operations_resources_and_protocols" href="#operations_resources_and_protocols">Operations and Resources on Protocols</a></h5>
+    <h5 class="anchor-heading"><a id="operations_resources_and_protocols" class="anchor-link"></a><a href="#operations_resources_and_protocols">Operations and Resources on Protocols</a></h5>
     <p>In the below table we'll list the valid operations on resources that are executed by the Kafka API protocols.</p>
     <table class="data-table">
         <thead>
@@ -1986,7 +1909,7 @@ keyUsage               = digitalSignature, keyEncipherment
         </tbody>
     </table>
 
-    <h3><a id="security_rolling_upgrade" href="#security_rolling_upgrade">7.5 Incorporating Security Features in a Running Cluster</a></h3>
+    <h3 class="anchor-heading"><a id="security_rolling_upgrade" class="anchor-link"></a><a href="#security_rolling_upgrade">7.5 Incorporating Security Features in a Running Cluster</a></h3>
         You can secure a running cluster via one or more of the supported protocols discussed previously. This is done in phases:
         <p></p>
         <ul>
@@ -2006,64 +1929,48 @@ keyUsage               = digitalSignature, keyEncipherment
         When performing an incremental bounce stop the brokers cleanly via a SIGTERM. It's also good practice to wait for restarted replicas to return to the ISR list before moving onto the next node.
         <p></p>
         As an example, say we wish to encrypt both broker-client and broker-broker communication with SSL. In the first incremental bounce, an SSL port is opened on each node:
-            <pre>
-            listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
-            </pre>
+            <pre class="line-numbers"><code class="language-text">            listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092</code></pre>
 
         We then restart the clients, changing their config to point at the newly opened, secured port:
 
-            <pre>
-            bootstrap.servers = [broker1:9092,...]
+            <pre class="line-numbers"><code class="language-text">            bootstrap.servers = [broker1:9092,...]
             security.protocol = SSL
-            ...etc
-            </pre>
+            ...etc</code></pre>
 
         In the second incremental server bounce we instruct Kafka to use SSL as the broker-broker protocol (which will use the same SSL port):
 
-            <pre>
-            listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
-            security.inter.broker.protocol=SSL
-            </pre>
+            <pre class="line-numbers"><code class="language-text">            listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
+            security.inter.broker.protocol=SSL</code></pre>
 
         In the final bounce we secure the cluster by closing the PLAINTEXT port:
 
-            <pre>
-            listeners=SSL://broker1:9092
-            security.inter.broker.protocol=SSL
-            </pre>
+            <pre class="line-numbers"><code class="language-text">            listeners=SSL://broker1:9092
+            security.inter.broker.protocol=SSL</code></pre>
 
         Alternatively we might choose to open multiple ports so that different protocols can be used for broker-broker and broker-client communication. Say we wished to use SSL encryption throughout (i.e. for broker-broker and broker-client communication) but we'd like to add SASL authentication to the broker-client connection also. We would achieve this by opening two additional ports during the first bounce:
 
-            <pre>
-            listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
-            </pre>
+            <pre class="line-numbers"><code class="language-text">            listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093</code></pre>
 
         We would then restart the clients, changing their config to point at the newly opened, SASL & SSL secured port:
 
-            <pre>
-            bootstrap.servers = [broker1:9093,...]
+            <pre class="line-numbers"><code class="language-text">            bootstrap.servers = [broker1:9093,...]
             security.protocol = SASL_SSL
-            ...etc
-            </pre>
+            ...etc</code></pre>
 
         The second server bounce would switch the cluster to use encrypted broker-broker communication via the SSL port we previously opened on port 9092:
 
-            <pre>
-            listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
-            security.inter.broker.protocol=SSL
-            </pre>
+            <pre class="line-numbers"><code class="language-text">            listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
+            security.inter.broker.protocol=SSL</code></pre>
 
         The final bounce secures the cluster by closing the PLAINTEXT port.
 
-            <pre>
-        listeners=SSL://broker1:9092,SASL_SSL://broker1:9093
-        security.inter.broker.protocol=SSL
-            </pre>
+            <pre class="line-numbers"><code class="language-text">        listeners=SSL://broker1:9092,SASL_SSL://broker1:9093
+        security.inter.broker.protocol=SSL</code></pre>
 
         ZooKeeper can be secured independently of the Kafka cluster. The steps for doing this are covered in section <a href="#zk_authz_migration">7.6.2</a>.
 
 
-    <h3><a id="zk_authz" href="#zk_authz">7.6 ZooKeeper Authentication</a></h3>
+    <h3 class="anchor-heading"><a id="zk_authz" class="anchor-link"></a><a href="#zk_authz">7.6 ZooKeeper Authentication</a></h3>
     ZooKeeper supports mutual TLS (mTLS) authentication beginning with the 3.5.x versions.
     Kafka supports authenticating to ZooKeeper with SASL and mTLS -- either individually or both together --
     beginning with version 2.5. See
@@ -2095,8 +2002,8 @@ keyUsage               = digitalSignature, keyEncipherment
         Use the <tt>-zk-tls-config-file &lt;file&gt;</tt> option (note the single-dash rather than double-dash)
         to set TLS configs for the <tt>zookeeper-shell.sh</tt> CLI tool.
     </p>
-    <h4><a id="zk_authz_new" href="#zk_authz_new">7.6.1 New clusters</a></h4>
-    <h5><a id="zk_authz_new_sasl" href="#zk_authz_new_sasl">7.6.1.1 ZooKeeper SASL Authentication</a></h5>
+    <h4 class="anchor-heading"><a id="zk_authz_new" class="anchor-link"></a><a href="#zk_authz_new">7.6.1 New clusters</a></h4>
+    <h5 class="anchor-heading"><a id="zk_authz_new_sasl" class="anchor-link"></a><a href="#zk_authz_new_sasl">7.6.1.1 ZooKeeper SASL Authentication</a></h5>
     To enable ZooKeeper SASL authentication on brokers, there are two necessary steps:
     <ol>
         <li> Create a JAAS login file and set the appropriate system property to point to it as described above</li>
@@ -2105,7 +2012,7 @@ keyUsage               = digitalSignature, keyEncipherment
 
     The metadata stored in ZooKeeper for the Kafka cluster is world-readable, but can only be modified by the brokers. The rationale behind this decision is that the data stored in ZooKeeper is not sensitive, but inappropriate manipulation of that data can cause cluster disruption. We also recommend limiting the access to ZooKeeper via network segmentation (only brokers and some admin tools need access to ZooKeeper).
 
-    <h5><a id="zk_authz_new_mtls" href="#zk_authz_new_mtls">7.6.1.2 ZooKeeper Mutual TLS Authentication</a></h5>
+    <h5 class="anchor-heading"><a id="zk_authz_new_mtls" class="anchor-link"></a><a href="#zk_authz_new_mtls">7.6.1.2 ZooKeeper Mutual TLS Authentication</a></h5>
     ZooKeeper mTLS authentication can be enabled with or without SASL authentication.  As mentioned above,
     when using mTLS alone, every broker and any CLI tools (such as the <a href="#zk_authz_migration">ZooKeeper Security Migration Tool</a>)
     must generally identify itself with the same Distinguished Name (DN) because it is the DN that is ACL'ed, which means
@@ -2121,15 +2028,13 @@ keyUsage               = digitalSignature, keyEncipherment
     Here is a sample (partial) ZooKeeper configuration for enabling TLS authentication.
     These configurations are described in the
     <a href="https://zookeeper.apache.org/doc/r3.5.7/zookeeperAdmin.html#sc_authOptions">ZooKeeper Admin Guide</a>.
-    <pre>
-        secureClientPort=2182
+    <pre class="line-numbers"><code class="language-text">        secureClientPort=2182
         serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
         authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
         ssl.keyStore.location=/path/to/zk/keystore.jks
         ssl.keyStore.password=zk-ks-passwd
         ssl.trustStore.location=/path/to/zk/truststore.jks
-        ssl.trustStore.password=zk-ts-passwd
-    </pre>
+        ssl.trustStore.password=zk-ts-passwd</code></pre>
     <strong>IMPORTANT</strong>: ZooKeeper does not support setting the key password in the ZooKeeper server keystore
     to a value different from the keystore password itself.
     Be sure to set the key password to be the same as the keystore password.
@@ -2137,8 +2042,7 @@ keyUsage               = digitalSignature, keyEncipherment
     <p>Here is a sample (partial) Kafka Broker configuration for connecting to ZooKeeper with mTLS authentication.
     These configurations are described above in <a href="#brokerconfigs">Broker Configs</a>.
     </p>
-    <pre>
-        # connect to the ZooKeeper port configured for TLS
+    <pre class="line-numbers"><code class="language-text">        # connect to the ZooKeeper port configured for TLS
         zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
         # required to use TLS to ZooKeeper (default is false)
         zookeeper.ssl.client.enable=true
@@ -2150,26 +2054,23 @@ keyUsage               = digitalSignature, keyEncipherment
         zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
         zookeeper.ssl.truststore.password=kafka-ts-passwd
         # tell broker to create ACLs on znodes
-        zookeeper.set.acl=true
-    </pre>
+        zookeeper.set.acl=true</code></pre>
     <strong>IMPORTANT</strong>: ZooKeeper does not support setting the key password in the ZooKeeper client (i.e. broker) keystore
     to a value different from the keystore password itself.
     Be sure to set the key password to be the same as the keystore password.
 
-    <h4><a id="zk_authz_migration" href="#zk_authz_migration">7.6.2 Migrating clusters</a></h4>
+    <h4 class="anchor-heading"><a id="zk_authz_migration" class="anchor-link"></a><a href="#zk_authz_migration">7.6.2 Migrating clusters</a></h4>
     If you are running a version of Kafka that does not support security or simply with security disabled, and you want to make the cluster secure, then you need to execute the following steps to enable ZooKeeper authentication with minimal disruption to your operations:
     <ol>
         <li>Enable SASL and/or mTLS authentication on ZooKeeper.  If enabling mTLS, you would now have both a non-TLS port and a TLS port, like this:
-            <pre>
-    clientPort=2181
+            <pre class="line-numbers"><code class="language-text">    clientPort=2181
     secureClientPort=2182
     serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
     authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
     ssl.keyStore.location=/path/to/zk/keystore.jks
     ssl.keyStore.password=zk-ks-passwd
     ssl.trustStore.location=/path/to/zk/truststore.jks
-    ssl.trustStore.password=zk-ts-passwd
-            </pre>
+    ssl.trustStore.password=zk-ts-passwd</code></pre>
         </li>
         <li>Perform a rolling restart of brokers setting the JAAS login file and/or defining ZooKeeper mutual TLS configurations (including connecting to the TLS-enabled ZooKeeper port) as required, which enables brokers to authenticate to ZooKeeper. At the end of the rolling restart, brokers are able to manipulate znodes with strict ACLs, but they will not create znodes with those ACLs</li>
         <li>If you enabled mTLS, disable the non-TLS port in ZooKeeper</li>
@@ -2185,32 +2086,27 @@ keyUsage               = digitalSignature, keyEncipherment
         <li>If you are disabling mTLS, disable the TLS port in ZooKeeper</li>
     </ol>
     Here is an example of how to run the migration tool:
-    <pre class="brush: bash;">
-    bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=localhost:2181
-    </pre>
+    <pre class="line-numbers"><code class="language-bash">    bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=localhost:2181</code></pre>
     <p>Run this to see the full list of parameters:</p>
-    <pre class="brush: bash;">
-    bin/zookeeper-security-migration.sh --help
-    </pre>
-    <h4><a id="zk_authz_ensemble" href="#zk_authz_ensemble">7.6.3 Migrating the ZooKeeper ensemble</a></h4>
+    <pre class="line-numbers"><code class="language-bash">    bin/zookeeper-security-migration.sh --help</code></pre>
+    <h4 class="anchor-heading"><a id="zk_authz_ensemble" class="anchor-link"></a><a href="#zk_authz_ensemble">7.6.3 Migrating the ZooKeeper ensemble</a></h4>
     It is also necessary to enable SASL and/or mTLS authentication on the ZooKeeper ensemble. To do it, we need to perform a rolling restart of the server and set a few properties. See above for mTLS information.  Please refer to the ZooKeeper documentation for more detail:
     <ol>
         <li><a href="https://zookeeper.apache.org/doc/r3.5.7/zookeeperProgrammers.html#sc_ZooKeeperAccessControl">Apache ZooKeeper documentation</a></li>
         <li><a href="https://cwiki.apache.org/confluence/display/ZOOKEEPER/Zookeeper+and+SASL">Apache ZooKeeper wiki</a></li>
     </ol>
-    <h4><a id="zk_authz_quorum" href="#zk_authz_quorum">7.6.4 ZooKeeper Quorum Mutual TLS Authentication</a></h4>
+    <h4 class="anchor-heading"><a id="zk_authz_quorum" class="anchor-link"></a><a href="#zk_authz_quorum">7.6.4 ZooKeeper Quorum Mutual TLS Authentication</a></h4>
     It is possible to enable mTLS authentication between the ZooKeeper servers themselves.
     Please refer to the <a href="https://zookeeper.apache.org/doc/r3.5.7/zookeeperAdmin.html#Quorum+TLS">ZooKeeper documentation</a> for more detail.
 
-    <h3><a id="zk_encryption" href="#zk_encryption">7.7 ZooKeeper Encryption</a></h3>
+    <h3 class="anchor-heading"><a id="zk_encryption" class="anchor-link"></a><a href="#zk_encryption">7.7 ZooKeeper Encryption</a></h3>
     ZooKeeper connections that use mutual TLS are encrypted.
     Beginning with ZooKeeper version 3.5.7 (the version shipped with Kafka version 2.5) ZooKeeper supports a sever-side config
     <tt>ssl.clientAuth</tt> (case-insensitively: <tt>want</tt>/<tt>need</tt>/<tt>none</tt> are the valid options, the default is <tt>need</tt>),
     and setting this value to <tt>none</tt> in ZooKeeper allows clients to connect via a TLS-encrypted connection
     without presenting their own certificate.  Here is a sample (partial) Kafka Broker configuration for connecting to ZooKeeper with just TLS encryption.
     These configurations are described above in <a href="#brokerconfigs">Broker Configs</a>.
-    <pre>
-        # connect to the ZooKeeper port configured for TLS
+    <pre class="line-numbers"><code class="language-text">        # connect to the ZooKeeper port configured for TLS
         zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
         # required to use TLS to ZooKeeper (default is false)
         zookeeper.ssl.client.enable=true
@@ -2221,8 +2117,7 @@ keyUsage               = digitalSignature, keyEncipherment
         zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
         zookeeper.ssl.truststore.password=kafka-ts-passwd
         # tell broker to create ACLs on znodes (if using SASL authentication, otherwise do not set this)
-        zookeeper.set.acl=true
-    </pre>
+        zookeeper.set.acl=true</code></pre>
 </script>
 
 <div class="p-security"></div>
diff --git a/docs/uses.html b/docs/uses.html
index 09bc45f..51f16a8 100644
--- a/docs/uses.html
+++ b/docs/uses.html
@@ -18,7 +18,7 @@
 <p> Here is a description of a few of the popular use cases for Apache Kafka&reg;.
 For an overview of a number of these areas in action, see <a href="https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying/">this blog post</a>. </p>
 
-<h4><a id="uses_messaging" href="#uses_messaging">Messaging</a></h4>
+<h4 class="anchor-heading"><a id="uses_messaging" class="anchor-link"></a><a href="#uses_messaging">Messaging</a></h4>
 
 Kafka works well as a replacement for a more traditional message broker.
 Message brokers are used for a variety of reasons (to decouple processing from data producers, to buffer unprocessed messages, etc).
@@ -31,7 +31,7 @@ durability guarantees Kafka provides.
 In this domain Kafka is comparable to traditional messaging systems such as <a href="http://activemq.apache.org">ActiveMQ</a> or
 <a href="https://www.rabbitmq.com">RabbitMQ</a>.
 
-<h4><a id="uses_website" href="#uses_website">Website Activity Tracking</a></h4>
+<h4 class="anchor-heading"><a id="uses_website" class="anchor-link"></a><a href="#uses_website">Website Activity Tracking</a></h4>
 
 The original use case for Kafka was to be able to rebuild a user activity tracking pipeline as a set of real-time publish-subscribe feeds.
 This means site activity (page views, searches, or other actions users may take) is published to central topics with one topic per activity type.
@@ -40,12 +40,12 @@ offline data warehousing systems for offline processing and reporting.
 <p>
 Activity tracking is often very high volume as many activity messages are generated for each user page view.
 
-<h4><a id="uses_metrics" href="#uses_metrics">Metrics</a></h4>
+<h4 class="anchor-heading"><a id="uses_metrics" class="anchor-link"></a><a href="#uses_metrics">Metrics</a></h4>
 
 Kafka is often used for operational monitoring data.
 This involves aggregating statistics from distributed applications to produce centralized feeds of operational data.
 
-<h4><a id="uses_logs" href="#uses_logs">Log Aggregation</a></h4>
+<h4 class="anchor-heading"><a id="uses_logs" class="anchor-link"></a><a href="#uses_logs">Log Aggregation</a></h4>
 
 Many people use Kafka as a replacement for a log aggregation solution.
 Log aggregation typically collects physical log files off servers and puts them in a central place (a file server or HDFS perhaps) for processing.
@@ -55,7 +55,7 @@ This allows for lower-latency processing and easier support for multiple data so
 In comparison to log-centric systems like Scribe or Flume, Kafka offers equally good performance, stronger durability guarantees due to replication,
 and much lower end-to-end latency.
 
-<h4><a id="uses_streamprocessing" href="#uses_streamprocessing">Stream Processing</a></h4>
+<h4 class="anchor-heading"><a id="uses_streamprocessing" class="anchor-link"></a><a href="#uses_streamprocessing">Stream Processing</a></h4>
 
 Many users of Kafka process data in processing pipelines consisting of multiple stages, where raw input data is consumed from Kafka topics and then
 aggregated, enriched, or otherwise transformed into new topics for further consumption or follow-up processing.
@@ -68,12 +68,12 @@ is available in Apache Kafka to perform such data processing as described above.
 Apart from Kafka Streams, alternative open source stream processing tools include <a href="https://storm.apache.org/">Apache Storm</a> and
 <a href="http://samza.apache.org/">Apache Samza</a>.
 
-<h4><a id="uses_eventsourcing" href="#uses_eventsourcing">Event Sourcing</a></h4>
+<h4 class="anchor-heading"><a id="uses_eventsourcing" class="anchor-link"></a><a href="#uses_eventsourcing">Event Sourcing</a></h4>
 
 <a href="http://martinfowler.com/eaaDev/EventSourcing.html">Event sourcing</a> is a style of application design where state changes are logged as a
 time-ordered sequence of records. Kafka's support for very large stored log data makes it an excellent backend for an application built in this style.
 
-<h4><a id="uses_commitlog" href="#uses_commitlog">Commit Log</a></h4>
+<h4 class="anchor-heading"><a id="uses_commitlog" class="anchor-link"></a><a href="#uses_commitlog">Commit Log</a></h4>
 
 Kafka can serve as a kind of external commit-log for a distributed system. The log helps replicate data between nodes and acts as a re-syncing
 mechanism for failed nodes to restore their data.