You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/06/06 15:56:31 UTC
[kafka] branch 2.0 updated: MINOR: Use service loader for ConfigProvider impls (KIP-297)
This is an automated email from the ASF dual-hosted git repository.
ewencp pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new f9b30ce MINOR: Use service loader for ConfigProvider impls (KIP-297)
f9b30ce is described below
commit f9b30ceb2eb828050589e82218f148fcceb69a40
Author: Robert Yokota <ra...@gmail.com>
AuthorDate: Wed Jun 6 08:55:57 2018 -0700
MINOR: Use service loader for ConfigProvider impls (KIP-297)
This is a small change that uses the Java ServiceLoader to discover and load ConfigProvider plugins. It reuses the ServiceLoader-based plugin discovery code that mageshn added for Connect REST extensions.
Author: Robert Yokota <ra...@gmail.com>
Reviewers: Magesh Nandakumar <ma...@gmail.com>, Randall Hauch <rh...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #5141 from rayokota/service-loader-for-config-plugins
(cherry picked from commit 8264492deefa594567573fecd19fba3068bf3d6f)
Signed-off-by: Ewen Cheslack-Postava <me...@ewencp.org>
---
.../apache/kafka/common/config/FileConfigProvider.java | 3 ++-
.../org.apache.kafka.common.config.ConfigProvider | 16 ++++++++++++++++
.../org/apache/kafka/connect/runtime/WorkerConfig.java | 12 +++++++-----
.../connect/runtime/isolation/DelegatingClassLoader.java | 3 +--
.../kafka/connect/runtime/isolation/PluginUtils.java | 4 ++--
.../kafka/connect/runtime/isolation/PluginUtilsTest.java | 10 ++++++++++
6 files changed, 38 insertions(+), 10 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/config/FileConfigProvider.java b/clients/src/main/java/org/apache/kafka/common/config/FileConfigProvider.java
index fefc935..ff6bc5e 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/FileConfigProvider.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/FileConfigProvider.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.config;
+import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
@@ -93,7 +94,7 @@ public class FileConfigProvider implements ConfigProvider {
// visible for testing
protected Reader reader(String path) throws IOException {
- return new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8);
+ return new BufferedReader(new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8));
}
public void close() {
diff --git a/clients/src/main/resources/META-INF/services/org.apache.kafka.common.config.ConfigProvider b/clients/src/main/resources/META-INF/services/org.apache.kafka.common.config.ConfigProvider
new file mode 100644
index 0000000..bcad016
--- /dev/null
+++ b/clients/src/main/resources/META-INF/services/org.apache.kafka.common.config.ConfigProvider
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.common.config.FileConfigProvider
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 355cfbb..583953d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -192,16 +192,18 @@ public class WorkerConfig extends AbstractConfig {
+ "/opt/connectors";
public static final String CONFIG_PROVIDERS_CONFIG = "config.providers";
- protected static final String CONFIG_PROVIDERS_DOC = "List of configuration providers. "
- + "This is a comma-separated list of the fully-qualified names of the ConfigProvider implementations, "
- + "in the order they will be created, configured, and used.";
+ protected static final String CONFIG_PROVIDERS_DOC =
+ "Comma-separated names of <code>ConfigProvider</code> classes, loaded and used "
+ + "in the order specified. Implementing the interface "
+ + "<code>ConfigProvider</code> allows you to replace variable references in connector configurations, "
+ + "such as for externalized secrets. ";
public static final String REST_EXTENSION_CLASSES_CONFIG = "rest.extension.classes";
protected static final String REST_EXTENSION_CLASSES_DOC =
"Comma-separated names of <code>ConnectRestExtension</code> classes, loaded and called "
+ "in the order specified. Implementing the interface "
- + "<code>ConnectRestExtension</code> allows you to inject into Connect's REST API user defined resources like filters. "
- + "Typically used to add custom capability like logging, security, etc.";
+ + "<code>ConnectRestExtension</code> allows you to inject into Connect's REST API user defined resources like filters. "
+ + "Typically used to add custom capability like logging, security, etc. ";
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 1e59851..fb9cae3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -301,7 +301,7 @@ public class DelegatingClassLoader extends URLClassLoader {
getPluginDesc(reflections, Converter.class, loader),
getPluginDesc(reflections, HeaderConverter.class, loader),
getPluginDesc(reflections, Transformation.class, loader),
- getPluginDesc(reflections, ConfigProvider.class, loader),
+ getServiceLoaderPluginDesc(ConfigProvider.class, loader),
getServiceLoaderPluginDesc(ConnectRestExtension.class, loader)
);
}
@@ -326,7 +326,6 @@ public class DelegatingClassLoader extends URLClassLoader {
private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass,
ClassLoader loader) {
-
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
Collection<PluginDesc<T>> result = new ArrayList<>();
for (T impl : serviceLoader) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index d490bde..f6c1185 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -122,13 +122,13 @@ public class PluginUtils {
+ "|org\\.slf4j"
+ ")\\..*$";
- private static final String WHITELIST = "^org\\.apache\\.kafka\\.connect\\.(?:"
+ private static final String WHITELIST = "^org\\.apache\\.kafka\\.(?:common.config..*ConfigProvider|connect\\.(?:"
+ "transforms\\.(?!Transformation$).*"
+ "|json\\..*"
+ "|file\\..*"
+ "|converters\\..*"
+ "|storage\\.StringConverter"
- + ")$";
+ + "))$";
private static final DirectoryStream.Filter<Path> PLUGIN_PATH_FILTER = new DirectoryStream
.Filter<Path>() {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
index 4bc6e15..a5ab50a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
@@ -149,6 +149,16 @@ public class PluginUtilsTest {
}
@Test
+ public void testClientConfigProvider() throws Exception {
+ assertTrue(PluginUtils.shouldLoadInIsolation(
+ "org.apache.kafka.common.config.FileConfigProvider")
+ );
+ assertTrue(PluginUtils.shouldLoadInIsolation(
+ "org.apache.kafka.common.config.FutureConfigProvider")
+ );
+ }
+
+ @Test
public void testEmptyPluginUrls() throws Exception {
assertEquals(Collections.<Path>emptyList(), PluginUtils.pluginUrls(pluginPath));
}
--
To stop receiving notification emails like this one, please contact
ewencp@apache.org.