You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/02/14 22:19:45 UTC
[kafka] branch 1.1 updated: KAFKA-6476: Documentation for dynamic
broker configuration (#4558)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new 7e23d3b KAFKA-6476: Documentation for dynamic broker configuration (#4558)
7e23d3b is described below
commit 7e23d3b38e19c9292518a4c989faa4c8b942acf3
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Wed Feb 14 22:09:45 2018 +0000
KAFKA-6476: Documentation for dynamic broker configuration (#4558)
Docs for dynamic broker configuration (KIP-226)
---
.../org/apache/kafka/common/config/ConfigDef.java | 39 +++++-
.../apache/kafka/common/config/ConfigDefTest.java | 27 ++++
.../scala/kafka/server/DynamicBrokerConfig.scala | 7 +
core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +-
docs/configuration.html | 156 +++++++++++++++++++++
docs/security.html | 33 ++++-
6 files changed, 257 insertions(+), 7 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index bb199dd..a1c0e3b 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -1091,16 +1091,40 @@ public class ConfigDef {
}
public String toHtmlTable() {
+ return toHtmlTable(Collections.<String, String>emptyMap());
+ }
+
+ private void addHeader(StringBuilder builder, String headerName) {
+ builder.append("<th>");
+ builder.append(headerName);
+ builder.append("</th>\n");
+ }
+
+ private void addColumnValue(StringBuilder builder, String value) {
+ builder.append("<td>");
+ builder.append(value);
+ builder.append("</td>");
+ }
+
+ /**
+ * Converts this config into an HTML table that can be embedded into docs.
+ * If <code>dynamicUpdateModes</code> is non-empty, a "Dynamic Update Mode" column
+ * will be included in the table with the value of the update mode. Default
+ * mode is "read-only".
+ * @param dynamicUpdateModes Config name -> update mode mapping
+ */
+ public String toHtmlTable(Map<String, String> dynamicUpdateModes) {
+ boolean hasUpdateModes = !dynamicUpdateModes.isEmpty();
List<ConfigKey> configs = sortedConfigs();
StringBuilder b = new StringBuilder();
b.append("<table class=\"data-table\"><tbody>\n");
b.append("<tr>\n");
// print column headers
for (String headerName : headers()) {
- b.append("<th>");
- b.append(headerName);
- b.append("</th>\n");
+ addHeader(b, headerName);
}
+ if (hasUpdateModes)
+ addHeader(b, "Dynamic Update Mode");
b.append("</tr>\n");
for (ConfigKey key : configs) {
if (key.internalConfig) {
@@ -1109,10 +1133,15 @@ public class ConfigDef {
b.append("<tr>\n");
// print column values
for (String headerName : headers()) {
- b.append("<td>");
- b.append(getConfigValue(key, headerName));
+ addColumnValue(b, getConfigValue(key, headerName));
b.append("</td>");
}
+ if (hasUpdateModes) {
+ String updateMode = dynamicUpdateModes.get(key.name);
+ if (updateMode == null)
+ updateMode = "read-only";
+ addColumnValue(b, updateMode);
+ }
b.append("</tr>\n");
}
b.append("</tbody></table>");
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 339c51a..affa5dd 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -39,6 +39,7 @@ import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class ConfigDefTest {
@@ -368,6 +369,32 @@ public class ConfigDefTest {
}
@Test
+ public void testDynamicUpdateModeInDocs() throws Exception {
+ final ConfigDef configDef = new ConfigDef()
+ .define("my.broker.config", Type.LONG, Importance.HIGH, "docs")
+ .define("my.cluster.config", Type.LONG, Importance.HIGH, "docs")
+ .define("my.readonly.config", Type.LONG, Importance.HIGH, "docs");
+ final Map<String, String> updateModes = new HashMap<>();
+ updateModes.put("my.broker.config", "per-broker");
+ updateModes.put("my.cluster.config", "cluster-wide");
+ final String html = configDef.toHtmlTable(updateModes);
+ Set<String> configsInHtml = new HashSet();
+ for (String line : html.split("\n")) {
+ if (line.contains("my.broker.config")) {
+ assertTrue(line.contains("per-broker"));
+ configsInHtml.add("my.broker.config");
+ } else if (line.contains("my.cluster.config")) {
+ assertTrue(line.contains("cluster-wide"));
+ configsInHtml.add("my.cluster.config");
+ } else if (line.contains("my.readonly.config")) {
+ assertTrue(line.contains("read-only"));
+ configsInHtml.add("my.readonly.config");
+ }
+ }
+ assertEquals(configDef.names(), configsInHtml);
+ }
+
+ @Test
public void testNames() {
final ConfigDef configDef = new ConfigDef()
.define("a", Type.STRING, Importance.LOW, "docs")
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index a95de0a..ce4b9e7 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -115,6 +115,13 @@ object DynamicBrokerConfig {
config.displayName, config.dependents, config.recommender)
}
}
+
+ private[server] def dynamicConfigUpdateModes: util.Map[String, String] = {
+ AllDynamicConfigs.map { name =>
+ val mode = if (PerBrokerConfigs.contains(name)) "per-broker" else "cluster-wide"
+ (name -> mode)
+ }.toMap.asJava
+ }
}
class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 0b9bdaa..529d0e6 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -238,7 +238,7 @@ object KafkaConfig {
private val LogConfigPrefix = "log."
def main(args: Array[String]) {
- System.out.println(configDef.toHtmlTable)
+ System.out.println(configDef.toHtmlTable(DynamicBrokerConfig.dynamicConfigUpdateModes))
}
/** ********* Zookeeper Configuration ***********/
diff --git a/docs/configuration.html b/docs/configuration.html
index df5eebb..df58ba7 100644
--- a/docs/configuration.html
+++ b/docs/configuration.html
@@ -33,6 +33,162 @@
<p>More details about broker configuration can be found in the scala class <code>kafka.server.KafkaConfig</code>.</p>
+ <h4><a id="dynamicbrokerconfigs" href="#dynamicbrokerconfigs">3.1.1 Updating Broker Configs</a></h4>
+ From Kafka version 1.1 onwards, some of the broker configs can be updated without restarting the broker. See the
+ <code>Dynamic Update Mode</code> column in <a href="#brokerconfigs">Broker Configs</a> for the update mode of each broker config.
+ <ul>
+ <li><code>read-only</code>: Requires a broker restart for update</li>
+ <li><code>per-broker</code>: May be updated dynamically for each broker</li>
+ <li><code>cluster-wide</code>: May be updated dynamically as a cluster-wide default. May also be updated as a per-broker value for testing.</li>
+ </ul>
+
+ To alter the current broker configs for broker id 0 (for example, the number of log cleaner threads):
+ <pre class="brush: bash;">
+ > bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2
+ </pre>
+
+ To describe the current dynamic broker configs for broker id 0:
+ <pre class="brush: bash;">
+ > bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe
+ </pre>
+
+ To delete a config override and revert to the statically configured or default value for broker id 0 (for example,
+ the number of log cleaner threads):
+ <pre class="brush: bash;">
+ > bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --delete-config log.cleaner.threads
+ </pre>
+
+ Some configs may be configured as a cluster-wide default to maintain consistent values across the whole cluster. All brokers
+ in the cluster will process the cluster default update. For example, to update log cleaner threads on all brokers:
+ <pre class="brush: bash;">
+ > bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.threads=2
+ </pre>
+
+ To describe the currently configured dynamic cluster-wide default configs:
+ <pre class="brush: bash;">
+ > bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe
+ </pre>
+
+ All configs that are configurable at cluster level may also be configured at per-broker level (e.g. for testing).
+ If a config value is defined at different levels, the following order of precedence is used:
+ <ul>
+ <li>Dynamic per-broker config stored in ZooKeeper</li>
+ <li>Dynamic cluster-wide default config stored in ZooKeeper</li>
+ <li>Static broker config from <code>server.properties</code></li>
+ <li>Kafka default, see <a href="#brokerconfigs">broker configs</a></li>
+ </ul>
+
+ <h5>Updating Password Configs Dynamically</h5>
+ <p>Password config values that are dynamically updated are encrypted before storing in ZooKeeper. The broker config
+ <code>password.encoder.secret</code> must be configured in <code>server.properties</code> to enable dynamic update
+ of password configs. The secret may be different on different brokers.</p>
+ <p>The secret used for password encoding may be rotated with a rolling restart of brokers. The old secret used for encoding
+ passwords currently in ZooKeeper must be provided in the static broker config <code>password.encoder.old.secret</code> and
+ the new secret must be provided in <code>password.encoder.secret</code>. All dynamic password configs stored in ZooKeeper
+ will be re-encoded with the new secret when the broker starts up.</p>
+ <p>In Kafka 1.1.x, all dynamically updated password configs must be provided in every alter request when updating configs
+ using <code>kafka-configs.sh</code> even if the password config is not being altered. This constraint will be removed in
+ a future release.</p>
+
+ <h5>Updating SSL Keystore of an Existing Listener</h5>
+ Brokers may be configured with SSL keystores with short validity periods to reduce the risk of compromised certificates.
+ Keystores may be updated dynamically without restarting the broker. The config name must be prefixed with the listener prefix
+ <code>listener.name.{listenerName}.</code> so that only the keystore config of a specific listener is updated.
+ The following configs may be updated in a single alter request at per-broker level:
+ <ul>
+ <li><code>ssl.keystore.type</code></li>
+ <li><code>ssl.keystore.location</code></li>
+ <li><code>ssl.keystore.password</code></li>
+ <li><code>ssl.key.password</code></li>
+ </ul>
+ If the listener is the inter-broker listener, the update is allowed only if the new keystore is trusted by the truststore
+ configured for that listener. For other listeners, no trust validation is performed on the keystore by the broker. Certificates
+ must be signed by the same certificate authority that signed the old certificate to avoid any client authentication failures.
+
+ <h5>Updating Default Topic Configuration</h5>
+ Default topic configuration options used by brokers may be updated without broker restart. The configs are applied to topics
+ without a topic config override for the equivalent per-topic config. One or more of these configs may be overridden at
+ cluster-default level used by all brokers.
+ <ul>
+ <li><code>log.segment.bytes</code></li>
+ <li><code>log.roll.ms</code></li>
+ <li><code>log.roll.hours</code></li>
+ <li><code>log.roll.jitter.ms</code></li>
+ <li><code>log.roll.jitter.hours</code></li>
+ <li><code>log.index.size.max.bytes</code></li>
+ <li><code>log.flush.interval.messages</code></li>
+ <li><code>log.flush.interval.ms</code></li>
+ <li><code>log.retention.bytes</code></li>
+ <li><code>log.retention.ms</code></li>
+ <li><code>log.retention.minutes</code></li>
+ <li><code>log.retention.hours</code></li>
+ <li><code>log.index.interval.bytes</code></li>
+ <li><code>log.cleaner.delete.retention.ms</code></li>
+ <li><code>log.cleaner.min.compaction.lag.ms</code></li>
+ <li><code>log.cleaner.min.cleanable.ratio</code></li>
+ <li><code>log.cleanup.policy</code></li>
+ <li><code>log.segment.delete.delay.ms</code></li>
+ <li><code>unclean.leader.election.enable</code></li>
+ <li><code>min.insync.replicas</code></li>
+ <li><code>max.message.bytes</code></li>
+ <li><code>compression.type</code></li>
+ <li><code>log.preallocate</code></li>
+ <li><code>log.message.timestamp.type</code></li>
+ <li><code>log.message.timestamp.difference.max.ms</code></li>
+ </ul>
+
+ In Kafka version 1.1.x, changes to <code>unclean.leader.election.enable</code> take effect only when a new controller is elected.
+ Controller re-election may be forced by running:
+
+ <pre class="brush: bash;">
+ > bin/zookeeper-shell.sh localhost
+ rmr /controller
+ </pre>
+
+ <h5>Updating Log Cleaner Configs</h5>
+ Log cleaner configs may be updated dynamically at cluster-default level used by all brokers. The changes take effect
+ on the next iteration of log cleaning. One or more of these configs may be updated:
+ <ul>
+ <li><code>log.cleaner.threads</code></li>
+ <li><code>log.cleaner.io.max.bytes.per.second</code></li>
+ <li><code>log.cleaner.dedupe.buffer.size</code></li>
+ <li><code>log.cleaner.io.buffer.size</code></li>
+ <li><code>log.cleaner.io.buffer.load.factor</code></li>
+ <li><code>log.cleaner.backoff.ms</code></li>
+ </ul>
+
+ <h5>Updating Thread Configs</h5>
+ The size of various thread pools used by the broker may be updated dynamically at cluster-default level used by all brokers.
+ Updates are restricted to the range <code>currentSize / 2</code> to <code>currentSize * 2</code> to ensure that config updates are
+ handled gracefully.
+ <ul>
+ <li><code>num.network.threads</code></li>
+ <li><code>num.io.threads</code></li>
+ <li><code>num.replica.fetchers</code></li>
+ <li><code>num.recovery.threads.per.data.dir</code></li>
+ <li><code>log.cleaner.threads</code></li>
+ <li><code>background.threads</code></li>
+ </ul>
+
+ <h5>Adding and Removing Listeners</h5>
+ <p>Listeners may be added or removed dynamically. When a new listener is added, security configs of the listener must be provided
+ as listener configs with the listener prefix <code>listener.name.{listenerName}.</code>. If the new listener uses SASL,
+ the JAAS configuration of the listener must be provided using the JAAS configuration property <code>sasl.jaas.config</code>
+ with the listener and mechanism prefix. See <a href="#security_jaas_broker">JAAS configuration for Kafka brokers</a> for details.</p>
+
+ <p>In Kafka version 1.1.x, the listener used for inter-broker communication may not be updated dynamically. To update the inter-broker
+ listener to a new listener, the new listener may be added on all brokers without restarting the broker. A rolling restart is then
+ required to update <code>inter.broker.listener.name</code>.</p>
+
+ In addition to all the security configs of new listeners, the following configs may be updated dynamically at per-broker level:
+ <ul>
+ <li><code>listeners</code></li>
+ <li><code>advertised.listeners</code></li>
+ <li><code>listener.security.protocol.map</code></li>
+ </ul>
+ Inter-broker listener must be configured using the static broker configuration <code>inter.broker.listener.name</code>
+ or <code>inter.broker.security.protocol</code>.
+
<h3><a id="topicconfigs" href="#topicconfigs">3.2 Topic-Level Configs</a></h3>
Configurations pertinent to topics have both a server default as well an optional per-topic override. If no per-topic configuration is given the server default is used. The override can be set at topic creation time by giving one or more <code>--config</code> options. This example creates a topic named <i>my-topic</i> with a custom max message size and flush rate:
diff --git a/docs/security.html b/docs/security.html
index 3e3c818..db6f487 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -231,7 +231,9 @@
<p><tt>KafkaServer</tt> is the section name in the JAAS file used by each
KafkaServer/Broker. This section provides SASL configuration options
for the broker including any SASL client connections made by the broker
- for inter-broker communication.</p>
+ for inter-broker communication. If multiple listeners are configured to use
+ SASL, the section name may be prefixed with the listener name in lower-case
+ followed by a period, e.g. <tt>sasl_ssl.KafkaServer</tt>.</p>
<p><tt>Client</tt> section is used to authenticate a SASL connection with
zookeeper. It also allows the brokers to set SASL ACL on zookeeper
@@ -246,6 +248,35 @@
<tt>zookeeper.sasl.client.username</tt> to the appropriate name
(<i>e.g.</i>, <tt>-Dzookeeper.sasl.client.username=zk</tt>).</p></li>
+ <p>Brokers may also configure JAAS using the broker configuration property <code>sasl.jaas.config</code>.
+ The property name must be prefixed with the listener prefix including the SASL mechanism,
+ i.e. <code>listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config</code>. Only one
+ login module may be specified in the config value. If multiple mechanisms are configured on a
+ listener, configs must be provided for each mechanism using the listener and mechanism prefix.
+ For example,
+ <pre class="brush: text;">
+ listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
+ username="admin" \
+ password="admin-secret";
+ listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+ username="admin" \
+ password="admin-secret" \
+ user_admin="admin-secret" \
+ user_alice="alice-secret";</pre>
+
+ If JAAS configuration is defined at different levels, the order of precedence used is:
+ <ul>
+ <li>Broker configuration property <code>listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config</code></li>
+ <li><code>{listenerName}.KafkaServer</code> section of static JAAS configuration</li>
+ <li><code>KafkaServer</code> section of static JAAS configuration</li>
+ </ul>
+ Note that ZooKeeper JAAS config may only be configured using static JAAS configuration.
+
+ <p>See <a href="#security_sasl_kerberos_brokerconfig">GSSAPI (Kerberos)</a>,
+ <a href="#security_sasl_plain_brokerconfig">PLAIN</a> or
+ <a href="#security_sasl_scram_brokerconfig">SCRAM</a> for example broker configurations.</p></li>
+
+
<li><h5><a id="security_jaas_client"
href="#security_jaas_client">JAAS configuration for Kafka clients</a></h5>
--
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.