You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/11/22 00:01:22 UTC
kafka git commit: KAFKA-3073: Add topic regex support for Connect
sinks
Repository: kafka
Updated Branches:
refs/heads/trunk fd8eb268d -> 049342e44
KAFKA-3073: Add topic regex support for Connect sinks
There are more methods that had to be touched than I anticipated when writing [the KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-215%3A+Add+topic+regex+support+for+Connect+sinks).
The implementation here is now complete and includes a test that verifies that there's a call to `consumer.subscribe(Pattern, RebalanceHandler)` when `topics.regex` is provided.
Author: Jeff Klukas <je...@klukas.net>
Reviewers: Randall Hauch <rh...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #4151 from jklukas/connect-topics.regex
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/049342e4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/049342e4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/049342e4
Branch: refs/heads/trunk
Commit: 049342e440a5ca045771f3eb5b4c72d3e52ffac6
Parents: fd8eb26
Author: Jeff Klukas <je...@klukas.net>
Authored: Tue Nov 21 16:01:16 2017 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Nov 21 16:01:16 2017 -0800
----------------------------------------------------------------------
.../org/apache/kafka/connect/sink/SinkTask.java | 8 +++++
.../connect/runtime/SinkConnectorConfig.java | 16 +++++++--
.../apache/kafka/connect/runtime/Worker.java | 14 +++++---
.../kafka/connect/runtime/WorkerSinkTask.java | 32 ++++++++++++++---
.../runtime/distributed/DistributedHerder.java | 5 +--
.../runtime/standalone/StandaloneHerder.java | 16 +++------
.../connect/runtime/WorkerSinkTaskTest.java | 37 ++++++++++++++++++++
.../kafka/connect/runtime/WorkerTest.java | 5 ++-
.../distributed/DistributedHerderTest.java | 37 +++++++++++---------
.../standalone/StandaloneHerderTest.java | 9 +++--
docs/connect.html | 5 +--
11 files changed, 134 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
index 8abff47..1406b30 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
@@ -61,6 +61,14 @@ public abstract class SinkTask implements Task {
*/
public static final String TOPICS_CONFIG = "topics";
+ /**
+ * <p>
+ * The configuration key that provides a regex specifying which topics to include as inputs
+ * for this SinkTask.
+ * </p>
+ */
+ public static final String TOPICS_REGEX_CONFIG = "topics.regex";
+
protected SinkTaskContext context;
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
index e47d537..cf5564c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -18,6 +18,8 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.transforms.util.RegexValidator;
import java.util.Map;
@@ -27,13 +29,21 @@ import java.util.Map;
public class SinkConnectorConfig extends ConnectorConfig {
- public static final String TOPICS_CONFIG = "topics";
- private static final String TOPICS_DOC = "";
+ public static final String TOPICS_CONFIG = SinkTask.TOPICS_CONFIG;
+ private static final String TOPICS_DOC = "List of topics to consume, separated by commas";
public static final String TOPICS_DEFAULT = "";
private static final String TOPICS_DISPLAY = "Topics";
+ private static final String TOPICS_REGEX_CONFIG = SinkTask.TOPICS_REGEX_CONFIG;
+ private static final String TOPICS_REGEX_DOC = "Regular expression giving topics to consume. " +
+ "Under the hood, the regex is compiled to a <code>java.util.regex.Pattern</code>. " +
+ "Only one of " + TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG + " should be specified.";
+ public static final String TOPICS_REGEX_DEFAULT = "";
+ private static final String TOPICS_REGEX_DISPLAY = "Topics regex";
+
static ConfigDef config = ConnectorConfig.configDef()
- .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY);
+ .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY)
+ .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY);
public static ConfigDef configDef() {
return config;
http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index c6e2e17..992825c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -254,17 +254,18 @@ public class Worker {
* Get a list of updated task properties for the tasks of this connector.
*
* @param connName the connector name.
- * @param maxTasks the maxinum number of tasks.
- * @param sinkTopics a list of sink topics.
* @return a list of updated tasks properties.
*/
- public List<Map<String, String>> connectorTaskConfigs(String connName, int maxTasks, List<String> sinkTopics) {
+ public List<Map<String, String>> connectorTaskConfigs(String connName, ConnectorConfig connConfig) {
log.trace("Reconfiguring connector tasks for {}", connName);
WorkerConnector workerConnector = connectors.get(connName);
if (workerConnector == null)
throw new ConnectException("Connector " + connName + " not found in this worker.");
+ int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);
+ Map<String, String> connOriginals = connConfig.originalsStrings();
+
Connector connector = workerConnector.connector();
List<Map<String, String>> result = new ArrayList<>();
ClassLoader savedLoader = plugins.currentThreadLoader();
@@ -275,8 +276,11 @@ public class Worker {
// Ensure we don't modify the connector's copy of the config
Map<String, String> taskConfig = new HashMap<>(taskProps);
taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName);
- if (sinkTopics != null) {
- taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, ","));
+ if (connOriginals.containsKey(SinkTask.TOPICS_CONFIG)) {
+ taskConfig.put(SinkTask.TOPICS_CONFIG, connOriginals.get(SinkTask.TOPICS_CONFIG));
+ }
+ if (connOriginals.containsKey(SinkTask.TOPICS_REGEX_CONFIG)) {
+ taskConfig.put(SinkTask.TOPICS_REGEX_CONFIG, connOriginals.get(SinkTask.TOPICS_REGEX_CONFIG));
}
result.add(taskConfig);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 234ce8a..05ace58 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@@ -53,6 +54,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.regex.Pattern;
import static java.util.Collections.singleton;
@@ -258,11 +260,31 @@ class WorkerSinkTask extends WorkerTask {
*/
protected void initializeAndStart() {
String topicsStr = taskConfig.get(SinkTask.TOPICS_CONFIG);
- if (topicsStr == null || topicsStr.isEmpty())
- throw new ConnectException("Sink tasks require a list of topics.");
- String[] topics = topicsStr.split(",");
- consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
- log.debug("{} Initializing and starting task for topics {}", this, topics);
+ boolean topicsStrPresent = topicsStr != null && !topicsStr.trim().isEmpty();
+
+ String topicsRegexStr = taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG);
+ boolean topicsRegexStrPresent = topicsRegexStr != null && !topicsRegexStr.trim().isEmpty();
+
+ if (topicsStrPresent && topicsRegexStrPresent) {
+ throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " + SinkTask.TOPICS_REGEX_CONFIG +
+ " are mutually exclusive options, but both are set.");
+ }
+
+ if (!topicsStrPresent && !topicsRegexStrPresent) {
+ throw new ConfigException("Must configure one of " +
+ SinkTask.TOPICS_CONFIG + " or " + SinkTask.TOPICS_REGEX_CONFIG);
+ }
+
+ if (topicsStrPresent) {
+ String[] topics = topicsStr.split(",");
+ consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
+ log.debug("{} Initializing and starting task for topics {}", this, topics);
+ } else {
+ Pattern pattern = Pattern.compile(topicsRegexStr);
+ consumer.subscribe(pattern, new HandleRebalance());
+ log.debug("{} Initializing and starting task for topics regex {}", this, topicsRegexStr);
+ }
+
task.initialize(context);
task.start(taskConfig);
log.info("{} Sink task finished initialization and start", this);
http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 79d32da..a1cc56a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -974,16 +974,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
Map<String, String> configs = configState.connectorConfig(connName);
ConnectorConfig connConfig;
- List<String> sinkTopics = null;
if (worker.isSinkConnector(connName)) {
connConfig = new SinkConnectorConfig(plugins(), configs);
- sinkTopics = connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG);
} else {
connConfig = new SourceConnectorConfig(plugins(), configs);
}
- final List<Map<String, String>> taskProps
- = worker.connectorTaskConfigs(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), sinkTopics);
+ final List<Map<String, String>> taskProps = worker.connectorTaskConfigs(connName, connConfig);
boolean changed = false;
int currentNumTasks = configState.taskCount(connName);
if (taskProps.size() != currentNumTasks) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 5d8beab..e9ec0f9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -255,19 +255,11 @@ public class StandaloneHerder extends AbstractHerder {
private List<Map<String, String>> recomputeTaskConfigs(String connName) {
Map<String, String> config = configState.connectorConfig(connName);
- ConnectorConfig connConfig;
- if (worker.isSinkConnector(connName)) {
- connConfig = new SinkConnectorConfig(plugins(), config);
- return worker.connectorTaskConfigs(connName,
- connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
- connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG));
- } else {
- connConfig = new SourceConnectorConfig(plugins(), config);
- return worker.connectorTaskConfigs(connName,
- connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
- null);
- }
+ ConnectorConfig connConfig = worker.isSinkConnector(connName) ?
+ new SinkConnectorConfig(plugins(), config) :
+ new SourceConnectorConfig(plugins(), config);
+ return worker.connectorTaskConfigs(connName, connConfig);
}
private void createConnectorTasks(String connName, TargetState initialState) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 50b091d..48d8740 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -67,6 +67,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
@@ -127,6 +128,7 @@ public class WorkerSinkTaskTest {
@Mock
private KafkaConsumer<byte[], byte[]> consumer;
private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
+ private Capture<Pattern> topicsRegex = EasyMock.newCapture();
private long recordsReturnedTp1;
private long recordsReturnedTp3;
@@ -1143,6 +1145,41 @@ public class WorkerSinkTaskTest {
PowerMock.verifyAll();
}
+ @Test
+ public void testTopicsRegex() throws Exception {
+ Map<String, String> props = new HashMap<>(TASK_PROPS);
+ props.remove("topics");
+ props.put("topics.regex", "te.*");
+ TaskConfig taskConfig = new TaskConfig(props);
+
+ createTask(TargetState.PAUSED);
+
+ PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
+ consumer.subscribe(EasyMock.capture(topicsRegex), EasyMock.capture(rebalanceListener));
+ PowerMock.expectLastCall();
+
+ sinkTask.initialize(EasyMock.capture(sinkTaskContext));
+ PowerMock.expectLastCall();
+ sinkTask.start(props);
+ PowerMock.expectLastCall();
+
+ expectPollInitialAssignment();
+
+ Set<TopicPartition> partitions = new HashSet<>(asList(TOPIC_PARTITION, TOPIC_PARTITION2));
+ EasyMock.expect(consumer.assignment()).andReturn(partitions);
+ consumer.pause(partitions);
+ PowerMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(taskConfig);
+ workerTask.initializeAndStart();
+ workerTask.iteration();
+ time.sleep(10000L);
+
+ PowerMock.verifyAll();
+ }
+
private void expectInitializeTask() throws Exception {
PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
consumer.subscribe(EasyMock.eq(asList(TOPIC)), EasyMock.capture(rebalanceListener));
http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 80c65df..e78ccc8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -443,7 +443,10 @@ public class WorkerTest extends ThreadedTest {
} catch (ConnectException e) {
// expected
}
- List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, 2, Arrays.asList("foo", "bar"));
+ Map<String, String> connProps = new HashMap<>(props);
+ connProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "2");
+ ConnectorConfig connConfig = new SinkConnectorConfig(plugins, connProps);
+ List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, connConfig);
Map<String, String> expectedTaskProps = new HashMap<>();
expectedTaskProps.put("foo", "bar");
expectedTaskProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 7483261..d41ccbe 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -168,12 +168,15 @@ public class DistributedHerderTest {
private ConfigBackingStore.UpdateListener configUpdateListener;
private WorkerRebalanceListener rebalanceListener;
+ private SinkConnectorConfig conn1SinkConfig;
+ private SinkConnectorConfig conn1SinkConfigUpdated;
+
@Before
public void setUp() throws Exception {
time = new MockTime();
metrics = new MockConnectMetrics(time);
worker = PowerMock.createMock(Worker.class);
- EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.FALSE);
+ EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.TRUE);
herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus"},
new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time);
@@ -181,6 +184,8 @@ public class DistributedHerderTest {
configUpdateListener = herder.new ConfigUpdateListener();
rebalanceListener = herder.new RebalanceListener();
plugins = PowerMock.createMock(Plugins.class);
+ conn1SinkConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG);
+ conn1SinkConfigUpdated = new SinkConnectorConfig(plugins, CONN1_CONFIG_UPDATED);
EasyMock.expect(herder.connectorTypeForClass(BogusSourceConnector.class.getName())).andReturn(ConnectorType.SOURCE).anyTimes();
pluginLoader = PowerMock.createMock(PluginClassLoader.class);
delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class);
@@ -205,7 +210,7 @@ public class DistributedHerderTest {
PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
- EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
@@ -232,7 +237,7 @@ public class DistributedHerderTest {
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
- EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
@@ -248,7 +253,7 @@ public class DistributedHerderTest {
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
- EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -279,7 +284,7 @@ public class DistributedHerderTest {
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
- EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
@@ -558,7 +563,7 @@ public class DistributedHerderTest {
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
- EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
// And delete the connector
member.wakeup();
@@ -584,7 +589,7 @@ public class DistributedHerderTest {
@Test
public void testRestartConnector() throws Exception {
- EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andStubReturn(TASK_CONFIGS);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andStubReturn(TASK_CONFIGS);
// get the initial assignment
EasyMock.expect(member.memberId()).andStubReturn("leader");
@@ -740,7 +745,7 @@ public class DistributedHerderTest {
@Test
public void testRestartTask() throws Exception {
- EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andStubReturn(TASK_CONFIGS);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andStubReturn(TASK_CONFIGS);
// get the initial assignment
EasyMock.expect(member.memberId()).andStubReturn("leader");
@@ -921,7 +926,7 @@ public class DistributedHerderTest {
PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.getPlugins()).andReturn(plugins);
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
- EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -950,7 +955,7 @@ public class DistributedHerderTest {
PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.getPlugins()).andReturn(plugins);
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
- EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -966,7 +971,7 @@ public class DistributedHerderTest {
PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.getPlugins()).andReturn(plugins);
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
- EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -994,7 +999,7 @@ public class DistributedHerderTest {
PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.getPlugins()).andReturn(plugins);
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
- EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -1047,7 +1052,7 @@ public class DistributedHerderTest {
// we expect reconfiguration after resuming
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
- EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
worker.setTargetState(CONN1, TargetState.STARTED);
PowerMock.expectLastCall();
@@ -1241,7 +1246,7 @@ public class DistributedHerderTest {
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.getPlugins()).andReturn(plugins);
- EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
@@ -1317,7 +1322,7 @@ public class DistributedHerderTest {
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
- EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
// list connectors, get connector info, get connector config, get task configs
member.wakeup();
@@ -1356,7 +1361,7 @@ public class DistributedHerderTest {
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
- EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfigUpdated)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 1cd1804..18d2739 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -30,6 +30,8 @@ import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.HerderConnectorContext;
+import org.apache.kafka.connect.runtime.SinkConnectorConfig;
+import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.TaskStatus;
@@ -479,7 +481,7 @@ public class StandaloneHerderTest {
EasyMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true);
// Generate same task config, which should result in no additional action to restart tasks
- EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, null))
+ EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, newConnConfig)))
.andReturn(singletonList(taskConfig(SourceSink.SOURCE)));
worker.isSinkConnector(CONNECTOR_NAME);
EasyMock.expectLastCall().andReturn(false);
@@ -562,6 +564,9 @@ public class StandaloneHerderTest {
private void expectAdd(SourceSink sourceSink) throws Exception {
Map<String, String> connectorProps = connectorConfig(sourceSink);
+ ConnectorConfig connConfig = sourceSink == SourceSink.SOURCE ?
+ new SourceConnectorConfig(plugins, connectorProps) :
+ new SinkConnectorConfig(plugins, connectorProps);
worker.startConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(connectorProps), EasyMock.anyObject(HerderConnectorContext.class),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
@@ -577,7 +582,7 @@ public class StandaloneHerderTest {
Map<String, String> generatedTaskProps = taskConfig(sourceSink);
- EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, sourceSink == SourceSink.SINK ? TOPICS_LIST : null))
+ EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, connConfig))
.andReturn(singletonList(generatedTaskProps));
worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED);
http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/docs/connect.html
----------------------------------------------------------------------
diff --git a/docs/connect.html b/docs/connect.html
index 78c66b1..b910cf5 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -95,9 +95,10 @@
<p>The <code>connector.class</code> config supports several formats: the full name or alias of the class for this connector. If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name or use FileStreamSink or FileStreamSinkConnector to make the configuration a bit shorter.</p>
- <p>Sink connectors also have one additional option to control their input:</p>
+ <p>Sink connectors also have a few additional options to control their input. Each sink connector must set one of the following:</p>
<ul>
- <li><code>topics</code> - A list of topics to use as input for this connector</li>
+ <li><code>topics</code> - A comma-separated list of topics to use as input for this connector</li>
+ <li><code>topics.regex</code> - A Java regular expression of topics to use as input for this connector</li>
</ul>
<p>For any other options, you should consult the documentation for the connector.</p>