You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/01/23 16:44:35 UTC
[kafka] branch trunk updated: KAFKA-6241;
Enable dynamic updates of broker SSL keystore (#4263)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b814a16 KAFKA-6241; Enable dynamic updates of broker SSL keystore (#4263)
b814a16 is described below
commit b814a16b968d144802d08523b5c359d6706f5632
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Tue Jan 23 08:44:31 2018 -0800
KAFKA-6241; Enable dynamic updates of broker SSL keystore (#4263)
Enable dynamic broker configuration (see KIP-226 for details). Includes
- Base implementation to allow specific broker configs and custom configs to be dynamically updated
- Extend DescribeConfigsRequest/Response to return all synonym configs and their sources in the order of precedence
- Extend AdminClient to alter dynamic broker configs
- Dynamic update of SSL keystores
Reviewers: Ted Yu <yu...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
checkstyle/suppressions.xml | 2 +-
.../apache/kafka/clients/admin/ConfigEntry.java | 138 +++++-
.../clients/admin/DescribeConfigsOptions.java | 17 +
.../kafka/clients/admin/KafkaAdminClient.java | 87 +++-
.../org/apache/kafka/common/Reconfigurable.java | 49 +++
.../org/apache/kafka/common/config/ConfigDef.java | 33 ++
.../apache/kafka/common/config/ConfigResource.java | 8 +
.../org/apache/kafka/common/config/SslConfigs.java | 7 +
.../kafka/common/network/ChannelBuilder.java | 9 +-
.../kafka/common/network/ChannelBuilders.java | 21 +-
.../network/ListenerReconfigurable.java} | 26 +-
.../kafka/common/network/SaslChannelBuilder.java | 34 +-
.../kafka/common/network/SslChannelBuilder.java | 37 +-
.../apache/kafka/common/protocol/types/Struct.java | 6 +-
.../common/requests/DescribeConfigsRequest.java | 29 +-
.../common/requests/DescribeConfigsResponse.java | 160 ++++++-
.../kafka/common/security/ssl/SslFactory.java | 282 ++++++++++--
.../apache/kafka/common/network/NioEchoServer.java | 2 +-
.../common/network/SaslChannelBuilderTest.java | 2 +-
.../kafka/common/network/SslSelectorTest.java | 6 +-
.../common/network/SslTransportLayerTest.java | 82 +++-
.../kafka/common/requests/RequestResponseTest.java | 26 +-
.../authenticator/SaslAuthenticatorTest.java | 4 +-
.../kafka/common/security/ssl/SslFactoryTest.java | 65 ++-
core/src/main/scala/kafka/log/LogConfig.scala | 31 +-
.../main/scala/kafka/network/SocketServer.scala | 36 +-
.../src/main/scala/kafka/server/AdminManager.scala | 153 +++++--
.../main/scala/kafka/server/ConfigHandler.scala | 8 +-
.../scala/kafka/server/DynamicBrokerConfig.scala | 360 ++++++++++++++++
.../main/scala/kafka/server/DynamicConfig.scala | 18 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 50 ++-
core/src/main/scala/kafka/server/KafkaServer.scala | 13 +-
.../scala/kafka/server/KafkaServerStartable.scala | 6 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 1 +
core/src/main/scala/kafka/zk/AdminZkClient.scala | 27 +-
core/src/main/scala/kafka/zk/ZkData.scala | 6 +-
.../server/DynamicBrokerReconfigurationTest.scala | 471 +++++++++++++++++++++
.../kafka/server/DynamicBrokerConfigTest.scala | 153 +++++++
.../unit/kafka/server/DynamicConfigTest.scala | 7 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 28 +-
41 files changed, 2270 insertions(+), 232 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 05d4a52..664a3f6 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -51,7 +51,7 @@
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData).java"/>
<suppress checks="CyclomaticComplexity"
- files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler).java"/>
+ files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler).java"/>
<suppress checks="JavaNCSS"
files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
index 3f24c81..e8da646 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
@@ -19,6 +19,8 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
+import java.util.Collections;
+import java.util.List;
import java.util.Objects;
/**
@@ -31,9 +33,10 @@ public class ConfigEntry {
private final String name;
private final String value;
- private final boolean isDefault;
+ private final ConfigSource source;
private final boolean isSensitive;
private final boolean isReadOnly;
+ private final List<ConfigSynonym> synonyms;
/**
* Create a configuration entry with the provided values.
@@ -53,14 +56,36 @@ public class ConfigEntry {
* @param isDefault whether the config value is the default or if it's been explicitly set
* @param isSensitive whether the config value is sensitive, the broker never returns the value if it is sensitive
* @param isReadOnly whether the config is read-only and cannot be updated
+ * @deprecated since 1.1.0. This constructor will be removed in a future release.
*/
public ConfigEntry(String name, String value, boolean isDefault, boolean isSensitive, boolean isReadOnly) {
+ this(name,
+ value,
+ isDefault ? ConfigSource.DEFAULT_CONFIG : ConfigSource.UNKNOWN,
+ isSensitive,
+ isReadOnly,
+ Collections.<ConfigSynonym>emptyList());
+ }
+
+ /**
+ * Create a configuration with the provided values.
+ *
+ * @param name the non-null config name
+ * @param value the config value or null
+ * @param source the source of this config entry
+ * @param isSensitive whether the config value is sensitive, the broker never returns the value if it is sensitive
+ * @param isReadOnly whether the config is read-only and cannot be updated
+ * @param synonyms Synonym configs in order of precedence
+ */
+ ConfigEntry(String name, String value, ConfigSource source, boolean isSensitive, boolean isReadOnly,
+ List<ConfigSynonym> synonyms) {
Objects.requireNonNull(name, "name should not be null");
this.name = name;
this.value = value;
- this.isDefault = isDefault;
+ this.source = source;
this.isSensitive = isSensitive;
this.isReadOnly = isReadOnly;
+ this.synonyms = synonyms;
}
/**
@@ -78,10 +103,17 @@ public class ConfigEntry {
}
/**
+ * Return the source of this configuration entry.
+ */
+ public ConfigSource source() {
+ return source;
+ }
+
+ /**
* Return whether the config value is the default or if it's been explicitly set.
*/
public boolean isDefault() {
- return isDefault;
+ return source == ConfigSource.DEFAULT_CONFIG;
}
/**
@@ -99,6 +131,15 @@ public class ConfigEntry {
return isReadOnly;
}
+ /**
+ * Returns all config values that may be used as the value of this config along with their source,
+ * in the order of precedence. The list starts with the value returned in this ConfigEntry.
+ * The list is empty if synonyms were not requested using {@link DescribeConfigsOptions#includeSynonyms(boolean)}
+ */
+ public List<ConfigSynonym> synonyms() {
+ return synonyms;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o)
@@ -110,9 +151,10 @@ public class ConfigEntry {
return this.name.equals(that.name) &&
this.value != null ? this.value.equals(that.value) : that.value == null &&
- this.isDefault == that.isDefault &&
this.isSensitive == that.isSensitive &&
- this.isReadOnly == that.isReadOnly;
+ this.isReadOnly == that.isReadOnly &&
+ this.source == that.source &&
+ Objects.equals(this.synonyms, that.synonyms);
}
@Override
@@ -121,9 +163,10 @@ public class ConfigEntry {
int result = 1;
result = prime * result + name.hashCode();
result = prime * result + ((value == null) ? 0 : value.hashCode());
- result = prime * result + (isDefault ? 1 : 0);
result = prime * result + (isSensitive ? 1 : 0);
result = prime * result + (isReadOnly ? 1 : 0);
+ result = prime * result + source.hashCode();
+ result = prime * result + synonyms.hashCode();
return result;
}
@@ -132,9 +175,90 @@ public class ConfigEntry {
return "ConfigEntry(" +
"name=" + name +
", value=" + value +
- ", isDefault=" + isDefault +
+ ", source=" + source +
", isSensitive=" + isSensitive +
", isReadOnly=" + isReadOnly +
+ ", synonyms=" + synonyms +
")";
}
+
+
+ /**
+ * Source of configuration entries.
+ */
+ public enum ConfigSource {
+ DYNAMIC_TOPIC_CONFIG, // dynamic topic config that is configured for a specific topic
+ DYNAMIC_BROKER_CONFIG, // dynamic broker config that is configured for a specific broker
+ DYNAMIC_DEFAULT_BROKER_CONFIG, // dynamic broker config that is configured as default for all brokers in the cluster
+ STATIC_BROKER_CONFIG, // static broker config provided as broker properties at start up (e.g. server.properties file)
+ DEFAULT_CONFIG, // built-in default configuration for configs that have a default value
+ UNKNOWN // source unknown e.g. in the ConfigEntry used for alter requests where source is not set
+ }
+
+ /**
+ * Class representing a configuration synonym of a {@link ConfigEntry}.
+ */
+ public static class ConfigSynonym {
+
+ private final String name;
+ private final String value;
+ private final ConfigSource source;
+
+ /**
+ * Create a configuration synonym with the provided values.
+ *
+ * @param name Configuration name (this may be different from the name of the associated {@link ConfigEntry}
+ * @param value Configuration value
+ * @param source {@link ConfigSource} of this configuraton
+ */
+ ConfigSynonym(String name, String value, ConfigSource source) {
+ this.name = name;
+ this.value = value;
+ this.source = source;
+ }
+
+ /**
+ * Returns the name of this configuration.
+ */
+ public String name() {
+ return name;
+ }
+
+ /**
+ * Returns the value of this configuration, which may be null if the configuration is sensitive.
+ */
+ public String value() {
+ return value;
+ }
+
+ /**
+ * Returns the source of this configuration.
+ */
+ public ConfigSource source() {
+ return source;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ConfigSynonym that = (ConfigSynonym) o;
+ return Objects.equals(name, that.name) && Objects.equals(value, that.value) && source == that.source;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, value, source);
+ }
+
+ @Override
+ public String toString() {
+ return "ConfigSynonym(" +
+ "name=" + name +
+ ", value=" + value +
+ ", source=" + source +
+ ")";
+ }
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
index b0b9b3c..aa667af 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
@@ -29,6 +29,8 @@ import java.util.Collection;
@InterfaceStability.Evolving
public class DescribeConfigsOptions extends AbstractOptions<DescribeConfigsOptions> {
+ private boolean includeSynonyms = false;
+
/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* AdminClient should be used.
@@ -40,4 +42,19 @@ public class DescribeConfigsOptions extends AbstractOptions<DescribeConfigsOptio
return this;
}
+ /**
+ * Return true if synonym configs should be returned in the response.
+ */
+ public boolean includeSynonyms() {
+ return includeSynonyms;
+ }
+
+ /**
+ * Set to true if synonym configs should be returned in the response.
+ */
+ public DescribeConfigsOptions includeSynonyms(boolean includeSynonyms) {
+ this.includeSynonyms = includeSynonyms;
+ return this;
+ }
+
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index cf48846..6d6788d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1494,7 +1494,7 @@ public class KafkaAdminClient extends AdminClient {
final Collection<Resource> unifiedRequestResources = new ArrayList<>(configResources.size());
for (ConfigResource resource : configResources) {
- if (resource.type() == ConfigResource.Type.BROKER) {
+ if (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault()) {
brokerFutures.put(resource, new KafkaFutureImpl<Config>());
brokerResources.add(configResourceToResource(resource));
} else {
@@ -1510,7 +1510,8 @@ public class KafkaAdminClient extends AdminClient {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
- return new DescribeConfigsRequest.Builder(unifiedRequestResources);
+ return new DescribeConfigsRequest.Builder(unifiedRequestResources)
+ .includeSynonyms(options.includeSynonyms());
}
@Override
@@ -1531,8 +1532,10 @@ public class KafkaAdminClient extends AdminClient {
}
List<ConfigEntry> configEntries = new ArrayList<>();
for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) {
- configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(),
- configEntry.isDefault(), configEntry.isSensitive(), configEntry.isReadOnly()));
+ configEntries.add(new ConfigEntry(configEntry.name(),
+ configEntry.value(), configSource(configEntry.source()),
+ configEntry.isSensitive(), configEntry.isReadOnly(),
+ configSynonyms(configEntry)));
}
future.complete(new Config(configEntries));
}
@@ -1554,7 +1557,8 @@ public class KafkaAdminClient extends AdminClient {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
- return new DescribeConfigsRequest.Builder(Collections.singleton(resource));
+ return new DescribeConfigsRequest.Builder(Collections.singleton(resource))
+ .includeSynonyms(options.includeSynonyms());
}
@Override
@@ -1573,7 +1577,8 @@ public class KafkaAdminClient extends AdminClient {
List<ConfigEntry> configEntries = new ArrayList<>();
for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) {
configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(),
- configEntry.isDefault(), configEntry.isSensitive(), configEntry.isReadOnly()));
+ configSource(configEntry.source()), configEntry.isSensitive(), configEntry.isReadOnly(),
+ configSynonyms(configEntry)));
}
brokerFuture.complete(new Config(configEntries));
}
@@ -1606,24 +1611,74 @@ public class KafkaAdminClient extends AdminClient {
return new Resource(resourceType, configResource.name());
}
+ private List<ConfigEntry.ConfigSynonym> configSynonyms(DescribeConfigsResponse.ConfigEntry configEntry) {
+ List<ConfigEntry.ConfigSynonym> synonyms = new ArrayList<>(configEntry.synonyms().size());
+ for (DescribeConfigsResponse.ConfigSynonym synonym : configEntry.synonyms()) {
+ synonyms.add(new ConfigEntry.ConfigSynonym(synonym.name(), synonym.value(), configSource(synonym.source())));
+ }
+ return synonyms;
+ }
+
+ private ConfigEntry.ConfigSource configSource(DescribeConfigsResponse.ConfigSource source) {
+ ConfigEntry.ConfigSource configSource;
+ switch (source) {
+ case TOPIC_CONFIG:
+ configSource = ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG;
+ break;
+ case DYNAMIC_BROKER_CONFIG:
+ configSource = ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG;
+ break;
+ case DYNAMIC_DEFAULT_BROKER_CONFIG:
+ configSource = ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG;
+ break;
+ case STATIC_BROKER_CONFIG:
+ configSource = ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG;
+ break;
+ case DEFAULT_CONFIG:
+ configSource = ConfigEntry.ConfigSource.DEFAULT_CONFIG;
+ break;
+ default:
+ throw new IllegalArgumentException("Unexpected config source " + source);
+ }
+ return configSource;
+ }
+
@Override
public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options) {
- final Map<ConfigResource, KafkaFutureImpl<Void>> futures = new HashMap<>(configs.size());
- for (ConfigResource configResource : configs.keySet()) {
- futures.put(configResource, new KafkaFutureImpl<Void>());
+ final Map<ConfigResource, KafkaFutureImpl<Void>> allFutures = new HashMap<>();
+ // We must make a separate AlterConfigs request for every BROKER resource we want to alter
+ // and send the request to that specific broker. Other resources are grouped together into
+ // a single request that may be sent to any broker.
+ final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>();
+
+ for (ConfigResource resource : configs.keySet()) {
+ if (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault()) {
+ NodeProvider nodeProvider = new ConstantNodeIdProvider(Integer.parseInt(resource.name()));
+ allFutures.putAll(alterConfigs(configs, options, Collections.singleton(resource), nodeProvider));
+ } else
+ unifiedRequestResources.add(resource);
}
- final Map<Resource, AlterConfigsRequest.Config> requestMap = new HashMap<>(configs.size());
- for (Map.Entry<ConfigResource, Config> entry : configs.entrySet()) {
+ if (!unifiedRequestResources.isEmpty())
+ allFutures.putAll(alterConfigs(configs, options, unifiedRequestResources, new LeastLoadedNodeProvider()));
+ return new AlterConfigsResult(new HashMap<ConfigResource, KafkaFuture<Void>>(allFutures));
+ }
+
+ private Map<ConfigResource, KafkaFutureImpl<Void>> alterConfigs(Map<ConfigResource, Config> configs,
+ final AlterConfigsOptions options,
+ Collection<ConfigResource> resources,
+ NodeProvider nodeProvider) {
+ final Map<ConfigResource, KafkaFutureImpl<Void>> futures = new HashMap<>();
+ final Map<Resource, AlterConfigsRequest.Config> requestMap = new HashMap<>(resources.size());
+ for (ConfigResource resource : resources) {
List<AlterConfigsRequest.ConfigEntry> configEntries = new ArrayList<>();
- for (ConfigEntry configEntry: entry.getValue().entries())
+ for (ConfigEntry configEntry: configs.get(resource).entries())
configEntries.add(new AlterConfigsRequest.ConfigEntry(configEntry.name(), configEntry.value()));
- ConfigResource resource = entry.getKey();
requestMap.put(configResourceToResource(resource), new AlterConfigsRequest.Config(configEntries));
+ futures.put(resource, new KafkaFutureImpl<Void>());
}
final long now = time.milliseconds();
- runnable.call(new Call("alterConfigs", calcDeadlineMs(now, options.timeoutMs()),
- new LeastLoadedNodeProvider()) {
+ runnable.call(new Call("alterConfigs", calcDeadlineMs(now, options.timeoutMs()), nodeProvider) {
@Override
public AbstractRequest.Builder createRequest(int timeoutMs) {
@@ -1649,7 +1704,7 @@ public class KafkaAdminClient extends AdminClient {
completeAllExceptionally(futures.values(), throwable);
}
}, now);
- return new AlterConfigsResult(new HashMap<ConfigResource, KafkaFuture<Void>>(futures));
+ return futures;
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/Reconfigurable.java b/clients/src/main/java/org/apache/kafka/common/Reconfigurable.java
new file mode 100644
index 0000000..3339dce
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/Reconfigurable.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Interface for reconfigurable classes that support dynamic configuration.
+ */
+public interface Reconfigurable extends Configurable {
+
+ /**
+ * Returns the names of configs that may be reconfigured.
+ */
+ Set<String> reconfigurableConfigs();
+
+ /**
+ * Validates the provided configuration. The provided map contains
+ * all configs including any reconfigurable configs that may be different
+ * from the initial configuration. Reconfiguration will be not performed
+ * if this method returns false or throws any exception.
+ */
+ boolean validateReconfiguration(Map<String, ?> configs);
+
+ /**
+ * Reconfigures this instance with the given key-value pairs. The provided
+ * map contains all configs including any reconfigurable configs that
+ * may have changed since the object was initially configured using
+ * {@link Configurable#configure(Map)}. This method will only be invoked if
+ * the configs have passed validation using {@link #validateReconfiguration(Map)}.
+ */
+ void reconfigure(Map<String, ?> configs);
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 7b9881f..3340ab3 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -105,6 +105,15 @@ public class ConfigDef {
return Collections.unmodifiableSet(configKeys.keySet());
}
+ public Map<String, Object> defaultValues() {
+ Map<String, Object> defaultValues = new HashMap<>();
+ for (ConfigKey key : configKeys.values()) {
+ if (key.defaultValue != NO_DEFAULT_VALUE)
+ defaultValues.put(key.name, key.defaultValue);
+ }
+ return defaultValues;
+ }
+
public ConfigDef define(ConfigKey key) {
if (configKeys.containsKey(key.name)) {
throw new ConfigException("Configuration " + key.name + " is defined twice.");
@@ -746,6 +755,30 @@ public class ConfigDef {
}
/**
+ * Converts a map of config (key, value) pairs to a map of strings where each value
+ * is converted to a string. This method should be used with care since it stores
+ * actual password values to String. Values from this map should never be used in log entries.
+ */
+ public static Map<String, String> convertToStringMapWithPasswordValues(Map<String, ?> configs) {
+ Map<String, String> result = new HashMap<>();
+ for (Map.Entry<String, ?> entry : configs.entrySet()) {
+ Object value = entry.getValue();
+ String strValue;
+ if (value instanceof Password)
+ strValue = ((Password) value).value();
+ else if (value instanceof List)
+ strValue = convertToString(value, Type.LIST);
+ else if (value instanceof Class)
+ strValue = convertToString(value, Type.CLASS);
+ else
+ strValue = convertToString(value, null);
+ if (strValue != null)
+ result.put(entry.getKey(), strValue);
+ }
+ return result;
+ }
+
+ /**
* The config types
*/
public enum Type {
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
index cd397ad..4402c26 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
@@ -61,6 +61,14 @@ public final class ConfigResource {
return name;
}
+ /**
+ * Returns true if this is the default resource of a resource type.
+ * Resource name is empty for the default resource.
+ */
+ public boolean isDefault() {
+ return name.isEmpty();
+ }
+
@Override
public boolean equals(Object o) {
if (this == o)
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
index 042b051..fd4d39e 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
@@ -17,9 +17,11 @@
package org.apache.kafka.common.config;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.utils.Utils;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManagerFactory;
+import java.util.Set;
public class SslConfigs {
/*
@@ -135,4 +137,9 @@ public class SslConfigs {
.define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC)
.define(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_DOC);
}
+
+ public static final Set<String> RECONFIGURABLE_CONFIGS = Utils.mkSet(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
+ SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
+ SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
+ SslConfigs.SSL_KEY_PASSWORD_CONFIG);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
index 54689f3..e3b2eeb 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
@@ -16,22 +16,17 @@
*/
package org.apache.kafka.common.network;
-import java.util.Map;
import java.nio.channels.SelectionKey;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.memory.MemoryPool;
/**
* A ChannelBuilder interface to build Channel based on configs
*/
-public interface ChannelBuilder extends AutoCloseable {
-
- /**
- * Configure this class with the given key-value pairs
- */
- void configure(Map<String, ?> configs) throws KafkaException;
+public interface ChannelBuilder extends AutoCloseable, Configurable {
/**
* returns a Channel with TransportLayer and Authenticator configured.
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index 8f301ad..e6df78e 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -59,7 +59,7 @@ public class ChannelBuilders {
if (clientSaslMechanism == null)
throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
}
- return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName, clientSaslMechanism,
+ return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName, false, clientSaslMechanism,
saslHandshakeRequestEnable, null, null);
}
@@ -71,12 +71,13 @@ public class ChannelBuilders {
* @return the configured `ChannelBuilder`
*/
public static ChannelBuilder serverChannelBuilder(ListenerName listenerName,
+ boolean isInterBrokerListener,
SecurityProtocol securityProtocol,
AbstractConfig config,
CredentialCache credentialCache,
DelegationTokenCache tokenCache) {
- return create(securityProtocol, Mode.SERVER, JaasContext.Type.SERVER, config, listenerName, null,
- true, credentialCache, tokenCache);
+ return create(securityProtocol, Mode.SERVER, JaasContext.Type.SERVER, config, listenerName,
+ isInterBrokerListener, null, true, credentialCache, tokenCache);
}
private static ChannelBuilder create(SecurityProtocol securityProtocol,
@@ -84,6 +85,7 @@ public class ChannelBuilders {
JaasContext.Type contextType,
AbstractConfig config,
ListenerName listenerName,
+ boolean isInterBrokerListener,
String clientSaslMechanism,
boolean saslHandshakeRequestEnable,
CredentialCache credentialCache,
@@ -98,14 +100,21 @@ public class ChannelBuilders {
switch (securityProtocol) {
case SSL:
requireNonNullMode(mode, securityProtocol);
- channelBuilder = new SslChannelBuilder(mode);
+ channelBuilder = new SslChannelBuilder(mode, listenerName, isInterBrokerListener);
break;
case SASL_SSL:
case SASL_PLAINTEXT:
requireNonNullMode(mode, securityProtocol);
JaasContext jaasContext = JaasContext.load(contextType, listenerName, configs);
- channelBuilder = new SaslChannelBuilder(mode, jaasContext, securityProtocol, listenerName,
- clientSaslMechanism, saslHandshakeRequestEnable, credentialCache, tokenCache);
+ channelBuilder = new SaslChannelBuilder(mode,
+ jaasContext,
+ securityProtocol,
+ listenerName,
+ isInterBrokerListener,
+ clientSaslMechanism,
+ saslHandshakeRequestEnable,
+ credentialCache,
+ tokenCache);
break;
case PLAINTEXT:
channelBuilder = new PlaintextChannelBuilder();
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java b/clients/src/main/java/org/apache/kafka/common/network/ListenerReconfigurable.java
similarity index 52%
copy from clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
copy to clients/src/main/java/org/apache/kafka/common/network/ListenerReconfigurable.java
index b0b9b3c..3541212 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ListenerReconfigurable.java
@@ -14,30 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.kafka.common.network;
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.Collection;
+import org.apache.kafka.common.Reconfigurable;
/**
- * Options for {@link AdminClient#describeConfigs(Collection)}.
- *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * Interface for reconfigurable entities associated with a listener.
*/
-@InterfaceStability.Evolving
-public class DescribeConfigsOptions extends AbstractOptions<DescribeConfigsOptions> {
+public interface ListenerReconfigurable extends Reconfigurable {
/**
- * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
- * AdminClient should be used.
- *
+ * Returns the listener name associated with this reconfigurable. Listener-specific
+ * configs corresponding to this listener name are provided for reconfiguration.
*/
- // This method is retained to keep binary compatibility with 0.11
- public DescribeConfigsOptions timeoutMs(Integer timeoutMs) {
- this.timeoutMs = timeoutMs;
- return this;
- }
-
+ ListenerName listenerName();
}
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index d6dd5fc..1716e0e 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.network;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -39,16 +40,19 @@ import java.lang.reflect.Method;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.security.auth.Subject;
-public class SaslChannelBuilder implements ChannelBuilder {
+public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurable {
private static final Logger log = LoggerFactory.getLogger(SaslChannelBuilder.class);
private final SecurityProtocol securityProtocol;
private final ListenerName listenerName;
+ private final boolean isInterBrokerListener;
private final String clientSaslMechanism;
private final Mode mode;
private final JaasContext jaasContext;
@@ -65,6 +69,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
JaasContext jaasContext,
SecurityProtocol securityProtocol,
ListenerName listenerName,
+ boolean isInterBrokerListener,
String clientSaslMechanism,
boolean handshakeRequestEnable,
CredentialCache credentialCache,
@@ -73,6 +78,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
this.jaasContext = jaasContext;
this.securityProtocol = securityProtocol;
this.listenerName = listenerName;
+ this.isInterBrokerListener = isInterBrokerListener;
this.handshakeRequestEnable = handshakeRequestEnable;
this.clientSaslMechanism = clientSaslMechanism;
this.credentialCache = credentialCache;
@@ -108,7 +114,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
// Disable SSL client authentication as we are using SASL authentication
- this.sslFactory = new SslFactory(mode, "none");
+ this.sslFactory = new SslFactory(mode, "none", isInterBrokerListener);
this.sslFactory.configure(configs);
}
} catch (Exception e) {
@@ -118,6 +124,30 @@ public class SaslChannelBuilder implements ChannelBuilder {
}
@Override
+ public Set<String> reconfigurableConfigs() {
+ return securityProtocol == SecurityProtocol.SASL_SSL ? SslConfigs.RECONFIGURABLE_CONFIGS : Collections.<String>emptySet();
+ }
+
+ @Override
+ public boolean validateReconfiguration(Map<String, ?> configs) {
+ if (this.securityProtocol == SecurityProtocol.SASL_SSL)
+ return sslFactory.validateReconfiguration(configs);
+ else
+ return true;
+ }
+
+ @Override
+ public void reconfigure(Map<String, ?> configs) {
+ if (this.securityProtocol == SecurityProtocol.SASL_SSL)
+ sslFactory.reconfigure(configs);
+ }
+
+ @Override
+ public ListenerName listenerName() {
+ return listenerName;
+ }
+
+ @Override
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
try {
SocketChannel socketChannel = (SocketChannel) key.channel();
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index 9519e58..e024d32 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.network;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
@@ -33,21 +34,31 @@ import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Map;
+import java.util.Set;
-public class SslChannelBuilder implements ChannelBuilder {
+public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable {
private static final Logger log = LoggerFactory.getLogger(SslChannelBuilder.class);
+
+ private final ListenerName listenerName;
+ private final boolean isInterBrokerListener;
private SslFactory sslFactory;
private Mode mode;
private Map<String, ?> configs;
- public SslChannelBuilder(Mode mode) {
+ /**
+ * Constructs a SSL channel builder. ListenerName is provided only
+ * for server channel builder and will be null for client channel builder.
+ */
+ public SslChannelBuilder(Mode mode, ListenerName listenerName, boolean isInterBrokerListener) {
this.mode = mode;
+ this.listenerName = listenerName;
+ this.isInterBrokerListener = isInterBrokerListener;
}
public void configure(Map<String, ?> configs) throws KafkaException {
try {
this.configs = configs;
- this.sslFactory = new SslFactory(mode);
+ this.sslFactory = new SslFactory(mode, null, isInterBrokerListener);
this.sslFactory.configure(this.configs);
} catch (Exception e) {
throw new KafkaException(e);
@@ -55,6 +66,26 @@ public class SslChannelBuilder implements ChannelBuilder {
}
@Override
+ public Set<String> reconfigurableConfigs() {
+ return SslConfigs.RECONFIGURABLE_CONFIGS;
+ }
+
+ @Override
+ public boolean validateReconfiguration(Map<String, ?> configs) {
+ return sslFactory.validateReconfiguration(configs);
+ }
+
+ @Override
+ public void reconfigure(Map<String, ?> configs) {
+ sslFactory.reconfigure(configs);
+ }
+
+ @Override
+ public ListenerName listenerName() {
+ return listenerName;
+ }
+
+ @Override
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
try {
SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key, peerHost(key));
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index b825201..6fb6b20 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -289,7 +289,11 @@ public class Struct {
}
public Struct setIfExists(Field def, Object value) {
- BoundField field = this.schema.get(def.name);
+ return setIfExists(def.name, value);
+ }
+
+ public Struct setIfExists(String fieldName, Object value) {
+ BoundField field = this.schema.get(fieldName);
if (field != null)
this.values[field.index] = value;
return this;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
index 74e25f4..4156338 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
@@ -30,12 +30,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
import static org.apache.kafka.common.protocol.types.Type.INT8;
import static org.apache.kafka.common.protocol.types.Type.STRING;
public class DescribeConfigsRequest extends AbstractRequest {
private static final String RESOURCES_KEY_NAME = "resources";
+ private static final String INCLUDE_SYNONYMS = "include_synonyms";
private static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
private static final String RESOURCE_NAME_KEY_NAME = "resource_name";
private static final String CONFIG_NAMES_KEY_NAME = "config_names";
@@ -48,18 +50,29 @@ public class DescribeConfigsRequest extends AbstractRequest {
private static final Schema DESCRIBE_CONFIGS_REQUEST_V0 = new Schema(
new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0), "An array of config resources to be returned."));
+ private static final Schema DESCRIBE_CONFIGS_REQUEST_V1 = new Schema(
+ new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0), "An array of config resources to be returned."),
+ new Field(INCLUDE_SYNONYMS, BOOLEAN));
+
+
public static Schema[] schemaVersions() {
- return new Schema[]{DESCRIBE_CONFIGS_REQUEST_V0};
+ return new Schema[]{DESCRIBE_CONFIGS_REQUEST_V0, DESCRIBE_CONFIGS_REQUEST_V1};
}
public static class Builder extends AbstractRequest.Builder {
private final Map<Resource, Collection<String>> resourceToConfigNames;
+ private boolean includeSynonyms;
public Builder(Map<Resource, Collection<String>> resourceToConfigNames) {
super(ApiKeys.DESCRIBE_CONFIGS);
this.resourceToConfigNames = resourceToConfigNames;
}
+ public Builder includeSynonyms(boolean includeSynonyms) {
+ this.includeSynonyms = includeSynonyms;
+ return this;
+ }
+
public Builder(Collection<Resource> resources) {
this(toResourceToConfigNames(resources));
}
@@ -73,16 +86,17 @@ public class DescribeConfigsRequest extends AbstractRequest {
@Override
public DescribeConfigsRequest build(short version) {
- return new DescribeConfigsRequest(version, resourceToConfigNames);
+ return new DescribeConfigsRequest(version, resourceToConfigNames, includeSynonyms);
}
}
private final Map<Resource, Collection<String>> resourceToConfigNames;
+ private final boolean includeSynonyms;
- public DescribeConfigsRequest(short version, Map<Resource, Collection<String>> resourceToConfigNames) {
+ public DescribeConfigsRequest(short version, Map<Resource, Collection<String>> resourceToConfigNames, boolean includeSynonyms) {
super(version);
this.resourceToConfigNames = resourceToConfigNames;
-
+ this.includeSynonyms = includeSynonyms;
}
public DescribeConfigsRequest(Struct struct, short version) {
@@ -104,6 +118,7 @@ public class DescribeConfigsRequest extends AbstractRequest {
resourceToConfigNames.put(new Resource(resourceType, resourceName), configNames);
}
+ this.includeSynonyms = struct.hasField(INCLUDE_SYNONYMS) ? struct.getBoolean(INCLUDE_SYNONYMS) : false;
}
public Collection<Resource> resources() {
@@ -117,6 +132,10 @@ public class DescribeConfigsRequest extends AbstractRequest {
return resourceToConfigNames.get(resource);
}
+ public boolean includeSynonyms() {
+ return includeSynonyms;
+ }
+
@Override
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.DESCRIBE_CONFIGS.requestSchema(version()));
@@ -133,6 +152,7 @@ public class DescribeConfigsRequest extends AbstractRequest {
resourceStructs.add(resourceStruct);
}
struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0]));
+ struct.setIfExists(INCLUDE_SYNONYMS, includeSynonyms);
return struct;
}
@@ -141,6 +161,7 @@ public class DescribeConfigsRequest extends AbstractRequest {
short version = version();
switch (version) {
case 0:
+ case 1:
ApiError error = ApiError.fromThrowable(e);
Map<Resource, DescribeConfigsResponse.Config> errors = new HashMap<>(resources().size());
DescribeConfigsResponse.Config config = new DescribeConfigsResponse.Config(error,
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
index 91bf30e..62012f4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -54,24 +55,53 @@ public class DescribeConfigsResponse extends AbstractResponse {
private static final String IS_DEFAULT_KEY_NAME = "is_default";
private static final String READ_ONLY_KEY_NAME = "read_only";
+ private static final String CONFIG_SYNONYMS_KEY_NAME = "config_synonyms";
+ private static final String CONFIG_SOURCE_KEY_NAME = "config_source";
+
+ private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTRY_V0 = new Schema(
+ new Field(CONFIG_NAME_KEY_NAME, STRING),
+ new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING),
+ new Field(READ_ONLY_KEY_NAME, BOOLEAN),
+ new Field(IS_DEFAULT_KEY_NAME, BOOLEAN),
+ new Field(IS_SENSITIVE_KEY_NAME, BOOLEAN));
+
+ private static final Schema DESCRIBE_CONFIGS_RESPONSE_SYNONYM_V1 = new Schema(
+ new Field(CONFIG_NAME_KEY_NAME, STRING),
+ new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING),
+ new Field(CONFIG_SOURCE_KEY_NAME, INT8));
+
+ private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTRY_V1 = new Schema(
+ new Field(CONFIG_NAME_KEY_NAME, STRING),
+ new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING),
+ new Field(READ_ONLY_KEY_NAME, BOOLEAN),
+ new Field(IS_DEFAULT_KEY_NAME, BOOLEAN),
+ new Field(IS_SENSITIVE_KEY_NAME, BOOLEAN),
+ new Field(CONFIG_SYNONYMS_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_SYNONYM_V1)));
+
private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTITY_V0 = new Schema(
ERROR_CODE,
ERROR_MESSAGE,
new Field(RESOURCE_TYPE_KEY_NAME, INT8),
new Field(RESOURCE_NAME_KEY_NAME, STRING),
- new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(new Schema(
- new Field(CONFIG_NAME_KEY_NAME, STRING),
- new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING),
- new Field(READ_ONLY_KEY_NAME, BOOLEAN),
- new Field(IS_DEFAULT_KEY_NAME, BOOLEAN),
- new Field(IS_SENSITIVE_KEY_NAME, BOOLEAN)))));
+ new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTRY_V0)));
+
+ private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTITY_V1 = new Schema(
+ ERROR_CODE,
+ ERROR_MESSAGE,
+ new Field(RESOURCE_TYPE_KEY_NAME, INT8),
+ new Field(RESOURCE_NAME_KEY_NAME, STRING),
+ new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTRY_V1)));
private static final Schema DESCRIBE_CONFIGS_RESPONSE_V0 = new Schema(
THROTTLE_TIME_MS,
new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTITY_V0)));
+ private static final Schema DESCRIBE_CONFIGS_RESPONSE_V1 = new Schema(
+ THROTTLE_TIME_MS,
+ new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTITY_V1)));
+
public static Schema[] schemaVersions() {
- return new Schema[]{DESCRIBE_CONFIGS_RESPONSE_V0};
+ return new Schema[]{DESCRIBE_CONFIGS_RESPONSE_V0, DESCRIBE_CONFIGS_RESPONSE_V1};
}
public static class Config {
@@ -96,15 +126,19 @@ public class DescribeConfigsResponse extends AbstractResponse {
private final String name;
private final String value;
private final boolean isSensitive;
- private final boolean isDefault;
+ private final ConfigSource source;
private final boolean readOnly;
+ private final Collection<ConfigSynonym> synonyms;
+
+ public ConfigEntry(String name, String value, ConfigSource source, boolean isSensitive, boolean readOnly,
+ Collection<ConfigSynonym> synonyms) {
- public ConfigEntry(String name, String value, boolean isSensitive, boolean isDefault, boolean readOnly) {
this.name = name;
this.value = value;
+ this.source = source;
this.isSensitive = isSensitive;
- this.isDefault = isDefault;
this.readOnly = readOnly;
+ this.synonyms = synonyms;
}
public String name() {
@@ -119,15 +153,66 @@ public class DescribeConfigsResponse extends AbstractResponse {
return isSensitive;
}
- public boolean isDefault() {
- return isDefault;
+ public ConfigSource source() {
+ return source;
}
public boolean isReadOnly() {
return readOnly;
}
+
+ public Collection<ConfigSynonym> synonyms() {
+ return synonyms;
+ }
+ }
+
+ public enum ConfigSource {
+ UNKNOWN_CONFIG((byte) 0),
+ TOPIC_CONFIG((byte) 1),
+ DYNAMIC_BROKER_CONFIG((byte) 2),
+ DYNAMIC_DEFAULT_BROKER_CONFIG((byte) 3),
+ STATIC_BROKER_CONFIG((byte) 4),
+ DEFAULT_CONFIG((byte) 5);
+
+ final byte id;
+ private static final ConfigSource[] VALUES = values();
+
+ ConfigSource(byte id) {
+ this.id = id;
+ }
+
+ public static ConfigSource forId(byte id) {
+ if (id < 0)
+ throw new IllegalArgumentException("id should be positive, id: " + id);
+ if (id >= VALUES.length)
+ return UNKNOWN_CONFIG;
+ return VALUES[id];
+ }
}
+ public static class ConfigSynonym {
+ private final String name;
+ private final String value;
+ private final ConfigSource source;
+
+ public ConfigSynonym(String name, String value, ConfigSource source) {
+ this.name = name;
+ this.value = value;
+ this.source = source;
+ }
+
+ public String name() {
+ return name;
+ }
+ public String value() {
+ return value;
+ }
+ public ConfigSource source() {
+ return source;
+ }
+ }
+
+
private final int throttleTimeMs;
private final Map<Resource, Config> configs;
@@ -155,9 +240,42 @@ public class DescribeConfigsResponse extends AbstractResponse {
String configName = configEntriesStruct.getString(CONFIG_NAME_KEY_NAME);
String configValue = configEntriesStruct.getString(CONFIG_VALUE_KEY_NAME);
boolean isSensitive = configEntriesStruct.getBoolean(IS_SENSITIVE_KEY_NAME);
- boolean isDefault = configEntriesStruct.getBoolean(IS_DEFAULT_KEY_NAME);
+ ConfigSource configSource;
+ if (configEntriesStruct.hasField(CONFIG_SOURCE_KEY_NAME))
+ configSource = ConfigSource.forId(configEntriesStruct.getByte(CONFIG_SOURCE_KEY_NAME));
+ else if (configEntriesStruct.hasField(IS_DEFAULT_KEY_NAME)) {
+ if (configEntriesStruct.getBoolean(IS_DEFAULT_KEY_NAME))
+ configSource = ConfigSource.DEFAULT_CONFIG;
+ else {
+ switch (resourceType) {
+ case BROKER:
+ configSource = ConfigSource.STATIC_BROKER_CONFIG;
+ break;
+ case TOPIC:
+ configSource = ConfigSource.TOPIC_CONFIG;
+ break;
+ default:
+ configSource = ConfigSource.UNKNOWN_CONFIG;
+ break;
+ }
+ }
+ } else
+ throw new IllegalStateException("Config entry should contain either is_default or config_source");
boolean readOnly = configEntriesStruct.getBoolean(READ_ONLY_KEY_NAME);
- configEntries.add(new ConfigEntry(configName, configValue, isSensitive, isDefault, readOnly));
+ Collection<ConfigSynonym> synonyms;
+ if (configEntriesStruct.hasField(CONFIG_SYNONYMS_KEY_NAME)) {
+ Object[] synonymsArray = configEntriesStruct.getArray(CONFIG_SYNONYMS_KEY_NAME);
+ synonyms = new ArrayList<>(synonymsArray.length);
+ for (Object synonymObj: synonymsArray) {
+ Struct synonymStruct = (Struct) synonymObj;
+ String synonymConfigName = synonymStruct.getString(CONFIG_NAME_KEY_NAME);
+ String synonymConfigValue = synonymStruct.getString(CONFIG_VALUE_KEY_NAME);
+ ConfigSource source = ConfigSource.forId(synonymStruct.getByte(CONFIG_SOURCE_KEY_NAME));
+ synonyms.add(new ConfigSynonym(synonymConfigName, synonymConfigValue, source));
+ }
+ } else
+ synonyms = Collections.emptyList();
+ configEntries.add(new ConfigEntry(configName, configValue, configSource, isSensitive, readOnly, synonyms));
}
Config config = new Config(error, configEntries);
configs.put(resource, config);
@@ -205,9 +323,21 @@ public class DescribeConfigsResponse extends AbstractResponse {
configEntriesStruct.set(CONFIG_NAME_KEY_NAME, configEntry.name);
configEntriesStruct.set(CONFIG_VALUE_KEY_NAME, configEntry.value);
configEntriesStruct.set(IS_SENSITIVE_KEY_NAME, configEntry.isSensitive);
- configEntriesStruct.set(IS_DEFAULT_KEY_NAME, configEntry.isDefault);
+ configEntriesStruct.setIfExists(CONFIG_SOURCE_KEY_NAME, configEntry.source.id);
+ configEntriesStruct.setIfExists(IS_DEFAULT_KEY_NAME, configEntry.source == ConfigSource.DEFAULT_CONFIG);
configEntriesStruct.set(READ_ONLY_KEY_NAME, configEntry.readOnly);
configEntryStructs.add(configEntriesStruct);
+ if (configEntriesStruct.hasField(CONFIG_SYNONYMS_KEY_NAME)) {
+ List<Struct> configSynonymStructs = new ArrayList<>(configEntry.synonyms.size());
+ for (ConfigSynonym synonym : configEntry.synonyms) {
+ Struct configSynonymStruct = configEntriesStruct.instance(CONFIG_SYNONYMS_KEY_NAME);
+ configSynonymStruct.set(CONFIG_NAME_KEY_NAME, synonym.name);
+ configSynonymStruct.set(CONFIG_VALUE_KEY_NAME, synonym.value);
+ configSynonymStruct.set(CONFIG_SOURCE_KEY_NAME, synonym.source.id);
+ configSynonymStructs.add(configSynonymStruct);
+ }
+ configEntriesStruct.set(CONFIG_SYNONYMS_KEY_NAME, configSynonymStructs.toArray(new Struct[0]));
+ }
}
resourceStruct.set(CONFIG_ENTRIES_KEY_NAME, configEntryStructs.toArray(new Struct[0]));
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index 6b2fe74..285582c 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -16,40 +16,53 @@
*/
package org.apache.kafka.common.security.ssl;
-import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Reconfigurable;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.utils.Utils;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
+import java.security.Principal;
import java.security.SecureRandom;
+import java.security.cert.Certificate;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
+import java.util.HashSet;
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLParameters;
-import javax.net.ssl.TrustManagerFactory;
-
-public class SslFactory implements Configurable {
+public class SslFactory implements Reconfigurable {
private final Mode mode;
private final String clientAuthConfigOverride;
+ private final boolean keystoreVerifiableUsingTruststore;
private String protocol;
private String provider;
private String kmfAlgorithm;
private String tmfAlgorithm;
private SecurityStore keystore = null;
- private Password keyPassword;
private SecurityStore truststore;
private String[] cipherSuites;
private String[] enabledProtocols;
@@ -60,12 +73,13 @@ public class SslFactory implements Configurable {
private boolean wantClientAuth;
public SslFactory(Mode mode) {
- this(mode, null);
+ this(mode, null, false);
}
- public SslFactory(Mode mode, String clientAuthConfigOverride) {
+ public SslFactory(Mode mode, String clientAuthConfigOverride, boolean keystoreVerifiableUsingTruststore) {
this.mode = mode;
this.clientAuthConfigOverride = clientAuthConfigOverride;
+ this.keystoreVerifiableUsingTruststore = keystoreVerifiableUsingTruststore;
}
@Override
@@ -109,23 +123,68 @@ public class SslFactory implements Configurable {
this.kmfAlgorithm = (String) configs.get(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG);
this.tmfAlgorithm = (String) configs.get(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG);
- createKeystore((String) configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG),
+ this.keystore = createKeystore((String) configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG),
(String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG),
(Password) configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
(Password) configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG));
- createTruststore((String) configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG),
+ this.truststore = createTruststore((String) configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG),
(String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG),
(Password) configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
try {
- this.sslContext = createSSLContext();
+ this.sslContext = createSSLContext(keystore);
} catch (Exception e) {
throw new KafkaException(e);
}
}
+ @Override
+ public Set<String> reconfigurableConfigs() {
+ return SslConfigs.RECONFIGURABLE_CONFIGS;
+ }
+
+ @Override
+ public boolean validateReconfiguration(Map<String, ?> configs) {
+ try {
+ SecurityStore newKeystore = maybeCreateNewKeystore(configs);
+ if (newKeystore != null)
+ createSSLContext(newKeystore);
+ return true;
+ } catch (Exception e) {
+ throw new KafkaException("Validation of dynamic config update failed", e);
+ }
+ }
+
+ @Override
+ public void reconfigure(Map<String, ?> configs) throws KafkaException {
+ SecurityStore newKeystore = maybeCreateNewKeystore(configs);
+ if (newKeystore != null) {
+ try {
+ this.sslContext = createSSLContext(newKeystore);
+ this.keystore = newKeystore;
+ } catch (Exception e) {
+ throw new KafkaException("Reconfiguration of SSL keystore failed", e);
+ }
+ }
+ }
+
+ private SecurityStore maybeCreateNewKeystore(Map<String, ?> configs) {
+ boolean keystoreChanged = Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG), keystore.type) ||
+ Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG), keystore.path) ||
+ Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), keystore.password) ||
+ Objects.equals(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG), keystore.keyPassword);
+
+ if (keystoreChanged) {
+ return createKeystore((String) configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG),
+ (String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG),
+ (Password) configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
+ (Password) configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG));
+ } else
+ return null;
+ }
- private SSLContext createSSLContext() throws GeneralSecurityException, IOException {
+ // package access for testing
+ SSLContext createSSLContext(SecurityStore keystore) throws GeneralSecurityException, IOException {
SSLContext sslContext;
if (provider != null)
sslContext = SSLContext.getInstance(protocol, provider);
@@ -137,7 +196,7 @@ public class SslFactory implements Configurable {
String kmfAlgorithm = this.kmfAlgorithm != null ? this.kmfAlgorithm : KeyManagerFactory.getDefaultAlgorithm();
KeyManagerFactory kmf = KeyManagerFactory.getInstance(kmfAlgorithm);
KeyStore ks = keystore.load();
- Password keyPassword = this.keyPassword != null ? this.keyPassword : keystore.password;
+ Password keyPassword = keystore.keyPassword != null ? keystore.keyPassword : keystore.password;
kmf.init(ks, keyPassword.value().toCharArray());
keyManagers = kmf.getKeyManagers();
}
@@ -148,10 +207,23 @@ public class SslFactory implements Configurable {
tmf.init(ts);
sslContext.init(keyManagers, tmf.getTrustManagers(), this.secureRandomImplementation);
+ if (keystore != null && keystore != this.keystore) {
+ if (this.keystore == null)
+ throw new ConfigException("Cannot add SSL keystore to an existing listener for which no keystore was configured.");
+ if (keystoreVerifiableUsingTruststore)
+ SSLConfigValidatorEngine.validate(this, sslContext);
+ if (!CertificateEntries.create(this.keystore.load()).equals(CertificateEntries.create(keystore.load()))) {
+ throw new ConfigException("Keystore DistinguishedName or SubjectAltNames do not match");
+ }
+ }
return sslContext;
}
public SSLEngine createSslEngine(String peerHost, int peerPort) {
+ return createSslEngine(sslContext, peerHost, peerPort);
+ }
+
+ private SSLEngine createSslEngine(SSLContext sslContext, String peerHost, int peerPort) {
SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort);
if (cipherSuites != null) sslEngine.setEnabledCipherSuites(cipherSuites);
if (enabledProtocols != null) sslEngine.setEnabledProtocols(enabledProtocols);
@@ -181,38 +253,42 @@ public class SslFactory implements Configurable {
return sslContext;
}
- private void createKeystore(String type, String path, Password password, Password keyPassword) {
+ private SecurityStore createKeystore(String type, String path, Password password, Password keyPassword) {
if (path == null && password != null) {
throw new KafkaException("SSL key store is not specified, but key store password is specified.");
} else if (path != null && password == null) {
throw new KafkaException("SSL key store is specified, but key store password is not specified.");
} else if (path != null && password != null) {
- this.keystore = new SecurityStore(type, path, password);
- this.keyPassword = keyPassword;
- }
+ return new SecurityStore(type, path, password, keyPassword);
+ } else
+ return null; // path == null, clients may use this path with brokers that don't require client auth
}
- private void createTruststore(String type, String path, Password password) {
+ private SecurityStore createTruststore(String type, String path, Password password) {
if (path == null && password != null) {
throw new KafkaException("SSL trust store is not specified, but trust store password is specified.");
} else if (path != null) {
- this.truststore = new SecurityStore(type, path, password);
- }
+ return new SecurityStore(type, path, password, null);
+ } else
+ return null;
}
- private static class SecurityStore {
+ // package access for testing
+ static class SecurityStore {
private final String type;
private final String path;
private final Password password;
+ private final Password keyPassword;
- private SecurityStore(String type, String path, Password password) {
+ SecurityStore(String type, String path, Password password, Password keyPassword) {
Objects.requireNonNull(type, "type must not be null");
this.type = type;
this.path = path;
this.password = password;
+ this.keyPassword = keyPassword;
}
- private KeyStore load() throws GeneralSecurityException, IOException {
+ KeyStore load() throws GeneralSecurityException, IOException {
FileInputStream in = null;
try {
KeyStore ks = KeyStore.getInstance(type);
@@ -227,4 +303,156 @@ public class SslFactory implements Configurable {
}
}
+ /**
+ * Validator used to verify dynamic update of keystore used in inter-broker communication.
+ * The validator checks that a successful handshake can be performed using the keystore and
+ * truststore configured on this SslFactory.
+ */
+ static class SSLConfigValidatorEngine {
+ private static final ByteBuffer EMPTY_BUF = ByteBuffer.allocate(0);
+ private final SSLEngine sslEngine;
+ private SSLEngineResult handshakeResult;
+ private ByteBuffer appBuffer;
+ private ByteBuffer netBuffer;
+
+ static void validate(SslFactory sslFactory, SSLContext sslContext) throws SSLException {
+ SSLConfigValidatorEngine clientEngine = new SSLConfigValidatorEngine(sslFactory, sslContext, Mode.CLIENT);
+ SSLConfigValidatorEngine serverEngine = new SSLConfigValidatorEngine(sslFactory, sslContext, Mode.SERVER);
+ try {
+ clientEngine.beginHandshake();
+ serverEngine.beginHandshake();
+ while (!serverEngine.complete() || !clientEngine.complete()) {
+ clientEngine.handshake(serverEngine);
+ serverEngine.handshake(clientEngine);
+ }
+ } finally {
+ clientEngine.close();
+ serverEngine.close();
+ }
+ }
+
+ private SSLConfigValidatorEngine(SslFactory sslFactory, SSLContext sslContext, Mode mode) {
+ this.sslEngine = sslFactory.createSslEngine(sslContext, "localhost", 0); // these hints are not used for validation
+ sslEngine.setUseClientMode(mode == Mode.CLIENT);
+ appBuffer = ByteBuffer.allocate(sslEngine.getSession().getApplicationBufferSize());
+ netBuffer = ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize());
+ }
+
+ void beginHandshake() throws SSLException {
+ sslEngine.beginHandshake();
+ }
+
+ void handshake(SSLConfigValidatorEngine peerEngine) throws SSLException {
+ SSLEngineResult.HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus();
+ while (true) {
+ switch (handshakeStatus) {
+ case NEED_WRAP:
+ handshakeResult = sslEngine.wrap(EMPTY_BUF, netBuffer);
+ switch (handshakeResult.getStatus()) {
+ case OK: break;
+ case BUFFER_OVERFLOW:
+ netBuffer.compact();
+ netBuffer = Utils.ensureCapacity(netBuffer, sslEngine.getSession().getPacketBufferSize());
+ netBuffer.flip();
+ break;
+ case BUFFER_UNDERFLOW:
+ case CLOSED:
+ default:
+ throw new SSLException("Unexpected handshake status: " + handshakeResult.getStatus());
+ }
+ return;
+ case NEED_UNWRAP:
+ if (peerEngine.netBuffer.position() == 0) // no data to unwrap, return to process peer
+ return;
+ peerEngine.netBuffer.flip(); // unwrap the data from peer
+ handshakeResult = sslEngine.unwrap(peerEngine.netBuffer, appBuffer);
+ peerEngine.netBuffer.compact();
+ handshakeStatus = handshakeResult.getHandshakeStatus();
+ switch (handshakeResult.getStatus()) {
+ case OK: break;
+ case BUFFER_OVERFLOW:
+ appBuffer = Utils.ensureCapacity(appBuffer, sslEngine.getSession().getApplicationBufferSize());
+ break;
+ case BUFFER_UNDERFLOW:
+ netBuffer = Utils.ensureCapacity(netBuffer, sslEngine.getSession().getPacketBufferSize());
+ break;
+ case CLOSED:
+ default:
+ throw new SSLException("Unexpected handshake status: " + handshakeResult.getStatus());
+ }
+ break;
+ case NEED_TASK:
+ sslEngine.getDelegatedTask().run();
+ handshakeStatus = sslEngine.getHandshakeStatus();
+ break;
+ case FINISHED:
+ return;
+ case NOT_HANDSHAKING:
+ if (handshakeResult.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.FINISHED)
+ throw new SSLException("Did not finish handshake");
+ return;
+ default:
+ throw new IllegalStateException("Unexpected handshake status " + handshakeStatus);
+ }
+ }
+ }
+
+ boolean complete() {
+ return sslEngine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.FINISHED ||
+ sslEngine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING;
+ }
+
+ void close() {
+ sslEngine.closeOutbound();
+ try {
+ sslEngine.closeInbound();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }
+
+ static class CertificateEntries {
+ private final Principal subjectPrincipal;
+ private final Set<List<?>> subjectAltNames;
+
+ static List<CertificateEntries> create(KeyStore keystore) throws GeneralSecurityException, IOException {
+ Enumeration<String> aliases = keystore.aliases();
+ List<CertificateEntries> entries = new ArrayList<>();
+ while (aliases.hasMoreElements()) {
+ String alias = aliases.nextElement();
+ Certificate cert = keystore.getCertificate(alias);
+ if (cert instanceof X509Certificate)
+ entries.add(new CertificateEntries((X509Certificate) cert));
+ }
+ return entries;
+ }
+
+ CertificateEntries(X509Certificate cert) throws GeneralSecurityException {
+ this.subjectPrincipal = cert.getSubjectX500Principal();
+ Collection<List<?>> altNames = cert.getSubjectAlternativeNames();
+ // use a set for comparison
+ this.subjectAltNames = altNames != null ? new HashSet<>(altNames) : Collections.<List<?>>emptySet();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(subjectPrincipal, subjectAltNames);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof CertificateEntries))
+ return false;
+ CertificateEntries other = (CertificateEntries) obj;
+ return Objects.equals(subjectPrincipal, other.subjectPrincipal) &&
+ Objects.equals(subjectAltNames, other.subjectAltNames);
+ }
+
+ @Override
+ public String toString() {
+ return "subjectPrincipal=" + subjectPrincipal +
+ ", subjectAltNames=" + subjectAltNames;
+ }
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index ff8929a..0352ade 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -82,7 +82,7 @@ public class NioEchoServer extends Thread {
if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL)
ScramCredentialUtils.createCache(credentialCache, ScramMechanism.mechanismNames());
if (channelBuilder == null)
- channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialCache, tokenCache);
+ channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, false, securityProtocol, config, credentialCache, tokenCache);
this.metrics = new Metrics();
this.selector = new Selector(5000, metrics, new MockTime(), "MetricGroup", channelBuilder, new LogContext());
acceptorThread = new AcceptorThread();
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
index a64cb45..86d26d4 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
@@ -72,7 +72,7 @@ public class SaslChannelBuilderTest {
jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap<String, Object>());
JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig);
return new SaslChannelBuilder(Mode.CLIENT, jaasContext, securityProtocol, new ListenerName("PLAIN"),
- "PLAIN", true, null, null);
+ false, "PLAIN", true, null, null);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 96c7bc2..1d78e5a 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -64,7 +64,7 @@ public class SslSelectorTest extends SelectorTest {
this.server.start();
this.time = new MockTime();
sslClientConfigs = TestSslUtils.createSslConfig(false, false, Mode.CLIENT, trustStoreFile, "client");
- this.channelBuilder = new SslChannelBuilder(Mode.CLIENT);
+ this.channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false);
this.channelBuilder.configure(sslClientConfigs);
this.metrics = new Metrics();
this.selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext());
@@ -149,7 +149,7 @@ public class SslSelectorTest extends SelectorTest {
//the initial channel builder is for clients, we need a server one
File trustStoreFile = File.createTempFile("truststore", ".jks");
Map<String, Object> sslServerConfigs = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server");
- channelBuilder = new SslChannelBuilder(Mode.SERVER);
+ channelBuilder = new SslChannelBuilder(Mode.SERVER, null, false);
channelBuilder.configure(sslServerConfigs);
selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup",
new HashMap<String, String>(), true, false, channelBuilder, pool, new LogContext());
@@ -236,7 +236,7 @@ public class SslSelectorTest extends SelectorTest {
private static class TestSslChannelBuilder extends SslChannelBuilder {
public TestSslChannelBuilder(Mode mode) {
- super(mode);
+ super(mode, null, false);
}
@Override
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 1f55246..5cc66e5 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -80,7 +80,7 @@ public class SslTransportLayerTest {
clientCertStores = new CertStores(false, "client", "localhost");
sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
- this.channelBuilder = new SslChannelBuilder(Mode.CLIENT);
+ this.channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false);
this.channelBuilder.configure(sslClientConfigs);
this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder, new LogContext());
}
@@ -429,7 +429,7 @@ public class SslTransportLayerTest {
*/
@Test
public void testInvalidSecureRandomImplementation() throws Exception {
- SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT);
+ SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false);
try {
sslClientConfigs.put(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG, "invalid");
channelBuilder.configure(sslClientConfigs);
@@ -444,7 +444,7 @@ public class SslTransportLayerTest {
*/
@Test
public void testInvalidTruststorePassword() throws Exception {
- SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT);
+ SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false);
try {
sslClientConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "invalid");
channelBuilder.configure(sslClientConfigs);
@@ -459,7 +459,7 @@ public class SslTransportLayerTest {
*/
@Test
public void testInvalidKeystorePassword() throws Exception {
- SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT);
+ SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false);
try {
sslClientConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "invalid");
channelBuilder.configure(sslClientConfigs);
@@ -752,7 +752,7 @@ public class SslTransportLayerTest {
@Test
public void testCloseSsl() throws Exception {
- testClose(SecurityProtocol.SSL, new SslChannelBuilder(Mode.CLIENT));
+ testClose(SecurityProtocol.SSL, new SslChannelBuilder(Mode.CLIENT, null, false));
}
@Test
@@ -792,17 +792,81 @@ public class SslTransportLayerTest {
}, 5000, "All requests sent were not processed");
}
- private void createSelector(Map<String, Object> sslClientConfigs) {
- createSelector(sslClientConfigs, null, null, null);
+ /**
+ * Tests reconfiguration of server keystore. Verifies that existing connections continue
+ * to work with old keystore and new connections work with new keystore.
+ */
+ @Test
+ public void testServerKeystoreDynamicUpdate() throws Exception {
+ SecurityProtocol securityProtocol = SecurityProtocol.SSL;
+ TestSecurityConfig config = new TestSecurityConfig(sslServerConfigs);
+ ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
+ ChannelBuilder serverChannelBuilder = ChannelBuilders.serverChannelBuilder(listenerName,
+ false, securityProtocol, config, null, null);
+ server = new NioEchoServer(listenerName, securityProtocol, config,
+ "localhost", serverChannelBuilder, null);
+ server.start();
+ InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
+
+ // Verify that client with matching truststore can authenticate, send and receive
+ String oldNode = "0";
+ Selector oldClientSelector = createSelector(sslClientConfigs);
+ oldClientSelector.connect(oldNode, addr, BUFFER_SIZE, BUFFER_SIZE);
+ NetworkTestUtils.checkClientConnection(selector, oldNode, 100, 10);
+
+ CertStores newServerCertStores = new CertStores(true, "server", "localhost");
+ sslServerConfigs = newServerCertStores.getTrustingConfig(clientCertStores);
+ assertTrue("SslChannelBuilder not reconfigurable", serverChannelBuilder instanceof ListenerReconfigurable);
+ ListenerReconfigurable reconfigurableBuilder = (ListenerReconfigurable) serverChannelBuilder;
+ assertEquals(listenerName, reconfigurableBuilder.listenerName());
+ reconfigurableBuilder.validateReconfiguration(sslServerConfigs);
+ reconfigurableBuilder.reconfigure(sslServerConfigs);
+
+ // Verify that new client with old truststore fails
+ oldClientSelector.connect("1", addr, BUFFER_SIZE, BUFFER_SIZE);
+ NetworkTestUtils.waitForChannelClose(oldClientSelector, "1", ChannelState.State.AUTHENTICATION_FAILED);
+
+ // Verify that new client with new truststore can authenticate, send and receive
+ sslClientConfigs = clientCertStores.getTrustingConfig(newServerCertStores);
+ Selector newClientSelector = createSelector(sslClientConfigs);
+ newClientSelector.connect("2", addr, BUFFER_SIZE, BUFFER_SIZE);
+ NetworkTestUtils.checkClientConnection(newClientSelector, "2", 100, 10);
+
+ // Verify that old client continues to work
+ NetworkTestUtils.checkClientConnection(oldClientSelector, oldNode, 100, 10);
+
+ CertStores invalidCertStores = new CertStores(true, "server", "127.0.0.1");
+ Map<String, Object> invalidConfigs = invalidCertStores.getTrustingConfig(clientCertStores);
+ try {
+ reconfigurableBuilder.validateReconfiguration(invalidConfigs);
+ fail("Should have failed validation with an exception with different SubjectAltName");
+ } catch (KafkaException e) {
+ // expected exception
+ }
+ try {
+ reconfigurableBuilder.reconfigure(invalidConfigs);
+ fail("Should have failed to reconfigure with different SubjectAltName");
+ } catch (KafkaException e) {
+ // expected exception
+ }
+
+ // Verify that new connections continue to work with the server with previously configured keystore after failed reconfiguration
+ newClientSelector.connect("3", addr, BUFFER_SIZE, BUFFER_SIZE);
+ NetworkTestUtils.checkClientConnection(newClientSelector, "3", 100, 10);
+ }
+
+ private Selector createSelector(Map<String, Object> sslClientConfigs) {
+ return createSelector(sslClientConfigs, null, null, null);
}
- private void createSelector(Map<String, Object> sslClientConfigs, final Integer netReadBufSize,
+ private Selector createSelector(Map<String, Object> sslClientConfigs, final Integer netReadBufSize,
final Integer netWriteBufSize, final Integer appBufSize) {
TestSslChannelBuilder channelBuilder = new TestSslChannelBuilder(Mode.CLIENT);
channelBuilder.configureBufferSizes(netReadBufSize, netWriteBufSize, appBufSize);
this.channelBuilder = channelBuilder;
this.channelBuilder.configure(sslClientConfigs);
this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder, new LogContext());
+ return selector;
}
private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol) throws Exception {
@@ -823,7 +887,7 @@ public class SslTransportLayerTest {
int flushDelayCount = 0;
public TestSslChannelBuilder(Mode mode) {
- super(mode);
+ super(mode, null, false);
}
public void configureBufferSizes(Integer netReadBufSize, Integer netWriteBufSize, Integer appBufSize) {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 2771f2f..c18f5c2 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -242,10 +242,14 @@ public class RequestResponseTest {
checkRequest(createAlterConfigsRequest());
checkErrorResponse(createAlterConfigsRequest(), new UnknownServerException());
checkResponse(createAlterConfigsResponse(), 0);
- checkRequest(createDescribeConfigsRequest());
- checkRequest(createDescribeConfigsRequestWithConfigEntries());
- checkErrorResponse(createDescribeConfigsRequest(), new UnknownServerException());
+ checkRequest(createDescribeConfigsRequest(0));
+ checkRequest(createDescribeConfigsRequestWithConfigEntries(0));
+ checkErrorResponse(createDescribeConfigsRequest(0), new UnknownServerException());
checkResponse(createDescribeConfigsResponse(), 0);
+ checkRequest(createDescribeConfigsRequest(1));
+ checkRequest(createDescribeConfigsRequestWithConfigEntries(1));
+ checkErrorResponse(createDescribeConfigsRequest(1), new UnknownServerException());
+ checkResponse(createDescribeConfigsResponse(), 1);
checkRequest(createCreatePartitionsRequest());
checkRequest(createCreatePartitionsRequestWithAssignments());
checkErrorResponse(createCreatePartitionsRequest(), new InvalidTopicException());
@@ -1031,25 +1035,29 @@ public class RequestResponseTest {
return new DeleteAclsResponse(0, responses);
}
- private DescribeConfigsRequest createDescribeConfigsRequest() {
+ private DescribeConfigsRequest createDescribeConfigsRequest(int version) {
return new DescribeConfigsRequest.Builder(asList(
new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"),
- new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"))).build((short) 0);
+ new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic")))
+ .build((short) version);
}
- private DescribeConfigsRequest createDescribeConfigsRequestWithConfigEntries() {
+ private DescribeConfigsRequest createDescribeConfigsRequestWithConfigEntries(int version) {
Map<org.apache.kafka.common.requests.Resource, Collection<String>> resources = new HashMap<>();
resources.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), asList("foo", "bar"));
resources.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"), null);
resources.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic a"), Collections.<String>emptyList());
- return new DescribeConfigsRequest.Builder(resources).build((short) 0);
+ return new DescribeConfigsRequest.Builder(resources).build((short) version);
}
private DescribeConfigsResponse createDescribeConfigsResponse() {
Map<org.apache.kafka.common.requests.Resource, DescribeConfigsResponse.Config> configs = new HashMap<>();
+ List<DescribeConfigsResponse.ConfigSynonym> synonyms = Collections.emptyList();
List<DescribeConfigsResponse.ConfigEntry> configEntries = asList(
- new DescribeConfigsResponse.ConfigEntry("config_name", "config_value", false, true, false),
- new DescribeConfigsResponse.ConfigEntry("another_name", "another value", true, false, true)
+ new DescribeConfigsResponse.ConfigEntry("config_name", "config_value",
+ DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG, true, false, synonyms),
+ new DescribeConfigsResponse.ConfigEntry("another_name", "another value",
+ DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG, false, true, synonyms)
);
configs.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), new DescribeConfigsResponse.Config(
ApiError.NONE, configEntries));
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index bd7fee3..e3d6b7a 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -992,7 +992,7 @@ public class SaslAuthenticatorTest {
if (isScram)
ScramCredentialUtils.createCache(credentialCache, Arrays.asList(saslMechanism));
SaslChannelBuilder serverChannelBuilder = new SaslChannelBuilder(Mode.SERVER, jaasContext,
- securityProtocol, listenerName, saslMechanism, true, credentialCache, null) {
+ securityProtocol, listenerName, false, saslMechanism, true, credentialCache, null) {
@Override
protected SaslServerAuthenticator buildServerAuthenticator(Map<String, ?> configs, String id,
@@ -1034,7 +1034,7 @@ public class SaslAuthenticatorTest {
final Map<String, ?> configs = Collections.emptyMap();
final JaasContext jaasContext = JaasContext.load(JaasContext.Type.CLIENT, null, configs);
SaslChannelBuilder clientChannelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContext,
- securityProtocol, listenerName, saslMechanism, true, null, null) {
+ securityProtocol, listenerName, false, saslMechanism, true, null, null) {
@Override
protected SaslClientAuthenticator buildClientAuthenticator(Map<String, ?> configs, String id,
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
index 5546a55..3a89260 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
@@ -17,18 +17,23 @@
package org.apache.kafka.common.security.ssl;
import java.io.File;
+import java.security.KeyStore;
import java.util.Map;
+import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLHandshakeException;
import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.test.TestSslUtils;
import org.apache.kafka.common.network.Mode;
import org.junit.Test;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -76,4 +81,60 @@ public class SslFactoryTest {
assertTrue(engine.getUseClientMode());
}
+ @Test
+ public void testKeyStoreTrustStoreValidation() throws Exception {
+ File trustStoreFile = File.createTempFile("truststore", ".jks");
+ Map<String, Object> serverSslConfig = TestSslUtils.createSslConfig(false, true,
+ Mode.SERVER, trustStoreFile, "server");
+ SslFactory sslFactory = new SslFactory(Mode.SERVER);
+ sslFactory.configure(serverSslConfig);
+ SSLContext sslContext = sslFactory.createSSLContext(securityStore(serverSslConfig));
+ assertNotNull("SSL context not created", sslContext);
+ }
+
+ @Test
+ public void testUntrustedKeyStoreValidation() throws Exception {
+ File trustStoreFile = File.createTempFile("truststore", ".jks");
+ Map<String, Object> serverSslConfig = TestSslUtils.createSslConfig(false, true,
+ Mode.SERVER, trustStoreFile, "server");
+ Map<String, Object> untrustedConfig = TestSslUtils.createSslConfig(false, true,
+ Mode.SERVER, File.createTempFile("truststore", ".jks"), "server");
+ SslFactory sslFactory = new SslFactory(Mode.SERVER, null, true);
+ sslFactory.configure(serverSslConfig);
+ try {
+ sslFactory.createSSLContext(securityStore(untrustedConfig));
+ fail("Validation did not fail with untrusted keystore");
+ } catch (SSLHandshakeException e) {
+ // Expected exception
+ }
+ }
+
+ @Test
+ public void testCertificateEntriesValidation() throws Exception {
+ File trustStoreFile = File.createTempFile("truststore", ".jks");
+ Map<String, Object> serverSslConfig = TestSslUtils.createSslConfig(false, true,
+ Mode.SERVER, trustStoreFile, "server");
+ Map<String, Object> newCnConfig = TestSslUtils.createSslConfig(false, true,
+ Mode.SERVER, File.createTempFile("truststore", ".jks"), "server", "Another CN");
+ KeyStore ks1 = securityStore(serverSslConfig).load();
+ KeyStore ks2 = securityStore(serverSslConfig).load();
+ assertEquals(SslFactory.CertificateEntries.create(ks1), SslFactory.CertificateEntries.create(ks2));
+
+ // Use different alias name, validation should succeed
+ ks2.setCertificateEntry("another", ks1.getCertificate("localhost"));
+ assertEquals(SslFactory.CertificateEntries.create(ks1), SslFactory.CertificateEntries.create(ks2));
+
+ KeyStore ks3 = securityStore(newCnConfig).load();
+ assertNotEquals(SslFactory.CertificateEntries.create(ks1), SslFactory.CertificateEntries.create(ks3));
+ }
+
+ private SslFactory.SecurityStore securityStore(Map<String, Object> sslConfig) {
+ return new SslFactory.SecurityStore(
+ (String) sslConfig.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG),
+ (String) sslConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG),
+ (Password) sslConfig.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
+ (Password) sslConfig.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)
+ );
+ }
+
}
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 57b4112..26cc504 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, TopicConfig}
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.utils.Utils
-import scala.collection.mutable
+import scala.collection.{Map, mutable}
import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList, Validator}
object Defaults {
@@ -297,4 +297,33 @@ object LogConfig {
configDef.parse(props)
}
+ /**
+ * Map topic config to the broker config with highest priority. Some of these have additional synonyms
+ * that can be obtained using [[kafka.server.DynamicBrokerConfig#brokerConfigSynonyms]]
+ */
+ val TopicConfigSynonyms = Map(
+ SegmentBytesProp -> KafkaConfig.LogSegmentBytesProp,
+ SegmentMsProp -> KafkaConfig.LogRollTimeMillisProp,
+ SegmentJitterMsProp -> KafkaConfig.LogRollTimeJitterMillisProp,
+ SegmentIndexBytesProp -> KafkaConfig.LogIndexSizeMaxBytesProp,
+ FlushMessagesProp -> KafkaConfig.LogFlushIntervalMessagesProp,
+ FlushMsProp -> KafkaConfig.LogFlushIntervalMsProp,
+ RetentionBytesProp -> KafkaConfig.LogRetentionBytesProp,
+ RetentionMsProp -> KafkaConfig.LogRetentionTimeMillisProp,
+ MaxMessageBytesProp -> KafkaConfig.MessageMaxBytesProp,
+ IndexIntervalBytesProp -> KafkaConfig.LogIndexIntervalBytesProp,
+ DeleteRetentionMsProp -> KafkaConfig.LogCleanerDeleteRetentionMsProp,
+ MinCompactionLagMsProp -> KafkaConfig.LogCleanerMinCompactionLagMsProp,
+ FileDeleteDelayMsProp -> KafkaConfig.LogDeleteDelayMsProp,
+ MinCleanableDirtyRatioProp -> KafkaConfig.LogCleanerMinCleanRatioProp,
+ CleanupPolicyProp -> KafkaConfig.LogCleanupPolicyProp,
+ UncleanLeaderElectionEnableProp -> KafkaConfig.UncleanLeaderElectionEnableProp,
+ MinInSyncReplicasProp -> KafkaConfig.MinInSyncReplicasProp,
+ CompressionTypeProp -> KafkaConfig.CompressionTypeProp,
+ PreAllocateEnableProp -> KafkaConfig.LogPreAllocateProp,
+ MessageFormatVersionProp -> KafkaConfig.LogMessageFormatVersionProp,
+ MessageTimestampTypeProp -> KafkaConfig.LogMessageTimestampTypeProp,
+ MessageTimestampDifferenceMaxMsProp -> KafkaConfig.LogMessageTimestampDifferenceMaxMsProp
+ )
+
}
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index d5e49a5..b40bf84 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -31,6 +31,7 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.security.CredentialProvider
import kafka.server.KafkaConfig
import kafka.utils._
+import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.stats.Meter
@@ -437,20 +438,29 @@ private[kafka] class Processor(val id: Int,
)
private val selector = createSelector(
- ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache, credentialProvider.tokenCache))
+ ChannelBuilders.serverChannelBuilder(listenerName,
+ listenerName == config.interBrokerListenerName,
+ securityProtocol,
+ config,
+ credentialProvider.credentialCache,
+ credentialProvider.tokenCache))
// Visible to override for testing
- protected[network] def createSelector(channelBuilder: ChannelBuilder): KSelector = new KSelector(
- maxRequestSize,
- connectionsMaxIdleMs,
- metrics,
- time,
- "socket-server",
- metricTags,
- false,
- true,
- channelBuilder,
- memoryPool,
- logContext)
+ protected[network] def createSelector(channelBuilder: ChannelBuilder): KSelector = {
+ if (channelBuilder.isInstanceOf[Reconfigurable])
+ config.addReconfigurable(channelBuilder.asInstanceOf[Reconfigurable])
+ new KSelector(
+ maxRequestSize,
+ connectionsMaxIdleMs,
+ metrics,
+ time,
+ "socket-server",
+ metricTags,
+ false,
+ true,
+ channelBuilder,
+ memoryPool,
+ logContext)
+ }
// Connection ids have the format `localAddr:localPort-remoteAddr:remotePort-index`. The index is a
// non-negative incrementing value that ensures that even if remotePort is reused after a connection is
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 8f69000..596dde0 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.CreateTopicsRequest._
+import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, DescribeConfigsResponse, Resource, ResourceType}
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
@@ -280,24 +281,17 @@ class AdminManager(val config: KafkaConfig,
}
}
- def describeConfigs(resourceToConfigNames: Map[Resource, Option[Set[String]]]): Map[Resource, DescribeConfigsResponse.Config] = {
+ def describeConfigs(resourceToConfigNames: Map[Resource, Option[Set[String]]], includeSynonyms: Boolean): Map[Resource, DescribeConfigsResponse.Config] = {
resourceToConfigNames.map { case (resource, configNames) =>
- def createResponseConfig(config: AbstractConfig, isReadOnly: Boolean, isDefault: String => Boolean): DescribeConfigsResponse.Config = {
- val filteredConfigPairs = config.values.asScala.filter { case (configName, _) =>
+ def createResponseConfig(config: AbstractConfig, createConfigEntry: (String, Any) => DescribeConfigsResponse.ConfigEntry): DescribeConfigsResponse.Config = {
+ val allConfigs = config.originals.asScala.filter(_._2 != null) ++ config.values.asScala
+ val filteredConfigPairs = allConfigs.filter { case (configName, _) =>
/* Always returns true if configNames is None */
- configNames.map(_.contains(configName)).getOrElse(true)
+ configNames.forall(_.contains(configName))
}.toIndexedSeq
- val configEntries = filteredConfigPairs.map { case (name, value) =>
- val configEntryType = config.typeOf(name)
- val isSensitive = configEntryType == ConfigDef.Type.PASSWORD
- val valueAsString =
- if (isSensitive) null
- else ConfigDef.convertToString(value, configEntryType)
- new DescribeConfigsResponse.ConfigEntry(name, valueAsString, isSensitive, isDefault(name), isReadOnly)
- }
-
+ val configEntries = filteredConfigPairs.map { case (name, value) => createConfigEntry(name, value) }
new DescribeConfigsResponse.Config(ApiError.NONE, configEntries.asJava)
}
@@ -310,15 +304,12 @@ class AdminManager(val config: KafkaConfig,
// Consider optimizing this by caching the configs or retrieving them from the `Log` when possible
val topicProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), topicProps)
- createResponseConfig(logConfig, isReadOnly = false, name => !topicProps.containsKey(name))
+ createResponseConfig(logConfig, createTopicConfigEntry(logConfig, topicProps, includeSynonyms))
case ResourceType.BROKER =>
- val brokerId = try resource.name.toInt catch {
- case _: NumberFormatException =>
- throw new InvalidRequestException(s"Broker id must be an integer, but it is: ${resource.name}")
- }
+ val brokerId = resourceNameToBrokerId(resource.name)
if (brokerId == config.brokerId)
- createResponseConfig(config, isReadOnly = true, name => !config.originals.containsKey(name))
+ createResponseConfig(config, createBrokerConfigEntry(includeSynonyms))
else
throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId}, but received $brokerId")
@@ -340,6 +331,16 @@ class AdminManager(val config: KafkaConfig,
def alterConfigs(configs: Map[Resource, AlterConfigsRequest.Config], validateOnly: Boolean): Map[Resource, ApiError] = {
configs.map { case (resource, config) =>
+
+ def validateConfigPolicy(resourceType: ConfigResource.Type): Unit = {
+ alterConfigPolicy match {
+ case Some(policy) =>
+ val configEntriesMap = config.entries.asScala.map(entry => (entry.name, entry.value)).toMap
+ policy.validate(new AlterConfigPolicy.RequestMetadata(
+ new ConfigResource(resourceType, resource.name), configEntriesMap.asJava))
+ case None =>
+ }
+ }
try {
resource.`type` match {
case ResourceType.TOPIC =>
@@ -347,31 +348,40 @@ class AdminManager(val config: KafkaConfig,
val properties = new Properties
config.entries.asScala.foreach { configEntry =>
- properties.setProperty(configEntry.name(), configEntry.value())
+ properties.setProperty(configEntry.name, configEntry.value)
+ }
+
+ adminZkClient.validateTopicConfig(topic, properties)
+ validateConfigPolicy(ConfigResource.Type.TOPIC)
+ if (!validateOnly)
+ adminZkClient.changeTopicConfig(topic, properties)
+
+ resource -> ApiError.NONE
+
+ case ResourceType.BROKER =>
+ val brokerId = if (resource.name == null || resource.name.isEmpty)
+ None
+ else
+ Some(resourceNameToBrokerId(resource.name))
+ val configProps = new Properties
+ config.entries.asScala.foreach { configEntry =>
+ configProps.setProperty(configEntry.name, configEntry.value)
}
- alterConfigPolicy match {
- case Some(policy) =>
- adminZkClient.validateTopicConfig(topic, properties)
-
- val configEntriesMap = config.entries.asScala.map(entry => (entry.name, entry.value)).toMap
- policy.validate(new AlterConfigPolicy.RequestMetadata(
- new ConfigResource(ConfigResource.Type.TOPIC, resource.name), configEntriesMap.asJava))
-
- if (!validateOnly)
- adminZkClient.changeTopicConfig(topic, properties)
- case None =>
- if (validateOnly)
- adminZkClient.validateTopicConfig(topic, properties)
- else
- adminZkClient.changeTopicConfig(topic, properties)
+ val perBrokerConfig = brokerId.nonEmpty
+ this.config.dynamicConfig.validate(configProps, perBrokerConfig)
+ validateConfigPolicy(ConfigResource.Type.BROKER)
+ if (!validateOnly) {
+ adminZkClient.changeBrokerConfig(brokerId,
+ this.config.dynamicConfig.toPersistentProps(configProps, perBrokerConfig))
}
+
resource -> ApiError.NONE
case resourceType =>
- throw new InvalidRequestException(s"AlterConfigs is only supported for topics, but resource type is $resourceType")
+ throw new InvalidRequestException(s"AlterConfigs is only supported for topics and brokers, but resource type is $resourceType")
}
} catch {
- case e: ConfigException =>
+ case e @ (_: ConfigException | _: IllegalArgumentException) =>
val message = s"Invalid config value for resource $resource: ${e.getMessage}"
info(message)
resource -> ApiError.fromThrowable(new InvalidRequestException(message, e))
@@ -392,4 +402,73 @@ class AdminManager(val config: KafkaConfig,
CoreUtils.swallow(createTopicPolicy.foreach(_.close()), this)
CoreUtils.swallow(alterConfigPolicy.foreach(_.close()), this)
}
+
+ private def resourceNameToBrokerId(resourceName: String): Int = {
+ try resourceName.toInt catch {
+ case _: NumberFormatException =>
+ throw new InvalidRequestException(s"Broker id must be an integer, but it is: $resourceName")
+ }
+ }
+
+ private def brokerSynonyms(name: String): List[String] = {
+ DynamicBrokerConfig.brokerConfigSynonyms(name, matchListenerOverride = true)
+ }
+
+ private def configType(name: String, synonyms: List[String]): ConfigDef.Type = {
+ val configType = config.typeOf(name)
+ if (configType != null)
+ configType
+ else
+ synonyms.iterator.map(config.typeOf).find(_ != null).orNull
+ }
+
+ private def configSynonyms(name: String, synonyms: List[String], isSensitive: Boolean): List[DescribeConfigsResponse.ConfigSynonym] = {
+ val dynamicConfig = config.dynamicConfig
+ val allSynonyms = mutable.Buffer[DescribeConfigsResponse.ConfigSynonym]()
+
+ def maybeAddSynonym(map: Map[String, String], source: ConfigSource)(name: String): Unit = {
+ map.get(name).map { value =>
+ val configValue = if (isSensitive) null else value
+ allSynonyms += new DescribeConfigsResponse.ConfigSynonym(name, configValue, source)
+ }
+ }
+
+ synonyms.foreach(maybeAddSynonym(dynamicConfig.currentDynamicBrokerConfigs, ConfigSource.DYNAMIC_BROKER_CONFIG))
+ synonyms.foreach(maybeAddSynonym(dynamicConfig.currentDynamicDefaultConfigs, ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG))
+ synonyms.foreach(maybeAddSynonym(dynamicConfig.staticBrokerConfigs, ConfigSource.STATIC_BROKER_CONFIG))
+ synonyms.foreach(maybeAddSynonym(dynamicConfig.staticDefaultConfigs, ConfigSource.DEFAULT_CONFIG))
+ allSynonyms.dropWhile(s => s.name != name).toList // e.g. drop listener overrides when describing base config
+ }
+
+ private def createTopicConfigEntry(logConfig: LogConfig, topicProps: Properties, includeSynonyms: Boolean)
+ (name: String, value: Any): DescribeConfigsResponse.ConfigEntry = {
+ val configEntryType = logConfig.typeOf(name)
+ val isSensitive = configEntryType == ConfigDef.Type.PASSWORD
+ val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType)
+ val allSynonyms = {
+ val list = LogConfig.TopicConfigSynonyms.get(name)
+ .map(s => configSynonyms(s, brokerSynonyms(s), isSensitive))
+ .getOrElse(List.empty)
+ if (!topicProps.containsKey(name))
+ list
+ else
+ new DescribeConfigsResponse.ConfigSynonym(name, valueAsString, ConfigSource.TOPIC_CONFIG) +: list
+ }
+ val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG else allSynonyms.head.source
+ val synonyms = if (!includeSynonyms) List.empty else allSynonyms
+ new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, false, synonyms.asJava)
+ }
+
+ private def createBrokerConfigEntry(includeSynonyms: Boolean)
+ (name: String, value: Any): DescribeConfigsResponse.ConfigEntry = {
+ val allNames = brokerSynonyms(name)
+ val configEntryType = configType(name, allNames)
+ val isSensitive = configEntryType == ConfigDef.Type.PASSWORD
+ val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType)
+ val allSynonyms = configSynonyms(name, allNames, isSensitive)
+ val synonyms = if (!includeSynonyms) List.empty else allSynonyms
+ val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG else allSynonyms.head.source
+ val readOnly = !allNames.exists(DynamicBrokerConfig.AllDynamicConfigs.contains)
+ new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, readOnly, synonyms.asJava)
+ }
}
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 390222d..23322c1 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -177,7 +177,8 @@ class UserConfigHandler(private val quotaManagers: QuotaManagers, val credential
* The callback provides the brokerId and the full properties set read from ZK.
* This implementation reports the overrides to the respective ReplicationQuotaManager objects
*/
-class BrokerConfigHandler(private val brokerConfig: KafkaConfig, private val quotaManagers: QuotaManagers) extends ConfigHandler with Logging {
+class BrokerConfigHandler(private val brokerConfig: KafkaConfig,
+ private val quotaManagers: QuotaManagers) extends ConfigHandler with Logging {
def processConfigChanges(brokerId: String, properties: Properties) {
def getOrDefault(prop: String): Long = {
@@ -186,7 +187,10 @@ class BrokerConfigHandler(private val brokerConfig: KafkaConfig, private val quo
else
DefaultReplicationThrottledRate
}
- if (brokerConfig.brokerId == brokerId.trim.toInt) {
+ if (brokerId == ConfigEntityName.Default)
+ brokerConfig.dynamicConfig.updateDefaultConfig(properties)
+ else if (brokerConfig.brokerId == brokerId.trim.toInt) {
+ brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, properties)
quotaManagers.leader.updateQuota(upperBound(getOrDefault(LeaderReplicationThrottledRateProp)))
quotaManagers.follower.updateQuota(upperBound(getOrDefault(FollowerReplicationThrottledRateProp)))
quotaManagers.alterLogDirs.updateQuota(upperBound(getOrDefault(ReplicaAlterLogDirsIoMaxBytesPerSecondProp)))
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
new file mode 100755
index 0000000..f307b8d
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.nio.charset.StandardCharsets
+import java.util
+import java.util.Properties
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import kafka.server.DynamicBrokerConfig._
+import kafka.utils.{CoreUtils, Logging}
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.common.Reconfigurable
+import org.apache.kafka.common.config.{ConfigDef, ConfigException, SslConfigs}
+import org.apache.kafka.common.network.ListenerReconfigurable
+import org.apache.kafka.common.utils.Base64
+
+import scala.collection._
+import scala.collection.JavaConverters._
+
+/**
+ * Dynamic broker configurations are stored in ZooKeeper and may be defined at two levels:
+ * <ul>
+ * <li>Per-broker configs persisted at <tt>/configs/brokers/{brokerId}</tt>: These can be described/altered
+ * using AdminClient using the resource name brokerId.</li>
+ * <li>Cluster-wide defaults persisted at <tt>/configs/brokers/<default></tt>: These can be described/altered
+ * using AdminClient using an empty resource name.</li>
+ * </ul>
+ * The order of precedence for broker configs is:
+ * <ol>
+ * <li>DYNAMIC_BROKER_CONFIG: stored in ZK at /configs/brokers/{brokerId}</li>
+ * <li>DYNAMIC_DEFAULT_BROKER_CONFIG: stored in ZK at /configs/brokers/<default></li>
+ * <li>STATIC_BROKER_CONFIG: properties that broker is started up with, typically from server.properties file</li>
+ * <li>DEFAULT_CONFIG: Default configs defined in KafkaConfig</li>
+ * </ol>
+ * Log configs use topic config overrides if defined and fallback to broker defaults using the order of precedence above.
+ * Topic config overrides may use a different config name from the default broker config.
+ * See [[kafka.log.LogConfig#TopicConfigSynonyms]] for the mapping.
+ * <p>
+ * AdminClient returns all config synonyms in the order of precedence when configs are described with
+ * <code>includeSynonyms</code>. In addition to configs that may be defined with the same name at different levels,
+ * some configs have additional synonyms.
+ * </p>
+ * <ul>
+ * <li>Listener configs may be defined using the prefix <tt>listener.name.{listenerName}.{configName}</tt>. These may be
+ * configured as dynamic or static broker configs. Listener configs have higher precedence than the base configs
+ * that don't specify the listener name. Listeners without a listener config use the base config. Base configs
+ * may be defined only as STATIC_BROKER_CONFIG or DEFAULT_CONFIG and cannot be updated dynamically.<li>
+ * <li>Some configs may be defined using multiple properties. For example, <tt>log.roll.ms</tt> and
+ * <tt>log.roll.hours</tt> refer to the same config that may be defined in milliseconds or hours. The order of
+ * precedence of these synonyms is described in the docs of these configs in [[kafka.server.KafkaConfig]].</li>
+ * </ul>
+ *
+ */
+object DynamicBrokerConfig {
+
+ private val DynamicPasswordConfigs = Set(
+ SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
+ SslConfigs.SSL_KEY_PASSWORD_CONFIG
+ )
+ private val DynamicSecurityConfigs = SslConfigs.RECONFIGURABLE_CONFIGS.asScala
+
+ val AllDynamicConfigs = mutable.Set[String]()
+ AllDynamicConfigs ++= DynamicSecurityConfigs
+
+ private val PerBrokerConfigs = DynamicSecurityConfigs
+
+ val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r
+
+
+ def brokerConfigSynonyms(name: String, matchListenerOverride: Boolean): List[String] = {
+ name match {
+ case KafkaConfig.LogRollTimeMillisProp | KafkaConfig.LogRollTimeHoursProp =>
+ List(KafkaConfig.LogRollTimeMillisProp, KafkaConfig.LogRollTimeHoursProp)
+ case KafkaConfig.LogRollTimeJitterMillisProp | KafkaConfig.LogRollTimeJitterHoursProp =>
+ List(KafkaConfig.LogRollTimeJitterMillisProp, KafkaConfig.LogRollTimeJitterHoursProp)
+ case KafkaConfig.LogFlushIntervalMsProp => // LogFlushSchedulerIntervalMsProp is used as default
+ List(KafkaConfig.LogFlushIntervalMsProp, KafkaConfig.LogFlushSchedulerIntervalMsProp)
+ case KafkaConfig.LogRetentionTimeMillisProp | KafkaConfig.LogRetentionTimeMinutesProp | KafkaConfig.LogRetentionTimeHoursProp =>
+ List(KafkaConfig.LogRetentionTimeMillisProp, KafkaConfig.LogRetentionTimeMinutesProp, KafkaConfig.LogRetentionTimeHoursProp)
+ case ListenerConfigRegex(baseName) if matchListenerOverride => List(name, baseName)
+ case _ => List(name)
+ }
+ }
+
+ private[server] def addDynamicConfigs(configDef: ConfigDef): Unit = {
+ KafkaConfig.configKeys.filterKeys(AllDynamicConfigs.contains).values.foreach { config =>
+ configDef.define(config.name, config.`type`, config.defaultValue, config.validator,
+ config.importance, config.documentation, config.group, config.orderInGroup, config.width,
+ config.displayName, config.dependents, config.recommender)
+ }
+ }
+}
+
+class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging {
+
+ private[server] val staticBrokerConfigs = ConfigDef.convertToStringMapWithPasswordValues(kafkaConfig.originalsFromThisConfig).asScala
+ private[server] val staticDefaultConfigs = ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala
+ private val dynamicBrokerConfigs = mutable.Map[String, String]()
+ private val dynamicDefaultConfigs = mutable.Map[String, String]()
+ private val brokerId = kafkaConfig.brokerId
+ private val reconfigurables = mutable.Buffer[Reconfigurable]()
+ private val lock = new ReentrantReadWriteLock
+ private var currentConfig = kafkaConfig
+
+ private[server] def initialize(zkClient: KafkaZkClient): Unit = {
+ val adminZkClient = new AdminZkClient(zkClient)
+ updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default))
+ updateBrokerConfig(brokerId, adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString))
+ }
+
+ def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
+ require(reconfigurable.reconfigurableConfigs.asScala.forall(AllDynamicConfigs.contains))
+ reconfigurables += reconfigurable
+ }
+
+ def removeReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
+ reconfigurables -= reconfigurable
+ }
+
+ // Visibility for testing
+ private[server] def currentKafkaConfig: KafkaConfig = CoreUtils.inReadLock(lock) {
+ currentConfig
+ }
+
+ private[server] def currentDynamicBrokerConfigs: Map[String, String] = CoreUtils.inReadLock(lock) {
+ dynamicBrokerConfigs.clone()
+ }
+
+ private[server] def currentDynamicDefaultConfigs: Map[String, String] = CoreUtils.inReadLock(lock) {
+ dynamicDefaultConfigs.clone()
+ }
+
+ private[server] def updateBrokerConfig(brokerId: Int, persistentProps: Properties): Unit = CoreUtils.inWriteLock(lock) {
+ try {
+ val props = fromPersistentProps(persistentProps, perBrokerConfig = true)
+ dynamicBrokerConfigs.clear()
+ dynamicBrokerConfigs ++= props.asScala
+ updateCurrentConfig()
+ } catch {
+ case e: Exception => error(s"Per-broker configs of $brokerId could not be applied: $persistentProps", e)
+ }
+ }
+
+ private[server] def updateDefaultConfig(persistentProps: Properties): Unit = CoreUtils.inWriteLock(lock) {
+ try {
+ val props = fromPersistentProps(persistentProps, perBrokerConfig = false)
+ dynamicDefaultConfigs.clear()
+ dynamicDefaultConfigs ++= props.asScala
+ updateCurrentConfig()
+ } catch {
+ case e: Exception => error(s"Cluster default configs could not be applied: $persistentProps", e)
+ }
+ }
+
+ private[server] def toPersistentProps(configProps: Properties, perBrokerConfig: Boolean): Properties = {
+ val props = configProps.clone().asInstanceOf[Properties]
+ // TODO (KAFKA-6246): encrypt passwords
+ def encodePassword(configName: String): Unit = {
+ val value = props.getProperty(configName)
+ if (value != null) {
+ if (!perBrokerConfig)
+ throw new ConfigException("Password config can be defined only at broker level")
+ props.setProperty(configName, Base64.encoder.encodeToString(value.getBytes(StandardCharsets.UTF_8)))
+ }
+ }
+ DynamicPasswordConfigs.foreach(encodePassword)
+ props
+ }
+
+ private[server] def fromPersistentProps(persistentProps: Properties, perBrokerConfig: Boolean): Properties = {
+ val props = persistentProps.clone().asInstanceOf[Properties]
+
+ // Remove all invalid configs from `props`
+ removeInvalidConfigs(props, perBrokerConfig)
+ def removeInvalidProps(invalidPropNames: Set[String], errorMessage: String): Unit = {
+ if (invalidPropNames.nonEmpty) {
+ invalidPropNames.foreach(props.remove)
+ error(s"$errorMessage: $invalidPropNames")
+ }
+ }
+ removeInvalidProps(nonDynamicConfigs(props), "Non-dynamic configs configured in ZooKeeper will be ignored")
+ removeInvalidProps(securityConfigsWithoutListenerPrefix(props),
+ "Security configs can be dynamically updated only using listener prefix, base configs will be ignored")
+ if (!perBrokerConfig)
+ removeInvalidProps(perBrokerConfigs(props), "Per-broker configs defined at default cluster level will be ignored")
+
+ // TODO (KAFKA-6246): encrypt passwords
+ def decodePassword(configName: String): Unit = {
+ val value = props.getProperty(configName)
+ if (value != null) {
+ props.setProperty(configName, new String(Base64.decoder.decode(value), StandardCharsets.UTF_8))
+ }
+ }
+ DynamicPasswordConfigs.foreach(decodePassword)
+ props
+ }
+
+ private[server] def validate(props: Properties, perBrokerConfig: Boolean): Unit = CoreUtils.inReadLock(lock) {
+ def checkInvalidProps(invalidPropNames: Set[String], errorMessage: String): Unit = {
+ if (invalidPropNames.nonEmpty)
+ throw new ConfigException(s"$errorMessage: $invalidPropNames")
+ }
+ checkInvalidProps(nonDynamicConfigs(props), "Cannot update these configs dynamically")
+ checkInvalidProps(securityConfigsWithoutListenerPrefix(props),
+ "These security configs can be dynamically updated only per-listener using the listener prefix")
+ validateConfigTypes(props)
+ val newProps = mutable.Map[String, String]()
+ newProps ++= staticBrokerConfigs
+ if (perBrokerConfig) {
+ overrideProps(newProps, dynamicDefaultConfigs)
+ overrideProps(newProps, props.asScala)
+ } else {
+ checkInvalidProps(perBrokerConfigs(props),
+ "Cannot update these configs at default cluster level, broker id must be specified")
+ overrideProps(newProps, props.asScala)
+ overrideProps(newProps, dynamicBrokerConfigs)
+ }
+ processReconfiguration(newProps, validateOnly = true)
+ }
+
+ private def perBrokerConfigs(props: Properties): Set[String] = {
+ val configNames = props.asScala.keySet
+ configNames.intersect(PerBrokerConfigs) ++ configNames.filter(ListenerConfigRegex.findFirstIn(_).nonEmpty)
+ }
+
+ private def nonDynamicConfigs(props: Properties): Set[String] = {
+ props.asScala.keySet.intersect(DynamicConfig.Broker.nonDynamicProps)
+ }
+
+ private def securityConfigsWithoutListenerPrefix(props: Properties): Set[String] = {
+ DynamicSecurityConfigs.filter(props.containsKey)
+ }
+
+ private def validateConfigTypes(props: Properties): Unit = {
+ val baseProps = new Properties
+ props.asScala.foreach {
+ case (ListenerConfigRegex(baseName), v) => baseProps.put(baseName, v)
+ case (k, v) => baseProps.put(k, v)
+ }
+ DynamicConfig.Broker.validate(baseProps)
+ }
+
+ private def removeInvalidConfigs(props: Properties, perBrokerConfig: Boolean): Unit = {
+ try {
+ validateConfigTypes(props)
+ props.asScala
+ } catch {
+ case e: Exception =>
+ val invalidProps = props.asScala.filter { case (k, v) =>
+ val props1 = new Properties
+ props1.put(k, v)
+ try {
+ validateConfigTypes(props1)
+ false
+ } catch {
+ case _: Exception => true
+ }
+ }
+ invalidProps.foreach(props.remove)
+ val configSource = if (perBrokerConfig) "broker" else "default cluster"
+ error(s"Dynamic $configSource config contains invalid values: $invalidProps, these configs will be ignored", e)
+ }
+ }
+
+ private def updatedConfigs(newProps: java.util.Map[String, _], currentProps: java.util.Map[_, _]): mutable.Map[String, _] = {
+ newProps.asScala.filter {
+ case (k, v) => v != currentProps.get(k)
+ }
+ }
+
+ /**
+ * Updates values in `props` with the new values from `propsOverride`. Synonyms of updated configs
+ * are removed from `props` to ensure that the config with the higher precedence is applied. For example,
+ * if `log.roll.ms` was defined in server.properties and `log.roll.hours` is configured dynamically,
+ * `log.roll.hours` from the dynamic configuration will be used and `log.roll.ms` will be removed from
+ * `props` (even though `log.roll.hours` is secondary to `log.roll.ms`).
+ */
+ private def overrideProps(props: mutable.Map[String, String], propsOverride: mutable.Map[String, String]): Unit = {
+ propsOverride.foreach { case (k, v) =>
+ // Remove synonyms of `k` to ensure the right precedence is applied. But disable `matchListenerOverride`
+ // so that base configs corresponding to listener configs are not removed. Base configs should not be removed
+ // since they may be used by other listeners. It is ok to retain them in `props` since base configs cannot be
+ // dynamically updated and listener-specific configs have the higher precedence.
+ brokerConfigSynonyms(k, matchListenerOverride = false).foreach(props.remove)
+ props.put(k, v)
+ }
+ }
+
+ private def updateCurrentConfig(): Unit = {
+ val newProps = mutable.Map[String, String]()
+ newProps ++= staticBrokerConfigs
+ overrideProps(newProps, dynamicDefaultConfigs)
+ overrideProps(newProps, dynamicBrokerConfigs)
+ val newConfig = processReconfiguration(newProps, validateOnly = false)
+ if (newConfig ne currentConfig) {
+ currentConfig = newConfig
+ kafkaConfig.updateCurrentConfig(currentConfig)
+ }
+ }
+
+ private def processReconfiguration(newProps: Map[String, String], validateOnly: Boolean): KafkaConfig = {
+ val newConfig = new KafkaConfig(newProps.asJava, !validateOnly, None)
+ val updatedMap = updatedConfigs(newConfig.originalsFromThisConfig, currentConfig.originals)
+ if (updatedMap.nonEmpty) {
+ try {
+ val customConfigs = new util.HashMap[String, Object](newConfig.originalsFromThisConfig) // non-Kafka configs
+ newConfig.valuesFromThisConfig.keySet.asScala.foreach(customConfigs.remove)
+ reconfigurables.foreach {
+ case listenerReconfigurable: ListenerReconfigurable =>
+ val listenerName = listenerReconfigurable.listenerName
+ val oldValues = currentConfig.valuesWithPrefixOverride(listenerName.configPrefix)
+ val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
+ val updatedKeys = updatedConfigs(newValues, oldValues).keySet
+ processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly)
+ case reconfigurable =>
+ processReconfigurable(reconfigurable, updatedMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly)
+ }
+ newConfig
+ } catch {
+ case e: Exception =>
+ if (!validateOnly)
+ error(s"Failed to update broker configuration with configs : ${newConfig.originalsFromThisConfig}", e)
+ throw new ConfigException("Invalid dynamic configuration", e)
+ }
+ }
+ else
+ currentConfig
+ }
+
+ private def processReconfigurable(reconfigurable: Reconfigurable, updatedKeys: Set[String],
+ allNewConfigs: util.Map[String, _], newCustomConfigs: util.Map[String, Object],
+ validateOnly: Boolean): Unit = {
+ if (reconfigurable.reconfigurableConfigs.asScala.intersect(updatedKeys).nonEmpty) {
+ val newConfigs = new util.HashMap[String, Object]
+ allNewConfigs.asScala.foreach { case (k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]) }
+ newConfigs.putAll(newCustomConfigs)
+ if (validateOnly) {
+ if (!reconfigurable.validateReconfiguration(newConfigs))
+ throw new ConfigException("Validation of dynamic config update failed")
+ } else
+ reconfigurable.reconfigure(newConfigs)
+ }
+ }
+}
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala
index ddfdff8..1a401d2 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -18,12 +18,14 @@
package kafka.server
import java.util.Properties
+
import kafka.log.LogConfig
import kafka.security.CredentialProvider
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigDef.Importance._
import org.apache.kafka.common.config.ConfigDef.Range._
import org.apache.kafka.common.config.ConfigDef.Type._
+
import scala.collection.JavaConverters._
/**
@@ -57,10 +59,12 @@ object DynamicConfig {
.define(LeaderReplicationThrottledRateProp, LONG, DefaultReplicationThrottledRate, atLeast(0), MEDIUM, LeaderReplicationThrottledRateDoc)
.define(FollowerReplicationThrottledRateProp, LONG, DefaultReplicationThrottledRate, atLeast(0), MEDIUM, FollowerReplicationThrottledRateDoc)
.define(ReplicaAlterLogDirsIoMaxBytesPerSecondProp, LONG, DefaultReplicationThrottledRate, atLeast(0), MEDIUM, ReplicaAlterLogDirsIoMaxBytesPerSecondDoc)
+ DynamicBrokerConfig.addDynamicConfigs(brokerConfigDef)
+ val nonDynamicProps = KafkaConfig.configNames.toSet -- brokerConfigDef.names.asScala
def names = brokerConfigDef.names
- def validate(props: Properties) = DynamicConfig.validate(brokerConfigDef, props)
+ def validate(props: Properties) = DynamicConfig.validate(brokerConfigDef, props, customPropsAllowed = true)
}
object Client {
@@ -87,7 +91,7 @@ object DynamicConfig {
def names = clientConfigs.names
- def validate(props: Properties) = DynamicConfig.validate(clientConfigs, props)
+ def validate(props: Properties) = DynamicConfig.validate(clientConfigs, props, customPropsAllowed = false)
}
object User {
@@ -100,14 +104,16 @@ object DynamicConfig {
def names = userConfigs.names
- def validate(props: Properties) = DynamicConfig.validate(userConfigs, props)
+ def validate(props: Properties) = DynamicConfig.validate(userConfigs, props, customPropsAllowed = false)
}
- private def validate(configDef: ConfigDef, props: Properties) = {
+ private def validate(configDef: ConfigDef, props: Properties, customPropsAllowed: Boolean) = {
//Validate Names
val names = configDef.names()
- props.keys.asScala.foreach { name =>
- require(names.contains(name), s"Unknown Dynamic Configuration '$name'.")
+ val propKeys = props.keySet.asScala.map(_.asInstanceOf[String])
+ if (!customPropsAllowed) {
+ val unknownKeys = propKeys.filter(!names.contains(_))
+ require(unknownKeys.isEmpty, s"Unknown Dynamic Configuration: $unknownKeys.")
}
//ValidateValues
configDef.parse(props)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index b54a637..1ff75c0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1959,7 +1959,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val authorizedConfigs = adminManager.describeConfigs(authorizedResources.map { resource =>
resource -> Option(describeConfigsRequest.configNames(resource)).map(_.asScala.toSet)
- }.toMap)
+ }.toMap, describeConfigsRequest.includeSynonyms)
val unauthorizedConfigs = unauthorizedResources.map { resource =>
val error = configsAuthorizationApiError(request.session, resource)
resource -> new DescribeConfigsResponse.Config(error, Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 6e8249f..47f13f6 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -17,6 +17,7 @@
package kafka.server
+import java.util
import java.util.Properties
import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1}
@@ -28,7 +29,8 @@ import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, Message
import kafka.utils.CoreUtils
import kafka.utils.Implicits._
import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.common.config.ConfigDef.ValidList
+import org.apache.kafka.common.Reconfigurable
+import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList}
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs, TopicConfig}
import org.apache.kafka.common.metrics.Sensor
@@ -912,6 +914,8 @@ object KafkaConfig {
}
def configNames() = configDef.names().asScala.toList.sorted
+ private[server] def defaultValues: Map[String, _] = configDef.defaultValues.asScala
+ private[server] def configKeys: Map[String, ConfigKey] = configDef.configKeys.asScala
def fromProps(props: Properties): KafkaConfig =
fromProps(props, true)
@@ -933,9 +937,37 @@ object KafkaConfig {
}
-class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends AbstractConfig(KafkaConfig.configDef, props, doLog) {
+class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigOverride: Option[DynamicBrokerConfig])
+ extends AbstractConfig(KafkaConfig.configDef, props, doLog) {
- def this(props: java.util.Map[_, _]) = this(props, true)
+ def this(props: java.util.Map[_, _]) = this(props, true, None)
+ def this(props: java.util.Map[_, _], doLog: Boolean) = this(props, doLog, None)
+ private[server] val dynamicConfig = dynamicConfigOverride.getOrElse(new DynamicBrokerConfig(this))
+ // Cache the current config to avoid acquiring read lock to access from dynamicConfig
+ @volatile private var currentConfig = this
+
+ private[server] def updateCurrentConfig(newConfig: KafkaConfig): Unit = {
+ this.currentConfig = newConfig
+ }
+
+ override def originals: util.Map[String, AnyRef] =
+ if (this eq currentConfig) super.originals else currentConfig.originals
+ override def values: util.Map[String, _] =
+ if (this eq currentConfig) super.values else currentConfig.values
+ override def originalsStrings: util.Map[String, String] =
+ if (this eq currentConfig) super.originalsStrings else currentConfig.originalsStrings
+ override def originalsWithPrefix(prefix: String): util.Map[String, AnyRef] =
+ if (this eq currentConfig) super.originalsWithPrefix(prefix) else currentConfig.originalsWithPrefix(prefix)
+ override def valuesWithPrefixOverride(prefix: String): util.Map[String, AnyRef] =
+ if (this eq currentConfig) super.valuesWithPrefixOverride(prefix) else currentConfig.valuesWithPrefixOverride(prefix)
+ override def get(key: String): AnyRef =
+ if (this eq currentConfig) super.get(key) else currentConfig.get(key)
+
+ // During dynamic update, we use the values from this config, these are only used in DynamicBrokerConfig
+ private[server] def originalsFromThisConfig: util.Map[String, AnyRef] = super.originals
+ private[server] def valuesFromThisConfig: util.Map[String, _] = super.values
+ private[server] def valuesFromThisConfigWithPrefixOverride(prefix: String): util.Map[String, AnyRef] =
+ super.valuesWithPrefixOverride(prefix)
/** ********* Zookeeper Configuration ***********/
val zkConnect: String = getString(KafkaConfig.ZkConnectProp)
@@ -1094,10 +1126,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
val sslProtocol = getString(KafkaConfig.SslProtocolProp)
val sslProvider = getString(KafkaConfig.SslProviderProp)
val sslEnabledProtocols = getList(KafkaConfig.SslEnabledProtocolsProp)
- val sslKeystoreType = getString(KafkaConfig.SslKeystoreTypeProp)
- val sslKeystoreLocation = getString(KafkaConfig.SslKeystoreLocationProp)
- val sslKeystorePassword = getPassword(KafkaConfig.SslKeystorePasswordProp)
- val sslKeyPassword = getPassword(KafkaConfig.SslKeyPasswordProp)
+ def sslKeystoreType = getString(KafkaConfig.SslKeystoreTypeProp)
+ def sslKeystoreLocation = getString(KafkaConfig.SslKeystoreLocationProp)
+ def sslKeystorePassword = getPassword(KafkaConfig.SslKeystorePasswordProp)
+ def sslKeyPassword = getPassword(KafkaConfig.SslKeyPasswordProp)
val sslTruststoreType = getString(KafkaConfig.SslTruststoreTypeProp)
val sslTruststoreLocation = getString(KafkaConfig.SslTruststoreLocationProp)
val sslTruststorePassword = getPassword(KafkaConfig.SslTruststorePasswordProp)
@@ -1143,6 +1175,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
val advertisedListeners: Seq[EndPoint] = getAdvertisedListeners
private[kafka] lazy val listenerSecurityProtocolMap = getListenerSecurityProtocolMap
+ def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
+ dynamicConfig.addReconfigurable(reconfigurable)
+ }
+
private def getLogRetentionTimeMillis: Long = {
val millisInMinute = 60L * 1000L
val millisInHour = 60L * millisInMinute
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 355b741..80b0eb7 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -134,7 +134,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
var kafkaController: KafkaController = null
- val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
+ var kafkaScheduler: KafkaScheduler = null
var metadataCache: MetadataCache = null
var quotaManagers: QuotaFactory.QuotaManagers = null
@@ -196,12 +196,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
if (canStartup) {
brokerState.newState(Starting)
- /* start scheduler */
- kafkaScheduler.startup()
-
/* setup zookeeper */
initZkClient(time)
+ // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
+ // applied after DynamicConfigManager starts.
+ config.dynamicConfig.initialize(zkClient)
+
+ /* start scheduler */
+ kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
+ kafkaScheduler.startup()
+
/* Get or create cluster_id */
_clusterId = getOrGenerateClusterId(zkClient)
info(s"Cluster ID = $clusterId")
diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
index eb290d2..bbe9aba 100644
--- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
@@ -25,12 +25,12 @@ import kafka.utils.{Exit, Logging, VerifiableProperties}
object KafkaServerStartable {
def fromProps(serverProps: Properties) = {
val reporters = KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))
- new KafkaServerStartable(KafkaConfig.fromProps(serverProps), reporters)
+ new KafkaServerStartable(KafkaConfig.fromProps(serverProps, false), reporters)
}
}
-class KafkaServerStartable(val serverConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter]) extends Logging {
- private val server = new KafkaServer(serverConfig, kafkaMetricsReporters = reporters)
+class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter]) extends Logging {
+ private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters)
def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 098670c..e04bce0 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -66,6 +66,7 @@ object ZkUtils {
val BrokerSequenceIdPath = s"$BrokersPath/seqid"
val ConfigChangesPath = s"$ConfigPath/changes"
val ConfigUsersPath = s"$ConfigPath/users"
+ val ConfigBrokersPath = s"$ConfigPath/brokers"
val ProducerIdBlockPath = "/latest_producer_id_block"
val SecureZkRootPaths = ZkData.SecureRootPaths
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index fe64414..6d7df3f 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -357,11 +357,34 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
* @param configs: The config to change, as properties
*/
def changeBrokerConfig(brokers: Seq[Int], configs: Properties): Unit = {
- DynamicConfig.Broker.validate(configs)
- brokers.foreach { broker => changeEntityConfig(ConfigType.Broker, broker.toString, configs)
+ validateBrokerConfig(configs)
+ brokers.foreach {
+ broker => changeEntityConfig(ConfigType.Broker, broker.toString, configs)
}
}
+ /**
+ * Override a broker override or broker default config. These overrides will be persisted between sessions, and will
+ * override any defaults entered in the broker's config files
+ *
+ * @param broker: The broker to apply config changes to or None to update dynamic default configs
+ * @param configs: The config to change, as properties
+ */
+ def changeBrokerConfig(broker: Option[Int], configs: Properties): Unit = {
+ validateBrokerConfig(configs)
+ val entityName = broker.map(_.toString).getOrElse(ConfigEntityName.Default)
+ changeEntityConfig(ConfigType.Broker, broker.map(String.valueOf).getOrElse(ConfigEntityName.Default), configs)
+ }
+
+ /**
+ * Validate dynamic broker configs. Since broker configs may contain custom configs, the validation
+ * only verifies that the provided config does not contain any static configs.
+ * @param configs configs to validate
+ */
+ def validateBrokerConfig(configs: Properties): Unit = {
+ DynamicConfig.Broker.validate(configs)
+ }
+
private def changeEntityConfig(rootEntityType: String, fullSanitizedEntityName: String, configs: Properties) {
val sanitizedEntityPath = rootEntityType + '/' + fullSanitizedEntityName
zkClient.setOrCreateEntityConfigs(rootEntityType, fullSanitizedEntityName, configs)
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 99fe591..b6352fa 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -546,7 +546,11 @@ object ZkData {
LogDirEventNotificationZNode.path
) ++ ConfigType.all.map(ConfigEntityTypeZNode.path)
- val SensitiveRootPaths = Seq(ConfigEntityTypeZNode.path(ConfigType.User), DelegationTokensZNode.path)
+ val SensitiveRootPaths = Seq(
+ ConfigEntityTypeZNode.path(ConfigType.User),
+ ConfigEntityTypeZNode.path(ConfigType.Broker),
+ DelegationTokensZNode.path
+ )
def sensitivePath(path: String): Boolean = {
path != null && SensitiveRootPaths.exists(path.startsWith)
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
new file mode 100644
index 0000000..fc06b73
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -0,0 +1,471 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io.File
+import java.nio.file.{Files, StandardCopyOption}
+import java.util
+import java.util.{Collections, Properties}
+import java.util.concurrent.{ExecutionException, TimeUnit}
+
+import kafka.api.SaslSetup
+import kafka.coordinator.group.OffsetConfig
+import kafka.utils.{ShutdownableThread, TestUtils}
+import kafka.utils.Implicits._
+import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness}
+import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
+import org.apache.kafka.clients.admin._
+import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.SslConfigs._
+import org.apache.kafka.common.config.types.Password
+import org.apache.kafka.common.errors.{AuthenticationException, InvalidRequestException}
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.network.{ListenerName, Mode}
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
+
+object DynamicBrokerReconfigurationTest {
+ val SecureInternal = "INTERNAL"
+ val SecureExternal = "EXTERNAL"
+}
+
+class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSetup {
+
+ import DynamicBrokerReconfigurationTest._
+
+ private var servers = new ArrayBuffer[KafkaServer]
+ private val numServers = 3
+ private val producers = new ArrayBuffer[KafkaProducer[String, String]]
+ private val consumers = new ArrayBuffer[KafkaConsumer[String, String]]
+ private val adminClients = new ArrayBuffer[AdminClient]()
+ private val clientThreads = new ArrayBuffer[ShutdownableThread]()
+ private val topic = "testtopic"
+
+ private val kafkaClientSaslMechanism = "PLAIN"
+ private val kafkaServerSaslMechanisms = List("PLAIN")
+ private val clientSaslProps = kafkaClientSaslProperties(kafkaClientSaslMechanism, dynamicJaasConfig = true)
+
+ private val trustStoreFile1 = File.createTempFile("truststore", ".jks")
+ private val trustStoreFile2 = File.createTempFile("truststore", ".jks")
+ private val sslProperties1 = TestUtils.sslConfigs(Mode.SERVER, clientCert = false, Some(trustStoreFile1), "kafka")
+ private val sslProperties2 = TestUtils.sslConfigs(Mode.SERVER, clientCert = false, Some(trustStoreFile2), "kafka")
+ private val invalidSslProperties = invalidSslConfigs
+
+ @Before
+ override def setUp(): Unit = {
+ startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism)))
+ super.setUp()
+
+ (0 until numServers).foreach { brokerId =>
+
+ val props = TestUtils.createBrokerConfig(brokerId, zkConnect, trustStoreFile = Some(trustStoreFile1))
+ // Ensure that we can support multiple listeners per security protocol and multiple security protocols
+ props.put(KafkaConfig.ListenersProp, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0")
+ props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$SecureInternal:SSL, $SecureExternal:SASL_SSL")
+ props.put(KafkaConfig.InterBrokerListenerNameProp, SecureInternal)
+ props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
+ props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(","))
+
+ props ++= sslProperties1
+ addKeystoreWithListenerPrefix(sslProperties1, props, SecureInternal)
+
+ // Set invalid static properties to ensure that dynamic config is used
+ props ++= invalidSslProperties
+ addKeystoreWithListenerPrefix(invalidSslProperties, props, SecureExternal)
+
+ val kafkaConfig = KafkaConfig.fromProps(props)
+ configureDynamicKeystoreInZooKeeper(kafkaConfig, Seq(brokerId), sslProperties1)
+
+ servers += TestUtils.createServer(kafkaConfig)
+ }
+
+ TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DefaultOffsetsTopicNumPartitions,
+ replicationFactor = numServers, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
+
+ TestUtils.createTopic(zkClient, topic, numPartitions = 10, replicationFactor = numServers, servers)
+ createAdminClient(SecurityProtocol.SSL, SecureInternal)
+ }
+
+ @After
+ override def tearDown() {
+ clientThreads.foreach(_.interrupt())
+ clientThreads.foreach(_.initiateShutdown())
+ clientThreads.foreach(_.join(5 * 1000))
+ producers.foreach(_.close())
+ consumers.foreach(_.close())
+ adminClients.foreach(_.close())
+ TestUtils.shutdownServers(servers)
+ super.tearDown()
+ closeSasl()
+ }
+
+ @Test
+ def testKeystoreUpdate(): Unit = {
+ val producer = createProducer(trustStoreFile1, retries = 0)
+ val consumer = createConsumer("group1", trustStoreFile1)
+ verifyProduceConsume(producer, consumer, 10)
+
+ // Producer with new truststore should fail to connect before keystore update
+ val producer2 = createProducer(trustStoreFile2, retries = 0)
+ verifyAuthenticationFailure(producer2)
+
+ // Update broker keystore
+ configureDynamicKeystoreInZooKeeper(servers.head.config, servers.map(_.config.brokerId), sslProperties2)
+ waitForKeystore(sslProperties2)
+
+ // New producer with old truststore should fail to connect
+ val producer1 = createProducer(trustStoreFile1, retries = 0)
+ verifyAuthenticationFailure(producer1)
+
+ // New producer with new truststore should work
+ val producer3 = createProducer(trustStoreFile2, retries = 0)
+ verifyProduceConsume(producer3, consumer, 10)
+
+ // Old producer with old truststore should continue to work (with their old connections)
+ verifyProduceConsume(producer, consumer, 10)
+ }
+
+ @Test
+ def testKeyStoreDescribeUsingAdminClient(): Unit = {
+
+ def verifyConfig(configName: String, configEntry: ConfigEntry, isSensitive: Boolean, expectedProps: Properties): Unit = {
+ if (isSensitive) {
+ assertTrue(s"Value is sensitive: $configName", configEntry.isSensitive)
+ assertNull(s"Sensitive value returned for $configName", configEntry.value)
+ } else {
+ assertFalse(s"Config is not sensitive: $configName", configEntry.isSensitive)
+ assertEquals(expectedProps.getProperty(configName), configEntry.value)
+ }
+ }
+
+ def verifySynonym(configName: String, synonym: ConfigSynonym, isSensitive: Boolean,
+ expectedPrefix: String, expectedSource: ConfigSource, expectedProps: Properties): Unit = {
+ if (isSensitive)
+ assertNull(s"Sensitive value returned for $configName", synonym.value)
+ else
+ assertEquals(expectedProps.getProperty(configName), synonym.value)
+ assertTrue(s"Expected listener config, got $synonym", synonym.name.startsWith(expectedPrefix))
+ assertEquals(expectedSource, synonym.source)
+ }
+
+ def verifySynonyms(configName: String, synonyms: util.List[ConfigSynonym], isSensitive: Boolean,
+ prefix: String, defaultValue: Option[String]): Unit = {
+
+ val overrideCount = if (prefix.isEmpty) 0 else 2
+ assertEquals(s"Wrong synonyms for $configName: $synonyms", 1 + overrideCount + defaultValue.size, synonyms.size)
+ if (overrideCount > 0) {
+ val listenerPrefix = "listener.name.external.ssl."
+ verifySynonym(configName, synonyms.get(0), isSensitive, listenerPrefix, ConfigSource.DYNAMIC_BROKER_CONFIG, sslProperties1)
+ verifySynonym(configName, synonyms.get(1), isSensitive, listenerPrefix, ConfigSource.STATIC_BROKER_CONFIG, invalidSslProperties)
+ }
+ verifySynonym(configName, synonyms.get(overrideCount), isSensitive, "ssl.", ConfigSource.STATIC_BROKER_CONFIG, invalidSslProperties)
+ defaultValue.foreach { value =>
+ val defaultProps = new Properties
+ defaultProps.setProperty(configName, value)
+ verifySynonym(configName, synonyms.get(overrideCount + 1), isSensitive, "ssl.", ConfigSource.DEFAULT_CONFIG, defaultProps)
+ }
+ }
+
+ def verifySslConfig(prefix: String, expectedProps: Properties, configDesc: Config): Unit = {
+ Seq(SSL_KEYSTORE_LOCATION_CONFIG, SSL_KEYSTORE_TYPE_CONFIG, SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEY_PASSWORD_CONFIG).foreach { configName =>
+ val desc = configEntry(configDesc, s"$prefix$configName")
+ val isSensitive = configName.contains("password")
+ verifyConfig(configName, desc, isSensitive, if (prefix.isEmpty) invalidSslProperties else sslProperties1)
+ val defaultValue = if (configName == SSL_KEYSTORE_TYPE_CONFIG) Some("JKS") else None
+ verifySynonyms(configName, desc.synonyms, isSensitive, prefix, defaultValue)
+ }
+ }
+
+ val adminClient = adminClients.head
+
+ val configDesc = describeConfig(adminClient)
+ verifySslConfig("listener.name.external.", sslProperties1, configDesc)
+ verifySslConfig("", invalidSslProperties, configDesc)
+ }
+
+ @Test
+ def testKeyStoreAlterUsingAdminClient(): Unit = {
+ val topic2 = "testtopic2"
+ TestUtils.createTopic(zkClient, topic2, numPartitions = 10, replicationFactor = numServers, servers)
+
+ // Start a producer and consumer that work with the current truststore.
+ // This should continue working while changes are made
+ val (producerThread, consumerThread) = startProduceConsume(retries = 0)
+ TestUtils.waitUntilTrue(() => consumerThread.received >= 10, "Messages not received")
+
+ // Update broker keystore for external listener
+ val adminClient = adminClients.head
+ alterSslKeystore(adminClient, sslProperties2, SecureExternal)
+
+ // Produce/consume should work with new truststore
+ val producer = createProducer(trustStoreFile2, retries = 0)
+ val consumer = createConsumer("group1", trustStoreFile2, topic2)
+ verifyProduceConsume(producer, consumer, 10, topic2)
+
+ // Broker keystore update for internal listener with incompatible keystore should fail without update
+ alterSslKeystore(adminClient, sslProperties2, SecureInternal, expectFailure = true)
+ verifyProduceConsume(producer, consumer, 10, topic2)
+
+ // Broker keystore update for internal listener with incompatible keystore should succeed
+ val sslPropertiesCopy = sslProperties1.clone().asInstanceOf[Properties]
+ val oldFile = new File(sslProperties1.getProperty(SSL_KEYSTORE_LOCATION_CONFIG))
+ val newFile = File.createTempFile("keystore", ".jks")
+ Files.copy(oldFile.toPath, newFile.toPath, StandardCopyOption.REPLACE_EXISTING)
+ sslPropertiesCopy.setProperty(SSL_KEYSTORE_LOCATION_CONFIG, newFile.getPath)
+ alterSslKeystore(adminClient, sslPropertiesCopy, SecureInternal)
+ verifyProduceConsume(producer, consumer, 10, topic2)
+
+ // Verify that all messages sent with retries=0 while keystores were being altered were consumed
+ stopAndVerifyProduceConsume(producerThread, consumerThread, mayFailRequests = false)
+ }
+
+ private def createProducer(trustStore: File, retries: Int,
+ clientId: String = "test-producer"): KafkaProducer[String, String] = {
+ val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal))
+ val propsOverride = new Properties
+ propsOverride.put(ProducerConfig.CLIENT_ID_CONFIG, clientId)
+ val producer = TestUtils.createNewProducer(
+ bootstrapServers,
+ acks = -1,
+ retries = retries,
+ securityProtocol = SecurityProtocol.SASL_SSL,
+ trustStoreFile = Some(trustStore),
+ saslProperties = Some(clientSaslProps),
+ keySerializer = new StringSerializer,
+ valueSerializer = new StringSerializer,
+ props = Some(propsOverride))
+ producers += producer
+ producer
+ }
+
+ private def createConsumer(groupId: String, trustStore: File, topic: String = topic):KafkaConsumer[String, String] = {
+ val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal))
+ val consumer = TestUtils.createNewConsumer(
+ bootstrapServers,
+ groupId,
+ securityProtocol = SecurityProtocol.SASL_SSL,
+ trustStoreFile = Some(trustStore),
+ saslProperties = Some(clientSaslProps),
+ keyDeserializer = new StringDeserializer,
+ valueDeserializer = new StringDeserializer)
+ consumer.subscribe(Collections.singleton(topic))
+ consumers += consumer
+ consumer
+ }
+
+ private def createAdminClient(securityProtocol: SecurityProtocol, listenerName: String): AdminClient = {
+ val config = new util.HashMap[String, Object]
+ val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(listenerName))
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+ val securityProps: util.Map[Object, Object] =
+ TestUtils.adminClientSecurityConfigs(securityProtocol, Some(trustStoreFile1), Some(clientSaslProps))
+ securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
+ val adminClient = AdminClient.create(config)
+ adminClients += adminClient
+ adminClient
+ }
+
+ private def verifyProduceConsume(producer: KafkaProducer[String, String],
+ consumer: KafkaConsumer[String, String],
+ numRecords: Int,
+ topic: String = topic): Unit = {
+ val producerRecords = (1 to numRecords).map(i => new ProducerRecord(topic, s"key$i", s"value$i"))
+ producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS))
+
+ val records = new ArrayBuffer[ConsumerRecord[String, String]]
+ TestUtils.waitUntilTrue(() => {
+ records ++= consumer.poll(50).asScala
+ records.size == numRecords
+ }, s"Consumed ${records.size} records until timeout instead of the expected $numRecords records")
+ }
+
+ private def verifyAuthenticationFailure(producer: KafkaProducer[_, _]): Unit = {
+ try {
+ producer.partitionsFor(topic)
+ fail("Producer connection did not fail with invalid keystore")
+ } catch {
+ case _:AuthenticationException => // expected exception
+ }
+ }
+
+ private def describeConfig(adminClient: AdminClient): Config = {
+ val configResources = servers.map { server =>
+ new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
+ }
+ val describeOptions = new DescribeConfigsOptions().includeSynonyms(true)
+ val describeResult = adminClient.describeConfigs(configResources.asJava, describeOptions).all.get
+ assertEquals(servers.size, describeResult.values.size)
+ val configDescription = describeResult.values.iterator.next
+ assertFalse("Configs are empty", configDescription.entries.isEmpty)
+ configDescription
+ }
+
+ private def alterSslKeystore(adminClient: AdminClient, props: Properties, listener: String, expectFailure: Boolean = false): Unit = {
+ val newProps = new Properties
+ val configPrefix = new ListenerName(listener).configPrefix
+ val keystoreLocation = props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)
+ newProps.setProperty(s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", keystoreLocation)
+ newProps.setProperty(s"$configPrefix$SSL_KEYSTORE_TYPE_CONFIG", props.getProperty(SSL_KEYSTORE_TYPE_CONFIG))
+ newProps.setProperty(s"$configPrefix$SSL_KEYSTORE_PASSWORD_CONFIG", props.get(SSL_KEYSTORE_PASSWORD_CONFIG).asInstanceOf[Password].value)
+ newProps.setProperty(s"$configPrefix$SSL_KEY_PASSWORD_CONFIG", props.get(SSL_KEY_PASSWORD_CONFIG).asInstanceOf[Password].value)
+ reconfigureServers(newProps, perBrokerConfig = true, (s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", keystoreLocation), expectFailure)
+ }
+
+ private def alterConfigs(adminClient: AdminClient, props: Properties, perBrokerConfig: Boolean): AlterConfigsResult = {
+ val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
+ val newConfig = new Config(configEntries)
+ val configs = if (perBrokerConfig) {
+ servers.map { server =>
+ val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
+ (resource, newConfig)
+ }.toMap.asJava
+ } else {
+ Map(new ConfigResource(ConfigResource.Type.BROKER, "") -> newConfig).asJava
+ }
+ adminClient.alterConfigs(configs)
+ }
+
+ private def reconfigureServers(newProps: Properties, perBrokerConfig: Boolean, aPropToVerify: (String, String), expectFailure: Boolean = false): Unit = {
+ val alterResult = alterConfigs(adminClients.head, newProps, perBrokerConfig)
+ if (expectFailure) {
+ val oldProps = servers.head.config.values.asScala.filterKeys(newProps.containsKey)
+ val brokerResources = if (perBrokerConfig)
+ servers.map(server => new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString))
+ else
+ Seq(new ConfigResource(ConfigResource.Type.BROKER, ""))
+ brokerResources.foreach { brokerResource =>
+ val exception = intercept[ExecutionException](alterResult.values.get(brokerResource).get)
+ assertTrue(exception.getCause.isInstanceOf[InvalidRequestException])
+ }
+ assertEquals(oldProps, servers.head.config.values.asScala.filterKeys(newProps.containsKey))
+ } else {
+ alterResult.all.get
+ waitForConfig(aPropToVerify._1, aPropToVerify._2)
+ }
+ }
+
+ private def configEntry(configDesc: Config, configName: String): ConfigEntry = {
+ configDesc.entries.asScala.find(cfg => cfg.name == configName)
+ .getOrElse(throw new IllegalStateException(s"Config not found $configName"))
+ }
+
+ private def addKeystoreWithListenerPrefix(srcProps: Properties, destProps: Properties, listener: String): Unit = {
+ val listenerPrefix = new ListenerName(listener).configPrefix
+ destProps.put(listenerPrefix + SSL_KEYSTORE_TYPE_CONFIG, srcProps.get(SSL_KEYSTORE_TYPE_CONFIG))
+ destProps.put(listenerPrefix + SSL_KEYSTORE_LOCATION_CONFIG, srcProps.get(SSL_KEYSTORE_LOCATION_CONFIG))
+ destProps.put(listenerPrefix + SSL_KEYSTORE_PASSWORD_CONFIG, srcProps.get(SSL_KEYSTORE_PASSWORD_CONFIG).asInstanceOf[Password].value)
+ destProps.put(listenerPrefix + SSL_KEY_PASSWORD_CONFIG, srcProps.get(SSL_KEY_PASSWORD_CONFIG).asInstanceOf[Password].value)
+ }
+
+ private def configureDynamicKeystoreInZooKeeper(kafkaConfig: KafkaConfig, brokers: Seq[Int], sslProperties: Properties): Unit = {
+ val keystoreProps = new Properties
+ addKeystoreWithListenerPrefix(sslProperties, keystoreProps, SecureExternal)
+ kafkaConfig.dynamicConfig.toPersistentProps(keystoreProps, perBrokerConfig = true)
+ zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
+ adminZkClient.changeBrokerConfig(brokers, keystoreProps)
+ }
+
+ private def waitForKeystore(sslProperties: Properties, maxWaitMs: Long = 10000): Unit = {
+ waitForConfig(new ListenerName(SecureExternal).configPrefix + SSL_KEYSTORE_LOCATION_CONFIG,
+ sslProperties.getProperty(SSL_KEYSTORE_LOCATION_CONFIG), maxWaitMs)
+
+ }
+
+ private def waitForConfig(propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = {
+ servers.foreach { server =>
+ TestUtils.retry(maxWaitMs) {
+ assertEquals(propValue, server.config.originals.get(propName))
+ }
+ }
+ }
+
+ private def invalidSslConfigs: Properties = {
+ val props = new Properties
+ props.put(SSL_KEYSTORE_LOCATION_CONFIG, "invalid/file/path")
+ props.put(SSL_KEYSTORE_PASSWORD_CONFIG, new Password("invalid"))
+ props.put(SSL_KEY_PASSWORD_CONFIG, new Password("invalid"))
+ props.put(SSL_KEYSTORE_TYPE_CONFIG, "PKCS12")
+ props
+ }
+
+ private def startProduceConsume(retries: Int): (ProducerThread, ConsumerThread) = {
+ val producerThread = new ProducerThread(retries)
+ clientThreads += producerThread
+ val consumerThread = new ConsumerThread(producerThread)
+ clientThreads += consumerThread
+ consumerThread.start()
+ producerThread.start()
+ (producerThread, consumerThread)
+ }
+
+ private def stopAndVerifyProduceConsume(producerThread: ProducerThread, consumerThread: ConsumerThread,
+ mayFailRequests: Boolean): Unit = {
+ producerThread.shutdown()
+ consumerThread.initiateShutdown()
+ consumerThread.awaitShutdown()
+ if (!mayFailRequests)
+ assertEquals(producerThread.sent, consumerThread.received)
+ else {
+ assertTrue(s"Some messages not received, sent=${producerThread.sent} received=${consumerThread.received}",
+ consumerThread.received >= producerThread.sent)
+ }
+ }
+
+ private class ProducerThread(retries: Int) extends ShutdownableThread("test-producer", isInterruptible = false) {
+ private val producer = createProducer(trustStoreFile1, retries)
+ @volatile var sent = 0
+ override def doWork(): Unit = {
+ try {
+ while (isRunning.get) {
+ sent += 1
+ val record = new ProducerRecord(topic, s"key$sent", s"value$sent")
+ producer.send(record).get(10, TimeUnit.SECONDS)
+ }
+ } finally {
+ producer.close()
+ }
+ }
+ }
+
+ private class ConsumerThread(producerThread: ProducerThread) extends ShutdownableThread("test-consumer", isInterruptible = false) {
+ private val consumer = createConsumer("group1", trustStoreFile1)
+ @volatile private var endTimeMs = Long.MaxValue
+ var received = 0
+ override def doWork(): Unit = {
+ try {
+ while (isRunning.get || (received < producerThread.sent && System.currentTimeMillis < endTimeMs)) {
+ received += consumer.poll(50).count
+ }
+ } finally {
+ consumer.close()
+ }
+ }
+ override def initiateShutdown(): Boolean = {
+ endTimeMs = System.currentTimeMillis + 10 * 1000
+ super.initiateShutdown()
+ }
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
new file mode 100755
index 0000000..2032011
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.Properties
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.config.{ConfigException, SslConfigs}
+import org.junit.Assert._
+import org.junit.Test
+
+class DynamicBrokerConfigTest {
+
+ @Test
+ def testConfigUpdate(): Unit = {
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+ val oldKeystore = "oldKs.jks"
+ props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, oldKeystore)
+ val config = KafkaConfig(props)
+ val dynamicConfig = config.dynamicConfig
+ assertSame(config, dynamicConfig.currentKafkaConfig)
+ assertEquals(oldKeystore, config.sslKeystoreLocation)
+ assertEquals(oldKeystore,
+ config.valuesFromThisConfigWithPrefixOverride("listener.name.external.").get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+ assertEquals(oldKeystore, config.originalsFromThisConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+
+ (1 to 2).foreach { i =>
+ val props1 = new Properties
+ val newKeystore = s"ks$i.jks"
+ props1.put(s"listener.name.external.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}", newKeystore)
+ dynamicConfig.updateBrokerConfig(0, props1)
+ assertNotSame(config, dynamicConfig.currentKafkaConfig)
+
+ assertEquals(newKeystore,
+ config.valuesWithPrefixOverride("listener.name.external.").get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+ assertEquals(newKeystore,
+ config.originalsWithPrefix("listener.name.external.").get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+ assertEquals(newKeystore,
+ config.valuesWithPrefixOverride("listener.name.external.").get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+ assertEquals(newKeystore,
+ config.originalsWithPrefix("listener.name.external.").get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+
+ assertEquals(oldKeystore, config.sslKeystoreLocation)
+ assertEquals(oldKeystore, config.originals.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+ assertEquals(oldKeystore, config.values.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+ assertEquals(oldKeystore, config.originalsStrings.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+
+ assertEquals(oldKeystore,
+ config.valuesFromThisConfigWithPrefixOverride("listener.name.external.").get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+ assertEquals(oldKeystore, config.originalsFromThisConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+ assertEquals(oldKeystore, config.valuesFromThisConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+ assertEquals(oldKeystore, config.originalsFromThisConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+ assertEquals(oldKeystore, config.valuesFromThisConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+ }
+ }
+
+ @Test
+ def testConfigUpdateWithSomeInvalidConfigs(): Unit = {
+ val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+ origProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS")
+ val config = KafkaConfig(origProps)
+
+ def verifyConfigUpdateWithInvalidConfig(validProps: Map[String, String], invalidProps: Map[String, String]): Unit = {
+ val props = new Properties
+ validProps.foreach { case (k, v) => props.put(k, v) }
+ invalidProps.foreach { case (k, v) => props.put(k, v) }
+
+ // DynamicBrokerConfig#validate is used by AdminClient to validate the configs provided in
+ // in an AlterConfigs request. Validation should fail with an exception if any of the configs are invalid.
+ try {
+ config.dynamicConfig.validate(props, perBrokerConfig = true)
+ fail("Invalid config did not fail validation")
+ } catch {
+ case e: ConfigException => // expected exception
+ }
+
+ // DynamicBrokerConfig#updateBrokerConfig is used to update configs from ZooKeeper during
+ // startup and when configs are updated in ZK. Update should apply valid configs and ignore
+ // invalid ones.
+ config.dynamicConfig.updateBrokerConfig(0, props)
+ validProps.foreach { case (name, value) => assertEquals(value, config.originals.get(name)) }
+ invalidProps.keySet.foreach { name =>
+ assertEquals(origProps.get(name), config.originals.get(name))
+ }
+ }
+
+ val validProps = Map(s"listener.name.external.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}" ->"ks.p12")
+ val securityPropsWithoutListenerPrefix = Map(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG -> "PKCS12")
+ verifyConfigUpdateWithInvalidConfig(validProps, securityPropsWithoutListenerPrefix)
+ val nonDynamicProps = Map(KafkaConfig.ZkConnectProp -> "somehost:2181")
+ verifyConfigUpdateWithInvalidConfig(validProps, nonDynamicProps)
+ }
+
+ @Test
+ def testSecurityConfigs(): Unit = {
+ def verifyUpdate(name: String, value: Object, invalidValue: Boolean): Unit = {
+ verifyConfigUpdate(name, value, perBrokerConfig = true, expectFailure = true)
+ verifyConfigUpdate(s"listener.name.external.$name", value, perBrokerConfig = true, expectFailure = invalidValue)
+ verifyConfigUpdate(name, value, perBrokerConfig = false, expectFailure = true)
+ verifyConfigUpdate(s"listener.name.external.$name", value, perBrokerConfig = false, expectFailure = true)
+ }
+
+ verifyUpdate(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "ks.jks", invalidValue = false)
+ verifyUpdate(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS", invalidValue = false)
+ verifyUpdate(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "password", invalidValue = false)
+ verifyUpdate(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "password", invalidValue = false)
+ verifyUpdate(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 1.asInstanceOf[Integer], invalidValue = true)
+ verifyUpdate(SslConfigs.SSL_KEY_PASSWORD_CONFIG, 1.asInstanceOf[Integer], invalidValue = true)
+ }
+
+ private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean) {
+ val config = KafkaConfig(TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181))
+ val props = new Properties
+ props.put(name, value)
+ val oldValue = config.originals.get(name)
+
+ def updateConfig() = {
+ if (perBrokerConfig)
+ config.dynamicConfig.updateBrokerConfig(0, props)
+ else
+ config.dynamicConfig.updateDefaultConfig(props)
+ }
+ if (!expectFailure) {
+ config.dynamicConfig.validate(props, perBrokerConfig)
+ updateConfig()
+ assertEquals(value, config.originals.get(name))
+ } else {
+ try {
+ config.dynamicConfig.validate(props, perBrokerConfig)
+ fail("Invalid config did not fail validation")
+ } catch {
+ case e: Exception => // expected exception
+ }
+ updateConfig()
+ assertEquals(oldValue, config.originals.get(name))
+ }
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
index b2378cf..cf09849 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
@@ -26,11 +26,6 @@ class DynamicConfigTest extends ZooKeeperTestHarness {
private final val someValue: String = "some interesting value"
@Test(expected = classOf[IllegalArgumentException])
- def shouldFailWhenChangingBrokerUnknownConfig() {
- adminZkClient.changeBrokerConfig(Seq(0), propsWith(nonExistentConfig, someValue))
- }
-
- @Test(expected = classOf[IllegalArgumentException])
def shouldFailWhenChangingClientIdUnknownConfig() {
adminZkClient.changeClientIdConfig("ClientId", propsWith(nonExistentConfig, someValue))
}
@@ -51,4 +46,4 @@ class DynamicConfigTest extends ZooKeeperTestHarness {
adminZkClient.changeBrokerConfig(Seq(0),
propsWith(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "-100"))
}
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 09e4c94..30a10c7 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -49,7 +49,7 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.{ListenerName, Mode}
import org.apache.kafka.common.record._
import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.serialization.{ByteArraySerializer, Serializer}
+import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.Utils._
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
@@ -613,16 +613,18 @@ object TestUtils extends Logging {
/**
* Create a new consumer with a few pre-configured properties.
*/
- def createNewConsumer(brokerList: String,
- groupId: String = "group",
- autoOffsetReset: String = "earliest",
- partitionFetchSize: Long = 4096L,
- partitionAssignmentStrategy: String = classOf[RangeAssignor].getName,
- sessionTimeout: Int = 30000,
- securityProtocol: SecurityProtocol,
- trustStoreFile: Option[File] = None,
- saslProperties: Option[Properties] = None,
- props: Option[Properties] = None) : KafkaConsumer[Array[Byte],Array[Byte]] = {
+ def createNewConsumer[K, V](brokerList: String,
+ groupId: String = "group",
+ autoOffsetReset: String = "earliest",
+ partitionFetchSize: Long = 4096L,
+ partitionAssignmentStrategy: String = classOf[RangeAssignor].getName,
+ sessionTimeout: Int = 30000,
+ securityProtocol: SecurityProtocol,
+ trustStoreFile: Option[File] = None,
+ saslProperties: Option[Properties] = None,
+ keyDeserializer: Deserializer[K] = new ByteArrayDeserializer,
+ valueDeserializer: Deserializer[V] =new ByteArrayDeserializer,
+ props: Option[Properties] = None) : KafkaConsumer[K, V] = {
import org.apache.kafka.clients.consumer.ConsumerConfig
val consumerProps = props.getOrElse(new Properties())
@@ -633,8 +635,6 @@ object TestUtils extends Logging {
val defaultProps = Map(
ConsumerConfig.RETRY_BACKOFF_MS_CONFIG -> "100",
ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG -> "200",
- ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
- ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> partitionAssignmentStrategy,
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG -> sessionTimeout.toString,
ConsumerConfig.GROUP_ID_CONFIG -> groupId)
@@ -652,7 +652,7 @@ object TestUtils extends Logging {
if(!consumerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG))
consumerProps ++= consumerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties)
- new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps)
+ new KafkaConsumer[K, V](consumerProps, keyDeserializer, valueDeserializer)
}
/**
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.