You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/10/24 01:37:42 UTC

[3/3] kafka git commit: KAFKA-2371: Add distributed support for Copycat.

KAFKA-2371: Add distributed support for Copycat.

This adds coordination between DistributedHerders using the generalized consumer
support, allowing automatic balancing of connectors and tasks across workers. A
few pieces that require interaction between workers (resolving config
inconsistencies, forwarding of configuration changes to the leader worker) are
incomplete because they require REST API support to implement properly.

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Jason Gustafson, Gwen Shapira

Closes #321 from ewencp/kafka-2371-distributed-herder


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2e617735
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2e617735
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2e617735

Branch: refs/heads/trunk
Commit: 2e61773590c0ba86cb8813e6ba17bf6ee33f4461
Parents: 21443f2
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Fri Oct 23 16:37:30 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Fri Oct 23 16:37:30 2015 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 checkstyle/import-control.xml                   |   1 +
 .../clients/consumer/RoundRobinAssignor.java    |  35 +-
 .../consumer/internals/AbstractCoordinator.java |   8 +-
 .../kafka/common/utils/CircularIterator.java    |  54 ++
 config/copycat-distributed.properties           |   2 +
 .../kafka/copycat/file/FileStreamSinkTask.java  |  12 +-
 .../copycat/file/FileStreamSourceTask.java      |  17 +-
 .../kafka/copycat/cli/CopycatDistributed.java   |   7 +-
 .../kafka/copycat/runtime/ConnectorConfig.java  |   2 +-
 .../kafka/copycat/runtime/TaskConfig.java       |  54 ++
 .../apache/kafka/copycat/runtime/Worker.java    | 145 +++-
 .../runtime/distributed/ClusterConfigState.java |  40 +-
 .../runtime/distributed/CopycatProtocol.java    | 246 +++++++
 .../runtime/distributed/DistributedHerder.java  | 733 +++++++++++++------
 .../distributed/DistributedHerderConfig.java    | 192 +++++
 .../runtime/distributed/NotLeaderException.java |  38 +
 .../runtime/distributed/WorkerCoordinator.java  | 288 ++++++++
 .../runtime/distributed/WorkerGroupMember.java  | 184 +++++
 .../distributed/WorkerRebalanceListener.java    |  38 +
 .../runtime/standalone/StandaloneHerder.java    | 168 ++---
 .../copycat/storage/KafkaConfigStorage.java     |  64 +-
 .../storage/KafkaOffsetBackingStore.java        |   2 +
 .../kafka/copycat/util/ConnectorTaskId.java     |  10 +-
 .../kafka/copycat/runtime/WorkerTest.java       | 199 ++++-
 .../distributed/DistributedHerderTest.java      | 436 ++++++-----
 .../distributed/WorkerCoordinatorTest.java      | 436 +++++++++++
 .../standalone/StandaloneHerderTest.java        |  45 +-
 .../copycat/storage/KafkaConfigStorageTest.java |  49 +-
 .../apache/kafka/copycat/util/TestFuture.java   |  10 +-
 tests/kafkatest/services/copycat.py             |  67 +-
 .../kafkatest/tests/copycat_distributed_test.py |  67 +-
 tests/kafkatest/tests/copycat_test.py           |   5 +-
 .../templates/copycat-distributed.properties    |   7 +-
 34 files changed, 2966 insertions(+), 696 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 16fb981..128c195 100644
--- a/build.gradle
+++ b/build.gradle
@@ -754,6 +754,7 @@ project(':copycat:runtime') {
     testCompile "$easymock"
     testCompile "$powermock"
     testCompile "$powermock_easymock"
+    testCompile project(':clients').sourceSets.test.output
     testRuntime "$slf4jlog4j"
     testRuntime project(":copycat:json")
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 6474865..e1ea93c 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -146,6 +146,7 @@
     <allow pkg="org.apache.kafka.copycat.data" />
     <allow pkg="org.apache.kafka.copycat.errors" />
     <allow pkg="org.apache.kafka.clients" />
+    <allow pkg="org.apache.kafka.test"/>
 
     <subpackage name="source">
       <allow pkg="org.apache.kafka.copycat.connector" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
index c5ea2bb..b8dc253 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
@@ -14,11 +14,11 @@ package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.CircularIterator;
 import org.apache.kafka.common.utils.Utils;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedSet;
@@ -78,37 +78,4 @@ public class RoundRobinAssignor extends AbstractPartitionAssignor {
         return "roundrobin";
     }
 
-    private static class CircularIterator<T> implements Iterator<T> {
-        int i = 0;
-        private List<T> list;
-
-        public CircularIterator(List<T> list) {
-            if (list.isEmpty()) {
-                throw new IllegalArgumentException("CircularIterator can only be used on non-empty lists");
-            }
-            this.list = list;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return true;
-        }
-
-        @Override
-        public T next() {
-            T next = list.get(i);
-            i = (i + 1) % list.size();
-            return next;
-        }
-
-        public T peek() {
-            return list.get(i);
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 1ffd2bb..a2b9ec5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -199,7 +199,7 @@ public abstract class AbstractCoordinator {
         this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
         rejoinNeeded = true;
     }
-
+    private boolean needsOnLeave = true;
     /**
      * Ensure that the group is active (i.e. joined and synced)
      */
@@ -208,7 +208,10 @@ public abstract class AbstractCoordinator {
             return;
 
         // onLeave only invoked if we have a valid current generation
-        onLeave(generation, memberId);
+        if (needsOnLeave) {
+            onLeave(generation, memberId);
+            needsOnLeave = false;
+        }
 
         while (needRejoin()) {
             ensureCoordinatorKnown();
@@ -225,6 +228,7 @@ public abstract class AbstractCoordinator {
 
             if (future.succeeded()) {
                 onJoin(generation, memberId, protocol, future.value());
+                needsOnLeave = true;
                 heartbeatTask.reset();
             } else {
                 if (future.exception() instanceof UnknownMemberIdException)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/clients/src/main/java/org/apache/kafka/common/utils/CircularIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/CircularIterator.java b/clients/src/main/java/org/apache/kafka/common/utils/CircularIterator.java
new file mode 100644
index 0000000..00be783
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/CircularIterator.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.common.utils;
+
+import java.util.Iterator;
+import java.util.List;
+
+public class CircularIterator<T> implements Iterator<T> {
+    int i = 0;
+    private List<T> list;
+
+    public CircularIterator(List<T> list) {
+        if (list.isEmpty()) {
+            throw new IllegalArgumentException("CircularIterator can only be used on non-empty lists");
+        }
+        this.list = list;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return true;
+    }
+
+    @Override
+    public T next() {
+        T next = list.get(i);
+        i = (i + 1) % list.size();
+        return next;
+    }
+
+    public T peek() {
+        return list.get(i);
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/config/copycat-distributed.properties
----------------------------------------------------------------------
diff --git a/config/copycat-distributed.properties b/config/copycat-distributed.properties
index b122413..2ea5b73 100644
--- a/config/copycat-distributed.properties
+++ b/config/copycat-distributed.properties
@@ -18,6 +18,8 @@
 # These are defaults. This file just demonstrates how to override some settings.
 bootstrap.servers=localhost:9092
 
+group.id=copycat-cluster
+
 # The converters specify the format of data in Kafka and how to translate it into Copycat data. Every Copycat user will
 # need to configure these based on the format they want their data in when loaded from or stored into Kafka
 key.converter=org.apache.kafka.copycat.json.JsonConverter

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
index 9ea459c..6dfe4a7 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
@@ -38,6 +38,7 @@ import java.util.Properties;
 public class FileStreamSinkTask extends SinkTask {
     private static final Logger log = LoggerFactory.getLogger(FileStreamSinkTask.class);
 
+    private String filename;
     private PrintStream outputStream;
 
     public FileStreamSinkTask() {
@@ -45,12 +46,13 @@ public class FileStreamSinkTask extends SinkTask {
 
     // for testing
     public FileStreamSinkTask(PrintStream outputStream) {
+        filename = null;
         this.outputStream = outputStream;
     }
 
     @Override
     public void start(Properties props) {
-        String filename = props.getProperty(FileStreamSinkConnector.FILE_CONFIG);
+        filename = props.getProperty(FileStreamSinkConnector.FILE_CONFIG);
         if (filename == null) {
             outputStream = System.out;
         } else {
@@ -65,16 +67,24 @@ public class FileStreamSinkTask extends SinkTask {
     @Override
     public void put(Collection<SinkRecord> sinkRecords) {
         for (SinkRecord record : sinkRecords) {
+            log.trace("Writing line to {}: {}", logFilename(), record.value());
             outputStream.println(record.value());
         }
     }
 
     @Override
     public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        log.trace("Flushing output stream for {}", logFilename());
         outputStream.flush();
     }
 
     @Override
     public void stop() {
+        if (outputStream != System.out)
+            outputStream.close();
+    }
+
+    private String logFilename() {
+        return filename == null ? "stdout" : filename;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
index cf71be3..f2249d0 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
@@ -56,7 +56,7 @@ public class FileStreamSourceTask extends SourceTask {
         }
         topic = props.getProperty(FileStreamSourceConnector.TOPIC_CONFIG);
         if (topic == null)
-            throw new CopycatException("ConsoleSourceTask config missing topic setting");
+            throw new CopycatException("FileStreamSourceTask config missing topic setting");
     }
 
     @Override
@@ -88,6 +88,7 @@ public class FileStreamSourceTask extends SourceTask {
                     streamOffset = 0L;
                 }
                 reader = new BufferedReader(new InputStreamReader(stream));
+                log.debug("Opened {} for reading", logFilename());
             } catch (FileNotFoundException e) {
                 log.warn("Couldn't find file for FileStreamSourceTask, sleeping to wait for it to be created");
                 synchronized (this) {
@@ -113,6 +114,7 @@ public class FileStreamSourceTask extends SourceTask {
             int nread = 0;
             while (readerCopy.ready()) {
                 nread = readerCopy.read(buffer, offset, buffer.length - offset);
+                log.trace("Read {} bytes from {}", nread, logFilename());
 
                 if (nread > 0) {
                     offset += nread;
@@ -126,6 +128,7 @@ public class FileStreamSourceTask extends SourceTask {
                     do {
                         line = extractLine();
                         if (line != null) {
+                            log.trace("Read a line from {}", logFilename());
                             if (records == null)
                                 records = new ArrayList<>();
                             records.add(new SourceRecord(offsetKey(filename), offsetValue(streamOffset), topic, VALUE_SCHEMA, line));
@@ -183,10 +186,12 @@ public class FileStreamSourceTask extends SourceTask {
         log.trace("Stopping");
         synchronized (this) {
             try {
-                stream.close();
-                log.trace("Closed input stream");
+                if (stream != null && stream != System.in) {
+                    stream.close();
+                    log.trace("Closed input stream");
+                }
             } catch (IOException e) {
-                log.error("Failed to close ConsoleSourceTask stream: ", e);
+                log.error("Failed to close FileStreamSourceTask stream: ", e);
             }
             this.notify();
         }
@@ -199,4 +204,8 @@ public class FileStreamSourceTask extends SourceTask {
     private Map<String, Long> offsetValue(Long pos) {
         return Collections.singletonMap(POSITION_FIELD, pos);
     }
+
+    private String logFilename() {
+        return filename == null ? "stdin" : filename;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
index b0230b2..0ff6e81 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
@@ -48,8 +48,8 @@ public class CopycatDistributed {
         Properties workerProps;
         Properties connectorProps;
 
-        if (args.length < 2) {
-            log.info("Usage: CopycatDistributed worker.properties connector1.properties [connector2.properties ...]");
+        if (args.length < 1) {
+            log.info("Usage: CopycatDistributed worker.properties [connector1.properties connector2.properties ...]");
             System.exit(1);
         }
 
@@ -58,8 +58,7 @@ public class CopycatDistributed {
 
         WorkerConfig workerConfig = new WorkerConfig(workerProps);
         Worker worker = new Worker(workerConfig, new KafkaOffsetBackingStore());
-        DistributedHerder herder = new DistributedHerder(worker);
-        herder.configure(workerConfig.originals());
+        DistributedHerder herder = new DistributedHerder(worker, workerConfig.originals());
         final Copycat copycat = new Copycat(worker, herder);
         copycat.start();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
index 767c88b..2242299 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
@@ -59,7 +59,7 @@ public class ConnectorConfig extends AbstractConfig {
     static {
         config = new ConfigDef()
                 .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC)
-                .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC)
+                .define(CONNECTOR_CLASS_CONFIG, Type.CLASS, Importance.HIGH, CONNECTOR_CLASS_DOC)
                 .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, Importance.HIGH, TASKS_MAX_DOC)
                 .define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java
new file mode 100644
index 0000000..be97879
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * <p>
+ * Configuration options for Tasks. These only include Copycat system-level configuration
+ * options.
+ * </p>
+ */
+public class TaskConfig extends AbstractConfig {
+
+    public static final String TASK_CLASS_CONFIG = "task.class";
+    private static final String TASK_CLASS_DOC =
+            "Name of the class for this task. Must be a subclass of org.apache.kafka.copycat.connector.Task";
+
+    private static ConfigDef config;
+
+    static {
+        config = new ConfigDef()
+                .define(TASK_CLASS_CONFIG, Type.CLASS, Importance.HIGH, TASK_CLASS_DOC);
+    }
+
+    public TaskConfig() {
+        this(new HashMap<String, String>());
+    }
+
+    public TaskConfig(Map<String, ?> props) {
+        super(config, props);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
index 0fdab4c..b37e49f 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
@@ -17,12 +17,15 @@
 
 package org.apache.kafka.copycat.runtime;
 
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.connector.Connector;
+import org.apache.kafka.copycat.connector.ConnectorContext;
 import org.apache.kafka.copycat.connector.Task;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.sink.SinkTask;
@@ -33,8 +36,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 /**
  * <p>
@@ -55,6 +60,7 @@ public class Worker {
     private Converter internalKeyConverter;
     private Converter internalValueConverter;
     private OffsetBackingStore offsetBackingStore;
+    private HashMap<String, Connector> connectors = new HashMap<>();
     private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
     private KafkaProducer<byte[], byte[]> producer;
     private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
@@ -106,6 +112,17 @@ public class Worker {
         long started = time.milliseconds();
         long limit = started + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
 
+        for (Map.Entry<String, Connector> entry : connectors.entrySet()) {
+            Connector conn = entry.getValue();
+            log.warn("Shutting down connector {} uncleanly; herder should have shut down connectors before the" +
+                    "Worker is stopped.", conn);
+            try {
+                conn.stop();
+            } catch (CopycatException e) {
+                log.error("Error while shutting down connector " + conn, e);
+            }
+        }
+
         for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
             WorkerTask task = entry.getValue();
             log.warn("Shutting down task {} uncleanly; herder should have shut down "
@@ -134,15 +151,106 @@ public class Worker {
     }
 
     /**
+     * Add a new connector.
+     * @param connConfig connector configuration
+     * @param ctx context for the connector
+     */
+    public void addConnector(ConnectorConfig connConfig, ConnectorContext ctx) {
+        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+        Class<?> maybeConnClass = connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        log.info("Creating connector {} of type {}", connName, maybeConnClass.getName());
+
+        Class<? extends Connector> connClass;
+        try {
+            connClass = maybeConnClass.asSubclass(Connector.class);
+        } catch (ClassCastException e) {
+            throw new CopycatException("Specified class is not a subclass of Connector: " + maybeConnClass.getName());
+        }
+
+        if (connectors.containsKey(connName))
+            throw new CopycatException("Connector with name " + connName + " already exists");
+
+        final Connector connector = instantiateConnector(connClass);
+        connector.initialize(ctx);
+        try {
+            Map<String, Object> originals = connConfig.originals();
+            Properties props = new Properties();
+            props.putAll(originals);
+            connector.start(props);
+        } catch (CopycatException e) {
+            throw new CopycatException("Connector threw an exception while starting", e);
+        }
+
+        connectors.put(connName, connector);
+
+        log.info("Finished creating connector {}", connName);
+    }
+
+    private static Connector instantiateConnector(Class<? extends Connector> connClass) {
+        try {
+            return Utils.newInstance(connClass);
+        } catch (Throwable t) {
+            // Catches normal exceptions due to instantiation errors as well as any runtime errors that
+            // may be caused by user code
+            throw new CopycatException("Failed to create connector instance", t);
+        }
+    }
+
+    public Map<ConnectorTaskId, Map<String, String>> reconfigureConnectorTasks(String connName, int maxTasks, List<String> sinkTopics) {
+        log.trace("Reconfiguring connector tasks for {}", connName);
+
+        Connector connector = connectors.get(connName);
+        if (connector == null)
+            throw new CopycatException("Connector " + connName + " not found in this worker.");
+
+        Map<ConnectorTaskId, Map<String, String>> result = new HashMap<>();
+        String taskClassName = connector.taskClass().getName();
+        int index = 0;
+        for (Properties taskProps : connector.taskConfigs(maxTasks)) {
+            ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
+            index++;
+            Map<String, String> taskConfig = Utils.propsToStringMap(taskProps);
+            taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName);
+            if (sinkTopics != null)
+                taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, ","));
+            result.put(taskId, taskConfig);
+        }
+        return result;
+    }
+
+    public void stopConnector(String connName) {
+        log.info("Stopping connector {}", connName);
+
+        Connector connector = connectors.get(connName);
+        if (connector == null)
+            throw new CopycatException("Connector " + connName + " not found in this worker.");
+
+        try {
+            connector.stop();
+        } catch (CopycatException e) {
+            log.error("Error shutting down connector {}: ", connector, e);
+        }
+
+        connectors.remove(connName);
+
+        log.info("Stopped connector {}", connName);
+    }
+
+    /**
+     * Get the IDs of the connectors currently running in this worker.
+     */
+    public Set<String> connectorNames() {
+        return connectors.keySet();
+    }
+
+    /**
      * Add a new task.
      * @param id Globally unique ID for this task.
-     * @param taskClassName name of the {@link org.apache.kafka.copycat.connector.Task}
-     *                      class to instantiate. Must be a subclass of either
-     *                      {@link org.apache.kafka.copycat.source.SourceTask} or
-     *                      {@link org.apache.kafka.copycat.sink.SinkTask}.
-     * @param props configuration options for the task
+     * @param taskConfig the parsed task configuration
      */
-    public void addTask(ConnectorTaskId id, String taskClassName, Properties props) {
+    public void addTask(ConnectorTaskId id, TaskConfig taskConfig) {
+        log.info("Creating task {}", id);
+
         if (tasks.containsKey(id)) {
             String msg = "Task already exists in this worker; the herder should not have requested "
                     + "that this : " + id;
@@ -150,7 +258,7 @@ public class Worker {
             throw new CopycatException(msg);
         }
 
-        final Task task = instantiateTask(taskClassName);
+        final Task task = instantiateTask(taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class));
 
         // Decide which type of worker task we need based on the type of task.
         final WorkerTask workerTask;
@@ -171,20 +279,30 @@ public class Worker {
 
         // Start the task before adding modifying any state, any exceptions are caught higher up the
         // call chain and there's no cleanup to do here
+        Properties props = new Properties();
+        props.putAll(taskConfig.originals());
         workerTask.start(props);
+        if (task instanceof SourceTask) {
+            WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask;
+            sourceTaskOffsetCommitter.schedule(id, workerSourceTask);
+        }
         tasks.put(id, workerTask);
     }
 
-    private static Task instantiateTask(String taskClassName) {
+    private static Task instantiateTask(Class<? extends Task> taskClass) {
         try {
-            return Utils.newInstance(Class.forName(taskClassName).asSubclass(Task.class));
-        } catch (ClassNotFoundException e) {
+            return Utils.newInstance(taskClass);
+        } catch (KafkaException e) {
             throw new CopycatException("Task class not found", e);
         }
     }
 
     public void stopTask(ConnectorTaskId id) {
+        log.info("Stopping task {}", id);
+
         WorkerTask task = getTask(id);
+        if (task instanceof WorkerSourceTask)
+            sourceTaskOffsetCommitter.remove(id);
         task.stop();
         if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG)))
             log.error("Graceful stop of task {} failed.", task);
@@ -192,6 +310,13 @@ public class Worker {
         tasks.remove(id);
     }
 
+    /**
+     * Get the IDs of the tasks currently running in this worker.
+     */
+    public Set<ConnectorTaskId> taskIds() {
+        return tasks.keySet();
+    }
+
     private WorkerTask getTask(ConnectorTaskId id) {
         WorkerTask task = tasks.get(id);
         if (task == null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java
index 719dd09..a46141e 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java
@@ -19,10 +19,8 @@ package org.apache.kafka.copycat.runtime.distributed;
 
 import org.apache.kafka.copycat.util.ConnectorTaskId;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -30,6 +28,10 @@ import java.util.Set;
  * An immutable snapshot of the configuration state of connectors and tasks in a Copycat cluster.
  */
 public class ClusterConfigState {
+    public static final ClusterConfigState EMPTY = new ClusterConfigState(-1, Collections.<String, Integer>emptyMap(),
+            Collections.<String, Map<String, String>>emptyMap(), Collections.<ConnectorTaskId, Map<String, String>>emptyMap(),
+            Collections.<String>emptySet());
+
     private final long offset;
     private final Map<String, Integer> connectorTaskCounts;
     private final Map<String, Map<String, String>> connectorConfigs;
@@ -60,8 +62,8 @@ public class ClusterConfigState {
     /**
      * Get a list of the connectors in this configuration
      */
-    public Collection<String> connectors() {
-        return connectorTaskCounts.keySet();
+    public Set<String> connectors() {
+        return connectorConfigs.keySet();
     }
 
     /**
@@ -83,19 +85,29 @@ public class ClusterConfigState {
     }
 
     /**
+     * Get the number of tasks assigned for the given conncetor.
+     * @param connectorName name of the connector to look up tasks for
+     * @return the number of tasks
+     */
+    public int taskCount(String connectorName) {
+        Integer count = connectorTaskCounts.get(connectorName);
+        return count == null ? 0 : count;
+    }
+
+    /**
      * Get the current set of task IDs for the specified connector.
      * @param connectorName the name of the connector to look up task configs for
      * @return the current set of connector task IDs
      */
-    public Collection<ConnectorTaskId> tasks(String connectorName) {
+    public Set<ConnectorTaskId> tasks(String connectorName) {
         if (inconsistentConnectors.contains(connectorName))
-            return Collections.EMPTY_LIST;
+            return Collections.emptySet();
 
         Integer numTasks = connectorTaskCounts.get(connectorName);
         if (numTasks == null)
-            throw new IllegalArgumentException("Connector does not exist in current configuration.");
+            return Collections.emptySet();
 
-        List<ConnectorTaskId> taskIds = new ArrayList<>();
+        Set<ConnectorTaskId> taskIds = new HashSet<>();
         for (int taskIndex = 0; taskIndex < numTasks; taskIndex++) {
             ConnectorTaskId taskId = new ConnectorTaskId(connectorName, taskIndex);
             taskIds.add(taskId);
@@ -119,4 +131,14 @@ public class ClusterConfigState {
         return inconsistentConnectors;
     }
 
+    @Override
+    public String toString() {
+        return "ClusterConfigState{" +
+                "offset=" + offset +
+                ", connectorTaskCounts=" + connectorTaskCounts +
+                ", connectorConfigs=" + connectorConfigs +
+                ", taskConfigs=" + taskConfigs +
+                ", inconsistentConnectors=" + inconsistentConnectors +
+                '}';
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/CopycatProtocol.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/CopycatProtocol.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/CopycatProtocol.java
new file mode 100644
index 0000000..a450b1d
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/CopycatProtocol.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class implements the protocol for Copycat workers in a group. It includes the format of worker state used when
+ * joining the group and distributing assignments, and the format of assignments of connectors and tasks to workers.
+ */
+public class CopycatProtocol {
+    public static final String VERSION_KEY_NAME = "version";
+    public static final String CONFIG_OFFSET_KEY_NAME = "config-offset";
+    public static final String CONNECTOR_KEY_NAME = "connector";
+    public static final String LEADER_KEY_NAME = "leader";
+    public static final String ERROR_KEY_NAME = "error";
+    public static final String TASKS_KEY_NAME = "tasks";
+    public static final String ASSIGNMENT_KEY_NAME = "assignment";
+    public static final int CONNECTOR_TASK = -1;
+
+    public static final short COPYCAT_PROTOCOL_V0 = 0;
+    public static final Schema COPYCAT_PROTOCOL_HEADER_SCHEMA = new Schema(
+            new Field(VERSION_KEY_NAME, Type.INT16));
+    private static final Struct COPYCAT_PROTOCOL_HEADER_V0 = new Struct(COPYCAT_PROTOCOL_HEADER_SCHEMA)
+            .set(VERSION_KEY_NAME, COPYCAT_PROTOCOL_V0);
+
+    public static final Schema CONFIG_STATE_V0 = new Schema(
+            new Field(CONFIG_OFFSET_KEY_NAME, Type.INT64));
+    // Assignments for each worker are a set of connectors and tasks. These are categorized by connector ID. A sentinel
+    // task ID (CONNECTOR_TASK) is used to indicate the connector itself (i.e. that the assignment includes
+    // responsibility for running the Connector instance in addition to any tasks it generates).
+    public static final Schema CONNECTOR_ASSIGNMENT_V0 = new Schema(
+            new Field(CONNECTOR_KEY_NAME, Type.STRING),
+            new Field(TASKS_KEY_NAME, new ArrayOf(Type.INT32)));
+    public static final Schema ASSIGNMENT_V0 = new Schema(
+            new Field(ERROR_KEY_NAME, Type.INT16),
+            new Field(LEADER_KEY_NAME, Type.STRING),
+            new Field(CONFIG_OFFSET_KEY_NAME, Type.INT64),
+            new Field(ASSIGNMENT_KEY_NAME, new ArrayOf(CONNECTOR_ASSIGNMENT_V0)));
+
+    public static ByteBuffer serializeMetadata(ConfigState configState) {
+        Struct struct = new Struct(CONFIG_STATE_V0);
+        struct.set(CONFIG_OFFSET_KEY_NAME, configState.offset());
+        ByteBuffer buffer = ByteBuffer.allocate(COPYCAT_PROTOCOL_HEADER_V0.sizeOf() + CONFIG_STATE_V0.sizeOf(struct));
+        COPYCAT_PROTOCOL_HEADER_V0.writeTo(buffer);
+        CONFIG_STATE_V0.write(buffer, struct);
+        buffer.flip();
+        return buffer;
+    }
+
+    public static ConfigState deserializeMetadata(ByteBuffer buffer) {
+        Struct header = (Struct) COPYCAT_PROTOCOL_HEADER_SCHEMA.read(buffer);
+        Short version = header.getShort(VERSION_KEY_NAME);
+        checkVersionCompatibility(version);
+        Struct struct = (Struct) CONFIG_STATE_V0.read(buffer);
+        long configOffset = struct.getLong(CONFIG_OFFSET_KEY_NAME);
+        return new ConfigState(configOffset);
+    }
+
+    public static ByteBuffer serializeAssignment(Assignment assignment) {
+        Struct struct = new Struct(ASSIGNMENT_V0);
+        struct.set(ERROR_KEY_NAME, assignment.error());
+        struct.set(LEADER_KEY_NAME, assignment.leader());
+        struct.set(CONFIG_OFFSET_KEY_NAME, assignment.offset());
+        List<Struct> taskAssignments = new ArrayList<>();
+        for (Map.Entry<String, List<Integer>> connectorEntry : assignment.asMap().entrySet()) {
+            Struct taskAssignment = new Struct(CONNECTOR_ASSIGNMENT_V0);
+            taskAssignment.set(CONNECTOR_KEY_NAME, connectorEntry.getKey());
+            List<Integer> tasks = connectorEntry.getValue();
+            taskAssignment.set(TASKS_KEY_NAME, tasks.toArray());
+            taskAssignments.add(taskAssignment);
+        }
+        struct.set(ASSIGNMENT_KEY_NAME, taskAssignments.toArray());
+
+        ByteBuffer buffer = ByteBuffer.allocate(COPYCAT_PROTOCOL_HEADER_V0.sizeOf() + ASSIGNMENT_V0.sizeOf(struct));
+        COPYCAT_PROTOCOL_HEADER_V0.writeTo(buffer);
+        ASSIGNMENT_V0.write(buffer, struct);
+        buffer.flip();
+        return buffer;
+    }
+
+    public static Assignment deserializeAssignment(ByteBuffer buffer) {
+        Struct header = (Struct) COPYCAT_PROTOCOL_HEADER_SCHEMA.read(buffer);
+        Short version = header.getShort(VERSION_KEY_NAME);
+        checkVersionCompatibility(version);
+        Struct struct = (Struct) ASSIGNMENT_V0.read(buffer);
+        short error = struct.getShort(ERROR_KEY_NAME);
+        String leader = struct.getString(LEADER_KEY_NAME);
+        long offset = struct.getLong(CONFIG_OFFSET_KEY_NAME);
+        List<String> connectorIds = new ArrayList<>();
+        List<ConnectorTaskId> taskIds = new ArrayList<>();
+        for (Object structObj : struct.getArray(ASSIGNMENT_KEY_NAME)) {
+            Struct assignment = (Struct) structObj;
+            String connector = assignment.getString(CONNECTOR_KEY_NAME);
+            for (Object taskIdObj : assignment.getArray(TASKS_KEY_NAME)) {
+                Integer taskId = (Integer) taskIdObj;
+                if (taskId == CONNECTOR_TASK)
+                    connectorIds.add(connector);
+                else
+                    taskIds.add(new ConnectorTaskId(connector, taskId));
+            }
+        }
+        return new Assignment(error, leader, offset, connectorIds, taskIds);
+    }
+
+    public static class ConfigState {
+        private final long offset;
+
+        public ConfigState(long offset) {
+            this.offset = offset;
+        }
+
+        public long offset() {
+            return offset;
+        }
+
+        @Override
+        public String toString() {
+            return "ConfigState{" +
+                    "offset=" + offset +
+                    '}';
+        }
+    }
+
+    public static class Assignment {
+        public static final short NO_ERROR = 0;
+        // Configuration offsets mismatched in a way that the leader could not resolve. Workers should read to the end
+        // of the config log and try to re-join
+        public static final short CONFIG_MISMATCH = 1;
+
+        private final short error;
+        private final String leader;
+        private final long offset;
+        private final List<String> connectorIds;
+        private final List<ConnectorTaskId> taskIds;
+
+        /**
+         * Create an assignment indicating responsibility for the given connector instances and task Ids.
+         * @param connectorIds list of connectors that the worker should instantiate and run
+         * @param taskIds list of task IDs that the worker should instantiate and run
+         */
+        public Assignment(short error, String leader, long configOffset,
+                          List<String> connectorIds, List<ConnectorTaskId> taskIds) {
+            this.error = error;
+            this.leader = leader;
+            this.offset = configOffset;
+            this.taskIds = taskIds;
+            this.connectorIds = connectorIds;
+        }
+
+        public short error() {
+            return error;
+        }
+
+        public String leader() {
+            return leader;
+        }
+
+        public boolean failed() {
+            return error != NO_ERROR;
+        }
+
+        public long offset() {
+            return offset;
+        }
+
+        public List<String> connectors() {
+            return connectorIds;
+        }
+
+        public List<ConnectorTaskId> tasks() {
+            return taskIds;
+        }
+
+        @Override
+        public String toString() {
+            return "Assignment{" +
+                    "error=" + error +
+                    ", leader='" + leader + '\'' +
+                    ", offset=" + offset +
+                    ", connectorIds=" + connectorIds +
+                    ", taskIds=" + taskIds +
+                    '}';
+        }
+
+        private Map<String, List<Integer>> asMap() {
+            // Using LinkedHashMap preserves the ordering, which is helpful for tests and debugging
+            Map<String, List<Integer>> taskMap = new LinkedHashMap<>();
+            for (String connectorId : new HashSet<>(connectorIds)) {
+                List<Integer> connectorTasks = taskMap.get(connectorId);
+                if (connectorTasks == null) {
+                    connectorTasks = new ArrayList<>();
+                    taskMap.put(connectorId, connectorTasks);
+                }
+                connectorTasks.add(CONNECTOR_TASK);
+            }
+            for (ConnectorTaskId taskId : taskIds) {
+                String connectorId = taskId.connector();
+                List<Integer> connectorTasks = taskMap.get(connectorId);
+                if (connectorTasks == null) {
+                    connectorTasks = new ArrayList<>();
+                    taskMap.put(connectorId, connectorTasks);
+                }
+                connectorTasks.add(taskId.task());
+            }
+            return taskMap;
+        }
+    }
+
+    private static void checkVersionCompatibility(short version) {
+        // check for invalid versions
+        if (version < COPYCAT_PROTOCOL_V0)
+            throw new SchemaException("Unsupported subscription version: " + version);
+
+        // otherwise, assume versions can be parsed as V0
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
index 5273658..bf3229d 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
@@ -17,304 +17,627 @@
 
 package org.apache.kafka.copycat.runtime.distributed;
 
+import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.copycat.connector.Connector;
+import org.apache.kafka.copycat.connector.ConnectorContext;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.runtime.ConnectorConfig;
 import org.apache.kafka.copycat.runtime.Herder;
 import org.apache.kafka.copycat.runtime.HerderConnectorContext;
+import org.apache.kafka.copycat.runtime.TaskConfig;
 import org.apache.kafka.copycat.runtime.Worker;
 import org.apache.kafka.copycat.sink.SinkConnector;
-import org.apache.kafka.copycat.sink.SinkTask;
 import org.apache.kafka.copycat.storage.KafkaConfigStorage;
 import org.apache.kafka.copycat.util.Callback;
 import org.apache.kafka.copycat.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * Distributed "herder" that coordinates with other workers to spread work across multiple processes.
+ * <p>
+ *     Distributed "herder" that coordinates with other workers to spread work across multiple processes.
+ * </p>
+ * <p>
+ *     Under the hood, this is implemented as a group managed by Kafka's group membership facilities (i.e. the generalized
+ *     group/consumer coordinator). Each instance of DistributedHerder joins the group and indicates what it's current
+ *     configuration state is (where it is in the configuration log). The group coordinator selects one member to take
+ *     this information and assign each instance a subset of the active connectors & tasks to execute. This assignment
+ *     is currently performed in a simple round-robin fashion, but this is not guaranteed -- the herder may also choose
+ *     to, e.g., use a sticky assignment to avoid the usual start/stop costs associated with connectors and tasks. Once
+ *     an assignment is received, the DistributedHerder simply runs its assigned connectors and tasks in a Worker.
+ * </p>
+ * <p>
+ *     In addition to distributing work, the DistributedHerder uses the leader determined during the work assignment
+ *     to select a leader for this generation of the group who is responsible for other tasks that can only be performed
+ *     by a single node at a time. Most importantly, this includes writing updated configurations for connectors and tasks,
+ *     (and therefore, also for creating, destroy, and scaling up/down connectors).
+ * </p>
  */
-public class DistributedHerder implements Herder {
+public class DistributedHerder implements Herder, Runnable {
     private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class);
 
-    private Worker worker;
-    private KafkaConfigStorage configStorage;
+    private final Worker worker;
+    private final KafkaConfigStorage configStorage;
     private ClusterConfigState configState;
-    private HashMap<String, ConnectorState> connectors = new HashMap<>();
 
-    public DistributedHerder(Worker worker) {
-        this.worker = worker;
-        this.configStorage = new KafkaConfigStorage(worker.getInternalValueConverter(),
-                new ConnectorConfigCallback(), new TaskConfigCallback());
+    private final int workerSyncTimeoutMs;
+    private final int workerUnsyncBackoffMs;
+
+    private final WorkerGroupMember member;
+    private final AtomicBoolean stopping;
+    private final CountDownLatch stopLatch = new CountDownLatch(1);
+
+    // Track enough information about the current membership state to be able to determine which requests via the API
+    // and the from other nodes are safe to process
+    private boolean rebalanceResolved;
+    private CopycatProtocol.Assignment assignment;
+
+    // To handle most external requests, like creating or destroying a connector, we can use a generic request where
+    // the caller specifies all the code that should be executed.
+    private final Queue<HerderRequest> requests = new LinkedBlockingDeque<>();
+    // Config updates can be collected and applied together when possible. Also, we need to take care to rebalance when
+    // needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits).
+    private final Set<String> connectorConfigUpdates = new HashSet<>();
+    private boolean needsReconfigRebalance;
+
+    public DistributedHerder(Worker worker, Map<String, ?> configs) {
+        this(worker, configs, null, null);
     }
 
-    // Public for testing (mock KafkaConfigStorage)
-    public DistributedHerder(Worker worker, KafkaConfigStorage configStorage) {
+    // public for testing
+    public DistributedHerder(Worker worker, Map<String, ?> configs, KafkaConfigStorage configStorage, WorkerGroupMember member) {
         this.worker = worker;
-        this.configStorage = configStorage;
+        if (configStorage != null) {
+            // For testing. Assume configuration has already been performed
+            this.configStorage = configStorage;
+        } else {
+            this.configStorage = new KafkaConfigStorage(worker.getInternalValueConverter(), connectorConfigCallback(), taskConfigCallback());
+            this.configStorage.configure(configs);
+        }
+        configState = ClusterConfigState.EMPTY;
+
+        DistributedHerderConfig config = new DistributedHerderConfig(configs);
+        this.workerSyncTimeoutMs = config.getInt(DistributedHerderConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
+        this.workerUnsyncBackoffMs = config.getInt(DistributedHerderConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG);
+
+        this.member = member != null ? member : new WorkerGroupMember(config, this.configStorage, rebalanceListener());
+        stopping = new AtomicBoolean(false);
+
+        rebalanceResolved = true; // If we still need to follow up after a rebalance occurred, starting up tasks
+        needsReconfigRebalance = false;
     }
 
-    public synchronized void configure(Map<String, ?> configs) {
-        configStorage.configure(configs);
+    @Override
+    public void start() {
+        Thread thread = new Thread(this);
+        thread.start();
     }
 
-    public synchronized void start() {
-        log.info("Herder starting");
+    public void run() {
+        try {
+            log.info("Herder starting");
 
-        configStorage.start();
+            configStorage.start();
 
-        log.info("Restoring connectors from stored configs");
-        restoreConnectors();
+            log.info("Herder started");
 
-        log.info("Herder started");
+            while (!stopping.get()) {
+                tick();
+            }
+
+            halt();
+
+            log.info("Herder stopped");
+        } finally {
+            stopLatch.countDown();
+        }
     }
 
-    public synchronized void stop() {
-        log.info("Herder stopping");
+    // public for testing
+    public void tick() {
+        // The main loop does two primary things: 1) drive the group membership protocol, responding to rebalance events
+        // as they occur, and 2) handle external requests targeted at the leader. All the "real" work of the herder is
+        // performed in this thread, which keeps synchronization straightforward at the cost of some operations possibly
+        // blocking up this thread (especially those in callbacks due to rebalance events).
 
-        // There's no coordination/hand-off to do here since this is all standalone. Instead, we
-        // should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all
-        // the tasks.
-        for (Map.Entry<String, ConnectorState> entry : connectors.entrySet()) {
-            ConnectorState state = entry.getValue();
-            stopConnector(state);
+        try {
+            member.ensureActive();
+            // Ensure we're in a good state in our group. If not restart and everything should be setup to rejoin
+            if (!handleRebalanceCompleted()) return;
+        } catch (ConsumerWakeupException e) {
+            // May be due to a request from another thread, or might be stopping. If the latter, we need to check the
+            // flag immediately. If the former, we need to re-run the ensureActive call since we can't handle requests
+            // unless we're in the group.
+            return;
         }
-        connectors.clear();
 
-        if (configStorage != null) {
-            configStorage.stop();
-            configStorage = null;
+        // Process any external requests
+        while (!requests.isEmpty()) {
+            HerderRequest request = requests.poll();
+            try {
+                request.callback().onCompletion(null, request.action().call());
+            } catch (Throwable t) {
+                request.callback().onCompletion(t, null);
+            }
         }
 
-        log.info("Herder stopped");
-    }
+        // Process any configuration updates
+        synchronized (this) {
+            if (needsReconfigRebalance || !connectorConfigUpdates.isEmpty()) {
+                // Connector reconfigs only need local updates since there is no coordination between workers required.
+                // However, if connectors were added or removed, work needs to be rebalanced since we have more work
+                // items to distribute among workers.
+                ClusterConfigState newConfigState = configStorage.snapshot();
+                if (!newConfigState.connectors().equals(configState.connectors()))
+                    needsReconfigRebalance = true;
+                configState = newConfigState;
+                if (needsReconfigRebalance) {
+                    // Task reconfigs require a rebalance. Request the rebalance, clean out state, and then restart
+                    // this loop, which will then ensure the rebalance occurs without any other requests being
+                    // processed until it completes.
+                    member.requestRejoin();
+                    // Any connector config updates will be addressed during the rebalance too
+                    connectorConfigUpdates.clear();
+                    needsReconfigRebalance = false;
+                    return;
+                } else if (!connectorConfigUpdates.isEmpty()) {
+                    // If we only have connector config updates, we can just bounce the updated connectors that are
+                    // currently assigned to this worker.
+                    Set<String> localConnectors = worker.connectorNames();
+                    for (String connectorName : connectorConfigUpdates) {
+                        if (!localConnectors.contains(connectorName))
+                            continue;
+                        worker.stopConnector(connectorName);
+                        // The update may be a deletion, so verify we actually need to restart the connector
+                        if (configState.connectors().contains(connectorName))
+                            startConnector(connectorName);
+                    }
+                    connectorConfigUpdates.clear();
+                }
+            }
+        }
 
-    @Override
-    public synchronized void addConnector(Map<String, String> connectorProps,
-                                          Callback<String> callback) {
+        // Let the group take any actions it needs to
         try {
-            // Ensure the config is written to storage first
-            ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
-            String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
-            configStorage.putConnectorConfig(connName, connectorProps);
+            member.poll(Long.MAX_VALUE);
+            // Ensure we're in a good state in our group. If not restart and everything should be setup to rejoin
+            if (!handleRebalanceCompleted()) return;
+        } catch (ConsumerWakeupException e) { // FIXME should not be ConsumerWakeupException
+            // Ignore. Just indicates we need to check the exit flag, for requested actions, etc.
+        }
+    }
 
-            ConnectorState connState = createConnector(connConfig);
-            if (callback != null)
-                callback.onCompletion(null, connState.name);
-            // This should always be a new job, create jobs from scratch
-            createConnectorTasks(connState);
-        } catch (CopycatException e) {
-            if (callback != null)
-                callback.onCompletion(e, null);
+    // public for testing
+    public void halt() {
+        synchronized (this) {
+            // Clean up any connectors and tasks that are still running.
+            log.info("Stopping connectors and tasks that are still assigned to this worker.");
+            for (String connName : new HashSet<>(worker.connectorNames())) {
+                try {
+                    worker.stopConnector(connName);
+                } catch (Throwable t) {
+                    log.error("Failed to shut down connector " + connName, t);
+                }
+            }
+            for (ConnectorTaskId taskId : new HashSet<>(worker.taskIds())) {
+                try {
+                    worker.stopTask(taskId);
+                } catch (Throwable t) {
+                    log.error("Failed to shut down task " + taskId, t);
+                }
+            }
+
+            member.stop();
+
+            // Explicitly fail any outstanding requests so they actually get a response and get an understandable reason
+            // for their failure
+            while (!requests.isEmpty()) {
+                HerderRequest request = requests.poll();
+                request.callback().onCompletion(new CopycatException("Worker is shutting down"), null);
+            }
+
+            if (configStorage != null)
+                configStorage.stop();
         }
     }
 
     @Override
-    public synchronized void deleteConnector(String name, Callback<Void> callback) {
-        try {
-            destroyConnector(name);
-            if (callback != null)
-                callback.onCompletion(null, null);
-        } catch (CopycatException e) {
-            if (callback != null)
-                callback.onCompletion(e, null);
+    public void stop() {
+        log.info("Herder stopping");
+
+        stopping.set(true);
+        member.wakeup();
+        while (stopLatch.getCount() > 0) {
+            try {
+                stopLatch.await();
+            } catch (InterruptedException e) {
+                // ignore, should not happen
+            }
         }
     }
 
     @Override
-    public synchronized void requestTaskReconfiguration(String connName) {
-        ConnectorState state = connectors.get(connName);
-        if (state == null) {
-            log.error("Task that requested reconfiguration does not exist: {}", connName);
+    public synchronized void addConnector(final Map<String, String> connectorProps,
+                                          final Callback<String> callback) {
+        final ConnectorConfig connConfig;
+        final String connName;
+        try {
+            connConfig = new ConnectorConfig(connectorProps);
+            connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+        } catch (Throwable t) {
+            if (callback != null)
+                callback.onCompletion(t, null);
             return;
         }
-        updateConnectorTasks(state);
+
+        log.debug("Submitting connector config {}", connName);
+
+        requests.add(new HerderRequest(
+                new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        if (!isLeader())
+                            throw new NotLeaderException("Only the leader can add connectors.");
+
+                        log.debug("Submitting connector config {}", connName);
+                        configStorage.putConnectorConfig(connName, connectorProps);
+
+                        return null;
+                    }
+                },
+                new Callback<Void>() {
+                    @Override
+                    public void onCompletion(Throwable error, Void result) {
+                        if (callback == null) return;
+
+                        if (error != null)
+                            callback.onCompletion(error, null);
+                        else
+                            callback.onCompletion(null, connName);
+                    }
+                }));
+        member.wakeup();
     }
 
-    // Creates and configures the connector. Does not setup any tasks
-    private ConnectorState createConnector(ConnectorConfig connConfig) {
-        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
-        String className = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
-        log.info("Creating connector {} of type {}", connName, className);
-        int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);
-        List<String> topics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG); // Sinks only
-        Properties configs = connConfig.unusedProperties();
-
-        if (connectors.containsKey(connName)) {
-            log.error("Ignoring request to create connector due to conflicting connector name");
-            throw new CopycatException("Connector with name " + connName + " already exists");
-        }
+    @Override
+    public synchronized void deleteConnector(final String connName, final Callback<Void> callback) {
+        log.debug("Submitting connector config deletion {}", connName);
+
+        requests.add(new HerderRequest(
+                new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        if (!isLeader())
+                            throw new NotLeaderException("Only the leader can delete connectors.");
+
+                        log.debug("Submitting null connector config {}", connName);
+                        configStorage.putConnectorConfig(connName, null);
+                        return null;
+                    }
+                },
+                new Callback<Void>() {
+                    @Override
+                    public void onCompletion(Throwable error, Void result) {
+                        if (callback != null)
+                            callback.onCompletion(error, null);
+                    }
+                }
+        ));
+        member.wakeup();
+    }
 
-        final Connector connector;
-        try {
-            connector = instantiateConnector(className);
-        } catch (Throwable t) {
-            // Catches normal exceptions due to instantiation errors as well as any runtime errors that
-            // may be caused by user code
-            throw new CopycatException("Failed to create connector instance", t);
-        }
-        connector.initialize(new HerderConnectorContext(this, connName));
-        try {
-            connector.start(configs);
-        } catch (CopycatException e) {
-            throw new CopycatException("Connector threw an exception while starting", e);
-        }
-        ConnectorState state = new ConnectorState(connName, connector, maxTasks, topics);
-        connectors.put(connName, state);
+    @Override
+    public synchronized void requestTaskReconfiguration(final String connName) {
+        requests.add(new HerderRequest(
+                new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        reconfigureConnector(connName);
+                        return null;
+                    }
+                }
+        ));
+        member.wakeup();
+    }
 
-        log.info("Finished creating connector {}", connName);
 
-        return state;
+    private boolean isLeader() {
+        return assignment != null && member.memberId().equals(assignment.leader());
     }
 
-    private static Connector instantiateConnector(String className) {
-        try {
-            return Utils.newInstance(className, Connector.class);
-        } catch (ClassNotFoundException e) {
-            throw new CopycatException("Couldn't instantiate connector class", e);
+    /**
+     * Handle post-assignment operations, either trying to resolve issues that kept assignment from completing, getting
+     * this node into sync and its work started. Since
+     *
+     * @return false if we couldn't finish
+     */
+    private boolean handleRebalanceCompleted() {
+        if (this.rebalanceResolved)
+            return true;
+
+        rebalanceResolved = true;
+
+        // We need to handle a variety of cases after a rebalance:
+        // 1. Assignment failed
+        //  1a. We are the leader for the round. We will be leader again if we rejoin now, so we need to catch up before
+        //      even attempting to. If we can't we should drop out of the group because we will block everyone from making
+        //      progress. We can backoff and try rejoining later.
+        //  1b. We are not the leader. We might need to catch up. If we're already caught up we can rejoin immediately,
+        //      otherwise, we just want to wait indefinitely to catch up and rejoin whenver we're finally ready.
+        // 2. Assignment succeeded.
+        //  2a. We are caught up on configs. Awesome! We can proceed to run our assigned work.
+        //  2b. We need to try to catch up. We can do this potentially indefinitely because if it takes to long, we'll
+        //      be kicked out of the group anyway due to lack of heartbeats.
+
+        boolean needsReadToEnd = false;
+        long syncConfigsTimeoutMs = Long.MAX_VALUE;
+        boolean needsRejoin = false;
+        if (assignment.failed()) {
+            needsRejoin = true;
+            if (isLeader()) {
+                log.warn("Join group completed, but assignment failed and we are the leader. Reading to end of config and retrying.");
+                needsReadToEnd = true;
+                syncConfigsTimeoutMs = workerSyncTimeoutMs;
+            } else if (configState.offset() < assignment.offset()) {
+                log.warn("Join group completed, but assignment failed and we lagging. Reading to end of config and retrying.");
+                needsReadToEnd = true;
+            } else {
+                log.warn("Join group completed, but assignment failed. We were up to date, so just retrying.");
+            }
+        } else {
+            if (configState.offset() < assignment.offset()) {
+                log.warn("Catching up to assignment's config offset.");
+                needsReadToEnd = true;
+            }
         }
-    }
 
-    private void destroyConnector(String connName) {
-        log.info("Destroying connector {}", connName);
-        ConnectorState state = connectors.get(connName);
-        if (state == null) {
-            log.error("Failed to destroy connector {} because it does not exist", connName);
-            throw new CopycatException("Connector does not exist");
+        if (needsReadToEnd) {
+            // Force exiting this method to avoid creating any connectors/tasks and require immediate rejoining if
+            // we timed out. This should only happen if we were the leader and didn't finish quickly enough, in which
+            // case we've waited a long time and should have already left the group OR the timeout should have been
+            // very long and not having finished also indicates we've waited longer than the session timeout.
+            if (!readConfigToEnd(syncConfigsTimeoutMs))
+                needsRejoin = true;
         }
 
-        stopConnector(state);
-        configStorage.putConnectorConfig(state.name, null);
-        connectors.remove(state.name);
+        if (needsRejoin) {
+            member.requestRejoin();
+            return false;
+        }
 
-        log.info("Finished destroying connector {}", connName);
+        // Should still validate that they match since we may have gone *past* the required offset, in which case we
+        // should *not* start any tasks and rejoin
+        if (configState.offset() != assignment.offset()) {
+            log.info("Current config state offset {} does not match group assignment {}. Forcing rebalance.", configState.offset(), assignment.offset());
+            member.requestRejoin();
+            return false;
+        }
+
+        startWork();
+
+        return true;
     }
 
-    // Stops a connectors tasks, then the connector
-    private void stopConnector(ConnectorState state) {
-        removeConnectorTasks(state);
+    /**
+     * Try to read to the end of the config log within the given timeout
+     * @param timeoutMs maximum time to wait to sync to the end of the log
+     * @return true if successful, false if timed out
+     */
+    private boolean readConfigToEnd(long timeoutMs) {
+        log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset());
         try {
-            state.connector.stop();
-        } catch (CopycatException e) {
-            log.error("Error shutting down connector {}: ", state.connector, e);
+            configStorage.readToEnd().get(timeoutMs, TimeUnit.MILLISECONDS);
+            configState = configStorage.snapshot();
+            log.info("Finished reading to end of log and updated config snapshot, new config log offset: {}", configState.offset());
+            return true;
+        } catch (TimeoutException e) {
+            log.warn("Didn't reach end of config log quickly enough", e);
+            // TODO: With explicit leave group support, it would be good to explicitly leave the group *before* this
+            // backoff since it'll be longer than the session timeout
+            if (isLeader())
+                backoff(workerUnsyncBackoffMs);
+            return false;
+        } catch (InterruptedException | ExecutionException e) {
+            throw new CopycatException("Error trying to catch up after assignment", e);
         }
     }
 
-    private void createConnectorTasks(ConnectorState state) {
-        String taskClassName = state.connector.taskClass().getName();
-
-        log.info("Creating tasks for connector {} of type {}", state.name, taskClassName);
-
-        List<Properties> taskConfigs = state.connector.taskConfigs(state.maxTasks);
-
-        // Generate the final configs, including framework provided settings
-        Map<ConnectorTaskId, Properties> taskProps = new HashMap<>();
-        for (int i = 0; i < taskConfigs.size(); i++) {
-            ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
-            Properties config = taskConfigs.get(i);
-            // TODO: This probably shouldn't be in the Herder. It's nice to have Copycat ensure the list of topics
-            // is automatically provided to tasks since it is required by the framework, but this
-            String subscriptionTopics = Utils.join(state.inputTopics, ",");
-            if (state.connector instanceof SinkConnector) {
-                // Make sure we don't modify the original since the connector may reuse it internally
-                Properties configForSink = new Properties();
-                configForSink.putAll(config);
-                configForSink.setProperty(SinkTask.TOPICS_CONFIG, subscriptionTopics);
-                config = configForSink;
-            }
-            taskProps.put(taskId, config);
-        }
+    private void backoff(long ms) {
+        Utils.sleep(ms);
+    }
 
-        // And initiate the tasks
-        for (int i = 0; i < taskConfigs.size(); i++) {
-            ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
-            Properties config = taskProps.get(taskId);
+    private void startWork() {
+        // Start assigned connectors and tasks
+        log.info("Starting connectors and tasks using config offset {}", assignment.offset());
+        for (String connectorName : assignment.connectors()) {
             try {
-                worker.addTask(taskId, taskClassName, config);
-                // We only need to store the task IDs so we can clean up.
-                state.tasks.add(taskId);
-            } catch (Throwable e) {
-                log.error("Failed to add task {}: ", taskId, e);
-                // Swallow this so we can continue updating the rest of the tasks
-                // FIXME what's the proper response? Kill all the tasks? Consider this the same as a task
-                // that died after starting successfully.
+                startConnector(connectorName);
+            } catch (ConfigException e) {
+                log.error("Couldn't instantiate connector " + connectorName + " because it has an invalid connector " +
+                        "configuration. This connector will not execute until reconfigured.", e);
             }
         }
-    }
-
-    private void removeConnectorTasks(ConnectorState state) {
-        Iterator<ConnectorTaskId> taskIter = state.tasks.iterator();
-        while (taskIter.hasNext()) {
-            ConnectorTaskId taskId = taskIter.next();
+        for (ConnectorTaskId taskId : assignment.tasks()) {
             try {
-                worker.stopTask(taskId);
-                taskIter.remove();
-            } catch (CopycatException e) {
-                log.error("Failed to stop task {}: ", taskId, e);
-                // Swallow this so we can continue stopping the rest of the tasks
-                // FIXME: Forcibly kill the task?
+                log.info("Starting task {}", taskId);
+                Map<String, String> configs = configState.taskConfig(taskId);
+                TaskConfig taskConfig = new TaskConfig(configs);
+                worker.addTask(taskId, taskConfig);
+            } catch (ConfigException e) {
+                log.error("Couldn't instantiate task " + taskId + " because it has an invalid task " +
+                        "configuration. This task will not execute until reconfigured.", e);
             }
         }
     }
 
-    private void updateConnectorTasks(ConnectorState state) {
-        removeConnectorTasks(state);
-        createConnectorTasks(state);
+    // Helper for starting a connector with the given name, which will extract & parse the config, generate connector
+    // context and add to the worker. This needs to be called from within the main worker thread for this herder.
+    private void startConnector(String connectorName) {
+        log.info("Starting connector {}", connectorName);
+        Map<String, String> configs = configState.connectorConfig(connectorName);
+        ConnectorConfig connConfig = new ConnectorConfig(configs);
+        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+        ConnectorContext ctx = new HerderConnectorContext(DistributedHerder.this, connName);
+        worker.addConnector(connConfig, ctx);
+
+        // Immediately request configuration since this could be a brand new connector. However, also only update those
+        // task configs if they are actually different from the existing ones to avoid unnecessary updates when this is
+        // just restoring an existing connector.
+        reconfigureConnector(connName);
     }
 
-    private void restoreConnectors() {
-        configState = configStorage.snapshot();
-        Collection<String> connNames = configState.connectors();
-        for (String connName : connNames) {
-            log.info("Restoring connector {}", connName);
-            Map<String, String> connProps = configState.connectorConfig(connName);
-            ConnectorConfig connConfig = new ConnectorConfig(connProps);
-            ConnectorState connState = createConnector(connConfig);
-            // Because this coordinator is standalone, connectors are only restored when this process
-            // starts and we know there can't be any existing tasks. So in this special case we're able
-            // to just create the tasks rather than having to check for existing tasks and sort out
-            // whether they need to be reconfigured.
-            createConnectorTasks(connState);
+    // Updates configurations for a connector by requesting them from the connector, filling in parameters provided
+    // by the system, then checks whether any configs have actually changed before submitting the new configs to storage
+    private void reconfigureConnector(String connName) {
+        Map<String, String> configs = configState.connectorConfig(connName);
+        ConnectorConfig connConfig = new ConnectorConfig(configs);
+
+        List<String> sinkTopics = null;
+        if (SinkConnector.class.isAssignableFrom(connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG)))
+            sinkTopics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG);
+
+        Map<ConnectorTaskId, Map<String, String>> taskProps
+                = worker.reconfigureConnectorTasks(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), sinkTopics);
+        boolean changed = false;
+        int currentNumTasks = configState.taskCount(connName);
+        if (taskProps.size() != currentNumTasks) {
+            log.debug("Change in connector task count from {} to {}, writing updated task configurations", currentNumTasks, taskProps.size());
+            changed = true;
+        } else {
+            for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfig : taskProps.entrySet()) {
+                if (!taskConfig.getValue().equals(configState.taskConfig(taskConfig.getKey()))) {
+                    log.debug("Change in task configurations, writing updated task configurations");
+                    changed = true;
+                    break;
+                }
+            }
+        }
+        if (changed) {
+            // FIXME: Configs should only be written by the leader to avoid conflicts due to zombies. However, until the
+            // REST API is available to forward this request, we need to do this on the worker that generates the config
+            configStorage.putTaskConfigs(taskProps);
         }
     }
 
 
+    private class HerderRequest {
+        private final Callable<Void> action;
+        private final Callback<Void> callback;
 
-    private static class ConnectorState {
-        public String name;
-        public Connector connector;
-        public int maxTasks;
-        public List<String> inputTopics;
-        Set<ConnectorTaskId> tasks;
+        public HerderRequest(Callable<Void> action, Callback<Void> callback) {
+            this.action = action;
+            this.callback = callback;
+        }
 
-        public ConnectorState(String name, Connector connector, int maxTasks,
-                              List<String> inputTopics) {
-            this.name = name;
-            this.connector = connector;
-            this.maxTasks = maxTasks;
-            this.inputTopics = inputTopics;
-            this.tasks = new HashSet<>();
+        public HerderRequest(Callable<Void> action) {
+            this.action = action;
+            this.callback = DEFAULT_CALLBACK;
         }
-    }
 
-    private class ConnectorConfigCallback implements Callback<String> {
-        @Override
-        public void onCompletion(Throwable error, String result) {
-            configState = configStorage.snapshot();
-            // FIXME
+        public Callable<Void> action() {
+            return action;
+        }
+
+        public Callback<Void> callback() {
+            return callback;
         }
     }
 
-    private class TaskConfigCallback implements Callback<List<ConnectorTaskId>> {
+    private static final Callback<Void> DEFAULT_CALLBACK = new Callback<Void>() {
         @Override
-        public void onCompletion(Throwable error, List<ConnectorTaskId> result) {
-            configState = configStorage.snapshot();
-            // FIXME
+        public void onCompletion(Throwable error, Void result) {
+            if (error != null)
+                log.error("HerderRequest's action threw an exception: ", error);
         }
+    };
+
+
+    // Config callbacks are triggered from the KafkaConfigStorage thread
+    private Callback<String> connectorConfigCallback() {
+        return new Callback<String>() {
+            @Override
+            public void onCompletion(Throwable error, String connector) {
+                log.debug("Connector {} config updated", connector);
+                // Stage the update and wake up the work thread. Connector config *changes* only need the one connector
+                // to be bounced. However, this callback may also indicate a connector *addition*, which does require
+                // a rebalance, so we need to be careful about what operation we request.
+                synchronized (DistributedHerder.this) {
+                    connectorConfigUpdates.add(connector);
+                }
+                member.wakeup();
+            }
+        };
     }
 
+    private Callback<List<ConnectorTaskId>> taskConfigCallback() {
+        return new Callback<List<ConnectorTaskId>>() {
+            @Override
+            public void onCompletion(Throwable error, List<ConnectorTaskId> tasks) {
+                log.debug("Tasks {} configs updated", tasks);
+                // Stage the update and wake up the work thread. No need to record the set of tasks here because task reconfigs
+                // always need a rebalance to ensure offsets get committed.
+                // TODO: As an optimization, some task config updates could avoid a rebalance. In particular, single-task
+                // connectors clearly don't need any coordination.
+                synchronized (DistributedHerder.this) {
+                    needsReconfigRebalance = true;
+                }
+                member.wakeup();
+            }
+        };
+    }
+
+    // Rebalances are triggered internally from the group member, so these are always executed in the work thread.
+    private WorkerRebalanceListener rebalanceListener() {
+        return new WorkerRebalanceListener() {
+            @Override
+            public void onAssigned(CopycatProtocol.Assignment assignment) {
+                // This callback just logs the info and saves it. The actual response is handled in the main loop, which
+                // ensures the group member's logic for rebalancing can complete, potentially long-running steps to
+                // catch up (or backoff if we fail) not executed in a callback, and so we'll be able to invoke other
+                // group membership actions (e.g., we may need to explicitly leave the group if we cannot handle the
+                // assigned tasks).
+                log.info("Joined group and got assignment: {}", assignment);
+                DistributedHerder.this.assignment = assignment;
+                rebalanceResolved = false;
+            }
 
+            @Override
+            public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
+                log.info("Rebalance started");
+
+                // Note that since we don't reset the assignment, we we don't revoke leadership here. During a rebalance,
+                // it is still important to have a leader that can write configs, offsets, etc.
+
+                // TODO: Parallelize this. We should be able to request all connectors and tasks to stop, then wait on all of
+                // them to finish
+                // TODO: Technically we don't have to stop connectors at all until we know they've really been removed from
+                // this worker. Instead, we can let them continue to run but buffer any update requests (which should be
+                // rare anyway). This would avoid a steady stream of start/stop, which probably also includes lots of
+                // unnecessary repeated connections to the source/sink system.
+                for (String connectorName : connectors)
+                    worker.stopConnector(connectorName);
+                // TODO: We need to at least commit task offsets, but if we could commit offsets & pause them instead of
+                // stopping them then state could continue to be reused when the task remains on this worker. For example,
+                // this would avoid having to close a connection and then reopen it when the task is assigned back to this
+                // worker again.
+                for (ConnectorTaskId taskId : tasks)
+                    worker.stopTask(taskId);
+            }
+        };
+    }
 }