Posted to commits@kafka.apache.org by gw...@apache.org on 2015/10/24 01:37:42 UTC
[3/3] kafka git commit: KAFKA-2371: Add distributed support for Copycat.
KAFKA-2371: Add distributed support for Copycat.
This adds coordination between DistributedHerders using the generalized consumer
support, allowing automatic balancing of connectors and tasks across workers. A
few pieces that require interaction between workers (resolving config
inconsistencies, forwarding of configuration changes to the leader worker) are
incomplete because they require REST API support to implement properly.
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Reviewers: Jason Gustafson, Gwen Shapira
Closes #321 from ewencp/kafka-2371-distributed-herder
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2e617735
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2e617735
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2e617735
Branch: refs/heads/trunk
Commit: 2e61773590c0ba86cb8813e6ba17bf6ee33f4461
Parents: 21443f2
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Fri Oct 23 16:37:30 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Fri Oct 23 16:37:30 2015 -0700
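For context, here is a minimal sketch (not taken verbatim from this commit) of the distributed worker startup path after this change, mirroring the CopycatDistributed.java diff below. The use of Utils.loadProps and the awaitStop() call are assumptions about surrounding code not shown in this excerpt.

    import java.util.Properties;
    import org.apache.kafka.common.utils.Utils;
    import org.apache.kafka.copycat.cli.WorkerConfig;
    import org.apache.kafka.copycat.runtime.Copycat;
    import org.apache.kafka.copycat.runtime.Worker;
    import org.apache.kafka.copycat.runtime.distributed.DistributedHerder;
    import org.apache.kafka.copycat.storage.KafkaOffsetBackingStore;

    public class DistributedWorkerSketch {
        public static void main(String[] args) throws Exception {
            // Load worker.properties (bootstrap.servers, group.id, converters, ...)
            Properties workerProps = Utils.loadProps(args[0]);

            WorkerConfig workerConfig = new WorkerConfig(workerProps);
            Worker worker = new Worker(workerConfig, new KafkaOffsetBackingStore());
            // The herder now takes its config at construction time; the separate
            // configure() step is removed in this commit
            DistributedHerder herder = new DistributedHerder(worker, workerConfig.originals());

            Copycat copycat = new Copycat(worker, herder);
            copycat.start();
            copycat.awaitStop(); // assumed to block until shutdown
        }
    }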
----------------------------------------------------------------------
build.gradle | 1 +
checkstyle/import-control.xml | 1 +
.../clients/consumer/RoundRobinAssignor.java | 35 +-
.../consumer/internals/AbstractCoordinator.java | 8 +-
.../kafka/common/utils/CircularIterator.java | 54 ++
config/copycat-distributed.properties | 2 +
.../kafka/copycat/file/FileStreamSinkTask.java | 12 +-
.../copycat/file/FileStreamSourceTask.java | 17 +-
.../kafka/copycat/cli/CopycatDistributed.java | 7 +-
.../kafka/copycat/runtime/ConnectorConfig.java | 2 +-
.../kafka/copycat/runtime/TaskConfig.java | 54 ++
.../apache/kafka/copycat/runtime/Worker.java | 145 +++-
.../runtime/distributed/ClusterConfigState.java | 40 +-
.../runtime/distributed/CopycatProtocol.java | 246 +++++++
.../runtime/distributed/DistributedHerder.java | 733 +++++++++++++------
.../distributed/DistributedHerderConfig.java | 192 +++++
.../runtime/distributed/NotLeaderException.java | 38 +
.../runtime/distributed/WorkerCoordinator.java | 288 ++++++++
.../runtime/distributed/WorkerGroupMember.java | 184 +++++
.../distributed/WorkerRebalanceListener.java | 38 +
.../runtime/standalone/StandaloneHerder.java | 168 ++---
.../copycat/storage/KafkaConfigStorage.java | 64 +-
.../storage/KafkaOffsetBackingStore.java | 2 +
.../kafka/copycat/util/ConnectorTaskId.java | 10 +-
.../kafka/copycat/runtime/WorkerTest.java | 199 ++++-
.../distributed/DistributedHerderTest.java | 436 ++++++-----
.../distributed/WorkerCoordinatorTest.java | 436 +++++++++++
.../standalone/StandaloneHerderTest.java | 45 +-
.../copycat/storage/KafkaConfigStorageTest.java | 49 +-
.../apache/kafka/copycat/util/TestFuture.java | 10 +-
tests/kafkatest/services/copycat.py | 67 +-
.../kafkatest/tests/copycat_distributed_test.py | 67 +-
tests/kafkatest/tests/copycat_test.py | 5 +-
.../templates/copycat-distributed.properties | 7 +-
34 files changed, 2966 insertions(+), 696 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 16fb981..128c195 100644
--- a/build.gradle
+++ b/build.gradle
@@ -754,6 +754,7 @@ project(':copycat:runtime') {
testCompile "$easymock"
testCompile "$powermock"
testCompile "$powermock_easymock"
+ testCompile project(':clients').sourceSets.test.output
testRuntime "$slf4jlog4j"
testRuntime project(":copycat:json")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 6474865..e1ea93c 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -146,6 +146,7 @@
<allow pkg="org.apache.kafka.copycat.data" />
<allow pkg="org.apache.kafka.copycat.errors" />
<allow pkg="org.apache.kafka.clients" />
+ <allow pkg="org.apache.kafka.test"/>
<subpackage name="source">
<allow pkg="org.apache.kafka.copycat.connector" />
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
index c5ea2bb..b8dc253 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
@@ -14,11 +14,11 @@ package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.CircularIterator;
import org.apache.kafka.common.utils.Utils;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
@@ -78,37 +78,4 @@ public class RoundRobinAssignor extends AbstractPartitionAssignor {
return "roundrobin";
}
- private static class CircularIterator<T> implements Iterator<T> {
- int i = 0;
- private List<T> list;
-
- public CircularIterator(List<T> list) {
- if (list.isEmpty()) {
- throw new IllegalArgumentException("CircularIterator can only be used on non-empty lists");
- }
- this.list = list;
- }
-
- @Override
- public boolean hasNext() {
- return true;
- }
-
- @Override
- public T next() {
- T next = list.get(i);
- i = (i + 1) % list.size();
- return next;
- }
-
- public T peek() {
- return list.get(i);
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 1ffd2bb..a2b9ec5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -199,7 +199,7 @@ public abstract class AbstractCoordinator {
this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
rejoinNeeded = true;
}
-
+ private boolean needsOnLeave = true;
/**
* Ensure that the group is active (i.e. joined and synced)
*/
@@ -208,7 +208,10 @@ public abstract class AbstractCoordinator {
return;
// onLeave only invoked if we have a valid current generation
- onLeave(generation, memberId);
+ if (needsOnLeave) {
+ onLeave(generation, memberId);
+ needsOnLeave = false;
+ }
while (needRejoin()) {
ensureCoordinatorKnown();
@@ -225,6 +228,7 @@ public abstract class AbstractCoordinator {
if (future.succeeded()) {
onJoin(generation, memberId, protocol, future.value());
+ needsOnLeave = true;
heartbeatTask.reset();
} else {
if (future.exception() instanceof UnknownMemberIdException)
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/clients/src/main/java/org/apache/kafka/common/utils/CircularIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/CircularIterator.java b/clients/src/main/java/org/apache/kafka/common/utils/CircularIterator.java
new file mode 100644
index 0000000..00be783
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/CircularIterator.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.common.utils;
+
+import java.util.Iterator;
+import java.util.List;
+
+public class CircularIterator<T> implements Iterator<T> {
+ int i = 0;
+ private List<T> list;
+
+ public CircularIterator(List<T> list) {
+ if (list.isEmpty()) {
+ throw new IllegalArgumentException("CircularIterator can only be used on non-empty lists");
+ }
+ this.list = list;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+ @Override
+ public T next() {
+ T next = list.get(i);
+ i = (i + 1) % list.size();
+ return next;
+ }
+
+ public T peek() {
+ return list.get(i);
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+}
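A minimal usage sketch of the extracted CircularIterator (the worker names here are illustrative): next() cycles through the list indefinitely, and peek() inspects the upcoming element without advancing, which is how RoundRobinAssignor walks partitions across consumers.

    import java.util.Arrays;
    import org.apache.kafka.common.utils.CircularIterator;

    public class CircularIteratorExample {
        public static void main(String[] args) {
            CircularIterator<String> workers =
                    new CircularIterator<>(Arrays.asList("worker-0", "worker-1"));
            for (int i = 0; i < 5; i++)
                System.out.print(workers.next() + " "); // worker-0 worker-1 worker-0 worker-1 worker-0
            System.out.println();
            System.out.println(workers.peek()); // worker-1; peek() does not advance
        }
    }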
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/config/copycat-distributed.properties
----------------------------------------------------------------------
diff --git a/config/copycat-distributed.properties b/config/copycat-distributed.properties
index b122413..2ea5b73 100644
--- a/config/copycat-distributed.properties
+++ b/config/copycat-distributed.properties
@@ -18,6 +18,8 @@
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
+group.id=copycat-cluster
+
# The converters specify the format of data in Kafka and how to translate it into Copycat data. Every Copycat user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.copycat.json.JsonConverter
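Pulling the pieces of this file together, a hypothetical minimal config for a distributed worker might look like the following; the value.converter line is an assumption by analogy with key.converter. Every worker that should share connectors and tasks must use the same group.id.

    bootstrap.servers=localhost:9092
    group.id=copycat-cluster
    key.converter=org.apache.kafka.copycat.json.JsonConverter
    value.converter=org.apache.kafka.copycat.json.JsonConverter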
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
index 9ea459c..6dfe4a7 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
@@ -38,6 +38,7 @@ import java.util.Properties;
public class FileStreamSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(FileStreamSinkTask.class);
+ private String filename;
private PrintStream outputStream;
public FileStreamSinkTask() {
@@ -45,12 +46,13 @@ public class FileStreamSinkTask extends SinkTask {
// for testing
public FileStreamSinkTask(PrintStream outputStream) {
+ filename = null;
this.outputStream = outputStream;
}
@Override
public void start(Properties props) {
- String filename = props.getProperty(FileStreamSinkConnector.FILE_CONFIG);
+ filename = props.getProperty(FileStreamSinkConnector.FILE_CONFIG);
if (filename == null) {
outputStream = System.out;
} else {
@@ -65,16 +67,24 @@ public class FileStreamSinkTask extends SinkTask {
@Override
public void put(Collection<SinkRecord> sinkRecords) {
for (SinkRecord record : sinkRecords) {
+ log.trace("Writing line to {}: {}", logFilename(), record.value());
outputStream.println(record.value());
}
}
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
+ log.trace("Flushing output stream for {}", logFilename());
outputStream.flush();
}
@Override
public void stop() {
+ if (outputStream != System.out)
+ outputStream.close();
+ }
+
+ private String logFilename() {
+ return filename == null ? "stdout" : filename;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
index cf71be3..f2249d0 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
@@ -56,7 +56,7 @@ public class FileStreamSourceTask extends SourceTask {
}
topic = props.getProperty(FileStreamSourceConnector.TOPIC_CONFIG);
if (topic == null)
- throw new CopycatException("ConsoleSourceTask config missing topic setting");
+ throw new CopycatException("FileStreamSourceTask config missing topic setting");
}
@Override
@@ -88,6 +88,7 @@ public class FileStreamSourceTask extends SourceTask {
streamOffset = 0L;
}
reader = new BufferedReader(new InputStreamReader(stream));
+ log.debug("Opened {} for reading", logFilename());
} catch (FileNotFoundException e) {
log.warn("Couldn't find file for FileStreamSourceTask, sleeping to wait for it to be created");
synchronized (this) {
@@ -113,6 +114,7 @@ public class FileStreamSourceTask extends SourceTask {
int nread = 0;
while (readerCopy.ready()) {
nread = readerCopy.read(buffer, offset, buffer.length - offset);
+ log.trace("Read {} bytes from {}", nread, logFilename());
if (nread > 0) {
offset += nread;
@@ -126,6 +128,7 @@ public class FileStreamSourceTask extends SourceTask {
do {
line = extractLine();
if (line != null) {
+ log.trace("Read a line from {}", logFilename());
if (records == null)
records = new ArrayList<>();
records.add(new SourceRecord(offsetKey(filename), offsetValue(streamOffset), topic, VALUE_SCHEMA, line));
@@ -183,10 +186,12 @@ public class FileStreamSourceTask extends SourceTask {
log.trace("Stopping");
synchronized (this) {
try {
- stream.close();
- log.trace("Closed input stream");
+ if (stream != null && stream != System.in) {
+ stream.close();
+ log.trace("Closed input stream");
+ }
} catch (IOException e) {
- log.error("Failed to close ConsoleSourceTask stream: ", e);
+ log.error("Failed to close FileStreamSourceTask stream: ", e);
}
this.notify();
}
@@ -199,4 +204,8 @@ public class FileStreamSourceTask extends SourceTask {
private Map<String, Long> offsetValue(Long pos) {
return Collections.singletonMap(POSITION_FIELD, pos);
}
+
+ private String logFilename() {
+ return filename == null ? "stdin" : filename;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
index b0230b2..0ff6e81 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
@@ -48,8 +48,8 @@ public class CopycatDistributed {
Properties workerProps;
Properties connectorProps;
- if (args.length < 2) {
- log.info("Usage: CopycatDistributed worker.properties connector1.properties [connector2.properties ...]");
+ if (args.length < 1) {
+ log.info("Usage: CopycatDistributed worker.properties [connector1.properties connector2.properties ...]");
System.exit(1);
}
@@ -58,8 +58,7 @@ public class CopycatDistributed {
WorkerConfig workerConfig = new WorkerConfig(workerProps);
Worker worker = new Worker(workerConfig, new KafkaOffsetBackingStore());
- DistributedHerder herder = new DistributedHerder(worker);
- herder.configure(workerConfig.originals());
+ DistributedHerder herder = new DistributedHerder(worker, workerConfig.originals());
final Copycat copycat = new Copycat(worker, herder);
copycat.start();
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
index 767c88b..2242299 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
@@ -59,7 +59,7 @@ public class ConnectorConfig extends AbstractConfig {
static {
config = new ConfigDef()
.define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC)
- .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC)
+ .define(CONNECTOR_CLASS_CONFIG, Type.CLASS, Importance.HIGH, CONNECTOR_CLASS_DOC)
.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, Importance.HIGH, TASKS_MAX_DOC)
.define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java
new file mode 100644
index 0000000..be97879
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * <p>
+ * Configuration options for Tasks. These only include Copycat system-level configuration
+ * options.
+ * </p>
+ */
+public class TaskConfig extends AbstractConfig {
+
+ public static final String TASK_CLASS_CONFIG = "task.class";
+ private static final String TASK_CLASS_DOC =
+ "Name of the class for this task. Must be a subclass of org.apache.kafka.copycat.connector.Task";
+
+ private static ConfigDef config;
+
+ static {
+ config = new ConfigDef()
+ .define(TASK_CLASS_CONFIG, Type.CLASS, Importance.HIGH, TASK_CLASS_DOC);
+ }
+
+ public TaskConfig() {
+ this(new HashMap<String, String>());
+ }
+
+ public TaskConfig(Map<String, ?> props) {
+ super(config, props);
+ }
+}
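A brief sketch of constructing and consuming a TaskConfig (the task class name is illustrative, and the class resolution mirrors what Worker.addTask() does later in this diff):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.copycat.connector.Task;
    import org.apache.kafka.copycat.runtime.TaskConfig;

    public class TaskConfigSketch {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            // Type.CLASS settings accept a fully-qualified class name string
            props.put(TaskConfig.TASK_CLASS_CONFIG,
                    "org.apache.kafka.copycat.file.FileStreamSourceTask");

            TaskConfig taskConfig = new TaskConfig(props);
            // Same lookup Worker.addTask() performs in this commit
            Class<? extends Task> taskClass =
                    taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
            System.out.println("Resolved task class: " + taskClass.getName());
        }
    }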
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
index 0fdab4c..b37e49f 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
@@ -17,12 +17,15 @@
package org.apache.kafka.copycat.runtime;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.connector.Connector;
+import org.apache.kafka.copycat.connector.ConnectorContext;
import org.apache.kafka.copycat.connector.Task;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.sink.SinkTask;
@@ -33,8 +36,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
/**
* <p>
@@ -55,6 +60,7 @@ public class Worker {
private Converter internalKeyConverter;
private Converter internalValueConverter;
private OffsetBackingStore offsetBackingStore;
+ private HashMap<String, Connector> connectors = new HashMap<>();
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
private KafkaProducer<byte[], byte[]> producer;
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
@@ -106,6 +112,17 @@ public class Worker {
long started = time.milliseconds();
long limit = started + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
+ for (Map.Entry<String, Connector> entry : connectors.entrySet()) {
+ Connector conn = entry.getValue();
+ log.warn("Shutting down connector {} uncleanly; herder should have shut down connectors before the" +
+ "Worker is stopped.", conn);
+ try {
+ conn.stop();
+ } catch (CopycatException e) {
+ log.error("Error while shutting down connector " + conn, e);
+ }
+ }
+
for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
WorkerTask task = entry.getValue();
log.warn("Shutting down task {} uncleanly; herder should have shut down "
@@ -134,15 +151,106 @@ public class Worker {
}
/**
+ * Add a new connector.
+ * @param connConfig connector configuration
+ * @param ctx context for the connector
+ */
+ public void addConnector(ConnectorConfig connConfig, ConnectorContext ctx) {
+ String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+ Class<?> maybeConnClass = connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+ log.info("Creating connector {} of type {}", connName, maybeConnClass.getName());
+
+ Class<? extends Connector> connClass;
+ try {
+ connClass = maybeConnClass.asSubclass(Connector.class);
+ } catch (ClassCastException e) {
+ throw new CopycatException("Specified class is not a subclass of Connector: " + maybeConnClass.getName());
+ }
+
+ if (connectors.containsKey(connName))
+ throw new CopycatException("Connector with name " + connName + " already exists");
+
+ final Connector connector = instantiateConnector(connClass);
+ connector.initialize(ctx);
+ try {
+ Map<String, Object> originals = connConfig.originals();
+ Properties props = new Properties();
+ props.putAll(originals);
+ connector.start(props);
+ } catch (CopycatException e) {
+ throw new CopycatException("Connector threw an exception while starting", e);
+ }
+
+ connectors.put(connName, connector);
+
+ log.info("Finished creating connector {}", connName);
+ }
+
+ private static Connector instantiateConnector(Class<? extends Connector> connClass) {
+ try {
+ return Utils.newInstance(connClass);
+ } catch (Throwable t) {
+ // Catches normal exceptions due to instantiation errors as well as any runtime errors that
+ // may be caused by user code
+ throw new CopycatException("Failed to create connector instance", t);
+ }
+ }
+
+ public Map<ConnectorTaskId, Map<String, String>> reconfigureConnectorTasks(String connName, int maxTasks, List<String> sinkTopics) {
+ log.trace("Reconfiguring connector tasks for {}", connName);
+
+ Connector connector = connectors.get(connName);
+ if (connector == null)
+ throw new CopycatException("Connector " + connName + " not found in this worker.");
+
+ Map<ConnectorTaskId, Map<String, String>> result = new HashMap<>();
+ String taskClassName = connector.taskClass().getName();
+ int index = 0;
+ for (Properties taskProps : connector.taskConfigs(maxTasks)) {
+ ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
+ index++;
+ Map<String, String> taskConfig = Utils.propsToStringMap(taskProps);
+ taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName);
+ if (sinkTopics != null)
+ taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, ","));
+ result.put(taskId, taskConfig);
+ }
+ return result;
+ }
+
+ public void stopConnector(String connName) {
+ log.info("Stopping connector {}", connName);
+
+ Connector connector = connectors.get(connName);
+ if (connector == null)
+ throw new CopycatException("Connector " + connName + " not found in this worker.");
+
+ try {
+ connector.stop();
+ } catch (CopycatException e) {
+ log.error("Error shutting down connector {}: ", connector, e);
+ }
+
+ connectors.remove(connName);
+
+ log.info("Stopped connector {}", connName);
+ }
+
+ /**
+ * Get the IDs of the connectors currently running in this worker.
+ */
+ public Set<String> connectorNames() {
+ return connectors.keySet();
+ }
+
+ /**
* Add a new task.
* @param id Globally unique ID for this task.
- * @param taskClassName name of the {@link org.apache.kafka.copycat.connector.Task}
- * class to instantiate. Must be a subclass of either
- * {@link org.apache.kafka.copycat.source.SourceTask} or
- * {@link org.apache.kafka.copycat.sink.SinkTask}.
- * @param props configuration options for the task
+ * @param taskConfig the parsed task configuration
*/
- public void addTask(ConnectorTaskId id, String taskClassName, Properties props) {
+ public void addTask(ConnectorTaskId id, TaskConfig taskConfig) {
+ log.info("Creating task {}", id);
+
if (tasks.containsKey(id)) {
String msg = "Task already exists in this worker; the herder should not have requested "
+ "that this : " + id;
@@ -150,7 +258,7 @@ public class Worker {
throw new CopycatException(msg);
}
- final Task task = instantiateTask(taskClassName);
+ final Task task = instantiateTask(taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class));
// Decide which type of worker task we need based on the type of task.
final WorkerTask workerTask;
@@ -171,20 +279,30 @@ public class Worker {
// Start the task before adding modifying any state, any exceptions are caught higher up the
// call chain and there's no cleanup to do here
+ Properties props = new Properties();
+ props.putAll(taskConfig.originals());
workerTask.start(props);
+ if (task instanceof SourceTask) {
+ WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask;
+ sourceTaskOffsetCommitter.schedule(id, workerSourceTask);
+ }
tasks.put(id, workerTask);
}
- private static Task instantiateTask(String taskClassName) {
+ private static Task instantiateTask(Class<? extends Task> taskClass) {
try {
- return Utils.newInstance(Class.forName(taskClassName).asSubclass(Task.class));
- } catch (ClassNotFoundException e) {
+ return Utils.newInstance(taskClass);
+ } catch (KafkaException e) {
throw new CopycatException("Task class not found", e);
}
}
public void stopTask(ConnectorTaskId id) {
+ log.info("Stopping task {}", id);
+
WorkerTask task = getTask(id);
+ if (task instanceof WorkerSourceTask)
+ sourceTaskOffsetCommitter.remove(id);
task.stop();
if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG)))
log.error("Graceful stop of task {} failed.", task);
@@ -192,6 +310,13 @@ public class Worker {
tasks.remove(id);
}
+ /**
+ * Get the IDs of the tasks currently running in this worker.
+ */
+ public Set<ConnectorTaskId> taskIds() {
+ return tasks.keySet();
+ }
+
private WorkerTask getTask(ConnectorTaskId id) {
WorkerTask task = tasks.get(id);
if (task == null) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java
index 719dd09..a46141e 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java
@@ -19,10 +19,8 @@ package org.apache.kafka.copycat.runtime.distributed;
import org.apache.kafka.copycat.util.ConnectorTaskId;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
-import java.util.List;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -30,6 +28,10 @@ import java.util.Set;
* An immutable snapshot of the configuration state of connectors and tasks in a Copycat cluster.
*/
public class ClusterConfigState {
+ public static final ClusterConfigState EMPTY = new ClusterConfigState(-1, Collections.<String, Integer>emptyMap(),
+ Collections.<String, Map<String, String>>emptyMap(), Collections.<ConnectorTaskId, Map<String, String>>emptyMap(),
+ Collections.<String>emptySet());
+
private final long offset;
private final Map<String, Integer> connectorTaskCounts;
private final Map<String, Map<String, String>> connectorConfigs;
@@ -60,8 +62,8 @@ public class ClusterConfigState {
/**
* Get a list of the connectors in this configuration
*/
- public Collection<String> connectors() {
- return connectorTaskCounts.keySet();
+ public Set<String> connectors() {
+ return connectorConfigs.keySet();
}
/**
@@ -83,19 +85,29 @@ public class ClusterConfigState {
}
/**
+ * Get the number of tasks assigned for the given connector.
+ * @param connectorName name of the connector to look up tasks for
+ * @return the number of tasks
+ */
+ public int taskCount(String connectorName) {
+ Integer count = connectorTaskCounts.get(connectorName);
+ return count == null ? 0 : count;
+ }
+
+ /**
* Get the current set of task IDs for the specified connector.
* @param connectorName the name of the connector to look up task configs for
* @return the current set of connector task IDs
*/
- public Collection<ConnectorTaskId> tasks(String connectorName) {
+ public Set<ConnectorTaskId> tasks(String connectorName) {
if (inconsistentConnectors.contains(connectorName))
- return Collections.EMPTY_LIST;
+ return Collections.emptySet();
Integer numTasks = connectorTaskCounts.get(connectorName);
if (numTasks == null)
- throw new IllegalArgumentException("Connector does not exist in current configuration.");
+ return Collections.emptySet();
- List<ConnectorTaskId> taskIds = new ArrayList<>();
+ Set<ConnectorTaskId> taskIds = new HashSet<>();
for (int taskIndex = 0; taskIndex < numTasks; taskIndex++) {
ConnectorTaskId taskId = new ConnectorTaskId(connectorName, taskIndex);
taskIds.add(taskId);
@@ -119,4 +131,14 @@ public class ClusterConfigState {
return inconsistentConnectors;
}
+ @Override
+ public String toString() {
+ return "ClusterConfigState{" +
+ "offset=" + offset +
+ ", connectorTaskCounts=" + connectorTaskCounts +
+ ", connectorConfigs=" + connectorConfigs +
+ ", taskConfigs=" + taskConfigs +
+ ", inconsistentConnectors=" + inconsistentConnectors +
+ '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/CopycatProtocol.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/CopycatProtocol.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/CopycatProtocol.java
new file mode 100644
index 0000000..a450b1d
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/CopycatProtocol.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class implements the protocol for Copycat workers in a group. It includes the format of worker state used when
+ * joining the group and distributing assignments, and the format of assignments of connectors and tasks to workers.
+ */
+public class CopycatProtocol {
+ public static final String VERSION_KEY_NAME = "version";
+ public static final String CONFIG_OFFSET_KEY_NAME = "config-offset";
+ public static final String CONNECTOR_KEY_NAME = "connector";
+ public static final String LEADER_KEY_NAME = "leader";
+ public static final String ERROR_KEY_NAME = "error";
+ public static final String TASKS_KEY_NAME = "tasks";
+ public static final String ASSIGNMENT_KEY_NAME = "assignment";
+ public static final int CONNECTOR_TASK = -1;
+
+ public static final short COPYCAT_PROTOCOL_V0 = 0;
+ public static final Schema COPYCAT_PROTOCOL_HEADER_SCHEMA = new Schema(
+ new Field(VERSION_KEY_NAME, Type.INT16));
+ private static final Struct COPYCAT_PROTOCOL_HEADER_V0 = new Struct(COPYCAT_PROTOCOL_HEADER_SCHEMA)
+ .set(VERSION_KEY_NAME, COPYCAT_PROTOCOL_V0);
+
+ public static final Schema CONFIG_STATE_V0 = new Schema(
+ new Field(CONFIG_OFFSET_KEY_NAME, Type.INT64));
+ // Assignments for each worker are a set of connectors and tasks. These are categorized by connector ID. A sentinel
+ // task ID (CONNECTOR_TASK) is used to indicate the connector itself (i.e. that the assignment includes
+ // responsibility for running the Connector instance in addition to any tasks it generates).
+ public static final Schema CONNECTOR_ASSIGNMENT_V0 = new Schema(
+ new Field(CONNECTOR_KEY_NAME, Type.STRING),
+ new Field(TASKS_KEY_NAME, new ArrayOf(Type.INT32)));
+ public static final Schema ASSIGNMENT_V0 = new Schema(
+ new Field(ERROR_KEY_NAME, Type.INT16),
+ new Field(LEADER_KEY_NAME, Type.STRING),
+ new Field(CONFIG_OFFSET_KEY_NAME, Type.INT64),
+ new Field(ASSIGNMENT_KEY_NAME, new ArrayOf(CONNECTOR_ASSIGNMENT_V0)));
+
+ public static ByteBuffer serializeMetadata(ConfigState configState) {
+ Struct struct = new Struct(CONFIG_STATE_V0);
+ struct.set(CONFIG_OFFSET_KEY_NAME, configState.offset());
+ ByteBuffer buffer = ByteBuffer.allocate(COPYCAT_PROTOCOL_HEADER_V0.sizeOf() + CONFIG_STATE_V0.sizeOf(struct));
+ COPYCAT_PROTOCOL_HEADER_V0.writeTo(buffer);
+ CONFIG_STATE_V0.write(buffer, struct);
+ buffer.flip();
+ return buffer;
+ }
+
+ public static ConfigState deserializeMetadata(ByteBuffer buffer) {
+ Struct header = (Struct) COPYCAT_PROTOCOL_HEADER_SCHEMA.read(buffer);
+ Short version = header.getShort(VERSION_KEY_NAME);
+ checkVersionCompatibility(version);
+ Struct struct = (Struct) CONFIG_STATE_V0.read(buffer);
+ long configOffset = struct.getLong(CONFIG_OFFSET_KEY_NAME);
+ return new ConfigState(configOffset);
+ }
+
+ public static ByteBuffer serializeAssignment(Assignment assignment) {
+ Struct struct = new Struct(ASSIGNMENT_V0);
+ struct.set(ERROR_KEY_NAME, assignment.error());
+ struct.set(LEADER_KEY_NAME, assignment.leader());
+ struct.set(CONFIG_OFFSET_KEY_NAME, assignment.offset());
+ List<Struct> taskAssignments = new ArrayList<>();
+ for (Map.Entry<String, List<Integer>> connectorEntry : assignment.asMap().entrySet()) {
+ Struct taskAssignment = new Struct(CONNECTOR_ASSIGNMENT_V0);
+ taskAssignment.set(CONNECTOR_KEY_NAME, connectorEntry.getKey());
+ List<Integer> tasks = connectorEntry.getValue();
+ taskAssignment.set(TASKS_KEY_NAME, tasks.toArray());
+ taskAssignments.add(taskAssignment);
+ }
+ struct.set(ASSIGNMENT_KEY_NAME, taskAssignments.toArray());
+
+ ByteBuffer buffer = ByteBuffer.allocate(COPYCAT_PROTOCOL_HEADER_V0.sizeOf() + ASSIGNMENT_V0.sizeOf(struct));
+ COPYCAT_PROTOCOL_HEADER_V0.writeTo(buffer);
+ ASSIGNMENT_V0.write(buffer, struct);
+ buffer.flip();
+ return buffer;
+ }
+
+ public static Assignment deserializeAssignment(ByteBuffer buffer) {
+ Struct header = (Struct) COPYCAT_PROTOCOL_HEADER_SCHEMA.read(buffer);
+ Short version = header.getShort(VERSION_KEY_NAME);
+ checkVersionCompatibility(version);
+ Struct struct = (Struct) ASSIGNMENT_V0.read(buffer);
+ short error = struct.getShort(ERROR_KEY_NAME);
+ String leader = struct.getString(LEADER_KEY_NAME);
+ long offset = struct.getLong(CONFIG_OFFSET_KEY_NAME);
+ List<String> connectorIds = new ArrayList<>();
+ List<ConnectorTaskId> taskIds = new ArrayList<>();
+ for (Object structObj : struct.getArray(ASSIGNMENT_KEY_NAME)) {
+ Struct assignment = (Struct) structObj;
+ String connector = assignment.getString(CONNECTOR_KEY_NAME);
+ for (Object taskIdObj : assignment.getArray(TASKS_KEY_NAME)) {
+ Integer taskId = (Integer) taskIdObj;
+ if (taskId == CONNECTOR_TASK)
+ connectorIds.add(connector);
+ else
+ taskIds.add(new ConnectorTaskId(connector, taskId));
+ }
+ }
+ return new Assignment(error, leader, offset, connectorIds, taskIds);
+ }
+
+ public static class ConfigState {
+ private final long offset;
+
+ public ConfigState(long offset) {
+ this.offset = offset;
+ }
+
+ public long offset() {
+ return offset;
+ }
+
+ @Override
+ public String toString() {
+ return "ConfigState{" +
+ "offset=" + offset +
+ '}';
+ }
+ }
+
+ public static class Assignment {
+ public static final short NO_ERROR = 0;
+ // Configuration offsets mismatched in a way that the leader could not resolve. Workers should read to the end
+ // of the config log and try to re-join
+ public static final short CONFIG_MISMATCH = 1;
+
+ private final short error;
+ private final String leader;
+ private final long offset;
+ private final List<String> connectorIds;
+ private final List<ConnectorTaskId> taskIds;
+
+ /**
+ * Create an assignment indicating responsibility for the given connector instances and task Ids.
+ * @param connectorIds list of connectors that the worker should instantiate and run
+ * @param taskIds list of task IDs that the worker should instantiate and run
+ */
+ public Assignment(short error, String leader, long configOffset,
+ List<String> connectorIds, List<ConnectorTaskId> taskIds) {
+ this.error = error;
+ this.leader = leader;
+ this.offset = configOffset;
+ this.taskIds = taskIds;
+ this.connectorIds = connectorIds;
+ }
+
+ public short error() {
+ return error;
+ }
+
+ public String leader() {
+ return leader;
+ }
+
+ public boolean failed() {
+ return error != NO_ERROR;
+ }
+
+ public long offset() {
+ return offset;
+ }
+
+ public List<String> connectors() {
+ return connectorIds;
+ }
+
+ public List<ConnectorTaskId> tasks() {
+ return taskIds;
+ }
+
+ @Override
+ public String toString() {
+ return "Assignment{" +
+ "error=" + error +
+ ", leader='" + leader + '\'' +
+ ", offset=" + offset +
+ ", connectorIds=" + connectorIds +
+ ", taskIds=" + taskIds +
+ '}';
+ }
+
+ private Map<String, List<Integer>> asMap() {
+ // Using LinkedHashMap preserves the ordering, which is helpful for tests and debugging
+ Map<String, List<Integer>> taskMap = new LinkedHashMap<>();
+ for (String connectorId : new HashSet<>(connectorIds)) {
+ List<Integer> connectorTasks = taskMap.get(connectorId);
+ if (connectorTasks == null) {
+ connectorTasks = new ArrayList<>();
+ taskMap.put(connectorId, connectorTasks);
+ }
+ connectorTasks.add(CONNECTOR_TASK);
+ }
+ for (ConnectorTaskId taskId : taskIds) {
+ String connectorId = taskId.connector();
+ List<Integer> connectorTasks = taskMap.get(connectorId);
+ if (connectorTasks == null) {
+ connectorTasks = new ArrayList<>();
+ taskMap.put(connectorId, connectorTasks);
+ }
+ connectorTasks.add(taskId.task());
+ }
+ return taskMap;
+ }
+ }
+
+ private static void checkVersionCompatibility(short version) {
+ // check for invalid versions
+ if (version < COPYCAT_PROTOCOL_V0)
+ throw new SchemaException("Unsupported subscription version: " + version);
+
+ // otherwise, assume versions can be parsed as V0
+ }
+
+}
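To make the wire format above concrete, here is a round-trip sketch (worker and connector names are illustrative): the leader serializes an Assignment containing the connector itself (encoded as the CONNECTOR_TASK sentinel) plus two task IDs, and any member can decode it back.

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import org.apache.kafka.copycat.runtime.distributed.CopycatProtocol;
    import org.apache.kafka.copycat.runtime.distributed.CopycatProtocol.Assignment;
    import org.apache.kafka.copycat.util.ConnectorTaskId;

    public class CopycatProtocolSketch {
        public static void main(String[] args) {
            // This worker runs the "file-source" Connector instance and tasks 0 and 1
            Assignment original = new Assignment(
                    Assignment.NO_ERROR, "worker-1", 5L,
                    Arrays.asList("file-source"),
                    Arrays.asList(new ConnectorTaskId("file-source", 0),
                                  new ConnectorTaskId("file-source", 1)));

            ByteBuffer serialized = CopycatProtocol.serializeAssignment(original);
            Assignment roundTrip = CopycatProtocol.deserializeAssignment(serialized);
            System.out.println(roundTrip); // same leader, offset, connectors and tasks
        }
    }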
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
index 5273658..bf3229d 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
@@ -17,304 +17,627 @@
package org.apache.kafka.copycat.runtime.distributed;
+import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.copycat.connector.Connector;
+import org.apache.kafka.copycat.connector.ConnectorContext;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.runtime.ConnectorConfig;
import org.apache.kafka.copycat.runtime.Herder;
import org.apache.kafka.copycat.runtime.HerderConnectorContext;
+import org.apache.kafka.copycat.runtime.TaskConfig;
import org.apache.kafka.copycat.runtime.Worker;
import org.apache.kafka.copycat.sink.SinkConnector;
-import org.apache.kafka.copycat.sink.SinkTask;
import org.apache.kafka.copycat.storage.KafkaConfigStorage;
import org.apache.kafka.copycat.util.Callback;
import org.apache.kafka.copycat.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
- * Distributed "herder" that coordinates with other workers to spread work across multiple processes.
+ * <p>
+ * Distributed "herder" that coordinates with other workers to spread work across multiple processes.
+ * </p>
+ * <p>
+ * Under the hood, this is implemented as a group managed by Kafka's group membership facilities (i.e. the generalized
+ * group/consumer coordinator). Each instance of DistributedHerder joins the group and indicates what its current
+ * configuration state is (where it is in the configuration log). The group coordinator selects one member to take
+ * this information and assign each instance a subset of the active connectors & tasks to execute. This assignment
+ * is currently performed in a simple round-robin fashion, but this is not guaranteed -- the herder may also choose
+ * to, e.g., use a sticky assignment to avoid the usual start/stop costs associated with connectors and tasks. Once
+ * an assignment is received, the DistributedHerder simply runs its assigned connectors and tasks in a Worker.
+ * </p>
+ * <p>
+ * In addition to distributing work, the DistributedHerder uses the leader determined during the work assignment
+ * to select a leader for this generation of the group who is responsible for other tasks that can only be performed
+ * by a single node at a time. Most importantly, this includes writing updated configurations for connectors and tasks,
+ * (and therefore, also for creating, destroy, and scaling up/down connectors).
+ * </p>
*/
-public class DistributedHerder implements Herder {
+public class DistributedHerder implements Herder, Runnable {
private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class);
- private Worker worker;
- private KafkaConfigStorage configStorage;
+ private final Worker worker;
+ private final KafkaConfigStorage configStorage;
private ClusterConfigState configState;
- private HashMap<String, ConnectorState> connectors = new HashMap<>();
- public DistributedHerder(Worker worker) {
- this.worker = worker;
- this.configStorage = new KafkaConfigStorage(worker.getInternalValueConverter(),
- new ConnectorConfigCallback(), new TaskConfigCallback());
+ private final int workerSyncTimeoutMs;
+ private final int workerUnsyncBackoffMs;
+
+ private final WorkerGroupMember member;
+ private final AtomicBoolean stopping;
+ private final CountDownLatch stopLatch = new CountDownLatch(1);
+
+ // Track enough information about the current membership state to be able to determine which requests via the API
+ // and from other nodes are safe to process
+ private boolean rebalanceResolved;
+ private CopycatProtocol.Assignment assignment;
+
+ // To handle most external requests, like creating or destroying a connector, we can use a generic request where
+ // the caller specifies all the code that should be executed.
+ private final Queue<HerderRequest> requests = new LinkedBlockingDeque<>();
+ // Config updates can be collected and applied together when possible. Also, we need to take care to rebalance when
+ // needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits).
+ private final Set<String> connectorConfigUpdates = new HashSet<>();
+ private boolean needsReconfigRebalance;
+
+ public DistributedHerder(Worker worker, Map<String, ?> configs) {
+ this(worker, configs, null, null);
}
- // Public for testing (mock KafkaConfigStorage)
- public DistributedHerder(Worker worker, KafkaConfigStorage configStorage) {
+ // public for testing
+ public DistributedHerder(Worker worker, Map<String, ?> configs, KafkaConfigStorage configStorage, WorkerGroupMember member) {
this.worker = worker;
- this.configStorage = configStorage;
+ if (configStorage != null) {
+ // For testing. Assume configuration has already been performed
+ this.configStorage = configStorage;
+ } else {
+ this.configStorage = new KafkaConfigStorage(worker.getInternalValueConverter(), connectorConfigCallback(), taskConfigCallback());
+ this.configStorage.configure(configs);
+ }
+ configState = ClusterConfigState.EMPTY;
+
+ DistributedHerderConfig config = new DistributedHerderConfig(configs);
+ this.workerSyncTimeoutMs = config.getInt(DistributedHerderConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
+ this.workerUnsyncBackoffMs = config.getInt(DistributedHerderConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG);
+
+ this.member = member != null ? member : new WorkerGroupMember(config, this.configStorage, rebalanceListener());
+ stopping = new AtomicBoolean(false);
+
+ rebalanceResolved = true; // No rebalance has occurred yet, so there is no follow-up work (e.g. starting tasks) pending
+ needsReconfigRebalance = false;
}
- public synchronized void configure(Map<String, ?> configs) {
- configStorage.configure(configs);
+ @Override
+ public void start() {
+ Thread thread = new Thread(this);
+ thread.start();
}
- public synchronized void start() {
- log.info("Herder starting");
+ public void run() {
+ try {
+ log.info("Herder starting");
- configStorage.start();
+ configStorage.start();
- log.info("Restoring connectors from stored configs");
- restoreConnectors();
+ log.info("Herder started");
- log.info("Herder started");
+ while (!stopping.get()) {
+ tick();
+ }
+
+ halt();
+
+ log.info("Herder stopped");
+ } finally {
+ stopLatch.countDown();
+ }
}
- public synchronized void stop() {
- log.info("Herder stopping");
+ // public for testing
+ public void tick() {
+ // The main loop does two primary things: 1) drive the group membership protocol, responding to rebalance events
+ // as they occur, and 2) handle external requests targeted at the leader. All the "real" work of the herder is
+ // performed in this thread, which keeps synchronization straightforward at the cost of some operations possibly
+ // blocking this thread (especially those in callbacks due to rebalance events).
- // There's no coordination/hand-off to do here since this is all standalone. Instead, we
- // should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all
- // the tasks.
- for (Map.Entry<String, ConnectorState> entry : connectors.entrySet()) {
- ConnectorState state = entry.getValue();
- stopConnector(state);
+ try {
+ member.ensureActive();
+ // Ensure we're in a good state in our group. If not, return and retry; everything should be set up to rejoin
+ if (!handleRebalanceCompleted()) return;
+ } catch (ConsumerWakeupException e) {
+ // May be due to a request from another thread, or might be stopping. If the latter, we need to check the
+ // flag immediately. If the former, we need to re-run the ensureActive call since we can't handle requests
+ // unless we're in the group.
+ return;
}
- connectors.clear();
- if (configStorage != null) {
- configStorage.stop();
- configStorage = null;
+ // Process any external requests
+ while (!requests.isEmpty()) {
+ HerderRequest request = requests.poll();
+ try {
+ request.callback().onCompletion(null, request.action().call());
+ } catch (Throwable t) {
+ request.callback().onCompletion(t, null);
+ }
}
- log.info("Herder stopped");
- }
+ // Process any configuration updates
+ synchronized (this) {
+ if (needsReconfigRebalance || !connectorConfigUpdates.isEmpty()) {
+ // Connector reconfigs only need local updates since there is no coordination between workers required.
+ // However, if connectors were added or removed, work needs to be rebalanced since we have more work
+ // items to distribute among workers.
+ ClusterConfigState newConfigState = configStorage.snapshot();
+ if (!newConfigState.connectors().equals(configState.connectors()))
+ needsReconfigRebalance = true;
+ configState = newConfigState;
+ if (needsReconfigRebalance) {
+ // Task reconfigs require a rebalance. Request the rebalance, clean out state, and then restart
+ // this loop, which will then ensure the rebalance occurs without any other requests being
+ // processed until it completes.
+ member.requestRejoin();
+ // Any connector config updates will be addressed during the rebalance too
+ connectorConfigUpdates.clear();
+ needsReconfigRebalance = false;
+ return;
+ } else if (!connectorConfigUpdates.isEmpty()) {
+ // If we only have connector config updates, we can just bounce the updated connectors that are
+ // currently assigned to this worker.
+ Set<String> localConnectors = worker.connectorNames();
+ for (String connectorName : connectorConfigUpdates) {
+ if (!localConnectors.contains(connectorName))
+ continue;
+ worker.stopConnector(connectorName);
+ // The update may be a deletion, so verify we actually need to restart the connector
+ if (configState.connectors().contains(connectorName))
+ startConnector(connectorName);
+ }
+ connectorConfigUpdates.clear();
+ }
+ }
+ }
- @Override
- public synchronized void addConnector(Map<String, String> connectorProps,
- Callback<String> callback) {
+ // Let the group take any actions it needs to
try {
- // Ensure the config is written to storage first
- ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
- String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
- configStorage.putConnectorConfig(connName, connectorProps);
+ member.poll(Long.MAX_VALUE);
+ // Ensure we're in a good state in our group. If not, return and retry; everything should be set up to rejoin
+ if (!handleRebalanceCompleted()) return;
+ } catch (ConsumerWakeupException e) { // FIXME should not be ConsumerWakeupException
+ // Ignore. Just indicates we need to check the exit flag, for requested actions, etc.
+ }
+ }
- ConnectorState connState = createConnector(connConfig);
- if (callback != null)
- callback.onCompletion(null, connState.name);
- // This should always be a new job, create jobs from scratch
- createConnectorTasks(connState);
- } catch (CopycatException e) {
- if (callback != null)
- callback.onCompletion(e, null);
+ // public for testing
+ public void halt() {
+ synchronized (this) {
+ // Clean up any connectors and tasks that are still running.
+ log.info("Stopping connectors and tasks that are still assigned to this worker.");
+ for (String connName : new HashSet<>(worker.connectorNames())) {
+ try {
+ worker.stopConnector(connName);
+ } catch (Throwable t) {
+ log.error("Failed to shut down connector " + connName, t);
+ }
+ }
+ for (ConnectorTaskId taskId : new HashSet<>(worker.taskIds())) {
+ try {
+ worker.stopTask(taskId);
+ } catch (Throwable t) {
+ log.error("Failed to shut down task " + taskId, t);
+ }
+ }
+
+ member.stop();
+
+ // Explicitly fail any outstanding requests so they actually get a response and get an understandable reason
+ // for their failure
+ while (!requests.isEmpty()) {
+ HerderRequest request = requests.poll();
+ request.callback().onCompletion(new CopycatException("Worker is shutting down"), null);
+ }
+
+ if (configStorage != null)
+ configStorage.stop();
}
}
@Override
- public synchronized void deleteConnector(String name, Callback<Void> callback) {
- try {
- destroyConnector(name);
- if (callback != null)
- callback.onCompletion(null, null);
- } catch (CopycatException e) {
- if (callback != null)
- callback.onCompletion(e, null);
+ public void stop() {
+ log.info("Herder stopping");
+
+ stopping.set(true);
+ member.wakeup();
+ while (stopLatch.getCount() > 0) {
+ try {
+ stopLatch.await();
+ } catch (InterruptedException e) {
+ // ignore, should not happen
+ }
}
}
@Override
- public synchronized void requestTaskReconfiguration(String connName) {
- ConnectorState state = connectors.get(connName);
- if (state == null) {
- log.error("Task that requested reconfiguration does not exist: {}", connName);
+ public synchronized void addConnector(final Map<String, String> connectorProps,
+ final Callback<String> callback) {
+ final ConnectorConfig connConfig;
+ final String connName;
+ try {
+ connConfig = new ConnectorConfig(connectorProps);
+ connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+ } catch (Throwable t) {
+ if (callback != null)
+ callback.onCompletion(t, null);
return;
}
- updateConnectorTasks(state);
+
+ log.debug("Submitting connector config {}", connName);
+
+ requests.add(new HerderRequest(
+ new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ if (!isLeader())
+ throw new NotLeaderException("Only the leader can add connectors.");
+
+ log.debug("Submitting connector config {}", connName);
+ configStorage.putConnectorConfig(connName, connectorProps);
+
+ return null;
+ }
+ },
+ new Callback<Void>() {
+ @Override
+ public void onCompletion(Throwable error, Void result) {
+ if (callback == null) return;
+
+ if (error != null)
+ callback.onCompletion(error, null);
+ else
+ callback.onCompletion(null, connName);
+ }
+ }));
+ member.wakeup();
}
- // Creates and configures the connector. Does not setup any tasks
- private ConnectorState createConnector(ConnectorConfig connConfig) {
- String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
- String className = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
- log.info("Creating connector {} of type {}", connName, className);
- int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);
- List<String> topics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG); // Sinks only
- Properties configs = connConfig.unusedProperties();
-
- if (connectors.containsKey(connName)) {
- log.error("Ignoring request to create connector due to conflicting connector name");
- throw new CopycatException("Connector with name " + connName + " already exists");
- }
+ @Override
+ public synchronized void deleteConnector(final String connName, final Callback<Void> callback) {
+ log.debug("Submitting connector config deletion {}", connName);
+
+ requests.add(new HerderRequest(
+ new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ if (!isLeader())
+ throw new NotLeaderException("Only the leader can delete connectors.");
+
+ log.debug("Submitting null connector config {}", connName);
+ configStorage.putConnectorConfig(connName, null);
+ return null;
+ }
+ },
+ new Callback<Void>() {
+ @Override
+ public void onCompletion(Throwable error, Void result) {
+ if (callback != null)
+ callback.onCompletion(error, null);
+ }
+ }
+ ));
+ member.wakeup();
+ }
- final Connector connector;
- try {
- connector = instantiateConnector(className);
- } catch (Throwable t) {
- // Catches normal exceptions due to instantiation errors as well as any runtime errors that
- // may be caused by user code
- throw new CopycatException("Failed to create connector instance", t);
- }
- connector.initialize(new HerderConnectorContext(this, connName));
- try {
- connector.start(configs);
- } catch (CopycatException e) {
- throw new CopycatException("Connector threw an exception while starting", e);
- }
- ConnectorState state = new ConnectorState(connName, connector, maxTasks, topics);
- connectors.put(connName, state);
+ @Override
+ public synchronized void requestTaskReconfiguration(final String connName) {
+ requests.add(new HerderRequest(
+ new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ reconfigureConnector(connName);
+ return null;
+ }
+ }
+ ));
+ member.wakeup();
+ }
- log.info("Finished creating connector {}", connName);
- return state;
+ private boolean isLeader() {
+ return assignment != null && member.memberId().equals(assignment.leader());
}
- private static Connector instantiateConnector(String className) {
- try {
- return Utils.newInstance(className, Connector.class);
- } catch (ClassNotFoundException e) {
- throw new CopycatException("Couldn't instantiate connector class", e);
+ /**
+ * Handle post-assignment operations: either try to resolve issues that kept the assignment from completing, or
+ * get this node in sync and start its assigned work.
+ *
+ * @return false if we couldn't finish
+ */
+ private boolean handleRebalanceCompleted() {
+ if (this.rebalanceResolved)
+ return true;
+
+ rebalanceResolved = true;
+
+ // We need to handle a variety of cases after a rebalance:
+ // 1. Assignment failed
+ // 1a. We are the leader for the round. We will be leader again if we rejoin now, so we need to catch up before
+ // even attempting to. If we can't, we should drop out of the group because we will block everyone from making
+ // progress. We can back off and try rejoining later.
+ // 1b. We are not the leader. We might need to catch up. If we're already caught up we can rejoin immediately,
+ // otherwise, we just want to wait indefinitely to catch up and rejoin whenever we're finally ready.
+ // 2. Assignment succeeded.
+ // 2a. We are caught up on configs. Awesome! We can proceed to run our assigned work.
+ // 2b. We need to try to catch up. We can do this potentially indefinitely because if it takes too long, we'll
+ // be kicked out of the group anyway due to lack of heartbeats.
+
+ boolean needsReadToEnd = false;
+ long syncConfigsTimeoutMs = Long.MAX_VALUE;
+ boolean needsRejoin = false;
+ if (assignment.failed()) {
+ needsRejoin = true;
+ if (isLeader()) {
+ log.warn("Join group completed, but assignment failed and we are the leader. Reading to end of config and retrying.");
+ needsReadToEnd = true;
+ syncConfigsTimeoutMs = workerSyncTimeoutMs;
+ } else if (configState.offset() < assignment.offset()) {
+ log.warn("Join group completed, but assignment failed and we lagging. Reading to end of config and retrying.");
+ needsReadToEnd = true;
+ } else {
+ log.warn("Join group completed, but assignment failed. We were up to date, so just retrying.");
+ }
+ } else {
+ if (configState.offset() < assignment.offset()) {
+ log.warn("Catching up to assignment's config offset.");
+ needsReadToEnd = true;
+ }
}
- }
- private void destroyConnector(String connName) {
- log.info("Destroying connector {}", connName);
- ConnectorState state = connectors.get(connName);
- if (state == null) {
- log.error("Failed to destroy connector {} because it does not exist", connName);
- throw new CopycatException("Connector does not exist");
+ if (needsReadToEnd) {
+ // If we timed out, force exiting this method to avoid creating any connectors/tasks and require an
+ // immediate rejoin. This should only happen if we were the leader and didn't finish quickly enough, in
+ // which case we've waited a long time and should have already left the group, OR the timeout was very
+ // long, in which case not having finished also indicates we've waited longer than the session timeout.
+ if (!readConfigToEnd(syncConfigsTimeoutMs))
+ needsRejoin = true;
}
- stopConnector(state);
- configStorage.putConnectorConfig(state.name, null);
- connectors.remove(state.name);
+ if (needsRejoin) {
+ member.requestRejoin();
+ return false;
+ }
- log.info("Finished destroying connector {}", connName);
+ // Should still validate that they match since we may have gone *past* the required offset, in which case we
+ // should *not* start any tasks and should instead rejoin
+ if (configState.offset() != assignment.offset()) {
+ log.info("Current config state offset {} does not match group assignment {}. Forcing rebalance.", configState.offset(), assignment.offset());
+ member.requestRejoin();
+ return false;
+ }
+
+ startWork();
+
+ return true;
}
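A hypothetical sketch of how the main loop might gate on this method's return value, so no requests are processed and no work is started until the rebalance is fully resolved (tick() and the placeholder comments are illustrative, not the herder's actual API):

    // Hypothetical main-loop step: skip everything until the rebalance resolves.
    private void tick() {
        if (!handleRebalanceCompleted())
            return; // a rejoin was requested; try again on the next iteration
        // ... process queued HerderRequests ...
        // ... block in member.poll() until woken or more work arrives ...
    }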
- // Stops a connectors tasks, then the connector
- private void stopConnector(ConnectorState state) {
- removeConnectorTasks(state);
+ /**
+ * Try to read to the end of the config log within the given timeout
+ * @param timeoutMs maximum time to wait to sync to the end of the log
+ * @return true if successful, false if timed out
+ */
+ private boolean readConfigToEnd(long timeoutMs) {
+ log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset());
try {
- state.connector.stop();
- } catch (CopycatException e) {
- log.error("Error shutting down connector {}: ", state.connector, e);
+ configStorage.readToEnd().get(timeoutMs, TimeUnit.MILLISECONDS);
+ configState = configStorage.snapshot();
+ log.info("Finished reading to end of log and updated config snapshot, new config log offset: {}", configState.offset());
+ return true;
+ } catch (TimeoutException e) {
+ log.warn("Didn't reach end of config log quickly enough", e);
+ // TODO: With explicit leave group support, it would be good to explicitly leave the group *before* this
+ // backoff since it'll be longer than the session timeout
+ if (isLeader())
+ backoff(workerUnsyncBackoffMs);
+ return false;
+ } catch (InterruptedException | ExecutionException e) {
+ throw new CopycatException("Error trying to catch up after assignment", e);
}
}
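The bounded catch-up above is the standard Future-with-timeout idiom; a standalone illustration of the same pattern, with an illustrative executor and task standing in for the config storage:

    import java.util.concurrent.*;

    public class ReadToEndSketch {
        public static void main(String[] args) throws Exception {
            ExecutorService pool = Executors.newSingleThreadExecutor();
            // Same idiom as readConfigToEnd: bound a potentially slow operation
            // with Future.get(timeout); TimeoutException means the deadline
            // passed, though the underlying work may still complete later.
            Future<Void> caughtUp = pool.submit(new Callable<Void>() {
                @Override
                public Void call() {
                    // ... read the config log to its end ...
                    return null;
                }
            });
            try {
                caughtUp.get(5000, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // deadline passed; back off and retry, or rejoin the group
            }
            pool.shutdown();
        }
    }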
- private void createConnectorTasks(ConnectorState state) {
- String taskClassName = state.connector.taskClass().getName();
-
- log.info("Creating tasks for connector {} of type {}", state.name, taskClassName);
-
- List<Properties> taskConfigs = state.connector.taskConfigs(state.maxTasks);
-
- // Generate the final configs, including framework provided settings
- Map<ConnectorTaskId, Properties> taskProps = new HashMap<>();
- for (int i = 0; i < taskConfigs.size(); i++) {
- ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
- Properties config = taskConfigs.get(i);
- // TODO: This probably shouldn't be in the Herder. It's nice to have Copycat ensure the list of topics
- // is automatically provided to tasks since it is required by the framework, but this
- String subscriptionTopics = Utils.join(state.inputTopics, ",");
- if (state.connector instanceof SinkConnector) {
- // Make sure we don't modify the original since the connector may reuse it internally
- Properties configForSink = new Properties();
- configForSink.putAll(config);
- configForSink.setProperty(SinkTask.TOPICS_CONFIG, subscriptionTopics);
- config = configForSink;
- }
- taskProps.put(taskId, config);
- }
+ private void backoff(long ms) {
+ Utils.sleep(ms);
+ }
- // And initiate the tasks
- for (int i = 0; i < taskConfigs.size(); i++) {
- ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
- Properties config = taskProps.get(taskId);
+ private void startWork() {
+ // Start assigned connectors and tasks
+ log.info("Starting connectors and tasks using config offset {}", assignment.offset());
+ for (String connectorName : assignment.connectors()) {
try {
- worker.addTask(taskId, taskClassName, config);
- // We only need to store the task IDs so we can clean up.
- state.tasks.add(taskId);
- } catch (Throwable e) {
- log.error("Failed to add task {}: ", taskId, e);
- // Swallow this so we can continue updating the rest of the tasks
- // FIXME what's the proper response? Kill all the tasks? Consider this the same as a task
- // that died after starting successfully.
+ startConnector(connectorName);
+ } catch (ConfigException e) {
+ log.error("Couldn't instantiate connector " + connectorName + " because it has an invalid connector " +
+ "configuration. This connector will not execute until reconfigured.", e);
}
}
- }
-
- private void removeConnectorTasks(ConnectorState state) {
- Iterator<ConnectorTaskId> taskIter = state.tasks.iterator();
- while (taskIter.hasNext()) {
- ConnectorTaskId taskId = taskIter.next();
+ for (ConnectorTaskId taskId : assignment.tasks()) {
try {
- worker.stopTask(taskId);
- taskIter.remove();
- } catch (CopycatException e) {
- log.error("Failed to stop task {}: ", taskId, e);
- // Swallow this so we can continue stopping the rest of the tasks
- // FIXME: Forcibly kill the task?
+ log.info("Starting task {}", taskId);
+ Map<String, String> configs = configState.taskConfig(taskId);
+ TaskConfig taskConfig = new TaskConfig(configs);
+ worker.addTask(taskId, taskConfig);
+ } catch (ConfigException e) {
+ log.error("Couldn't instantiate task " + taskId + " because it has an invalid task " +
+ "configuration. This task will not execute until reconfigured.", e);
}
}
}
- private void updateConnectorTasks(ConnectorState state) {
- removeConnectorTasks(state);
- createConnectorTasks(state);
+ // Helper for starting a connector with the given name, which will extract and parse the config, generate the
+ // connector context, and add it to the worker. This needs to be called from within the main worker thread for this herder.
+ private void startConnector(String connectorName) {
+ log.info("Starting connector {}", connectorName);
+ Map<String, String> configs = configState.connectorConfig(connectorName);
+ ConnectorConfig connConfig = new ConnectorConfig(configs);
+ String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+ ConnectorContext ctx = new HerderConnectorContext(DistributedHerder.this, connName);
+ worker.addConnector(connConfig, ctx);
+
+ // Immediately request configuration since this could be a brand new connector. However, also only update those
+ // task configs if they are actually different from the existing ones to avoid unnecessary updates when this is
+ // just restoring an existing connector.
+ reconfigureConnector(connName);
}
- private void restoreConnectors() {
- configState = configStorage.snapshot();
- Collection<String> connNames = configState.connectors();
- for (String connName : connNames) {
- log.info("Restoring connector {}", connName);
- Map<String, String> connProps = configState.connectorConfig(connName);
- ConnectorConfig connConfig = new ConnectorConfig(connProps);
- ConnectorState connState = createConnector(connConfig);
- // Because this coordinator is standalone, connectors are only restored when this process
- // starts and we know there can't be any existing tasks. So in this special case we're able
- // to just create the tasks rather than having to check for existing tasks and sort out
- // whether they need to be reconfigured.
- createConnectorTasks(connState);
+ // Updates configurations for a connector by requesting them from the connector, filling in parameters provided
+ // by the system, then checking whether any configs have actually changed before submitting the new configs to storage
+ private void reconfigureConnector(String connName) {
+ Map<String, String> configs = configState.connectorConfig(connName);
+ ConnectorConfig connConfig = new ConnectorConfig(configs);
+
+ List<String> sinkTopics = null;
+ if (SinkConnector.class.isAssignableFrom(connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG)))
+ sinkTopics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG);
+
+ Map<ConnectorTaskId, Map<String, String>> taskProps
+ = worker.reconfigureConnectorTasks(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), sinkTopics);
+ boolean changed = false;
+ int currentNumTasks = configState.taskCount(connName);
+ if (taskProps.size() != currentNumTasks) {
+ log.debug("Change in connector task count from {} to {}, writing updated task configurations", currentNumTasks, taskProps.size());
+ changed = true;
+ } else {
+ for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfig : taskProps.entrySet()) {
+ if (!taskConfig.getValue().equals(configState.taskConfig(taskConfig.getKey()))) {
+ log.debug("Change in task configurations, writing updated task configurations");
+ changed = true;
+ break;
+ }
+ }
+ }
+ if (changed) {
+ // FIXME: Configs should only be written by the leader to avoid conflicts due to zombies. However, until the
+ // REST API is available to forward this request, we need to do this on the worker that generates the config
+ configStorage.putTaskConfigs(taskProps);
}
}
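The change check relies on Map.equals(), which compares entry sets regardless of insertion order, so a config regenerated identically by the connector is treated as unchanged and produces no write. A small illustration with example config keys:

    import java.util.HashMap;
    import java.util.Map;

    public class ConfigDiffSketch {
        public static void main(String[] args) {
            // Map.equals() compares entries, not insertion order, so a regenerated
            // but identical task config counts as "unchanged" and skips the write.
            Map<String, String> oldCfg = new HashMap<>();
            oldCfg.put("file", "/tmp/input.txt");
            oldCfg.put("topic", "test-topic");
            Map<String, String> newCfg = new HashMap<>();
            newCfg.put("topic", "test-topic");
            newCfg.put("file", "/tmp/input.txt");
            boolean changed = !newCfg.equals(oldCfg);
            System.out.println("changed = " + changed); // prints changed = false
        }
    }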
+ private class HerderRequest {
+ private final Callable<Void> action;
+ private final Callback<Void> callback;
- private static class ConnectorState {
- public String name;
- public Connector connector;
- public int maxTasks;
- public List<String> inputTopics;
- Set<ConnectorTaskId> tasks;
+ public HerderRequest(Callable<Void> action, Callback<Void> callback) {
+ this.action = action;
+ this.callback = callback;
+ }
- public ConnectorState(String name, Connector connector, int maxTasks,
- List<String> inputTopics) {
- this.name = name;
- this.connector = connector;
- this.maxTasks = maxTasks;
- this.inputTopics = inputTopics;
- this.tasks = new HashSet<>();
+ public HerderRequest(Callable<Void> action) {
+ this.action = action;
+ this.callback = DEFAULT_CALLBACK;
}
- }
- private class ConnectorConfigCallback implements Callback<String> {
- @Override
- public void onCompletion(Throwable error, String result) {
- configState = configStorage.snapshot();
- // FIXME
+ public Callable<Void> action() {
+ return action;
+ }
+
+ public Callback<Void> callback() {
+ return callback;
}
}
- private class TaskConfigCallback implements Callback<List<ConnectorTaskId>> {
+ private static final Callback<Void> DEFAULT_CALLBACK = new Callback<Void>() {
@Override
- public void onCompletion(Throwable error, List<ConnectorTaskId> result) {
- configState = configStorage.snapshot();
- // FIXME
+ public void onCompletion(Throwable error, Void result) {
+ if (error != null)
+ log.error("HerderRequest's action threw an exception: ", error);
}
+ };
+
+
+ // Config callbacks are triggered from the KafkaConfigStorage thread
+ private Callback<String> connectorConfigCallback() {
+ return new Callback<String>() {
+ @Override
+ public void onCompletion(Throwable error, String connector) {
+ log.debug("Connector {} config updated", connector);
+ // Stage the update and wake up the work thread. Connector config *changes* only need the one connector
+ // to be bounced. However, this callback may also indicate a connector *addition*, which does require
+ // a rebalance, so we need to be careful about what operation we request.
+ synchronized (DistributedHerder.this) {
+ connectorConfigUpdates.add(connector);
+ }
+ member.wakeup();
+ }
+ };
}
+ private Callback<List<ConnectorTaskId>> taskConfigCallback() {
+ return new Callback<List<ConnectorTaskId>>() {
+ @Override
+ public void onCompletion(Throwable error, List<ConnectorTaskId> tasks) {
+ log.debug("Tasks {} configs updated", tasks);
+ // Stage the update and wake up the work thread. No need to record the set of tasks here because task reconfigs
+ // always need a rebalance to ensure offsets get committed.
+ // TODO: As an optimization, some task config updates could avoid a rebalance. In particular, single-task
+ // connectors clearly don't need any coordination.
+ synchronized (DistributedHerder.this) {
+ needsReconfigRebalance = true;
+ }
+ member.wakeup();
+ }
+ };
+ }
+
+ // Rebalances are triggered internally from the group member, so these are always executed in the work thread.
+ private WorkerRebalanceListener rebalanceListener() {
+ return new WorkerRebalanceListener() {
+ @Override
+ public void onAssigned(CopycatProtocol.Assignment assignment) {
+ // This callback just logs the assignment and saves it. The actual response is handled in the main loop,
+ // which ensures the group member's rebalancing logic can complete, that potentially long-running steps to
+ // catch up (or back off if we fail) are not executed in a callback, and that we'll still be able to invoke
+ // other group membership actions (e.g., we may need to explicitly leave the group if we cannot handle the
+ // assigned tasks).
+ log.info("Joined group and got assignment: {}", assignment);
+ DistributedHerder.this.assignment = assignment;
+ rebalanceResolved = false;
+ }
+ @Override
+ public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
+ log.info("Rebalance started");
+
+ // Note that since we don't reset the assignment, we don't revoke leadership here. During a rebalance,
+ // it is still important to have a leader that can write configs, offsets, etc.
+
+ // TODO: Parallelize this. We should be able to request all connectors and tasks to stop, then wait on all of
+ // them to finish
+ // TODO: Technically we don't have to stop connectors at all until we know they've really been removed from
+ // this worker. Instead, we can let them continue to run but buffer any update requests (which should be
+ // rare anyway). This would avoid a steady stream of start/stop, which probably also includes lots of
+ // unnecessary repeated connections to the source/sink system.
+ for (String connectorName : connectors)
+ worker.stopConnector(connectorName);
+ // TODO: We need to at least commit task offsets, but if we could commit offsets & pause them instead of
+ // stopping them then state could continue to be reused when the task remains on this worker. For example,
+ // this would avoid having to close a connection and then reopen it when the task is assigned back to this
+ // worker again.
+ for (ConnectorTaskId taskId : tasks)
+ worker.stopTask(taskId);
+ }
+ };
+ }
}