You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/05/18 17:39:22 UTC

[2/3] kafka git commit: KAFKA-3487: Support classloading isolation in Connect (KIP-146)

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
new file mode 100644
index 0000000..b2be997
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.io.IOException;
+import java.lang.reflect.Modifier;
+import java.net.URL;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+
+public class PluginUtils {
+    private static final String BLACKLIST = "^(?:"
+            + "java"
+            + "|javax"
+            + "|org\\.omg"
+            + "|org\\.w3c\\.dom"
+            + "|org\\.apache\\.kafka\\.common"
+            + "|org\\.apache\\.kafka\\.connect"
+            + "|org\\.apache\\.log4j"
+            + ")\\..*$";
+
+    private static final String WHITELIST = "^org\\.apache\\.kafka\\.connect\\.(?:"
+            + "transforms\\.(?!Transformation$).*"
+            + "|json\\..*"
+            + "|file\\..*"
+            + "|converters\\..*"
+            + "|storage\\.StringConverter"
+            + ")$";
+
+    private static final DirectoryStream.Filter<Path> PLUGIN_PATH_FILTER = new DirectoryStream
+            .Filter<Path>() {
+        @Override
+        public boolean accept(Path path) throws IOException {
+            return Files.isDirectory(path) || PluginUtils.isJar(path);
+        }
+    };
+
+    public static boolean shouldLoadInIsolation(String name) {
+        return !name.matches(BLACKLIST) || name.matches(WHITELIST);  // whitelist beats blacklist
+    }
+
+    public static boolean isConcrete(Class<?> klass) {
+        int modifiers = klass.getModifiers();  // concrete = neither abstract nor interface
+        return !(Modifier.isAbstract(modifiers) || Modifier.isInterface(modifiers));
+    }
+
+    public static boolean isJar(Path path) {
+        return path.toString().toLowerCase(Locale.ROOT).endsWith(".jar");
+    }
+
+    /**
+     * Lists classpath URLs for a plugin location: the path itself if it is a jar, otherwise
+     * each entry accepted by PLUGIN_PATH_FILTER directly under it if it is a directory.
+     */
+    public static List<URL> pluginUrls(Path pluginPath) throws IOException {
+        List<URL> result = new ArrayList<>();
+        if (isJar(pluginPath)) {
+            result.add(pluginPath.toUri().toURL());
+        } else if (Files.isDirectory(pluginPath)) {
+            try (DirectoryStream<Path> entries =
+                         Files.newDirectoryStream(pluginPath, PLUGIN_PATH_FILTER)) {
+                for (Path entry : entries) {
+                    result.add(entry.toUri().toURL());
+                }
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Returns the entries directly under {@code topPath} that PLUGIN_PATH_FILTER accepts.
+     * Non-recursive for now: plugin directories or jars need to be exactly under topPath.
+     *
+     * @throws IOException if the directory stream on {@code topPath} cannot be opened
+     */
+    public static List<Path> pluginLocations(Path topPath) throws IOException {
+        List<Path> found = new ArrayList<>();
+        try (DirectoryStream<Path> items = Files.newDirectoryStream(topPath, PLUGIN_PATH_FILTER)) {
+            for (Path item : items) {
+                found.add(item);
+            }
+        }
+        return found;
+    }
+
+    public static String simpleName(PluginDesc<?> plugin) {
+        return plugin.pluginClass().getSimpleName();
+    }
+
+    public static String prunedName(PluginDesc<?> plugin) {
+        // It's currently simpler to switch on type than do pattern matching.
+        switch (plugin.type()) {
+            case SOURCE:
+            case SINK:
+            case CONNECTOR:
+                return prunePluginName(plugin, "Connector");
+            default:
+                return prunePluginName(plugin, plugin.type().simpleName());
+        }
+    }
+
+    /**
+     * Reports whether {@code alias} is unambiguous within {@code plugins}: returns false as
+     * soon as a second entry shares its simple name or its pruned name with {@code alias}.
+     */
+    public static <U> boolean isAliasUnique(
+            PluginDesc<U> alias, Collection<PluginDesc<U>> plugins) {
+        int hits = 0;
+        for (PluginDesc<U> candidate : plugins) {
+            boolean sameName = simpleName(alias).equals(simpleName(candidate))
+                    || prunedName(alias).equals(prunedName(candidate));
+            if (sameName && ++hits > 1) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    // Strips a strictly trailing suffix (e.g. "Connector") from the plugin's simple class name.
+    // A name that only contains the suffix mid-way, or is nothing but the suffix, is kept whole.
+    private static String prunePluginName(PluginDesc<?> plugin, String suffix) {
+        String simple = plugin.pluginClass().getSimpleName();
+        // endsWith rather than lastIndexOf, so text after a mid-name occurrence is never cut off.
+        boolean prunable = simple.endsWith(suffix) && simple.length() > suffix.length();
+        return prunable ? simple.substring(0, simple.length() - suffix.length()) : simple;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
new file mode 100644
index 0000000..654f485
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class Plugins {
+    private static final Logger log = LoggerFactory.getLogger(Plugins.class);
+    private final DelegatingClassLoader delegatingLoader;
+
+    public Plugins(Map<String, String> props) {
+        List<String> pluginLocations = WorkerConfig.pluginLocations(props);
+        delegatingLoader = newDelegatingClassLoader(pluginLocations);
+        delegatingLoader.initLoaders();
+    }
+
+    /** Builds the {@link DelegatingClassLoader} for {@code paths} inside a privileged action. */
+    private static DelegatingClassLoader newDelegatingClassLoader(final List<String> paths) {
+        // A parameterized PrivilegedAction avoids the raw type and the unchecked downcast.
+        return AccessController.doPrivileged(new PrivilegedAction<DelegatingClassLoader>() {
+            @Override
+            public DelegatingClassLoader run() {
+                return new DelegatingClassLoader(paths);
+            }
+        });
+    }
+
+    private static <T> String pluginNames(Collection<PluginDesc<T>> plugins) {
+        return Utils.join(plugins, ", ");
+    }
+
+    protected static <T> T newPlugin(Class<T> klass) {
+        try {
+            return Utils.newInstance(klass);
+        } catch (Throwable t) {
+            throw new ConnectException("Instantiation error", t);
+        }
+    }
+
+    protected static <T> T newConfiguredPlugin(AbstractConfig config, Class<T> klass) {
+        T plugin = Utils.newInstance(klass);
+        if (plugin instanceof Configurable) {
+            ((Configurable) plugin).configure(config.originals());
+        }
+        return plugin;
+    }
+
+    /**
+     * Loads {@code classOrAlias} through {@code loader} and returns it typed as a subclass of
+     * {@code pluginClass}.
+     *
+     * @throws ClassNotFoundException propagated from the loader, or raised here when the loaded
+     *                                class is not assignable to {@code pluginClass}
+     */
+    @SuppressWarnings("unchecked")
+    protected static <U> Class<? extends U> pluginClass(DelegatingClassLoader loader,
+            String classOrAlias, Class<U> pluginClass) throws ClassNotFoundException {
+        Class<?> loaded = loader.loadClass(classOrAlias, false);
+        if (!pluginClass.isAssignableFrom(loaded)) {
+            throw new ClassNotFoundException("Requested class: " + classOrAlias
+                    + " does not extend " + pluginClass.getSimpleName());
+        }
+        return (Class<? extends U>) loaded;
+    }
+
+    public static ClassLoader compareAndSwapLoaders(ClassLoader loader) {
+        ClassLoader current = Thread.currentThread().getContextClassLoader();
+        if (current == null || !current.equals(loader)) {  // context loader may be null
+            Thread.currentThread().setContextClassLoader(loader);
+        }
+        return current;
+    }
+
+    public ClassLoader currentThreadLoader() {
+        return Thread.currentThread().getContextClassLoader();
+    }
+
+    public ClassLoader compareAndSwapWithDelegatingLoader() {
+        ClassLoader current = Thread.currentThread().getContextClassLoader();
+        if (current == null || !current.equals(delegatingLoader)) {  // context loader may be null
+            Thread.currentThread().setContextClassLoader(delegatingLoader);
+        }
+        return current;
+    }
+
+    public ClassLoader compareAndSwapLoaders(Connector connector) {
+        ClassLoader connectorLoader = delegatingLoader.connectorLoader(connector);
+        return compareAndSwapLoaders(connectorLoader);
+    }
+
+    public DelegatingClassLoader delegatingLoader() {
+        return delegatingLoader;
+    }
+
+    public Set<PluginDesc<Connector>> connectors() {
+        return delegatingLoader.connectors();
+    }
+
+    public Set<PluginDesc<Converter>> converters() {
+        return delegatingLoader.converters();
+    }
+
+    public Set<PluginDesc<Transformation>> transformations() {
+        return delegatingLoader.transformations();
+    }
+
+    /**
+     * Instantiates a connector from its class name or an alias. If the name cannot be loaded
+     * as a Connector class, it is matched against the simple names of the known connectors,
+     * either exactly or with "Connector" appended.
+     *
+     * @throws ConnectException if zero or several connectors match, or if instantiation fails
+     */
+    @SuppressWarnings("unchecked")
+    public Connector newConnector(String connectorClassOrAlias) {
+        Class<? extends Connector> klass;
+        try {
+            klass = pluginClass(delegatingLoader, connectorClassOrAlias, Connector.class);
+        } catch (ClassNotFoundException e) {
+            List<PluginDesc<Connector>> matches = new ArrayList<>();
+            for (PluginDesc<Connector> plugin : delegatingLoader.connectors()) {
+                String simpleName = plugin.pluginClass().getSimpleName();
+                if (simpleName.equals(connectorClassOrAlias)
+                        || simpleName.equals(connectorClassOrAlias + "Connector")) {
+                    matches.add(plugin);
+                }
+            }
+            if (matches.isEmpty()) {
+                // Chain the loader's exception so the reason the name was rejected is not lost.
+                throw new ConnectException(
+                        "Failed to find any class that implements Connector and which name matches "
+                                + connectorClassOrAlias
+                                + ", available connectors are: "
+                                + pluginNames(delegatingLoader.connectors()),
+                        e
+                );
+            }
+            if (matches.size() > 1) {
+                throw new ConnectException(
+                        "More than one connector matches alias " + connectorClassOrAlias
+                                +
+                                ". Please use full package and class name instead. Classes found: "
+                                + pluginNames(matches)
+                );
+            }
+            klass = matches.get(0).pluginClass();
+        }
+        return newPlugin(klass);
+    }
+
+    public Task newTask(Class<? extends Task> taskClass) {
+        return newPlugin(taskClass);
+    }
+
+    public Converter newConverter(String converterClassOrAlias) {
+        return newConverter(converterClassOrAlias, null);
+    }
+
+    /**
+     * Instantiates the converter named by {@code converterClassOrAlias}; when {@code config} is
+     * non-null and the converter is Configurable, it is configured with the config's originals.
+     */
+    public Converter newConverter(String converterClassOrAlias, AbstractConfig config) {
+        Class<? extends Converter> klass;
+        try {
+            klass = pluginClass(delegatingLoader, converterClassOrAlias, Converter.class);
+        } catch (ClassNotFoundException e) {
+            // Chain the loader's exception so the reason the name was rejected is not lost.
+            throw new ConnectException(
+                    "Failed to find any class that implements Converter and which name matches "
+                            + converterClassOrAlias + ", available converters are: "
+                            + pluginNames(delegatingLoader.converters()),
+                    e);
+        }
+        return config != null ? newConfiguredPlugin(config, klass) : newPlugin(klass);
+    }
+
+    public <R extends ConnectRecord<R>> Transformation<R> newTranformations(
+            String transformationClassOrAlias
+    ) {
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
index ff3c30d..36b896f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
@@ -18,22 +18,12 @@ package org.apache.kafka.connect.runtime.rest.entities;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-
 import org.apache.kafka.connect.connector.Connector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 
-import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
 
 public class ConnectorPluginInfo {
-
-    private static final Logger log = LoggerFactory.getLogger(ConnectorPluginInfo.class);
-
-    private static final Map<Class<? extends Connector>, String>
-        VERSIONS = new ConcurrentHashMap<>();
-
     private String className;
     private ConnectorType type;
     private String version;
@@ -49,29 +39,8 @@ public class ConnectorPluginInfo {
         this.version = version;
     }
 
-    public ConnectorPluginInfo(Class<? extends Connector> klass) {
-        this(klass.getCanonicalName(), ConnectorType.from(klass), getVersion(klass));
-    }
-
-    private static String getVersion(Class<? extends Connector> klass) {
-        if (!VERSIONS.containsKey(klass)) {
-            synchronized (VERSIONS) {
-                if (!VERSIONS.containsKey(klass)) {
-                    try {
-                        VERSIONS.put(klass, klass.newInstance().version());
-                    } catch (
-                        ExceptionInInitializerError
-                            | InstantiationException
-                            | IllegalAccessException
-                            | SecurityException e
-                    ) {
-                        log.warn("Unable to instantiate connector", e);
-                        VERSIONS.put(klass, "unknown");
-                    }
-                }
-            }
-        }
-        return VERSIONS.get(klass);
+    public ConnectorPluginInfo(PluginDesc<Connector> plugin) {
+        this(plugin.className(), ConnectorType.from(plugin.pluginClass()), plugin.version());
     }
 
     @JsonProperty("class")

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
index 37e0f01..24eb93b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -16,14 +16,18 @@
  */
 package org.apache.kafka.connect.runtime.rest.resources;
 
+import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.PluginDiscovery;
+import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
-
-import java.util.List;
-import java.util.Map;
+import org.apache.kafka.connect.tools.MockConnector;
+import org.apache.kafka.connect.tools.MockSinkConnector;
+import org.apache.kafka.connect.tools.MockSourceConnector;
+import org.apache.kafka.connect.tools.SchemaSourceConnector;
+import org.apache.kafka.connect.tools.VerifiableSinkConnector;
+import org.apache.kafka.connect.tools.VerifiableSourceConnector;
 
 import javax.ws.rs.BadRequestException;
 import javax.ws.rs.Consumes;
@@ -33,6 +37,11 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 @Path("/connector-plugins")
 @Produces(MediaType.APPLICATION_JSON)
@@ -41,9 +50,17 @@ public class ConnectorPluginsResource {
 
     private static final String ALIAS_SUFFIX = "Connector";
     private final Herder herder;
+    private final List<ConnectorPluginInfo> connectorPlugins;
+
+    private static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList(
+            VerifiableSourceConnector.class, VerifiableSinkConnector.class,
+            MockConnector.class, MockSourceConnector.class, MockSinkConnector.class,
+            SchemaSourceConnector.class
+    );
 
     public ConnectorPluginsResource(Herder herder) {
         this.herder = herder;
+        this.connectorPlugins = new ArrayList<>();
     }
 
     @PUT
@@ -67,7 +84,20 @@ public class ConnectorPluginsResource {
     @GET
     @Path("/")
     public List<ConnectorPluginInfo> listConnectorPlugins() {
-        return PluginDiscovery.connectorPlugins();
+        return getConnectorPlugins();
+    }
+
+    // TODO: improve once plugins are allowed to be added/removed during runtime.
+    private synchronized List<ConnectorPluginInfo> getConnectorPlugins() {
+        if (connectorPlugins.isEmpty()) {
+            for (PluginDesc<Connector> plugin : herder.plugins().connectors()) {
+                if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) {
+                    connectorPlugins.add(new ConnectorPluginInfo(plugin));
+                }
+            }
+        }
+
+        return Collections.unmodifiableList(connectorPlugins);
     }
 
     private String normalizedPluginName(String pluginName) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 9c8c7ae..d57e75f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -251,12 +251,12 @@ public class StandaloneHerder extends AbstractHerder {
 
         ConnectorConfig connConfig;
         if (worker.isSinkConnector(connName)) {
-            connConfig = new SinkConnectorConfig(config);
+            connConfig = new SinkConnectorConfig(plugins(), config);
             return worker.connectorTaskConfigs(connName,
                                                connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
                                                connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG));
         } else {
-            connConfig = new SourceConnectorConfig(config);
+            connConfig = new SourceConnectorConfig(plugins(), config);
             return worker.connectorTaskConfigs(connName,
                                                connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
                                                null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
index 375b9c0..f8c4fd6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
@@ -20,12 +20,16 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -33,6 +37,13 @@ import static org.junit.Assert.fail;
 
 public class ConnectorConfigTest<R extends ConnectRecord<R>> {
 
+    private static final Plugins MOCK_PLUGINS = new Plugins(new HashMap<String, String>()) {
+        @Override
+        public Set<PluginDesc<Transformation>> transformations() {
+            return Collections.emptySet();
+        }
+    };
+
     public static abstract class TestConnector extends Connector {
     }
 
@@ -67,7 +78,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
         Map<String, String> props = new HashMap<>();
         props.put("name", "test");
         props.put("connector.class", TestConnector.class.getName());
-        new ConnectorConfig(props);
+        new ConnectorConfig(MOCK_PLUGINS, props);
     }
 
     @Test(expected = ConfigException.class)
@@ -76,7 +87,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
         props.put("name", "test");
         props.put("connector.class", TestConnector.class.getName());
         props.put("transforms", "dangler");
-        new ConnectorConfig(props);
+        new ConnectorConfig(MOCK_PLUGINS, props);
     }
 
     @Test(expected = ConfigException.class)
@@ -86,7 +97,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
         props.put("connector.class", TestConnector.class.getName());
         props.put("transforms", "a");
         props.put("transforms.a.type", "uninstantiable");
-        new ConnectorConfig(props);
+        new ConnectorConfig(MOCK_PLUGINS, props);
     }
 
     @Test(expected = ConfigException.class)
@@ -96,7 +107,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
         props.put("connector.class", TestConnector.class.getName());
         props.put("transforms", "a");
         props.put("transforms.a.type", SimpleTransformation.class.getName());
-        new ConnectorConfig(props);
+        new ConnectorConfig(MOCK_PLUGINS, props);
     }
 
     @Test
@@ -108,7 +119,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
         props.put("transforms.a.type", SimpleTransformation.class.getName());
         props.put("transforms.a.magic.number", "40");
         try {
-            new ConnectorConfig(props);
+            new ConnectorConfig(MOCK_PLUGINS, props);
             fail();
         } catch (ConfigException e) {
             assertTrue(e.getMessage().contains("Value must be at least 42"));
@@ -123,7 +134,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
         props.put("transforms", "a");
         props.put("transforms.a.type", SimpleTransformation.class.getName());
         props.put("transforms.a.magic.number", "42");
-        final ConnectorConfig config = new ConnectorConfig(props);
+        final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
         final List<Transformation<R>> transformations = config.transformations();
         assertEquals(1, transformations.size());
         final SimpleTransformation xform = (SimpleTransformation) transformations.get(0);
@@ -138,7 +149,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
         props.put("transforms", "a, b");
         props.put("transforms.a.type", SimpleTransformation.class.getName());
         props.put("transforms.a.magic.number", "42");
-        new ConnectorConfig(props);
+        new ConnectorConfig(MOCK_PLUGINS, props);
     }
 
     @Test
@@ -151,7 +162,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
         props.put("transforms.a.magic.number", "42");
         props.put("transforms.b.type", SimpleTransformation.class.getName());
         props.put("transforms.b.magic.number", "84");
-        final ConnectorConfig config = new ConnectorConfig(props);
+        final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
         final List<Transformation<R>> transformations = config.transformations();
         assertEquals(2, transformations.size());
         assertEquals(42, ((SimpleTransformation) transformations.get(0)).magicNumber);

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
index 9e77198..11b05ee 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
@@ -18,10 +18,12 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
 import org.easymock.EasyMockSupport;
 import org.easymock.Mock;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
@@ -39,12 +41,18 @@ public class WorkerConnectorTest extends EasyMockSupport {
         CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName());
         CONFIG.put(ConnectorConfig.NAME_CONFIG, CONNECTOR);
     }
-    public static final ConnectorConfig CONNECTOR_CONFIG = new ConnectorConfig(CONFIG);
+    public ConnectorConfig connectorConfig;
 
+    @Mock Plugins plugins;
     @Mock Connector connector;
     @Mock ConnectorContext ctx;
     @Mock ConnectorStatus.Listener listener;
 
+    @Before
+    public void setup() {
+        connectorConfig = new ConnectorConfig(plugins, CONFIG);
+    }
+
     @Test
     public void testInitializeFailure() {
         RuntimeException exception = new RuntimeException();
@@ -62,7 +70,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
 
-        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.initialize(connectorConfig);
         workerConnector.shutdown();
 
         verifyAll();
@@ -87,7 +95,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
 
-        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.initialize(connectorConfig);
         workerConnector.transitionTo(TargetState.STARTED);
         workerConnector.shutdown();
 
@@ -115,7 +123,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
 
-        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.initialize(connectorConfig);
         workerConnector.transitionTo(TargetState.STARTED);
         workerConnector.shutdown();
 
@@ -146,7 +154,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
 
-        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.initialize(connectorConfig);
         workerConnector.transitionTo(TargetState.STARTED);
         workerConnector.transitionTo(TargetState.PAUSED);
         workerConnector.shutdown();
@@ -178,7 +186,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
 
-        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.initialize(connectorConfig);
         workerConnector.transitionTo(TargetState.PAUSED);
         workerConnector.transitionTo(TargetState.STARTED);
         workerConnector.shutdown();
@@ -203,7 +211,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
 
-        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.initialize(connectorConfig);
         workerConnector.transitionTo(TargetState.PAUSED);
         workerConnector.shutdown();
 
@@ -230,7 +238,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
 
-        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.initialize(connectorConfig);
         workerConnector.transitionTo(TargetState.STARTED);
         workerConnector.shutdown();
 
@@ -260,7 +268,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
 
-        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.initialize(connectorConfig);
         workerConnector.transitionTo(TargetState.STARTED);
         workerConnector.shutdown();
 
@@ -289,7 +297,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
 
-        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.initialize(connectorConfig);
         workerConnector.transitionTo(TargetState.STARTED);
         workerConnector.transitionTo(TargetState.STARTED);
         workerConnector.shutdown();
@@ -321,7 +329,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
 
-        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.initialize(connectorConfig);
         workerConnector.transitionTo(TargetState.STARTED);
         workerConnector.transitionTo(TargetState.PAUSED);
         workerConnector.transitionTo(TargetState.PAUSED);

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 26ac486..eb5f25c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -104,6 +105,8 @@ public class WorkerSinkTaskTest {
     private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
     private WorkerConfig workerConfig;
     @Mock
+    private PluginClassLoader pluginLoader;
+    @Mock
     private Converter keyConverter;
     @Mock
     private Converter valueConverter;
@@ -129,9 +132,10 @@ public class WorkerSinkTaskTest {
         workerProps.put("internal.value.converter.schemas.enable", "false");
         workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
         workerConfig = new StandaloneConfig(workerProps);
+        pluginLoader = PowerMock.createMock(PluginClassLoader.class);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter, valueConverter, transformationChain, time);
+                taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter, valueConverter, transformationChain, pluginLoader, time);
 
         recordsReturned = 0;
     }
@@ -140,7 +144,7 @@ public class WorkerSinkTaskTest {
     public void testStartPaused() throws Exception {
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, TargetState.PAUSED, workerConfig, keyConverter, valueConverter, transformationChain, time);
+                taskId, sinkTask, statusListener, TargetState.PAUSED, workerConfig, keyConverter, valueConverter, transformationChain, pluginLoader, time);
 
         expectInitializeTask();
         expectPollInitialAssignment();

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index fb7cf4f..29a6b52 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -101,6 +102,8 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
     @Mock private SinkTask sinkTask;
     private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
     private WorkerConfig workerConfig;
+    @Mock
+    private PluginClassLoader pluginLoader;
     @Mock private Converter keyConverter;
     @Mock private Converter valueConverter;
     @Mock private TransformationChain transformationChain;
@@ -125,10 +128,12 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         workerProps.put("internal.key.converter.schemas.enable", "false");
         workerProps.put("internal.value.converter.schemas.enable", "false");
         workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        pluginLoader = PowerMock.createMock(PluginClassLoader.class);
         workerConfig = new StandaloneConfig(workerProps);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter, valueConverter, TransformationChain.<SinkRecord>noOp(), time);
+                taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter,
+                valueConverter, TransformationChain.<SinkRecord>noOp(), pluginLoader, time);
 
         recordsReturned = 0;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 31204f0..a3ddb3e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -82,6 +83,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     private ExecutorService executor = Executors.newSingleThreadExecutor();
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
     private WorkerConfig config;
+    private Plugins plugins;
     @Mock private SourceTask sourceTask;
     @Mock private Converter keyConverter;
     @Mock private Converter valueConverter;
@@ -116,6 +118,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         workerProps.put("internal.key.converter.schemas.enable", "false");
         workerProps.put("internal.value.converter.schemas.enable", "false");
         workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        plugins = new Plugins(workerProps);
         config = new StandaloneConfig(workerProps);
         producerCallbacks = EasyMock.newCapture();
     }
@@ -126,7 +129,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
     private void createWorkerTask(TargetState initialState) {
         workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, transformationChain,
-                producer, offsetReader, offsetWriter, config, Time.SYSTEM);
+                producer, offsetReader, offsetWriter, config, plugins.delegatingLoader(), Time.SYSTEM);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index 6c2fc4d..871c887 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -44,10 +44,16 @@ public class WorkerTaskTest {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
 
         TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
+        ClassLoader loader = EasyMock.createMock(ClassLoader.class);
 
         WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class, TargetState.class)
-                .withArgs(taskId, statusListener, TargetState.STARTED)
+                .withConstructor(
+                        ConnectorTaskId.class,
+                        TaskStatus.Listener.class,
+                        TargetState.class,
+                        ClassLoader.class
+                )
+                .withArgs(taskId, statusListener, TargetState.STARTED, loader)
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
@@ -83,10 +89,16 @@ public class WorkerTaskTest {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
 
         TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
+        ClassLoader loader = EasyMock.createMock(ClassLoader.class);
 
         WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class, TargetState.class)
-                .withArgs(taskId, statusListener, TargetState.STARTED)
+                .withConstructor(
+                        ConnectorTaskId.class,
+                        TaskStatus.Listener.class,
+                        TargetState.class,
+                        ClassLoader.class
+                )
+                .withArgs(taskId, statusListener, TargetState.STARTED, loader)
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
@@ -115,10 +127,16 @@ public class WorkerTaskTest {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
 
         TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
+        ClassLoader loader = EasyMock.createMock(ClassLoader.class);
 
         WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class, TargetState.class)
-                .withArgs(taskId, statusListener, TargetState.STARTED)
+                .withConstructor(
+                        ConnectorTaskId.class,
+                        TaskStatus.Listener.class,
+                        TargetState.class,
+                        ClassLoader.class
+                )
+                .withArgs(taskId, statusListener, TargetState.STARTED, loader)
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 47dfcef..ccc7e15 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
@@ -26,6 +27,9 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
+import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -39,6 +43,7 @@ import org.apache.kafka.connect.util.MockTime;
 import org.apache.kafka.connect.util.ThreadedTest;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
+import org.easymock.Mock;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -59,7 +64,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({Worker.class})
+@PrepareForTest({Worker.class, Plugins.class})
 @PowerMockIgnore("javax.management.*")
 public class WorkerTest extends ThreadedTest {
 
@@ -69,7 +74,14 @@ public class WorkerTest extends ThreadedTest {
 
     private WorkerConfig config;
     private Worker worker;
-    private ConnectorFactory connectorFactory = PowerMock.createMock(ConnectorFactory.class);
+
+    @Mock
+    private Plugins plugins = PowerMock.createMock(Plugins.class);
+    @Mock
+    private PluginClassLoader pluginLoader = PowerMock.createMock(PluginClassLoader.class);
+    @Mock
+    private DelegatingClassLoader delegatingLoader =
+            PowerMock.createMock(DelegatingClassLoader.class);
     private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);
     private TaskStatus.Listener taskStatusListener = PowerMock.createStrictMock(TaskStatus.Listener.class);
     private ConnectorStatus.Listener connectorStatusListener = PowerMock.createStrictMock(ConnectorStatus.Listener.class);
@@ -87,17 +99,22 @@ public class WorkerTest extends ThreadedTest {
         workerProps.put("internal.value.converter.schemas.enable", "false");
         workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
         config = new StandaloneConfig(workerProps);
+
+        PowerMock.mockStatic(Plugins.class);
     }
 
     @Test
     public void testStartAndStopConnector() throws Exception {
+        expectConverters();
         expectStartStorage();
 
         // Create
         Connector connector = PowerMock.createMock(Connector.class);
         ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
 
-        EasyMock.expect(connectorFactory.newConnector(WorkerTestConnector.class.getName())).andReturn(connector);
+        EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
+        EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName()))
+                .andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
@@ -106,11 +123,17 @@ public class WorkerTest extends ThreadedTest {
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
 
+        EasyMock.expect(plugins.compareAndSwapLoaders(connector))
+                .andReturn(delegatingLoader)
+                .times(2);
         connector.initialize(EasyMock.anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
         connector.start(props);
         EasyMock.expectLastCall();
 
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
+                .andReturn(pluginLoader).times(2);
+
         connectorStatusListener.onStartup(CONNECTOR_ID);
         EasyMock.expectLastCall();
 
@@ -125,7 +148,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
 
         assertEquals(Collections.emptySet(), worker.connectorNames());
@@ -147,20 +170,33 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testStartConnectorFailure() throws Exception {
+        expectConverters();
         expectStartStorage();
 
-        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
-        worker.start();
-
         Map<String, String> props = new HashMap<>();
         props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
         props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "java.util.HashMap"); // Bad connector class name
 
-        connectorStatusListener.onFailure(EasyMock.eq(CONNECTOR_ID), EasyMock.<Throwable>anyObject());
+        EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader);
+        EasyMock.expect(plugins.newConnector(EasyMock.anyString()))
+                .andThrow(new ConnectException("Failed to find Connector"));
+
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
+                .andReturn(pluginLoader);
+
+        connectorStatusListener.onFailure(
+                EasyMock.eq(CONNECTOR_ID),
+                EasyMock.<ConnectException>anyObject()
+        );
         EasyMock.expectLastCall();
 
+        PowerMock.replayAll();
+
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+        worker.start();
+
         assertFalse(worker.startConnector(CONNECTOR_ID, props, PowerMock.createMock(ConnectorContext.class), connectorStatusListener, TargetState.STARTED));
 
         assertEquals(Collections.emptySet(), worker.connectorNames());
@@ -170,13 +206,15 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testAddConnectorByAlias() throws Exception {
+        expectConverters();
         expectStartStorage();
 
         // Create
         Connector connector = PowerMock.createMock(Connector.class);
         ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
 
-        EasyMock.expect(connectorFactory.newConnector("WorkerTestConnector")).andReturn(connector);
+        EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
+        EasyMock.expect(plugins.newConnector("WorkerTestConnector")).andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
@@ -185,11 +223,18 @@ public class WorkerTest extends ThreadedTest {
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTestConnector");
 
+        EasyMock.expect(plugins.compareAndSwapLoaders(connector))
+                .andReturn(delegatingLoader)
+                .times(2);
         connector.initialize(EasyMock.anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
         connector.start(props);
         EasyMock.expectLastCall();
 
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
+                .andReturn(pluginLoader)
+                .times(2);
+
         connectorStatusListener.onStartup(CONNECTOR_ID);
         EasyMock.expectLastCall();
 
@@ -204,7 +249,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
 
         assertEquals(Collections.emptySet(), worker.connectorNames());
@@ -221,13 +266,15 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testAddConnectorByShortAlias() throws Exception {
+        expectConverters();
         expectStartStorage();
 
         // Create
         Connector connector = PowerMock.createMock(Connector.class);
         ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
 
-        EasyMock.expect(connectorFactory.newConnector("WorkerTest")).andReturn(connector);
+        EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
+        EasyMock.expect(plugins.newConnector("WorkerTest")).andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
@@ -236,11 +283,18 @@ public class WorkerTest extends ThreadedTest {
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTest");
 
+        EasyMock.expect(plugins.compareAndSwapLoaders(connector))
+                .andReturn(delegatingLoader)
+                .times(2);
         connector.initialize(EasyMock.anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
         connector.start(props);
         EasyMock.expectLastCall();
 
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
+                .andReturn(pluginLoader)
+                .times(2);
+
         connectorStatusListener.onStartup(CONNECTOR_ID);
         EasyMock.expectLastCall();
 
@@ -255,7 +309,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
 
         assertEquals(Collections.emptySet(), worker.connectorNames());
@@ -272,11 +326,12 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testStopInvalidConnector() {
+        expectConverters();
         expectStartStorage();
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
 
         worker.stopConnector(CONNECTOR_ID);
@@ -284,13 +339,16 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testReconfigureConnectorTasks() throws Exception {
+        expectConverters();
         expectStartStorage();
 
         // Create
         Connector connector = PowerMock.createMock(Connector.class);
         ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
 
-        EasyMock.expect(connectorFactory.newConnector(WorkerTestConnector.class.getName())).andReturn(connector);
+        EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(3);
+        EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName()))
+                .andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
@@ -299,11 +357,18 @@ public class WorkerTest extends ThreadedTest {
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
 
+        EasyMock.expect(plugins.compareAndSwapLoaders(connector))
+                .andReturn(delegatingLoader)
+                .times(3);
         connector.initialize(EasyMock.anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
         connector.start(props);
         EasyMock.expectLastCall();
 
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
+                .andReturn(pluginLoader)
+                .times(3);
+
         connectorStatusListener.onStartup(CONNECTOR_ID);
         EasyMock.expectLastCall();
 
@@ -324,7 +389,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
 
         assertEquals(Collections.emptySet(), worker.connectorNames());
@@ -355,16 +420,16 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testAddRemoveTask() throws Exception {
+        expectConverters();
         expectStartStorage();
 
         // Create
         TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
-
-        EasyMock.expect(connectorFactory.newTask(TestSourceTask.class)).andReturn(task);
         EasyMock.expect(task.version()).andReturn("1.0");
 
+        EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(TASK_ID),
                 EasyMock.eq(task),
@@ -376,16 +441,30 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.anyObject(KafkaProducer.class),
                 EasyMock.anyObject(OffsetStorageReader.class),
                 EasyMock.anyObject(OffsetStorageWriter.class),
-                EasyMock.anyObject(WorkerConfig.class),
+                EasyMock.eq(config),
+                EasyMock.anyObject(ClassLoader.class),
                 EasyMock.anyObject(Time.class))
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
+
+        EasyMock.expect(plugins.newTask(TestSourceTask.class)).andReturn(task);
         workerTask.initialize(new TaskConfig(origProps));
         EasyMock.expectLastCall();
         workerTask.run();
         EasyMock.expectLastCall();
 
+        EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
+        EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName()))
+                .andReturn(pluginLoader);
+
+        EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader)
+                .times(2);
+
+        EasyMock.expect(workerTask.loader()).andReturn(pluginLoader);
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader)
+                .times(2);
+
         // Remove
         workerTask.stop();
         EasyMock.expectLastCall();
@@ -396,7 +475,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
         assertEquals(Collections.emptySet(), worker.taskIds());
         worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
@@ -411,24 +490,39 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testStartTaskFailure() throws Exception {
+        expectConverters();
         expectStartStorage();
 
-        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
-        worker.start();
-
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, "missing.From.This.Workers.Classpath");
 
-        assertFalse(worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED));
+        EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader);
+        EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
+        EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName()))
+                .andReturn(pluginLoader);
+
+        EasyMock.expect(pluginLoader.loadClass(origProps.get(TaskConfig.TASK_CLASS_CONFIG)))
+                .andThrow(new ClassNotFoundException());
+
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
+                .andReturn(pluginLoader);
 
-        taskStatusListener.onFailure(EasyMock.eq(TASK_ID), EasyMock.<Throwable>anyObject());
+        taskStatusListener.onFailure(EasyMock.eq(TASK_ID), EasyMock.<ConfigException>anyObject());
         EasyMock.expectLastCall();
 
+        PowerMock.replayAll();
+
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+        worker.start();
+
+        assertFalse(worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED));
+
         assertEquals(Collections.emptySet(), worker.taskIds());
     }
 
     @Test
     public void testCleanupTasksOnStop() throws Exception {
+        expectConverters();
         expectStartStorage();
 
         // Create
@@ -436,9 +530,10 @@ public class WorkerTest extends ThreadedTest {
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
 
-        EasyMock.expect(connectorFactory.newTask(TestSourceTask.class)).andReturn(task);
+        EasyMock.expect(plugins.newTask(TestSourceTask.class)).andReturn(task);
         EasyMock.expect(task.version()).andReturn("1.0");
-        
+
+        EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(TASK_ID),
                 EasyMock.eq(task),
@@ -451,6 +546,7 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.anyObject(OffsetStorageReader.class),
                 EasyMock.anyObject(OffsetStorageWriter.class),
                 EasyMock.anyObject(WorkerConfig.class),
+                EasyMock.eq(pluginLoader),
                 EasyMock.anyObject(Time.class))
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
@@ -460,6 +556,17 @@ public class WorkerTest extends ThreadedTest {
         workerTask.run();
         EasyMock.expectLastCall();
 
+        EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
+        EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName()))
+                .andReturn(pluginLoader);
+
+        EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader)
+                .times(2);
+
+        EasyMock.expect(workerTask.loader()).andReturn(pluginLoader);
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader)
+                .times(2);
+
         // Remove on Worker.stop()
         workerTask.stop();
         EasyMock.expectLastCall();
@@ -472,7 +579,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
         worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
         worker.stop();
@@ -482,18 +589,20 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testConverterOverrides() throws Exception {
+        expectConverters();
         expectStartStorage();
 
         TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
 
-        EasyMock.expect(connectorFactory.newTask(TestSourceTask.class)).andReturn(task);
+        EasyMock.expect(plugins.newTask(TestSourceTask.class)).andReturn(task);
         EasyMock.expect(task.version()).andReturn("1.0");
 
         Capture<TestConverter> keyConverter = EasyMock.newCapture();
         Capture<TestConverter> valueConverter = EasyMock.newCapture();
 
+        EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(TASK_ID),
                 EasyMock.eq(task),
@@ -506,15 +615,27 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.anyObject(OffsetStorageReader.class),
                 EasyMock.anyObject(OffsetStorageWriter.class),
                 EasyMock.anyObject(WorkerConfig.class),
+                EasyMock.eq(pluginLoader),
                 EasyMock.anyObject(Time.class))
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
+
         workerTask.initialize(new TaskConfig(origProps));
         EasyMock.expectLastCall();
         workerTask.run();
         EasyMock.expectLastCall();
 
+        EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
+        EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName()))
+                .andReturn(pluginLoader);
+
+        EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader)
+                .times(2);
+
+        EasyMock.expect(workerTask.loader()).andReturn(pluginLoader);
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader)
+                .times(2);
         // Remove
         workerTask.stop();
         EasyMock.expectLastCall();
@@ -525,7 +646,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
         assertEquals(Collections.emptySet(), worker.taskIds());
         Map<String, String> connProps = anyConnectorConfigMap();
@@ -559,6 +680,51 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expectLastCall();
     }
 
+    private void expectConverters() {
+        expectConverters(JsonConverter.class);
+    }
+
+    private void expectConverters(Class<? extends Converter> converterClass) {
+        // connector default key and value converters
+        Converter keyConverter = PowerMock.createMock(converterClass);
+        Converter valueConverter = PowerMock.createMock(converterClass);
+        // internal key and value converters
+        Converter internalKeyConverter = PowerMock.createMock(converterClass);
+        Converter internalValueConverter = PowerMock.createMock(converterClass);
+
+        // Instantiate and configure default
+        EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config))
+                .andReturn(keyConverter);
+        keyConverter.configure(
+                EasyMock.<Map<String, ?>>anyObject(),
+                EasyMock.anyBoolean()
+        );
+        EasyMock.expectLastCall();
+        EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config))
+                .andReturn(valueConverter);
+        valueConverter.configure(
+                EasyMock.<Map<String, ?>>anyObject(),
+                EasyMock.anyBoolean()
+        );
+        EasyMock.expectLastCall();
+
+        // Instantiate and configure internal
+        EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config))
+                .andReturn(internalKeyConverter);
+        internalKeyConverter.configure(
+                EasyMock.<Map<String, ?>>anyObject(),
+                EasyMock.anyBoolean()
+        );
+        EasyMock.expectLastCall();
+        EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config))
+                .andReturn(internalValueConverter);
+        internalValueConverter.configure(
+                EasyMock.<Map<String, ?>>anyObject(),
+                EasyMock.anyBoolean()
+        );
+        EasyMock.expectLastCall();
+    }
+
     private Map<String, String> anyConnectorConfigMap() {
         Map<String, String> props = new HashMap<>();
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 70d0736..18d83c5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -26,13 +26,15 @@ import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
-import org.apache.kafka.connect.runtime.ConnectorFactory;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
+import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
@@ -73,7 +75,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(DistributedHerder.class)
+@PrepareForTest({DistributedHerder.class, Plugins.class})
 @PowerMockIgnore("javax.management.*")
 public class DistributedHerderTest {
     private static final Map<String, String> HERDER_CONFIG = new HashMap<>();
@@ -150,6 +152,12 @@ public class DistributedHerderTest {
     private DistributedHerder herder;
     @Mock private Worker worker;
     @Mock private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback;
+    @Mock
+    private Plugins plugins;
+    @Mock
+    private PluginClassLoader pluginLoader;
+    @Mock
+    private DelegatingClassLoader delegatingLoader;
 
     private ConfigBackingStore.UpdateListener configUpdateListener;
     private WorkerRebalanceListener rebalanceListener;
@@ -165,7 +173,10 @@ public class DistributedHerderTest {
 
         configUpdateListener = herder.new ConfigUpdateListener();
         rebalanceListener = herder.new RebalanceListener();
-
+        plugins = PowerMock.createMock(Plugins.class);
+        pluginLoader = PowerMock.createMock(PluginClassLoader.class);
+        delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class);
+        PowerMock.mockStatic(Plugins.class);
         PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes();
     }
 
@@ -173,6 +184,7 @@ public class DistributedHerderTest {
     public void testJoinAssignment() throws Exception {
         // Join group and get assignment
         EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
         expectPostRebalanceCatchup(SNAPSHOT);
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
@@ -198,6 +210,7 @@ public class DistributedHerderTest {
     public void testRebalance() throws Exception {
         // Join group and get assignment
         EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
         expectPostRebalanceCatchup(SNAPSHOT);
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
@@ -211,6 +224,7 @@ public class DistributedHerderTest {
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1), ConnectProtocol.Assignment.NO_ERROR,
                 1, Arrays.asList(CONN1), Arrays.<ConnectorTaskId>asList());
 
@@ -235,6 +249,7 @@ public class DistributedHerderTest {
     public void testRebalanceFailedConnector() throws Exception {
         // Join group and get assignment
         EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
         expectPostRebalanceCatchup(SNAPSHOT);
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
@@ -304,12 +319,13 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
 
         // config validation
-        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
-        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
         Connector connectorMock = PowerMock.createMock(Connector.class);
-        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
+        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
         EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
         EasyMock.expect(connectorMock.validate(CONN2_CONFIG)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
 
         // CONN2 is new, should succeed
         configBackingStore.putConnectorConfig(CONN2, CONN2_CONFIG);
@@ -342,12 +358,11 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
 
         // config validation
-        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
-        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
         Connector connectorMock = PowerMock.createMock(Connector.class);
-        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
-        EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
-        EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
+        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
+        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
 
         // CONN2 creation should fail
 
@@ -380,10 +395,10 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
 
         // config validation
-        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
-        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
         Connector connectorMock = PowerMock.createMock(Connector.class);
-        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
+        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
 
         ConfigDef configDef = new ConfigDef();
         configDef.define("foo.bar", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "foo.bar doc");
@@ -392,6 +407,7 @@ public class DistributedHerderTest {
         ConfigValue validatedValue = new ConfigValue("foo.bar");
         validatedValue.addErrorMessage("Failed foo.bar validation");
         EasyMock.expect(connectorMock.validate(CONN2_CONFIG)).andReturn(new Config(singletonList(validatedValue)));
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
 
         // CONN2 creation should fail
 
@@ -427,12 +443,13 @@ public class DistributedHerderTest {
         config.put(ConnectorConfig.NAME_CONFIG, "test-group");
 
         // config validation
-        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
-        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
         Connector connectorMock = PowerMock.createMock(SinkConnector.class);
-        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
+        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
         EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
         EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
 
         // CONN2 creation should fail because the worker group id (connect-test-group) conflicts with
         // the consumer group id we would use for this sink
@@ -483,6 +500,7 @@ public class DistributedHerderTest {
     public void testDestroyConnector() throws Exception {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
         // Start with one connector
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
@@ -516,6 +534,7 @@ public class DistributedHerderTest {
 
         // get the initial assignment
         EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         expectRebalance(1, singletonList(CONN1), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
         member.poll(EasyMock.anyInt());
@@ -535,6 +554,7 @@ public class DistributedHerderTest {
 
         worker.stopConnector(CONN1);
         PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
@@ -839,6 +859,7 @@ public class DistributedHerderTest {
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
@@ -867,6 +888,7 @@ public class DistributedHerderTest {
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
@@ -882,6 +904,7 @@ public class DistributedHerderTest {
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
@@ -909,6 +932,7 @@ public class DistributedHerderTest {
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
@@ -948,6 +972,7 @@ public class DistributedHerderTest {
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED));
         PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
 
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -1155,6 +1180,7 @@ public class DistributedHerderTest {
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
@@ -1236,12 +1262,13 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
 
         // config validation
-        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
-        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
         Connector connectorMock = PowerMock.createMock(Connector.class);
-        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(6);
+        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
+        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
         EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
         EasyMock.expect(connectorMock.validate(CONN1_CONFIG_UPDATED)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
 
         configBackingStore.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED);
         PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {