You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/05/09 05:26:45 UTC
kafka git commit: KAFKA 3671: Move topics to SinkConnectorConfig
Repository: kafka
Updated Branches:
refs/heads/trunk 62b9fa225 -> d1bb2b9df
KAFKA 3671: Move topics to SinkConnectorConfig
Author: Liquan Pei <li...@gmail.com>
Reviewers: Dan Norwood <no...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1335 from Ishiihara/sink-connector-config
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d1bb2b9d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d1bb2b9d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d1bb2b9d
Branch: refs/heads/trunk
Commit: d1bb2b9df105169c47f16d057c887acb7f8fe818
Parents: 62b9fa2
Author: Liquan Pei <li...@gmail.com>
Authored: Sun May 8 22:26:26 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Sun May 8 22:26:26 2016 -0700
----------------------------------------------------------------------
.../kafka/connect/runtime/AbstractHerder.java | 11 ++--
.../kafka/connect/runtime/ConnectorConfig.java | 20 ++++---
.../connect/runtime/SinkConnectorConfig.java | 46 ++++++++++++++++
.../connect/runtime/SourceConnectorConfig.java | 27 ++++++++++
.../runtime/distributed/DistributedHerder.java | 13 +++--
.../runtime/standalone/StandaloneHerder.java | 19 +++++--
.../kafka/connect/runtime/WorkerTest.java | 8 +--
.../distributed/DistributedHerderTest.java | 10 ++--
.../standalone/StandaloneHerderTest.java | 55 ++++++++++++--------
9 files changed, 155 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d1bb2b9d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index ee20859..43fc4d1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -29,6 +29,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.tools.VerifiableSinkConnector;
@@ -232,10 +233,14 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
@Override
public ConfigInfos validateConfigs(String connType, Map<String, String> connectorConfig) {
- ConfigDef connectorConfigDef = ConnectorConfig.configDef();
- List<ConfigValue> connectorConfigValues = connectorConfigDef.validate(connectorConfig);
-
Connector connector = getConnector(connType);
+ ConfigDef connectorConfigDef;
+ if (connector instanceof SourceConnector) {
+ connectorConfigDef = SourceConnectorConfig.configDef();
+ } else {
+ connectorConfigDef = SinkConnectorConfig.configDef();
+ }
+ List<ConfigValue> connectorConfigValues = connectorConfigDef.validate(connectorConfig);
Config config = connector.validate(connectorConfig);
ConfigDef configDef = connector.config();
http://git-wip-us.apache.org/repos/asf/kafka/blob/d1bb2b9d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index e439552..0cbfe21 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -22,12 +22,12 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
-import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
-
import java.util.HashMap;
import java.util.Map;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+
/**
* <p>
* Configuration options for Connectors. These only include Kafka Connect system-level configuration
@@ -40,7 +40,7 @@ import java.util.Map;
* </p>
*/
public class ConnectorConfig extends AbstractConfig {
- private static final String COMMON_GROUP = "Common";
+ protected static final String COMMON_GROUP = "Common";
public static final String NAME_CONFIG = "name";
private static final String NAME_DOC = "Globally unique name to use for this connector.";
@@ -60,19 +60,13 @@ public class ConnectorConfig extends AbstractConfig {
private static final String TASK_MAX_DISPLAY = "Tasks max";
- public static final String TOPICS_CONFIG = "topics";
- private static final String TOPICS_DOC = "";
- public static final String TOPICS_DEFAULT = "";
- private static final String TOPICS_DISPLAY = "Topics";
-
- private static ConfigDef config;
+ protected static ConfigDef config;
static {
config = new ConfigDef()
.define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY)
.define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY)
- .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY)
- .define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, Width.LONG, TOPICS_DISPLAY);
+ .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY);
}
public static ConfigDef configDef() {
@@ -86,4 +80,8 @@ public class ConnectorConfig extends AbstractConfig {
public ConnectorConfig(Map<String, String> props) {
super(config, props);
}
+
+ public ConnectorConfig(ConfigDef subClassConfig, Map<String, String> props) {
+ super(subClassConfig, props);
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d1bb2b9d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
new file mode 100644
index 0000000..cbfc6d1
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Configuration needed for all sink connectors
+ */
+
+public class SinkConnectorConfig extends ConnectorConfig {
+
+ public static final String TOPICS_CONFIG = "topics";
+ private static final String TOPICS_DOC = "";
+ public static final String TOPICS_DEFAULT = "";
+ private static final String TOPICS_DISPLAY = "Topics";
+
+ static ConfigDef config = ConnectorConfig.configDef()
+ .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY);
+
+ public SinkConnectorConfig() {
+ this(new HashMap<String, String>());
+ }
+
+ public SinkConnectorConfig(Map<String, String> props) {
+ super(config, props);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d1bb2b9d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
new file mode 100644
index 0000000..ca9219f
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import java.util.Map;
+
+public class SourceConnectorConfig extends ConnectorConfig {
+
+ public SourceConnectorConfig(Map<String, String> props) {
+ super(config, props);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d1bb2b9d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 037eba7..a2beff3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -28,6 +28,8 @@ import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.AbstractHerder;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.HerderConnectorContext;
+import org.apache.kafka.connect.runtime.SinkConnectorConfig;
+import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.Worker;
@@ -827,10 +829,15 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
Map<String, String> configs = configState.connectorConfig(connName);
- ConnectorConfig connConfig = new ConnectorConfig(configs);
+
+ ConnectorConfig connConfig;
List<String> sinkTopics = null;
- if (worker.isSinkConnector(connName))
- sinkTopics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG);
+ if (worker.isSinkConnector(connName)) {
+ connConfig = new SinkConnectorConfig(configs);
+ sinkTopics = connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG);
+ } else {
+ connConfig = new SourceConnectorConfig(configs);
+ }
final List<Map<String, String>> taskProps
= worker.connectorTaskConfigs(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), sinkTopics);
http://git-wip-us.apache.org/repos/asf/kafka/blob/d1bb2b9d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 2316bae..8dbda74 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -23,6 +23,8 @@ import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.AbstractHerder;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.HerderConnectorContext;
+import org.apache.kafka.connect.runtime.SinkConnectorConfig;
+import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.Worker;
@@ -251,11 +253,20 @@ public class StandaloneHerder extends AbstractHerder {
private List<Map<String, String>> recomputeTaskConfigs(String connName) {
Map<String, String> config = configState.connectorConfig(connName);
- ConnectorConfig connConfig = new ConnectorConfig(config);
- return worker.connectorTaskConfigs(connName,
- connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
- connConfig.getList(ConnectorConfig.TOPICS_CONFIG));
+ ConnectorConfig connConfig;
+ if (worker.isSinkConnector(connName)) {
+ connConfig = new SinkConnectorConfig(config);
+ return worker.connectorTaskConfigs(connName,
+ connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
+ connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG));
+ } else {
+ connConfig = new SourceConnectorConfig(config);
+ return worker.connectorTaskConfigs(connName,
+ connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
+ null);
+ }
+
}
private void createConnectorTasks(String connName, TargetState initialState) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d1bb2b9d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 2004c99..ec4f025 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -97,7 +97,7 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expect(connector.version()).andReturn("1.0");
Map<String, String> props = new HashMap<>();
- props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+ props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
@@ -155,7 +155,7 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expect(connector.version()).andReturn("1.0");
Map<String, String> props = new HashMap<>();
- props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+ props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTestConnector");
@@ -208,7 +208,7 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expect(connector.version()).andReturn("1.0");
Map<String, String> props = new HashMap<>();
- props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+ props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTest");
@@ -274,7 +274,7 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expect(connector.version()).andReturn("1.0");
Map<String, String> props = new HashMap<>();
- props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+ props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
http://git-wip-us.apache.org/repos/asf/kafka/blob/d1bb2b9d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index e62b663..fbccc55 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.Worker;
@@ -96,18 +97,18 @@ public class DistributedHerderTest {
static {
CONN1_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN1);
CONN1_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString());
- CONN1_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+ CONN1_CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
CONN1_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName());
}
private static final Map<String, String> CONN1_CONFIG_UPDATED = new HashMap<>(CONN1_CONFIG);
static {
- CONN1_CONFIG_UPDATED.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar,baz");
+ CONN1_CONFIG_UPDATED.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar,baz");
}
private static final Map<String, String> CONN2_CONFIG = new HashMap<>();
static {
CONN2_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN2);
CONN2_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString());
- CONN2_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+ CONN2_CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
CONN2_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName());
}
private static final Map<String, String> TASK_CONFIG = new HashMap<>();
@@ -935,9 +936,6 @@ public class DistributedHerderTest {
herder.tick();
assertTrue(connectorConfigCb.isDone());
assertEquals(CONN1_CONFIG_UPDATED, connectorConfigCb.get());
- // The config passed to Worker should
- assertEquals(Arrays.asList("foo", "bar", "baz"),
- capturedUpdatedConfig.getValue().getList(ConnectorConfig.TOPICS_CONFIG));
PowerMock.verifyAll();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d1bb2b9d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 10e5194..e70b968 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -92,7 +92,7 @@ public class StandaloneHerderTest {
PowerMock.replayAll();
- herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+ herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
PowerMock.verifyAll();
}
@@ -109,8 +109,8 @@ public class StandaloneHerderTest {
PowerMock.replayAll();
- herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
- herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+ herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
+ herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
PowerMock.verifyAll();
}
@@ -122,7 +122,7 @@ public class StandaloneHerderTest {
PowerMock.replayAll();
- herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSinkConnector.class), false, createCallback);
+ herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSinkConnector.class, true), false, createCallback);
PowerMock.verifyAll();
}
@@ -139,7 +139,7 @@ public class StandaloneHerderTest {
PowerMock.replayAll();
- herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+ herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
FutureCallback<Herder.Created<ConnectorInfo>> futureCb = new FutureCallback<>();
herder.putConnectorConfig(CONNECTOR_NAME, null, true, futureCb);
futureCb.get(1000L, TimeUnit.MILLISECONDS);
@@ -164,13 +164,13 @@ public class StandaloneHerderTest {
worker.stopConnector(CONNECTOR_NAME);
EasyMock.expectLastCall();
- worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class))),
+ worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false))),
EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
EasyMock.expectLastCall();
PowerMock.replayAll();
- herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+ herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
FutureCallback<Void> cb = new FutureCallback<>();
herder.restartConnector(CONNECTOR_NAME, cb);
@@ -191,7 +191,7 @@ public class StandaloneHerderTest {
PowerMock.replayAll();
- herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+ herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
FutureCallback<Void> cb = new FutureCallback<>();
herder.restartConnector(CONNECTOR_NAME, cb);
@@ -213,13 +213,13 @@ public class StandaloneHerderTest {
EasyMock.expectLastCall();
RuntimeException e = new RuntimeException();
- worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class))),
+ worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false))),
EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
EasyMock.expectLastCall().andThrow(e);
PowerMock.replayAll();
- herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+ herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
FutureCallback<Void> cb = new FutureCallback<>();
herder.restartConnector(CONNECTOR_NAME, cb);
@@ -247,7 +247,7 @@ public class StandaloneHerderTest {
PowerMock.replayAll();
- herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+ herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
FutureCallback<Void> cb = new FutureCallback<>();
herder.restartTask(taskId, cb);
@@ -269,7 +269,7 @@ public class StandaloneHerderTest {
PowerMock.replayAll();
- herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+ herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
FutureCallback<Void> cb = new FutureCallback<>();
herder.restartTask(taskId, cb);
@@ -297,7 +297,7 @@ public class StandaloneHerderTest {
PowerMock.replayAll();
- herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+ herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
FutureCallback<Void> cb = new FutureCallback<>();
herder.restartTask(taskId, cb);
@@ -325,7 +325,7 @@ public class StandaloneHerderTest {
PowerMock.replayAll();
- herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+ herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
herder.stop();
PowerMock.verifyAll();
@@ -333,7 +333,7 @@ public class StandaloneHerderTest {
@Test
public void testAccessors() throws Exception {
- Map<String, String> connConfig = connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class);
+ Map<String, String> connConfig = connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false);
Callback<Collection<String>> listConnectorsCb = PowerMock.createMock(Callback.class);
Callback<ConnectorInfo> connectorInfoCb = PowerMock.createMock(Callback.class);
@@ -388,7 +388,7 @@ public class StandaloneHerderTest {
@Test
public void testPutConnectorConfig() throws Exception {
- Map<String, String> connConfig = connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class);
+ Map<String, String> connConfig = connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false);
Map<String, String> newConnConfig = new HashMap<>(connConfig);
newConnConfig.put("foo", "bar");
@@ -410,8 +410,10 @@ public class StandaloneHerderTest {
EasyMock.expectLastCall();
EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true);
// Generate same task config, which should result in no additional action to restart tasks
- EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, TOPICS_LIST))
+ EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, null))
.andReturn(Collections.singletonList(taskConfig(BogusSourceTask.class, false)));
+ worker.isSinkConnector(CONNECTOR_NAME);
+ EasyMock.expectLastCall().andReturn(false);
ConnectorInfo newConnInfo = new ConnectorInfo(CONNECTOR_NAME, newConnConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
putConnectorConfigCb.onCompletion(null, new Herder.Created<>(false, newConnInfo));
EasyMock.expectLastCall();
@@ -448,10 +450,11 @@ public class StandaloneHerderTest {
Class<? extends Connector> connClass,
Class<? extends Task> taskClass,
boolean sink) throws Exception {
- Map<String, String> connectorProps = connectorConfig(name, connClass);
+
+ Map<String, String> connectorProps = connectorConfig(name, connClass, sink);
worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorProps)), EasyMock.anyObject(HerderConnectorContext.class),
- EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
EasyMock.expectLastCall();
EasyMock.expect(worker.isRunning(name)).andReturn(true);
@@ -462,11 +465,15 @@ public class StandaloneHerderTest {
// And we should instantiate the tasks. For a sink task, we should see added properties for
// the input topic partitions
Map<String, String> generatedTaskProps = taskConfig(taskClass, sink);
- EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, TOPICS_LIST))
- .andReturn(Collections.singletonList(generatedTaskProps));
+
+ EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, sink ? TOPICS_LIST : null))
+ .andReturn(Collections.singletonList(generatedTaskProps));
worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), new TaskConfig(generatedTaskProps), herder, TargetState.STARTED);
EasyMock.expectLastCall();
+
+ worker.isSinkConnector(CONNECTOR_NAME);
+ PowerMock.expectLastCall().andReturn(sink);
}
private void expectStop() {
@@ -483,11 +490,13 @@ public class StandaloneHerderTest {
expectStop();
}
- private static HashMap<String, String> connectorConfig(String name, Class<? extends Connector> connClass) {
+ private static HashMap<String, String> connectorConfig(String name, Class<? extends Connector> connClass, boolean sink) {
HashMap<String, String> connectorProps = new HashMap<>();
connectorProps.put(ConnectorConfig.NAME_CONFIG, name);
- connectorProps.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName());
+ if (sink) {
+ connectorProps.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
+ }
return connectorProps;
}