You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/05/18 17:39:23 UTC
[3/3] kafka git commit: KAFKA-3487: Support classloading isolation in
Connect (KIP-146)
KAFKA-3487: Support classloading isolation in Connect (KIP-146)
Author: Konstantine Karantasis <ko...@confluent.io>
Reviewers: Randall Hauch <rh...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #3028 from kkonstantine/KAFKA-3487-Support-classloading-isolation-in-Connect
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45f22617
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45f22617
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45f22617
Branch: refs/heads/trunk
Commit: 45f2261763eac5caaebf860daab32ef5337c9293
Parents: 5aaaba7
Author: Konstantine Karantasis <ko...@confluent.io>
Authored: Thu May 18 10:39:15 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu May 18 10:39:15 2017 -0700
----------------------------------------------------------------------
build.gradle | 1 +
checkstyle/import-control.xml | 5 +
checkstyle/suppressions.xml | 2 +-
config/connect-distributed.properties | 10 +
config/connect-standalone.properties | 11 +
.../kafka/connect/cli/ConnectDistributed.java | 7 +-
.../kafka/connect/cli/ConnectStandalone.java | 7 +-
.../kafka/connect/runtime/AbstractHerder.java | 91 +++---
.../kafka/connect/runtime/ConnectorConfig.java | 67 ++++-
.../apache/kafka/connect/runtime/Herder.java | 9 +-
.../kafka/connect/runtime/PluginDiscovery.java | 126 --------
.../connect/runtime/SinkConnectorConfig.java | 10 +-
.../connect/runtime/SourceConnectorConfig.java | 5 +-
.../apache/kafka/connect/runtime/Worker.java | 173 ++++++++---
.../kafka/connect/runtime/WorkerConfig.java | 31 +-
.../kafka/connect/runtime/WorkerSinkTask.java | 3 +-
.../kafka/connect/runtime/WorkerSourceTask.java | 3 +-
.../kafka/connect/runtime/WorkerTask.java | 12 +-
.../runtime/distributed/DistributedHerder.java | 4 +-
.../isolation/DelegatingClassLoader.java | 299 +++++++++++++++++++
.../runtime/isolation/PluginClassLoader.java | 68 +++++
.../connect/runtime/isolation/PluginDesc.java | 110 +++++++
.../runtime/isolation/PluginScanResult.java | 55 ++++
.../connect/runtime/isolation/PluginType.java | 58 ++++
.../connect/runtime/isolation/PluginUtils.java | 147 +++++++++
.../connect/runtime/isolation/Plugins.java | 217 ++++++++++++++
.../rest/entities/ConnectorPluginInfo.java | 37 +--
.../resources/ConnectorPluginsResource.java | 40 ++-
.../runtime/standalone/StandaloneHerder.java | 4 +-
.../connect/runtime/ConnectorConfigTest.java | 27 +-
.../connect/runtime/WorkerConnectorTest.java | 30 +-
.../connect/runtime/WorkerSinkTaskTest.java | 8 +-
.../runtime/WorkerSinkTaskThreadedTest.java | 7 +-
.../connect/runtime/WorkerSourceTaskTest.java | 5 +-
.../kafka/connect/runtime/WorkerTaskTest.java | 30 +-
.../kafka/connect/runtime/WorkerTest.java | 224 ++++++++++++--
.../distributed/DistributedHerderTest.java | 67 +++--
.../runtime/isolation/PluginUtilsTest.java | 127 ++++++++
.../resources/ConnectorPluginsResourceTest.java | 128 ++++++--
.../standalone/StandaloneHerderTest.java | 72 +++--
gradle/dependencies.gradle | 4 +-
41 files changed, 1937 insertions(+), 404 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 8d6e703..1693723 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1056,6 +1056,7 @@ project(':connect:runtime') {
compile libs.jettyServlet
compile libs.jettyServlets
compile(libs.reflections)
+ compile(libs.mavenArtifact)
testCompile project(':clients').sourceSets.test.output
testCompile libs.easymock
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 21d9d3c..7f51979 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -254,6 +254,11 @@
<allow pkg="org.glassfish.jersey" />
<allow pkg="com.fasterxml.jackson" />
</subpackage>
+
+ <subpackage name="isolation">
+ <allow pkg="com.fasterxml.jackson" />
+ <allow pkg="org.apache.maven.artifact.versioning" />
+ </subpackage>
</subpackage>
<subpackage name="cli">
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index dc00bee..3b865bc 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -66,7 +66,7 @@
<!-- Connect -->
<suppress checks="ClassFanOutComplexity"
- files="DistributedHerder.java"/>
+ files="DistributedHerder(|Test).java"/>
<suppress checks="MethodLength"
files="(KafkaConfigBackingStore|RequestResponseTest).java"/>
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/config/connect-distributed.properties
----------------------------------------------------------------------
diff --git a/config/connect-distributed.properties b/config/connect-distributed.properties
index b0092bb..752e1f5 100644
--- a/config/connect-distributed.properties
+++ b/config/connect-distributed.properties
@@ -58,3 +58,13 @@ offset.flush.interval.ms=10000
# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=
+
+# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
+# (connectors, converters, transformations). The list should consist of top level directories that include
+# any combination of:
+# a) directories immediately containing jars with plugins and their dependencies
+# b) uber-jars with plugins and their dependencies
+# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
+# Examples:
+# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
+#plugin.path=
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/config/connect-standalone.properties
----------------------------------------------------------------------
diff --git a/config/connect-standalone.properties b/config/connect-standalone.properties
index 8760590..0039796 100644
--- a/config/connect-standalone.properties
+++ b/config/connect-standalone.properties
@@ -35,3 +35,14 @@ internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
+
+# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
+# (connectors, converters, transformations). The list should consist of top level directories that include
+# any combination of:
+# a) directories immediately containing jars with plugins and their dependencies
+# b) uber-jars with plugins and their dependencies
+# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
+# Note: symlinks will be followed to discover dependencies or plugins.
+# Examples:
+# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
+#plugin.path=
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index fb3d693..717ccd9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -20,10 +20,10 @@ import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Connect;
-import org.apache.kafka.connect.runtime.ConnectorFactory;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
@@ -60,7 +60,8 @@ public class ConnectDistributed {
Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
Time time = Time.SYSTEM;
- ConnectorFactory connectorFactory = new ConnectorFactory();
+ Plugins plugins = new Plugins(workerProps);
+ plugins.compareAndSwapWithDelegatingLoader();
DistributedConfig config = new DistributedConfig(workerProps);
RestServer rest = new RestServer(config);
@@ -70,7 +71,7 @@ public class ConnectDistributed {
KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
offsetBackingStore.configure(config);
- Worker worker = new Worker(workerId, time, connectorFactory, config, offsetBackingStore);
+ Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
statusBackingStore.configure(config);
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 0465048..c6d0e59 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -21,9 +21,9 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.ConnectorConfig;
-import org.apache.kafka.connect.runtime.ConnectorFactory;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
@@ -65,14 +65,15 @@ public class ConnectStandalone {
Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
Time time = Time.SYSTEM;
- ConnectorFactory connectorFactory = new ConnectorFactory();
+ Plugins plugins = new Plugins(workerProps);
+ plugins.compareAndSwapWithDelegatingLoader();
StandaloneConfig config = new StandaloneConfig(workerProps);
RestServer rest = new RestServer(config);
URI advertisedUrl = rest.advertisedUrl();
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
- Worker worker = new Worker(workerId, time, connectorFactory, config, new FileOffsetBackingStore());
+ Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore());
Herder herder = new StandaloneHerder(worker);
final Connect connect = new Connect(herder, rest);
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index fb286e2..6293b01 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -20,9 +20,11 @@ import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.ConfigKey;
import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
@@ -78,7 +80,6 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
protected final ConfigBackingStore configBackingStore;
private Map<String, Connector> tempConnectors = new ConcurrentHashMap<>();
- private Thread classPathTraverser;
public AbstractHerder(Worker worker,
String workerId,
@@ -96,20 +97,12 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
this.worker.start();
this.statusBackingStore.start();
this.configBackingStore.start();
- traverseClassPath();
}
protected void stopServices() {
this.statusBackingStore.stop();
this.configBackingStore.stop();
this.worker.stop();
- if (this.classPathTraverser != null) {
- try {
- this.classPathTraverser.join();
- } catch (InterruptedException e) {
- // ignore as it can only happen during shutdown
- }
- }
}
@Override
@@ -189,6 +182,11 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
}
@Override
+ public Plugins plugins() {
+ return worker.getPlugins();
+ }
+
+ @Override
public ConnectorStateInfo connectorStatus(String connName) {
ConnectorStatus connector = statusBackingStore.get(connName);
if (connector == null)
@@ -233,32 +231,53 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
if (connType == null)
throw new BadRequestException("Connector config " + connectorConfig + " contains no connector type");
- Connector connector = getConnector(connType);
-
- final ConfigDef connectorConfigDef = ConnectorConfig.enrich(
- (connector instanceof SourceConnector) ? SourceConnectorConfig.configDef() : SinkConnectorConfig.configDef(),
- connectorConfig,
- false
- );
-
List<ConfigValue> configValues = new ArrayList<>();
Map<String, ConfigKey> configKeys = new HashMap<>();
List<String> allGroups = new ArrayList<>();
- // do basic connector validation (name, connector type, etc.)
- Map<String, ConfigValue> validatedConnectorConfig = validateBasicConnectorConfig(connector, connectorConfigDef, connectorConfig);
- configValues.addAll(validatedConnectorConfig.values());
- configKeys.putAll(connectorConfigDef.configKeys());
- allGroups.addAll(connectorConfigDef.groups());
-
- // do custom connector-specific validation
- Config config = connector.validate(connectorConfig);
- ConfigDef configDef = connector.config();
- configKeys.putAll(configDef.configKeys());
- allGroups.addAll(configDef.groups());
- configValues.addAll(config.configValues());
+ Connector connector = getConnector(connType);
+ ClassLoader savedLoader = worker.getPlugins().compareAndSwapLoaders(connector);
+ try {
+ // do basic connector validation (name, connector type, etc.)
+ ConfigDef basicConfigDef = (connector instanceof SourceConnector)
+ ? SourceConnectorConfig.configDef()
+ : SinkConnectorConfig.configDef();
+ Map<String, ConfigValue> validatedConnectorConfig = validateBasicConnectorConfig(
+ connector,
+ basicConfigDef,
+ connectorConfig
+ );
+ configValues.addAll(validatedConnectorConfig.values());
+ configKeys.putAll(basicConfigDef.configKeys());
+ allGroups.addAll(basicConfigDef.groups());
+
+ ConnectorConfig connectorConfigToEnrich = (connector instanceof SourceConnector)
+ ? new SourceConnectorConfig(plugins(), connectorConfig)
+ : new SinkConnectorConfig(plugins(), connectorConfig);
+ final ConfigDef connectorConfigDef = connectorConfigToEnrich.enrich(
+ plugins(),
+ basicConfigDef,
+ connectorConfig,
+ false
+ );
- return generateResult(connType, configKeys, configValues, allGroups);
+ // Override is required here after the enriched ConfigDef has been created successfully
+ configKeys.putAll(connectorConfigDef.configKeys());
+ allGroups.addAll(connectorConfigDef.groups());
+
+ // do custom connector-specific validation
+ Config config = connector.validate(connectorConfig);
+ ConfigDef configDef = connector.config();
+ configKeys.putAll(configDef.configKeys());
+ allGroups.addAll(configDef.groups());
+ configValues.addAll(config.configValues());
+ return generateResult(connType, configKeys, configValues, allGroups);
+ } catch (ConfigException e) {
+ // Basic validation must have failed. Return the result.
+ return generateResult(connType, configKeys, configValues, allGroups);
+ } finally {
+ Plugins.compareAndSwapLoaders(savedLoader);
+ }
}
// public for testing
@@ -334,7 +353,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
if (tempConnectors.containsKey(connType)) {
return tempConnectors.get(connType);
} else {
- Connector connector = worker.getConnectorFactory().newConnector(connType);
+ Connector connector = worker.getPlugins().newConnector(connType);
tempConnectors.put(connType, connector);
return connector;
}
@@ -383,14 +402,4 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
return null;
}
}
-
- private void traverseClassPath() {
- classPathTraverser = new Thread(new Runnable() {
- @Override
- public void run() {
- PluginDiscovery.scanClasspathForPlugins();
- }
- }, "CLASSPATH traversal thread.");
- classPathTraverser.start();
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 74aef62..869cfbd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.transforms.Transformation;
import java.util.ArrayList;
@@ -81,6 +83,17 @@ public class ConnectorConfig extends AbstractConfig {
private static final String TRANSFORMS_DOC = "Aliases for the transformations to be applied to records.";
private static final String TRANSFORMS_DISPLAY = "Transforms";
+ private final EnrichedConnectorConfig enrichedConfig;
+ private static class EnrichedConnectorConfig extends AbstractConfig {
+ EnrichedConnectorConfig(ConfigDef configDef, Map<String, String> props) {
+ super(configDef, props);
+ }
+
+ public Object get(String key) {
+ return super.get(key);
+ }
+ }
+
public static ConfigDef configDef() {
return new ConfigDef()
.define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY)
@@ -100,16 +113,25 @@ public class ConnectorConfig extends AbstractConfig {
}, Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, 6, Width.LONG, TRANSFORMS_DISPLAY);
}
- public ConnectorConfig() {
- this(new HashMap<String, String>());
+ public ConnectorConfig(Plugins plugins) {
+ this(plugins, new HashMap<String, String>());
+ }
+
+ public ConnectorConfig(Plugins plugins, Map<String, String> props) {
+ this(plugins, configDef(), props);
}
- public ConnectorConfig(Map<String, String> props) {
- this(configDef(), props);
+ public ConnectorConfig(Plugins plugins, ConfigDef configDef, Map<String, String> props) {
+ super(configDef, props);
+ enrichedConfig = new EnrichedConnectorConfig(
+ enrich(plugins, configDef, props, true),
+ props
+ );
}
- public ConnectorConfig(ConfigDef configDef, Map<String, String> props) {
- super(enrich(configDef, props, true), props);
+ @Override
+ public Object get(String key) {
+ return enrichedConfig.get(key);
}
/**
@@ -142,15 +164,20 @@ public class ConnectorConfig extends AbstractConfig {
* <p>
* {@code requireFullConfig} specifies whether required config values that are missing should cause an exception to be thrown.
*/
- public static ConfigDef enrich(ConfigDef baseConfigDef, Map<String, String> props, boolean requireFullConfig) {
- final List<String> transformAliases = (List<String>) ConfigDef.parseType(TRANSFORMS_CONFIG, props.get(TRANSFORMS_CONFIG), Type.LIST);
- if (transformAliases == null || transformAliases.isEmpty()) {
+ public ConfigDef enrich(Plugins plugins, ConfigDef baseConfigDef, Map<String, String> props, boolean requireFullConfig) {
+ Object transformAliases = ConfigDef.parseType(TRANSFORMS_CONFIG, props.get(TRANSFORMS_CONFIG), Type.LIST);
+ if (!(transformAliases instanceof List)) {
return baseConfigDef;
}
- final ConfigDef newDef = new ConfigDef(baseConfigDef);
-
- for (String alias : new LinkedHashSet<>(transformAliases)) {
+ ConfigDef newDef = new ConfigDef(baseConfigDef);
+ LinkedHashSet<?> uniqueTransformAliases = new LinkedHashSet<>((List<?>) transformAliases);
+ for (Object o : uniqueTransformAliases) {
+ if (!(o instanceof String)) {
+ throw new ConfigException("Item in " + TRANSFORMS_CONFIG + " property is not of "
+ + "type String");
+ }
+ String alias = (String) o;
final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
final String group = TRANSFORMS_GROUP + ": " + alias;
int orderInGroup = 0;
@@ -164,7 +191,7 @@ public class ConnectorConfig extends AbstractConfig {
};
newDef.define(transformationTypeConfig, Type.CLASS, ConfigDef.NO_DEFAULT_VALUE, typeValidator, Importance.HIGH,
"Class for the '" + alias + "' transformation.", group, orderInGroup++, Width.LONG, "Transformation type for " + alias,
- Collections.<String>emptyList(), new TransformationClassRecommender());
+ Collections.<String>emptyList(), new TransformationClassRecommender(plugins));
final ConfigDef transformationConfigDef;
try {
@@ -204,9 +231,19 @@ public class ConnectorConfig extends AbstractConfig {
* Recommend bundled transformations.
*/
static final class TransformationClassRecommender implements ConfigDef.Recommender {
+ private final Plugins plugins;
+
+ TransformationClassRecommender(Plugins plugins) {
+ this.plugins = plugins;
+ }
+
@Override
public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
- return (List) PluginDiscovery.transformationPlugins();
+ List<Object> transformationPlugins = new ArrayList<>();
+ for (PluginDesc<Transformation> plugin : plugins.transformations()) {
+ transformationPlugins.add(plugin.pluginClass());
+ }
+ return Collections.unmodifiableList(transformationPlugins);
}
@Override
@@ -215,4 +252,4 @@ public class ConnectorConfig extends AbstractConfig {
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 93fc6f0..5dfb808 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
@@ -168,6 +169,12 @@ public interface Herder {
*/
void resumeConnector(String connector);
+ /**
+ * Returns a handle to the plugin factory used by this herder and its worker.
+ *
+ * @return a reference to the plugin factory.
+ */
+ Plugins plugins();
class Created<T> {
private final boolean created;
@@ -200,4 +207,4 @@ public interface Herder {
return Objects.hash(created, result);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java
deleted file mode 100644
index 482139a..0000000
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.runtime;
-
-import org.apache.kafka.connect.connector.Connector;
-import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
-import org.apache.kafka.connect.tools.MockConnector;
-import org.apache.kafka.connect.tools.MockSinkConnector;
-import org.apache.kafka.connect.tools.MockSourceConnector;
-import org.apache.kafka.connect.tools.SchemaSourceConnector;
-import org.apache.kafka.connect.tools.VerifiableSinkConnector;
-import org.apache.kafka.connect.tools.VerifiableSourceConnector;
-import org.apache.kafka.connect.transforms.Transformation;
-import org.apache.kafka.connect.util.ReflectionsUtil;
-import org.reflections.Reflections;
-import org.reflections.util.ClasspathHelper;
-import org.reflections.util.ConfigurationBuilder;
-
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Set;
-
-public class PluginDiscovery {
-
- private static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList(
- VerifiableSourceConnector.class, VerifiableSinkConnector.class,
- MockConnector.class, MockSourceConnector.class, MockSinkConnector.class,
- SchemaSourceConnector.class
- );
-
- private static final List<Class<? extends Transformation>> TRANSFORMATION_EXCLUDES = Arrays.asList();
-
- private static boolean scanned = false;
- private static List<ConnectorPluginInfo> validConnectorPlugins;
- private static List<Class<? extends Transformation>> validTransformationPlugins;
-
- public static synchronized List<ConnectorPluginInfo> connectorPlugins() {
- scanClasspathForPlugins();
- return validConnectorPlugins;
- }
-
- public static synchronized List<Class<? extends Transformation>> transformationPlugins() {
- scanClasspathForPlugins();
- return validTransformationPlugins;
- }
-
- public static synchronized void scanClasspathForPlugins() {
- if (scanned) return;
- ReflectionsUtil.registerUrlTypes();
- final Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath()));
- validConnectorPlugins = Collections.unmodifiableList(connectorPlugins(reflections));
- validTransformationPlugins = Collections.unmodifiableList(transformationPlugins(reflections));
- scanned = true;
- }
-
- private static List<ConnectorPluginInfo> connectorPlugins(Reflections reflections) {
- final Set<Class<? extends Connector>> connectorClasses = reflections.getSubTypesOf(Connector.class);
- connectorClasses.removeAll(CONNECTOR_EXCLUDES);
-
- final List<ConnectorPluginInfo> connectorPlugins = new ArrayList<>(connectorClasses.size());
- for (Class<? extends Connector> connectorClass : connectorClasses) {
- if (isConcrete(connectorClass)) {
- connectorPlugins.add(new ConnectorPluginInfo(connectorClass));
- }
- }
-
- Collections.sort(connectorPlugins, new Comparator<ConnectorPluginInfo>() {
- @Override
- public int compare(ConnectorPluginInfo a, ConnectorPluginInfo b) {
- return a.className().compareTo(b.className());
- }
- });
-
- return connectorPlugins;
- }
-
- private static List<Class<? extends Transformation>> transformationPlugins(Reflections reflections) {
- final Set<Class<? extends Transformation>> transformationClasses = reflections.getSubTypesOf(Transformation.class);
- transformationClasses.removeAll(TRANSFORMATION_EXCLUDES);
-
- final List<Class<? extends Transformation>> transformationPlugins = new ArrayList<>(transformationClasses.size());
- for (Class<? extends Transformation> transformationClass : transformationClasses) {
- if (isConcrete(transformationClass)) {
- transformationPlugins.add(transformationClass);
- }
- }
-
- Collections.sort(transformationPlugins, new Comparator<Class<? extends Transformation>>() {
- @Override
- public int compare(Class<? extends Transformation> a, Class<? extends Transformation> b) {
- return a.getName().compareTo(b.getName());
- }
- });
-
- return transformationPlugins;
- }
-
- private static boolean isConcrete(Class<?> cls) {
- final int mod = cls.getModifiers();
- return !Modifier.isAbstract(mod) && !Modifier.isInterface(mod);
- }
-
- public static void main(String... args) {
- System.out.println(connectorPlugins());
- System.out.println(transformationPlugins());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
index 21abdd0..e47d537 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -17,8 +17,8 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
-import java.util.HashMap;
import java.util.Map;
/**
@@ -35,15 +35,11 @@ public class SinkConnectorConfig extends ConnectorConfig {
static ConfigDef config = ConnectorConfig.configDef()
.define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY);
- public SinkConnectorConfig() {
- this(new HashMap<String, String>());
- }
-
public static ConfigDef configDef() {
return config;
}
- public SinkConnectorConfig(Map<String, String> props) {
- super(config, props);
+ public SinkConnectorConfig(Plugins plugins, Map<String, String> props) {
+ super(plugins, config, props);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
index 651ac74..6915421 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
import java.util.Map;
@@ -24,7 +25,7 @@ public class SourceConnectorConfig extends ConnectorConfig {
private static ConfigDef config = configDef();
- public SourceConnectorConfig(Map<String, String> props) {
- super(config, props);
+ public SourceConnectorConfig(Plugins plugins, Map<String, String> props) {
+ super(plugins, config, props);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 400ae08..12802c1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -24,6 +24,7 @@ import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceRecord;
@@ -65,7 +66,7 @@ public class Worker {
private final ExecutorService executor;
private final Time time;
private final String workerId;
- private final ConnectorFactory connectorFactory;
+ private final Plugins plugins;
private final WorkerConfig config;
private final Converter defaultKeyConverter;
private final Converter defaultValueConverter;
@@ -78,20 +79,45 @@ public class Worker {
private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
- public Worker(String workerId, Time time, ConnectorFactory connectorFactory, WorkerConfig config, OffsetBackingStore offsetBackingStore) {
+ public Worker(
+ String workerId,
+ Time time,
+ Plugins plugins,
+ WorkerConfig config,
+ OffsetBackingStore offsetBackingStore
+ ) {
this.executor = Executors.newCachedThreadPool();
this.workerId = workerId;
this.time = time;
- this.connectorFactory = connectorFactory;
+ this.plugins = plugins;
this.config = config;
- this.defaultKeyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
+ // Converters are required properties, thus getClass won't return null.
+ this.defaultKeyConverter = plugins.newConverter(
+ config.getClass(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG).getName(),
+ config
+ );
this.defaultKeyConverter.configure(config.originalsWithPrefix("key.converter."), true);
- this.defaultValueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
+ this.defaultValueConverter = plugins.newConverter(
+ config.getClass(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG).getName(),
+ config
+ );
this.defaultValueConverter.configure(config.originalsWithPrefix("value.converter."), false);
- this.internalKeyConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Converter.class);
- this.internalKeyConverter.configure(config.originalsWithPrefix("internal.key.converter."), true);
- this.internalValueConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
- this.internalValueConverter.configure(config.originalsWithPrefix("internal.value.converter."), false);
+ // Same, internal converters are required properties, thus getClass won't return null.
+ this.internalKeyConverter = plugins.newConverter(
+ config.getClass(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG).getName(),
+ config
+ );
+ this.internalKeyConverter.configure(
+ config.originalsWithPrefix("internal.key.converter."),
+ true);
+ this.internalValueConverter = plugins.newConverter(
+ config.getClass(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG).getName(),
+ config
+ );
+ this.internalValueConverter.configure(
+ config.originalsWithPrefix("internal.value.converter."),
+ false
+ );
this.offsetBackingStore = offsetBackingStore;
this.offsetBackingStore.configure(config);
@@ -171,17 +197,23 @@ public class Worker {
throw new ConnectException("Connector with name " + connName + " already exists");
final WorkerConnector workerConnector;
+ ClassLoader savedLoader = plugins.currentThreadLoader();
try {
- final ConnectorConfig connConfig = new ConnectorConfig(connProps);
+ final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
final String connClass = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
log.info("Creating connector {} of type {}", connName, connClass);
- final Connector connector = connectorFactory.newConnector(connClass);
+ final Connector connector = plugins.newConnector(connClass);
workerConnector = new WorkerConnector(connName, connector, ctx, statusListener);
log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
+ savedLoader = plugins.compareAndSwapLoaders(connector);
workerConnector.initialize(connConfig);
workerConnector.transitionTo(initialState);
+ Plugins.compareAndSwapLoaders(savedLoader);
} catch (Throwable t) {
log.error("Failed to start connector {}", connName, t);
+ // Can't be put in a finally block because it needs to be swapped before the call on
+ // statusListener
+ Plugins.compareAndSwapLoaders(savedLoader);
statusListener.onFailure(connName, t);
return false;
}
@@ -205,7 +237,14 @@ public class Worker {
WorkerConnector workerConnector = connectors.get(connName);
if (workerConnector == null)
throw new ConnectException("Connector " + connName + " not found in this worker.");
- return workerConnector.isSinkConnector();
+
+ ClassLoader savedLoader = plugins.currentThreadLoader();
+ try {
+ savedLoader = plugins.compareAndSwapLoaders(workerConnector.connector());
+ return workerConnector.isSinkConnector();
+ } finally {
+ Plugins.compareAndSwapLoaders(savedLoader);
+ }
}
/**
@@ -225,14 +264,23 @@ public class Worker {
Connector connector = workerConnector.connector();
List<Map<String, String>> result = new ArrayList<>();
- String taskClassName = connector.taskClass().getName();
- for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
- Map<String, String> taskConfig = new HashMap<>(taskProps); // Ensure we don't modify the connector's copy of the config
- taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName);
- if (sinkTopics != null)
- taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, ","));
- result.add(taskConfig);
+ ClassLoader savedLoader = plugins.currentThreadLoader();
+ try {
+ savedLoader = plugins.compareAndSwapLoaders(connector);
+ String taskClassName = connector.taskClass().getName();
+ for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
+ // Ensure we don't modify the connector's copy of the config
+ Map<String, String> taskConfig = new HashMap<>(taskProps);
+ taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName);
+ if (sinkTopics != null) {
+ taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, ","));
+ }
+ result.add(taskConfig);
+ }
+ } finally {
+ Plugins.compareAndSwapLoaders(savedLoader);
}
+
return result;
}
@@ -252,13 +300,19 @@ public class Worker {
public boolean stopConnector(String connName) {
log.info("Stopping connector {}", connName);
- WorkerConnector connector = connectors.remove(connName);
- if (connector == null) {
+ WorkerConnector workerConnector = connectors.remove(connName);
+ if (workerConnector == null) {
log.warn("Ignoring stop request for unowned connector {}", connName);
return false;
}
- connector.shutdown();
+ ClassLoader savedLoader = plugins.currentThreadLoader();
+ try {
+ savedLoader = plugins.compareAndSwapLoaders(workerConnector.connector());
+ workerConnector.shutdown();
+ } finally {
+ Plugins.compareAndSwapLoaders(savedLoader);
+ }
log.info("Stopped connector {}", connName);
return true;
@@ -280,8 +334,8 @@ public class Worker {
* @return true if the connector is running, false if the connector is not running or is not manages by this worker.
*/
public boolean isRunning(String connName) {
- WorkerConnector connector = connectors.get(connName);
- return connector != null && connector.isRunning();
+ WorkerConnector workerConnector = connectors.get(connName);
+ return workerConnector != null && workerConnector.isRunning();
}
/**
@@ -307,14 +361,20 @@ public class Worker {
throw new ConnectException("Task already exists in this worker: " + id);
final WorkerTask workerTask;
+ ClassLoader savedLoader = plugins.currentThreadLoader();
try {
- final ConnectorConfig connConfig = new ConnectorConfig(connProps);
+ final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
+ String connType = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+ ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType);
+ savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);
final TaskConfig taskConfig = new TaskConfig(taskProps);
-
final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
- final Task task = connectorFactory.newTask(taskClass);
+ final Task task = plugins.newTask(taskClass);
log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
+ // By maintaining connector's specific class loader for this thread here, we first
+ // search for converters within the connector dependencies, and if not found the
+ // plugin class loader delegates loading to the delegating classloader.
Converter keyConverter = connConfig.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
if (keyConverter != null)
keyConverter.configure(connConfig.originalsWithPrefix("key.converter."), true);
@@ -326,10 +386,14 @@ public class Worker {
else
valueConverter = defaultValueConverter;
- workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter);
+ workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, connectorLoader);
workerTask.initialize(taskConfig);
+ Plugins.compareAndSwapLoaders(savedLoader);
} catch (Throwable t) {
log.error("Failed to start task {}", id, t);
+ // Can't be put in a finally block because it needs to be swapped before the call on
+ // statusListener
+ Plugins.compareAndSwapLoaders(savedLoader);
statusListener.onFailure(id, t);
return false;
}
@@ -351,7 +415,8 @@ public class Worker {
TaskStatus.Listener statusListener,
TargetState initialState,
Converter keyConverter,
- Converter valueConverter) {
+ Converter valueConverter,
+ ClassLoader loader) {
// Decide which type of worker task we need based on the type of task.
if (task instanceof SourceTask) {
TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(connConfig.<SourceRecord>transformations());
@@ -361,11 +426,11 @@ public class Worker {
internalKeyConverter, internalValueConverter);
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter,
- valueConverter, transformationChain, producer, offsetReader, offsetWriter, config, time);
+ valueConverter, transformationChain, producer, offsetReader, offsetWriter, config, loader, time);
} else if (task instanceof SinkTask) {
TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations());
return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, keyConverter,
- valueConverter, transformationChain, time);
+ valueConverter, transformationChain, loader, time);
} else {
log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
@@ -382,7 +447,14 @@ public class Worker {
log.info("Stopping task {}", task.id());
if (task instanceof WorkerSourceTask)
sourceTaskOffsetCommitter.remove(task.id());
- task.stop();
+
+ ClassLoader savedLoader = plugins.currentThreadLoader();
+ try {
+ savedLoader = Plugins.compareAndSwapLoaders(task.loader());
+ task.stop();
+ } finally {
+ Plugins.compareAndSwapLoaders(savedLoader);
+ }
}
private void stopTasks(Collection<ConnectorTaskId> ids) {
@@ -457,8 +529,8 @@ public class Worker {
return internalValueConverter;
}
- public ConnectorFactory getConnectorFactory() {
- return connectorFactory;
+ public Plugins getPlugins() {
+ return plugins;
}
public String workerId() {
@@ -468,14 +540,37 @@ public class Worker {
public void setTargetState(String connName, TargetState state) {
log.info("Setting connector {} state to {}", connName, state);
- WorkerConnector connector = connectors.get(connName);
- if (connector != null)
- connector.transitionTo(state);
+ WorkerConnector workerConnector = connectors.get(connName);
+ if (workerConnector != null) {
+ ClassLoader connectorLoader =
+ plugins.delegatingLoader().connectorLoader(workerConnector.connector());
+ transitionTo(workerConnector, state, connectorLoader);
+ }
for (Map.Entry<ConnectorTaskId, WorkerTask> taskEntry : tasks.entrySet()) {
- if (taskEntry.getKey().connector().equals(connName))
- taskEntry.getValue().transitionTo(state);
+ if (taskEntry.getKey().connector().equals(connName)) {
+ WorkerTask workerTask = taskEntry.getValue();
+ transitionTo(workerTask, state, workerTask.loader());
+ }
}
}
+ private void transitionTo(Object connectorOrTask, TargetState state, ClassLoader loader) {
+ ClassLoader savedLoader = plugins.currentThreadLoader();
+ try {
+ savedLoader = Plugins.compareAndSwapLoaders(loader);
+ if (connectorOrTask instanceof WorkerConnector) {
+ ((WorkerConnector) connectorOrTask).transitionTo(state);
+ } else if (connectorOrTask instanceof WorkerTask) {
+ ((WorkerTask) connectorOrTask).transitionTo(state);
+ } else {
+ throw new ConnectException(
+ "Request for state transition on an object that is neither a "
+ + "WorkerConnector nor a WorkerTask: "
+ + connectorOrTask.getClass());
+ }
+ } finally {
+ Plugins.compareAndSwapLoaders(savedLoader);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 680edaf..fe7a35a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -21,6 +21,9 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
/**
@@ -122,6 +125,18 @@ public class WorkerConfig extends AbstractConfig {
+ "The default value of the Access-Control-Allow-Methods header allows cross origin requests for GET, POST and HEAD.";
protected static final String ACCESS_CONTROL_ALLOW_METHODS_DEFAULT = "";
+ public static final String PLUGIN_PATH_CONFIG = "plugin.path";
+ protected static final String PLUGIN_PATH_DOC = "List of paths separated by commas (,) that "
+ + "contain plugins (connectors, converters, transformations). The list should consist"
+ + " of top level directories that include any combination of: \n"
+ + "a) directories immediately containing jars with plugins and their dependencies\n"
+ + "b) uber-jars with plugins and their dependencies\n"
+ + "c) directories immediately containing the package directory structure of classes of "
+ + "plugins and their dependencies\n"
+ + "Note: symlinks will be followed to discover dependencies or plugins.\n"
+ + "Examples: plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,"
+ + "/opt/connectors";
+
/**
* Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to
* bootstrap their own ConfigDef.
@@ -155,7 +170,21 @@ public class WorkerConfig extends AbstractConfig {
ACCESS_CONTROL_ALLOW_ORIGIN_DOC)
.define(ACCESS_CONTROL_ALLOW_METHODS_CONFIG, Type.STRING,
ACCESS_CONTROL_ALLOW_METHODS_DEFAULT, Importance.LOW,
- ACCESS_CONTROL_ALLOW_METHODS_DOC);
+ ACCESS_CONTROL_ALLOW_METHODS_DOC)
+ .define(
+ PLUGIN_PATH_CONFIG,
+ Type.LIST,
+ null,
+ Importance.LOW,
+ PLUGIN_PATH_DOC
+ );
+ }
+
+ public static List<String> pluginLocations(Map<String, String> props) {
+ String locationList = props.get(WorkerConfig.PLUGIN_PATH_CONFIG);
+ return locationList == null
+ ? new ArrayList<String>()
+ : Arrays.asList(locationList.trim().split("\\s*,\\s*", -1));
}
public WorkerConfig(ConfigDef definition, Map<String, String> props) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index d5f337d..43ad6a1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -83,8 +83,9 @@ class WorkerSinkTask extends WorkerTask {
Converter keyConverter,
Converter valueConverter,
TransformationChain<SinkRecord> transformationChain,
+ ClassLoader loader,
Time time) {
- super(id, statusListener, initialState);
+ super(id, statusListener, initialState, loader);
this.workerConfig = workerConfig;
this.task = task;
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index ed15b85..5627145 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -86,8 +86,9 @@ class WorkerSourceTask extends WorkerTask {
OffsetStorageReader offsetReader,
OffsetStorageWriter offsetWriter,
WorkerConfig workerConfig,
+ ClassLoader loader,
Time time) {
- super(id, statusListener, initialState);
+ super(id, statusListener, initialState, loader);
this.workerConfig = workerConfig;
this.task = task;
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 43d45d8..9b233dd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +40,7 @@ abstract class WorkerTask implements Runnable {
protected final ConnectorTaskId id;
private final TaskStatus.Listener statusListener;
+ protected final ClassLoader loader;
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
private volatile TargetState targetState;
private volatile boolean stopping; // indicates whether the Worker has asked the task to stop
@@ -46,9 +48,11 @@ abstract class WorkerTask implements Runnable {
public WorkerTask(ConnectorTaskId id,
TaskStatus.Listener statusListener,
- TargetState initialState) {
+ TargetState initialState,
+ ClassLoader loader) {
this.id = id;
this.statusListener = statusListener;
+ this.loader = loader;
this.targetState = initialState;
this.stopping = false;
this.cancelled = false;
@@ -58,6 +62,10 @@ abstract class WorkerTask implements Runnable {
return id;
}
+ public ClassLoader loader() {
+ return loader;
+ }
+
/**
* Initialize the task for execution.
* @param taskConfig initial configuration
@@ -177,6 +185,7 @@ abstract class WorkerTask implements Runnable {
@Override
public void run() {
+ ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
try {
doRun();
onShutdown();
@@ -186,6 +195,7 @@ abstract class WorkerTask implements Runnable {
if (t instanceof Error)
throw (Error) t;
} finally {
+ Plugins.compareAndSwapLoaders(savedLoader);
shutdownLatch.countDown();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index e908d0b..8f7503e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -954,10 +954,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
ConnectorConfig connConfig;
List<String> sinkTopics = null;
if (worker.isSinkConnector(connName)) {
- connConfig = new SinkConnectorConfig(configs);
+ connConfig = new SinkConnectorConfig(plugins(), configs);
sinkTopics = connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG);
} else {
- connConfig = new SourceConnectorConfig(configs);
+ connConfig = new SourceConnectorConfig(plugins(), configs);
}
final List<Map<String, String>> taskProps
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
new file mode 100644
index 0000000..da8b444
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.reflections.Reflections;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.InvalidPathException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+public class DelegatingClassLoader extends URLClassLoader {
+ private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class);
+
+ private final Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders;
+ private final SortedSet<PluginDesc<Connector>> connectors;
+ private final SortedSet<PluginDesc<Converter>> converters;
+ private final SortedSet<PluginDesc<Transformation>> transformations;
+ private final List<String> pluginPaths;
+ private final Map<Path, PluginClassLoader> activePaths;
+
+ public DelegatingClassLoader(List<String> pluginPaths, ClassLoader parent) {
+ super(new URL[0], parent);
+ this.pluginPaths = pluginPaths;
+ this.pluginLoaders = new HashMap<>();
+ this.activePaths = new HashMap<>();
+ this.connectors = new TreeSet<>();
+ this.converters = new TreeSet<>();
+ this.transformations = new TreeSet<>();
+ }
+
+ public DelegatingClassLoader(List<String> pluginPaths) {
+ this(pluginPaths, ClassLoader.getSystemClassLoader());
+ }
+
+ public Set<PluginDesc<Connector>> connectors() {
+ return connectors;
+ }
+
+ public Set<PluginDesc<Converter>> converters() {
+ return converters;
+ }
+
+ public Set<PluginDesc<Transformation>> transformations() {
+ return transformations;
+ }
+
+ public ClassLoader connectorLoader(Connector connector) {
+ return connectorLoader(connector.getClass().getName());
+ }
+
+ public ClassLoader connectorLoader(String connectorClassOrAlias) {
+ log.debug("Getting plugin class loader for connector: '{}'", connectorClassOrAlias);
+ SortedMap<PluginDesc<?>, ClassLoader> inner =
+ pluginLoaders.get(connectorClassOrAlias);
+ if (inner == null) {
+ log.error(
+ "Plugin class loader for connector: '{}' was not found. Returning: {}",
+ connectorClassOrAlias,
+ this
+ );
+ return this;
+ }
+ return inner.get(inner.lastKey());
+ }
+
+ private static PluginClassLoader newPluginClassLoader(
+ final URL pluginLocation,
+ final URL[] urls,
+ final ClassLoader parent
+ ) {
+ return (PluginClassLoader) AccessController.doPrivileged(
+ new PrivilegedAction() {
+ @Override
+ public Object run() {
+ return new PluginClassLoader(pluginLocation, urls, parent);
+ }
+ }
+ );
+ }
+
+ private <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
+ for (PluginDesc<T> plugin : plugins) {
+ String pluginClassName = plugin.className();
+ SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(pluginClassName);
+ if (inner == null) {
+ inner = new TreeMap<>();
+ pluginLoaders.put(pluginClassName, inner);
+ // TODO: once versioning is enabled this line should be moved outside this if branch
+ log.info("Added plugin '{}'", pluginClassName);
+ }
+ inner.put(plugin, loader);
+ }
+ }
+
+ protected void initLoaders() {
+ String path = null;
+ try {
+ for (String configPath : pluginPaths) {
+ path = configPath;
+ Path pluginPath = Paths.get(path).toAbsolutePath();
+ // Currently 'plugin.paths' property is a list of top-level directories
+ // containing plugins
+ if (Files.isDirectory(pluginPath)) {
+ for (Path pluginLocation : PluginUtils.pluginLocations(pluginPath)) {
+ log.info("Loading plugin from: {}", pluginLocation);
+ URL[] urls = PluginUtils.pluginUrls(pluginLocation).toArray(new URL[0]);
+ if (log.isDebugEnabled()) {
+ log.debug("Loading plugin urls: {}", Arrays.toString(urls));
+ }
+ PluginClassLoader loader = newPluginClassLoader(
+ pluginLocation.toUri().toURL(),
+ urls,
+ this
+ );
+
+ scanUrlsAndAddPlugins(loader, urls, pluginLocation);
+ }
+ }
+ }
+
+ path = "classpath";
+ // Finally add parent/system loader.
+ scanUrlsAndAddPlugins(
+ getParent(),
+ ClasspathHelper.forJavaClassPath().toArray(new URL[0]),
+ null
+ );
+ } catch (InvalidPathException | MalformedURLException e) {
+ log.error("Invalid path in plugin path: {}. Ignoring.", path);
+ } catch (IOException e) {
+ log.error("Could not get listing for plugin path: {}. Ignoring.", path);
+ } catch (InstantiationException | IllegalAccessException e) {
+ log.error("Could not instantiate plugins in: {}. Ignoring: {}", path, e);
+ }
+ addAllAliases();
+ }
+
+ private void scanUrlsAndAddPlugins(
+ ClassLoader loader,
+ URL[] urls,
+ Path pluginLocation
+ ) throws InstantiationException, IllegalAccessException {
+ PluginScanResult plugins = scanPluginPath(loader, urls);
+ log.info("Registered loader: {}", loader);
+ if (!plugins.isEmpty()) {
+ if (loader instanceof PluginClassLoader) {
+ activePaths.put(pluginLocation, (PluginClassLoader) loader);
+ }
+
+ addPlugins(plugins.connectors(), loader);
+ connectors.addAll(plugins.connectors());
+ addPlugins(plugins.converters(), loader);
+ converters.addAll(plugins.converters());
+ addPlugins(plugins.transformations(), loader);
+ transformations.addAll(plugins.transformations());
+ }
+ }
+
+ private PluginScanResult scanPluginPath(
+ ClassLoader loader,
+ URL[] urls
+ ) throws InstantiationException, IllegalAccessException {
+ ConfigurationBuilder builder = new ConfigurationBuilder();
+ builder.setClassLoaders(new ClassLoader[]{loader});
+ builder.addUrls(urls);
+ Reflections reflections = new Reflections(builder);
+
+ return new PluginScanResult(
+ getPluginDesc(reflections, Connector.class, loader),
+ getPluginDesc(reflections, Converter.class, loader),
+ getPluginDesc(reflections, Transformation.class, loader)
+ );
+ }
+
+ private <T> Collection<PluginDesc<T>> getPluginDesc(
+ Reflections reflections,
+ Class<T> klass,
+ ClassLoader loader
+ ) throws InstantiationException, IllegalAccessException {
+ Set<Class<? extends T>> plugins = reflections.getSubTypesOf(klass);
+
+ Collection<PluginDesc<T>> result = new ArrayList<>();
+ for (Class<? extends T> plugin : plugins) {
+ if (PluginUtils.isConcrete(plugin)) {
+ // Temporary workaround until all the plugins are versioned.
+ if (Connector.class.isAssignableFrom(plugin)) {
+ result.add(
+ new PluginDesc<>(
+ plugin,
+ ((Connector) plugin.newInstance()).version(),
+ loader
+ )
+ );
+ } else {
+ result.add(new PluginDesc<>(plugin, "undefined", loader));
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
+ protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+ if (!PluginUtils.shouldLoadInIsolation(name)) {
+ // There are no paths in this classloader, will attempt to load with the parent.
+ return super.loadClass(name, resolve);
+ }
+
+ SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
+ if (inner != null) {
+ log.trace("Retrieving loaded class '{}' from '{}'", name, inner.get(inner.lastKey()));
+ ClassLoader pluginLoader = inner.get(inner.lastKey());
+ return pluginLoader instanceof PluginClassLoader
+ ? ((PluginClassLoader) pluginLoader).loadClass(name, resolve)
+ : super.loadClass(name, resolve);
+ }
+
+ Class<?> klass = null;
+ for (PluginClassLoader loader : activePaths.values()) {
+ try {
+ klass = loader.loadClass(name, resolve);
+ break;
+ } catch (ClassNotFoundException e) {
+ // Not found in this loader.
+ }
+ }
+ if (klass == null) {
+ return super.loadClass(name, resolve);
+ }
+ return klass;
+ }
+
+ private void addAllAliases() {
+ addAliases(connectors);
+ addAliases(converters);
+ addAliases(transformations);
+ }
+
+ private <S> void addAliases(Collection<PluginDesc<S>> plugins) {
+ for (PluginDesc<S> plugin : plugins) {
+ if (PluginUtils.isAliasUnique(plugin, plugins)) {
+ String simple = PluginUtils.simpleName(plugin);
+ String pruned = PluginUtils.prunedName(plugin);
+ SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(plugin.className());
+ pluginLoaders.put(simple, inner);
+ if (simple.equals(pruned)) {
+ log.info("Added alias '{}' to plugin '{}'", simple, plugin.className());
+ } else {
+ pluginLoaders.put(pruned, inner);
+ log.info(
+ "Added aliases '{}' and '{}' to plugin '{}'",
+ simple,
+ pruned,
+ plugin.className()
+ );
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java
new file mode 100644
index 0000000..07438e9
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+
+public class PluginClassLoader extends URLClassLoader {
+ private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class);
+ private final URL pluginLocation;
+
+ public PluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent) {
+ super(urls, parent);
+ this.pluginLocation = pluginLocation;
+ }
+
+ public PluginClassLoader(URL pluginLocation, URL[] urls) {
+ super(urls);
+ this.pluginLocation = pluginLocation;
+ }
+
+ public String location() {
+ return pluginLocation.toString();
+ }
+
+ @Override
+ public String toString() {
+ return "PluginClassLoader{pluginLocation=" + pluginLocation + "}";
+ }
+
+ @Override
+ protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+ Class<?> klass = findLoadedClass(name);
+ if (klass == null) {
+ if (PluginUtils.shouldLoadInIsolation(name)) {
+ try {
+ klass = findClass(name);
+ } catch (ClassNotFoundException e) {
+ // Not found in loader's path. Search in parents.
+ }
+ }
+ if (klass == null) {
+ klass = super.loadClass(name, false);
+ }
+ }
+ if (resolve) {
+ resolveClass(klass);
+ }
+ return klass;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
new file mode 100644
index 0000000..a607704
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
+
+import java.util.Objects;
+
+public class PluginDesc<T> implements Comparable<PluginDesc<T>> {
+ private final Class<? extends T> klass;
+ private final String name;
+ private final String version;
+ private final DefaultArtifactVersion encodedVersion;
+ private final PluginType type;
+ private final String typeName;
+ private final String location;
+
+ public PluginDesc(Class<? extends T> klass, String version, ClassLoader loader) {
+ this.klass = klass;
+ this.name = klass.getName();
+ this.version = version;
+ this.encodedVersion = new DefaultArtifactVersion(version);
+ this.type = PluginType.from(klass);
+ this.typeName = type.toString();
+ this.location = loader instanceof PluginClassLoader
+ ? ((PluginClassLoader) loader).location()
+ : "classpath";
+ }
+
+ @Override
+ public String toString() {
+ return "PluginDesc{" +
+ "klass=" + klass +
+ ", name='" + name + '\'' +
+ ", version='" + version + '\'' +
+ ", encodedVersion=" + encodedVersion +
+ ", type=" + type +
+ ", typeName='" + typeName + '\'' +
+ ", location='" + location + '\'' +
+ '}';
+ }
+
+ public Class<? extends T> pluginClass() {
+ return klass;
+ }
+
+ @JsonProperty("class")
+ public String className() {
+ return name;
+ }
+
+ @JsonProperty("version")
+ public String version() {
+ return version;
+ }
+
+ public PluginType type() {
+ return type;
+ }
+
+ @JsonProperty("type")
+ public String typeName() {
+ return typeName;
+ }
+
+ @JsonProperty("location")
+ public String location() {
+ return location;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof PluginDesc)) {
+ return false;
+ }
+ PluginDesc<?> that = (PluginDesc<?>) o;
+ return Objects.equals(klass, that.klass) &&
+ Objects.equals(version, that.version) &&
+ type == that.type;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(klass, version, type);
+ }
+
+ @Override
+ public int compareTo(PluginDesc other) {
+ int nameComp = name.compareTo(other.name);
+ return nameComp != 0 ? nameComp : encodedVersion.compareTo(other.encodedVersion);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
new file mode 100644
index 0000000..f3d2f21
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import java.util.Collection;
+
+public class PluginScanResult {
+ private final Collection<PluginDesc<Connector>> connectors;
+ private final Collection<PluginDesc<Converter>> converters;
+ private final Collection<PluginDesc<Transformation>> transformations;
+
+ public PluginScanResult(
+ Collection<PluginDesc<Connector>> connectors,
+ Collection<PluginDesc<Converter>> converters,
+ Collection<PluginDesc<Transformation>> transformations
+ ) {
+ this.connectors = connectors;
+ this.converters = converters;
+ this.transformations = transformations;
+ }
+
+ public Collection<PluginDesc<Connector>> connectors() {
+ return connectors;
+ }
+
+ public Collection<PluginDesc<Converter>> converters() {
+ return converters;
+ }
+
+ public Collection<PluginDesc<Transformation>> transformations() {
+ return transformations;
+ }
+
+ public boolean isEmpty() {
+ return connectors().isEmpty() && converters().isEmpty() && transformations().isEmpty();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
new file mode 100644
index 0000000..5649213
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import java.util.Locale;
+
+public enum PluginType {
+ SOURCE(SourceConnector.class),
+ SINK(SinkConnector.class),
+ CONNECTOR(Connector.class),
+ CONVERTER(Converter.class),
+ TRANSFORMATION(Transformation.class),
+ UNKNOWN(Object.class);
+
+ private Class<?> klass;
+
+ PluginType(Class<?> klass) {
+ this.klass = klass;
+ }
+
+ public static PluginType from(Class<?> klass) {
+ for (PluginType type : PluginType.values()) {
+ if (type.klass.isAssignableFrom(klass)) {
+ return type;
+ }
+ }
+ return UNKNOWN;
+ }
+
+ public String simpleName() {
+ return klass.getSimpleName();
+ }
+
+ @Override
+ public String toString() {
+ return super.toString().toLowerCase(Locale.ROOT);
+ }
+}