You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/05/18 17:39:21 UTC

[1/3] kafka git commit: KAFKA-3487: Support classloading isolation in Connect (KIP-146)

Repository: kafka
Updated Branches:
  refs/heads/trunk 5aaaba7ff -> 45f226176


http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
new file mode 100644
index 0000000..c943863
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class PluginUtilsTest {
+    @Rule
+    public TemporaryFolder rootDir = new TemporaryFolder();
+
+    @Before
+    public void setUp() throws Exception {
+    }
+
+    @Test
+    public void testJavaLibraryClasses() throws Exception {
+        assertFalse(PluginUtils.shouldLoadInIsolation("java."));
+        assertFalse(PluginUtils.shouldLoadInIsolation("java.lang.Object"));
+        assertFalse(PluginUtils.shouldLoadInIsolation("java.lang.String"));
+        assertFalse(PluginUtils.shouldLoadInIsolation("java.util.HashMap$Entry"));
+        assertFalse(PluginUtils.shouldLoadInIsolation("java.io.Serializable"));
+        assertFalse(PluginUtils.shouldLoadInIsolation("javax."));
+        assertFalse(PluginUtils.shouldLoadInIsolation(
+                "javax.management.loading.ClassLoaderRepository")
+        );
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.omg."));
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.omg.CORBA.Object"));
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.w3c.dom."));
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.w3c.dom.traversal.TreeWalker"));
+    }
+
+    @Test
+    public void testThirdPartyClasses() throws Exception {
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.log4j."));
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.log4j.Level"));
+    }
+
+    @Test
+    public void testConnectFrameworkClasses() throws Exception {
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.common."));
+        assertFalse(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.common.config.AbstractConfig")
+        );
+        assertFalse(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.common.config.ConfigDef$Type")
+        );
+        assertFalse(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.common.serialization.Deserializer")
+        );
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect."));
+        assertFalse(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.connector.Connector")
+        );
+        assertFalse(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.source.SourceConnector")
+        );
+        assertFalse(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.sink.SinkConnector")
+        );
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.connector.Task"));
+        assertFalse(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.source.SourceTask")
+        );
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.sink.SinkTask"));
+        assertFalse(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.transforms.Transformation")
+        );
+        assertFalse(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.storage.Converter")
+        );
+        assertFalse(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.storage.OffsetBackingStore")
+        );
+    }
+
+    @Test
+    public void testAllowedConnectFrameworkClasses() throws Exception {
+        assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.transforms."));
+        assertTrue(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.transforms.ExtractField")
+        );
+        assertTrue(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.transforms.ExtractField$Key")
+        );
+        assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.json."));
+        assertTrue(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.json.JsonConverter")
+        );
+        assertTrue(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.json.JsonConverter$21")
+        );
+        assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.file."));
+        assertTrue(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.file.FileStreamSourceTask")
+        );
+        assertTrue(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.file.FileStreamSinkConnector")
+        );
+        assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.converters."));
+        assertTrue(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.converters.ByteArrayConverter")
+        );
+        assertTrue(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.storage.StringConverter")
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 966098c..2d0448e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.runtime.rest.resources;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -31,6 +30,9 @@ import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.runtime.AbstractHerder;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
+import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
@@ -57,7 +59,8 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.io.IOException;
+import javax.ws.rs.BadRequestException;
+import java.net.URL;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -66,8 +69,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
-import javax.ws.rs.BadRequestException;
+import java.util.TreeSet;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -88,12 +90,14 @@ public class ConnectorPluginsResourceTest {
 
         props = new HashMap<>(partialProps);
         props.put("connector.class", ConnectorPluginsResourceTestConnector.class.getSimpleName());
+        props.put("plugin.path", null);
     }
 
     private static final ConfigInfos CONFIG_INFOS;
     private static final ConfigInfos PARTIAL_CONFIG_INFOS;
     private static final int ERROR_COUNT = 0;
     private static final int PARTIAL_CONFIG_ERROR_COUNT = 1;
+    private static final Set<PluginDesc<Connector>> CONNECTOR_PLUGINS = new TreeSet<>();
 
     static {
         List<ConfigInfo> configs = new LinkedList<>();
@@ -133,19 +137,58 @@ public class ConnectorPluginsResourceTest {
 
         CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), ERROR_COUNT, Collections.singletonList("Test"), configs);
         PARTIAL_CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), PARTIAL_CONFIG_ERROR_COUNT, Collections.singletonList("Test"), partialConfigs);
+
+        Class<?>[] abstractConnectorClasses = {
+            Connector.class,
+            SourceConnector.class,
+            SinkConnector.class
+        };
+
+        Class<?>[] connectorClasses = {
+            VerifiableSourceConnector.class,
+            VerifiableSinkConnector.class,
+            MockSourceConnector.class,
+            MockSinkConnector.class,
+            MockConnector.class,
+            SchemaSourceConnector.class,
+            ConnectorPluginsResourceTestConnector.class
+        };
+
+        try {
+            for (Class<?> klass : abstractConnectorClasses) {
+                CONNECTOR_PLUGINS.add(
+                        new MockConnectorPluginDesc((Class<? extends Connector>) klass, "0.0.0"));
+            }
+            for (Class<?> klass : connectorClasses) {
+                CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc((Class<? extends Connector>) klass));
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Mock
     private Herder herder;
+    @Mock
+    private Plugins plugins;
     private ConnectorPluginsResource connectorPluginsResource;
 
     @Before
-    public void setUp() throws NoSuchMethodException {
+    public void setUp() throws Exception {
         PowerMock.mockStatic(RestServer.class,
                              RestServer.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class));
+
+        plugins = PowerMock.createMock(Plugins.class);
+        herder = PowerMock.createMock(AbstractHerder.class);
         connectorPluginsResource = new ConnectorPluginsResource(herder);
     }
 
+    private void expectPlugins() {
+        EasyMock.expect(herder.plugins()).andReturn(plugins);
+        EasyMock.expect(plugins.connectors()).andReturn(CONNECTOR_PLUGINS);
+        PowerMock.replayAll();
+    }
+
     @Test
     public void testValidateConfigWithSingleErrorDueToMissingConnectorClassname() throws Throwable {
         herder.validateConnectorConfig(EasyMock.eq(partialProps));
@@ -359,27 +402,30 @@ public class ConnectorPluginsResourceTest {
     }
 
     @Test
-    public void testListConnectorPlugins() {
+    public void testListConnectorPlugins() throws Exception {
+        expectPlugins();
         Set<ConnectorPluginInfo> connectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins());
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(Connector.class)));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SourceConnector.class)));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SinkConnector.class)));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSourceConnector.class)));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSinkConnector.class)));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSourceConnector.class)));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSinkConnector.class)));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockConnector.class)));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SchemaSourceConnector.class)));
-        assertTrue(connectorPlugins.contains(new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class)));
+        assertFalse(connectorPlugins.contains(newInfo(Connector.class, "0.0")));
+        assertFalse(connectorPlugins.contains(newInfo(SourceConnector.class, "0.0")));
+        assertFalse(connectorPlugins.contains(newInfo(SinkConnector.class, "0.0")));
+        assertFalse(connectorPlugins.contains(newInfo(VerifiableSourceConnector.class)));
+        assertFalse(connectorPlugins.contains(newInfo(VerifiableSinkConnector.class)));
+        assertFalse(connectorPlugins.contains(newInfo(MockSourceConnector.class)));
+        assertFalse(connectorPlugins.contains(newInfo(MockSinkConnector.class)));
+        assertFalse(connectorPlugins.contains(newInfo(MockConnector.class)));
+        assertFalse(connectorPlugins.contains(newInfo(SchemaSourceConnector.class)));
+        assertTrue(connectorPlugins.contains(newInfo(ConnectorPluginsResourceTestConnector.class)));
+        PowerMock.verifyAll();
     }
 
     @Test
-    public void testConnectorPluginsIncludesTypeAndVersionInformation()
-        throws IOException {
-        ConnectorPluginInfo sinkInfo = new ConnectorPluginInfo(TestSinkConnector.class);
-        ConnectorPluginInfo sourceInfo = new ConnectorPluginInfo(TestSourceConnector.class);
+    public void testConnectorPluginsIncludesTypeAndVersionInformation() throws Exception {
+        expectPlugins();
+        ConnectorPluginInfo sinkInfo = newInfo(TestSinkConnector.class);
+        ConnectorPluginInfo sourceInfo =
+                newInfo(TestSourceConnector.class);
         ConnectorPluginInfo unkownInfo =
-            new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class);
+            newInfo(ConnectorPluginsResourceTestConnector.class);
         assertEquals(ConnectorType.SINK, sinkInfo.type());
         assertEquals(ConnectorType.SOURCE, sourceInfo.type());
         assertEquals(ConnectorType.UNKNOWN, unkownInfo.type());
@@ -407,6 +453,46 @@ public class ConnectorPluginsResourceTest {
         );
     }
 
+    protected static ConnectorPluginInfo newInfo(Class<? extends Connector> klass, String version)
+            throws Exception {
+        return new ConnectorPluginInfo(new MockConnectorPluginDesc(klass, version));
+    }
+
+    protected static ConnectorPluginInfo newInfo(Class<? extends Connector> klass)
+            throws Exception {
+        return new ConnectorPluginInfo(new MockConnectorPluginDesc(klass));
+    }
+
+    public static class MockPluginClassLoader extends PluginClassLoader {
+        public MockPluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent) {
+            super(pluginLocation, urls, parent);
+        }
+
+        public MockPluginClassLoader(URL pluginLocation, URL[] urls) {
+            super(pluginLocation, urls);
+        }
+
+        @Override
+        public String location() {
+            return "/tmp/mockpath";
+        }
+    }
+
+    public static class MockConnectorPluginDesc extends PluginDesc<Connector> {
+        public MockConnectorPluginDesc(Class<? extends Connector> klass, String version)
+                throws Exception {
+            super(klass, version, new MockPluginClassLoader(null, new URL[0]));
+        }
+
+        public MockConnectorPluginDesc(Class<? extends Connector> klass) throws Exception {
+            super(
+                    klass,
+                    klass.newInstance().version(),
+                    new MockPluginClassLoader(null, new URL[0])
+            );
+        }
+    }
+
     public static class TestSinkConnector extends SinkConnector {
 
         static final String VERSION = "some great version";

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index da1edbc..1c3dddb 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.AbstractStatus;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
-import org.apache.kafka.connect.runtime.ConnectorFactory;
 import org.apache.kafka.connect.runtime.ConnectorStatus;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.HerderConnectorContext;
@@ -35,6 +34,9 @@ import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.TaskStatus;
 import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
+import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
@@ -54,6 +56,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
 import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.ArrayList;
@@ -75,6 +78,7 @@ import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
 @SuppressWarnings("unchecked")
+@PrepareForTest({StandaloneHerder.class, Plugins.class})
 public class StandaloneHerderTest {
     private static final String CONNECTOR_NAME = "test";
     private static final List<String> TOPICS_LIST = Arrays.asList("topic1", "topic2");
@@ -90,12 +94,21 @@ public class StandaloneHerderTest {
 
     private Connector connector;
     @Mock protected Worker worker;
+    @Mock private Plugins plugins;
+    @Mock
+    private PluginClassLoader pluginLoader;
+    @Mock
+    private DelegatingClassLoader delegatingLoader;
     @Mock protected Callback<Herder.Created<ConnectorInfo>> createCallback;
     @Mock protected StatusBackingStore statusBackingStore;
 
     @Before
     public void setup() {
         herder = new StandaloneHerder(worker, WORKER_ID, statusBackingStore, new MemoryConfigBackingStore());
+        plugins = PowerMock.createMock(Plugins.class);
+        pluginLoader = PowerMock.createMock(PluginClassLoader.class);
+        delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class);
+        PowerMock.mockStatic(Plugins.class);
     }
 
     @Test
@@ -120,12 +133,12 @@ public class StandaloneHerderTest {
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
         config.remove(ConnectorConfig.NAME_CONFIG);
 
-        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
-        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
         Connector connectorMock = PowerMock.createMock(Connector.class);
-        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
+        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
+        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
         EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
-        EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
 
         createCallback.onCompletion(EasyMock.<BadRequestException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
         PowerMock.expectLastCall();
@@ -141,11 +154,10 @@ public class StandaloneHerderTest {
     public void testCreateConnectorFailedCustomValidation() throws Exception {
         connector = PowerMock.createMock(BogusSourceConnector.class);
 
-        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
-        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
         Connector connectorMock = PowerMock.createMock(Connector.class);
-        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
+        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
 
         ConfigDef configDef = new ConfigDef();
         configDef.define("foo.bar", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "foo.bar doc");
@@ -153,7 +165,9 @@ public class StandaloneHerderTest {
 
         ConfigValue validatedValue = new ConfigValue("foo.bar");
         validatedValue.addErrorMessage("Failed foo.bar validation");
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
         EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(singletonList(validatedValue)));
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
 
         createCallback.onCompletion(EasyMock.<BadRequestException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
         PowerMock.expectLastCall();
@@ -172,8 +186,13 @@ public class StandaloneHerderTest {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        expectConfigValidation(config, config);
+        Connector connectorMock = PowerMock.createMock(Connector.class);
+        expectConfigValidation(connectorMock, true, config, config);
 
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
+        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
+        // No new connector is created
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
         // Second should fail
         createCallback.onCompletion(EasyMock.<AlreadyExistsException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
         PowerMock.expectLastCall();
@@ -435,7 +454,8 @@ public class StandaloneHerderTest {
         // Create
         connector = PowerMock.createMock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
-        expectConfigValidation(connConfig, newConnConfig);
+        Connector connectorMock = PowerMock.createMock(Connector.class);
+        expectConfigValidation(connectorMock, true, connConfig);
 
         // Should get first config
         connectorConfigCb.onCompletion(null, connConfig);
@@ -457,6 +477,7 @@ public class StandaloneHerderTest {
         putConnectorConfigCb.onCompletion(null, new Herder.Created<>(false, newConnInfo));
         EasyMock.expectLastCall();
         // Should get new config
+        expectConfigValidation(connectorMock, false, newConnConfig);
         connectorConfigCb.onCompletion(null, newConnConfig);
         EasyMock.expectLastCall();
 
@@ -501,11 +522,12 @@ public class StandaloneHerderTest {
         );
         ConfigDef configDef = new ConfigDef();
         configDef.define(key, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "");
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
+        EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
+        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
         EasyMock.expect(connectorMock.config()).andStubReturn(configDef);
-        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
-        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
-        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString()))
-            .andReturn(connectorMock);
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
         Callback<Herder.Created<ConnectorInfo>> callback = PowerMock.createMock(Callback.class);
         Capture<BadRequestException> capture = Capture.newInstance();
         callback.onCompletion(
@@ -588,15 +610,27 @@ public class StandaloneHerderTest {
 
 
     private void expectConfigValidation(Map<String, String> ... configs) {
-        // config validation
-        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
-        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
         Connector connectorMock = PowerMock.createMock(Connector.class);
-        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        expectConfigValidation(connectorMock, true, configs);
+    }
+
+    private void expectConfigValidation(
+            Connector connectorMock,
+            boolean shouldCreateConnector,
+            Map<String, String>... configs
+    ) {
+        // config validation
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
+        if (shouldCreateConnector) {
+            EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+            EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        }
         EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
 
         for (Map<String, String> config : configs)
             EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
     }
 
     // We need to use a real class here due to some issue with mocking java.lang.Class

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/gradle/dependencies.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index cdd8c23..64ea09e 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -71,6 +71,7 @@ versions += [
   zkclient: "0.10",
   zookeeper: "3.4.10",
   jfreechart: "1.0.0",
+  mavenArtifact: "3.5.0",
 ]
 
 libs += [
@@ -112,5 +113,6 @@ libs += [
   snappy: "org.xerial.snappy:snappy-java:$versions.snappy",
   zkclient: "com.101tec:zkclient:$versions.zkclient",
   zookeeper: "org.apache.zookeeper:zookeeper:$versions.zookeeper",
-  jfreechart: "jfreechart:jfreechart:$versions.jfreechart"
+  jfreechart: "jfreechart:jfreechart:$versions.jfreechart",
+  mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact"
 ]


[3/3] kafka git commit: KAFKA-3487: Support classloading isolation in Connect (KIP-146)

Posted by ew...@apache.org.
KAFKA-3487: Support classloading isolation in Connect (KIP-146)

Author: Konstantine Karantasis <ko...@confluent.io>

Reviewers: Randall Hauch <rh...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #3028 from kkonstantine/KAFKA-3487-Support-classloading-isolation-in-Connect


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45f22617
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45f22617
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45f22617

Branch: refs/heads/trunk
Commit: 45f2261763eac5caaebf860daab32ef5337c9293
Parents: 5aaaba7
Author: Konstantine Karantasis <ko...@confluent.io>
Authored: Thu May 18 10:39:15 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu May 18 10:39:15 2017 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 checkstyle/import-control.xml                   |   5 +
 checkstyle/suppressions.xml                     |   2 +-
 config/connect-distributed.properties           |  10 +
 config/connect-standalone.properties            |  11 +
 .../kafka/connect/cli/ConnectDistributed.java   |   7 +-
 .../kafka/connect/cli/ConnectStandalone.java    |   7 +-
 .../kafka/connect/runtime/AbstractHerder.java   |  91 +++---
 .../kafka/connect/runtime/ConnectorConfig.java  |  67 ++++-
 .../apache/kafka/connect/runtime/Herder.java    |   9 +-
 .../kafka/connect/runtime/PluginDiscovery.java  | 126 --------
 .../connect/runtime/SinkConnectorConfig.java    |  10 +-
 .../connect/runtime/SourceConnectorConfig.java  |   5 +-
 .../apache/kafka/connect/runtime/Worker.java    | 173 ++++++++---
 .../kafka/connect/runtime/WorkerConfig.java     |  31 +-
 .../kafka/connect/runtime/WorkerSinkTask.java   |   3 +-
 .../kafka/connect/runtime/WorkerSourceTask.java |   3 +-
 .../kafka/connect/runtime/WorkerTask.java       |  12 +-
 .../runtime/distributed/DistributedHerder.java  |   4 +-
 .../isolation/DelegatingClassLoader.java        | 299 +++++++++++++++++++
 .../runtime/isolation/PluginClassLoader.java    |  68 +++++
 .../connect/runtime/isolation/PluginDesc.java   | 110 +++++++
 .../runtime/isolation/PluginScanResult.java     |  55 ++++
 .../connect/runtime/isolation/PluginType.java   |  58 ++++
 .../connect/runtime/isolation/PluginUtils.java  | 147 +++++++++
 .../connect/runtime/isolation/Plugins.java      | 217 ++++++++++++++
 .../rest/entities/ConnectorPluginInfo.java      |  37 +--
 .../resources/ConnectorPluginsResource.java     |  40 ++-
 .../runtime/standalone/StandaloneHerder.java    |   4 +-
 .../connect/runtime/ConnectorConfigTest.java    |  27 +-
 .../connect/runtime/WorkerConnectorTest.java    |  30 +-
 .../connect/runtime/WorkerSinkTaskTest.java     |   8 +-
 .../runtime/WorkerSinkTaskThreadedTest.java     |   7 +-
 .../connect/runtime/WorkerSourceTaskTest.java   |   5 +-
 .../kafka/connect/runtime/WorkerTaskTest.java   |  30 +-
 .../kafka/connect/runtime/WorkerTest.java       | 224 ++++++++++++--
 .../distributed/DistributedHerderTest.java      |  67 +++--
 .../runtime/isolation/PluginUtilsTest.java      | 127 ++++++++
 .../resources/ConnectorPluginsResourceTest.java | 128 ++++++--
 .../standalone/StandaloneHerderTest.java        |  72 +++--
 gradle/dependencies.gradle                      |   4 +-
 41 files changed, 1937 insertions(+), 404 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 8d6e703..1693723 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1056,6 +1056,7 @@ project(':connect:runtime') {
     compile libs.jettyServlet
     compile libs.jettyServlets
     compile(libs.reflections)
+    compile(libs.mavenArtifact)
 
     testCompile project(':clients').sourceSets.test.output
     testCompile libs.easymock

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 21d9d3c..7f51979 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -254,6 +254,11 @@
         <allow pkg="org.glassfish.jersey" />
         <allow pkg="com.fasterxml.jackson" />
       </subpackage>
+
+      <subpackage name="isolation">
+        <allow pkg="com.fasterxml.jackson" />
+        <allow pkg="org.apache.maven.artifact.versioning" />
+      </subpackage>
     </subpackage>
 
     <subpackage name="cli">

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index dc00bee..3b865bc 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -66,7 +66,7 @@
 
     <!-- Connect -->
     <suppress checks="ClassFanOutComplexity"
-              files="DistributedHerder.java"/>
+              files="DistributedHerder(|Test).java"/>
 
     <suppress checks="MethodLength"
               files="(KafkaConfigBackingStore|RequestResponseTest).java"/>

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/config/connect-distributed.properties
----------------------------------------------------------------------
diff --git a/config/connect-distributed.properties b/config/connect-distributed.properties
index b0092bb..752e1f5 100644
--- a/config/connect-distributed.properties
+++ b/config/connect-distributed.properties
@@ -58,3 +58,13 @@ offset.flush.interval.ms=10000
 # The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
 #rest.advertised.host.name=
 #rest.advertised.port=
+
+# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
+# (connectors, converters, transformations). The list should consist of top level directories that include 
+# any combination of: 
+# a) directories immediately containing jars with plugins and their dependencies
+# b) uber-jars with plugins and their dependencies
+# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
+# Examples: 
+# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
+#plugin.path=

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/config/connect-standalone.properties
----------------------------------------------------------------------
diff --git a/config/connect-standalone.properties b/config/connect-standalone.properties
index 8760590..0039796 100644
--- a/config/connect-standalone.properties
+++ b/config/connect-standalone.properties
@@ -35,3 +35,14 @@ internal.value.converter.schemas.enable=false
 offset.storage.file.filename=/tmp/connect.offsets
 # Flush much faster than normal, which is useful for testing/debugging
 offset.flush.interval.ms=10000
+
+# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
+# (connectors, converters, transformations). The list should consist of top level directories that include 
+# any combination of: 
+# a) directories immediately containing jars with plugins and their dependencies
+# b) uber-jars with plugins and their dependencies
+# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
+# Note: symlinks will be followed to discover dependencies or plugins.
+# Examples: 
+# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
+#plugin.path=

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index fb3d693..717ccd9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -20,10 +20,10 @@ import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.Connect;
-import org.apache.kafka.connect.runtime.ConnectorFactory;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
 import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
@@ -60,7 +60,8 @@ public class ConnectDistributed {
                 Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
 
         Time time = Time.SYSTEM;
-        ConnectorFactory connectorFactory = new ConnectorFactory();
+        Plugins plugins = new Plugins(workerProps);
+        plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig config = new DistributedConfig(workerProps);
 
         RestServer rest = new RestServer(config);
@@ -70,7 +71,7 @@ public class ConnectDistributed {
         KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
         offsetBackingStore.configure(config);
 
-        Worker worker = new Worker(workerId, time, connectorFactory, config, offsetBackingStore);
+        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
 
         StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
         statusBackingStore.configure(config);

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 0465048..c6d0e59 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -21,9 +21,9 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.Connect;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
-import org.apache.kafka.connect.runtime.ConnectorFactory;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
@@ -65,14 +65,15 @@ public class ConnectStandalone {
                 Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
 
         Time time = Time.SYSTEM;
-        ConnectorFactory connectorFactory = new ConnectorFactory();
+        Plugins plugins = new Plugins(workerProps);
+        plugins.compareAndSwapWithDelegatingLoader();
         StandaloneConfig config = new StandaloneConfig(workerProps);
 
         RestServer rest = new RestServer(config);
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
-        Worker worker = new Worker(workerId, time, connectorFactory, config, new FileOffsetBackingStore());
+        Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore());
 
         Herder herder = new StandaloneHerder(worker);
         final Connect connect = new Connect(herder, rest);

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index fb286e2..6293b01 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -20,9 +20,11 @@ import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.ConfigKey;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
@@ -78,7 +80,6 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
     protected final ConfigBackingStore configBackingStore;
 
     private Map<String, Connector> tempConnectors = new ConcurrentHashMap<>();
-    private Thread classPathTraverser;
 
     public AbstractHerder(Worker worker,
                           String workerId,
@@ -96,20 +97,12 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
         this.worker.start();
         this.statusBackingStore.start();
         this.configBackingStore.start();
-        traverseClassPath();
     }
 
     protected void stopServices() {
         this.statusBackingStore.stop();
         this.configBackingStore.stop();
         this.worker.stop();
-        if (this.classPathTraverser != null) {
-            try {
-                this.classPathTraverser.join();
-            } catch (InterruptedException e) {
-                // ignore as it can only happen during shutdown
-            }
-        }
     }
 
     @Override
@@ -189,6 +182,11 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
     }
 
     @Override
+    public Plugins plugins() {
+        return worker.getPlugins();
+    }
+
+    @Override
     public ConnectorStateInfo connectorStatus(String connName) {
         ConnectorStatus connector = statusBackingStore.get(connName);
         if (connector == null)
@@ -233,32 +231,53 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
         if (connType == null)
             throw new BadRequestException("Connector config " + connectorConfig + " contains no connector type");
 
-        Connector connector = getConnector(connType);
-
-        final ConfigDef connectorConfigDef = ConnectorConfig.enrich(
-                (connector instanceof SourceConnector) ? SourceConnectorConfig.configDef() : SinkConnectorConfig.configDef(),
-                connectorConfig,
-                false
-        );
-
         List<ConfigValue> configValues = new ArrayList<>();
         Map<String, ConfigKey> configKeys = new HashMap<>();
         List<String> allGroups = new ArrayList<>();
 
-        // do basic connector validation (name, connector type, etc.)
-        Map<String, ConfigValue> validatedConnectorConfig = validateBasicConnectorConfig(connector, connectorConfigDef, connectorConfig);
-        configValues.addAll(validatedConnectorConfig.values());
-        configKeys.putAll(connectorConfigDef.configKeys());
-        allGroups.addAll(connectorConfigDef.groups());
-
-        // do custom connector-specific validation
-        Config config = connector.validate(connectorConfig);
-        ConfigDef configDef = connector.config();
-        configKeys.putAll(configDef.configKeys());
-        allGroups.addAll(configDef.groups());
-        configValues.addAll(config.configValues());
+        Connector connector = getConnector(connType);
+        ClassLoader savedLoader = worker.getPlugins().compareAndSwapLoaders(connector);
+        try {
+            // do basic connector validation (name, connector type, etc.)
+            ConfigDef basicConfigDef = (connector instanceof SourceConnector)
+                                       ? SourceConnectorConfig.configDef()
+                                       : SinkConnectorConfig.configDef();
+            Map<String, ConfigValue> validatedConnectorConfig = validateBasicConnectorConfig(
+                    connector,
+                    basicConfigDef,
+                    connectorConfig
+            );
+            configValues.addAll(validatedConnectorConfig.values());
+            configKeys.putAll(basicConfigDef.configKeys());
+            allGroups.addAll(basicConfigDef.groups());
+
+            ConnectorConfig connectorConfigToEnrich = (connector instanceof SourceConnector)
+                    ? new SourceConnectorConfig(plugins(), connectorConfig)
+                    : new SinkConnectorConfig(plugins(), connectorConfig);
+            final ConfigDef connectorConfigDef = connectorConfigToEnrich.enrich(
+                    plugins(),
+                    basicConfigDef,
+                    connectorConfig,
+                    false
+            );
 
-        return generateResult(connType, configKeys, configValues, allGroups);
+            // Override is required here after the enriched ConfigDef has been created successfully
+            configKeys.putAll(connectorConfigDef.configKeys());
+            allGroups.addAll(connectorConfigDef.groups());
+
+            // do custom connector-specific validation
+            Config config = connector.validate(connectorConfig);
+            ConfigDef configDef = connector.config();
+            configKeys.putAll(configDef.configKeys());
+            allGroups.addAll(configDef.groups());
+            configValues.addAll(config.configValues());
+            return generateResult(connType, configKeys, configValues, allGroups);
+        } catch (ConfigException e) {
+            // Basic validation must have failed. Return the result.
+            return generateResult(connType, configKeys, configValues, allGroups);
+        } finally {
+            Plugins.compareAndSwapLoaders(savedLoader);
+        }
     }
 
     // public for testing
@@ -334,7 +353,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
         if (tempConnectors.containsKey(connType)) {
             return tempConnectors.get(connType);
         } else {
-            Connector connector = worker.getConnectorFactory().newConnector(connType);
+            Connector connector = worker.getPlugins().newConnector(connType);
             tempConnectors.put(connType, connector);
             return connector;
         }
@@ -383,14 +402,4 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
             return null;
         }
     }
-
-    private void traverseClassPath() {
-        classPathTraverser = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                PluginDiscovery.scanClasspathForPlugins();
-            }
-        }, "CLASSPATH traversal thread.");
-        classPathTraverser.start();
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 74aef62..869cfbd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.config.ConfigDef.Width;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.transforms.Transformation;
 
 import java.util.ArrayList;
@@ -81,6 +83,17 @@ public class ConnectorConfig extends AbstractConfig {
     private static final String TRANSFORMS_DOC = "Aliases for the transformations to be applied to records.";
     private static final String TRANSFORMS_DISPLAY = "Transforms";
 
+    private final EnrichedConnectorConfig enrichedConfig;
+    private static class EnrichedConnectorConfig extends AbstractConfig {
+        EnrichedConnectorConfig(ConfigDef configDef, Map<String, String> props) {
+            super(configDef, props);
+        }
+
+        public Object get(String key) {
+            return super.get(key);
+        }
+    }
+
     public static ConfigDef configDef() {
         return new ConfigDef()
                 .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY)
@@ -100,16 +113,25 @@ public class ConnectorConfig extends AbstractConfig {
                 }, Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, 6, Width.LONG, TRANSFORMS_DISPLAY);
     }
 
-    public ConnectorConfig() {
-        this(new HashMap<String, String>());
+    public ConnectorConfig(Plugins plugins) {
+        this(plugins, new HashMap<String, String>());
+    }
+
+    public ConnectorConfig(Plugins plugins, Map<String, String> props) {
+        this(plugins, configDef(), props);
     }
 
-    public ConnectorConfig(Map<String, String> props) {
-        this(configDef(), props);
+    public ConnectorConfig(Plugins plugins, ConfigDef configDef, Map<String, String> props) {
+        super(configDef, props);
+        enrichedConfig = new EnrichedConnectorConfig(
+                enrich(plugins, configDef, props, true),
+                props
+        );
     }
 
-    public ConnectorConfig(ConfigDef configDef, Map<String, String> props) {
-        super(enrich(configDef, props, true), props);
+    @Override
+    public Object get(String key) {
+        return enrichedConfig.get(key);
     }
 
     /**
@@ -142,15 +164,20 @@ public class ConnectorConfig extends AbstractConfig {
      * <p>
      * {@code requireFullConfig} specifies whether required config values that are missing should cause an exception to be thrown.
      */
-    public static ConfigDef enrich(ConfigDef baseConfigDef, Map<String, String> props, boolean requireFullConfig) {
-        final List<String> transformAliases = (List<String>) ConfigDef.parseType(TRANSFORMS_CONFIG, props.get(TRANSFORMS_CONFIG), Type.LIST);
-        if (transformAliases == null || transformAliases.isEmpty()) {
+    public ConfigDef enrich(Plugins plugins, ConfigDef baseConfigDef, Map<String, String> props, boolean requireFullConfig) {
+        Object transformAliases = ConfigDef.parseType(TRANSFORMS_CONFIG, props.get(TRANSFORMS_CONFIG), Type.LIST);
+        if (!(transformAliases instanceof List)) {
             return baseConfigDef;
         }
 
-        final ConfigDef newDef = new ConfigDef(baseConfigDef);
-
-        for (String alias : new LinkedHashSet<>(transformAliases)) {
+        ConfigDef newDef = new ConfigDef(baseConfigDef);
+        LinkedHashSet<?> uniqueTransformAliases = new LinkedHashSet<>((List<?>) transformAliases);
+        for (Object o : uniqueTransformAliases) {
+            if (!(o instanceof String)) {
+                throw new ConfigException("Item in " + TRANSFORMS_CONFIG + " property is not of "
+                        + "type String");
+            }
+            String alias = (String) o;
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
             final String group = TRANSFORMS_GROUP + ": " + alias;
             int orderInGroup = 0;
@@ -164,7 +191,7 @@ public class ConnectorConfig extends AbstractConfig {
             };
             newDef.define(transformationTypeConfig, Type.CLASS, ConfigDef.NO_DEFAULT_VALUE, typeValidator, Importance.HIGH,
                     "Class for the '" + alias + "' transformation.", group, orderInGroup++, Width.LONG, "Transformation type for " + alias,
-                    Collections.<String>emptyList(), new TransformationClassRecommender());
+                    Collections.<String>emptyList(), new TransformationClassRecommender(plugins));
 
             final ConfigDef transformationConfigDef;
             try {
@@ -204,9 +231,19 @@ public class ConnectorConfig extends AbstractConfig {
      * Recommend bundled transformations.
      */
     static final class TransformationClassRecommender implements ConfigDef.Recommender {
+        private final Plugins plugins;
+
+        TransformationClassRecommender(Plugins plugins) {
+            this.plugins = plugins;
+        }
+
         @Override
         public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
-            return (List) PluginDiscovery.transformationPlugins();
+            List<Object> transformationPlugins = new ArrayList<>();
+            for (PluginDesc<Transformation> plugin : plugins.transformations()) {
+                transformationPlugins.add(plugin.pluginClass());
+            }
+            return Collections.unmodifiableList(transformationPlugins);
         }
 
         @Override
@@ -215,4 +252,4 @@ public class ConnectorConfig extends AbstractConfig {
         }
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 93fc6f0..5dfb808 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
@@ -168,6 +169,12 @@ public interface Herder {
      */
     void resumeConnector(String connector);
 
+    /**
+     * Returns a handle to the plugin factory used by this herder and its worker.
+     *
+     * @return a reference to the plugin factory.
+     */
+    Plugins plugins();
 
     class Created<T> {
         private final boolean created;
@@ -200,4 +207,4 @@ public interface Herder {
             return Objects.hash(created, result);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java
deleted file mode 100644
index 482139a..0000000
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.runtime;
-
-import org.apache.kafka.connect.connector.Connector;
-import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
-import org.apache.kafka.connect.tools.MockConnector;
-import org.apache.kafka.connect.tools.MockSinkConnector;
-import org.apache.kafka.connect.tools.MockSourceConnector;
-import org.apache.kafka.connect.tools.SchemaSourceConnector;
-import org.apache.kafka.connect.tools.VerifiableSinkConnector;
-import org.apache.kafka.connect.tools.VerifiableSourceConnector;
-import org.apache.kafka.connect.transforms.Transformation;
-import org.apache.kafka.connect.util.ReflectionsUtil;
-import org.reflections.Reflections;
-import org.reflections.util.ClasspathHelper;
-import org.reflections.util.ConfigurationBuilder;
-
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Set;
-
-public class PluginDiscovery {
-
-    private static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList(
-            VerifiableSourceConnector.class, VerifiableSinkConnector.class,
-            MockConnector.class, MockSourceConnector.class, MockSinkConnector.class,
-            SchemaSourceConnector.class
-    );
-
-    private static final List<Class<? extends Transformation>> TRANSFORMATION_EXCLUDES = Arrays.asList();
-
-    private static boolean scanned = false;
-    private static List<ConnectorPluginInfo> validConnectorPlugins;
-    private static List<Class<? extends Transformation>> validTransformationPlugins;
-
-    public static synchronized List<ConnectorPluginInfo> connectorPlugins() {
-        scanClasspathForPlugins();
-        return validConnectorPlugins;
-    }
-
-    public static synchronized List<Class<? extends Transformation>> transformationPlugins() {
-        scanClasspathForPlugins();
-        return validTransformationPlugins;
-    }
-
-    public static synchronized void scanClasspathForPlugins() {
-        if (scanned) return;
-        ReflectionsUtil.registerUrlTypes();
-        final Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath()));
-        validConnectorPlugins = Collections.unmodifiableList(connectorPlugins(reflections));
-        validTransformationPlugins = Collections.unmodifiableList(transformationPlugins(reflections));
-        scanned = true;
-    }
-
-    private static List<ConnectorPluginInfo> connectorPlugins(Reflections reflections) {
-        final Set<Class<? extends Connector>> connectorClasses = reflections.getSubTypesOf(Connector.class);
-        connectorClasses.removeAll(CONNECTOR_EXCLUDES);
-
-        final List<ConnectorPluginInfo> connectorPlugins = new ArrayList<>(connectorClasses.size());
-        for (Class<? extends Connector> connectorClass : connectorClasses) {
-            if (isConcrete(connectorClass)) {
-                connectorPlugins.add(new ConnectorPluginInfo(connectorClass));
-            }
-        }
-
-        Collections.sort(connectorPlugins, new Comparator<ConnectorPluginInfo>() {
-            @Override
-            public int compare(ConnectorPluginInfo a, ConnectorPluginInfo b) {
-                return a.className().compareTo(b.className());
-            }
-        });
-
-        return connectorPlugins;
-    }
-
-    private static List<Class<? extends Transformation>> transformationPlugins(Reflections reflections) {
-        final Set<Class<? extends Transformation>> transformationClasses = reflections.getSubTypesOf(Transformation.class);
-        transformationClasses.removeAll(TRANSFORMATION_EXCLUDES);
-
-        final List<Class<? extends Transformation>> transformationPlugins = new ArrayList<>(transformationClasses.size());
-        for (Class<? extends Transformation> transformationClass : transformationClasses) {
-            if (isConcrete(transformationClass)) {
-                transformationPlugins.add(transformationClass);
-            }
-        }
-
-        Collections.sort(transformationPlugins, new Comparator<Class<? extends Transformation>>() {
-            @Override
-            public int compare(Class<? extends Transformation> a, Class<? extends Transformation> b) {
-                return a.getName().compareTo(b.getName());
-            }
-        });
-
-        return transformationPlugins;
-    }
-
-    private static boolean isConcrete(Class<?> cls) {
-        final int mod = cls.getModifiers();
-        return !Modifier.isAbstract(mod) && !Modifier.isInterface(mod);
-    }
-
-    public static void main(String... args) {
-        System.out.println(connectorPlugins());
-        System.out.println(transformationPlugins());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
index 21abdd0..e47d537 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -17,8 +17,8 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 
-import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -35,15 +35,11 @@ public class SinkConnectorConfig extends ConnectorConfig {
     static ConfigDef config = ConnectorConfig.configDef()
         .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY);
 
-    public SinkConnectorConfig() {
-        this(new HashMap<String, String>());
-    }
-
     public static ConfigDef configDef() {
         return config;
     }
 
-    public SinkConnectorConfig(Map<String, String> props) {
-        super(config, props);
+    public SinkConnectorConfig(Plugins plugins, Map<String, String> props) {
+        super(plugins, config, props);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
index 651ac74..6915421 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 
 import java.util.Map;
 
@@ -24,7 +25,7 @@ public class SourceConnectorConfig extends ConnectorConfig {
 
     private static ConfigDef config = configDef();
 
-    public SourceConnectorConfig(Map<String, String> props) {
-        super(config, props);
+    public SourceConnectorConfig(Plugins plugins, Map<String, String> props) {
+        super(plugins, config, props);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 400ae08..12802c1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -24,6 +24,7 @@ import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -65,7 +66,7 @@ public class Worker {
     private final ExecutorService executor;
     private final Time time;
     private final String workerId;
-    private final ConnectorFactory connectorFactory;
+    private final Plugins plugins;
     private final WorkerConfig config;
     private final Converter defaultKeyConverter;
     private final Converter defaultValueConverter;
@@ -78,20 +79,45 @@ public class Worker {
     private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
     private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
 
-    public Worker(String workerId, Time time, ConnectorFactory connectorFactory, WorkerConfig config, OffsetBackingStore offsetBackingStore) {
+    public Worker(
+            String workerId,
+            Time time,
+            Plugins plugins,
+            WorkerConfig config,
+            OffsetBackingStore offsetBackingStore
+    ) {
         this.executor = Executors.newCachedThreadPool();
         this.workerId = workerId;
         this.time = time;
-        this.connectorFactory = connectorFactory;
+        this.plugins = plugins;
         this.config = config;
-        this.defaultKeyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
+        // Converters are required properties, thus getClass won't return null.
+        this.defaultKeyConverter = plugins.newConverter(
+                config.getClass(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG).getName(),
+                config
+        );
         this.defaultKeyConverter.configure(config.originalsWithPrefix("key.converter."), true);
-        this.defaultValueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
+        this.defaultValueConverter = plugins.newConverter(
+                config.getClass(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG).getName(),
+                config
+        );
         this.defaultValueConverter.configure(config.originalsWithPrefix("value.converter."), false);
-        this.internalKeyConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Converter.class);
-        this.internalKeyConverter.configure(config.originalsWithPrefix("internal.key.converter."), true);
-        this.internalValueConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
-        this.internalValueConverter.configure(config.originalsWithPrefix("internal.value.converter."), false);
+        // Same, internal converters are required properties, thus getClass won't return null.
+        this.internalKeyConverter = plugins.newConverter(
+                config.getClass(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG).getName(),
+                config
+        );
+        this.internalKeyConverter.configure(
+                config.originalsWithPrefix("internal.key.converter."),
+                true);
+        this.internalValueConverter = plugins.newConverter(
+                config.getClass(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG).getName(),
+                config
+        );
+        this.internalValueConverter.configure(
+                config.originalsWithPrefix("internal.value.converter."),
+                false
+        );
 
         this.offsetBackingStore = offsetBackingStore;
         this.offsetBackingStore.configure(config);
@@ -171,17 +197,23 @@ public class Worker {
             throw new ConnectException("Connector with name " + connName + " already exists");
 
         final WorkerConnector workerConnector;
+        ClassLoader savedLoader = plugins.currentThreadLoader();
         try {
-            final ConnectorConfig connConfig = new ConnectorConfig(connProps);
+            final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
             final String connClass = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
             log.info("Creating connector {} of type {}", connName, connClass);
-            final Connector connector = connectorFactory.newConnector(connClass);
+            final Connector connector = plugins.newConnector(connClass);
             workerConnector = new WorkerConnector(connName, connector, ctx, statusListener);
             log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
+            savedLoader = plugins.compareAndSwapLoaders(connector);
             workerConnector.initialize(connConfig);
             workerConnector.transitionTo(initialState);
+            Plugins.compareAndSwapLoaders(savedLoader);
         } catch (Throwable t) {
             log.error("Failed to start connector {}", connName, t);
+            // Can't be put in a finally block because it needs to be swapped before the call on
+            // statusListener
+            Plugins.compareAndSwapLoaders(savedLoader);
             statusListener.onFailure(connName, t);
             return false;
         }
@@ -205,7 +237,14 @@ public class Worker {
         WorkerConnector workerConnector = connectors.get(connName);
         if (workerConnector == null)
             throw new ConnectException("Connector " + connName + " not found in this worker.");
-        return workerConnector.isSinkConnector();
+
+        ClassLoader savedLoader = plugins.currentThreadLoader();
+        try {
+            savedLoader = plugins.compareAndSwapLoaders(workerConnector.connector());
+            return workerConnector.isSinkConnector();
+        } finally {
+            Plugins.compareAndSwapLoaders(savedLoader);
+        }
     }
 
     /**
@@ -225,14 +264,23 @@ public class Worker {
 
         Connector connector = workerConnector.connector();
         List<Map<String, String>> result = new ArrayList<>();
-        String taskClassName = connector.taskClass().getName();
-        for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
-            Map<String, String> taskConfig = new HashMap<>(taskProps); // Ensure we don't modify the connector's copy of the config
-            taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName);
-            if (sinkTopics != null)
-                taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, ","));
-            result.add(taskConfig);
+        ClassLoader savedLoader = plugins.currentThreadLoader();
+        try {
+            savedLoader = plugins.compareAndSwapLoaders(connector);
+            String taskClassName = connector.taskClass().getName();
+            for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
+                // Ensure we don't modify the connector's copy of the config
+                Map<String, String> taskConfig = new HashMap<>(taskProps);
+                taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName);
+                if (sinkTopics != null) {
+                    taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, ","));
+                }
+                result.add(taskConfig);
+            }
+        } finally {
+            Plugins.compareAndSwapLoaders(savedLoader);
         }
+
         return result;
     }
 
@@ -252,13 +300,19 @@ public class Worker {
     public boolean stopConnector(String connName) {
         log.info("Stopping connector {}", connName);
 
-        WorkerConnector connector = connectors.remove(connName);
-        if (connector == null) {
+        WorkerConnector workerConnector = connectors.remove(connName);
+        if (workerConnector == null) {
             log.warn("Ignoring stop request for unowned connector {}", connName);
             return false;
         }
 
-        connector.shutdown();
+        ClassLoader savedLoader = plugins.currentThreadLoader();
+        try {
+            savedLoader = plugins.compareAndSwapLoaders(workerConnector.connector());
+            workerConnector.shutdown();
+        } finally {
+            Plugins.compareAndSwapLoaders(savedLoader);
+        }
 
         log.info("Stopped connector {}", connName);
         return true;
@@ -280,8 +334,8 @@ public class Worker {
      * @return true if the connector is running, false if the connector is not running or is not manages by this worker.
      */
     public boolean isRunning(String connName) {
-        WorkerConnector connector = connectors.get(connName);
-        return connector != null && connector.isRunning();
+        WorkerConnector workerConnector = connectors.get(connName);
+        return workerConnector != null && workerConnector.isRunning();
     }
 
     /**
@@ -307,14 +361,20 @@ public class Worker {
             throw new ConnectException("Task already exists in this worker: " + id);
 
         final WorkerTask workerTask;
+        ClassLoader savedLoader = plugins.currentThreadLoader();
         try {
-            final ConnectorConfig connConfig = new ConnectorConfig(connProps);
+            final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
+            String connType = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+            ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType);
+            savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);
             final TaskConfig taskConfig = new TaskConfig(taskProps);
-
             final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
-            final Task task = connectorFactory.newTask(taskClass);
+            final Task task = plugins.newTask(taskClass);
             log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
 
+            // By maintaining connector's specific class loader for this thread here, we first
+            // search for converters within the connector dependencies, and if not found the
+            // plugin class loader delegates loading to the delegating classloader.
             Converter keyConverter = connConfig.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
             if (keyConverter != null)
                 keyConverter.configure(connConfig.originalsWithPrefix("key.converter."), true);
@@ -326,10 +386,14 @@ public class Worker {
             else
                 valueConverter = defaultValueConverter;
 
-            workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter);
+            workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, connectorLoader);
             workerTask.initialize(taskConfig);
+            Plugins.compareAndSwapLoaders(savedLoader);
         } catch (Throwable t) {
             log.error("Failed to start task {}", id, t);
+            // Can't be put in a finally block because it needs to be swapped before the call on
+            // statusListener
+            Plugins.compareAndSwapLoaders(savedLoader);
             statusListener.onFailure(id, t);
             return false;
         }
@@ -351,7 +415,8 @@ public class Worker {
                                        TaskStatus.Listener statusListener,
                                        TargetState initialState,
                                        Converter keyConverter,
-                                       Converter valueConverter) {
+                                       Converter valueConverter,
+                                       ClassLoader loader) {
         // Decide which type of worker task we need based on the type of task.
         if (task instanceof SourceTask) {
             TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(connConfig.<SourceRecord>transformations());
@@ -361,11 +426,11 @@ public class Worker {
                     internalKeyConverter, internalValueConverter);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
             return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter,
-                     valueConverter, transformationChain, producer, offsetReader, offsetWriter, config, time);
+                    valueConverter, transformationChain, producer, offsetReader, offsetWriter, config, loader, time);
         } else if (task instanceof SinkTask) {
             TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations());
             return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, keyConverter,
-                    valueConverter, transformationChain, time);
+                    valueConverter, transformationChain, loader, time);
         } else {
             log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
             throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
@@ -382,7 +447,14 @@ public class Worker {
         log.info("Stopping task {}", task.id());
         if (task instanceof WorkerSourceTask)
             sourceTaskOffsetCommitter.remove(task.id());
-        task.stop();
+
+        ClassLoader savedLoader = plugins.currentThreadLoader();
+        try {
+            savedLoader = Plugins.compareAndSwapLoaders(task.loader());
+            task.stop();
+        } finally {
+            Plugins.compareAndSwapLoaders(savedLoader);
+        }
     }
 
     private void stopTasks(Collection<ConnectorTaskId> ids) {
@@ -457,8 +529,8 @@ public class Worker {
         return internalValueConverter;
     }
 
-    public ConnectorFactory getConnectorFactory() {
-        return connectorFactory;
+    public Plugins getPlugins() {
+        return plugins;
     }
 
     public String workerId() {
@@ -468,14 +540,37 @@ public class Worker {
     public void setTargetState(String connName, TargetState state) {
         log.info("Setting connector {} state to {}", connName, state);
 
-        WorkerConnector connector = connectors.get(connName);
-        if (connector != null)
-            connector.transitionTo(state);
+        WorkerConnector workerConnector = connectors.get(connName);
+        if (workerConnector != null) {
+            ClassLoader connectorLoader =
+                    plugins.delegatingLoader().connectorLoader(workerConnector.connector());
+            transitionTo(workerConnector, state, connectorLoader);
+        }
 
         for (Map.Entry<ConnectorTaskId, WorkerTask> taskEntry : tasks.entrySet()) {
-            if (taskEntry.getKey().connector().equals(connName))
-                taskEntry.getValue().transitionTo(state);
+            if (taskEntry.getKey().connector().equals(connName)) {
+                WorkerTask workerTask = taskEntry.getValue();
+                transitionTo(workerTask, state, workerTask.loader());
+            }
         }
     }
 
+    private void transitionTo(Object connectorOrTask, TargetState state, ClassLoader loader) {
+        ClassLoader savedLoader = plugins.currentThreadLoader();
+        try {
+            savedLoader = Plugins.compareAndSwapLoaders(loader);
+            if (connectorOrTask instanceof WorkerConnector) {
+                ((WorkerConnector) connectorOrTask).transitionTo(state);
+            } else if (connectorOrTask instanceof WorkerTask) {
+                ((WorkerTask) connectorOrTask).transitionTo(state);
+            } else {
+                throw new ConnectException(
+                        "Request for state transition on an object that is neither a "
+                                + "WorkerConnector nor a WorkerTask: "
+                                + connectorOrTask.getClass());
+            }
+        } finally {
+            Plugins.compareAndSwapLoaders(savedLoader);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 680edaf..fe7a35a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -21,6 +21,9 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -122,6 +125,18 @@ public class WorkerConfig extends AbstractConfig {
         + "The default value of the Access-Control-Allow-Methods header allows cross origin requests for GET, POST and HEAD.";
     protected static final String ACCESS_CONTROL_ALLOW_METHODS_DEFAULT = "";
 
+    public static final String PLUGIN_PATH_CONFIG = "plugin.path";
+    protected static final String PLUGIN_PATH_DOC = "List of paths separated by commas (,) that "
+            + "contain plugins (connectors, converters, transformations). The list should consist"
+            + " of top level directories that include any combination of: \n"
+            + "a) directories immediately containing jars with plugins and their dependencies\n"
+            + "b) uber-jars with plugins and their dependencies\n"
+            + "c) directories immediately containing the package directory structure of classes of "
+            + "plugins and their dependencies\n"
+            + "Note: symlinks will be followed to discover dependencies or plugins.\n"
+            + "Examples: plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,"
+            + "/opt/connectors";
+
     /**
      * Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to
      * bootstrap their own ConfigDef.
@@ -155,7 +170,21 @@ public class WorkerConfig extends AbstractConfig {
                         ACCESS_CONTROL_ALLOW_ORIGIN_DOC)
                 .define(ACCESS_CONTROL_ALLOW_METHODS_CONFIG, Type.STRING,
                         ACCESS_CONTROL_ALLOW_METHODS_DEFAULT, Importance.LOW,
-                        ACCESS_CONTROL_ALLOW_METHODS_DOC);
+                        ACCESS_CONTROL_ALLOW_METHODS_DOC)
+                .define(
+                        PLUGIN_PATH_CONFIG,
+                        Type.LIST,
+                        null,
+                        Importance.LOW,
+                        PLUGIN_PATH_DOC
+                );
+    }
+
+    public static List<String> pluginLocations(Map<String, String> props) {
+        String locationList = props.get(WorkerConfig.PLUGIN_PATH_CONFIG);
+        return locationList == null
+                         ? new ArrayList<String>()
+                         : Arrays.asList(locationList.trim().split("\\s*,\\s*", -1));
     }
 
     public WorkerConfig(ConfigDef definition, Map<String, String> props) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index d5f337d..43ad6a1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -83,8 +83,9 @@ class WorkerSinkTask extends WorkerTask {
                           Converter keyConverter,
                           Converter valueConverter,
                           TransformationChain<SinkRecord> transformationChain,
+                          ClassLoader loader,
                           Time time) {
-        super(id, statusListener, initialState);
+        super(id, statusListener, initialState, loader);
 
         this.workerConfig = workerConfig;
         this.task = task;

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index ed15b85..5627145 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -86,8 +86,9 @@ class WorkerSourceTask extends WorkerTask {
                             OffsetStorageReader offsetReader,
                             OffsetStorageWriter offsetWriter,
                             WorkerConfig workerConfig,
+                            ClassLoader loader,
                             Time time) {
-        super(id, statusListener, initialState);
+        super(id, statusListener, initialState, loader);
 
         this.workerConfig = workerConfig;
         this.task = task;

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 43d45d8..9b233dd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,6 +40,7 @@ abstract class WorkerTask implements Runnable {
 
     protected final ConnectorTaskId id;
     private final TaskStatus.Listener statusListener;
+    protected final ClassLoader loader;
     private final CountDownLatch shutdownLatch = new CountDownLatch(1);
     private volatile TargetState targetState;
     private volatile boolean stopping;   // indicates whether the Worker has asked the task to stop
@@ -46,9 +48,11 @@ abstract class WorkerTask implements Runnable {
 
     public WorkerTask(ConnectorTaskId id,
                       TaskStatus.Listener statusListener,
-                      TargetState initialState) {
+                      TargetState initialState,
+                      ClassLoader loader) {
         this.id = id;
         this.statusListener = statusListener;
+        this.loader = loader;
         this.targetState = initialState;
         this.stopping = false;
         this.cancelled = false;
@@ -58,6 +62,10 @@ abstract class WorkerTask implements Runnable {
         return id;
     }
 
+    public ClassLoader loader() {
+        return loader;
+    }
+
     /**
      * Initialize the task for execution.
      * @param taskConfig initial configuration
@@ -177,6 +185,7 @@ abstract class WorkerTask implements Runnable {
 
     @Override
     public void run() {
+        ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
         try {
             doRun();
             onShutdown();
@@ -186,6 +195,7 @@ abstract class WorkerTask implements Runnable {
             if (t instanceof Error)
                 throw (Error) t;
         } finally {
+            Plugins.compareAndSwapLoaders(savedLoader);
             shutdownLatch.countDown();
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index e908d0b..8f7503e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -954,10 +954,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             ConnectorConfig connConfig;
             List<String> sinkTopics = null;
             if (worker.isSinkConnector(connName)) {
-                connConfig = new SinkConnectorConfig(configs);
+                connConfig = new SinkConnectorConfig(plugins(), configs);
                 sinkTopics = connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG);
             } else {
-                connConfig = new SourceConnectorConfig(configs);
+                connConfig = new SourceConnectorConfig(plugins(), configs);
             }
 
             final List<Map<String, String>> taskProps

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
new file mode 100644
index 0000000..da8b444
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.reflections.Reflections;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.InvalidPathException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * A class loader that sits between the system/application class loader and the
+ * per-plugin {@link PluginClassLoader}s. It scans the configured plugin paths,
+ * registers one isolated loader per plugin location, and routes class-loading
+ * requests for known plugin classes (or their aliases) to the loader that holds
+ * the highest version of that plugin.
+ */
+public class DelegatingClassLoader extends URLClassLoader {
+    private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class);
+
+    // Maps a plugin class name (or alias) to every loader that can supply it,
+    // keyed by PluginDesc so that lastKey() yields the highest version.
+    private final Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders;
+    private final SortedSet<PluginDesc<Connector>> connectors;
+    private final SortedSet<PluginDesc<Converter>> converters;
+    private final SortedSet<PluginDesc<Transformation>> transformations;
+    // Top-level directories configured via the worker's plugin path property.
+    private final List<String> pluginPaths;
+    // Plugin locations that actually produced plugins, with their dedicated loaders.
+    private final Map<Path, PluginClassLoader> activePaths;
+
+    public DelegatingClassLoader(List<String> pluginPaths, ClassLoader parent) {
+        super(new URL[0], parent);
+        this.pluginPaths = pluginPaths;
+        this.pluginLoaders = new HashMap<>();
+        this.activePaths = new HashMap<>();
+        this.connectors = new TreeSet<>();
+        this.converters = new TreeSet<>();
+        this.transformations = new TreeSet<>();
+    }
+
+    public DelegatingClassLoader(List<String> pluginPaths) {
+        this(pluginPaths, ClassLoader.getSystemClassLoader());
+    }
+
+    public Set<PluginDesc<Connector>> connectors() {
+        return connectors;
+    }
+
+    public Set<PluginDesc<Converter>> converters() {
+        return converters;
+    }
+
+    public Set<PluginDesc<Transformation>> transformations() {
+        return transformations;
+    }
+
+    public ClassLoader connectorLoader(Connector connector) {
+        return connectorLoader(connector.getClass().getName());
+    }
+
+    /**
+     * Returns the class loader that holds the highest version of the named
+     * connector (or alias). Falls back to this delegating loader itself when the
+     * connector is unknown, so callers always receive a usable loader.
+     */
+    public ClassLoader connectorLoader(String connectorClassOrAlias) {
+        log.debug("Getting plugin class loader for connector: '{}'", connectorClassOrAlias);
+        SortedMap<PluginDesc<?>, ClassLoader> inner =
+                pluginLoaders.get(connectorClassOrAlias);
+        if (inner == null) {
+            log.error(
+                    "Plugin class loader for connector: '{}' was not found. Returning: {}",
+                    connectorClassOrAlias,
+                    this
+            );
+            return this;
+        }
+        // lastKey() is the entry with the highest version for this class name.
+        return inner.get(inner.lastKey());
+    }
+
+    // Creates the per-plugin loader inside a privileged block so loader creation
+    // succeeds even when the caller's protection domain is restricted.
+    // NOTE(review): raw PrivilegedAction — PrivilegedAction<PluginClassLoader>
+    // would remove the cast below.
+    private static PluginClassLoader newPluginClassLoader(
+            final URL pluginLocation,
+            final URL[] urls,
+            final ClassLoader parent
+    ) {
+        return (PluginClassLoader) AccessController.doPrivileged(
+                new PrivilegedAction() {
+                    @Override
+                    public Object run() {
+                        return new PluginClassLoader(pluginLocation, urls, parent);
+                    }
+                }
+        );
+    }
+
+    // Records each discovered plugin under its class name, allowing multiple
+    // versions of the same class to coexist under different loaders.
+    private <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
+        for (PluginDesc<T> plugin : plugins) {
+            String pluginClassName = plugin.className();
+            SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(pluginClassName);
+            if (inner == null) {
+                inner = new TreeMap<>();
+                pluginLoaders.put(pluginClassName, inner);
+                // TODO: once versioning is enabled this line should be moved outside this if branch
+                log.info("Added plugin '{}'", pluginClassName);
+            }
+            inner.put(plugin, loader);
+        }
+    }
+
+    /**
+     * Scans every configured plugin path, creates an isolated loader per plugin
+     * location found there, and finally scans the application classpath itself so
+     * plugins bundled with the worker are also registered.
+     */
+    protected void initLoaders() {
+        // Tracks the path currently being processed, for error reporting below.
+        String path = null;
+        try {
+            for (String configPath : pluginPaths) {
+                path = configPath;
+                Path pluginPath = Paths.get(path).toAbsolutePath();
+                // Currently 'plugin.paths' property is a list of top-level directories
+                // containing plugins
+                if (Files.isDirectory(pluginPath)) {
+                    for (Path pluginLocation : PluginUtils.pluginLocations(pluginPath)) {
+                        log.info("Loading plugin from: {}", pluginLocation);
+                        URL[] urls = PluginUtils.pluginUrls(pluginLocation).toArray(new URL[0]);
+                        if (log.isDebugEnabled()) {
+                            log.debug("Loading plugin urls: {}", Arrays.toString(urls));
+                        }
+                        PluginClassLoader loader = newPluginClassLoader(
+                                pluginLocation.toUri().toURL(),
+                                urls,
+                                this
+                        );
+
+                        scanUrlsAndAddPlugins(loader, urls, pluginLocation);
+                    }
+                }
+            }
+
+            path = "classpath";
+            // Finally add parent/system loader.
+            scanUrlsAndAddPlugins(
+                    getParent(),
+                    ClasspathHelper.forJavaClassPath().toArray(new URL[0]),
+                    null
+            );
+        } catch (InvalidPathException | MalformedURLException e) {
+            // NOTE(review): exception detail is dropped here; consider logging 'e'.
+            log.error("Invalid path in plugin path: {}. Ignoring.", path);
+        } catch (IOException e) {
+            log.error("Could not get listing for plugin path: {}. Ignoring.", path);
+        } catch (InstantiationException | IllegalAccessException e) {
+            log.error("Could not instantiate plugins in: {}. Ignoring: {}", path, e);
+        }
+        // Aliases can only be computed once all plugins are known, because alias
+        // uniqueness is checked across the full plugin set.
+        addAllAliases();
+    }
+
+    // Scans the given URLs with the given loader and, if any plugins were found,
+    // marks the location active and registers every discovered plugin.
+    private void scanUrlsAndAddPlugins(
+            ClassLoader loader,
+            URL[] urls,
+            Path pluginLocation
+    ) throws InstantiationException, IllegalAccessException {
+        PluginScanResult plugins = scanPluginPath(loader, urls);
+        log.info("Registered loader: {}", loader);
+        if (!plugins.isEmpty()) {
+            if (loader instanceof PluginClassLoader) {
+                activePaths.put(pluginLocation, (PluginClassLoader) loader);
+            }
+
+            addPlugins(plugins.connectors(), loader);
+            connectors.addAll(plugins.connectors());
+            addPlugins(plugins.converters(), loader);
+            converters.addAll(plugins.converters());
+            addPlugins(plugins.transformations(), loader);
+            transformations.addAll(plugins.transformations());
+        }
+    }
+
+    // Uses the Reflections library to discover concrete Connector, Converter and
+    // Transformation subtypes reachable from the given URLs.
+    private PluginScanResult scanPluginPath(
+            ClassLoader loader,
+            URL[] urls
+    ) throws InstantiationException, IllegalAccessException {
+        ConfigurationBuilder builder = new ConfigurationBuilder();
+        builder.setClassLoaders(new ClassLoader[]{loader});
+        builder.addUrls(urls);
+        Reflections reflections = new Reflections(builder);
+
+        return new PluginScanResult(
+                getPluginDesc(reflections, Connector.class, loader),
+                getPluginDesc(reflections, Converter.class, loader),
+                getPluginDesc(reflections, Transformation.class, loader)
+        );
+    }
+
+    private <T> Collection<PluginDesc<T>> getPluginDesc(
+            Reflections reflections,
+            Class<T> klass,
+            ClassLoader loader
+    ) throws InstantiationException, IllegalAccessException {
+        Set<Class<? extends T>> plugins = reflections.getSubTypesOf(klass);
+
+        Collection<PluginDesc<T>> result = new ArrayList<>();
+        for (Class<? extends T> plugin : plugins) {
+            if (PluginUtils.isConcrete(plugin)) {
+                // Temporary workaround until all the plugins are versioned.
+                if (Connector.class.isAssignableFrom(plugin)) {
+                    // Connectors expose a version() method; instantiate to read it.
+                    result.add(
+                            new PluginDesc<>(
+                                    plugin,
+                                    ((Connector) plugin.newInstance()).version(),
+                                    loader
+                            )
+                    );
+                } else {
+                    result.add(new PluginDesc<>(plugin, "undefined", loader));
+                }
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Loads non-isolated classes via the parent; for plugin classes, delegates to
+     * the loader holding the highest version, then falls back to probing every
+     * active plugin loader, and finally to the parent.
+     */
+    @Override
+    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+        if (!PluginUtils.shouldLoadInIsolation(name)) {
+            // There are no paths in this classloader, will attempt to load with the parent.
+            return super.loadClass(name, resolve);
+        }
+
+        SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
+        if (inner != null) {
+            log.trace("Retrieving loaded class '{}' from '{}'", name, inner.get(inner.lastKey()));
+            ClassLoader pluginLoader = inner.get(inner.lastKey());
+            return pluginLoader instanceof PluginClassLoader
+                   ? ((PluginClassLoader) pluginLoader).loadClass(name, resolve)
+                   : super.loadClass(name, resolve);
+        }
+
+        // Unknown plugin class: probe active plugin loaders in map order.
+        // NOTE(review): HashMap iteration order makes this probe order
+        // unspecified when a class exists in multiple locations.
+        Class<?> klass = null;
+        for (PluginClassLoader loader : activePaths.values()) {
+            try {
+                klass = loader.loadClass(name, resolve);
+                break;
+            } catch (ClassNotFoundException e) {
+                // Not found in this loader.
+            }
+        }
+        if (klass == null) {
+            return super.loadClass(name, resolve);
+        }
+        return klass;
+    }
+
+    private void addAllAliases() {
+        addAliases(connectors);
+        addAliases(converters);
+        addAliases(transformations);
+    }
+
+    // Registers short aliases (simple and suffix-pruned class names) for every
+    // plugin whose alias does not collide with another plugin's alias.
+    private <S> void addAliases(Collection<PluginDesc<S>> plugins) {
+        for (PluginDesc<S> plugin : plugins) {
+            if (PluginUtils.isAliasUnique(plugin, plugins)) {
+                String simple = PluginUtils.simpleName(plugin);
+                String pruned = PluginUtils.prunedName(plugin);
+                SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(plugin.className());
+                pluginLoaders.put(simple, inner);
+                if (simple.equals(pruned)) {
+                    log.info("Added alias '{}' to plugin '{}'", simple, plugin.className());
+                } else {
+                    pluginLoaders.put(pruned, inner);
+                    log.info(
+                            "Added aliases '{}' and '{}' to plugin '{}'",
+                            simple,
+                            pruned,
+                            plugin.className()
+                    );
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java
new file mode 100644
index 0000000..07438e9
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+
+/**
+ * An isolated, child-first class loader for a single plugin location. Classes
+ * that must be isolated (per {@link PluginUtils#shouldLoadInIsolation}) are
+ * searched in this loader's URLs first; everything else is delegated to the
+ * parent, so framework classes remain shared across plugins.
+ */
+public class PluginClassLoader extends URLClassLoader {
+    // Fixed: logger was previously obtained for DelegatingClassLoader.class,
+    // which mislabeled every log record emitted by this class.
+    private static final Logger log = LoggerFactory.getLogger(PluginClassLoader.class);
+    // The directory or jar this loader serves; reported via location().
+    private final URL pluginLocation;
+
+    public PluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent) {
+        super(urls, parent);
+        this.pluginLocation = pluginLocation;
+    }
+
+    public PluginClassLoader(URL pluginLocation, URL[] urls) {
+        super(urls);
+        this.pluginLocation = pluginLocation;
+    }
+
+    /**
+     * @return the string form of the URL of the plugin location served by this loader.
+     */
+    public String location() {
+        return pluginLocation.toString();
+    }
+
+    @Override
+    public String toString() {
+        return "PluginClassLoader{pluginLocation=" + pluginLocation + "}";
+    }
+
+    /**
+     * Child-first loading for isolated classes: try this loader's own URLs before
+     * consulting the parent. Non-isolated classes follow the standard
+     * parent-first delegation via {@code super.loadClass}.
+     */
+    @Override
+    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+        Class<?> klass = findLoadedClass(name);
+        if (klass == null) {
+            if (PluginUtils.shouldLoadInIsolation(name)) {
+                try {
+                    klass = findClass(name);
+                } catch (ClassNotFoundException e) {
+                    // Not found in loader's path. Search in parents.
+                }
+            }
+            if (klass == null) {
+                klass = super.loadClass(name, false);
+            }
+        }
+        if (resolve) {
+            resolveClass(klass);
+        }
+        return klass;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
new file mode 100644
index 0000000..a607704
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
+
+import java.util.Objects;
+
+/**
+ * Immutable descriptor of a discovered plugin: its class, declared version
+ * (also pre-parsed as a Maven artifact version for ordering), plugin type, and
+ * the location it was loaded from. Natural order is by class name, then by
+ * version, so the greatest element among same-named plugins is the newest.
+ */
+public class PluginDesc<T> implements Comparable<PluginDesc<T>> {
+    private final Class<? extends T> klass;
+    private final String name;
+    private final String version;
+    // Maven-style parse of 'version', used only for compareTo ordering.
+    private final DefaultArtifactVersion encodedVersion;
+    private final PluginType type;
+    private final String typeName;
+    // "classpath" for plugins on the application classpath, else the plugin
+    // loader's location URL.
+    private final String location;
+
+    public PluginDesc(Class<? extends T> klass, String version, ClassLoader loader) {
+        this.klass = klass;
+        this.name = klass.getName();
+        this.version = version;
+        this.encodedVersion = new DefaultArtifactVersion(version);
+        this.type = PluginType.from(klass);
+        this.typeName = type.toString();
+        this.location = loader instanceof PluginClassLoader
+                ? ((PluginClassLoader) loader).location()
+                : "classpath";
+    }
+
+    @Override
+    public String toString() {
+        return "PluginDesc{" +
+                "klass=" + klass +
+                ", name='" + name + '\'' +
+                ", version='" + version + '\'' +
+                ", encodedVersion=" + encodedVersion +
+                ", type=" + type +
+                ", typeName='" + typeName + '\'' +
+                ", location='" + location + '\'' +
+                '}';
+    }
+
+    public Class<? extends T> pluginClass() {
+        return klass;
+    }
+
+    @JsonProperty("class")
+    public String className() {
+        return name;
+    }
+
+    @JsonProperty("version")
+    public String version() {
+        return version;
+    }
+
+    public PluginType type() {
+        return type;
+    }
+
+    @JsonProperty("type")
+    public String typeName() {
+        return typeName;
+    }
+
+    @JsonProperty("location")
+    public String location() {
+        return location;
+    }
+
+    // Equality deliberately excludes 'location': the same plugin class/version
+    // found in two places is considered the same plugin.
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof PluginDesc)) {
+            return false;
+        }
+        PluginDesc<?> that = (PluginDesc<?>) o;
+        return Objects.equals(klass, that.klass) &&
+                Objects.equals(version, that.version) &&
+                type == that.type;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(klass, version, type);
+    }
+
+    // Fixed: parameter was the raw type 'PluginDesc'; the generic parameter
+    // matches the Comparable<PluginDesc<T>> contract and avoids raw-type usage.
+    @Override
+    public int compareTo(PluginDesc<T> other) {
+        int nameComp = name.compareTo(other.name);
+        return nameComp != 0 ? nameComp : encodedVersion.compareTo(other.encodedVersion);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
new file mode 100644
index 0000000..f3d2f21
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import java.util.Collection;
+
+/**
+ * Immutable holder for the outcome of scanning one plugin location: the
+ * connectors, converters, and transformations that were discovered there.
+ */
+public class PluginScanResult {
+    private final Collection<PluginDesc<Connector>> connectors;
+    private final Collection<PluginDesc<Converter>> converters;
+    private final Collection<PluginDesc<Transformation>> transformations;
+
+    public PluginScanResult(
+            Collection<PluginDesc<Connector>> connectors,
+            Collection<PluginDesc<Converter>> converters,
+            Collection<PluginDesc<Transformation>> transformations
+    ) {
+        this.connectors = connectors;
+        this.converters = converters;
+        this.transformations = transformations;
+    }
+
+    public Collection<PluginDesc<Connector>> connectors() {
+        return connectors;
+    }
+
+    public Collection<PluginDesc<Converter>> converters() {
+        return converters;
+    }
+
+    public Collection<PluginDesc<Transformation>> transformations() {
+        return transformations;
+    }
+
+    /**
+     * @return true when the scan found no plugins of any kind.
+     */
+    public boolean isEmpty() {
+        return connectors().isEmpty() && converters().isEmpty() && transformations().isEmpty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
new file mode 100644
index 0000000..5649213
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import java.util.Locale;
+
+/**
+ * The kind of Connect plugin a class represents. Constants are declared from
+ * most specific to least specific because {@link #from(Class)} returns the
+ * first constant whose class is assignable from the argument.
+ */
+public enum PluginType {
+    SOURCE(SourceConnector.class),
+    SINK(SinkConnector.class),
+    CONNECTOR(Connector.class),
+    CONVERTER(Converter.class),
+    TRANSFORMATION(Transformation.class),
+    UNKNOWN(Object.class);
+
+    // Fixed: field was mutable; enum state should be final.
+    private final Class<?> klass;
+
+    PluginType(Class<?> klass) {
+        this.klass = klass;
+    }
+
+    /**
+     * Maps a plugin class to its type; relies on declaration order so that,
+     * e.g., a SourceConnector resolves to SOURCE rather than CONNECTOR.
+     */
+    public static PluginType from(Class<?> klass) {
+        for (PluginType type : PluginType.values()) {
+            if (type.klass.isAssignableFrom(klass)) {
+                return type;
+            }
+        }
+        return UNKNOWN;
+    }
+
+    public String simpleName() {
+        return klass.getSimpleName();
+    }
+
+    @Override
+    public String toString() {
+        return super.toString().toLowerCase(Locale.ROOT);
+    }
+}


[2/3] kafka git commit: KAFKA-3487: Support classloading isolation in Connect (KIP-146)

Posted by ew...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
new file mode 100644
index 0000000..b2be997
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.io.IOException;
+import java.lang.reflect.Modifier;
+import java.net.URL;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Static helpers for plugin discovery and class-loading isolation decisions.
+ */
+public class PluginUtils {
+    // Package prefixes that are always loaded by the parent (shared) loader.
+    private static final String BLACKLIST = "^(?:"
+            + "java"
+            + "|javax"
+            + "|org\\.omg"
+            + "|org\\.w3c\\.dom"
+            + "|org\\.apache\\.kafka\\.common"
+            + "|org\\.apache\\.kafka\\.connect"
+            + "|org\\.apache\\.log4j"
+            + ")\\..*$";
+
+    // Exceptions within the blacklisted Connect packages that ARE isolated
+    // (bundled transformations, converters, and the file/json plugins).
+    private static final String WHITELIST = "^org\\.apache\\.kafka\\.connect\\.(?:"
+            + "transforms\\.(?!Transformation$).*"
+            + "|json\\..*"
+            + "|file\\..*"
+            + "|converters\\..*"
+            + "|storage\\.StringConverter"
+            + ")$";
+
+    private static final DirectoryStream.Filter<Path> PLUGIN_PATH_FILTER = new DirectoryStream
+            .Filter<Path>() {
+        @Override
+        public boolean accept(Path path) throws IOException {
+            return Files.isDirectory(path) || PluginUtils.isJar(path);
+        }
+    };
+
+    // Utility class; not meant to be instantiated.
+    private PluginUtils() {
+    }
+
+    /**
+     * Decides whether a class name should be loaded by an isolated plugin
+     * loader. A name is isolated unless it is blacklisted and not explicitly
+     * whitelisted. (Equivalent, by De Morgan, to the original
+     * {@code !(blacklisted && !whitelisted)} but easier to read.)
+     */
+    public static boolean shouldLoadInIsolation(String name) {
+        return !name.matches(BLACKLIST) || name.matches(WHITELIST);
+    }
+
+    /**
+     * @return true if the class is neither abstract nor an interface, i.e. it
+     * can be instantiated as a plugin.
+     */
+    public static boolean isConcrete(Class<?> klass) {
+        int mod = klass.getModifiers();
+        return !Modifier.isAbstract(mod) && !Modifier.isInterface(mod);
+    }
+
+    public static boolean isJar(Path path) {
+        // Case-insensitive extension check using a fixed locale.
+        return path.toString().toLowerCase(Locale.ROOT).endsWith(".jar");
+    }
+
+    /**
+     * Collects the URLs to put on a plugin's class path: the location itself if
+     * it is a jar, or every jar/directory directly inside it if it is a directory.
+     */
+    public static List<URL> pluginUrls(Path pluginPath) throws IOException {
+        List<URL> urls = new ArrayList<>();
+        if (PluginUtils.isJar(pluginPath)) {
+            urls.add(pluginPath.toUri().toURL());
+        } else if (Files.isDirectory(pluginPath)) {
+            try (
+                    DirectoryStream<Path> listing = Files.newDirectoryStream(
+                            pluginPath,
+                            PLUGIN_PATH_FILTER
+                    )
+            ) {
+                for (Path jar : listing) {
+                    urls.add(jar.toUri().toURL());
+                }
+            }
+        }
+        return urls;
+    }
+
+    /**
+     * Lists the plugin locations (directories or jars) directly under the given
+     * top-level plugin path.
+     */
+    public static List<Path> pluginLocations(Path topPath) throws IOException {
+        List<Path> locations = new ArrayList<>();
+        // Non-recursive for now. Plugin directories or jars need to be exactly under the topPath.
+        try (
+                DirectoryStream<Path> listing = Files.newDirectoryStream(
+                        topPath,
+                        PLUGIN_PATH_FILTER
+                )
+        ) {
+            for (Path dir : listing) {
+                locations.add(dir);
+            }
+        }
+        return locations;
+    }
+
+    public static String simpleName(PluginDesc<?> plugin) {
+        return plugin.pluginClass().getSimpleName();
+    }
+
+    /**
+     * Returns the plugin's alias with its type suffix removed, e.g.
+     * "FileStreamSource" for "FileStreamSourceConnector".
+     */
+    public static String prunedName(PluginDesc<?> plugin) {
+        // It's currently simpler to switch on type than do pattern matching.
+        switch (plugin.type()) {
+            case SOURCE:
+            case SINK:
+            case CONNECTOR:
+                return prunePluginName(plugin, "Connector");
+            default:
+                return prunePluginName(plugin, plugin.type().simpleName());
+        }
+    }
+
+    /**
+     * @return true if no OTHER plugin in the collection shares this plugin's
+     * simple or pruned alias (the plugin itself accounts for the one allowed match).
+     */
+    public static <U> boolean isAliasUnique(
+            PluginDesc<U> alias,
+            Collection<PluginDesc<U>> plugins
+    ) {
+        boolean matched = false;
+        for (PluginDesc<U> plugin : plugins) {
+            if (simpleName(alias).equals(simpleName(plugin))
+                    || prunedName(alias).equals(prunedName(plugin))) {
+                if (matched) {
+                    return false;
+                }
+                matched = true;
+            }
+        }
+        return true;
+    }
+
+    // Strips the given suffix from the simple class name, keeping the name
+    // intact when the suffix is absent or would leave an empty string.
+    private static String prunePluginName(PluginDesc<?> plugin, String suffix) {
+        String simple = plugin.pluginClass().getSimpleName();
+        int pos = simple.lastIndexOf(suffix);
+        if (pos > 0) {
+            return simple.substring(0, pos);
+        }
+        return simple;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
new file mode 100644
index 0000000..654f485
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Facade over Connect's classloading isolation. Builds a
+ * {@link DelegatingClassLoader} over the configured plugin locations and
+ * provides factory methods for connectors, tasks, converters and
+ * transformations, resolving both fully-qualified class names and short
+ * aliases.
+ */
+public class Plugins {
+    private static final Logger log = LoggerFactory.getLogger(Plugins.class);
+    // Parent delegating loader that isolates each discovered plugin location.
+    private final DelegatingClassLoader delegatingLoader;
+
+    public Plugins(Map<String, String> props) {
+        List<String> pluginLocations = WorkerConfig.pluginLocations(props);
+        delegatingLoader = newDelegatingClassLoader(pluginLocations);
+        delegatingLoader.initLoaders();
+    }
+
+    private static DelegatingClassLoader newDelegatingClassLoader(final List<String> paths) {
+        // Typed PrivilegedAction avoids the raw type and the explicit cast;
+        // the privileged block lets the loader be created even when a security
+        // manager restricts the calling code.
+        return AccessController.doPrivileged(
+                new PrivilegedAction<DelegatingClassLoader>() {
+                    @Override
+                    public DelegatingClassLoader run() {
+                        return new DelegatingClassLoader(paths);
+                    }
+                }
+        );
+    }
+
+    /** Render a comma-separated list of plugin descriptions for error messages. */
+    private static <T> String pluginNames(Collection<PluginDesc<T>> plugins) {
+        return Utils.join(plugins, ", ");
+    }
+
+    /**
+     * Instantiate a plugin via its no-arg constructor, wrapping any failure
+     * (including linkage errors) in a {@link ConnectException}.
+     */
+    protected static <T> T newPlugin(Class<T> klass) {
+        try {
+            return Utils.newInstance(klass);
+        } catch (Throwable t) {
+            throw new ConnectException("Instantiation error", t);
+        }
+    }
+
+    /**
+     * Instantiate a plugin and, if it implements {@link Configurable},
+     * configure it with the config's original properties.
+     */
+    protected static <T> T newConfiguredPlugin(AbstractConfig config, Class<T> klass) {
+        // Route through newPlugin so instantiation failures are wrapped in
+        // ConnectException consistently with the other factory methods.
+        T plugin = newPlugin(klass);
+        if (plugin instanceof Configurable) {
+            ((Configurable) plugin).configure(config.originals());
+        }
+        return plugin;
+    }
+
+    /**
+     * Load {@code classOrAlias} through the given loader and verify it is a
+     * subtype of {@code pluginClass}.
+     *
+     * @throws ClassNotFoundException if the class cannot be found or does not
+     *         implement the requested plugin interface
+     */
+    @SuppressWarnings("unchecked")
+    protected static <U> Class<? extends U> pluginClass(
+            DelegatingClassLoader loader,
+            String classOrAlias,
+            Class<U> pluginClass
+    ) throws ClassNotFoundException {
+        Class<?> klass = loader.loadClass(classOrAlias, false);
+        if (pluginClass.isAssignableFrom(klass)) {
+            // Safe: isAssignableFrom guarantees klass is a subtype of U.
+            return (Class<? extends U>) klass;
+        }
+
+        throw new ClassNotFoundException(
+                "Requested class: "
+                        + classOrAlias
+                        + " does not extend " + pluginClass.getSimpleName()
+        );
+    }
+
+    /**
+     * Install {@code loader} as the current thread's context class loader if
+     * it is not already, returning the previously installed loader so the
+     * caller can restore it.
+     */
+    public static ClassLoader compareAndSwapLoaders(ClassLoader loader) {
+        ClassLoader current = Thread.currentThread().getContextClassLoader();
+        if (!current.equals(loader)) {
+            Thread.currentThread().setContextClassLoader(loader);
+        }
+        return current;
+    }
+
+    public ClassLoader currentThreadLoader() {
+        return Thread.currentThread().getContextClassLoader();
+    }
+
+    /** Swap in the delegating loader as the context class loader; return the old one. */
+    public ClassLoader compareAndSwapWithDelegatingLoader() {
+        ClassLoader current = Thread.currentThread().getContextClassLoader();
+        if (!current.equals(delegatingLoader)) {
+            Thread.currentThread().setContextClassLoader(delegatingLoader);
+        }
+        return current;
+    }
+
+    /** Swap in the class loader that owns the given connector; return the old one. */
+    public ClassLoader compareAndSwapLoaders(Connector connector) {
+        ClassLoader connectorLoader = delegatingLoader.connectorLoader(connector);
+        return compareAndSwapLoaders(connectorLoader);
+    }
+
+    public DelegatingClassLoader delegatingLoader() {
+        return delegatingLoader;
+    }
+
+    public Set<PluginDesc<Connector>> connectors() {
+        return delegatingLoader.connectors();
+    }
+
+    public Set<PluginDesc<Converter>> converters() {
+        return delegatingLoader.converters();
+    }
+
+    public Set<PluginDesc<Transformation>> transformations() {
+        return delegatingLoader.transformations();
+    }
+
+    /**
+     * Instantiate a connector given either its fully-qualified class name or a
+     * short alias ("Foo" or "FooConnector" matching class FooConnector).
+     *
+     * @throws ConnectException if no plugin, or more than one plugin, matches
+     */
+    @SuppressWarnings("unchecked")
+    public Connector newConnector(String connectorClassOrAlias) {
+        Class<? extends Connector> klass;
+        try {
+            klass = pluginClass(
+                    delegatingLoader,
+                    connectorClassOrAlias,
+                    Connector.class
+            );
+        } catch (ClassNotFoundException e) {
+            // Not a resolvable class name: fall back to alias resolution over
+            // the set of discovered connectors.
+            List<PluginDesc<Connector>> matches = new ArrayList<>();
+            for (PluginDesc<Connector> plugin : delegatingLoader.connectors()) {
+                Class<?> pluginClass = plugin.pluginClass();
+                String simpleName = pluginClass.getSimpleName();
+                if (simpleName.equals(connectorClassOrAlias)
+                        || simpleName.equals(connectorClassOrAlias + "Connector")) {
+                    matches.add(plugin);
+                }
+            }
+
+            if (matches.isEmpty()) {
+                throw new ConnectException(
+                        "Failed to find any class that implements Connector and which name matches "
+                                + connectorClassOrAlias
+                                + ", available connectors are: "
+                                + pluginNames(delegatingLoader.connectors())
+                );
+            }
+            if (matches.size() > 1) {
+                throw new ConnectException(
+                        "More than one connector matches alias "
+                                + connectorClassOrAlias
+                                +
+                                ". Please use full package and class name instead. Classes found: "
+                                + pluginNames(matches)
+                );
+            }
+
+            PluginDesc<Connector> entry = matches.get(0);
+            klass = entry.pluginClass();
+        }
+        return newPlugin(klass);
+    }
+
+    public Task newTask(Class<? extends Task> taskClass) {
+        return newPlugin(taskClass);
+    }
+
+    public Converter newConverter(String converterClassOrAlias) {
+        return newConverter(converterClassOrAlias, null);
+    }
+
+    /**
+     * Instantiate a converter; when {@code config} is non-null and the
+     * converter is {@link Configurable}, it is configured with the config's
+     * original properties.
+     */
+    public Converter newConverter(String converterClassOrAlias, AbstractConfig config) {
+        Class<? extends Converter> klass;
+        try {
+            klass = pluginClass(
+                    delegatingLoader,
+                    converterClassOrAlias,
+                    Converter.class
+            );
+        } catch (ClassNotFoundException e) {
+            // Fixed copy-paste error: this message lists converters, so say
+            // "converters", not "connectors".
+            throw new ConnectException(
+                    "Failed to find any class that implements Converter and which name matches "
+                            + converterClassOrAlias
+                            + ", available converters are: "
+                            + pluginNames(delegatingLoader.converters())
+            );
+        }
+        return config != null ? newConfiguredPlugin(config, klass) : newPlugin(klass);
+    }
+
+    // TODO: unimplemented stub — always returns null. Also note the misspelled
+    // name ("Tranformations"); renaming would break callers, so it is kept as-is.
+    public <R extends ConnectRecord<R>> Transformation<R> newTranformations(
+            String transformationClassOrAlias
+    ) {
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
index ff3c30d..36b896f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
@@ -18,22 +18,12 @@ package org.apache.kafka.connect.runtime.rest.entities;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-
 import org.apache.kafka.connect.connector.Connector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 
-import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
 
 public class ConnectorPluginInfo {
-
-    private static final Logger log = LoggerFactory.getLogger(ConnectorPluginInfo.class);
-
-    private static final Map<Class<? extends Connector>, String>
-        VERSIONS = new ConcurrentHashMap<>();
-
     private String className;
     private ConnectorType type;
     private String version;
@@ -49,29 +39,8 @@ public class ConnectorPluginInfo {
         this.version = version;
     }
 
-    public ConnectorPluginInfo(Class<? extends Connector> klass) {
-        this(klass.getCanonicalName(), ConnectorType.from(klass), getVersion(klass));
-    }
-
-    private static String getVersion(Class<? extends Connector> klass) {
-        if (!VERSIONS.containsKey(klass)) {
-            synchronized (VERSIONS) {
-                if (!VERSIONS.containsKey(klass)) {
-                    try {
-                        VERSIONS.put(klass, klass.newInstance().version());
-                    } catch (
-                        ExceptionInInitializerError
-                            | InstantiationException
-                            | IllegalAccessException
-                            | SecurityException e
-                    ) {
-                        log.warn("Unable to instantiate connector", e);
-                        VERSIONS.put(klass, "unknown");
-                    }
-                }
-            }
-        }
-        return VERSIONS.get(klass);
+    public ConnectorPluginInfo(PluginDesc<Connector> plugin) {
+        this(plugin.className(), ConnectorType.from(plugin.pluginClass()), plugin.version());
     }
 
     @JsonProperty("class")

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
index 37e0f01..24eb93b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -16,14 +16,18 @@
  */
 package org.apache.kafka.connect.runtime.rest.resources;
 
+import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.PluginDiscovery;
+import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
-
-import java.util.List;
-import java.util.Map;
+import org.apache.kafka.connect.tools.MockConnector;
+import org.apache.kafka.connect.tools.MockSinkConnector;
+import org.apache.kafka.connect.tools.MockSourceConnector;
+import org.apache.kafka.connect.tools.SchemaSourceConnector;
+import org.apache.kafka.connect.tools.VerifiableSinkConnector;
+import org.apache.kafka.connect.tools.VerifiableSourceConnector;
 
 import javax.ws.rs.BadRequestException;
 import javax.ws.rs.Consumes;
@@ -33,6 +37,11 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 @Path("/connector-plugins")
 @Produces(MediaType.APPLICATION_JSON)
@@ -41,9 +50,17 @@ public class ConnectorPluginsResource {
 
     private static final String ALIAS_SUFFIX = "Connector";
     private final Herder herder;
+    private final List<ConnectorPluginInfo> connectorPlugins;
+
+    private static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList(
+            VerifiableSourceConnector.class, VerifiableSinkConnector.class,
+            MockConnector.class, MockSourceConnector.class, MockSinkConnector.class,
+            SchemaSourceConnector.class
+    );
 
     public ConnectorPluginsResource(Herder herder) {
         this.herder = herder;
+        this.connectorPlugins = new ArrayList<>();
     }
 
     @PUT
@@ -67,7 +84,20 @@ public class ConnectorPluginsResource {
     @GET
     @Path("/")
     public List<ConnectorPluginInfo> listConnectorPlugins() {
-        return PluginDiscovery.connectorPlugins();
+        return getConnectorPlugins();
+    }
+
+    // TODO: improve once plugins are allowed to be added/removed during runtime.
+    private synchronized List<ConnectorPluginInfo> getConnectorPlugins() {
+        if (connectorPlugins.isEmpty()) {
+            for (PluginDesc<Connector> plugin : herder.plugins().connectors()) {
+                if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) {
+                    connectorPlugins.add(new ConnectorPluginInfo(plugin));
+                }
+            }
+        }
+
+        return Collections.unmodifiableList(connectorPlugins);
     }
 
     private String normalizedPluginName(String pluginName) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 9c8c7ae..d57e75f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -251,12 +251,12 @@ public class StandaloneHerder extends AbstractHerder {
 
         ConnectorConfig connConfig;
         if (worker.isSinkConnector(connName)) {
-            connConfig = new SinkConnectorConfig(config);
+            connConfig = new SinkConnectorConfig(plugins(), config);
             return worker.connectorTaskConfigs(connName,
                                                connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
                                                connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG));
         } else {
-            connConfig = new SourceConnectorConfig(config);
+            connConfig = new SourceConnectorConfig(plugins(), config);
             return worker.connectorTaskConfigs(connName,
                                                connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
                                                null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
index 375b9c0..f8c4fd6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
@@ -20,12 +20,16 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -33,6 +37,13 @@ import static org.junit.Assert.fail;
 
 public class ConnectorConfigTest<R extends ConnectRecord<R>> {
 
+    private static final Plugins MOCK_PLUGINS = new Plugins(new HashMap<String, String>()) {
+        @Override
+        public Set<PluginDesc<Transformation>> transformations() {
+            return Collections.emptySet();
+        }
+    };
+
     public static abstract class TestConnector extends Connector {
     }
 
@@ -67,7 +78,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
         Map<String, String> props = new HashMap<>();
         props.put("name", "test");
         props.put("connector.class", TestConnector.class.getName());
-        new ConnectorConfig(props);
+        new ConnectorConfig(MOCK_PLUGINS, props);
     }
 
     @Test(expected = ConfigException.class)
@@ -76,7 +87,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
         props.put("name", "test");
         props.put("connector.class", TestConnector.class.getName());
         props.put("transforms", "dangler");
-        new ConnectorConfig(props);
+        new ConnectorConfig(MOCK_PLUGINS, props);
     }
 
     @Test(expected = ConfigException.class)
@@ -86,7 +97,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
         props.put("connector.class", TestConnector.class.getName());
         props.put("transforms", "a");
         props.put("transforms.a.type", "uninstantiable");
-        new ConnectorConfig(props);
+        new ConnectorConfig(MOCK_PLUGINS, props);
     }
 
     @Test(expected = ConfigException.class)
@@ -96,7 +107,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
         props.put("connector.class", TestConnector.class.getName());
         props.put("transforms", "a");
         props.put("transforms.a.type", SimpleTransformation.class.getName());
-        new ConnectorConfig(props);
+        new ConnectorConfig(MOCK_PLUGINS, props);
     }
 
     @Test
@@ -108,7 +119,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
         props.put("transforms.a.type", SimpleTransformation.class.getName());
         props.put("transforms.a.magic.number", "40");
         try {
-            new ConnectorConfig(props);
+            new ConnectorConfig(MOCK_PLUGINS, props);
             fail();
         } catch (ConfigException e) {
             assertTrue(e.getMessage().contains("Value must be at least 42"));
@@ -123,7 +134,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
         props.put("transforms", "a");
         props.put("transforms.a.type", SimpleTransformation.class.getName());
         props.put("transforms.a.magic.number", "42");
-        final ConnectorConfig config = new ConnectorConfig(props);
+        final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
         final List<Transformation<R>> transformations = config.transformations();
         assertEquals(1, transformations.size());
         final SimpleTransformation xform = (SimpleTransformation) transformations.get(0);
@@ -138,7 +149,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
         props.put("transforms", "a, b");
         props.put("transforms.a.type", SimpleTransformation.class.getName());
         props.put("transforms.a.magic.number", "42");
-        new ConnectorConfig(props);
+        new ConnectorConfig(MOCK_PLUGINS, props);
     }
 
     @Test
@@ -151,7 +162,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
         props.put("transforms.a.magic.number", "42");
         props.put("transforms.b.type", SimpleTransformation.class.getName());
         props.put("transforms.b.magic.number", "84");
-        final ConnectorConfig config = new ConnectorConfig(props);
+        final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
         final List<Transformation<R>> transformations = config.transformations();
         assertEquals(2, transformations.size());
         assertEquals(42, ((SimpleTransformation) transformations.get(0)).magicNumber);

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
index 9e77198..11b05ee 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
@@ -18,10 +18,12 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
 import org.easymock.EasyMockSupport;
 import org.easymock.Mock;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
@@ -39,12 +41,18 @@ public class WorkerConnectorTest extends EasyMockSupport {
         CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName());
         CONFIG.put(ConnectorConfig.NAME_CONFIG, CONNECTOR);
     }
-    public static final ConnectorConfig CONNECTOR_CONFIG = new ConnectorConfig(CONFIG);
+    public ConnectorConfig connectorConfig;
 
+    @Mock Plugins plugins;
     @Mock Connector connector;
     @Mock ConnectorContext ctx;
     @Mock ConnectorStatus.Listener listener;
 
+    @Before
+    public void setup() {
+        connectorConfig = new ConnectorConfig(plugins, CONFIG);
+    }
+
     @Test
     public void testInitializeFailure() {
         RuntimeException exception = new RuntimeException();
@@ -62,7 +70,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
 
-        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.initialize(connectorConfig);
         workerConnector.shutdown();
 
         verifyAll();
@@ -87,7 +95,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
 
-        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.initialize(connectorConfig);
         workerConnector.transitionTo(TargetState.STARTED);
         workerConnector.shutdown();
 
@@ -115,7 +123,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
 
-        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.initialize(connectorConfig);
         workerConnector.transitionTo(TargetState.STARTED);
         workerConnector.shutdown();
 
@@ -146,7 +154,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
 
-        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.initialize(connectorConfig);
         workerConnector.transitionTo(TargetState.STARTED);
         workerConnector.transitionTo(TargetState.PAUSED);
         workerConnector.shutdown();
@@ -178,7 +186,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
 
-        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.initialize(connectorConfig);
         workerConnector.transitionTo(TargetState.PAUSED);
         workerConnector.transitionTo(TargetState.STARTED);
         workerConnector.shutdown();
@@ -203,7 +211,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
 
-        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.initialize(connectorConfig);
         workerConnector.transitionTo(TargetState.PAUSED);
         workerConnector.shutdown();
 
@@ -230,7 +238,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
 
-        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.initialize(connectorConfig);
         workerConnector.transitionTo(TargetState.STARTED);
         workerConnector.shutdown();
 
@@ -260,7 +268,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
 
-        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.initialize(connectorConfig);
         workerConnector.transitionTo(TargetState.STARTED);
         workerConnector.shutdown();
 
@@ -289,7 +297,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
 
-        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.initialize(connectorConfig);
         workerConnector.transitionTo(TargetState.STARTED);
         workerConnector.transitionTo(TargetState.STARTED);
         workerConnector.shutdown();
@@ -321,7 +329,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
 
-        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.initialize(connectorConfig);
         workerConnector.transitionTo(TargetState.STARTED);
         workerConnector.transitionTo(TargetState.PAUSED);
         workerConnector.transitionTo(TargetState.PAUSED);

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 26ac486..eb5f25c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -104,6 +105,8 @@ public class WorkerSinkTaskTest {
     private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
     private WorkerConfig workerConfig;
     @Mock
+    private PluginClassLoader pluginLoader;
+    @Mock
     private Converter keyConverter;
     @Mock
     private Converter valueConverter;
@@ -129,9 +132,10 @@ public class WorkerSinkTaskTest {
         workerProps.put("internal.value.converter.schemas.enable", "false");
         workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
         workerConfig = new StandaloneConfig(workerProps);
+        pluginLoader = PowerMock.createMock(PluginClassLoader.class);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter, valueConverter, transformationChain, time);
+                taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter, valueConverter, transformationChain, pluginLoader, time);
 
         recordsReturned = 0;
     }
@@ -140,7 +144,7 @@ public class WorkerSinkTaskTest {
     public void testStartPaused() throws Exception {
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, TargetState.PAUSED, workerConfig, keyConverter, valueConverter, transformationChain, time);
+                taskId, sinkTask, statusListener, TargetState.PAUSED, workerConfig, keyConverter, valueConverter, transformationChain, pluginLoader, time);
 
         expectInitializeTask();
         expectPollInitialAssignment();

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index fb7cf4f..29a6b52 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -101,6 +102,8 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
     @Mock private SinkTask sinkTask;
     private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
     private WorkerConfig workerConfig;
+    @Mock
+    private PluginClassLoader pluginLoader;
     @Mock private Converter keyConverter;
     @Mock private Converter valueConverter;
     @Mock private TransformationChain transformationChain;
@@ -125,10 +128,12 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         workerProps.put("internal.key.converter.schemas.enable", "false");
         workerProps.put("internal.value.converter.schemas.enable", "false");
         workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        pluginLoader = PowerMock.createMock(PluginClassLoader.class);
         workerConfig = new StandaloneConfig(workerProps);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter, valueConverter, TransformationChain.<SinkRecord>noOp(), time);
+                taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter,
+                valueConverter, TransformationChain.<SinkRecord>noOp(), pluginLoader, time);
 
         recordsReturned = 0;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 31204f0..a3ddb3e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -82,6 +83,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     private ExecutorService executor = Executors.newSingleThreadExecutor();
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
     private WorkerConfig config;
+    private Plugins plugins;
     @Mock private SourceTask sourceTask;
     @Mock private Converter keyConverter;
     @Mock private Converter valueConverter;
@@ -116,6 +118,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         workerProps.put("internal.key.converter.schemas.enable", "false");
         workerProps.put("internal.value.converter.schemas.enable", "false");
         workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        plugins = new Plugins(workerProps);
         config = new StandaloneConfig(workerProps);
         producerCallbacks = EasyMock.newCapture();
     }
@@ -126,7 +129,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
     private void createWorkerTask(TargetState initialState) {
         workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, transformationChain,
-                producer, offsetReader, offsetWriter, config, Time.SYSTEM);
+                producer, offsetReader, offsetWriter, config, plugins.delegatingLoader(), Time.SYSTEM);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index 6c2fc4d..871c887 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -44,10 +44,16 @@ public class WorkerTaskTest {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
 
         TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
+        ClassLoader loader = EasyMock.createMock(ClassLoader.class);
 
         WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class, TargetState.class)
-                .withArgs(taskId, statusListener, TargetState.STARTED)
+                .withConstructor(
+                        ConnectorTaskId.class,
+                        TaskStatus.Listener.class,
+                        TargetState.class,
+                        ClassLoader.class
+                )
+                .withArgs(taskId, statusListener, TargetState.STARTED, loader)
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
@@ -83,10 +89,16 @@ public class WorkerTaskTest {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
 
         TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
+        ClassLoader loader = EasyMock.createMock(ClassLoader.class);
 
         WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class, TargetState.class)
-                .withArgs(taskId, statusListener, TargetState.STARTED)
+                .withConstructor(
+                        ConnectorTaskId.class,
+                        TaskStatus.Listener.class,
+                        TargetState.class,
+                        ClassLoader.class
+                )
+                .withArgs(taskId, statusListener, TargetState.STARTED, loader)
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
@@ -115,10 +127,16 @@ public class WorkerTaskTest {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
 
         TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
+        ClassLoader loader = EasyMock.createMock(ClassLoader.class);
 
         WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class, TargetState.class)
-                .withArgs(taskId, statusListener, TargetState.STARTED)
+                .withConstructor(
+                        ConnectorTaskId.class,
+                        TaskStatus.Listener.class,
+                        TargetState.class,
+                        ClassLoader.class
+                )
+                .withArgs(taskId, statusListener, TargetState.STARTED, loader)
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 47dfcef..ccc7e15 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
@@ -26,6 +27,9 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
+import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -39,6 +43,7 @@ import org.apache.kafka.connect.util.MockTime;
 import org.apache.kafka.connect.util.ThreadedTest;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
+import org.easymock.Mock;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -59,7 +64,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({Worker.class})
+@PrepareForTest({Worker.class, Plugins.class})
 @PowerMockIgnore("javax.management.*")
 public class WorkerTest extends ThreadedTest {
 
@@ -69,7 +74,14 @@ public class WorkerTest extends ThreadedTest {
 
     private WorkerConfig config;
     private Worker worker;
-    private ConnectorFactory connectorFactory = PowerMock.createMock(ConnectorFactory.class);
+
+    @Mock
+    private Plugins plugins = PowerMock.createMock(Plugins.class);
+    @Mock
+    private PluginClassLoader pluginLoader = PowerMock.createMock(PluginClassLoader.class);
+    @Mock
+    private DelegatingClassLoader delegatingLoader =
+            PowerMock.createMock(DelegatingClassLoader.class);
     private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);
     private TaskStatus.Listener taskStatusListener = PowerMock.createStrictMock(TaskStatus.Listener.class);
     private ConnectorStatus.Listener connectorStatusListener = PowerMock.createStrictMock(ConnectorStatus.Listener.class);
@@ -87,17 +99,22 @@ public class WorkerTest extends ThreadedTest {
         workerProps.put("internal.value.converter.schemas.enable", "false");
         workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
         config = new StandaloneConfig(workerProps);
+
+        PowerMock.mockStatic(Plugins.class);
     }
 
     @Test
     public void testStartAndStopConnector() throws Exception {
+        expectConverters();
         expectStartStorage();
 
         // Create
         Connector connector = PowerMock.createMock(Connector.class);
         ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
 
-        EasyMock.expect(connectorFactory.newConnector(WorkerTestConnector.class.getName())).andReturn(connector);
+        EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
+        EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName()))
+                .andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
@@ -106,11 +123,17 @@ public class WorkerTest extends ThreadedTest {
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
 
+        EasyMock.expect(plugins.compareAndSwapLoaders(connector))
+                .andReturn(delegatingLoader)
+                .times(2);
         connector.initialize(EasyMock.anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
         connector.start(props);
         EasyMock.expectLastCall();
 
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
+                .andReturn(pluginLoader).times(2);
+
         connectorStatusListener.onStartup(CONNECTOR_ID);
         EasyMock.expectLastCall();
 
@@ -125,7 +148,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
 
         assertEquals(Collections.emptySet(), worker.connectorNames());
@@ -147,20 +170,33 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testStartConnectorFailure() throws Exception {
+        expectConverters();
         expectStartStorage();
 
-        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
-        worker.start();
-
         Map<String, String> props = new HashMap<>();
         props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
         props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "java.util.HashMap"); // Bad connector class name
 
-        connectorStatusListener.onFailure(EasyMock.eq(CONNECTOR_ID), EasyMock.<Throwable>anyObject());
+        EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader);
+        EasyMock.expect(plugins.newConnector(EasyMock.anyString()))
+                .andThrow(new ConnectException("Failed to find Connector"));
+
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
+                .andReturn(pluginLoader);
+
+        connectorStatusListener.onFailure(
+                EasyMock.eq(CONNECTOR_ID),
+                EasyMock.<ConnectException>anyObject()
+        );
         EasyMock.expectLastCall();
 
+        PowerMock.replayAll();
+
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+        worker.start();
+
         assertFalse(worker.startConnector(CONNECTOR_ID, props, PowerMock.createMock(ConnectorContext.class), connectorStatusListener, TargetState.STARTED));
 
         assertEquals(Collections.emptySet(), worker.connectorNames());
@@ -170,13 +206,15 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testAddConnectorByAlias() throws Exception {
+        expectConverters();
         expectStartStorage();
 
         // Create
         Connector connector = PowerMock.createMock(Connector.class);
         ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
 
-        EasyMock.expect(connectorFactory.newConnector("WorkerTestConnector")).andReturn(connector);
+        EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
+        EasyMock.expect(plugins.newConnector("WorkerTestConnector")).andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
@@ -185,11 +223,18 @@ public class WorkerTest extends ThreadedTest {
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTestConnector");
 
+        EasyMock.expect(plugins.compareAndSwapLoaders(connector))
+                .andReturn(delegatingLoader)
+                .times(2);
         connector.initialize(EasyMock.anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
         connector.start(props);
         EasyMock.expectLastCall();
 
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
+                .andReturn(pluginLoader)
+                .times(2);
+
         connectorStatusListener.onStartup(CONNECTOR_ID);
         EasyMock.expectLastCall();
 
@@ -204,7 +249,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
 
         assertEquals(Collections.emptySet(), worker.connectorNames());
@@ -221,13 +266,15 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testAddConnectorByShortAlias() throws Exception {
+        expectConverters();
         expectStartStorage();
 
         // Create
         Connector connector = PowerMock.createMock(Connector.class);
         ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
 
-        EasyMock.expect(connectorFactory.newConnector("WorkerTest")).andReturn(connector);
+        EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
+        EasyMock.expect(plugins.newConnector("WorkerTest")).andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
@@ -236,11 +283,18 @@ public class WorkerTest extends ThreadedTest {
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTest");
 
+        EasyMock.expect(plugins.compareAndSwapLoaders(connector))
+                .andReturn(delegatingLoader)
+                .times(2);
         connector.initialize(EasyMock.anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
         connector.start(props);
         EasyMock.expectLastCall();
 
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
+                .andReturn(pluginLoader)
+                .times(2);
+
         connectorStatusListener.onStartup(CONNECTOR_ID);
         EasyMock.expectLastCall();
 
@@ -255,7 +309,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
 
         assertEquals(Collections.emptySet(), worker.connectorNames());
@@ -272,11 +326,12 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testStopInvalidConnector() {
+        expectConverters();
         expectStartStorage();
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
 
         worker.stopConnector(CONNECTOR_ID);
@@ -284,13 +339,16 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testReconfigureConnectorTasks() throws Exception {
+        expectConverters();
         expectStartStorage();
 
         // Create
         Connector connector = PowerMock.createMock(Connector.class);
         ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
 
-        EasyMock.expect(connectorFactory.newConnector(WorkerTestConnector.class.getName())).andReturn(connector);
+        EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(3);
+        EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName()))
+                .andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
@@ -299,11 +357,18 @@ public class WorkerTest extends ThreadedTest {
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
 
+        EasyMock.expect(plugins.compareAndSwapLoaders(connector))
+                .andReturn(delegatingLoader)
+                .times(3);
         connector.initialize(EasyMock.anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
         connector.start(props);
         EasyMock.expectLastCall();
 
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
+                .andReturn(pluginLoader)
+                .times(3);
+
         connectorStatusListener.onStartup(CONNECTOR_ID);
         EasyMock.expectLastCall();
 
@@ -324,7 +389,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
 
         assertEquals(Collections.emptySet(), worker.connectorNames());
@@ -355,16 +420,16 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testAddRemoveTask() throws Exception {
+        expectConverters();
         expectStartStorage();
 
         // Create
         TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
-
-        EasyMock.expect(connectorFactory.newTask(TestSourceTask.class)).andReturn(task);
         EasyMock.expect(task.version()).andReturn("1.0");
 
+        EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(TASK_ID),
                 EasyMock.eq(task),
@@ -376,16 +441,30 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.anyObject(KafkaProducer.class),
                 EasyMock.anyObject(OffsetStorageReader.class),
                 EasyMock.anyObject(OffsetStorageWriter.class),
-                EasyMock.anyObject(WorkerConfig.class),
+                EasyMock.eq(config),
+                EasyMock.anyObject(ClassLoader.class),
                 EasyMock.anyObject(Time.class))
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
+
+        EasyMock.expect(plugins.newTask(TestSourceTask.class)).andReturn(task);
         workerTask.initialize(new TaskConfig(origProps));
         EasyMock.expectLastCall();
         workerTask.run();
         EasyMock.expectLastCall();
 
+        EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
+        EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName()))
+                .andReturn(pluginLoader);
+
+        EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader)
+                .times(2);
+
+        EasyMock.expect(workerTask.loader()).andReturn(pluginLoader);
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader)
+                .times(2);
+
         // Remove
         workerTask.stop();
         EasyMock.expectLastCall();
@@ -396,7 +475,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
         assertEquals(Collections.emptySet(), worker.taskIds());
         worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
@@ -411,24 +490,39 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testStartTaskFailure() throws Exception {
+        expectConverters();
         expectStartStorage();
 
-        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
-        worker.start();
-
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, "missing.From.This.Workers.Classpath");
 
-        assertFalse(worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED));
+        EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader);
+        EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
+        EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName()))
+                .andReturn(pluginLoader);
+
+        EasyMock.expect(pluginLoader.loadClass(origProps.get(TaskConfig.TASK_CLASS_CONFIG)))
+                .andThrow(new ClassNotFoundException());
+
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
+                .andReturn(pluginLoader);
 
-        taskStatusListener.onFailure(EasyMock.eq(TASK_ID), EasyMock.<Throwable>anyObject());
+        taskStatusListener.onFailure(EasyMock.eq(TASK_ID), EasyMock.<ConfigException>anyObject());
         EasyMock.expectLastCall();
 
+        PowerMock.replayAll();
+
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+        worker.start();
+
+        assertFalse(worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED));
+
         assertEquals(Collections.emptySet(), worker.taskIds());
     }
 
     @Test
     public void testCleanupTasksOnStop() throws Exception {
+        expectConverters();
         expectStartStorage();
 
         // Create
@@ -436,9 +530,10 @@ public class WorkerTest extends ThreadedTest {
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
 
-        EasyMock.expect(connectorFactory.newTask(TestSourceTask.class)).andReturn(task);
+        EasyMock.expect(plugins.newTask(TestSourceTask.class)).andReturn(task);
         EasyMock.expect(task.version()).andReturn("1.0");
-        
+
+        EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(TASK_ID),
                 EasyMock.eq(task),
@@ -451,6 +546,7 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.anyObject(OffsetStorageReader.class),
                 EasyMock.anyObject(OffsetStorageWriter.class),
                 EasyMock.anyObject(WorkerConfig.class),
+                EasyMock.eq(pluginLoader),
                 EasyMock.anyObject(Time.class))
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
@@ -460,6 +556,17 @@ public class WorkerTest extends ThreadedTest {
         workerTask.run();
         EasyMock.expectLastCall();
 
+        EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
+        EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName()))
+                .andReturn(pluginLoader);
+
+        EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader)
+                .times(2);
+
+        EasyMock.expect(workerTask.loader()).andReturn(pluginLoader);
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader)
+                .times(2);
+
         // Remove on Worker.stop()
         workerTask.stop();
         EasyMock.expectLastCall();
@@ -472,7 +579,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
         worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
         worker.stop();
@@ -482,18 +589,20 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testConverterOverrides() throws Exception {
+        expectConverters();
         expectStartStorage();
 
         TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
 
-        EasyMock.expect(connectorFactory.newTask(TestSourceTask.class)).andReturn(task);
+        EasyMock.expect(plugins.newTask(TestSourceTask.class)).andReturn(task);
         EasyMock.expect(task.version()).andReturn("1.0");
 
         Capture<TestConverter> keyConverter = EasyMock.newCapture();
         Capture<TestConverter> valueConverter = EasyMock.newCapture();
 
+        EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(TASK_ID),
                 EasyMock.eq(task),
@@ -506,15 +615,27 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.anyObject(OffsetStorageReader.class),
                 EasyMock.anyObject(OffsetStorageWriter.class),
                 EasyMock.anyObject(WorkerConfig.class),
+                EasyMock.eq(pluginLoader),
                 EasyMock.anyObject(Time.class))
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
+
         workerTask.initialize(new TaskConfig(origProps));
         EasyMock.expectLastCall();
         workerTask.run();
         EasyMock.expectLastCall();
 
+        EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
+        EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName()))
+                .andReturn(pluginLoader);
+
+        EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader)
+                .times(2);
+
+        EasyMock.expect(workerTask.loader()).andReturn(pluginLoader);
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader)
+                .times(2);
         // Remove
         workerTask.stop();
         EasyMock.expectLastCall();
@@ -525,7 +646,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
         assertEquals(Collections.emptySet(), worker.taskIds());
         Map<String, String> connProps = anyConnectorConfigMap();
@@ -559,6 +680,51 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expectLastCall();
     }
 
+    private void expectConverters() {
+        expectConverters(JsonConverter.class);
+    }
+
+    private void expectConverters(Class<? extends Converter> converterClass) {
+        // connector default
+        Converter keyConverter = PowerMock.createMock(converterClass);
+        Converter valueConverter = PowerMock.createMock(converterClass);
+        //internal
+        Converter internalKeyConverter = PowerMock.createMock(converterClass);
+        Converter internalValueConverter = PowerMock.createMock(converterClass);
+
+        // Instantiate and configure default
+        EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config))
+                .andReturn(keyConverter);
+        keyConverter.configure(
+                EasyMock.<Map<String, ?>>anyObject(),
+                EasyMock.anyBoolean()
+        );
+        EasyMock.expectLastCall();
+        EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config))
+                .andReturn(valueConverter);
+        valueConverter.configure(
+                EasyMock.<Map<String, ?>>anyObject(),
+                EasyMock.anyBoolean()
+        );
+        EasyMock.expectLastCall();
+
+        // Instantiate and configure internal
+        EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config))
+                .andReturn(internalKeyConverter);
+        internalKeyConverter.configure(
+                EasyMock.<Map<String, ?>>anyObject(),
+                EasyMock.anyBoolean()
+        );
+        EasyMock.expectLastCall();
+        EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config))
+                .andReturn(internalValueConverter);
+        internalValueConverter.configure(
+                EasyMock.<Map<String, ?>>anyObject(),
+                EasyMock.anyBoolean()
+        );
+        EasyMock.expectLastCall();
+    }
+
     private Map<String, String> anyConnectorConfigMap() {
         Map<String, String> props = new HashMap<>();
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 70d0736..18d83c5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -26,13 +26,15 @@ import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
-import org.apache.kafka.connect.runtime.ConnectorFactory;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
+import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
@@ -73,7 +75,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(DistributedHerder.class)
+@PrepareForTest({DistributedHerder.class, Plugins.class})
 @PowerMockIgnore("javax.management.*")
 public class DistributedHerderTest {
     private static final Map<String, String> HERDER_CONFIG = new HashMap<>();
@@ -150,6 +152,12 @@ public class DistributedHerderTest {
     private DistributedHerder herder;
     @Mock private Worker worker;
     @Mock private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback;
+    @Mock
+    private Plugins plugins;
+    @Mock
+    private PluginClassLoader pluginLoader;
+    @Mock
+    private DelegatingClassLoader delegatingLoader;
 
     private ConfigBackingStore.UpdateListener configUpdateListener;
     private WorkerRebalanceListener rebalanceListener;
@@ -165,7 +173,10 @@ public class DistributedHerderTest {
 
         configUpdateListener = herder.new ConfigUpdateListener();
         rebalanceListener = herder.new RebalanceListener();
-
+        plugins = PowerMock.createMock(Plugins.class);
+        pluginLoader = PowerMock.createMock(PluginClassLoader.class);
+        delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class);
+        PowerMock.mockStatic(Plugins.class);
         PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes();
     }
 
@@ -173,6 +184,7 @@ public class DistributedHerderTest {
     public void testJoinAssignment() throws Exception {
         // Join group and get assignment
         EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
         expectPostRebalanceCatchup(SNAPSHOT);
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
@@ -198,6 +210,7 @@ public class DistributedHerderTest {
     public void testRebalance() throws Exception {
         // Join group and get assignment
         EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
         expectPostRebalanceCatchup(SNAPSHOT);
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
@@ -211,6 +224,7 @@ public class DistributedHerderTest {
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1), ConnectProtocol.Assignment.NO_ERROR,
                 1, Arrays.asList(CONN1), Arrays.<ConnectorTaskId>asList());
 
@@ -235,6 +249,7 @@ public class DistributedHerderTest {
     public void testRebalanceFailedConnector() throws Exception {
         // Join group and get assignment
         EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
         expectPostRebalanceCatchup(SNAPSHOT);
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
@@ -304,12 +319,13 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
 
         // config validation
-        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
-        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
         Connector connectorMock = PowerMock.createMock(Connector.class);
-        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
+        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
         EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
         EasyMock.expect(connectorMock.validate(CONN2_CONFIG)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
 
         // CONN2 is new, should succeed
         configBackingStore.putConnectorConfig(CONN2, CONN2_CONFIG);
@@ -342,12 +358,11 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
 
         // config validation
-        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
-        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
         Connector connectorMock = PowerMock.createMock(Connector.class);
-        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
-        EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
-        EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
+        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
+        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
 
         // CONN2 creation should fail
 
@@ -380,10 +395,10 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
 
         // config validation
-        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
-        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
         Connector connectorMock = PowerMock.createMock(Connector.class);
-        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
+        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
 
         ConfigDef configDef = new ConfigDef();
         configDef.define("foo.bar", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "foo.bar doc");
@@ -392,6 +407,7 @@ public class DistributedHerderTest {
         ConfigValue validatedValue = new ConfigValue("foo.bar");
         validatedValue.addErrorMessage("Failed foo.bar validation");
         EasyMock.expect(connectorMock.validate(CONN2_CONFIG)).andReturn(new Config(singletonList(validatedValue)));
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
 
         // CONN2 creation should fail
 
@@ -427,12 +443,13 @@ public class DistributedHerderTest {
         config.put(ConnectorConfig.NAME_CONFIG, "test-group");
 
         // config validation
-        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
-        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
         Connector connectorMock = PowerMock.createMock(SinkConnector.class);
-        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
+        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
         EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
         EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
 
         // CONN2 creation should fail because the worker group id (connect-test-group) conflicts with
         // the consumer group id we would use for this sink
@@ -483,6 +500,7 @@ public class DistributedHerderTest {
     public void testDestroyConnector() throws Exception {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
         // Start with one connector
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
@@ -516,6 +534,7 @@ public class DistributedHerderTest {
 
         // get the initial assignment
         EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         expectRebalance(1, singletonList(CONN1), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
         member.poll(EasyMock.anyInt());
@@ -535,6 +554,7 @@ public class DistributedHerderTest {
 
         worker.stopConnector(CONN1);
         PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
@@ -839,6 +859,7 @@ public class DistributedHerderTest {
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
@@ -867,6 +888,7 @@ public class DistributedHerderTest {
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
@@ -882,6 +904,7 @@ public class DistributedHerderTest {
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
@@ -909,6 +932,7 @@ public class DistributedHerderTest {
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
@@ -948,6 +972,7 @@ public class DistributedHerderTest {
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED));
         PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
 
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -1155,6 +1180,7 @@ public class DistributedHerderTest {
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
@@ -1236,12 +1262,13 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
 
         // config validation
-        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
-        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
         Connector connectorMock = PowerMock.createMock(Connector.class);
-        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(6);
+        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
+        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
         EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
         EasyMock.expect(connectorMock.validate(CONN1_CONFIG_UPDATED)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
 
         configBackingStore.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED);
         PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {