You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gh...@apache.org on 2023/07/19 19:49:32 UTC
[kafka] branch trunk updated: KAFKA-15162: Reflectively find plugins in parent ClassLoaders that aren't on the classpath (#13977)
This is an automated email from the ASF dual-hosted git repository.
gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d5a00cca74d KAFKA-15162: Reflectively find plugins in parent ClassLoaders that aren't on the classpath (#13977)
d5a00cca74d is described below
commit d5a00cca74decaf7b44238bb6ac57e3c9f9c7218
Author: Greg Harris <gr...@aiven.io>
AuthorDate: Wed Jul 19 12:49:25 2023 -0700
KAFKA-15162: Reflectively find plugins in parent ClassLoaders that aren't on the classpath (#13977)
Signed-off-by: Greg Harris <gr...@aiven.io>
Reviewed-by: Chris Egerton <ch...@aiven.io>
---
.../apache/kafka/connect/runtime/isolation/PluginUtils.java | 6 ++++--
.../apache/kafka/connect/runtime/isolation/PluginsTest.java | 7 ++++++-
.../read-version-from-resource-v1/test/plugins/ReadVersionFromResource.java | 12 +++++++++++-
.../read-version-from-resource-v2/test/plugins/ReadVersionFromResource.java | 12 +++++++++++-
4 files changed, 32 insertions(+), 5 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index e1ef76ebd2e..38c07d7b12b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -350,8 +350,10 @@ public class PluginUtils {
log.error("Could not get listing for plugin path: {}. Ignoring.", pluginLocation, e);
}
}
- URL[] classpathUrls = ClasspathHelper.forJavaClassPath().toArray(new URL[0]);
- pluginSources.add(new PluginSource(null, classLoader.getParent(), classpathUrls));
+ List<URL> parentUrls = new ArrayList<>();
+ parentUrls.addAll(ClasspathHelper.forJavaClassPath());
+ parentUrls.addAll(ClasspathHelper.forClassLoader(classLoader.getParent()));
+ pluginSources.add(new PluginSource(null, classLoader.getParent(), parentUrls.toArray(new URL[0])));
return pluginSources;
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index bb6d399c122..75418b27a6f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -477,7 +477,7 @@ public class PluginsTest {
}
private void assertClassLoaderReadsVersionFromResource(
- TestPlugin parentResource, TestPlugin childResource, String className, String... expectedVersions) throws MalformedURLException {
+ TestPlugin parentResource, TestPlugin childResource, String className, String... expectedVersions) {
URL[] systemPath = TestPlugins.pluginPath(parentResource)
.stream()
.map(Path::toFile)
@@ -500,6 +500,11 @@ public class PluginsTest {
);
plugins = new Plugins(pluginProps, parent, new ClassLoaderFactory());
+ assertTrue("Should find plugin in plugin classloader",
+ plugins.converters().stream().anyMatch(desc -> desc.loader() instanceof PluginClassLoader));
+ assertTrue("Should find plugin in parent classloader",
+ plugins.converters().stream().anyMatch(desc -> parent.equals(desc.loader())));
+
Converter converter = plugins.newPlugin(
className,
new AbstractConfig(new ConfigDef(), Collections.emptyMap()),
diff --git a/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/test/plugins/ReadVersionFromResource.java b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/test/plugins/ReadVersionFromResource.java
index 2f5f911f2ed..9f3de801f16 100644
--- a/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/test/plugins/ReadVersionFromResource.java
+++ b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/test/plugins/ReadVersionFromResource.java
@@ -31,6 +31,7 @@ import java.net.URL;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.components.Versioned;
/**
* Fake plugin class for testing classloading isolation
@@ -39,7 +40,7 @@ import org.apache.kafka.connect.storage.Converter;
* Exfiltrates data via {@link ReadVersionFromResource#fromConnectData(String, Schema, Object)}
* and {@link ReadVersionFromResource#toConnectData(String, byte[])}.
*/
-public class ReadVersionFromResource implements Converter {
+public class ReadVersionFromResource implements Converter, Versioned {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
@@ -78,4 +79,13 @@ public class ReadVersionFromResource implements Converter {
throw new AssertionError(e);
}
}
+
+ @Override
+ public String version() {
+ try (InputStream stream = this.getClass().getResourceAsStream("/version")) {
+ return version(stream);
+ } catch (IOException e) {
+ throw new AssertionError(e);
+ }
+ }
}
\ No newline at end of file
diff --git a/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/test/plugins/ReadVersionFromResource.java b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/test/plugins/ReadVersionFromResource.java
index 9f450a342a2..caeb4340d6e 100644
--- a/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/test/plugins/ReadVersionFromResource.java
+++ b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/test/plugins/ReadVersionFromResource.java
@@ -31,6 +31,7 @@ import java.net.URL;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.components.Versioned;
/**
* Fake plugin class for testing classloading isolation.
@@ -39,7 +40,7 @@ import org.apache.kafka.connect.storage.Converter;
* Exfiltrates data via {@link ReadVersionFromResource#fromConnectData(String, Schema, Object)}
* and {@link ReadVersionFromResource#toConnectData(String, byte[])}.
*/
-public class ReadVersionFromResource implements Converter {
+public class ReadVersionFromResource implements Converter, Versioned {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
@@ -78,4 +79,13 @@ public class ReadVersionFromResource implements Converter {
throw new AssertionError(e);
}
}
+
+ @Override
+ public String version() {
+ try (InputStream stream = this.getClass().getResourceAsStream("/version")) {
+ return version(stream);
+ } catch (IOException e) {
+ throw new AssertionError(e);
+ }
+ }
}
\ No newline at end of file