You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/06/21 21:20:53 UTC

kafka git commit: KAFKA-5475: Connector config validation should include fields for defined transformation aliases

Repository: kafka
Updated Branches:
  refs/heads/trunk e6e263174 -> 96587f4b1


KAFKA-5475: Connector config validation should include fields for defined transformation aliases

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Konstantine Karantasis <ko...@confluent.io>, Jason Gustafson <ja...@confluent.io>

Closes #3399 from ewencp/kafka-5475-validation-transformations


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/96587f4b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/96587f4b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/96587f4b

Branch: refs/heads/trunk
Commit: 96587f4b1ffd372d3e4f9a1fba6fc1d2f84a191d
Parents: e6e2631
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Wed Jun 21 14:20:48 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Wed Jun 21 14:20:48 2017 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/AbstractHerder.java   |  42 ++---
 .../kafka/connect/runtime/ConnectorConfig.java  |   2 +-
 .../connect/runtime/SourceConnectorConfig.java  |   6 +-
 .../connect/runtime/AbstractHerderTest.java     | 158 ++++++++++++++++++-
 .../connect/runtime/TestSinkConnector.java      |  61 +++++++
 .../connect/runtime/TestSourceConnector.java    |  61 +++++++
 .../distributed/DistributedHerderTest.java      |  13 +-
 .../resources/ConnectorPluginsResourceTest.java |  72 +--------
 .../standalone/StandaloneHerderTest.java        |  13 +-
 9 files changed, 313 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/96587f4b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index cfb8ae0..607a724 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.ConfigKey;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.errors.NotFoundException;
@@ -229,10 +228,10 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
     }
 
     @Override
-    public ConfigInfos validateConnectorConfig(Map<String, String> connectorConfig) {
-        String connType = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+    public ConfigInfos validateConnectorConfig(Map<String, String> connectorProps) {
+        String connType = connectorProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
         if (connType == null)
-            throw new BadRequestException("Connector config " + connectorConfig + " contains no connector type");
+            throw new BadRequestException("Connector config " + connectorProps + " contains no connector type");
 
         List<ConfigValue> configValues = new ArrayList<>();
         Map<String, ConfigKey> configKeys = new LinkedHashMap<>();
@@ -241,43 +240,26 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
         Connector connector = getConnector(connType);
         ClassLoader savedLoader = plugins().compareAndSwapLoaders(connector);
         try {
-            // do basic connector validation (name, connector type, etc.)
-            ConfigDef basicConfigDef = (connector instanceof SourceConnector)
-                                       ? SourceConnectorConfig.configDef()
-                                       : SinkConnectorConfig.configDef();
+            ConfigDef baseConfigDef = (connector instanceof SourceConnector)
+                    ? SourceConnectorConfig.configDef()
+                    : SinkConnectorConfig.configDef();
+            ConfigDef enrichedConfigDef = ConnectorConfig.enrich(plugins(), baseConfigDef, connectorProps, false);
             Map<String, ConfigValue> validatedConnectorConfig = validateBasicConnectorConfig(
                     connector,
-                    basicConfigDef,
-                    connectorConfig
+                    enrichedConfigDef,
+                    connectorProps
             );
             configValues.addAll(validatedConnectorConfig.values());
-            configKeys.putAll(basicConfigDef.configKeys());
-            allGroups.addAll(basicConfigDef.groups());
-
-            ConnectorConfig connectorConfigToEnrich = (connector instanceof SourceConnector)
-                    ? new SourceConnectorConfig(plugins(), connectorConfig)
-                    : new SinkConnectorConfig(plugins(), connectorConfig);
-            final ConfigDef connectorConfigDef = connectorConfigToEnrich.enrich(
-                    plugins(),
-                    basicConfigDef,
-                    connectorConfig,
-                    false
-            );
-
-            // Override is required here after the enriched ConfigDef has been created successfully
-            configKeys.putAll(connectorConfigDef.configKeys());
-            allGroups.addAll(connectorConfigDef.groups());
+            configKeys.putAll(enrichedConfigDef.configKeys());
+            allGroups.addAll(enrichedConfigDef.groups());
 
             // do custom connector-specific validation
-            Config config = connector.validate(connectorConfig);
+            Config config = connector.validate(connectorProps);
             ConfigDef configDef = connector.config();
             configKeys.putAll(configDef.configKeys());
             allGroups.addAll(configDef.groups());
             configValues.addAll(config.configValues());
             return generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups));
-        } catch (ConfigException e) {
-            // Basic validation must have failed. Return the result.
-            return generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups));
         } finally {
             Plugins.compareAndSwapLoaders(savedLoader);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/96587f4b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 869cfbd..0f8c390 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -164,7 +164,7 @@ public class ConnectorConfig extends AbstractConfig {
      * <p>
      * {@code requireFullConfig} specifies whether required config values that are missing should cause an exception to be thrown.
      */
-    public ConfigDef enrich(Plugins plugins, ConfigDef baseConfigDef, Map<String, String> props, boolean requireFullConfig) {
+    public static ConfigDef enrich(Plugins plugins, ConfigDef baseConfigDef, Map<String, String> props, boolean requireFullConfig) {
         Object transformAliases = ConfigDef.parseType(TRANSFORMS_CONFIG, props.get(TRANSFORMS_CONFIG), Type.LIST);
         if (!(transformAliases instanceof List)) {
             return baseConfigDef;

http://git-wip-us.apache.org/repos/asf/kafka/blob/96587f4b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
index 6915421..ab8fd01 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
@@ -23,7 +23,11 @@ import java.util.Map;
 
 public class SourceConnectorConfig extends ConnectorConfig {
 
-    private static ConfigDef config = configDef();
+    private static ConfigDef config = ConnectorConfig.configDef();
+
+    public static ConfigDef configDef() {
+        return config;
+    }
 
     public SourceConnectorConfig(Plugins plugins, Map<String, String> props) {
         super(plugins, config, props);

http://git-wip-us.apache.org/repos/asf/kafka/blob/96587f4b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index b9276af..c261ab6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -16,9 +16,17 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
@@ -26,19 +34,29 @@ import org.easymock.EasyMockSupport;
 import org.easymock.IAnswer;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class AbstractHerderTest extends EasyMockSupport {
+    private final Worker worker = strictMock(Worker.class);
+    private final String workerId = "workerId";
+    private final int generation = 5;
+    private final String connector = "connector";
+    private final Plugins plugins = strictMock(Plugins.class);
+    private final ClassLoader classLoader = strictMock(ClassLoader.class);
 
     @Test
     public void connectorStatus() {
-        Worker worker = null;
-        String workerId = "workerId";
-        String connector = "connector";
-        int generation = 5;
         ConnectorTaskId taskId = new ConnectorTaskId(connector, 0);
 
         ConfigBackingStore configStore = strictMock(ConfigBackingStore.class);
@@ -79,7 +97,6 @@ public class AbstractHerderTest extends EasyMockSupport {
 
     @Test
     public void taskStatus() {
-        Worker worker = null;
         ConnectorTaskId taskId = new ConnectorTaskId("connector", 0);
         String workerId = "workerId";
 
@@ -117,4 +134,135 @@ public class AbstractHerderTest extends EasyMockSupport {
 
         verifyAll();
     }
+
+
+    @Test(expected = BadRequestException.class)
+    public void testConfigValidationEmptyConfig() {
+        AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class);
+        replayAll();
+
+        herder.validateConnectorConfig(new HashMap<String, String>());
+
+        verifyAll();
+    }
+
+    @Test()
+    public void testConfigValidationMissingName() {
+        AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class);
+        replayAll();
+
+        Map<String, String> config = Collections.singletonMap(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSourceConnector.class.getName());
+        ConfigInfos result = herder.validateConnectorConfig(config);
+
+        // We expect there to be errors due to the missing name and the missing "required" connector config. Note that these
+        // assertions depend heavily on the config fields for SourceConnectorConfig, but we expect these to change rarely.
+        assertEquals(TestSourceConnector.class.getName(), result.name());
+        assertEquals(Arrays.asList(ConnectorConfig.COMMON_GROUP, ConnectorConfig.TRANSFORMS_GROUP), result.groups());
+        assertEquals(2, result.errorCount());
+        // Base connector config has 6 fields, connector's configs add 2
+        assertEquals(8, result.values().size());
+        // Missing name should generate an error
+        assertEquals(ConnectorConfig.NAME_CONFIG, result.values().get(0).configValue().name());
+        assertEquals(1, result.values().get(0).configValue().errors().size());
+        // "required" config from connector should generate an error
+        assertEquals("required", result.values().get(6).configValue().name());
+        assertEquals(1, result.values().get(6).configValue().errors().size());
+
+        verifyAll();
+    }
+
+    @Test()
+    public void testConfigValidationTransformsExtendResults() {
+        AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class);
+
+        // 2 transform aliases defined -> 2 plugin lookups
+        Set<PluginDesc<Transformation>> transformations = new HashSet<>();
+        transformations.add(new PluginDesc<Transformation>(SampleTransformation.class, "1.0", classLoader));
+        EasyMock.expect(plugins.transformations()).andReturn(transformations).times(2);
+
+        replayAll();
+
+        // Define 2 transformations. One has a class defined and so can get embedded configs, the other is missing
+        // class info that should generate an error.
+        Map<String, String> config = new HashMap<>();
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSourceConnector.class.getName());
+        config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
+        config.put(ConnectorConfig.TRANSFORMS_CONFIG, "xformA,xformB");
+        config.put(ConnectorConfig.TRANSFORMS_CONFIG + ".xformA.type", SampleTransformation.class.getName());
+        config.put("required", "value"); // connector required config
+        ConfigInfos result = herder.validateConnectorConfig(config);
+
+        // We expect there to be errors due to the missing type for xformB. Note that these assertions depend heavily on
+        // the config fields for SourceConnectorConfig, but we expect these to change rarely.
+        assertEquals(TestSourceConnector.class.getName(), result.name());
+        // Each transform also gets its own group
+        List<String> expectedGroups = Arrays.asList(
+                ConnectorConfig.COMMON_GROUP,
+                ConnectorConfig.TRANSFORMS_GROUP,
+                "Transforms: xformA",
+                "Transforms: xformB"
+        );
+        assertEquals(expectedGroups, result.groups());
+        assertEquals(2, result.errorCount());
+        // Base connector config has 6 fields, connector's configs add 2, 2 type fields from the transforms, and
+        // 1 from the valid transformation's config
+        assertEquals(11, result.values().size());
+        // Should get 2 type fields from the transforms, first adds its own config since it has a valid class
+        assertEquals("transforms.xformA.type", result.values().get(6).configValue().name());
+        assertTrue(result.values().get(6).configValue().errors().isEmpty());
+        assertEquals("transforms.xformA.subconfig", result.values().get(7).configValue().name());
+        assertEquals("transforms.xformB.type", result.values().get(8).configValue().name());
+        assertFalse(result.values().get(8).configValue().errors().isEmpty());
+
+        verifyAll();
+    }
+
+    private AbstractHerder createConfigValidationHerder(Class<? extends Connector> connectorClass) {
+
+
+        ConfigBackingStore configStore = strictMock(ConfigBackingStore.class);
+        StatusBackingStore statusStore = strictMock(StatusBackingStore.class);
+
+        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
+                .withConstructor(Worker.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
+                .withArgs(worker, workerId, statusStore, configStore)
+                .addMockedMethod("generation")
+                .createMock();
+        EasyMock.expect(herder.generation()).andStubReturn(generation);
+
+        // Call to validateConnectorConfig
+        EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
+        final Connector connector;
+        try {
+            connector = connectorClass.newInstance();
+        } catch (InstantiationException | IllegalAccessException e) {
+            throw new RuntimeException("Couldn't create connector", e);
+        }
+        EasyMock.expect(plugins.newConnector(connectorClass.getName())).andReturn(connector);
+        EasyMock.expect(plugins.compareAndSwapLoaders(connector)).andReturn(classLoader);
+        return herder;
+    }
+
+    public static class SampleTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+        @Override
+        public void configure(Map<String, ?> configs) {
+
+        }
+
+        @Override
+        public R apply(R record) {
+            return record;
+        }
+
+        @Override
+        public ConfigDef config() {
+            return new ConfigDef()
+                    .define("subconfig", ConfigDef.Type.STRING, "default", ConfigDef.Importance.LOW, "docs");
+        }
+
+        @Override
+        public void close() {
+
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/96587f4b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestSinkConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestSinkConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestSinkConnector.java
new file mode 100644
index 0000000..a6e3bb1
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestSinkConnector.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+import java.util.List;
+import java.util.Map;
+
+public class TestSinkConnector extends SinkConnector {
+
+    public static final String VERSION = "some great version";
+
+    @Override
+    public String version() {
+        return VERSION;
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return null;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        return null;
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef()
+                .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs")
+                .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs");
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/96587f4b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestSourceConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestSourceConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestSourceConnector.java
new file mode 100644
index 0000000..5f754e2
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestSourceConnector.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.SourceConnector;
+
+import java.util.List;
+import java.util.Map;
+
+public class TestSourceConnector extends SourceConnector {
+
+    public static final String VERSION = "an entirely different version";
+
+    @Override
+    public String version() {
+        return VERSION;
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return null;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        return null;
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef()
+                .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs")
+                .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs");
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/96587f4b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 18d83c5..7834a89 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -320,7 +320,7 @@ public class DistributedHerderTest {
 
         // config validation
         Connector connectorMock = PowerMock.createMock(Connector.class);
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
         EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
@@ -362,6 +362,11 @@ public class DistributedHerderTest {
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+
+        EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
+        ConfigValue validatedValue = new ConfigValue("foo.bar");
+        EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(singletonList(validatedValue)));
+
         EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
 
         // CONN2 creation should fail
@@ -396,7 +401,7 @@ public class DistributedHerderTest {
 
         // config validation
         Connector connectorMock = PowerMock.createMock(Connector.class);
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
 
@@ -444,7 +449,7 @@ public class DistributedHerderTest {
 
         // config validation
         Connector connectorMock = PowerMock.createMock(SinkConnector.class);
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
         EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
@@ -1263,7 +1268,7 @@ public class DistributedHerderTest {
 
         // config validation
         Connector connectorMock = PowerMock.createMock(Connector.class);
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(6);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(5);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
         EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());

http://git-wip-us.apache.org/repos/asf/kafka/blob/96587f4b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 2d0448e..538e5db 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -30,6 +30,8 @@ import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.runtime.AbstractHerder;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.TestSinkConnector;
+import org.apache.kafka.connect.runtime.TestSourceConnector;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -493,76 +495,6 @@ public class ConnectorPluginsResourceTest {
         }
     }
 
-    public static class TestSinkConnector extends SinkConnector {
-
-        static final String VERSION = "some great version";
-
-        @Override
-        public String version() {
-            return VERSION;
-        }
-
-        @Override
-        public void start(Map<String, String> props) {
-
-        }
-
-        @Override
-        public Class<? extends Task> taskClass() {
-            return null;
-        }
-
-        @Override
-        public List<Map<String, String>> taskConfigs(int maxTasks) {
-            return null;
-        }
-
-        @Override
-        public void stop() {
-
-        }
-
-        @Override
-        public ConfigDef config() {
-            return null;
-        }
-    }
-
-    public static class TestSourceConnector extends SourceConnector {
-
-        static final String VERSION = "an entirely different version";
-
-        @Override
-        public String version() {
-            return VERSION;
-        }
-
-        @Override
-        public void start(Map<String, String> props) {
-
-        }
-
-        @Override
-        public Class<? extends Task> taskClass() {
-            return null;
-        }
-
-        @Override
-        public List<Map<String, String>> taskConfigs(int maxTasks) {
-            return null;
-        }
-
-        @Override
-        public void stop() {
-
-        }
-
-        @Override
-        public ConfigDef config() {
-            return null;
-        }
-    }
-
     /* Name here needs to be unique as we are testing the aliasing mechanism */
     public static class ConnectorPluginsResourceTestConnector extends Connector {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/96587f4b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 1c3dddb..c58d702 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -128,6 +128,7 @@ public class StandaloneHerderTest {
 
     @Test
     public void testCreateConnectorFailedBasicValidation() throws Exception {
+        // Basic validation should be performed and return an error, but should still evaluate the connector's config
         connector = PowerMock.createMock(BogusSourceConnector.class);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
@@ -137,7 +138,11 @@ public class StandaloneHerderTest {
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+
         EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
+
+        ConfigValue validatedValue = new ConfigValue("foo.bar");
+        EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(singletonList(validatedValue)));
         EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
 
         createCallback.onCompletion(EasyMock.<BadRequestException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
@@ -155,7 +160,7 @@ public class StandaloneHerderTest {
         connector = PowerMock.createMock(BogusSourceConnector.class);
 
         Connector connectorMock = PowerMock.createMock(Connector.class);
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
 
@@ -189,7 +194,7 @@ public class StandaloneHerderTest {
         Connector connectorMock = PowerMock.createMock(Connector.class);
         expectConfigValidation(connectorMock, true, config, config);
 
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(2);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         // No new connector is created
         EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
@@ -522,7 +527,7 @@ public class StandaloneHerderTest {
         );
         ConfigDef configDef = new ConfigDef();
         configDef.define(key, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "");
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -620,7 +625,7 @@ public class StandaloneHerderTest {
             Map<String, String>... configs
     ) {
         // config validation
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         if (shouldCreateConnector) {
             EasyMock.expect(worker.getPlugins()).andReturn(plugins);