Posted to commits@kafka.apache.org by ew...@apache.org on 2017/11/22 00:01:22 UTC

kafka git commit: KAFKA-3073: Add topic regex support for Connect sinks

Repository: kafka
Updated Branches:
  refs/heads/trunk fd8eb268d -> 049342e44


KAFKA-3073: Add topic regex support for Connect sinks

More methods had to be touched than I anticipated when writing [the KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-215%3A+Add+topic+regex+support+for+Connect+sinks).

The implementation is now complete and includes a test verifying that the `consumer.subscribe(Pattern, ConsumerRebalanceListener)` overload is called when `topics.regex` is provided.
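
As a minimal sketch of that behavior (the class name and no-op listener below are placeholders, not the actual `WorkerSinkTask` code), the task picks one of the two subscribe overloads depending on which key is set:

    // Simplified illustration: topics.regex takes the Pattern-based path,
    // otherwise the comma-separated topics list is used. In the real task the
    // listener is WorkerSinkTask's HandleRebalance.
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Map;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    class SubscriptionSketch {
        static void subscribe(KafkaConsumer<byte[], byte[]> consumer, Map<String, String> taskConfig) {
            ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
            };
            String regex = taskConfig.get("topics.regex");
            if (regex != null && !regex.trim().isEmpty()) {
                consumer.subscribe(Pattern.compile(regex), listener);   // regex is compiled to a Pattern
            } else {
                consumer.subscribe(Arrays.asList(taskConfig.get("topics").split(",")), listener);
            }
        }
    }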

Author: Jeff Klukas <je...@klukas.net>

Reviewers: Randall Hauch <rh...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #4151 from jklukas/connect-topics.regex


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/049342e4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/049342e4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/049342e4

Branch: refs/heads/trunk
Commit: 049342e440a5ca045771f3eb5b4c72d3e52ffac6
Parents: fd8eb26
Author: Jeff Klukas <je...@klukas.net>
Authored: Tue Nov 21 16:01:16 2017 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Nov 21 16:01:16 2017 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/connect/sink/SinkTask.java |  8 +++++
 .../connect/runtime/SinkConnectorConfig.java    | 16 +++++++--
 .../apache/kafka/connect/runtime/Worker.java    | 14 +++++---
 .../kafka/connect/runtime/WorkerSinkTask.java   | 32 ++++++++++++++---
 .../runtime/distributed/DistributedHerder.java  |  5 +--
 .../runtime/standalone/StandaloneHerder.java    | 16 +++------
 .../connect/runtime/WorkerSinkTaskTest.java     | 37 ++++++++++++++++++++
 .../kafka/connect/runtime/WorkerTest.java       |  5 ++-
 .../distributed/DistributedHerderTest.java      | 37 +++++++++++---------
 .../standalone/StandaloneHerderTest.java        |  9 +++--
 docs/connect.html                               |  5 +--
 11 files changed, 134 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
index 8abff47..1406b30 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
@@ -61,6 +61,14 @@ public abstract class SinkTask implements Task {
      */
     public static final String TOPICS_CONFIG = "topics";
 
+    /**
+     * <p>
+     * The configuration key that provides a regex specifying which topics to include as inputs
+     * for this SinkTask.
+     * </p>
+     */
+    public static final String TOPICS_REGEX_CONFIG = "topics.regex";
+
     protected SinkTaskContext context;
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
index e47d537..cf5564c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -18,6 +18,8 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.transforms.util.RegexValidator;
 
 import java.util.Map;
 
@@ -27,13 +29,21 @@ import java.util.Map;
 
 public class SinkConnectorConfig extends ConnectorConfig {
 
-    public static final String TOPICS_CONFIG = "topics";
-    private static final String TOPICS_DOC = "";
+    public static final String TOPICS_CONFIG = SinkTask.TOPICS_CONFIG;
+    private static final String TOPICS_DOC = "List of topics to consume, separated by commas";
     public static final String TOPICS_DEFAULT = "";
     private static final String TOPICS_DISPLAY = "Topics";
 
+    private static final String TOPICS_REGEX_CONFIG = SinkTask.TOPICS_REGEX_CONFIG;
+    private static final String TOPICS_REGEX_DOC = "Regular expression giving topics to consume. " +
+        "Under the hood, the regex is compiled to a <code>java.util.regex.Pattern</code>. " +
+        "Only one of " + TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG + " should be specified.";
+    public static final String TOPICS_REGEX_DEFAULT = "";
+    private static final String TOPICS_REGEX_DISPLAY = "Topics regex";
+
     static ConfigDef config = ConnectorConfig.configDef()
-        .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY);
+        .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY)
+        .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY);
 
     public static ConfigDef configDef() {
         return config;
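
The new `topics.regex` value is validated with `RegexValidator`, whose source is not part of this diff; the check it performs is essentially the following sketch (class and method names here are illustrative only):

    import java.util.regex.Pattern;
    import java.util.regex.PatternSyntaxException;
    import org.apache.kafka.common.config.ConfigException;

    class RegexCheckSketch {
        // Reject values that do not compile as a java.util.regex.Pattern so a bad
        // topics.regex fails at config-validation time rather than at runtime.
        static void ensureValidRegex(String name, String value) {
            try {
                Pattern.compile(value);
            } catch (PatternSyntaxException e) {
                throw new ConfigException(name, value, "Invalid regex: " + e.getDescription());
            }
        }
    }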

http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index c6e2e17..992825c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -254,17 +254,18 @@ public class Worker {
      * Get a list of updated task properties for the tasks of this connector.
      *
      * @param connName the connector name.
-     * @param maxTasks the maxinum number of tasks.
-     * @param sinkTopics a list of sink topics.
      * @return a list of updated tasks properties.
      */
-    public List<Map<String, String>> connectorTaskConfigs(String connName, int maxTasks, List<String> sinkTopics) {
+    public List<Map<String, String>> connectorTaskConfigs(String connName, ConnectorConfig connConfig) {
         log.trace("Reconfiguring connector tasks for {}", connName);
 
         WorkerConnector workerConnector = connectors.get(connName);
         if (workerConnector == null)
             throw new ConnectException("Connector " + connName + " not found in this worker.");
 
+        int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);
+        Map<String, String> connOriginals = connConfig.originalsStrings();
+
         Connector connector = workerConnector.connector();
         List<Map<String, String>> result = new ArrayList<>();
         ClassLoader savedLoader = plugins.currentThreadLoader();
@@ -275,8 +276,11 @@ public class Worker {
                 // Ensure we don't modify the connector's copy of the config
                 Map<String, String> taskConfig = new HashMap<>(taskProps);
                 taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName);
-                if (sinkTopics != null) {
-                    taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, ","));
+                if (connOriginals.containsKey(SinkTask.TOPICS_CONFIG)) {
+                    taskConfig.put(SinkTask.TOPICS_CONFIG, connOriginals.get(SinkTask.TOPICS_CONFIG));
+                }
+                if (connOriginals.containsKey(SinkTask.TOPICS_REGEX_CONFIG)) {
+                    taskConfig.put(SinkTask.TOPICS_REGEX_CONFIG, connOriginals.get(SinkTask.TOPICS_REGEX_CONFIG));
                 }
                 result.add(taskConfig);
             }
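
The effect of the loop above, sketched with hypothetical values: whichever of `topics` or `topics.regex` the user set on the connector is copied verbatim into every generated task config.

    import java.util.HashMap;
    import java.util.Map;

    class TaskConfigPropagationSketch {
        public static void main(String[] args) {
            // Hypothetical connector originals; only topics.regex is set here.
            Map<String, String> connOriginals = new HashMap<>();
            connOriginals.put("topics.regex", "metrics-.*");

            // Each generated task config gets whichever of the two keys was present.
            Map<String, String> taskConfig = new HashMap<>();
            taskConfig.put("task.class", "com.example.ExampleSinkTask"); // placeholder class name
            if (connOriginals.containsKey("topics"))
                taskConfig.put("topics", connOriginals.get("topics"));
            if (connOriginals.containsKey("topics.regex"))
                taskConfig.put("topics.regex", connOriginals.get("topics.regex"));

            System.out.println(taskConfig); // {task.class=..., topics.regex=metrics-.*}
        }
    }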

http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 234ce8a..05ace58 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -53,6 +54,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 
 import static java.util.Collections.singleton;
 
@@ -258,11 +260,31 @@ class WorkerSinkTask extends WorkerTask {
      */
     protected void initializeAndStart() {
         String topicsStr = taskConfig.get(SinkTask.TOPICS_CONFIG);
-        if (topicsStr == null || topicsStr.isEmpty())
-            throw new ConnectException("Sink tasks require a list of topics.");
-        String[] topics = topicsStr.split(",");
-        consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
-        log.debug("{} Initializing and starting task for topics {}", this, topics);
+        boolean topicsStrPresent = topicsStr != null && !topicsStr.trim().isEmpty();
+
+        String topicsRegexStr = taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG);
+        boolean topicsRegexStrPresent = topicsRegexStr != null && !topicsRegexStr.trim().isEmpty();
+
+        if (topicsStrPresent && topicsRegexStrPresent) {
+            throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " + SinkTask.TOPICS_REGEX_CONFIG +
+                " are mutually exclusive options, but both are set.");
+        }
+
+        if (!topicsStrPresent && !topicsRegexStrPresent) {
+            throw new ConfigException("Must configure one of " +
+                SinkTask.TOPICS_CONFIG + " or " + SinkTask.TOPICS_REGEX_CONFIG);
+        }
+
+        if (topicsStrPresent) {
+            String[] topics = topicsStr.split(",");
+            consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
+            log.debug("{} Initializing and starting task for topics {}", this, topics);
+        } else {
+            Pattern pattern = Pattern.compile(topicsRegexStr);
+            consumer.subscribe(pattern, new HandleRebalance());
+            log.debug("{} Initializing and starting task for topics regex {}", this, topicsRegexStr);
+        }
+
         task.initialize(context);
         task.start(taskConfig);
         log.info("{} Sink task finished initialization and start", this);

http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 79d32da..a1cc56a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -974,16 +974,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             Map<String, String> configs = configState.connectorConfig(connName);
 
             ConnectorConfig connConfig;
-            List<String> sinkTopics = null;
             if (worker.isSinkConnector(connName)) {
                 connConfig = new SinkConnectorConfig(plugins(), configs);
-                sinkTopics = connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG);
             } else {
                 connConfig = new SourceConnectorConfig(plugins(), configs);
             }
 
-            final List<Map<String, String>> taskProps
-                    = worker.connectorTaskConfigs(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), sinkTopics);
+            final List<Map<String, String>> taskProps = worker.connectorTaskConfigs(connName, connConfig);
             boolean changed = false;
             int currentNumTasks = configState.taskCount(connName);
             if (taskProps.size() != currentNumTasks) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 5d8beab..e9ec0f9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -255,19 +255,11 @@ public class StandaloneHerder extends AbstractHerder {
     private List<Map<String, String>> recomputeTaskConfigs(String connName) {
         Map<String, String> config = configState.connectorConfig(connName);
 
-        ConnectorConfig connConfig;
-        if (worker.isSinkConnector(connName)) {
-            connConfig = new SinkConnectorConfig(plugins(), config);
-            return worker.connectorTaskConfigs(connName,
-                                               connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
-                                               connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG));
-        } else {
-            connConfig = new SourceConnectorConfig(plugins(), config);
-            return worker.connectorTaskConfigs(connName,
-                                               connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
-                                               null);
-        }
+        ConnectorConfig connConfig = worker.isSinkConnector(connName) ?
+            new SinkConnectorConfig(plugins(), config) :
+            new SourceConnectorConfig(plugins(), config);
 
+        return worker.connectorTaskConfigs(connName, connConfig);
     }
 
     private void createConnectorTasks(String connName, TargetState initialState) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 50b091d..48d8740 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -67,6 +67,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singleton;
@@ -127,6 +128,7 @@ public class WorkerSinkTaskTest {
     @Mock
     private KafkaConsumer<byte[], byte[]> consumer;
     private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
+    private Capture<Pattern> topicsRegex = EasyMock.newCapture();
 
     private long recordsReturnedTp1;
     private long recordsReturnedTp3;
@@ -1143,6 +1145,41 @@ public class WorkerSinkTaskTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testTopicsRegex() throws Exception {
+        Map<String, String> props = new HashMap<>(TASK_PROPS);
+        props.remove("topics");
+        props.put("topics.regex", "te.*");
+        TaskConfig taskConfig = new TaskConfig(props);
+
+        createTask(TargetState.PAUSED);
+
+        PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
+        consumer.subscribe(EasyMock.capture(topicsRegex), EasyMock.capture(rebalanceListener));
+        PowerMock.expectLastCall();
+
+        sinkTask.initialize(EasyMock.capture(sinkTaskContext));
+        PowerMock.expectLastCall();
+        sinkTask.start(props);
+        PowerMock.expectLastCall();
+
+        expectPollInitialAssignment();
+
+        Set<TopicPartition> partitions = new HashSet<>(asList(TOPIC_PARTITION, TOPIC_PARTITION2));
+        EasyMock.expect(consumer.assignment()).andReturn(partitions);
+        consumer.pause(partitions);
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(taskConfig);
+        workerTask.initializeAndStart();
+        workerTask.iteration();
+        time.sleep(10000L);
+
+        PowerMock.verifyAll();
+    }
+
     private void expectInitializeTask() throws Exception {
         PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
         consumer.subscribe(EasyMock.eq(asList(TOPIC)), EasyMock.capture(rebalanceListener));

http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 80c65df..e78ccc8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -443,7 +443,10 @@ public class WorkerTest extends ThreadedTest {
         } catch (ConnectException e) {
             // expected
         }
-        List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, 2, Arrays.asList("foo", "bar"));
+        Map<String, String> connProps = new HashMap<>(props);
+        connProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "2");
+        ConnectorConfig connConfig = new SinkConnectorConfig(plugins, connProps);
+        List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, connConfig);
         Map<String, String> expectedTaskProps = new HashMap<>();
         expectedTaskProps.put("foo", "bar");
         expectedTaskProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());

http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 7483261..d41ccbe 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -168,12 +168,15 @@ public class DistributedHerderTest {
     private ConfigBackingStore.UpdateListener configUpdateListener;
     private WorkerRebalanceListener rebalanceListener;
 
+    private SinkConnectorConfig conn1SinkConfig;
+    private SinkConnectorConfig conn1SinkConfigUpdated;
+
     @Before
     public void setUp() throws Exception {
         time = new MockTime();
         metrics = new MockConnectMetrics(time);
         worker = PowerMock.createMock(Worker.class);
-        EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.FALSE);
+        EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.TRUE);
 
         herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus"},
                 new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time);
@@ -181,6 +184,8 @@ public class DistributedHerderTest {
         configUpdateListener = herder.new ConfigUpdateListener();
         rebalanceListener = herder.new RebalanceListener();
         plugins = PowerMock.createMock(Plugins.class);
+        conn1SinkConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG);
+        conn1SinkConfigUpdated = new SinkConnectorConfig(plugins, CONN1_CONFIG_UPDATED);
         EasyMock.expect(herder.connectorTypeForClass(BogusSourceConnector.class.getName())).andReturn(ConnectorType.SOURCE).anyTimes();
         pluginLoader = PowerMock.createMock(PluginClassLoader.class);
         delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class);
@@ -205,7 +210,7 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
 
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
         worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
@@ -232,7 +237,7 @@ public class DistributedHerderTest {
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
         worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
@@ -248,7 +253,7 @@ public class DistributedHerderTest {
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
@@ -279,7 +284,7 @@ public class DistributedHerderTest {
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
         worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
@@ -558,7 +563,7 @@ public class DistributedHerderTest {
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
 
         // And delete the connector
         member.wakeup();
@@ -584,7 +589,7 @@ public class DistributedHerderTest {
 
     @Test
     public void testRestartConnector() throws Exception {
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andStubReturn(TASK_CONFIGS);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andStubReturn(TASK_CONFIGS);
 
         // get the initial assignment
         EasyMock.expect(member.memberId()).andStubReturn("leader");
@@ -740,7 +745,7 @@ public class DistributedHerderTest {
 
     @Test
     public void testRestartTask() throws Exception {
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andStubReturn(TASK_CONFIGS);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andStubReturn(TASK_CONFIGS);
 
         // get the initial assignment
         EasyMock.expect(member.memberId()).andStubReturn("leader");
@@ -921,7 +926,7 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
@@ -950,7 +955,7 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
@@ -966,7 +971,7 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
@@ -994,7 +999,7 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
@@ -1047,7 +1052,7 @@ public class DistributedHerderTest {
 
         // we expect reconfiguration after resuming
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
 
         worker.setTargetState(CONN1, TargetState.STARTED);
         PowerMock.expectLastCall();
@@ -1241,7 +1246,7 @@ public class DistributedHerderTest {
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins);
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
         worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
@@ -1317,7 +1322,7 @@ public class DistributedHerderTest {
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
 
         // list connectors, get connector info, get connector config, get task configs
         member.wakeup();
@@ -1356,7 +1361,7 @@ public class DistributedHerderTest {
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfigUpdated)).andReturn(TASK_CONFIGS);
 
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();

http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 1cd1804..18d2739 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -30,6 +30,8 @@ import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.ConnectorStatus;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.HerderConnectorContext;
+import org.apache.kafka.connect.runtime.SinkConnectorConfig;
+import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.TaskStatus;
@@ -479,7 +481,7 @@ public class StandaloneHerderTest {
         EasyMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true);
         // Generate same task config, which should result in no additional action to restart tasks
-        EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, null))
+        EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, newConnConfig)))
                 .andReturn(singletonList(taskConfig(SourceSink.SOURCE)));
         worker.isSinkConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall().andReturn(false);
@@ -562,6 +564,9 @@ public class StandaloneHerderTest {
     private void expectAdd(SourceSink sourceSink) throws Exception {
 
         Map<String, String> connectorProps = connectorConfig(sourceSink);
+        ConnectorConfig connConfig = sourceSink == SourceSink.SOURCE ?
+            new SourceConnectorConfig(plugins, connectorProps) :
+            new SinkConnectorConfig(plugins, connectorProps);
 
         worker.startConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(connectorProps), EasyMock.anyObject(HerderConnectorContext.class),
                               EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
@@ -577,7 +582,7 @@ public class StandaloneHerderTest {
 
         Map<String, String> generatedTaskProps = taskConfig(sourceSink);
 
-        EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, sourceSink == SourceSink.SINK ? TOPICS_LIST : null))
+        EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, connConfig))
             .andReturn(singletonList(generatedTaskProps));
 
         worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED);

http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/docs/connect.html
----------------------------------------------------------------------
diff --git a/docs/connect.html b/docs/connect.html
index 78c66b1..b910cf5 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -95,9 +95,10 @@
 
     <p>The <code>connector.class</code> config supports several formats: the full name or alias of the class for this connector. If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name or use FileStreamSink or FileStreamSinkConnector to make the configuration a bit shorter.</p>
 
-    <p>Sink connectors also have one additional option to control their input:</p>
+    <p>Sink connectors also have a few additional options to control their input. Each sink connector must set one of the following:</p>
     <ul>
-        <li><code>topics</code> - A list of topics to use as input for this connector</li>
+        <li><code>topics</code> - A comma-separated list of topics to use as input for this connector</li>
+        <li><code>topics.regex</code> - A Java regular expression of topics to use as input for this connector</li>
     </ul>
 
     <p>For any other options, you should consult the documentation for the connector.</p>
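
Putting the documented options together, a sink connector sets exactly one of the two keys. A hypothetical configuration built as a map (the name, class alias, pattern, and file path are examples only):

    import java.util.HashMap;
    import java.util.Map;

    class SinkConfigExample {
        public static void main(String[] args) {
            // Illustrative sink connector configuration using topics.regex
            // instead of an explicit topics list.
            Map<String, String> sinkConfig = new HashMap<>();
            sinkConfig.put("name", "file-sink-example");
            sinkConfig.put("connector.class", "FileStreamSink");
            sinkConfig.put("tasks.max", "1");
            sinkConfig.put("topics.regex", "audit-.*");       // either this key...
            // sinkConfig.put("topics", "audit-eu,audit-us"); // ...or this one, never both
            sinkConfig.put("file", "/tmp/audit.out");
            System.out.println(sinkConfig);
        }
    }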