Posted to commits@kafka.apache.org by rh...@apache.org on 2019/05/15 02:11:23 UTC

[kafka] branch trunk updated: KIP-421: Support for resolving externalized secrets in AbstractConfig (#6467)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d92eb0d  KIP-421: Support for resolving externalized secrets in AbstractConfig (#6467)
d92eb0d is described below

commit d92eb0d8d85cc24256499eff97672141493fb90b
Author: tadsul <43...@users.noreply.github.com>
AuthorDate: Tue May 14 19:11:07 2019 -0700

    KIP-421: Support for resolving externalized secrets in AbstractConfig (#6467)
    
    Updated AbstractConfig to be able to resolve variables in config values when the configuration includes config provider properties.
    
    Author: Tejal Adsul <te...@confluent.io>
    Reviewers: Rajini Sivaram <ra...@gmail.com>, Randall Hauch <rh...@gmail.com>
---
 .../apache/kafka/common/config/AbstractConfig.java | 162 ++++++++++++++++++++-
 .../kafka/common/config/AbstractConfigTest.java    | 137 +++++++++++++++++
 .../config/provider/MockFileConfigProvider.java    |  29 ++++
 .../config/provider/MockVaultConfigProvider.java   |  29 ++++
 4 files changed, 354 insertions(+), 3 deletions(-)
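
For illustration, a minimal sketch (not part of this commit) of the behavior described above: variables of the form ${provider:path:key} in the originals are replaced by values fetched from the named ConfigProvider. It reuses the MockFileConfigProvider test class added below; the ResolveExample/MyConfig names are hypothetical stand-ins for any AbstractConfig subclass.

    import java.util.Properties;
    import org.apache.kafka.common.config.AbstractConfig;
    import org.apache.kafka.common.config.ConfigDef;

    public class ResolveExample {
        // Hypothetical subclass; any AbstractConfig subclass picks up the new behavior.
        private static class MyConfig extends AbstractConfig {
            MyConfig(Properties props) {
                super(new ConfigDef(), props, true);
            }
        }

        public static void main(String[] args) {
            Properties props = new Properties();
            // Register a provider named "file" and point it at an implementation class.
            props.put("config.providers", "file");
            props.put("config.providers.file.class",
                    "org.apache.kafka.common.config.provider.MockFileConfigProvider");
            // A variable of the form ${providerName:[path:]key}, resolved by that provider.
            props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");

            MyConfig config = new MyConfig(props);
            // MockFileConfigProvider maps "key" to "testKey", so the variable is replaced.
            System.out.println(config.originals().get("sasl.kerberos.key"));  // prints testKey
        }
    }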

diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 9cf13dd..555b634 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.kafka.common.config.provider.ConfigProvider;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -52,13 +53,57 @@ public class AbstractConfig {
 
     private final ConfigDef definition;
 
+    private static final String CONFIG_PROVIDERS_CONFIG = "config.providers";
+
+    /**
+     * Construct a configuration with a ConfigDef and the configuration properties, which can include properties
+     * for zero or more {@link ConfigProvider} that will be used to resolve variables in configuration property
+     * values.
+     *
+     * The originals map contains name-value configuration properties and optional config provider configs. The
+     * value of a configuration entry can be a variable as defined below or the actual value. This constructor will
+     * first instantiate the ConfigProviders using the config provider configs, then it will find all the
+     * variables in the values of the originals configurations, attempt to resolve the variables using the named
+     * ConfigProviders, and then parse and validate the configurations.
+     *
+     * ConfigProvider configs can be passed either as configs in the originals map or in the separate
+     * configProviderProps map. If config provider properties are passed in configProviderProps, any config
+     * provider properties in the originals map will be ignored. If ConfigProvider properties are not provided, the
+     * constructor will skip the variable substitution step and will simply validate and parse the supplied
+     * configuration.
+     *
+     * The "{@code config.providers}" configuration property and all configuration properties that begin with the
+     * "{@code config.providers.}" prefix are reserved. The "{@code config.providers}" configuration property
+     * specifies the names of the config providers, and properties that begin with the "{@code config.providers.<name>.}"
+     * prefix correspond to the properties for that named provider. For example, the "{@code config.providers.<name>.class}"
+     * property specifies the name of the {@link ConfigProvider} implementation class that should be used for
+     * the provider.
+     *
+     * The keys for ConfigProvider configs in both originals and configProviderProps will start with the
+     * above-mentioned "{@code config.providers.}" prefix.
+     *
+     * Variables have the form "${providerName:[path:]key}", where "providerName" is the name of a ConfigProvider,
+     * "path" is an optional string, and "key" is a required string. This variable is resolved by passing the "key"
+     * and optional "path" to a ConfigProvider with the specified name, and the result from the ConfigProvider is
+     * then used in place of the variable. Variables that cannot be resolved by the AbstractConfig constructor will
+     * be left unchanged in the configuration.
+     *
+     *
+     * @param definition the definition of the configurations; may not be null
+     * @param originals the configuration properties plus any optional config provider properties; may not be null
+     * @param configProviderProps the map of properties of config providers which will be instantiated by
+     *        the constructor to resolve any variables in {@code originals}; may be null or empty
+     * @param doLog whether the configurations should be logged
+     */
     @SuppressWarnings("unchecked")
-    public AbstractConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) {
+    public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
         /* check that all the keys are really strings */
         for (Map.Entry<?, ?> entry : originals.entrySet())
             if (!(entry.getKey() instanceof String))
                 throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
-        this.originals = (Map<String, ?>) originals;
+
+        this.originals = resolveConfigVariables(configProviderProps, (Map<String, Object>) originals);
+
         this.values = definition.parse(this.originals);
         Map<String, Object> configUpdates = postProcessParsedConfig(Collections.unmodifiableMap(this.values));
         for (Map.Entry<String, Object> update : configUpdates.entrySet()) {
@@ -71,8 +116,30 @@ public class AbstractConfig {
             logAll();
     }
 
+    /**
+     * Construct a configuration with a ConfigDef and the configuration properties,
+     * which can include properties for zero or more {@link ConfigProvider}
+     * that will be used to resolve variables in configuration property values.
+     *
+     * @param definition the definition of the configurations; may not be null
+     * @param originals the configuration properties plus any optional config provider properties; may not be null
+     */
     public AbstractConfig(ConfigDef definition, Map<?, ?> originals) {
-        this(definition, originals, true);
+        this(definition, originals, Collections.emptyMap(), true);
+    }
+
+    /**
+     * Construct a configuration with a ConfigDef and the configuration properties,
+     * which can include properties for zero or more {@link ConfigProvider}
+     * that will be used to resolve variables in configuration property values.
+     *
+     * @param definition the definition of the configurations; may not be null
+     * @param originals the configuration properties plus any optional config provider properties; may not be null
+     * @param doLog whether the configurations should be logged
+     */
+    public AbstractConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) {
+        this(definition, originals, Collections.emptyMap(), doLog);
+
     }
 
     /**
@@ -369,6 +436,95 @@ public class AbstractConfig {
         return objects;
     }
 
+    private Map<String, String> extractPotentialVariables(Map<?, ?> configMap) {
+        // Variables are tuples of the form "${providerName:[path:]key}". From the configMap we extract the subset of configs with string
+        // values as potential variables.
+        Map<String, String> configMapAsString = new HashMap<>();
+        for (Map.Entry<?, ?> entry : configMap.entrySet()) {
+            if (entry.getValue() instanceof String)
+                configMapAsString.put((String) entry.getKey(), (String) entry.getValue());
+        }
+
+        return configMapAsString;
+    }
+
+    /**
+     * Instantiates the given list of config providers and fetches the actual values of the config variables
+     * from those providers, returning a map of config keys and resolved values.
+     * @param configProviderProps The map of config provider configs
+     * @param originals The map of raw configs.
+     * @return map of resolved config variables
+     */
+    @SuppressWarnings("unchecked")
+    private Map<String, ?> resolveConfigVariables(Map<String, ?> configProviderProps, Map<String, Object> originals) {
+        Map<String, String> providerConfigString;
+        Map<String, ?> configProperties;
+
+        // As variable configs are strings, parse the originals and obtain the potential variable configs.
+        Map<String, String> indirectVariables = extractPotentialVariables(originals);
+
+        if (configProviderProps == null || configProviderProps.isEmpty()) {
+            providerConfigString = indirectVariables;
+            configProperties = originals;
+        } else {
+            providerConfigString = extractPotentialVariables(configProviderProps);
+            configProperties = configProviderProps;
+        }
+        Map<String, ConfigProvider> providers = instantiateConfigProviders(providerConfigString, configProperties);
+
+        if (!providers.isEmpty()) {
+            ConfigTransformer configTransformer = new ConfigTransformer(providers);
+            ConfigTransformerResult result = configTransformer.transform(indirectVariables);
+            originals.putAll(result.data());
+        }
+
+        return originals;
+    }
+
+    private Map<String, Object> configProviderProperties(String configProviderPrefix, Map<String, ?> providerConfigProperties) {
+        Map<String, Object> result = new HashMap<>();
+        for (Map.Entry<String, ?> entry : providerConfigProperties.entrySet()) {
+            String key = entry.getKey();
+            if (key.startsWith(configProviderPrefix) && key.length() > configProviderPrefix.length()) {
+                result.put(key.substring(configProviderPrefix.length()), entry.getValue());
+            }
+        }
+        return result;
+    }
+
+    private Map<String, ConfigProvider> instantiateConfigProviders(Map<String, String> indirectConfigs, Map<String, ?> providerConfigProperties) {
+        final String configProviders = indirectConfigs.get(CONFIG_PROVIDERS_CONFIG);
+
+        if (configProviders == null || configProviders.isEmpty()) {
+            return Collections.emptyMap();
+        }
+
+        Map<String, String> providerMap = new HashMap<>();
+
+        for (String provider: configProviders.split(",")) {
+            String providerClass = CONFIG_PROVIDERS_CONFIG + "." + provider + ".class";
+            if (indirectConfigs.containsKey(providerClass))
+                providerMap.put(provider, indirectConfigs.get(providerClass));
+
+        }
+        // Instantiate Config Providers
+        Map<String, ConfigProvider> configProviderInstances = new HashMap<>();
+        for (Map.Entry<String, String> entry : providerMap.entrySet()) {
+            try {
+                String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + ".";
+                Map<String, ?> configProperties = configProviderProperties(prefix, providerConfigProperties);
+                ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
+                provider.configure(configProperties);
+                configProviderInstances.put(entry.getKey(), provider);
+            } catch (ClassNotFoundException e) {
+                log.error("ClassNotFoundException occurred: " + entry.getValue());
+                throw new ConfigException("Invalid config: " + entry.getValue() + " ClassNotFoundException occurred", e);
+            }
+        }
+
+        return configProviderInstances;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
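
For context, instantiateConfigProviders above only requires that each registered class implement the ConfigProvider interface, and configure() receives the properties under that provider's "config.providers.<name>." prefix with the prefix stripped by configProviderProperties(...). Below is a minimal sketch of a custom provider, assuming the existing ConfigProvider/ConfigData API; the com.example names and the "seed" property are hypothetical and not part of this commit.

    package com.example;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.common.config.ConfigData;
    import org.apache.kafka.common.config.provider.ConfigProvider;

    public class MapConfigProvider implements ConfigProvider {

        private final Map<String, String> secrets = new HashMap<>();

        @Override
        public void configure(Map<String, ?> configs) {
            // Receives the properties registered under "config.providers.<name>.",
            // with that prefix already stripped.
            Object seed = configs.get("seed");  // hypothetical provider property
            if (seed != null)
                secrets.put("key", seed.toString());
        }

        @Override
        public ConfigData get(String path) {
            // Return all values known for the given path.
            return new ConfigData(new HashMap<>(secrets));
        }

        @Override
        public ConfigData get(String path, Set<String> keys) {
            // Return only the requested keys for the given path.
            Map<String, String> data = new HashMap<>();
            for (String key : keys)
                if (secrets.containsKey(key))
                    data.put(key, secrets.get(key));
            return new ConfigData(data);
        }

        @Override
        public void close() {
            secrets.clear();
        }
    }
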
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index 071deed..a007fd3 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -321,6 +321,143 @@ public class AbstractConfigTest {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    public Map<String, ?> convertPropertiesToMap(Map<?, ?> props) {
+        for (Map.Entry<?, ?> entry : props.entrySet()) {
+            if (!(entry.getKey() instanceof String))
+                throw new ConfigException(entry.getKey().toString(), entry.getValue(),
+                    "Key must be a string.");
+        }
+        return (Map<String, ?>) props;
+    }
+
+    @Test
+    public void testOriginalsWithConfigProvidersProps() {
+        Properties props = new Properties();
+
+        // Test Case: Valid case with ConfigProviders passed as part of the config properties
+        props.put("config.providers", "file");
+        props.put("config.providers.file.class", "org.apache.kafka.common.config.provider.MockFileConfigProvider");
+        props.put("prefix.ssl.truststore.location.number", 5);
+        props.put("sasl.kerberos.service.name", "service name");
+        props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
+        props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
+        TestIndirectConfigResolution config = new TestIndirectConfigResolution(props);
+        assertEquals(config.originals().get("sasl.kerberos.key"), "testKey");
+        assertEquals(config.originals().get("sasl.kerberos.password"), "randomPassword");
+        assertEquals(config.originals().get("prefix.ssl.truststore.location.number"), 5);
+        assertEquals(config.originals().get("sasl.kerberos.service.name"), "service name");
+    }
+
+    @Test
+    public void testConfigProvidersPropsAsParam() {
+        // Test Case: Valid case with ConfigProviders passed as a separate map
+        Properties providers = new Properties();
+        providers.put("config.providers", "file");
+        providers.put("config.providers.file.class", "org.apache.kafka.common.config.provider.MockFileConfigProvider");
+        Properties props = new Properties();
+        props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
+        props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
+        TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
+        assertEquals(config.originals().get("sasl.kerberos.key"), "testKey");
+        assertEquals(config.originals().get("sasl.kerberos.password"), "randomPassword");
+    }
+
+    @Test
+    public void testAutoConfigResolutionWithMultipleConfigProviders() {
+        // Test Case: Valid case with multiple ConfigProviders passed as a separate map
+        Properties providers = new Properties();
+        providers.put("config.providers", "file,vault");
+        providers.put("config.providers.file.class", "org.apache.kafka.common.config.provider.MockFileConfigProvider");
+        providers.put("config.providers.vault.class", "org.apache.kafka.common.config.provider.MockVaultConfigProvider");
+        Properties props = new Properties();
+        props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
+        props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
+        props.put("sasl.truststore.key", "${vault:/usr/truststore:truststoreKey}");
+        props.put("sasl.truststore.password", "${vault:/usr/truststore:truststorePassword}");
+        TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
+        assertEquals(config.originals().get("sasl.kerberos.key"), "testKey");
+        assertEquals(config.originals().get("sasl.kerberos.password"), "randomPassword");
+        assertEquals(config.originals().get("sasl.truststore.key"), "testTruststoreKey");
+        assertEquals(config.originals().get("sasl.truststore.password"), "randomtruststorePassword");
+    }
+
+    @Test
+    public void testAutoConfigResolutionWithInvalidConfigProviderClass() {
+        // Test Case: Invalid class for Config Provider
+        Properties props = new Properties();
+        props.put("config.providers", "file");
+        props.put("config.providers.file.class",
+            "org.apache.kafka.common.config.provider.InvalidConfigProvider");
+        props.put("testKey", "${test:/foo/bar/testpath:testKey}");
+        try {
+            TestIndirectConfigResolution config = new TestIndirectConfigResolution(props);
+            fail("Expected a config exception due to invalid props :" + props);
+        } catch (KafkaException e) {
+            // this is good
+        }
+    }
+
+    @Test
+    public void testAutoConfigResolutionWithMissingConfigProvider() {
+        // Test Case: The ConfigProvider named in a variable is not configured.
+        Properties props = new Properties();
+        props.put("testKey", "${test:/foo/bar/testpath:testKey}");
+        TestIndirectConfigResolution config = new TestIndirectConfigResolution(props);
+        assertEquals(config.originals().get("testKey"), "${test:/foo/bar/testpath:testKey}");
+    }
+
+    @Test
+    public void testAutoConfigResolutionWithMissingConfigKey() {
+        // Test Case: Config Provider fails to resolve the config (key not present)
+        Properties props = new Properties();
+        props.put("config.providers", "test");
+        props.put("config.providers.test.class", "org.apache.kafka.common.config.provider.MockFileConfigProvider");
+        props.put("random", "${test:/foo/bar/testpath:random}");
+        TestIndirectConfigResolution config = new TestIndirectConfigResolution(props);
+        assertEquals(config.originals().get("random"), "${test:/foo/bar/testpath:random}");
+    }
+
+    @Test
+    public void testAutoConfigResolutionWithDuplicateConfigProvider() {
+        // Test Case: If ConfigProviders are provided in both originals and providers, only the ones in providers should be used.
+        Properties providers = new Properties();
+        providers.put("config.providers", "test");
+        providers.put("config.providers.test.class", "org.apache.kafka.common.config.provider.MockFileConfigProvider");
+
+        Properties props = new Properties();
+        props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
+        props.put("config.providers", "file");
+        props.put("config.providers.file.class", "org.apache.kafka.common.config.provider.MockFileConfigProvider");
+
+        TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
+        assertEquals(config.originals().get("sasl.kerberos.key"), "${file:/usr/kerberos:key}");
+    }
+
+    private static class TestIndirectConfigResolution extends AbstractConfig {
+
+        private static final ConfigDef CONFIG;
+
+        public static final String INDIRECT_CONFIGS = "indirect.variables";
+        private static final String INDIRECT_CONFIGS_DOC = "Variables whose values can be obtained from ConfigProviders";
+
+        static {
+            CONFIG = new ConfigDef().define(INDIRECT_CONFIGS,
+                    Type.LIST,
+                    "",
+                    Importance.LOW,
+                    INDIRECT_CONFIGS_DOC);
+        }
+
+        public TestIndirectConfigResolution(Map<?, ?> props) {
+            super(CONFIG, props, true);
+        }
+
+        public TestIndirectConfigResolution(Map<?, ?> props, Map<String, ?> providers) {
+            super(CONFIG, props, providers, true);
+        }
+    }
+
     private static class ClassTestConfig extends AbstractConfig {
         static final Class<?> DEFAULT_CLASS = FakeMetricsReporter.class;
         static final Class<?> VISIBLE_CLASS = JmxReporter.class;
diff --git a/clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java b/clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java
new file mode 100644
index 0000000..e779cbe
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+
+public class MockFileConfigProvider extends FileConfigProvider {
+
+    @Override
+    protected Reader reader(String path) throws IOException {
+        return new StringReader("key=testKey\npassword=randomPassword");
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/config/provider/MockVaultConfigProvider.java b/clients/src/test/java/org/apache/kafka/common/config/provider/MockVaultConfigProvider.java
new file mode 100644
index 0000000..dbfe158
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/config/provider/MockVaultConfigProvider.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+
+public class MockVaultConfigProvider extends FileConfigProvider {
+
+    @Override
+    protected Reader reader(String path) throws IOException {
+        return new StringReader("truststoreKey=testTruststoreKey\ntruststorePassword=randomtruststorePassword");
+    }
+}
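
Both mocks above rely on FileConfigProvider parsing reader(path) as a java.util.Properties stream, which is why "${file:/usr/kerberos:key}" resolves to "testKey" in the tests. Below is a minimal sketch (not part of this commit) of exercising the mock directly; the MockProviderDemo class name is hypothetical.

    import java.util.Collections;
    import org.apache.kafka.common.config.ConfigData;
    import org.apache.kafka.common.config.provider.MockFileConfigProvider;

    public class MockProviderDemo {
        public static void main(String[] args) throws Exception {
            try (MockFileConfigProvider provider = new MockFileConfigProvider()) {
                provider.configure(Collections.emptyMap());
                // The mock ignores the path; reader(path) always returns the same content.
                ConfigData data = provider.get("/usr/kerberos", Collections.singleton("key"));
                System.out.println(data.data().get("key"));  // prints testKey
            }
        }
    }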