You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/06/06 15:56:31 UTC

[kafka] branch 2.0 updated: MINOR: Use service loader for ConfigProvider impls (KIP-297)

This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new f9b30ce  MINOR: Use service loader for ConfigProvider impls (KIP-297)
f9b30ce is described below

commit f9b30ceb2eb828050589e82218f148fcceb69a40
Author: Robert Yokota <ra...@gmail.com>
AuthorDate: Wed Jun 6 08:55:57 2018 -0700

    MINOR: Use service loader for ConfigProvider impls (KIP-297)
    
    This is a small change to use the Java ServiceLoader to load ConfigProvider plugins.  It uses code added by mageshn for Connect Rest Extensions.
    
    Author: Robert Yokota <ra...@gmail.com>
    
    Reviewers: Magesh Nandakumar <ma...@gmail.com>, Randall Hauch <rh...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
    
    Closes #5141 from rayokota/service-loader-for-config-plugins
    
    (cherry picked from commit 8264492deefa594567573fecd19fba3068bf3d6f)
    Signed-off-by: Ewen Cheslack-Postava <me...@ewencp.org>
---
 .../apache/kafka/common/config/FileConfigProvider.java   |  3 ++-
 .../org.apache.kafka.common.config.ConfigProvider        | 16 ++++++++++++++++
 .../org/apache/kafka/connect/runtime/WorkerConfig.java   | 12 +++++++-----
 .../connect/runtime/isolation/DelegatingClassLoader.java |  3 +--
 .../kafka/connect/runtime/isolation/PluginUtils.java     |  4 ++--
 .../kafka/connect/runtime/isolation/PluginUtilsTest.java | 10 ++++++++++
 6 files changed, 38 insertions(+), 10 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/FileConfigProvider.java b/clients/src/main/java/org/apache/kafka/common/config/FileConfigProvider.java
index fefc935..ff6bc5e 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/FileConfigProvider.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/FileConfigProvider.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.config;
 
+import java.io.BufferedReader;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
@@ -93,7 +94,7 @@ public class FileConfigProvider implements ConfigProvider {
 
     // visible for testing
     protected Reader reader(String path) throws IOException {
-        return new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8);
+        return new BufferedReader(new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8));
     }
 
     public void close() {
diff --git a/clients/src/main/resources/META-INF/services/org.apache.kafka.common.config.ConfigProvider b/clients/src/main/resources/META-INF/services/org.apache.kafka.common.config.ConfigProvider
new file mode 100644
index 0000000..bcad016
--- /dev/null
+++ b/clients/src/main/resources/META-INF/services/org.apache.kafka.common.config.ConfigProvider
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.common.config.FileConfigProvider
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 355cfbb..583953d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -192,16 +192,18 @@ public class WorkerConfig extends AbstractConfig {
             + "/opt/connectors";
 
     public static final String CONFIG_PROVIDERS_CONFIG = "config.providers";
-    protected static final String CONFIG_PROVIDERS_DOC = "List of configuration providers. "
-            + "This is a comma-separated list of the fully-qualified names of the ConfigProvider implementations, "
-            + "in the order they will be created, configured, and used.";
+    protected static final String CONFIG_PROVIDERS_DOC =
+            "Comma-separated names of <code>ConfigProvider</code> classes, loaded and used "
+            + "in the order specified. Implementing the interface  "
+            + "<code>ConfigProvider</code> allows you to replace variable references in connector configurations, "
+            + "such as for externalized secrets. ";
 
     public static final String REST_EXTENSION_CLASSES_CONFIG = "rest.extension.classes";
     protected static final String REST_EXTENSION_CLASSES_DOC =
             "Comma-separated names of <code>ConnectRestExtension</code> classes, loaded and called "
             + "in the order specified. Implementing the interface  "
-            + "<code>ConnectRestExtension</code> allows you to inject into Connect's REST API user defined resources  like filters. "
-            + "Typically used to add custom capability like logging, security, etc.";
+            + "<code>ConnectRestExtension</code> allows you to inject into Connect's REST API user defined resources like filters. "
+            + "Typically used to add custom capability like logging, security, etc. ";
 
     public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
     public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 1e59851..fb9cae3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -301,7 +301,7 @@ public class DelegatingClassLoader extends URLClassLoader {
                 getPluginDesc(reflections, Converter.class, loader),
                 getPluginDesc(reflections, HeaderConverter.class, loader),
                 getPluginDesc(reflections, Transformation.class, loader),
-                getPluginDesc(reflections, ConfigProvider.class, loader),
+                getServiceLoaderPluginDesc(ConfigProvider.class, loader),
                 getServiceLoaderPluginDesc(ConnectRestExtension.class, loader)
         );
     }
@@ -326,7 +326,6 @@ public class DelegatingClassLoader extends URLClassLoader {
 
     private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass,
                                                                      ClassLoader loader) {
-
         ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
         Collection<PluginDesc<T>> result = new ArrayList<>();
         for (T impl : serviceLoader) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index d490bde..f6c1185 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -122,13 +122,13 @@ public class PluginUtils {
             + "|org\\.slf4j"
             + ")\\..*$";
 
-    private static final String WHITELIST = "^org\\.apache\\.kafka\\.connect\\.(?:"
+    private static final String WHITELIST = "^org\\.apache\\.kafka\\.(?:common.config..*ConfigProvider|connect\\.(?:"
             + "transforms\\.(?!Transformation$).*"
             + "|json\\..*"
             + "|file\\..*"
             + "|converters\\..*"
             + "|storage\\.StringConverter"
-            + ")$";
+            + "))$";
 
     private static final DirectoryStream.Filter<Path> PLUGIN_PATH_FILTER = new DirectoryStream
             .Filter<Path>() {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
index 4bc6e15..a5ab50a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
@@ -149,6 +149,16 @@ public class PluginUtilsTest {
     }
 
     @Test
+    public void testClientConfigProvider() throws Exception {
+        assertTrue(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.common.config.FileConfigProvider")
+        );
+        assertTrue(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.common.config.FutureConfigProvider")
+        );
+    }
+
+    @Test
     public void testEmptyPluginUrls() throws Exception {
         assertEquals(Collections.<Path>emptyList(), PluginUtils.pluginUrls(pluginPath));
     }

-- 
To stop receiving notification emails like this one, please contact
ewencp@apache.org.