You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gh...@apache.org on 2023/07/19 19:49:32 UTC

[kafka] branch trunk updated: KAFKA-15162: Reflectively find plugins in parent ClassLoaders that aren't on the classpath (#13977)

This is an automated email from the ASF dual-hosted git repository.

gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d5a00cca74d KAFKA-15162: Reflectively find plugins in parent ClassLoaders that aren't on the classpath (#13977)
d5a00cca74d is described below

commit d5a00cca74decaf7b44238bb6ac57e3c9f9c7218
Author: Greg Harris <gr...@aiven.io>
AuthorDate: Wed Jul 19 12:49:25 2023 -0700

    KAFKA-15162: Reflectively find plugins in parent ClassLoaders that aren't on the classpath (#13977)
    
    Signed-off-by: Greg Harris <gr...@aiven.io>
    Reviewed-by: Chris Egerton <ch...@aiven.io>
---
 .../apache/kafka/connect/runtime/isolation/PluginUtils.java  |  6 ++++--
 .../apache/kafka/connect/runtime/isolation/PluginsTest.java  |  7 ++++++-
 .../test/plugins/ReadVersionFromResource.java                | 12 +++++++++++-
 .../test/plugins/ReadVersionFromResource.java                | 12 +++++++++++-
 4 files changed, 32 insertions(+), 5 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index e1ef76ebd2e..38c07d7b12b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -350,8 +350,10 @@ public class PluginUtils {
                 log.error("Could not get listing for plugin path: {}. Ignoring.", pluginLocation, e);
             }
         }
-        URL[] classpathUrls = ClasspathHelper.forJavaClassPath().toArray(new URL[0]);
-        pluginSources.add(new PluginSource(null, classLoader.getParent(), classpathUrls));
+        List<URL> parentUrls = new ArrayList<>();
+        parentUrls.addAll(ClasspathHelper.forJavaClassPath());
+        parentUrls.addAll(ClasspathHelper.forClassLoader(classLoader.getParent()));
+        pluginSources.add(new PluginSource(null, classLoader.getParent(), parentUrls.toArray(new URL[0])));
         return pluginSources;
     }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index bb6d399c122..75418b27a6f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -477,7 +477,7 @@ public class PluginsTest {
     }
 
     private void assertClassLoaderReadsVersionFromResource(
-            TestPlugin parentResource, TestPlugin childResource, String className, String... expectedVersions) throws MalformedURLException {
+            TestPlugin parentResource, TestPlugin childResource, String className, String... expectedVersions) {
         URL[] systemPath = TestPlugins.pluginPath(parentResource)
                 .stream()
                 .map(Path::toFile)
@@ -500,6 +500,11 @@ public class PluginsTest {
         );
         plugins = new Plugins(pluginProps, parent, new ClassLoaderFactory());
 
+        assertTrue("Should find plugin in plugin classloader",
+                plugins.converters().stream().anyMatch(desc -> desc.loader() instanceof PluginClassLoader));
+        assertTrue("Should find plugin in parent classloader",
+                plugins.converters().stream().anyMatch(desc -> parent.equals(desc.loader())));
+
         Converter converter = plugins.newPlugin(
                 className,
                 new AbstractConfig(new ConfigDef(), Collections.emptyMap()),
diff --git a/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/test/plugins/ReadVersionFromResource.java b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/test/plugins/ReadVersionFromResource.java
index 2f5f911f2ed..9f3de801f16 100644
--- a/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/test/plugins/ReadVersionFromResource.java
+++ b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/test/plugins/ReadVersionFromResource.java
@@ -31,6 +31,7 @@ import java.net.URL;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.components.Versioned;
 
 /**
  * Fake plugin class for testing classloading isolation
@@ -39,7 +40,7 @@ import org.apache.kafka.connect.storage.Converter;
  * Exfiltrates data via {@link ReadVersionFromResource#fromConnectData(String, Schema, Object)}
  * and {@link ReadVersionFromResource#toConnectData(String, byte[])}.
  */
-public class ReadVersionFromResource implements Converter {
+public class ReadVersionFromResource implements Converter, Versioned {
     @Override
     public void configure(final Map<String, ?> configs, final boolean isKey) {
 
@@ -78,4 +79,13 @@ public class ReadVersionFromResource implements Converter {
             throw new AssertionError(e);
         }
     }
+
+    @Override
+    public String version() {
+        try (InputStream stream = this.getClass().getResourceAsStream("/version")) {
+            return version(stream);
+        } catch (IOException e) {
+            throw new AssertionError(e);
+        }
+    }
 }
\ No newline at end of file
diff --git a/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/test/plugins/ReadVersionFromResource.java b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/test/plugins/ReadVersionFromResource.java
index 9f450a342a2..caeb4340d6e 100644
--- a/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/test/plugins/ReadVersionFromResource.java
+++ b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/test/plugins/ReadVersionFromResource.java
@@ -31,6 +31,7 @@ import java.net.URL;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.components.Versioned;
 
 /**
  * Fake plugin class for testing classloading isolation.
@@ -39,7 +40,7 @@ import org.apache.kafka.connect.storage.Converter;
  * Exfiltrates data via {@link ReadVersionFromResource#fromConnectData(String, Schema, Object)}
  * and {@link ReadVersionFromResource#toConnectData(String, byte[])}.
  */
-public class ReadVersionFromResource implements Converter {
+public class ReadVersionFromResource implements Converter, Versioned {
     @Override
     public void configure(final Map<String, ?> configs, final boolean isKey) {
 
@@ -78,4 +79,13 @@ public class ReadVersionFromResource implements Converter {
             throw new AssertionError(e);
         }
     }
+
+    @Override
+    public String version() {
+        try (InputStream stream = this.getClass().getResourceAsStream("/version")) {
+            return version(stream);
+        } catch (IOException e) {
+            throw new AssertionError(e);
+        }
+    }
 }
\ No newline at end of file