You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2022/03/29 12:18:37 UTC

[kafka] branch 3.2 updated: KAFKA-6718: Add documentation for KIP-708 (#11923)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 3143e7f  KAFKA-6718: Add documentation for KIP-708 (#11923)
3143e7f is described below

commit 3143e7f13e9a1e03e9fd35d6cc528e6ec4a3f106
Author: Levani Kokhreidze <le...@gmail.com>
AuthorDate: Tue Mar 29 15:08:51 2022 +0300

    KAFKA-6718: Add documentation for KIP-708 (#11923)
    
    Adds documentation for KIP-708: Rack awareness for Kafka Streams
    
    Co-authored-by: Bruno Cadonna <ca...@apache.org>
    
    Reviewers: Luke Chen <sh...@gmail.com>, Bruno Cadonna <ca...@apache.org>
---
 .../org/apache/kafka/common/config/ConfigDef.java  |  9 +++--
 .../apache/kafka/common/config/ConfigDefTest.java  |  5 +++
 docs/streams/architecture.html                     |  6 ++++
 docs/streams/developer-guide/config-streams.html   | 42 ++++++++++++++++++++++
 4 files changed, 60 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 8c91a25..9331f99 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.common.config;
 
-import java.util.function.Function;
-import java.util.stream.Collectors;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.utils.Utils;
 
@@ -33,8 +31,10 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.BiConsumer;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * This class is used for specifying the set of expected configurations. For each configuration, you can specify
@@ -1140,6 +1140,11 @@ public class ConfigDef {
                 throw new ConfigException(name, value, "exceeds maximum list size of [" + maxSize + "].");
             }
         }
+
+        @Override
+        public String toString() {
+            return "List containing maximum of " + maxSize + " elements";
+        }
     }
 
     public static class ConfigKey {
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 0e5af1f..76c20df 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -758,4 +758,9 @@ public class ConfigDefTest {
                                                         "lst doc"));
     }
 
+    @Test
+    public void testListSizeValidatorToString() {
+        assertEquals("List containing maximum of 5 elements", ListSize.atMostOfSize(5).toString());
+    }
+
 }
diff --git a/docs/streams/architecture.html b/docs/streams/architecture.html
index a1773c5..e561231 100644
--- a/docs/streams/architecture.html
+++ b/docs/streams/architecture.html
@@ -161,6 +161,12 @@
         Starting in 2.6, Kafka Streams will guarantee that a task is only ever assigned to an instance with a fully caught-up local copy of the state, if such an instance
         exists. Standby tasks will increase the likelihood that a caught-up instance exists in the case of a failure.
     </p>
+    <p>
+        You can also configure standby replicas with rack awareness. When configured, Kafka Streams will attempt to
+        place a standby task on a different "rack" than its active task, which allows faster recovery when the
+        rack hosting the active task fails. See <code>rack.aware.assignment.tags</code>
+        in the <a href="/{{version}}/documentation/streams/developer-guide/config-streams.html#rack-aware-assignment-tags"><b>Kafka Streams Developer Guide</b></a> section.
+    </p>
 
     <div class="pagination">
         <a href="/{{version}}/documentation/streams/core-concepts" class="pagination__btn pagination__btn__prev">Previous</a>
diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html
index dd9298d..0aee6b6 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -84,6 +84,7 @@ settings.put(... , ...);</code></pre>
               <li><a class="reference internal" href="#partition-grouper" id="id12">partition.grouper</a></li>
               <li><a class="reference internal" href="#probing-rebalance-interval-ms" id="id30">probing.rebalance.interval.ms</a></li>
               <li><a class="reference internal" href="#processing-guarantee" id="id25">processing.guarantee</a></li>
+              <li><a class="reference internal" href="#rack-aware-assignment-tags" id="id34">rack.aware.assignment.tags</a></li>
               <li><a class="reference internal" href="#replication-factor" id="id13">replication.factor</a></li>
               <li><a class="reference internal" href="#rocksdb-config-setter" id="id20">rocksdb.config.setter</a></li>
               <li><a class="reference internal" href="#state-dir" id="id14">state.dir</a></li>
@@ -383,6 +384,13 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), 1);</code></pre>
             <td colspan="2">The amount of time in milliseconds to block waiting for input.</td>
             <td>100 milliseconds</td>
           </tr>
+          <tr class="row-even"><td>rack.aware.assignment.tags</td>
+            <td>Medium</td>
+            <td colspan="2">List of tag keys used to distribute standby replicas across Kafka Streams
+              clients. When configured, Kafka Streams will make a best effort to distribute the standby tasks over
+              clients with different tag values.</td>
+            <td>the empty list</td>
+          </tr>
           <tr class="row-even"><td>replication.factor</td>
             <td>Medium</td>
             <td colspan="2">The replication factor for changelog topics and repartition topics created by the application.
@@ -677,6 +685,40 @@ streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
               <p>This is discussed in more detail in <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data types and serialization</span></a>.</p>
             </div></blockquote>
         </div>
+      <div class="section" id="rack-aware-assignment-tags">
+        <h4><a class="toc-backref" href="#id34">rack.aware.assignment.tags</a><a class="headerlink" href="#rack-aware-assignment-tags" title="Permalink to this headline"></a>
+        </h4>
+        <blockquote>
+          <div>
+            <p>
+              This configuration sets a list of tag keys used to distribute standby replicas across Kafka Streams
+              clients. When configured, Kafka Streams will make a best effort to distribute the standby tasks over
+              clients with different tag values.
+            </p>
+            <p>
+              Tags for the Kafka Streams clients can be set via the <code class="docutils literal"><span class="pre">client.tag.</span></code>
+              prefix. Example:
+            </p>
+            <pre><code class="language-text">
+Client-1                                   | Client-2
+_______________________________________________________________________
+client.tag.zone: eu-central-1a             | client.tag.zone: eu-central-1b
+client.tag.cluster: k8s-cluster1           | client.tag.cluster: k8s-cluster1
+rack.aware.assignment.tags: zone,cluster   | rack.aware.assignment.tags: zone,cluster
+
+
+Client-3                                   | Client-4
+_______________________________________________________________________
+client.tag.zone: eu-central-1a             | client.tag.zone: eu-central-1b
+client.tag.cluster: k8s-cluster2           | client.tag.cluster: k8s-cluster2
+rack.aware.assignment.tags: zone,cluster   | rack.aware.assignment.tags: zone,cluster</code></pre>
+            <p>
+              In the above example, we have four Kafka Streams clients across two zones (<code class="docutils literal"><span class="pre">eu-central-1a</span></code>, <code class="docutils literal"><span class="pre">eu-central-1b</span></code>) and across two clusters (<code class="docutils literal"><span class="pre">k8s-cluster1</span></code>, <code class="docutils literal"><span class="pre">k8s-cluster2</span></code>).
+              For an active task located on <code class="docutils literal"><span class="pre">Client-1</span></code>, Kafka Streams will allocate a standby task on <code class="docutils literal"><span class="pre">Client-4</span></code>, since <code class="docutils literal"><span class="pre">Client-4</span></code> has a different <code class="docutils literal"><span class="pre">zone</span></code> and a different <code class="docutils literal"><span class="pre">cluster</span></code> than <c [...]
+            </p>
+          </div>
+        </blockquote>
+      </div>
         <div class="section" id="max-task-idle-ms">
           <span id="streams-developer-guide-max-task-idle-ms"></span><h4><a class="toc-backref" href="#id28">max.task.idle.ms</a><a class="headerlink" href="#max-task-idle-ms" title="Permalink to this headline"></a></h4>
           <blockquote>