You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/06/21 21:20:53 UTC
kafka git commit: KAFKA-5475: Connector config validation should
include fields for defined transformation aliases
Repository: kafka
Updated Branches:
refs/heads/trunk e6e263174 -> 96587f4b1
KAFKA-5475: Connector config validation should include fields for defined transformation aliases
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Reviewers: Konstantine Karantasis <ko...@confluent.io>, Jason Gustafson <ja...@confluent.io>
Closes #3399 from ewencp/kafka-5475-validation-transformations
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/96587f4b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/96587f4b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/96587f4b
Branch: refs/heads/trunk
Commit: 96587f4b1ffd372d3e4f9a1fba6fc1d2f84a191d
Parents: e6e2631
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Wed Jun 21 14:20:48 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Wed Jun 21 14:20:48 2017 -0700
----------------------------------------------------------------------
.../kafka/connect/runtime/AbstractHerder.java | 42 ++---
.../kafka/connect/runtime/ConnectorConfig.java | 2 +-
.../connect/runtime/SourceConnectorConfig.java | 6 +-
.../connect/runtime/AbstractHerderTest.java | 158 ++++++++++++++++++-
.../connect/runtime/TestSinkConnector.java | 61 +++++++
.../connect/runtime/TestSourceConnector.java | 61 +++++++
.../distributed/DistributedHerderTest.java | 13 +-
.../resources/ConnectorPluginsResourceTest.java | 72 +--------
.../standalone/StandaloneHerderTest.java | 13 +-
9 files changed, 313 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/96587f4b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index cfb8ae0..607a724 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.ConfigKey;
import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.errors.NotFoundException;
@@ -229,10 +228,10 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
}
@Override
- public ConfigInfos validateConnectorConfig(Map<String, String> connectorConfig) {
- String connType = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+ public ConfigInfos validateConnectorConfig(Map<String, String> connectorProps) {
+ String connType = connectorProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
if (connType == null)
- throw new BadRequestException("Connector config " + connectorConfig + " contains no connector type");
+ throw new BadRequestException("Connector config " + connectorProps + " contains no connector type");
List<ConfigValue> configValues = new ArrayList<>();
Map<String, ConfigKey> configKeys = new LinkedHashMap<>();
@@ -241,43 +240,26 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
Connector connector = getConnector(connType);
ClassLoader savedLoader = plugins().compareAndSwapLoaders(connector);
try {
- // do basic connector validation (name, connector type, etc.)
- ConfigDef basicConfigDef = (connector instanceof SourceConnector)
- ? SourceConnectorConfig.configDef()
- : SinkConnectorConfig.configDef();
+ ConfigDef baseConfigDef = (connector instanceof SourceConnector)
+ ? SourceConnectorConfig.configDef()
+ : SinkConnectorConfig.configDef();
+ ConfigDef enrichedConfigDef = ConnectorConfig.enrich(plugins(), baseConfigDef, connectorProps, false);
Map<String, ConfigValue> validatedConnectorConfig = validateBasicConnectorConfig(
connector,
- basicConfigDef,
- connectorConfig
+ enrichedConfigDef,
+ connectorProps
);
configValues.addAll(validatedConnectorConfig.values());
- configKeys.putAll(basicConfigDef.configKeys());
- allGroups.addAll(basicConfigDef.groups());
-
- ConnectorConfig connectorConfigToEnrich = (connector instanceof SourceConnector)
- ? new SourceConnectorConfig(plugins(), connectorConfig)
- : new SinkConnectorConfig(plugins(), connectorConfig);
- final ConfigDef connectorConfigDef = connectorConfigToEnrich.enrich(
- plugins(),
- basicConfigDef,
- connectorConfig,
- false
- );
-
- // Override is required here after the enriched ConfigDef has been created successfully
- configKeys.putAll(connectorConfigDef.configKeys());
- allGroups.addAll(connectorConfigDef.groups());
+ configKeys.putAll(enrichedConfigDef.configKeys());
+ allGroups.addAll(enrichedConfigDef.groups());
// do custom connector-specific validation
- Config config = connector.validate(connectorConfig);
+ Config config = connector.validate(connectorProps);
ConfigDef configDef = connector.config();
configKeys.putAll(configDef.configKeys());
allGroups.addAll(configDef.groups());
configValues.addAll(config.configValues());
return generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups));
- } catch (ConfigException e) {
- // Basic validation must have failed. Return the result.
- return generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups));
} finally {
Plugins.compareAndSwapLoaders(savedLoader);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/96587f4b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 869cfbd..0f8c390 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -164,7 +164,7 @@ public class ConnectorConfig extends AbstractConfig {
* <p>
* {@code requireFullConfig} specifies whether required config values that are missing should cause an exception to be thrown.
*/
- public ConfigDef enrich(Plugins plugins, ConfigDef baseConfigDef, Map<String, String> props, boolean requireFullConfig) {
+ public static ConfigDef enrich(Plugins plugins, ConfigDef baseConfigDef, Map<String, String> props, boolean requireFullConfig) {
Object transformAliases = ConfigDef.parseType(TRANSFORMS_CONFIG, props.get(TRANSFORMS_CONFIG), Type.LIST);
if (!(transformAliases instanceof List)) {
return baseConfigDef;
http://git-wip-us.apache.org/repos/asf/kafka/blob/96587f4b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
index 6915421..ab8fd01 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
@@ -23,7 +23,11 @@ import java.util.Map;
public class SourceConnectorConfig extends ConnectorConfig {
- private static ConfigDef config = configDef();
+ private static ConfigDef config = ConnectorConfig.configDef();
+
+ public static ConfigDef configDef() {
+ return config;
+ }
public SourceConnectorConfig(Plugins plugins, Map<String, String> props) {
super(plugins, config, props);
http://git-wip-us.apache.org/repos/asf/kafka/blob/96587f4b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index b9276af..c261ab6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -16,9 +16,17 @@
*/
package org.apache.kafka.connect.runtime;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.easymock.Capture;
import org.easymock.EasyMock;
@@ -26,19 +34,29 @@ import org.easymock.EasyMockSupport;
import org.easymock.IAnswer;
import org.junit.Test;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
public class AbstractHerderTest extends EasyMockSupport {
+ private final Worker worker = strictMock(Worker.class);
+ private final String workerId = "workerId";
+ private final int generation = 5;
+ private final String connector = "connector";
+ private final Plugins plugins = strictMock(Plugins.class);
+ private final ClassLoader classLoader = strictMock(ClassLoader.class);
@Test
public void connectorStatus() {
- Worker worker = null;
- String workerId = "workerId";
- String connector = "connector";
- int generation = 5;
ConnectorTaskId taskId = new ConnectorTaskId(connector, 0);
ConfigBackingStore configStore = strictMock(ConfigBackingStore.class);
@@ -79,7 +97,6 @@ public class AbstractHerderTest extends EasyMockSupport {
@Test
public void taskStatus() {
- Worker worker = null;
ConnectorTaskId taskId = new ConnectorTaskId("connector", 0);
String workerId = "workerId";
@@ -117,4 +134,135 @@ public class AbstractHerderTest extends EasyMockSupport {
verifyAll();
}
+
+
+ @Test(expected = BadRequestException.class)
+ public void testConfigValidationEmptyConfig() {
+ AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class);
+ replayAll();
+
+ herder.validateConnectorConfig(new HashMap<String, String>());
+
+ verifyAll();
+ }
+
+ @Test()
+ public void testConfigValidationMissingName() {
+ AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class);
+ replayAll();
+
+ Map<String, String> config = Collections.singletonMap(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSourceConnector.class.getName());
+ ConfigInfos result = herder.validateConnectorConfig(config);
+
+ // We expect there to be errors due to the missing name and the connector's missing "required" config. Note that these assertions depend heavily on
+ // the config fields for SourceConnectorConfig, but we expect these to change rarely.
+ assertEquals(TestSourceConnector.class.getName(), result.name());
+ assertEquals(Arrays.asList(ConnectorConfig.COMMON_GROUP, ConnectorConfig.TRANSFORMS_GROUP), result.groups());
+ assertEquals(2, result.errorCount());
+ // Base connector config has 6 fields, connector's configs add 2
+ assertEquals(8, result.values().size());
+ // Missing name should generate an error
+ assertEquals(ConnectorConfig.NAME_CONFIG, result.values().get(0).configValue().name());
+ assertEquals(1, result.values().get(0).configValue().errors().size());
+ // "required" config from connector should generate an error
+ assertEquals("required", result.values().get(6).configValue().name());
+ assertEquals(1, result.values().get(6).configValue().errors().size());
+
+ verifyAll();
+ }
+
+ @Test()
+ public void testConfigValidationTransformsExtendResults() {
+ AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class);
+
+ // 2 transform aliases defined -> 2 plugin lookups
+ Set<PluginDesc<Transformation>> transformations = new HashSet<>();
+ transformations.add(new PluginDesc<Transformation>(SampleTransformation.class, "1.0", classLoader));
+ EasyMock.expect(plugins.transformations()).andReturn(transformations).times(2);
+
+ replayAll();
+
+ // Define 2 transformations. One has a class defined and so can get embedded configs, the other is missing
+ // class info that should generate an error.
+ Map<String, String> config = new HashMap<>();
+ config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSourceConnector.class.getName());
+ config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
+ config.put(ConnectorConfig.TRANSFORMS_CONFIG, "xformA,xformB");
+ config.put(ConnectorConfig.TRANSFORMS_CONFIG + ".xformA.type", SampleTransformation.class.getName());
+ config.put("required", "value"); // connector required config
+ ConfigInfos result = herder.validateConnectorConfig(config);
+
+ // We expect there to be errors due to transformation xformB having no type defined. Note that these assertions depend heavily on
+ // the config fields for SourceConnectorConfig, but we expect these to change rarely.
+ assertEquals(TestSourceConnector.class.getName(), result.name());
+ // Each transform also gets its own group
+ List<String> expectedGroups = Arrays.asList(
+ ConnectorConfig.COMMON_GROUP,
+ ConnectorConfig.TRANSFORMS_GROUP,
+ "Transforms: xformA",
+ "Transforms: xformB"
+ );
+ assertEquals(expectedGroups, result.groups());
+ assertEquals(2, result.errorCount());
+ // Base connector config has 6 fields, connector's configs add 2, 2 type fields from the transforms, and
+ // 1 from the valid transformation's config
+ assertEquals(11, result.values().size());
+ // Should get 2 type fields from the transforms, first adds its own config since it has a valid class
+ assertEquals("transforms.xformA.type", result.values().get(6).configValue().name());
+ assertTrue(result.values().get(6).configValue().errors().isEmpty());
+ assertEquals("transforms.xformA.subconfig", result.values().get(7).configValue().name());
+ assertEquals("transforms.xformB.type", result.values().get(8).configValue().name());
+ assertFalse(result.values().get(8).configValue().errors().isEmpty());
+
+ verifyAll();
+ }
+
+ private AbstractHerder createConfigValidationHerder(Class<? extends Connector> connectorClass) {
+
+
+ ConfigBackingStore configStore = strictMock(ConfigBackingStore.class);
+ StatusBackingStore statusStore = strictMock(StatusBackingStore.class);
+
+ AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
+ .withConstructor(Worker.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
+ .withArgs(worker, workerId, statusStore, configStore)
+ .addMockedMethod("generation")
+ .createMock();
+ EasyMock.expect(herder.generation()).andStubReturn(generation);
+
+ // Call to validateConnectorConfig
+ EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
+ final Connector connector;
+ try {
+ connector = connectorClass.newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new RuntimeException("Couldn't create connector", e);
+ }
+ EasyMock.expect(plugins.newConnector(connectorClass.getName())).andReturn(connector);
+ EasyMock.expect(plugins.compareAndSwapLoaders(connector)).andReturn(classLoader);
+ return herder;
+ }
+
+ public static class SampleTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+ @Override
+ public void configure(Map<String, ?> configs) {
+
+ }
+
+ @Override
+ public R apply(R record) {
+ return record;
+ }
+
+ @Override
+ public ConfigDef config() {
+ return new ConfigDef()
+ .define("subconfig", ConfigDef.Type.STRING, "default", ConfigDef.Importance.LOW, "docs");
+ }
+
+ @Override
+ public void close() {
+
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/96587f4b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestSinkConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestSinkConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestSinkConnector.java
new file mode 100644
index 0000000..a6e3bb1
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestSinkConnector.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+import java.util.List;
+import java.util.Map;
+
+public class TestSinkConnector extends SinkConnector {
+
+ public static final String VERSION = "some great version";
+
+ @Override
+ public String version() {
+ return VERSION;
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return null;
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ return null;
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public ConfigDef config() {
+ return new ConfigDef()
+ .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs")
+ .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs");
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/96587f4b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestSourceConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestSourceConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestSourceConnector.java
new file mode 100644
index 0000000..5f754e2
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestSourceConnector.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.SourceConnector;
+
+import java.util.List;
+import java.util.Map;
+
+public class TestSourceConnector extends SourceConnector {
+
+ public static final String VERSION = "an entirely different version";
+
+ @Override
+ public String version() {
+ return VERSION;
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return null;
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ return null;
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public ConfigDef config() {
+ return new ConfigDef()
+ .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs")
+ .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs");
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/96587f4b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 18d83c5..7834a89 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -320,7 +320,7 @@ public class DistributedHerderTest {
// config validation
Connector connectorMock = PowerMock.createMock(Connector.class);
- EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+ EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
@@ -362,6 +362,11 @@ public class DistributedHerderTest {
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+
+ EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
+ ConfigValue validatedValue = new ConfigValue("foo.bar");
+ EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(singletonList(validatedValue)));
+
EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
// CONN2 creation should fail
@@ -396,7 +401,7 @@ public class DistributedHerderTest {
// config validation
Connector connectorMock = PowerMock.createMock(Connector.class);
- EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+ EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -444,7 +449,7 @@ public class DistributedHerderTest {
// config validation
Connector connectorMock = PowerMock.createMock(SinkConnector.class);
- EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+ EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
@@ -1263,7 +1268,7 @@ public class DistributedHerderTest {
// config validation
Connector connectorMock = PowerMock.createMock(Connector.class);
- EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(6);
+ EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(5);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
http://git-wip-us.apache.org/repos/asf/kafka/blob/96587f4b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 2d0448e..538e5db 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -30,6 +30,8 @@ import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.runtime.AbstractHerder;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.TestSinkConnector;
+import org.apache.kafka.connect.runtime.TestSourceConnector;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -493,76 +495,6 @@ public class ConnectorPluginsResourceTest {
}
}
- public static class TestSinkConnector extends SinkConnector {
-
- static final String VERSION = "some great version";
-
- @Override
- public String version() {
- return VERSION;
- }
-
- @Override
- public void start(Map<String, String> props) {
-
- }
-
- @Override
- public Class<? extends Task> taskClass() {
- return null;
- }
-
- @Override
- public List<Map<String, String>> taskConfigs(int maxTasks) {
- return null;
- }
-
- @Override
- public void stop() {
-
- }
-
- @Override
- public ConfigDef config() {
- return null;
- }
- }
-
- public static class TestSourceConnector extends SourceConnector {
-
- static final String VERSION = "an entirely different version";
-
- @Override
- public String version() {
- return VERSION;
- }
-
- @Override
- public void start(Map<String, String> props) {
-
- }
-
- @Override
- public Class<? extends Task> taskClass() {
- return null;
- }
-
- @Override
- public List<Map<String, String>> taskConfigs(int maxTasks) {
- return null;
- }
-
- @Override
- public void stop() {
-
- }
-
- @Override
- public ConfigDef config() {
- return null;
- }
- }
-
/* Name here needs to be unique as we are testing the aliasing mechanism */
public static class ConnectorPluginsResourceTestConnector extends Connector {
http://git-wip-us.apache.org/repos/asf/kafka/blob/96587f4b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 1c3dddb..c58d702 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -128,6 +128,7 @@ public class StandaloneHerderTest {
@Test
public void testCreateConnectorFailedBasicValidation() throws Exception {
+ // Basic validation should be performed and return an error, but should still evaluate the connector's config
connector = PowerMock.createMock(BogusSourceConnector.class);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
@@ -137,7 +138,11 @@ public class StandaloneHerderTest {
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+
EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
+
+ ConfigValue validatedValue = new ConfigValue("foo.bar");
+ EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(singletonList(validatedValue)));
EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
createCallback.onCompletion(EasyMock.<BadRequestException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
@@ -155,7 +160,7 @@ public class StandaloneHerderTest {
connector = PowerMock.createMock(BogusSourceConnector.class);
Connector connectorMock = PowerMock.createMock(Connector.class);
- EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+ EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -189,7 +194,7 @@ public class StandaloneHerderTest {
Connector connectorMock = PowerMock.createMock(Connector.class);
expectConfigValidation(connectorMock, true, config, config);
- EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
+ EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(2);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
// No new connector is created
EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
@@ -522,7 +527,7 @@ public class StandaloneHerderTest {
);
ConfigDef configDef = new ConfigDef();
configDef.define(key, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "");
- EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+ EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -620,7 +625,7 @@ public class StandaloneHerderTest {
Map<String, String>... configs
) {
// config validation
- EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+ EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
if (shouldCreateConnector) {
EasyMock.expect(worker.getPlugins()).andReturn(plugins);