You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/06/07 04:09:42 UTC

[kafka] branch 2.0 updated: KAFKA-6991: Fix ServiceLoader issue with PluginClassLoader (KIP-285)

This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 3b511e5  KAFKA-6991: Fix ServiceLoader issue with PluginClassLoader (KIP-285)
3b511e5 is described below

commit 3b511e5019cdbd38409462325f09b97de39d1cc3
Author: Magesh Nandakumar <ma...@gmail.com>
AuthorDate: Wed Jun 6 21:09:16 2018 -0700

    KAFKA-6991: Fix ServiceLoader issue with PluginClassLoader (KIP-285)
    
    Fix ServiceLoader issue with PluginClassLoader and add basic-auth-extension packaging & classpath
    
    *More detailed description of your change,
    if necessary. The PR title and PR message become
    the squashed commit message, so use a separate
    comment to ping reviewers.*
    
    *Summary of testing strategy (including rationale)
    for the feature or bug fix. Unit and/or integration
    tests are expected for any behaviour change and
    system tests should be considered for larger changes.*
    
    Author: Magesh Nandakumar <ma...@gmail.com>
    
    Reviewers: Konstantine Karantasis <ko...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
    
    Closes #5135 from mageshn/KAFKA-6991
    
    (cherry picked from commit 642a09168c38e961a2936e5a2fe72f5cd83ae7a6)
    Signed-off-by: Ewen Cheslack-Postava <me...@ewencp.org>
---
 bin/kafka-run-class.sh                             |  2 +-
 build.gradle                                       |  4 ++-
 .../BasicAuthSecurityRestExtension.java            |  6 ++--
 .../JaasBasicAuthFilter.java                       |  2 +-
 .../PropertyFileLoginModule.java                   |  2 +-
 ....apache.kafka.connect.rest.ConnectRestExtension |  2 +-
 .../JaasBasicAuthFilterTest.java                   |  4 +--
 .../runtime/isolation/DelegatingClassLoader.java   | 42 +++++++++++++++++++---
 .../connect/runtime/isolation/PluginUtils.java     |  1 +
 .../isolation/DelegatingClassLoaderTest.java       | 41 +++++++++++++++++++++
 .../connect/runtime/isolation/PluginUtilsTest.java |  3 ++
 11 files changed, 95 insertions(+), 14 deletions(-)

diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 354e106..f33342e 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -129,7 +129,7 @@ do
   CLASSPATH="$CLASSPATH:$dir/*"
 done
 
-for cc_pkg in "api" "transforms" "runtime" "file" "json" "tools"
+for cc_pkg in "api" "transforms" "runtime" "file" "json" "tools" "basic-auth-extension"
 do
   for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;
   do
diff --git a/build.gradle b/build.gradle
index 14479f1..b796c21 100644
--- a/build.gradle
+++ b/build.gradle
@@ -507,7 +507,7 @@ for ( sv in availableScalaVersions ) {
   }
 }
 
-def connectPkgs = ['connect:api', 'connect:runtime', 'connect:transforms', 'connect:json', 'connect:file']
+def connectPkgs = ['connect:api', 'connect:runtime', 'connect:transforms', 'connect:json', 'connect:file', 'connect:basic-auth-extension']
 def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:streams-scala', 'streams:test-utils', 'streams:examples'] + connectPkgs
 
 /** Create one task per default Scala version */
@@ -728,6 +728,8 @@ project(':core') {
     from(project(':connect:json').configurations.runtime) { into("libs/") }
     from(project(':connect:file').jar) { into("libs/") }
     from(project(':connect:file').configurations.runtime) { into("libs/") }
+    from(project(':connect:basic-auth-extension').jar) { into("libs/") }
+    from(project(':connect:basic-auth-extension').configurations.runtime) { into("libs/") }
     from(project(':streams').jar) { into("libs/") }
     from(project(':streams').configurations.runtime) { into("libs/") }
     from(project(':streams:streams-scala').jar) { into("libs/") }
diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/BasicAuthSecurityRestExtension.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java
similarity index 94%
rename from connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/BasicAuthSecurityRestExtension.java
rename to connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java
index 91d5d9c..4169e9e 100644
--- a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/BasicAuthSecurityRestExtension.java
+++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.connect.rest.basic.auth.extenstion;
+package org.apache.kafka.connect.rest.basic.auth.extension;
 
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
@@ -33,13 +33,13 @@ import java.util.Map;
  *
  * <p>To use this extension, one needs to add the following config in the {@code worker.properties}
  * <pre>
- *     rest.extension.classes = org.apache.kafka.connect.rest.basic.auth.extenstion.BasicAuthSecurityRestExtension
+ *     rest.extension.classes = org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension
  * </pre>
  *
  * <p> An example JAAS config would look as below
  * <Pre>
  *         KafkaConnect {
- *              org.apache.kafka.connect.rest.basic.auth.extenstion.PropertyFileLoginModule required
+ *              org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule required
  *              file="/mnt/secret/credentials.properties";
  *         };
  *</Pre>
diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilter.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java
similarity index 98%
rename from connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilter.java
rename to connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java
index 7231700..6167434 100644
--- a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilter.java
+++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.connect.rest.basic.auth.extenstion;
+package org.apache.kafka.connect.rest.basic.auth.extension;
 
 import org.apache.kafka.common.config.ConfigException;
 
diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/PropertyFileLoginModule.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/PropertyFileLoginModule.java
similarity index 98%
rename from connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/PropertyFileLoginModule.java
rename to connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/PropertyFileLoginModule.java
index 7af7863..101c6f4 100644
--- a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/PropertyFileLoginModule.java
+++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/PropertyFileLoginModule.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.connect.rest.basic.auth.extenstion;
+package org.apache.kafka.connect.rest.basic.auth.extension;
 
 import org.apache.kafka.common.config.ConfigException;
 import org.slf4j.Logger;
diff --git a/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension b/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
index 098c947..ba7ae5b 100644
--- a/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
+++ b/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
@@ -13,4 +13,4 @@
  # See the License for the specific language governing permissions and
  # limitations under the License.
 
-org.apache.kafka.connect.rest.basic.auth.extenstion.BasicAuthSecurityRestExtension
\ No newline at end of file
+org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension
\ No newline at end of file
diff --git a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilterTest.java b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
similarity index 98%
rename from connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilterTest.java
rename to connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
index 80299f8..d61fc06 100644
--- a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilterTest.java
+++ b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.connect.rest.basic.auth.extenstion;
+package org.apache.kafka.connect.rest.basic.auth.extension;
 
 import org.apache.kafka.common.security.JaasUtils;
 import org.easymock.EasyMock;
@@ -155,7 +155,7 @@ public class JaasBasicAuthFilterTest {
 
         List<String> lines;
         lines = new ArrayList<>();
-        lines.add(loginModule + " { org.apache.kafka.connect.rest.basic.auth.extenstion.PropertyFileLoginModule required ");
+        lines.add(loginModule + " { org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule required ");
         if (includeFileOptions) {
             lines.add("file=\"" + credentialFilePath + "\"");
         }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index fb9cae3..8e31220 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -46,6 +46,7 @@ import java.sql.Driver;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -56,6 +57,7 @@ import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.stream.Collectors;
 
 public class DelegatingClassLoader extends URLClassLoader {
     private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class);
@@ -72,6 +74,12 @@ public class DelegatingClassLoader extends URLClassLoader {
     private final List<String> pluginPaths;
     private final Map<Path, PluginClassLoader> activePaths;
 
+    private static final String MANIFEST_PREFIX = "META-INF/services/";
+    private static final Class[] SERVICE_LOADER_PLUGINS = new Class[] {ConnectRestExtension.class, ConfigProvider.class};
+    private static final Set<String> PLUGIN_MANIFEST_FILES =
+        Arrays.stream(SERVICE_LOADER_PLUGINS).map(serviceLoaderPlugin -> MANIFEST_PREFIX + serviceLoaderPlugin.getName())
+            .collect(Collectors.toSet());
+
     public DelegatingClassLoader(List<String> pluginPaths, ClassLoader parent) {
         super(new URL[0], parent);
         this.pluginPaths = pluginPaths;
@@ -324,12 +332,11 @@ public class DelegatingClassLoader extends URLClassLoader {
         return result;
     }
 
-    private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass,
-                                                                     ClassLoader loader) {
+    private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
         ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
         Collection<PluginDesc<T>> result = new ArrayList<>();
-        for (T impl : serviceLoader) {
-            result.add(new PluginDesc<>(klass, versionFor(impl), loader));
+        for (T pluginImpl : serviceLoader) {
+            result.add(new PluginDesc<>((Class<? extends T>) pluginImpl.getClass(), versionFor(pluginImpl), loader));
         }
         return result;
     }
@@ -407,4 +414,31 @@ public class DelegatingClassLoader extends URLClassLoader {
             }
         }
     }
+
+    @Override
+    public URL getResource(String name) {
+        if (serviceLoaderManifestForPlugin(name)) {
+            // Default implementation of getResource searches the parent class loader and if not available/found, its own URL paths.
+            // This will enable thePluginClassLoader to limit its resource search only to its own URL paths.
+            return null;
+        } else {
+            return super.getResource(name);
+        }
+    }
+
+    @Override
+    public Enumeration<URL> getResources(String name) throws IOException {
+        if (serviceLoaderManifestForPlugin(name)) {
+            // Default implementation of getResources searches the parent class loader and and also its own URL paths. This will enable the
+            // PluginClassLoader to limit its resource search to only its own URL paths.
+            return null;
+        } else {
+            return super.getResources(name);
+        }
+    }
+
+    //Visible for testing
+    static boolean serviceLoaderManifestForPlugin(String name) {
+        return PLUGIN_MANIFEST_FILES.contains(name);
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index f6c1185..b4aee47 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -128,6 +128,7 @@ public class PluginUtils {
             + "|file\\..*"
             + "|converters\\..*"
             + "|storage\\.StringConverter"
+            + "|rest\\.basic\\.auth\\.extension\\.BasicAuthSecurityRestExtension"
             + "))$";
 
     private static final DirectoryStream.Filter<Path> PLUGIN_PATH_FILTER = new DirectoryStream
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
new file mode 100644
index 0000000..83ff040
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class DelegatingClassLoaderTest {
+
+    @Test
+    public void testWhiteListedManifestResources() {
+        assertTrue(
+            DelegatingClassLoader.serviceLoaderManifestForPlugin("META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension"));
+        assertTrue(
+            DelegatingClassLoader.serviceLoaderManifestForPlugin("META-INF/services/org.apache.kafka.common.config.ConfigProvider"));
+    }
+
+    @Test
+    public void testOtherResources() {
+        assertFalse(
+            DelegatingClassLoader.serviceLoaderManifestForPlugin("META-INF/services/org.apache.kafka.connect.transforms.Transformation"));
+        assertFalse(DelegatingClassLoader.serviceLoaderManifestForPlugin("resource/version.properties"));
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
index a5ab50a..9698153 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
@@ -146,6 +146,9 @@ public class PluginUtilsTest {
         assertTrue(PluginUtils.shouldLoadInIsolation(
                 "org.apache.kafka.connect.storage.StringConverter")
         );
+        assertTrue(PluginUtils.shouldLoadInIsolation(
+            "org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension"
+        ));
     }
 
     @Test

-- 
To stop receiving notification emails like this one, please contact
ewencp@apache.org.