You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/05/09 05:26:45 UTC

kafka git commit: KAFKA 3671: Move topics to SinkConnectorConfig

Repository: kafka
Updated Branches:
  refs/heads/trunk 62b9fa225 -> d1bb2b9df


KAFKA 3671: Move topics to SinkConnectorConfig

Author: Liquan Pei <li...@gmail.com>

Reviewers: Dan Norwood <no...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1335 from Ishiihara/sink-connector-config


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d1bb2b9d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d1bb2b9d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d1bb2b9d

Branch: refs/heads/trunk
Commit: d1bb2b9df105169c47f16d057c887acb7f8fe818
Parents: 62b9fa2
Author: Liquan Pei <li...@gmail.com>
Authored: Sun May 8 22:26:26 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Sun May 8 22:26:26 2016 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/AbstractHerder.java   | 11 ++--
 .../kafka/connect/runtime/ConnectorConfig.java  | 20 ++++---
 .../connect/runtime/SinkConnectorConfig.java    | 46 ++++++++++++++++
 .../connect/runtime/SourceConnectorConfig.java  | 27 ++++++++++
 .../runtime/distributed/DistributedHerder.java  | 13 +++--
 .../runtime/standalone/StandaloneHerder.java    | 19 +++++--
 .../kafka/connect/runtime/WorkerTest.java       |  8 +--
 .../distributed/DistributedHerderTest.java      | 10 ++--
 .../standalone/StandaloneHerderTest.java        | 55 ++++++++++++--------
 9 files changed, 155 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d1bb2b9d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index ee20859..43fc4d1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -29,6 +29,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.tools.VerifiableSinkConnector;
@@ -232,10 +233,14 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
 
     @Override
     public ConfigInfos validateConfigs(String connType, Map<String, String> connectorConfig) {
-        ConfigDef connectorConfigDef = ConnectorConfig.configDef();
-        List<ConfigValue> connectorConfigValues = connectorConfigDef.validate(connectorConfig);
-
         Connector connector = getConnector(connType);
+        ConfigDef connectorConfigDef;
+        if (connector instanceof SourceConnector) {
+            connectorConfigDef = SourceConnectorConfig.configDef();
+        } else {
+            connectorConfigDef = SinkConnectorConfig.configDef();
+        }
+        List<ConfigValue> connectorConfigValues = connectorConfigDef.validate(connectorConfig);
 
         Config config = connector.validate(connectorConfig);
         ConfigDef configDef = connector.config();

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1bb2b9d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index e439552..0cbfe21 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -22,12 +22,12 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigDef.Width;
-import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
-
 
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+
 /**
  * <p>
  * Configuration options for Connectors. These only include Kafka Connect system-level configuration
@@ -40,7 +40,7 @@ import java.util.Map;
  * </p>
  */
 public class ConnectorConfig extends AbstractConfig {
-    private static final String COMMON_GROUP = "Common";
+    protected static final String COMMON_GROUP = "Common";
 
     public static final String NAME_CONFIG = "name";
     private static final String NAME_DOC = "Globally unique name to use for this connector.";
@@ -60,19 +60,13 @@ public class ConnectorConfig extends AbstractConfig {
 
     private static final String TASK_MAX_DISPLAY = "Tasks max";
 
-    public static final String TOPICS_CONFIG = "topics";
-    private static final String TOPICS_DOC = "";
-    public static final String TOPICS_DEFAULT = "";
-    private static final String TOPICS_DISPLAY = "Topics";
-
-    private static ConfigDef config;
+    protected static ConfigDef config;
 
     static {
         config = new ConfigDef()
                 .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY)
                 .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY)
-                .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT,  atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY)
-                .define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, Width.LONG, TOPICS_DISPLAY);
+                .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT,  atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY);
     }
 
     public static ConfigDef configDef() {
@@ -86,4 +80,8 @@ public class ConnectorConfig extends AbstractConfig {
     public ConnectorConfig(Map<String, String> props) {
         super(config, props);
     }
+
+    public ConnectorConfig(ConfigDef subClassConfig, Map<String, String> props) {
+        super(subClassConfig, props);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1bb2b9d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
new file mode 100644
index 0000000..cbfc6d1
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Configuration needed for all sink connectors
+ */
+
+public class SinkConnectorConfig extends ConnectorConfig {
+
+    public static final String TOPICS_CONFIG = "topics";
+    private static final String TOPICS_DOC = "";
+    public static final String TOPICS_DEFAULT = "";
+    private static final String TOPICS_DISPLAY = "Topics";
+
+    static ConfigDef config = ConnectorConfig.configDef()
+        .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY);
+
+    public SinkConnectorConfig() {
+        this(new HashMap<String, String>());
+    }
+
+    public SinkConnectorConfig(Map<String, String> props) {
+        super(config, props);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1bb2b9d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
new file mode 100644
index 0000000..ca9219f
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import java.util.Map;
+
+public class SourceConnectorConfig extends ConnectorConfig {
+
+    public SourceConnectorConfig(Map<String, String> props) {
+        super(config, props);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1bb2b9d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 037eba7..a2beff3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -28,6 +28,8 @@ import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.AbstractHerder;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.HerderConnectorContext;
+import org.apache.kafka.connect.runtime.SinkConnectorConfig;
+import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.Worker;
@@ -827,10 +829,15 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             }
 
             Map<String, String> configs = configState.connectorConfig(connName);
-            ConnectorConfig connConfig = new ConnectorConfig(configs);
+
+            ConnectorConfig connConfig;
             List<String> sinkTopics = null;
-            if (worker.isSinkConnector(connName))
-                sinkTopics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG);
+            if (worker.isSinkConnector(connName)) {
+                connConfig = new SinkConnectorConfig(configs);
+                sinkTopics = connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG);
+            } else {
+                connConfig = new SourceConnectorConfig(configs);
+            }
 
             final List<Map<String, String>> taskProps
                     = worker.connectorTaskConfigs(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), sinkTopics);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1bb2b9d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 2316bae..8dbda74 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -23,6 +23,8 @@ import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.AbstractHerder;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.HerderConnectorContext;
+import org.apache.kafka.connect.runtime.SinkConnectorConfig;
+import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.Worker;
@@ -251,11 +253,20 @@ public class StandaloneHerder extends AbstractHerder {
 
     private List<Map<String, String>> recomputeTaskConfigs(String connName) {
         Map<String, String> config = configState.connectorConfig(connName);
-        ConnectorConfig connConfig = new ConnectorConfig(config);
 
-        return worker.connectorTaskConfigs(connName,
-                connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
-                connConfig.getList(ConnectorConfig.TOPICS_CONFIG));
+        ConnectorConfig connConfig;
+        if (worker.isSinkConnector(connName)) {
+            connConfig = new SinkConnectorConfig(config);
+            return worker.connectorTaskConfigs(connName,
+                                               connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
+                                               connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG));
+        } else {
+            connConfig = new SourceConnectorConfig(config);
+            return worker.connectorTaskConfigs(connName,
+                                               connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
+                                               null);
+        }
+
     }
 
     private void createConnectorTasks(String connName, TargetState initialState) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1bb2b9d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 2004c99..ec4f025 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -97,7 +97,7 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(connector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
-        props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+        props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
         props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
@@ -155,7 +155,7 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(connector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
-        props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+        props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
         props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTestConnector");
@@ -208,7 +208,7 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(connector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
-        props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+        props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
         props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTest");
@@ -274,7 +274,7 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(connector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
-        props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+        props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
         props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1bb2b9d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index e62b663..fbccc55 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.Worker;
@@ -96,18 +97,18 @@ public class DistributedHerderTest {
     static {
         CONN1_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN1);
         CONN1_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString());
-        CONN1_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+        CONN1_CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
         CONN1_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName());
     }
     private static final Map<String, String> CONN1_CONFIG_UPDATED = new HashMap<>(CONN1_CONFIG);
     static {
-        CONN1_CONFIG_UPDATED.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar,baz");
+        CONN1_CONFIG_UPDATED.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar,baz");
     }
     private static final Map<String, String> CONN2_CONFIG = new HashMap<>();
     static {
         CONN2_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN2);
         CONN2_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString());
-        CONN2_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+        CONN2_CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
         CONN2_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName());
     }
     private static final Map<String, String> TASK_CONFIG = new HashMap<>();
@@ -935,9 +936,6 @@ public class DistributedHerderTest {
         herder.tick();
         assertTrue(connectorConfigCb.isDone());
         assertEquals(CONN1_CONFIG_UPDATED, connectorConfigCb.get());
-        // The config passed to Worker should
-        assertEquals(Arrays.asList("foo", "bar", "baz"),
-                capturedUpdatedConfig.getValue().getList(ConnectorConfig.TOPICS_CONFIG));
         PowerMock.verifyAll();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1bb2b9d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 10e5194..e70b968 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -92,7 +92,7 @@ public class StandaloneHerderTest {
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
 
         PowerMock.verifyAll();
     }
@@ -109,8 +109,8 @@ public class StandaloneHerderTest {
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
 
         PowerMock.verifyAll();
     }
@@ -122,7 +122,7 @@ public class StandaloneHerderTest {
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSinkConnector.class), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSinkConnector.class, true), false, createCallback);
 
         PowerMock.verifyAll();
     }
@@ -139,7 +139,7 @@ public class StandaloneHerderTest {
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
         FutureCallback<Herder.Created<ConnectorInfo>> futureCb = new FutureCallback<>();
         herder.putConnectorConfig(CONNECTOR_NAME, null, true, futureCb);
         futureCb.get(1000L, TimeUnit.MILLISECONDS);
@@ -164,13 +164,13 @@ public class StandaloneHerderTest {
         worker.stopConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall();
 
-        worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class))),
+        worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false))),
                 EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         EasyMock.expectLastCall();
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
 
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.restartConnector(CONNECTOR_NAME, cb);
@@ -191,7 +191,7 @@ public class StandaloneHerderTest {
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
 
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.restartConnector(CONNECTOR_NAME, cb);
@@ -213,13 +213,13 @@ public class StandaloneHerderTest {
         EasyMock.expectLastCall();
 
         RuntimeException e = new RuntimeException();
-        worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class))),
+        worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false))),
                 EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         EasyMock.expectLastCall().andThrow(e);
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
 
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.restartConnector(CONNECTOR_NAME, cb);
@@ -247,7 +247,7 @@ public class StandaloneHerderTest {
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
 
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.restartTask(taskId, cb);
@@ -269,7 +269,7 @@ public class StandaloneHerderTest {
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
 
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.restartTask(taskId, cb);
@@ -297,7 +297,7 @@ public class StandaloneHerderTest {
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
 
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.restartTask(taskId, cb);
@@ -325,7 +325,7 @@ public class StandaloneHerderTest {
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
         herder.stop();
 
         PowerMock.verifyAll();
@@ -333,7 +333,7 @@ public class StandaloneHerderTest {
 
     @Test
     public void testAccessors() throws Exception {
-        Map<String, String> connConfig = connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class);
+        Map<String, String> connConfig = connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false);
 
         Callback<Collection<String>> listConnectorsCb = PowerMock.createMock(Callback.class);
         Callback<ConnectorInfo> connectorInfoCb = PowerMock.createMock(Callback.class);
@@ -388,7 +388,7 @@ public class StandaloneHerderTest {
 
     @Test
     public void testPutConnectorConfig() throws Exception {
-        Map<String, String> connConfig = connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class);
+        Map<String, String> connConfig = connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false);
         Map<String, String> newConnConfig = new HashMap<>(connConfig);
         newConnConfig.put("foo", "bar");
 
@@ -410,8 +410,10 @@ public class StandaloneHerderTest {
         EasyMock.expectLastCall();
         EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true);
         // Generate same task config, which should result in no additional action to restart tasks
-        EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, TOPICS_LIST))
+        EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, null))
                 .andReturn(Collections.singletonList(taskConfig(BogusSourceTask.class, false)));
+        worker.isSinkConnector(CONNECTOR_NAME);
+        EasyMock.expectLastCall().andReturn(false);
         ConnectorInfo newConnInfo = new ConnectorInfo(CONNECTOR_NAME, newConnConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
         putConnectorConfigCb.onCompletion(null, new Herder.Created<>(false, newConnInfo));
         EasyMock.expectLastCall();
@@ -448,10 +450,11 @@ public class StandaloneHerderTest {
                            Class<? extends Connector> connClass,
                            Class<? extends Task> taskClass,
                            boolean sink) throws Exception {
-        Map<String, String> connectorProps = connectorConfig(name, connClass);
+
+        Map<String, String> connectorProps = connectorConfig(name, connClass, sink);
 
         worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorProps)), EasyMock.anyObject(HerderConnectorContext.class),
-                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+                              EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         EasyMock.expectLastCall();
         EasyMock.expect(worker.isRunning(name)).andReturn(true);
 
@@ -462,11 +465,15 @@ public class StandaloneHerderTest {
         // And we should instantiate the tasks. For a sink task, we should see added properties for
         // the input topic partitions
         Map<String, String> generatedTaskProps = taskConfig(taskClass, sink);
-        EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, TOPICS_LIST))
-                .andReturn(Collections.singletonList(generatedTaskProps));
+
+        EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, sink ? TOPICS_LIST : null))
+            .andReturn(Collections.singletonList(generatedTaskProps));
 
         worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), new TaskConfig(generatedTaskProps), herder, TargetState.STARTED);
         EasyMock.expectLastCall();
+
+        worker.isSinkConnector(CONNECTOR_NAME);
+        PowerMock.expectLastCall().andReturn(sink);
     }
 
     private void expectStop() {
@@ -483,11 +490,13 @@ public class StandaloneHerderTest {
         expectStop();
     }
 
-    private static HashMap<String, String> connectorConfig(String name, Class<? extends Connector> connClass) {
+    private static HashMap<String, String> connectorConfig(String name, Class<? extends Connector> connClass, boolean sink) {
         HashMap<String, String> connectorProps = new HashMap<>();
         connectorProps.put(ConnectorConfig.NAME_CONFIG, name);
-        connectorProps.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
         connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName());
+        if (sink) {
+            connectorProps.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
+        }
         return connectorProps;
     }