You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2019/06/03 11:56:50 UTC

[kafka] branch trunk updated: KAFKA-8426; Fix for keeping the ConfigProvider configs consistent with KIP-297 (#6750)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b042b36  KAFKA-8426; Fix for keeping the ConfigProvider configs consistent with KIP-297 (#6750)
b042b36 is described below

commit b042b36674f0bd8046f0229c96e0b8cf99554b2b
Author: tadsul <43...@users.noreply.github.com>
AuthorDate: Mon Jun 3 04:56:31 2019 -0700

    KAFKA-8426; Fix for keeping the ConfigProvider configs consistent with KIP-297 (#6750)
    
    According to KIP-297, a parameter is passed to a ConfigProvider using the syntax "config.providers.{name}.param.{param-name}". Currently, AbstractConfig accepts parameters of the form "config.providers.{name}.{param-name}". With this fix, AbstractConfig is consistent with the KIP-297 syntax.
    
    Reviewers: Robert Yokota <ra...@gmail.com>, Rajini Sivaram <ra...@googlemail.com>
---
 .../apache/kafka/common/config/AbstractConfig.java | 14 +++++++++-
 .../kafka/common/config/AbstractConfigTest.java    | 32 +++++++++++++++++-----
 .../config/provider/MockVaultConfigProvider.java   | 18 +++++++++++-
 3 files changed, 55 insertions(+), 9 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 13865d0..a48856f 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -55,6 +55,8 @@ public class AbstractConfig {
 
     private static final String CONFIG_PROVIDERS_CONFIG = "config.providers";
 
+    private static final String CONFIG_PROVIDERS_PARAM = ".param.";
+
     /**
      * Construct a configuration with a ConfigDef and the configuration properties, which can include properties
      * for zero or more {@link ConfigProvider} that will be used to resolve variables in configuration property
@@ -494,6 +496,16 @@ public class AbstractConfig {
         return result;
     }
 
+    /**
+     * Instantiates and configures the ConfigProviders. The config providers configs are defined as follows:
+     * config.providers : A comma-separated list of names for providers.
+     * config.providers.{name}.class : The Java class name for a provider.
+     * config.providers.{name}.param.{param-name} : A parameter to be passed to the above Java class on initialization.
+     * Returns a map from each config provider name to its configured instance.
+     * @param indirectConfigs The map of potential variable configs
+     * @param providerConfigProperties The map of config provider configs
+     * @return a map from config provider name to its instance.
+     */
     private Map<String, ConfigProvider> instantiateConfigProviders(Map<String, String> indirectConfigs, Map<String, ?> providerConfigProperties) {
         final String configProviders = indirectConfigs.get(CONFIG_PROVIDERS_CONFIG);
 
@@ -513,7 +525,7 @@ public class AbstractConfig {
         Map<String, ConfigProvider> configProviderInstances = new HashMap<>();
         for (Map.Entry<String, String> entry : providerMap.entrySet()) {
             try {
-                String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + ".";
+                String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM;
                 Map<String, ?> configProperties = configProviderProperties(prefix, providerConfigProperties);
                 ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
                 provider.configure(configProperties);
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index 33aaac5..f686475 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.metrics.FakeMetricsReporter;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.security.TestSecurityConfig;
+import org.apache.kafka.common.config.provider.MockVaultConfigProvider;
+import org.apache.kafka.common.config.provider.MockFileConfigProvider;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -337,7 +339,7 @@ public class AbstractConfigTest {
 
         // Test Case: Valid Test Case for ConfigProviders as part of config.properties
         props.put("config.providers", "file");
-        props.put("config.providers.file.class", "org.apache.kafka.common.config.provider.MockFileConfigProvider");
+        props.put("config.providers.file.class", MockFileConfigProvider.class.getName());
         props.put("prefix.ssl.truststore.location.number", 5);
         props.put("sasl.kerberos.service.name", "service name");
         props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
@@ -354,7 +356,7 @@ public class AbstractConfigTest {
         // Test Case: Valid Test Case for ConfigProviders as a separate variable
         Properties providers = new Properties();
         providers.put("config.providers", "file");
-        providers.put("config.providers.file.class", "org.apache.kafka.common.config.provider.MockFileConfigProvider");
+        providers.put("config.providers.file.class", MockFileConfigProvider.class.getName());
         Properties props = new Properties();
         props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
         props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
@@ -382,8 +384,8 @@ public class AbstractConfigTest {
         // Test Case: Valid Test Case With Multiple ConfigProviders as a separate variable
         Properties providers = new Properties();
         providers.put("config.providers", "file,vault");
-        providers.put("config.providers.file.class", "org.apache.kafka.common.config.provider.MockFileConfigProvider");
-        providers.put("config.providers.vault.class", "org.apache.kafka.common.config.provider.MockVaultConfigProvider");
+        providers.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+        providers.put("config.providers.vault.class", MockVaultConfigProvider.class.getName());
         Properties props = new Properties();
         props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
         props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
@@ -426,7 +428,7 @@ public class AbstractConfigTest {
         // Test Case: Config Provider fails to resolve the config (key not present)
         Properties props = new Properties();
         props.put("config.providers", "test");
-        props.put("config.providers.test.class", "org.apache.kafka.common.config.provider.MockFileConfigProvider");
+        props.put("config.providers.test.class", MockFileConfigProvider.class.getName());
         props.put("random", "${test:/foo/bar/testpath:random}");
         TestIndirectConfigResolution config = new TestIndirectConfigResolution(props);
         assertEquals(config.originals().get("random"), "${test:/foo/bar/testpath:random}");
@@ -437,17 +439,33 @@ public class AbstractConfigTest {
         // Test Case: If ConfigProvider is provided in both originals and provider. Only the ones in provider should be used.
         Properties providers = new Properties();
         providers.put("config.providers", "test");
-        providers.put("config.providers.test.class", "org.apache.kafka.common.config.provider.MockFileConfigProvider");
+        providers.put("config.providers.test.class", MockVaultConfigProvider.class.getName());
 
         Properties props = new Properties();
         props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
         props.put("config.providers", "file");
-        props.put("config.providers.file.class", "org.apache.kafka.common.config.provider.MockFileConfigProvider");
+        props.put("config.providers.file.class", MockVaultConfigProvider.class.getName());
 
         TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
         assertEquals(config.originals().get("sasl.kerberos.key"), "${file:/usr/kerberos:key}");
     }
 
+    @Test
+    public void testConfigProviderConfigurationWithConfigParams() {
+        // Test Case: "config.providers.{name}.param.{param-name}" entries are handed to the provider's configure()
+        Properties providers = new Properties();
+        providers.put("config.providers", "vault");
+        providers.put("config.providers.vault.class", MockVaultConfigProvider.class.getName());
+        providers.put("config.providers.vault.param.key", "randomKey");
+        providers.put("config.providers.vault.param.location", "/usr/vault");
+        Properties props = new Properties();
+        props.put("sasl.truststore.key", "${vault:/usr/truststore:truststoreKey}");
+        props.put("sasl.truststore.password", "${vault:/usr/truststore:truststorePassword}");
+        props.put("sasl.truststore.location", "${vault:/usr/truststore:truststoreLocation}");
+        TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
+        assertEquals(config.originals().get("sasl.truststore.location"), "/usr/vault");
+    }
+
     private static class TestIndirectConfigResolution extends AbstractConfig {
 
         private static final ConfigDef CONFIG;
diff --git a/clients/src/test/java/org/apache/kafka/common/config/provider/MockVaultConfigProvider.java b/clients/src/test/java/org/apache/kafka/common/config/provider/MockVaultConfigProvider.java
index dbfe158..c741798 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/provider/MockVaultConfigProvider.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/provider/MockVaultConfigProvider.java
@@ -19,11 +19,27 @@ package org.apache.kafka.common.config.provider;
 import java.io.IOException;
 import java.io.Reader;
 import java.io.StringReader;
+import java.util.Map;
 
 public class MockVaultConfigProvider extends FileConfigProvider {
 
+    Map<String, ?> vaultConfigs; // map handed to configure(); stays null until configure() is called
+    private boolean configured = false; // flipped to true by configure()
+    private static final String LOCATION = "location"; // key looked up in vaultConfigs by reader()
+
     @Override
     protected Reader reader(String path) throws IOException {
-        return new StringReader("truststoreKey=testTruststoreKey\ntruststorePassword=randomtruststorePassword");
+        String vaultLocation = (String) vaultConfigs.get(LOCATION); // NOTE(review): throws NullPointerException if reader() runs before configure()
+        return new StringReader("truststoreKey=testTruststoreKey\ntruststorePassword=randomtruststorePassword\n" + "truststoreLocation=" + vaultLocation + "\n"); // path is ignored; contents are fixed text plus the configured location
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        this.vaultConfigs = configs; // stored by reference, not copied
+        configured = true;
+    }
+
+    public boolean configured() { // reports whether configure() has been invoked on this instance
+        return configured;
     }
 }