You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/06/07 04:09:42 UTC
[kafka] branch 2.0 updated: KAFKA-6991: Fix ServiceLoader issue
with PluginClassLoader (KIP-285)
This is an automated email from the ASF dual-hosted git repository.
ewencp pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 3b511e5 KAFKA-6991: Fix ServiceLoader issue with PluginClassLoader (KIP-285)
3b511e5 is described below
commit 3b511e5019cdbd38409462325f09b97de39d1cc3
Author: Magesh Nandakumar <ma...@gmail.com>
AuthorDate: Wed Jun 6 21:09:16 2018 -0700
KAFKA-6991: Fix ServiceLoader issue with PluginClassLoader (KIP-285)
Fix ServiceLoader issue with PluginClassLoader and add basic-auth-extension packaging & classpath
*More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.*
*Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.*
Author: Magesh Nandakumar <ma...@gmail.com>
Reviewers: Konstantine Karantasis <ko...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #5135 from mageshn/KAFKA-6991
(cherry picked from commit 642a09168c38e961a2936e5a2fe72f5cd83ae7a6)
Signed-off-by: Ewen Cheslack-Postava <me...@ewencp.org>
---
bin/kafka-run-class.sh | 2 +-
build.gradle | 4 ++-
.../BasicAuthSecurityRestExtension.java | 6 ++--
.../JaasBasicAuthFilter.java | 2 +-
.../PropertyFileLoginModule.java | 2 +-
....apache.kafka.connect.rest.ConnectRestExtension | 2 +-
.../JaasBasicAuthFilterTest.java | 4 +--
.../runtime/isolation/DelegatingClassLoader.java | 42 +++++++++++++++++++---
.../connect/runtime/isolation/PluginUtils.java | 1 +
.../isolation/DelegatingClassLoaderTest.java | 41 +++++++++++++++++++++
.../connect/runtime/isolation/PluginUtilsTest.java | 3 ++
11 files changed, 95 insertions(+), 14 deletions(-)
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 354e106..f33342e 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -129,7 +129,7 @@ do
CLASSPATH="$CLASSPATH:$dir/*"
done
-for cc_pkg in "api" "transforms" "runtime" "file" "json" "tools"
+for cc_pkg in "api" "transforms" "runtime" "file" "json" "tools" "basic-auth-extension"
do
for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;
do
diff --git a/build.gradle b/build.gradle
index 14479f1..b796c21 100644
--- a/build.gradle
+++ b/build.gradle
@@ -507,7 +507,7 @@ for ( sv in availableScalaVersions ) {
}
}
-def connectPkgs = ['connect:api', 'connect:runtime', 'connect:transforms', 'connect:json', 'connect:file']
+def connectPkgs = ['connect:api', 'connect:runtime', 'connect:transforms', 'connect:json', 'connect:file', 'connect:basic-auth-extension']
def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:streams-scala', 'streams:test-utils', 'streams:examples'] + connectPkgs
/** Create one task per default Scala version */
@@ -728,6 +728,8 @@ project(':core') {
from(project(':connect:json').configurations.runtime) { into("libs/") }
from(project(':connect:file').jar) { into("libs/") }
from(project(':connect:file').configurations.runtime) { into("libs/") }
+ from(project(':connect:basic-auth-extension').jar) { into("libs/") }
+ from(project(':connect:basic-auth-extension').configurations.runtime) { into("libs/") }
from(project(':streams').jar) { into("libs/") }
from(project(':streams').configurations.runtime) { into("libs/") }
from(project(':streams:streams-scala').jar) { into("libs/") }
diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/BasicAuthSecurityRestExtension.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java
similarity index 94%
rename from connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/BasicAuthSecurityRestExtension.java
rename to connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java
index 91d5d9c..4169e9e 100644
--- a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/BasicAuthSecurityRestExtension.java
+++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.kafka.connect.rest.basic.auth.extenstion;
+package org.apache.kafka.connect.rest.basic.auth.extension;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.rest.ConnectRestExtension;
@@ -33,13 +33,13 @@ import java.util.Map;
*
* <p>To use this extension, one needs to add the following config in the {@code worker.properties}
* <pre>
- * rest.extension.classes = org.apache.kafka.connect.rest.basic.auth.extenstion.BasicAuthSecurityRestExtension
+ * rest.extension.classes = org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension
* </pre>
*
* <p> An example JAAS config would look as below
* <Pre>
* KafkaConnect {
- * org.apache.kafka.connect.rest.basic.auth.extenstion.PropertyFileLoginModule required
+ * org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule required
* file="/mnt/secret/credentials.properties";
* };
*</Pre>
diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilter.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java
similarity index 98%
rename from connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilter.java
rename to connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java
index 7231700..6167434 100644
--- a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilter.java
+++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.kafka.connect.rest.basic.auth.extenstion;
+package org.apache.kafka.connect.rest.basic.auth.extension;
import org.apache.kafka.common.config.ConfigException;
diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/PropertyFileLoginModule.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/PropertyFileLoginModule.java
similarity index 98%
rename from connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/PropertyFileLoginModule.java
rename to connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/PropertyFileLoginModule.java
index 7af7863..101c6f4 100644
--- a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/PropertyFileLoginModule.java
+++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/PropertyFileLoginModule.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.kafka.connect.rest.basic.auth.extenstion;
+package org.apache.kafka.connect.rest.basic.auth.extension;
import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
diff --git a/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension b/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
index 098c947..ba7ae5b 100644
--- a/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
+++ b/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.kafka.connect.rest.basic.auth.extenstion.BasicAuthSecurityRestExtension
\ No newline at end of file
+org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension
\ No newline at end of file
diff --git a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilterTest.java b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
similarity index 98%
rename from connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilterTest.java
rename to connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
index 80299f8..d61fc06 100644
--- a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilterTest.java
+++ b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.kafka.connect.rest.basic.auth.extenstion;
+package org.apache.kafka.connect.rest.basic.auth.extension;
import org.apache.kafka.common.security.JaasUtils;
import org.easymock.EasyMock;
@@ -155,7 +155,7 @@ public class JaasBasicAuthFilterTest {
List<String> lines;
lines = new ArrayList<>();
- lines.add(loginModule + " { org.apache.kafka.connect.rest.basic.auth.extenstion.PropertyFileLoginModule required ");
+ lines.add(loginModule + " { org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule required ");
if (includeFileOptions) {
lines.add("file=\"" + credentialFilePath + "\"");
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index fb9cae3..8e31220 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -46,6 +46,7 @@ import java.sql.Driver;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -56,6 +57,7 @@ import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.stream.Collectors;
public class DelegatingClassLoader extends URLClassLoader {
private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class);
@@ -72,6 +74,12 @@ public class DelegatingClassLoader extends URLClassLoader {
private final List<String> pluginPaths;
private final Map<Path, PluginClassLoader> activePaths;
+ private static final String MANIFEST_PREFIX = "META-INF/services/";
+ private static final Class[] SERVICE_LOADER_PLUGINS = new Class[] {ConnectRestExtension.class, ConfigProvider.class};
+ private static final Set<String> PLUGIN_MANIFEST_FILES =
+ Arrays.stream(SERVICE_LOADER_PLUGINS).map(serviceLoaderPlugin -> MANIFEST_PREFIX + serviceLoaderPlugin.getName())
+ .collect(Collectors.toSet());
+
public DelegatingClassLoader(List<String> pluginPaths, ClassLoader parent) {
super(new URL[0], parent);
this.pluginPaths = pluginPaths;
@@ -324,12 +332,11 @@ public class DelegatingClassLoader extends URLClassLoader {
return result;
}
- private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass,
- ClassLoader loader) {
+ private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
Collection<PluginDesc<T>> result = new ArrayList<>();
- for (T impl : serviceLoader) {
- result.add(new PluginDesc<>(klass, versionFor(impl), loader));
+ for (T pluginImpl : serviceLoader) {
+ result.add(new PluginDesc<>((Class<? extends T>) pluginImpl.getClass(), versionFor(pluginImpl), loader));
}
return result;
}
@@ -407,4 +414,31 @@ public class DelegatingClassLoader extends URLClassLoader {
}
}
}
+
+ @Override
+ public URL getResource(String name) {
+ if (serviceLoaderManifestForPlugin(name)) {
+ // Default implementation of getResource searches the parent class loader and if not available/found, its own URL paths.
+ // This will enable thePluginClassLoader to limit its resource search only to its own URL paths.
+ return null;
+ } else {
+ return super.getResource(name);
+ }
+ }
+
+ @Override
+ public Enumeration<URL> getResources(String name) throws IOException {
+ if (serviceLoaderManifestForPlugin(name)) {
+ // Default implementation of getResources searches the parent class loader and and also its own URL paths. This will enable the
+ // PluginClassLoader to limit its resource search to only its own URL paths.
+ return null;
+ } else {
+ return super.getResources(name);
+ }
+ }
+
+ //Visible for testing
+ static boolean serviceLoaderManifestForPlugin(String name) {
+ return PLUGIN_MANIFEST_FILES.contains(name);
+ }
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index f6c1185..b4aee47 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -128,6 +128,7 @@ public class PluginUtils {
+ "|file\\..*"
+ "|converters\\..*"
+ "|storage\\.StringConverter"
+ + "|rest\\.basic\\.auth\\.extension\\.BasicAuthSecurityRestExtension"
+ "))$";
private static final DirectoryStream.Filter<Path> PLUGIN_PATH_FILTER = new DirectoryStream
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
new file mode 100644
index 0000000..83ff040
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class DelegatingClassLoaderTest {
+
+ @Test
+ public void testWhiteListedManifestResources() {
+ assertTrue(
+ DelegatingClassLoader.serviceLoaderManifestForPlugin("META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension"));
+ assertTrue(
+ DelegatingClassLoader.serviceLoaderManifestForPlugin("META-INF/services/org.apache.kafka.common.config.ConfigProvider"));
+ }
+
+ @Test
+ public void testOtherResources() {
+ assertFalse(
+ DelegatingClassLoader.serviceLoaderManifestForPlugin("META-INF/services/org.apache.kafka.connect.transforms.Transformation"));
+ assertFalse(DelegatingClassLoader.serviceLoaderManifestForPlugin("resource/version.properties"));
+ }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
index a5ab50a..9698153 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
@@ -146,6 +146,9 @@ public class PluginUtilsTest {
assertTrue(PluginUtils.shouldLoadInIsolation(
"org.apache.kafka.connect.storage.StringConverter")
);
+ assertTrue(PluginUtils.shouldLoadInIsolation(
+ "org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension"
+ ));
}
@Test
--
To stop receiving notification emails like this one, please contact
ewencp@apache.org.