You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/05/15 02:11:23 UTC
[kafka] branch trunk updated: KIP-421: Support for resolving
externalized secrets in AbstractConfig (#6467)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d92eb0d KIP-421: Support for resolving externalized secrets in AbstractConfig (#6467)
d92eb0d is described below
commit d92eb0d8d85cc24256499eff97672141493fb90b
Author: tadsul <43...@users.noreply.github.com>
AuthorDate: Tue May 14 19:11:07 2019 -0700
KIP-421: Support for resolving externalized secrets in AbstractConfig (#6467)
Updated AbstractConfig to be able to resolve variables in config values when the configuration includes config provider properties.
Author: Tejal Adsul <te...@confluent.io>
Reviewers: Rajini Sivaram <ra...@gmail.com>, Randall Hauch <rh...@gmail.com>
---
.../apache/kafka/common/config/AbstractConfig.java | 162 ++++++++++++++++++++-
.../kafka/common/config/AbstractConfigTest.java | 137 +++++++++++++++++
.../config/provider/MockFileConfigProvider.java | 29 ++++
.../config/provider/MockVaultConfigProvider.java | 29 ++++
4 files changed, 354 insertions(+), 3 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 9cf13dd..555b634 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.kafka.common.config.provider.ConfigProvider;
import java.util.ArrayList;
import java.util.Collections;
@@ -52,13 +53,57 @@ public class AbstractConfig {
private final ConfigDef definition;
+ private static final String CONFIG_PROVIDERS_CONFIG = "config.providers";
+
+ /**
+ * Construct a configuration with a ConfigDef and the configuration properties, which can include properties
+ * for zero or more {@link ConfigProvider} that will be used to resolve variables in configuration property
+ * values.
+ *
+ * The originals is a name-value pair configuration properties and optional config provider configs. The
+ * value of the configuration can be a variable as defined below or the actual value. This constructor will
+ * first instantiate the ConfigProviders using the config provider configs, then it will find all the
+ * variables in the values of the originals configurations, attempt to resolve the variables using the named
+ * ConfigProviders, and then parse and validate the configurations.
+ *
+ * ConfigProvider configs can be passed either as configs in the originals map or in the separate
+ * configProviderProps map. If config providers properties are passed in the configProviderProps any config
+ * provider properties in originals map will be ignored. If ConfigProvider properties are not provided, the
+ * constructor will skip the variable substitution step and will simply validate and parse the supplied
+ * configuration.
+ *
+ * The "{@code config.providers}" configuration property and all configuration properties that begin with the
+ * "{@code config.providers.}" prefix are reserved. The "{@code config.providers}" configuration property
+ * specifies the names of the config providers, and properties that begin with the "{@code config.providers..}"
+ * prefix correspond to the properties for that named provider. For example, the "{@code config.providers..class}"
+ * property specifies the name of the {@link ConfigProvider} implementation class that should be used for
+ * the provider.
+ *
+ * The keys for ConfigProvider configs in both originals and configProviderProps will start with the above
+ * mentioned "{@code config.providers.}" prefix.
+ *
+ * Variables have the form "${providerName:[path:]key}", where "providerName" is the name of a ConfigProvider,
+ * "path" is an optional string, and "key" is a required string. This variable is resolved by passing the "key"
+ * and optional "path" to a ConfigProvider with the specified name, and the result from the ConfigProvider is
+ * then used in place of the variable. Variables that cannot be resolved by the AbstractConfig constructor will
+ * be left unchanged in the configuration.
+ *
+ *
+ * @param definition the definition of the configurations; may not be null
+ * @param originals the configuration properties plus any optional config provider properties;
+ * @param configProviderProps the map of properties of config providers which will be instantiated by
+ * the constructor to resolve any variables in {@code originals}; may be null or empty
+ * @param doLog whether the configurations should be logged
+ */
@SuppressWarnings("unchecked")
- public AbstractConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) {
+ public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
/* check that all the keys are really strings */
for (Map.Entry<?, ?> entry : originals.entrySet())
if (!(entry.getKey() instanceof String))
throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
- this.originals = (Map<String, ?>) originals;
+
+ this.originals = resolveConfigVariables(configProviderProps, (Map<String, Object>) originals);
+
this.values = definition.parse(this.originals);
Map<String, Object> configUpdates = postProcessParsedConfig(Collections.unmodifiableMap(this.values));
for (Map.Entry<String, Object> update : configUpdates.entrySet()) {
@@ -71,8 +116,30 @@ public class AbstractConfig {
logAll();
}
+ /**
+ * Construct a configuration with a ConfigDef and the configuration properties,
+ * which can include properties for zero or more {@link ConfigProvider}
+ * that will be used to resolve variables in configuration property values.
+ *
+ * @param definition the definition of the configurations; may not be null
+ * @param originals the configuration properties plus any optional config provider properties; may not be null
+ */
public AbstractConfig(ConfigDef definition, Map<?, ?> originals) {
- this(definition, originals, true);
+ this(definition, originals, Collections.emptyMap(), true);
+ }
+
+ /**
+ * Construct a configuration with a ConfigDef and the configuration properties,
+ * which can include properties for zero or more {@link ConfigProvider}
+ * that will be used to resolve variables in configuration property values.
+ *
+ * @param definition the definition of the configurations; may not be null
+ * @param originals the configuration properties plus any optional config provider properties; may not be null
+ * @param doLog whether the configurations should be logged
+ */
+ public AbstractConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) {
+ this(definition, originals, Collections.emptyMap(), doLog);
+
}
/**
@@ -369,6 +436,95 @@ public class AbstractConfig {
return objects;
}
+ private Map<String, String> extractPotentialVariables(Map<?, ?> configMap) {
+ // Variables are tuples of the form "${providerName:[path:]key}". From the configMap we extract the subset of configs with string
+ // values as potential variables.
+ Map<String, String> configMapAsString = new HashMap<>();
+ for (Map.Entry<?, ?> entry : configMap.entrySet()) {
+ if (entry.getValue() instanceof String)
+ configMapAsString.put((String) entry.getKey(), (String) entry.getValue());
+ }
+
+ return configMapAsString;
+ }
+
+ /**
+ * Instantiates given list of config providers and fetches the actual values of config variables from the config providers.
+ * returns a map of config key and resolved values.
+ * @param configProviderProps The map of config provider configs
+ * @param originals The map of raw configs.
+ * @return map of resolved config variable.
+ */
+ @SuppressWarnings("unchecked")
+ private Map<String, ?> resolveConfigVariables(Map<String, ?> configProviderProps, Map<String, Object> originals) {
+ Map<String, String> providerConfigString;
+ Map<String, ?> configProperties;
+
+ // As variable configs are strings, parse the originals and obtain the potential variable configs.
+ Map<String, String> indirectVariables = extractPotentialVariables(originals);
+
+ if (configProviderProps == null || configProviderProps.isEmpty()) {
+ providerConfigString = indirectVariables;
+ configProperties = originals;
+ } else {
+ providerConfigString = extractPotentialVariables(configProviderProps);
+ configProperties = configProviderProps;
+ }
+ Map<String, ConfigProvider> providers = instantiateConfigProviders(providerConfigString, configProperties);
+
+ if (!providers.isEmpty()) {
+ ConfigTransformer configTransformer = new ConfigTransformer(providers);
+ ConfigTransformerResult result = configTransformer.transform(indirectVariables);
+ originals.putAll(result.data());
+ }
+
+ return originals;
+ }
+
+ private Map<String, Object> configProviderProperties(String configProviderPrefix, Map<String, ?> providerConfigProperties) {
+ Map<String, Object> result = new HashMap<>();
+ for (Map.Entry<String, ?> entry : providerConfigProperties.entrySet()) {
+ String key = entry.getKey();
+ if (key.startsWith(configProviderPrefix) && key.length() > configProviderPrefix.length()) {
+ result.put(key.substring(configProviderPrefix.length()), entry.getValue());
+ }
+ }
+ return result;
+ }
+
+ private Map<String, ConfigProvider> instantiateConfigProviders(Map<String, String> indirectConfigs, Map<String, ?> providerConfigProperties) {
+ final String configProviders = indirectConfigs.get(CONFIG_PROVIDERS_CONFIG);
+
+ if (configProviders == null || configProviders.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ Map<String, String> providerMap = new HashMap<>();
+
+ for (String provider: configProviders.split(",")) {
+ String providerClass = CONFIG_PROVIDERS_CONFIG + "." + provider + ".class";
+ if (indirectConfigs.containsKey(providerClass))
+ providerMap.put(provider, indirectConfigs.get(providerClass));
+
+ }
+ // Instantiate Config Providers
+ Map<String, ConfigProvider> configProviderInstances = new HashMap<>();
+ for (Map.Entry<String, String> entry : providerMap.entrySet()) {
+ try {
+ String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + ".";
+ Map<String, ?> configProperties = configProviderProperties(prefix, providerConfigProperties);
+ ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
+ provider.configure(configProperties);
+ configProviderInstances.put(entry.getKey(), provider);
+ } catch (ClassNotFoundException e) {
+ log.error("ClassNotFoundException exception occurred: " + entry.getValue());
+ throw new ConfigException("Invalid config:" + entry.getValue() + " ClassNotFoundException exception occurred", e);
+ }
+ }
+
+ return configProviderInstances;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index 071deed..a007fd3 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -321,6 +321,143 @@ public class AbstractConfigTest {
}
}
+ @SuppressWarnings("unchecked")
+ public Map<String, ?> convertPropertiesToMap(Map<?, ?> props) {
+ for (Map.Entry<?, ?> entry : props.entrySet()) {
+ if (!(entry.getKey() instanceof String))
+ throw new ConfigException(entry.getKey().toString(), entry.getValue(),
+ "Key must be a string.");
+ }
+ return (Map<String, ?>) props;
+ }
+
+ @Test
+ public void testOriginalsWithConfigProvidersProps() {
+ Properties props = new Properties();
+
+ // Test Case: Valid Test Case for ConfigProviders as part of config.properties
+ props.put("config.providers", "file");
+ props.put("config.providers.file.class", "org.apache.kafka.common.config.provider.MockFileConfigProvider");
+ props.put("prefix.ssl.truststore.location.number", 5);
+ props.put("sasl.kerberos.service.name", "service name");
+ props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
+ props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
+ TestIndirectConfigResolution config = new TestIndirectConfigResolution(props);
+ assertEquals(config.originals().get("sasl.kerberos.key"), "testKey");
+ assertEquals(config.originals().get("sasl.kerberos.password"), "randomPassword");
+ assertEquals(config.originals().get("prefix.ssl.truststore.location.number"), 5);
+ assertEquals(config.originals().get("sasl.kerberos.service.name"), "service name");
+ }
+
+ @Test
+ public void testConfigProvidersPropsAsParam() {
+ // Test Case: Valid Test Case for ConfigProviders as a separate variable
+ Properties providers = new Properties();
+ providers.put("config.providers", "file");
+ providers.put("config.providers.file.class", "org.apache.kafka.common.config.provider.MockFileConfigProvider");
+ Properties props = new Properties();
+ props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
+ props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
+ TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
+ assertEquals(config.originals().get("sasl.kerberos.key"), "testKey");
+ assertEquals(config.originals().get("sasl.kerberos.password"), "randomPassword");
+ }
+
+ @Test
+ public void testAutoConfigResolutionWithMultipleConfigProviders() {
+ // Test Case: Valid Test Case With Multiple ConfigProviders as a separate variable
+ Properties providers = new Properties();
+ providers.put("config.providers", "file,vault");
+ providers.put("config.providers.file.class", "org.apache.kafka.common.config.provider.MockFileConfigProvider");
+ providers.put("config.providers.vault.class", "org.apache.kafka.common.config.provider.MockVaultConfigProvider");
+ Properties props = new Properties();
+ props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
+ props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
+ props.put("sasl.truststore.key", "${vault:/usr/truststore:truststoreKey}");
+ props.put("sasl.truststore.password", "${vault:/usr/truststore:truststorePassword}");
+ TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
+ assertEquals(config.originals().get("sasl.kerberos.key"), "testKey");
+ assertEquals(config.originals().get("sasl.kerberos.password"), "randomPassword");
+ assertEquals(config.originals().get("sasl.truststore.key"), "testTruststoreKey");
+ assertEquals(config.originals().get("sasl.truststore.password"), "randomtruststorePassword");
+ }
+
+ @Test
+ public void testAutoConfigResolutionWithInvalidConfigProviderClass() {
+ // Test Case: Invalid class for Config Provider
+ Properties props = new Properties();
+ props.put("config.providers", "file");
+ props.put("config.providers.file.class",
+ "org.apache.kafka.common.config.provider.InvalidConfigProvider");
+ props.put("testKey", "${test:/foo/bar/testpath:testKey}");
+ try {
+ TestIndirectConfigResolution config = new TestIndirectConfigResolution(props);
+ fail("Expected a config exception due to invalid props :" + props);
+ } catch (KafkaException e) {
+ // this is good
+ }
+ }
+
+ @Test
+ public void testAutoConfigResolutionWithMissingConfigProvider() {
+ // Test Case: Config Provider for a variable missing in config file.
+ Properties props = new Properties();
+ props.put("testKey", "${test:/foo/bar/testpath:testKey}");
+ TestIndirectConfigResolution config = new TestIndirectConfigResolution(props);
+ assertEquals(config.originals().get("testKey"), "${test:/foo/bar/testpath:testKey}");
+ }
+
+ @Test
+ public void testAutoConfigResolutionWithMissingConfigKey() {
+ // Test Case: Config Provider fails to resolve the config (key not present)
+ Properties props = new Properties();
+ props.put("config.providers", "test");
+ props.put("config.providers.test.class", "org.apache.kafka.common.config.provider.MockFileConfigProvider");
+ props.put("random", "${test:/foo/bar/testpath:random}");
+ TestIndirectConfigResolution config = new TestIndirectConfigResolution(props);
+ assertEquals(config.originals().get("random"), "${test:/foo/bar/testpath:random}");
+ }
+
+ @Test
+ public void testAutoConfigResolutionWithDuplicateConfigProvider() {
+ // Test Case: If ConfigProvider is provided in both originals and provider. Only the ones in provider should be used.
+ Properties providers = new Properties();
+ providers.put("config.providers", "test");
+ providers.put("config.providers.test.class", "org.apache.kafka.common.config.provider.MockFileConfigProvider");
+
+ Properties props = new Properties();
+ props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
+ props.put("config.providers", "file");
+ props.put("config.providers.file.class", "org.apache.kafka.common.config.provider.MockFileConfigProvider");
+
+ TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
+ assertEquals(config.originals().get("sasl.kerberos.key"), "${file:/usr/kerberos:key}");
+ }
+
+ private static class TestIndirectConfigResolution extends AbstractConfig {
+
+ private static final ConfigDef CONFIG;
+
+ public static final String INDIRECT_CONFIGS = "indirect.variables";
+ private static final String INDIRECT_CONFIGS_DOC = "Variables whose values can be obtained from ConfigProviders";
+
+ static {
+ CONFIG = new ConfigDef().define(INDIRECT_CONFIGS,
+ Type.LIST,
+ "",
+ Importance.LOW,
+ INDIRECT_CONFIGS_DOC);
+ }
+
+ public TestIndirectConfigResolution(Map<?, ?> props) {
+ super(CONFIG, props, true);
+ }
+
+ public TestIndirectConfigResolution(Map<?, ?> props, Map<String, ?> providers) {
+ super(CONFIG, props, providers, true);
+ }
+ }
+
private static class ClassTestConfig extends AbstractConfig {
static final Class<?> DEFAULT_CLASS = FakeMetricsReporter.class;
static final Class<?> VISIBLE_CLASS = JmxReporter.class;
diff --git a/clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java b/clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java
new file mode 100644
index 0000000..e779cbe
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+
+public class MockFileConfigProvider extends FileConfigProvider {
+
+ @Override
+ protected Reader reader(String path) throws IOException {
+ return new StringReader("key=testKey\npassword=randomPassword");
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/config/provider/MockVaultConfigProvider.java b/clients/src/test/java/org/apache/kafka/common/config/provider/MockVaultConfigProvider.java
new file mode 100644
index 0000000..dbfe158
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/config/provider/MockVaultConfigProvider.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+
+public class MockVaultConfigProvider extends FileConfigProvider {
+
+ @Override
+ protected Reader reader(String path) throws IOException {
+ return new StringReader("truststoreKey=testTruststoreKey\ntruststorePassword=randomtruststorePassword");
+ }
+}