Posted to commits@kafka.apache.org by gw...@apache.org on 2015/10/24 01:37:40 UTC

[1/3] kafka git commit: KAFKA-2371: Add distributed support for Copycat.

Repository: kafka
Updated Branches:
  refs/heads/trunk 21443f214 -> 2e6177359


http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
index 0463b85..1213656 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
@@ -17,20 +17,19 @@
 
 package org.apache.kafka.copycat.runtime.distributed;
 
-import org.apache.kafka.copycat.connector.Connector;
-import org.apache.kafka.copycat.connector.Task;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.copycat.connector.ConnectorContext;
 import org.apache.kafka.copycat.runtime.ConnectorConfig;
-import org.apache.kafka.copycat.runtime.HerderConnectorContext;
+import org.apache.kafka.copycat.runtime.TaskConfig;
 import org.apache.kafka.copycat.runtime.Worker;
-import org.apache.kafka.copycat.sink.SinkConnector;
-import org.apache.kafka.copycat.sink.SinkTask;
 import org.apache.kafka.copycat.source.SourceConnector;
 import org.apache.kafka.copycat.source.SourceTask;
 import org.apache.kafka.copycat.storage.KafkaConfigStorage;
 import org.apache.kafka.copycat.util.Callback;
 import org.apache.kafka.copycat.util.ConnectorTaskId;
-import org.apache.kafka.copycat.util.FutureCallback;
+import org.apache.kafka.copycat.util.TestFuture;
 import org.easymock.EasyMock;
+import org.easymock.IAnswer;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -39,251 +38,354 @@ import org.powermock.api.easymock.annotation.Mock;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
 
-import java.util.*;
-import java.util.concurrent.TimeUnit;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({DistributedHerder.class})
+@PrepareForTest(DistributedHerder.class)
 @PowerMockIgnore("javax.management.*")
 public class DistributedHerderTest {
-    private static final List<String> CONNECTOR_NAMES = Arrays.asList("source-test1", "source-test2", "sink-test3");
-    private static final List<String> SOURCE_CONNECTOR_NAMES = Arrays.asList("source-test1", "source-test2");
-    private static final List<String> SINK_CONNECTOR_NAMES = Arrays.asList("sink-test3");
-    private static final String TOPICS_LIST_STR = "topic1,topic2";
+    private static final Map<String, String> HERDER_CONFIG = new HashMap<>();
+    static {
+        HERDER_CONFIG.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic");
+        HERDER_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        HERDER_CONFIG.put(DistributedHerderConfig.GROUP_ID_CONFIG, "test-copycat-group");
+    }
 
-    private static final Map<String, String> CONFIG_STORAGE_CONFIG = Collections.singletonMap(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic");
+    private static final String CONN1 = "sourceA";
+    private static final String CONN2 = "sourceB";
+    private static final ConnectorTaskId TASK0 = new ConnectorTaskId(CONN1, 0);
+    private static final ConnectorTaskId TASK1 = new ConnectorTaskId(CONN1, 1);
+    private static final ConnectorTaskId TASK2 = new ConnectorTaskId(CONN1, 2);
+    private static final Integer MAX_TASKS = 3;
+    private static final Map<String, String> CONNECTOR_CONFIG = new HashMap<>();
+    static {
+        CONNECTOR_CONFIG.put(ConnectorConfig.NAME_CONFIG, "sourceA");
+        CONNECTOR_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString());
+        CONNECTOR_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+        CONNECTOR_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName());
+    }
+    private static final Map<String, String> TASK_CONFIG = new HashMap<>();
+    static {
+        TASK_CONFIG.put(TaskConfig.TASK_CLASS_CONFIG, BogusSourceTask.class.getName());
+    }
+    private static final HashMap<ConnectorTaskId, Map<String, String>> TASK_CONFIGS = new HashMap<>();
+    static {
+        TASK_CONFIGS.put(TASK0, TASK_CONFIG);
+        TASK_CONFIGS.put(TASK1, TASK_CONFIG);
+        TASK_CONFIGS.put(TASK2, TASK_CONFIG);
+    }
+    private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONNECTOR_CONFIG), TASK_CONFIGS, Collections.<String>emptySet());
 
     @Mock private KafkaConfigStorage configStorage;
+    @Mock private WorkerGroupMember member;
     private DistributedHerder herder;
     @Mock private Worker worker;
     @Mock private Callback<String> createCallback;
+    @Mock private Callback<Void> destroyCallback;
 
-    private Map<String, Map<String, String>> connectorProps;
-    private Map<String, Class<? extends Connector>> connectorClasses;
-    private Map<String, Class<? extends Task>> connectorTaskClasses;
-    private Map<String, Connector> connectors;
-    private Properties taskProps;
+    private Callback<String> connectorConfigCallback;
+    private Callback<List<ConnectorTaskId>> taskConfigCallback;
+    private WorkerRebalanceListener rebalanceListener;
 
     @Before
-    public void setUp() {
+    public void setUp() throws Exception {
         worker = PowerMock.createMock(Worker.class);
-        herder = new DistributedHerder(worker, configStorage);
-
-        connectorProps = new HashMap<>();
-        connectorClasses = new HashMap<>();
-        connectorTaskClasses = new HashMap<>();
-        connectors = new HashMap<>();
-        for (String connectorName : CONNECTOR_NAMES) {
-            Class<? extends Connector> connectorClass = connectorName.contains("source") ? BogusSourceConnector.class : BogusSinkConnector.class;
-            Class<? extends Task> taskClass = connectorName.contains("source") ? BogusSourceTask.class : BogusSinkTask.class;
-            Connector connector = connectorName.contains("source") ? PowerMock.createMock(BogusSourceConnector.class) : PowerMock.createMock(BogusSinkConnector.class);
-
-            Map<String, String> props = new HashMap<>();
-            props.put(ConnectorConfig.NAME_CONFIG, connectorName);
-            props.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
-            props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
-
-            connectorProps.put(connectorName, props);
-            connectorClasses.put(connectorName, connectorClass);
-            connectorTaskClasses.put(connectorName, taskClass);
-            connectors.put(connectorName, connector);
-        }
-
-        PowerMock.mockStatic(DistributedHerder.class);
-
-        // These can be anything since connectors can pass along whatever they want.
-        taskProps = new Properties();
-        taskProps.setProperty("foo", "bar");
+
+        herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff"},
+                worker, HERDER_CONFIG, configStorage, member);
+        connectorConfigCallback = Whitebox.invokeMethod(herder, "connectorConfigCallback");
+        taskConfigCallback = Whitebox.invokeMethod(herder, "taskConfigCallback");
+        rebalanceListener = Whitebox.invokeMethod(herder, "rebalanceListener");
     }
 
     @Test
-    public void testCreateSourceConnector() throws Exception {
-        String connectorName = SOURCE_CONNECTOR_NAMES.get(0);
+    public void testJoinAssignment() {
+        // Join group and get assignment
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
+        expectPostRebalanceCatchup(SNAPSHOT);
+        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+        PowerMock.expectLastCall();
+        EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject());
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
 
-        expectConfigStorageConfigureStart();
-        expectEmptyRestore();
-        expectAdd(connectorName);
         PowerMock.replayAll();
 
-        herder.configure(CONFIG_STORAGE_CONFIG);
-        herder.start();
-        herder.addConnector(connectorProps.get(connectorName), createCallback);
+        herder.tick();
 
         PowerMock.verifyAll();
     }
 
     @Test
-    public void testCreateSinkConnector() throws Exception {
-        String connectorName = SINK_CONNECTOR_NAMES.get(0);
+    public void testHaltCleansUpWorker() {
+        EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONN1));
+        worker.stopConnector(CONN1);
+        PowerMock.expectLastCall();
+        EasyMock.expect(worker.taskIds()).andReturn(Collections.singleton(TASK1));
+        worker.stopTask(TASK1);
+        PowerMock.expectLastCall();
+        member.stop();
+        PowerMock.expectLastCall();
+        configStorage.stop();
+        PowerMock.expectLastCall();
 
-        expectConfigStorageConfigureStart();
-        expectEmptyRestore();
-        expectAdd(connectorName);
         PowerMock.replayAll();
 
-        herder.configure(CONFIG_STORAGE_CONFIG);
-        herder.start();
-        herder.addConnector(connectorProps.get(connectorName), createCallback);
+        herder.halt();
 
         PowerMock.verifyAll();
     }
 
     @Test
-    public void testDestroyConnector() throws Exception {
-        String connectorName = SOURCE_CONNECTOR_NAMES.get(0);
-
-        expectConfigStorageConfigureStart();
-        expectEmptyRestore();
-        expectAdd(connectorName);
-        expectDestroy(connectorName);
-        PowerMock.replayAll();
+    public void testCreateConnector() throws Exception {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
 
-        herder.configure(CONFIG_STORAGE_CONFIG);
-        herder.start();
-        herder.addConnector(connectorProps.get(connectorName), createCallback);
+        member.wakeup();
+        PowerMock.expectLastCall();
+        configStorage.putConnectorConfig(CONN1, CONNECTOR_CONFIG);
+        PowerMock.expectLastCall();
+        createCallback.onCompletion(null, CONN1);
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+        // No immediate action besides this -- change will be picked up via the config log
 
-        FutureCallback<Void> futureCb = new FutureCallback<>(new Callback<Void>() {
-            @Override
-            public void onCompletion(Throwable error, Void result) {
+        PowerMock.replayAll();
 
-            }
-        });
-        herder.deleteConnector(CONNECTOR_NAMES.get(0), futureCb);
-        futureCb.get(1000L, TimeUnit.MILLISECONDS);
+        herder.addConnector(CONNECTOR_CONFIG, createCallback);
+        herder.tick();
 
         PowerMock.verifyAll();
     }
 
     @Test
-    public void testCreateAndStop() throws Exception {
-        String connectorName = SOURCE_CONNECTOR_NAMES.get(0);
+    public void testDestroyConnector() throws Exception {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        // Start with one connector
+        expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
+        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+        PowerMock.expectLastCall();
+        EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+
+        // And delete the connector
+        member.wakeup();
+        PowerMock.expectLastCall();
+        configStorage.putConnectorConfig(CONN1, null);
+        PowerMock.expectLastCall();
+        destroyCallback.onCompletion(null, null);
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+        // No immediate action besides this -- change will be picked up via the config log
 
-        expectConfigStorageConfigureStart();
-        expectEmptyRestore();
-        expectAdd(connectorName);
         PowerMock.replayAll();
 
-        herder.configure(CONFIG_STORAGE_CONFIG);
-        herder.start();
-        herder.addConnector(connectorProps.get(connectorName), createCallback);
+        herder.deleteConnector(CONN1, destroyCallback);
+        herder.tick();
 
         PowerMock.verifyAll();
     }
 
     @Test
-    public void testRestoreAndStop() throws Exception {
-        String restoreConnectorName1 = SOURCE_CONNECTOR_NAMES.get(0);
-        String restoreConnectorName2 = SINK_CONNECTOR_NAMES.get(0);
-        String additionalConnectorName = SOURCE_CONNECTOR_NAMES.get(1);
-
-        expectConfigStorageConfigureStart();
-        expectRestore(Arrays.asList(restoreConnectorName1, restoreConnectorName2));
-        expectAdd(additionalConnectorName);
-        // Stopping the herder should correctly stop all restored and new connectors
-        expectStop(restoreConnectorName1);
-        expectStop(restoreConnectorName2);
-        expectStop(additionalConnectorName);
-        configStorage.stop();
+    public void testConnectorConfigAdded() {
+        // If a connector was added, we need to rebalance
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+
+        // join, no configs so no need to catch up on config topic
+        expectRebalance(-1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // apply config
+        member.wakeup();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        // Checks for config updates and starts rebalance
+        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
+        member.requestRejoin();
+        PowerMock.expectLastCall();
+        // Performs rebalance and gets new assignment
+        expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
+                CopycatProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
+        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+        PowerMock.expectLastCall();
+        EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
         PowerMock.replayAll();
 
-        herder.configure(CONFIG_STORAGE_CONFIG);
-        herder.start();
-        herder.addConnector(connectorProps.get(additionalConnectorName), createCallback);
-        herder.stop();
+        herder.tick(); // join
+        connectorConfigCallback.onCompletion(null, CONN1); // read updated config
+        herder.tick(); // apply config
+        herder.tick(); // do rebalance
 
         PowerMock.verifyAll();
     }
 
-    private void expectConfigStorageConfigureStart() {
-        configStorage.configure(CONFIG_STORAGE_CONFIG);
+    @Test
+    public void testConnectorConfigUpdate() {
+        // Connector config can be applied without any rebalance
+
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
+
+        // join
+        expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
+        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
         PowerMock.expectLastCall();
-        configStorage.start();
+        EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
-    }
 
-    private void expectAdd(String connectorName) throws Exception {
-        configStorage.putConnectorConfig(connectorName, connectorProps.get(connectorName));
+        // apply config
+        member.wakeup();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); // for this test, it doesn't matter if we use the same config snapshot
+        worker.stopConnector(CONN1);
+        PowerMock.expectLastCall();
+        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+        PowerMock.expectLastCall();
+        EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
-        expectInstantiateConnector(connectorName, true);
-    }
 
-    private void expectEmptyRestore() throws Exception {
-        expectRestore(Collections.<String>emptyList());
-    }
+        PowerMock.replayAll();
+
+        herder.tick(); // join
+        connectorConfigCallback.onCompletion(null, CONN1); // read updated config
+        herder.tick(); // apply config
 
-    private void expectRestore(List<String> connectorNames) throws Exception {
-        Map<String, Integer> rootConfig = new HashMap<>();
-        Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
-        for (String connName : connectorNames) {
-            rootConfig.put(connName, 0);
-            connectorConfigs.put(connName, connectorProps.get(connName));
-        }
-        EasyMock.expect(configStorage.snapshot())
-                .andReturn(new ClusterConfigState(1, rootConfig, connectorConfigs, Collections.EMPTY_MAP, Collections.EMPTY_SET));
-
-        // Restore never uses a callback
-        for (String connectorName : connectorNames)
-            expectInstantiateConnector(connectorName, false);
+        PowerMock.verifyAll();
     }
 
-    private void expectInstantiateConnector(String connectorName, boolean expectCallback) throws Exception {
-        PowerMock.expectPrivate(DistributedHerder.class, "instantiateConnector", connectorClasses.get(connectorName).getName())
-                .andReturn(connectors.get(connectorName));
-        if (expectCallback) {
-            createCallback.onCompletion(null, connectorName);
-            PowerMock.expectLastCall();
-        }
+    @Test
+    public void testTaskConfigAdded() {
+        // Task config always requires rebalance
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+
+        // join
+        expectRebalance(-1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
 
-        Connector connector = connectors.get(connectorName);
-        connector.initialize(EasyMock.anyObject(HerderConnectorContext.class));
+        // apply config
+        member.wakeup();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        // Checks for config updates and starts rebalance
+        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
+        member.requestRejoin();
         PowerMock.expectLastCall();
-        connector.start(new Properties());
+        // Performs rebalance and gets new assignment
+        expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
+                CopycatProtocol.Assignment.NO_ERROR, 1, Collections.<String>emptyList(), Arrays.asList(TASK0));
+        worker.addTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject());
         PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
 
-        // Just return the connector properties for the individual task we generate by default
-        EasyMock.<Class<? extends Task>>expect(connector.taskClass()).andReturn(connectorTaskClasses.get(connectorName));
+        herder.tick(); // join
+        taskConfigCallback.onCompletion(null, Arrays.asList(TASK0, TASK1, TASK2)); // read updated config
+        herder.tick(); // apply config
+        herder.tick(); // do rebalance
 
-        EasyMock.expect(connector.taskConfigs(ConnectorConfig.TASKS_MAX_DEFAULT))
-                .andReturn(Arrays.asList(taskProps));
-        // And we should instantiate the tasks. For a sink task, we should see added properties for
-        // the input topic partitions
-        Properties generatedTaskProps = new Properties();
-        generatedTaskProps.putAll(taskProps);
-        if (connectorName.contains("sink"))
-            generatedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR);
-        ConnectorTaskId taskId = new ConnectorTaskId(connectorName, 0);
-        worker.addTask(taskId, connectorTaskClasses.get(connectorName).getName(), generatedTaskProps);
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testJoinLeaderCatchUpFails() throws Exception {
+        // Join group and as leader fail to do assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
+                CopycatProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
+        // Reading to end of log times out
+        TestFuture<Void> readToEndFuture = new TestFuture<>();
+        readToEndFuture.resolveOnGet(new TimeoutException());
+        EasyMock.expect(configStorage.readToEnd()).andReturn(readToEndFuture);
+        PowerMock.expectPrivate(herder, "backoff", DistributedHerderConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT);
+        member.requestRejoin();
+
+        // After backoff, restart the process and this time succeed
+        expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
+        expectPostRebalanceCatchup(SNAPSHOT);
+
+        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
         PowerMock.expectLastCall();
+        EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject());
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        herder.tick();
+
+        PowerMock.verifyAll();
     }
 
-    private void expectStop(String connectorName) {
-        worker.stopTask(new ConnectorTaskId(connectorName, 0));
-        EasyMock.expectLastCall();
-        Connector connector = connectors.get(connectorName);
-        connector.stop();
-        EasyMock.expectLastCall();
+    @Test
+    public void testInconsistentConfigs() throws Exception {
+        // FIXME: if we have inconsistent configs, we need to request forced reconfig + write of the connector's task configs
+        // This requires inter-worker communication, so needs the REST API
     }
 
-    private void expectDestroy(String connectorName) {
-        expectStop(connectorName);
-        configStorage.putConnectorConfig(connectorName, null);
-        PowerMock.expectLastCall();
+
+    private void expectRebalance(final long offset, final List<String> assignedConnectors, final List<ConnectorTaskId> assignedTasks) {
+        expectRebalance(null, null, CopycatProtocol.Assignment.NO_ERROR, offset, assignedConnectors, assignedTasks);
     }
 
-    // We need to use a real class here due to some issue with mocking java.lang.Class
-    private abstract class BogusSourceConnector extends SourceConnector {
+    // Handles common initial part of rebalance callback. Does not handle instantiation of connectors and tasks.
+    private void expectRebalance(final Collection<String> revokedConnectors, final List<ConnectorTaskId> revokedTasks,
+                                 final short error, final long offset, final List<String> assignedConnectors, final List<ConnectorTaskId> assignedTasks) {
+        member.ensureActive();
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                if (revokedConnectors != null)
+                    rebalanceListener.onRevoked("leader", revokedConnectors, revokedTasks);
+                CopycatProtocol.Assignment assignment = new CopycatProtocol.Assignment(
+                        error, "leader", offset, assignedConnectors, assignedTasks);
+                rebalanceListener.onAssigned(assignment);
+                return null;
+            }
+        });
     }
 
-    private abstract class BogusSourceTask extends SourceTask {
+    private void expectPostRebalanceCatchup(final ClusterConfigState readToEndSnapshot) {
+        TestFuture<Void> readToEndFuture = new TestFuture<>();
+        readToEndFuture.resolveOnGet((Void) null);
+        EasyMock.expect(configStorage.readToEnd()).andReturn(readToEndFuture);
+        EasyMock.expect(configStorage.snapshot()).andReturn(readToEndSnapshot);
     }
 
-    private abstract class BogusSinkConnector extends SinkConnector {
+
+    // We need to use a real class here due to some issue with mocking java.lang.Class
+    private abstract class BogusSourceConnector extends SourceConnector {
     }
 
-    private abstract class BogusSinkTask extends SourceTask {
+    private abstract class BogusSourceTask extends SourceTask {
     }
 
 }
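
The expectRebalance() helper above simulates the group protocol by firing the
herder's rebalance listener from inside the mocked member.ensureActive() call.
Below is a minimal, self-contained sketch of that EasyMock andAnswer pattern;
the Listener and Member interfaces are hypothetical stand-ins rather than the
Copycat classes.

    import org.easymock.EasyMock;
    import org.easymock.IAnswer;

    public class AnswerCallbackSketch {
        // Hypothetical stand-ins for WorkerRebalanceListener / WorkerGroupMember.
        interface Listener { void onAssigned(String assignment); }
        interface Member { void ensureActive(); }

        public static void main(String[] args) {
            final Listener listener = EasyMock.createMock(Listener.class);
            Member member = EasyMock.createMock(Member.class);

            // The listener should receive exactly one assignment.
            listener.onAssigned("connectorA");
            EasyMock.expectLastCall();

            // When ensureActive() is called, drive the listener from inside the
            // mocked call, the same trick expectRebalance() uses above.
            member.ensureActive();
            EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
                @Override
                public Object answer() throws Throwable {
                    listener.onAssigned("connectorA");
                    return null;
                }
            });

            EasyMock.replay(listener, member);
            member.ensureActive();   // triggers the simulated "rebalance"
            EasyMock.verify(listener, member);
        }
    }

In the test itself, the partial mock of DistributedHerder (createPartialMock of
"backoff" plus Whitebox.invokeMethod) is what exposes the internal callbacks
without running the real backoff.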

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
new file mode 100644
index 0000000..30c76a2
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
@@ -0,0 +1,436 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.GroupMetadataResponse;
+import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.SyncGroupRequest;
+import org.apache.kafka.common.requests.SyncGroupResponse;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.copycat.storage.KafkaConfigStorage;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.test.TestUtils;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.reflect.Whitebox;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class WorkerCoordinatorTest {
+
+    private String connectorId = "connector";
+    private String connectorId2 = "connector2";
+    private ConnectorTaskId taskId0 = new ConnectorTaskId(connectorId, 0);
+    private ConnectorTaskId taskId1 = new ConnectorTaskId(connectorId, 1);
+    private ConnectorTaskId taskId2 = new ConnectorTaskId(connectorId2, 0);
+
+    private String groupId = "test-group";
+    private int sessionTimeoutMs = 10;
+    private int heartbeatIntervalMs = 2;
+    private long retryBackoffMs = 100;
+    private long requestTimeoutMs = 5000;
+    private MockTime time;
+    private MockClient client;
+    private Cluster cluster = TestUtils.singletonCluster("topic", 1);
+    private Node node = cluster.nodes().get(0);
+    private Metadata metadata;
+    private Metrics metrics;
+    private Map<String, String> metricTags = new LinkedHashMap<>();
+    private ConsumerNetworkClient consumerClient;
+    private MockRebalanceListener rebalanceListener;
+    @Mock private KafkaConfigStorage configStorage;
+    private WorkerCoordinator coordinator;
+
+    private ClusterConfigState configState1;
+    private ClusterConfigState configState2;
+
+    @Before
+    public void setup() {
+        this.time = new MockTime();
+        this.client = new MockClient(time);
+        this.metadata = new Metadata(0, Long.MAX_VALUE);
+        this.metadata.update(cluster, time.milliseconds());
+        this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+        this.metrics = new Metrics(time);
+        this.rebalanceListener = new MockRebalanceListener();
+        this.configStorage = PowerMock.createMock(KafkaConfigStorage.class);
+
+        client.setNode(node);
+
+        this.coordinator = new WorkerCoordinator(consumerClient,
+                groupId,
+                sessionTimeoutMs,
+                heartbeatIntervalMs,
+                metrics,
+                "consumer" + groupId,
+                metricTags,
+                time,
+                requestTimeoutMs,
+                retryBackoffMs,
+                configStorage,
+                rebalanceListener);
+
+        configState1 = new ClusterConfigState(
+                1L, Collections.singletonMap(connectorId, 1),
+                Collections.singletonMap(connectorId, (Map<String, String>) new HashMap<String, String>()),
+                Collections.singletonMap(taskId0, (Map<String, String>) new HashMap<String, String>()),
+                Collections.<String>emptySet()
+        );
+        Map<String, Integer> configState2ConnectorTaskCounts = new HashMap<>();
+        configState2ConnectorTaskCounts.put(connectorId, 2);
+        configState2ConnectorTaskCounts.put(connectorId2, 1);
+        Map<String, Map<String, String>> configState2ConnectorConfigs = new HashMap<>();
+        configState2ConnectorConfigs.put(connectorId, new HashMap<String, String>());
+        configState2ConnectorConfigs.put(connectorId2, new HashMap<String, String>());
+        Map<ConnectorTaskId, Map<String, String>> configState2TaskConfigs = new HashMap<>();
+        configState2TaskConfigs.put(taskId0, new HashMap<String, String>());
+        configState2TaskConfigs.put(taskId1, new HashMap<String, String>());
+        configState2TaskConfigs.put(taskId2, new HashMap<String, String>());
+        configState2 = new ClusterConfigState(
+                2L, configState2ConnectorTaskCounts,
+                configState2ConnectorConfigs,
+                configState2TaskConfigs,
+                Collections.<String>emptySet()
+        );
+    }
+
+    @After
+    public void teardown() {
+        this.metrics.close();
+    }
+
+    // We only test functionality unique to WorkerCoordinator. Most functionality is already well tested via the tests
+    // that cover AbstractCoordinator & ConsumerCoordinator.
+
+    @Test
+    public void testMetadata() {
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
+
+        PowerMock.replayAll();
+
+        LinkedHashMap<String, ByteBuffer> serialized = coordinator.metadata();
+        assertEquals(1, serialized.size());
+        CopycatProtocol.ConfigState state = CopycatProtocol.deserializeMetadata(serialized.get(WorkerCoordinator.DEFAULT_SUBPROTOCOL));
+        assertEquals(1, state.offset());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testNormalJoinGroupLeader() {
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
+
+        PowerMock.replayAll();
+
+        final String consumerId = "leader";
+
+        client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // normal join group
+        Map<String, Long> memberConfigOffsets = new HashMap<>();
+        memberConfigOffsets.put("leader", 1L);
+        memberConfigOffsets.put("member", 1L);
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberConfigOffsets, Errors.NONE.code()));
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+                return sync.memberId().equals(consumerId) &&
+                        sync.generationId() == 1 &&
+                        sync.groupAssignment().containsKey(consumerId);
+            }
+        }, syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId),
+                Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code()));
+        coordinator.ensureActiveGroup();
+
+        assertFalse(coordinator.needRejoin());
+        assertEquals(0, rebalanceListener.revokedCount);
+        assertEquals(1, rebalanceListener.assignedCount);
+        assertFalse(rebalanceListener.assignment.failed());
+        assertEquals(1L, rebalanceListener.assignment.offset());
+        assertEquals("leader", rebalanceListener.assignment.leader());
+        assertEquals(Collections.singletonList(connectorId), rebalanceListener.assignment.connectors());
+        assertEquals(Collections.emptyList(), rebalanceListener.assignment.tasks());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testNormalJoinGroupFollower() {
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
+
+        PowerMock.replayAll();
+
+        final String memberId = "member";
+
+        client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // normal join group
+        client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code()));
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+                return sync.memberId().equals(memberId) &&
+                        sync.generationId() == 1 &&
+                        sync.groupAssignment().isEmpty();
+            }
+        }, syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.<String>emptyList(),
+                Collections.singletonList(taskId0), Errors.NONE.code()));
+        coordinator.ensureActiveGroup();
+
+        assertFalse(coordinator.needRejoin());
+        assertEquals(0, rebalanceListener.revokedCount);
+        assertEquals(1, rebalanceListener.assignedCount);
+        assertFalse(rebalanceListener.assignment.failed());
+        assertEquals(1L, rebalanceListener.assignment.offset());
+        assertEquals(Collections.emptyList(), rebalanceListener.assignment.connectors());
+        assertEquals(Collections.singletonList(taskId0), rebalanceListener.assignment.tasks());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testJoinLeaderCannotAssign() {
+        // If the selected leader can't get up to the maximum offset, it will fail to assign and we should immediately
+        // need to retry the join.
+
+        // When the first round fails, we'll take an updated config snapshot
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState2);
+
+        PowerMock.replayAll();
+
+        final String memberId = "member";
+
+        client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // config mismatch results in assignment error
+        client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code()));
+        MockClient.RequestMatcher matcher = new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+                return sync.memberId().equals(memberId) &&
+                        sync.generationId() == 1 &&
+                        sync.groupAssignment().isEmpty();
+            }
+        };
+        client.prepareResponse(matcher, syncGroupResponse(CopycatProtocol.Assignment.CONFIG_MISMATCH, "leader", 10L,
+                Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code()));
+        client.prepareResponse(matcher, syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, "leader", 1L,
+                Collections.<String>emptyList(), Collections.singletonList(taskId0), Errors.NONE.code()));
+        coordinator.ensureActiveGroup();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRejoinGroup() {
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
+
+        PowerMock.replayAll();
+
+        client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // join the group once
+        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.<String>emptyList(),
+                Collections.singletonList(taskId0), Errors.NONE.code()));
+        coordinator.ensureActiveGroup();
+
+        assertEquals(0, rebalanceListener.revokedCount);
+        assertEquals(1, rebalanceListener.assignedCount);
+        assertFalse(rebalanceListener.assignment.failed());
+        assertEquals(1L, rebalanceListener.assignment.offset());
+        assertEquals(Collections.emptyList(), rebalanceListener.assignment.connectors());
+        assertEquals(Collections.singletonList(taskId0), rebalanceListener.assignment.tasks());
+
+        // and join the group again
+        coordinator.requestRejoin();
+        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId),
+                Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code()));
+        coordinator.ensureActiveGroup();
+
+        assertEquals(1, rebalanceListener.revokedCount);
+        assertEquals(Collections.emptyList(), rebalanceListener.revokedConnectors);
+        assertEquals(Collections.singletonList(taskId0), rebalanceListener.revokedTasks);
+        assertEquals(2, rebalanceListener.assignedCount);
+        assertFalse(rebalanceListener.assignment.failed());
+        assertEquals(1L, rebalanceListener.assignment.offset());
+        assertEquals(Collections.singletonList(connectorId), rebalanceListener.assignment.connectors());
+        assertEquals(Collections.emptyList(), rebalanceListener.assignment.tasks());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testLeaderDoSync1() throws Exception {
+        // Since all the protocol responses are mocked, the other tests validate doSync runs, but don't validate its
+        // output. So we test it directly here.
+
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
+
+        PowerMock.replayAll();
+
+        // Prime the current configuration state
+        coordinator.metadata();
+
+        Map<String, ByteBuffer> configs = new HashMap<>();
+        // Mark everyone as in sync with configState1
+        configs.put("leader", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
+        configs.put("member", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
+        Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "doSync", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
+
+        // configState1 has 1 connector, 1 task
+        CopycatProtocol.Assignment leaderAssignment = CopycatProtocol.deserializeAssignment(result.get("leader"));
+        assertEquals(false, leaderAssignment.failed());
+        assertEquals("leader", leaderAssignment.leader());
+        assertEquals(1, leaderAssignment.offset());
+        assertEquals(Collections.singletonList(connectorId), leaderAssignment.connectors());
+        assertEquals(Collections.emptyList(), leaderAssignment.tasks());
+
+        CopycatProtocol.Assignment memberAssignment = CopycatProtocol.deserializeAssignment(result.get("member"));
+        assertEquals(false, memberAssignment.failed());
+        assertEquals("leader", memberAssignment.leader());
+        assertEquals(1, memberAssignment.offset());
+        assertEquals(Collections.emptyList(), memberAssignment.connectors());
+        assertEquals(Collections.singletonList(taskId0), memberAssignment.tasks());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testLeaderDoSync2() throws Exception {
+        // Since all the protocol responses are mocked, the other tests validate doSync runs, but don't validate its
+        // output. So we test it directly here.
+
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState2);
+
+        PowerMock.replayAll();
+
+        // Prime the current configuration state
+        coordinator.metadata();
+
+        Map<String, ByteBuffer> configs = new HashMap<>();
+        // Mark everyone as having read up to config offset 1
+        configs.put("leader", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
+        configs.put("member", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
+        Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "doSync", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
+
+        // configState2 has 2 connectors, 3 tasks and should trigger round robin assignment
+        CopycatProtocol.Assignment leaderAssignment = CopycatProtocol.deserializeAssignment(result.get("leader"));
+        assertEquals(false, leaderAssignment.failed());
+        assertEquals("leader", leaderAssignment.leader());
+        assertEquals(1, leaderAssignment.offset());
+        assertEquals(Collections.singletonList(connectorId), leaderAssignment.connectors());
+        assertEquals(Arrays.asList(taskId1, taskId2), leaderAssignment.tasks());
+
+        CopycatProtocol.Assignment memberAssignment = CopycatProtocol.deserializeAssignment(result.get("member"));
+        assertEquals(false, memberAssignment.failed());
+        assertEquals("leader", memberAssignment.leader());
+        assertEquals(1, memberAssignment.offset());
+        assertEquals(Collections.singletonList(connectorId2), memberAssignment.connectors());
+        assertEquals(Collections.singletonList(taskId0), memberAssignment.tasks());
+
+        PowerMock.verifyAll();
+    }
+
+
+    private Struct groupMetadataResponse(Node node, short error) {
+        GroupMetadataResponse response = new GroupMetadataResponse(error, node);
+        return response.toStruct();
+    }
+
+    private Struct joinGroupLeaderResponse(int generationId, String memberId,
+                                           Map<String, Long> configOffsets, short error) {
+        Map<String, ByteBuffer> metadata = new HashMap<>();
+        for (Map.Entry<String, Long> configStateEntry : configOffsets.entrySet()) {
+            ByteBuffer buf = CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(configStateEntry.getValue()));
+            metadata.put(configStateEntry.getKey(), buf);
+        }
+        return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, memberId, metadata).toStruct();
+    }
+
+    private Struct joinGroupFollowerResponse(int generationId, String memberId, String leaderId, short error) {
+        return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, leaderId,
+                Collections.<String, ByteBuffer>emptyMap()).toStruct();
+    }
+
+    private Struct syncGroupResponse(short assignmentError, String leader, long configOffset, List<String> connectorIds,
+                                     List<ConnectorTaskId> taskIds, short error) {
+        CopycatProtocol.Assignment assignment = new CopycatProtocol.Assignment(assignmentError, leader, configOffset, connectorIds, taskIds);
+        ByteBuffer buf = CopycatProtocol.serializeAssignment(assignment);
+        return new SyncGroupResponse(error, buf).toStruct();
+    }
+
+
+    private static class MockRebalanceListener implements WorkerRebalanceListener {
+        public CopycatProtocol.Assignment assignment = null;
+
+        public String revokedLeader;
+        public Collection<String> revokedConnectors;
+        public Collection<ConnectorTaskId> revokedTasks;
+
+        public int revokedCount = 0;
+        public int assignedCount = 0;
+
+        @Override
+        public void onAssigned(CopycatProtocol.Assignment assignment) {
+            this.assignment = assignment;
+            assignedCount++;
+        }
+
+        @Override
+        public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
+            this.revokedLeader = leader;
+            this.revokedConnectors = connectors;
+            this.revokedTasks = tasks;
+            revokedCount++;
+        }
+    }
+}
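
testLeaderDoSync2 above expects the leader to split configState2's two
connectors and three tasks round robin between "leader" and "member". The
sketch below is not the WorkerCoordinator implementation; it is a generic
round-robin deal over sorted member ids, shown only to illustrate the shape of
assignment the assertions encode. The exact item ordering in the real doSync
depends on how connectors and tasks are enumerated.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.TreeSet;

    public class RoundRobinSketch {
        // Deal work items out one at a time, cycling through the sorted member
        // ids. Illustrative only; the real doSync keeps separate connector and
        // task lists per member and serializes them into an Assignment.
        static <T> Map<String, List<T>> assign(Collection<String> memberIds, List<T> work) {
            TreeSet<String> sorted = new TreeSet<>(memberIds);
            List<String> members = new ArrayList<>(sorted);
            Map<String, List<T>> assignment = new TreeMap<>();
            for (String member : members)
                assignment.put(member, new ArrayList<T>());
            int i = 0;
            for (T item : work)
                assignment.get(members.get(i++ % members.size())).add(item);
            return assignment;
        }

        public static void main(String[] args) {
            // Three tasks over two members: one member gets two, the other one,
            // the same 2/1 split asserted in testLeaderDoSync2.
            System.out.println(assign(Arrays.asList("leader", "member"),
                    Arrays.asList("connector-0", "connector-1", "connector2-0")));
        }
    }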

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
index 606b94d..b395fc7 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.copycat.connector.Connector;
 import org.apache.kafka.copycat.connector.Task;
 import org.apache.kafka.copycat.runtime.ConnectorConfig;
 import org.apache.kafka.copycat.runtime.HerderConnectorContext;
+import org.apache.kafka.copycat.runtime.TaskConfig;
 import org.apache.kafka.copycat.runtime.Worker;
 import org.apache.kafka.copycat.sink.SinkConnector;
 import org.apache.kafka.copycat.sink.SinkTask;
@@ -35,22 +36,21 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
 import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({StandaloneHerder.class})
-@PowerMockIgnore("javax.management.*")
 public class StandaloneHerderTest {
     private static final String CONNECTOR_NAME = "test";
+    private static final List<String> TOPICS_LIST = Arrays.asList("topic1", "topic2");
     private static final String TOPICS_LIST_STR = "topic1,topic2";
+    private static final int DEFAULT_MAX_TASKS = 1;
 
     private StandaloneHerder herder;
     @Mock protected Worker worker;
@@ -58,7 +58,7 @@ public class StandaloneHerderTest {
     @Mock protected Callback<String> createCallback;
 
     private Map<String, String> connectorProps;
-    private Properties taskProps;
+    private Map<String, String> taskProps;
 
     @Before
     public void setup() {
@@ -68,11 +68,10 @@ public class StandaloneHerderTest {
         connectorProps = new HashMap<>();
         connectorProps.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
         connectorProps.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
-        PowerMock.mockStatic(StandaloneHerder.class);
 
         // These can be anything since connectors can pass along whatever they want.
-        taskProps = new Properties();
-        taskProps.setProperty("foo", "bar");
+        taskProps = new HashMap<>();
+        taskProps.put("foo", "bar");
     }
 
     @Test
@@ -121,7 +120,9 @@ public class StandaloneHerderTest {
     public void testCreateAndStop() throws Exception {
         connector = PowerMock.createMock(BogusSourceConnector.class);
         expectAdd(BogusSourceConnector.class, BogusSourceTask.class, false);
+        // herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked
         expectStop();
+
         PowerMock.replayAll();
 
         herder.addConnector(connectorProps, createCallback);
@@ -135,36 +136,30 @@ public class StandaloneHerderTest {
                            boolean sink) throws Exception {
         connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName());
 
-        PowerMock.expectPrivate(StandaloneHerder.class, "instantiateConnector", connClass.getName())
-                .andReturn(connector);
-
-        createCallback.onCompletion(null, CONNECTOR_NAME);
+        worker.addConnector(EasyMock.eq(new ConnectorConfig(connectorProps)), EasyMock.anyObject(HerderConnectorContext.class));
         PowerMock.expectLastCall();
 
-        connector.initialize(EasyMock.anyObject(HerderConnectorContext.class));
-        PowerMock.expectLastCall();
-        connector.start(new Properties());
+        createCallback.onCompletion(null, CONNECTOR_NAME);
         PowerMock.expectLastCall();
 
-        // Just return the connector properties for the individual task we generate by default
-        EasyMock.<Class<? extends Task>>expect(connector.taskClass()).andReturn(taskClass);
-
-        EasyMock.expect(connector.taskConfigs(ConnectorConfig.TASKS_MAX_DEFAULT))
-                .andReturn(Arrays.asList(taskProps));
         // And we should instantiate the tasks. For a sink task, we should see added properties for
         // the input topic partitions
-        Properties generatedTaskProps = new Properties();
+        Map<String, String> generatedTaskProps = new HashMap<>();
         generatedTaskProps.putAll(taskProps);
+        generatedTaskProps.put(TaskConfig.TASK_CLASS_CONFIG, taskClass.getName());
         if (sink)
-            generatedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR);
-        worker.addTask(new ConnectorTaskId(CONNECTOR_NAME, 0), taskClass.getName(), generatedTaskProps);
+            generatedTaskProps.put(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR);
+        EasyMock.expect(worker.reconfigureConnectorTasks(CONNECTOR_NAME, DEFAULT_MAX_TASKS, TOPICS_LIST))
+                .andReturn(Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), generatedTaskProps));
+
+        worker.addTask(new ConnectorTaskId(CONNECTOR_NAME, 0), new TaskConfig(generatedTaskProps));
         PowerMock.expectLastCall();
     }
 
     private void expectStop() {
         worker.stopTask(new ConnectorTaskId(CONNECTOR_NAME, 0));
         EasyMock.expectLastCall();
-        connector.stop();
+        worker.stopConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java
index b02b752..cf9f8aa 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java
@@ -169,6 +169,12 @@ public class KafkaConfigStorageTest {
         connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(1));
         EasyMock.expectLastCall();
 
+        // Config deletion
+        expectConvertWriteAndRead(
+                CONNECTOR_CONFIG_KEYS.get(1), KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, null, null, null);
+        connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(1));
+        EasyMock.expectLastCall();
+
         expectStop();
 
         PowerMock.replayAll();
@@ -185,17 +191,24 @@ public class KafkaConfigStorageTest {
         // Writing should block until it is written and read back from Kafka
         configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0));
         configState = configStorage.snapshot();
-        assertEquals(0, configState.offset());
+        assertEquals(1, configState.offset());
         assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
         assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
 
         // Second should also block and all configs should still be available
         configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1));
         configState = configStorage.snapshot();
-        assertEquals(1, configState.offset());
+        assertEquals(2, configState.offset());
         assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
         assertEquals(SAMPLE_CONFIGS.get(1), configState.connectorConfig(CONNECTOR_IDS.get(1)));
 
+        // Deletion should remove the second one we added
+        configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), null);
+        configState = configStorage.snapshot();
+        assertEquals(3, configState.offset());
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
         configStorage.stop();
 
         PowerMock.verifyAll();
@@ -255,13 +268,13 @@ public class KafkaConfigStorageTest {
 
         // Validate root config by listing all connectors and tasks
         configState = configStorage.snapshot();
-        assertEquals(2, configState.offset());
+        assertEquals(3, configState.offset());
         String connectorName = CONNECTOR_IDS.get(0);
         assertEquals(Arrays.asList(connectorName), new ArrayList<>(configState.connectors()));
-        assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(connectorName));
+        assertEquals(new HashSet<>(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1))), configState.tasks(connectorName));
         assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
         assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(TASK_IDS.get(1)));
-        assertEquals(new HashSet<>(Collections.EMPTY_LIST), configState.inconsistentConnectors());
+        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
 
         configStorage.stop();
 
@@ -306,16 +319,16 @@ public class KafkaConfigStorageTest {
 
         // Should see a single connector and its config should be the last one seen anywhere in the log
         ClusterConfigState configState = configStorage.snapshot();
-        assertEquals(6, configState.offset()); // Should always be last read, even if uncommitted
+        assertEquals(7, configState.offset()); // Should always be next to be read, even if uncommitted
         assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
         // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
         assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0)));
         // Should see 2 tasks for that connector. Only config updates before the root key update should be reflected
-        assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(CONNECTOR_IDS.get(0)));
+        assertEquals(new HashSet<>(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1))), configState.tasks(CONNECTOR_IDS.get(0)));
         // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
         assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
         assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(1)));
-        assertEquals(new HashSet<>(Collections.EMPTY_LIST), configState.inconsistentConnectors());
+        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
 
         configStorage.stop();
 
@@ -374,10 +387,10 @@ public class KafkaConfigStorageTest {
         configStorage.start();
         // After reading the log, it should have been in an inconsistent state
         ClusterConfigState configState = configStorage.snapshot();
-        assertEquals(5, configState.offset()); // Should always be last read, not last committed
+        assertEquals(6, configState.offset()); // Should always be next to be read, not last committed
         assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
         // Inconsistent data should leave us with no tasks listed for the connector and an entry in the inconsistent list
-        assertEquals(Collections.EMPTY_LIST, configState.tasks(CONNECTOR_IDS.get(0)));
+        assertEquals(Collections.EMPTY_SET, configState.tasks(CONNECTOR_IDS.get(0)));
         // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
         assertNull(configState.taskConfig(TASK_IDS.get(0)));
         assertNull(configState.taskConfig(TASK_IDS.get(1)));
@@ -398,11 +411,11 @@ public class KafkaConfigStorageTest {
         configState = configStorage.snapshot();
         // This is only two more ahead of the last one because multiple calls fail, and so their configs are not written
         // to the topic. Only the last call with 1 task config + 1 commit actually gets written.
-        assertEquals(7, configState.offset());
+        assertEquals(8, configState.offset());
         assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
-        assertEquals(Arrays.asList(TASK_IDS.get(0)), configState.tasks(CONNECTOR_IDS.get(0)));
+        assertEquals(new HashSet<>(Arrays.asList(TASK_IDS.get(0))), configState.tasks(CONNECTOR_IDS.get(0)));
         assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
-        assertEquals(new HashSet<>(Collections.EMPTY_LIST), configState.inconsistentConnectors());
+        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
 
         configStorage.stop();
 
@@ -446,17 +459,19 @@ public class KafkaConfigStorageTest {
     private void expectConvertWriteRead(final String configKey, final Schema valueSchema, final byte[] serialized,
                                         final String dataFieldName, final Object dataFieldValue) {
         final Capture<Struct> capturedRecord = EasyMock.newCapture();
-        EasyMock.expect(converter.fromCopycatData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
-                .andReturn(serialized);
+        if (serialized != null)
+            EasyMock.expect(converter.fromCopycatData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
+                    .andReturn(serialized);
         storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
         PowerMock.expectLastCall();
         EasyMock.expect(converter.toCopycatData(EasyMock.eq(TOPIC), EasyMock.aryEq(serialized)))
                 .andAnswer(new IAnswer<SchemaAndValue>() {
                     @Override
                     public SchemaAndValue answer() throws Throwable {
-                        assertEquals(dataFieldValue, capturedRecord.getValue().get(dataFieldName));
+                        if (dataFieldName != null)
+                            assertEquals(dataFieldValue, capturedRecord.getValue().get(dataFieldName));
                         // Note null schema because default settings for internal serialization are schema-less
-                        return new SchemaAndValue(null, structToMap(capturedRecord.getValue()));
+                        return new SchemaAndValue(null, serialized == null ? null : structToMap(capturedRecord.getValue()));
                     }
                 });
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java
index c5978ec..55e24c8 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java
@@ -102,8 +102,14 @@ public class TestFuture<T> implements Future<T> {
             }
         }
 
-        if (exception != null)
-            throw new ExecutionException(exception);
+        if (exception != null) {
+            if (exception instanceof TimeoutException)
+                throw (TimeoutException) exception;
+            else if (exception instanceof InterruptedException)
+                throw (InterruptedException) exception;
+            else
+                throw new ExecutionException(exception);
+        }
         return result;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/tests/kafkatest/services/copycat.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/copycat.py b/tests/kafkatest/services/copycat.py
index 4e2ab40..45ef330 100644
--- a/tests/kafkatest/services/copycat.py
+++ b/tests/kafkatest/services/copycat.py
@@ -39,6 +39,16 @@ class CopycatServiceBase(Service):
         except:
             return []
 
+    def set_configs(self, config_template, connector_config_templates):
+        """
+        Set configurations for the worker and the connector to run on
+        it. These are not provided in the constructor because the worker
+        config generally needs access to ZK/Kafka services to
+        create the configuration.
+        """
+        self.config_template = config_template
+        self.connector_config_templates = connector_config_templates
+
     def stop_node(self, node, clean_shutdown=True):
         pids = self.pids(node)
         sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
@@ -51,7 +61,7 @@ class CopycatServiceBase(Service):
         node.account.ssh("rm -f /mnt/copycat.pid", allow_fail=False)
 
     def restart(self):
-        # We don't want to do any clean up here, just restart the process
+        # We don't want to do any clean up here, just restart the process.
         for node in self.nodes:
             self.stop_node(node)
             self.start_node(node)
@@ -62,8 +72,11 @@ class CopycatServiceBase(Service):
                              (self.__class__.__name__, node.account))
         for pid in self.pids(node):
             node.account.signal(pid, signal.SIGKILL, allow_fail=False)
-        node.account.ssh("rm -rf /mnt/copycat.pid /mnt/copycat.log /mnt/copycat.properties /mnt/copycat-connector.properties " + " ".join(self.files), allow_fail=False)
 
+        node.account.ssh("rm -rf /mnt/copycat.pid /mnt/copycat.log /mnt/copycat.properties  " + " ".join(self.config_filenames() + self.files), allow_fail=False)
+
+    def config_filenames(self):
+        return ["/mnt/copycat-connector-" + str(idx) + ".properties" for idx, template in enumerate(self.connector_config_templates)]
 
 class CopycatStandaloneService(CopycatServiceBase):
     """Runs Copycat in standalone mode."""
@@ -71,16 +84,6 @@ class CopycatStandaloneService(CopycatServiceBase):
     def __init__(self, context, kafka, files):
         super(CopycatStandaloneService, self).__init__(context, 1, kafka, files)
 
-    def set_configs(self, config_template, connector_config_template):
-        """
-        Set configurations for the worker and the connector to run on
-        it. These are not provided in the constructor because the worker
-        config generally needs access to ZK/Kafka services to
-        create the configuration.
-        """
-        self.config_template = config_template
-        self.connector_config_template = connector_config_template
-
     # For convenience since this service only makes sense with a single node
     @property
     def node(self):
@@ -88,12 +91,17 @@ class CopycatStandaloneService(CopycatServiceBase):
 
     def start_node(self, node):
         node.account.create_file("/mnt/copycat.properties", self.config_template)
-        node.account.create_file("/mnt/copycat-connector.properties", self.connector_config_template)
+        remote_connector_configs = []
+        for idx, template in enumerate(self.connector_config_templates):
+            target_file = "/mnt/copycat-connector-" + str(idx) + ".properties"
+            node.account.create_file(target_file, template)
+            remote_connector_configs.append(target_file)
 
         self.logger.info("Starting Copycat standalone process")
         with node.account.monitor_log("/mnt/copycat.log") as monitor:
-            node.account.ssh("/opt/kafka/bin/copycat-standalone.sh /mnt/copycat.properties /mnt/copycat-connector.properties " +
-                             "1>> /mnt/copycat.log 2>> /mnt/copycat.log & echo $! > /mnt/copycat.pid")
+            node.account.ssh("/opt/kafka/bin/copycat-standalone.sh /mnt/copycat.properties " +
+                             " ".join(remote_connector_configs) +
+                             " 1>> /mnt/copycat.log 2>> /mnt/copycat.log & echo $! > /mnt/copycat.pid")
             monitor.wait_until('Copycat started', timeout_sec=10, err_msg="Never saw message indicating Copycat finished startup")
 
         if len(self.pids(node)) == 0:
@@ -108,27 +116,28 @@ class CopycatDistributedService(CopycatServiceBase):
         super(CopycatDistributedService, self).__init__(context, num_nodes, kafka, files)
         self.offsets_topic = offsets_topic
         self.configs_topic = configs_topic
-
-    def set_configs(self, config_template, connector_config_template):
-        """
-        Set configurations for the worker and the connector to run on
-        it. These are not provided in the constructor because the worker
-        config generally needs access to ZK/Kafka services to
-        create the configuration.
-        """
-        self.config_template = config_template
-        self.connector_config_template = connector_config_template
+        self.first_start = True
 
     def start_node(self, node):
         node.account.create_file("/mnt/copycat.properties", self.config_template)
-        node.account.create_file("/mnt/copycat-connector.properties", self.connector_config_template)
+        remote_connector_configs = []
+        for idx, template in enumerate(self.connector_config_templates):
+            target_file = "/mnt/copycat-connector-" + str(idx) + ".properties"
+            node.account.create_file(target_file, template)
+            remote_connector_configs.append(target_file)
 
-        self.logger.info("Starting Copycat standalone process")
+        self.logger.info("Starting Copycat distributed process")
         with node.account.monitor_log("/mnt/copycat.log") as monitor:
-            node.account.ssh("/opt/kafka/bin/copycat-distributed.sh /mnt/copycat.properties /mnt/copycat-connector.properties " +
-                             "1>> /mnt/copycat.log 2>> /mnt/copycat.log & echo $! > /mnt/copycat.pid")
+            cmd = "/opt/kafka/bin/copycat-distributed.sh /mnt/copycat.properties "
+            # Only submit connectors on the first node so they don't get submitted multiple times. Also only submit them
+            # the first time the node is started so they don't get resubmitted when nodes restart.
+            if self.first_start and node == self.nodes[0]:
+                cmd += " ".join(remote_connector_configs)
+            cmd += " 1>> /mnt/copycat.log 2>> /mnt/copycat.log & echo $! > /mnt/copycat.pid"
+            node.account.ssh(cmd)
             monitor.wait_until('Copycat started', timeout_sec=10, err_msg="Never saw message indicating Copycat finished startup")
 
         if len(self.pids(node)) == 0:
             raise RuntimeError("No process ids recorded")
 
+        self.first_start = False

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/tests/kafkatest/tests/copycat_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/copycat_distributed_test.py b/tests/kafkatest/tests/copycat_distributed_test.py
index 9d00334..57965e5 100644
--- a/tests/kafkatest/tests/copycat_distributed_test.py
+++ b/tests/kafkatest/tests/copycat_distributed_test.py
@@ -31,10 +31,12 @@ class CopycatDistributedFileTest(KafkaTest):
     OFFSETS_TOPIC = "copycat-offsets"
     CONFIG_TOPIC = "copycat-configs"
 
-    FIRST_INPUT_LISTS = [["foo", "bar", "baz"], ["foo2", "bar2", "baz2"]]
-    FIRST_INPUTS = ["\n".join(input_list) + "\n" for input_list in FIRST_INPUT_LISTS]
-    SECOND_INPUT_LISTS = [["razz", "ma", "tazz"], ["razz2", "ma2", "tazz2"]]
-    SECOND_INPUTS = ["\n".join(input_list) + "\n" for input_list in SECOND_INPUT_LISTS]
+    # Since tasks can be assigned to any node and we're testing with files, we need to make sure the content is the same
+    # across all nodes.
+    FIRST_INPUT_LIST = ["foo", "bar", "baz"]
+    FIRST_INPUTS = "\n".join(FIRST_INPUT_LIST) + "\n"
+    SECOND_INPUT_LIST = ["razz", "ma", "tazz"]
+    SECOND_INPUTS = "\n".join(SECOND_INPUT_LIST) + "\n"
 
     SCHEMA = { "type": "string", "optional": False }
 
@@ -43,13 +45,7 @@ class CopycatDistributedFileTest(KafkaTest):
             'test' : { 'partitions': 1, 'replication-factor': 1 }
         })
 
-        # FIXME these should have multiple nodes. However, currently the connectors are submitted via command line,
-        # which means we would get duplicates. Both would run, but they would have conflicting keys for offsets and
-        # configs. Until we have real distributed coordination of workers with unified connector submission, we need
-        # to restrict each of these to a single node.
-        self.num_nodes = 1
-        self.source = CopycatDistributedService(test_context, self.num_nodes, self.kafka, [self.INPUT_FILE])
-        self.sink = CopycatDistributedService(test_context, self.num_nodes, self.kafka, [self.OUTPUT_FILE])
+        self.cc = CopycatDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE])
 
     def test_file_source_and_sink(self, converter="org.apache.kafka.copycat.json.JsonConverter", schemas=True):
         assert converter != None, "converter type must be set"
@@ -58,33 +54,40 @@ class CopycatDistributedFileTest(KafkaTest):
         self.value_converter = converter
         self.schemas = schemas
 
-        # These need to be set
-        self.source.set_configs(self.render("copycat-distributed.properties"), self.render("copycat-file-source.properties"))
-        self.sink.set_configs(self.render("copycat-distributed.properties"), self.render("copycat-file-sink.properties"))
+        self.cc.set_configs(self.render("copycat-distributed.properties"), [self.render("copycat-file-source.properties"), self.render("copycat-file-sink.properties")])
 
-        self.source.start()
-        self.sink.start()
+        self.cc.start()
 
-        # Generating data on the source node should generate new records and create new output on the sink node
-        for node, input in zip(self.source.nodes, self.FIRST_INPUTS):
-            node.account.ssh("echo -e -n " + repr(input) + " >> " + self.INPUT_FILE)
-        wait_until(lambda: self.validate_output(self.FIRST_INPUT_LISTS[:self.num_nodes]), timeout_sec=60, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
+        # Generating data on the source node should generate new records and create new output on the sink node. Timeouts
+        # here need to be more generous than they are for standalone mode because a) it takes longer to write configs,
+        # do rebalancing of the group, etc., and b) without explicit leave group support, rebalancing takes a while
+        for node in self.cc.nodes:
+            node.account.ssh("echo -e -n " + repr(self.FIRST_INPUTS) + " >> " + self.INPUT_FILE)
+        wait_until(lambda: self.validate_output(self.FIRST_INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
 
         # Restarting both should result in them picking up where they left off,
         # only processing new data.
-        self.source.restart()
-        self.sink.restart()
+        self.cc.restart()
 
-        for node, input in zip(self.source.nodes, self.SECOND_INPUTS):
-            node.account.ssh("echo -e -n " + repr(input) + " >> " + self.INPUT_FILE)
-        wait_until(lambda: self.validate_output(self.FIRST_INPUT_LISTS[:self.num_nodes] + self.SECOND_INPUT_LISTS[:self.num_nodes]), timeout_sec=60, err_msg="Sink output file never converged to the same state as the input file")
+        for node in self.cc.nodes:
+            node.account.ssh("echo -e -n " + repr(self.SECOND_INPUTS) + " >> " + self.INPUT_FILE)
+        wait_until(lambda: self.validate_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=120, err_msg="Sink output file never converged to the same state as the input file")
 
-    def validate_output(self, inputs):
+    def validate_output(self, input):
+        input_set = set(input)
+        # Output needs to be collected from all nodes because we can't be sure where the tasks will be scheduled.
+        # Between the first and second rounds, we might even end up with half the data on each node.
+        output_set = set(itertools.chain(*[
+            [line.strip() for line in self.file_contents(node, self.OUTPUT_FILE)] for node in self.cc.nodes
+        ]))
+        #print input_set, output_set
+        return input_set == output_set
+
+
+    def file_contents(self, node, file):
         try:
-            input_set = set(itertools.chain(*inputs))
-            output_set = set(itertools.chain(*[
-                [line.strip() for line in node.account.ssh_capture("cat " + self.OUTPUT_FILE)] for node in self.sink.nodes
-            ]))
-            return input_set == output_set
+            # Convert to a list here or the CalledProcessError may be raised during a call to the generator instead of
+            # immediately
+            return list(node.account.ssh_capture("cat " + file))
         except subprocess.CalledProcessError:
-            return False
+            return []

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/tests/kafkatest/tests/copycat_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/copycat_test.py b/tests/kafkatest/tests/copycat_test.py
index 1bd8ccb..bad5330 100644
--- a/tests/kafkatest/tests/copycat_test.py
+++ b/tests/kafkatest/tests/copycat_test.py
@@ -60,9 +60,8 @@ class CopycatStandaloneFileTest(KafkaTest):
         self.value_converter = converter
         self.schemas = schemas
 
-        # These need to be set
-        self.source.set_configs(self.render("copycat-standalone.properties"), self.render("copycat-file-source.properties"))
-        self.sink.set_configs(self.render("copycat-standalone.properties"), self.render("copycat-file-sink.properties"))
+        self.source.set_configs(self.render("copycat-standalone.properties"), [self.render("copycat-file-source.properties")])
+        self.sink.set_configs(self.render("copycat-standalone.properties"), [self.render("copycat-file-sink.properties")])
 
         self.source.start()
         self.sink.start()

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/tests/kafkatest/tests/templates/copycat-distributed.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/copycat-distributed.properties b/tests/kafkatest/tests/templates/copycat-distributed.properties
index 31f9901..325dc85 100644
--- a/tests/kafkatest/tests/templates/copycat-distributed.properties
+++ b/tests/kafkatest/tests/templates/copycat-distributed.properties
@@ -15,6 +15,8 @@
 
 bootstrap.servers={{ kafka.bootstrap_servers() }}
 
+group.id={{ group|default("copycat-cluster") }}
+
 key.converter={{ key_converter|default("org.apache.kafka.copycat.json.JsonConverter") }}
 value.converter={{ value_converter|default("org.apache.kafka.copycat.json.JsonConverter") }}
 {% if key_converter is not defined or key_converter.endswith("JsonConverter") %}
@@ -30,4 +32,7 @@ internal.key.converter.schemas.enable=false
 internal.value.converter.schemas.enable=false
 
 offset.storage.topic={{ OFFSETS_TOPIC }}
-config.storage.topic={{ CONFIG_TOPIC }}
\ No newline at end of file
+config.storage.topic={{ CONFIG_TOPIC }}
+
+# Make sure data gets flushed frequently so tests don't have to wait to ensure they see data in output systems
+offset.flush.interval.ms=5000


[3/3] kafka git commit: KAFKA-2371: Add distributed support for Copycat.

Posted by gw...@apache.org.
KAFKA-2371: Add distributed support for Copycat.

This adds coordination between DistributedHerders using the generalized consumer
support, allowing automatic balancing of connectors and tasks across workers. A
few pieces that require interaction between workers (resolving config
inconsistencies, forwarding of configuration changes to the leader worker) are
incomplete because they require REST API support to implement properly.
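
As a rough illustration of the balancing described above (an editor's sketch, not code from this patch; class and variable names are made up), connector names and task IDs can be spread round-robin over the workers in the group, reusing the CircularIterator this patch promotes into org.apache.kafka.common.utils:

    import org.apache.kafka.common.utils.CircularIterator;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RoundRobinSketch {
        // Spread work units (connector names and task IDs) over worker member IDs, one at a time.
        public static Map<String, List<String>> assign(List<String> workerIds, List<String> work) {
            Map<String, List<String>> assignment = new HashMap<>();
            for (String worker : workerIds)
                assignment.put(worker, new ArrayList<String>());
            CircularIterator<String> workers = new CircularIterator<>(workerIds);
            for (String unit : work)
                assignment.get(workers.next()).add(unit);
            return assignment;
        }

        public static void main(String[] args) {
            // Two workers, one connector plus its two tasks.
            System.out.println(assign(
                    Arrays.asList("worker-1", "worker-2"),
                    Arrays.asList("file-source", "file-source-0", "file-source-1")));
        }
    }

The actual logic lives in the new WorkerCoordinator and WorkerGroupMember classes and additionally deals with leadership and config offsets; the toy above only shows the distribution idea.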

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Jason Gustafson, Gwen Shapira

Closes #321 from ewencp/kafka-2371-distributed-herder


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2e617735
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2e617735
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2e617735

Branch: refs/heads/trunk
Commit: 2e61773590c0ba86cb8813e6ba17bf6ee33f4461
Parents: 21443f2
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Fri Oct 23 16:37:30 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Fri Oct 23 16:37:30 2015 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 checkstyle/import-control.xml                   |   1 +
 .../clients/consumer/RoundRobinAssignor.java    |  35 +-
 .../consumer/internals/AbstractCoordinator.java |   8 +-
 .../kafka/common/utils/CircularIterator.java    |  54 ++
 config/copycat-distributed.properties           |   2 +
 .../kafka/copycat/file/FileStreamSinkTask.java  |  12 +-
 .../copycat/file/FileStreamSourceTask.java      |  17 +-
 .../kafka/copycat/cli/CopycatDistributed.java   |   7 +-
 .../kafka/copycat/runtime/ConnectorConfig.java  |   2 +-
 .../kafka/copycat/runtime/TaskConfig.java       |  54 ++
 .../apache/kafka/copycat/runtime/Worker.java    | 145 +++-
 .../runtime/distributed/ClusterConfigState.java |  40 +-
 .../runtime/distributed/CopycatProtocol.java    | 246 +++++++
 .../runtime/distributed/DistributedHerder.java  | 733 +++++++++++++------
 .../distributed/DistributedHerderConfig.java    | 192 +++++
 .../runtime/distributed/NotLeaderException.java |  38 +
 .../runtime/distributed/WorkerCoordinator.java  | 288 ++++++++
 .../runtime/distributed/WorkerGroupMember.java  | 184 +++++
 .../distributed/WorkerRebalanceListener.java    |  38 +
 .../runtime/standalone/StandaloneHerder.java    | 168 ++---
 .../copycat/storage/KafkaConfigStorage.java     |  64 +-
 .../storage/KafkaOffsetBackingStore.java        |   2 +
 .../kafka/copycat/util/ConnectorTaskId.java     |  10 +-
 .../kafka/copycat/runtime/WorkerTest.java       | 199 ++++-
 .../distributed/DistributedHerderTest.java      | 436 ++++++-----
 .../distributed/WorkerCoordinatorTest.java      | 436 +++++++++++
 .../standalone/StandaloneHerderTest.java        |  45 +-
 .../copycat/storage/KafkaConfigStorageTest.java |  49 +-
 .../apache/kafka/copycat/util/TestFuture.java   |  10 +-
 tests/kafkatest/services/copycat.py             |  67 +-
 .../kafkatest/tests/copycat_distributed_test.py |  67 +-
 tests/kafkatest/tests/copycat_test.py           |   5 +-
 .../templates/copycat-distributed.properties    |   7 +-
 34 files changed, 2966 insertions(+), 696 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 16fb981..128c195 100644
--- a/build.gradle
+++ b/build.gradle
@@ -754,6 +754,7 @@ project(':copycat:runtime') {
     testCompile "$easymock"
     testCompile "$powermock"
     testCompile "$powermock_easymock"
+    testCompile project(':clients').sourceSets.test.output
     testRuntime "$slf4jlog4j"
     testRuntime project(":copycat:json")
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 6474865..e1ea93c 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -146,6 +146,7 @@
     <allow pkg="org.apache.kafka.copycat.data" />
     <allow pkg="org.apache.kafka.copycat.errors" />
     <allow pkg="org.apache.kafka.clients" />
+    <allow pkg="org.apache.kafka.test"/>
 
     <subpackage name="source">
       <allow pkg="org.apache.kafka.copycat.connector" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
index c5ea2bb..b8dc253 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
@@ -14,11 +14,11 @@ package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.CircularIterator;
 import org.apache.kafka.common.utils.Utils;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedSet;
@@ -78,37 +78,4 @@ public class RoundRobinAssignor extends AbstractPartitionAssignor {
         return "roundrobin";
     }
 
-    private static class CircularIterator<T> implements Iterator<T> {
-        int i = 0;
-        private List<T> list;
-
-        public CircularIterator(List<T> list) {
-            if (list.isEmpty()) {
-                throw new IllegalArgumentException("CircularIterator can only be used on non-empty lists");
-            }
-            this.list = list;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return true;
-        }
-
-        @Override
-        public T next() {
-            T next = list.get(i);
-            i = (i + 1) % list.size();
-            return next;
-        }
-
-        public T peek() {
-            return list.get(i);
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 1ffd2bb..a2b9ec5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -199,7 +199,7 @@ public abstract class AbstractCoordinator {
         this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
         rejoinNeeded = true;
     }
-
+    private boolean needsOnLeave = true;
     /**
      * Ensure that the group is active (i.e. joined and synced)
      */
@@ -208,7 +208,10 @@ public abstract class AbstractCoordinator {
             return;
 
         // onLeave only invoked if we have a valid current generation
-        onLeave(generation, memberId);
+        if (needsOnLeave) {
+            onLeave(generation, memberId);
+            needsOnLeave = false;
+        }
 
         while (needRejoin()) {
             ensureCoordinatorKnown();
@@ -225,6 +228,7 @@ public abstract class AbstractCoordinator {
 
             if (future.succeeded()) {
                 onJoin(generation, memberId, protocol, future.value());
+                needsOnLeave = true;
                 heartbeatTask.reset();
             } else {
                 if (future.exception() instanceof UnknownMemberIdException)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/clients/src/main/java/org/apache/kafka/common/utils/CircularIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/CircularIterator.java b/clients/src/main/java/org/apache/kafka/common/utils/CircularIterator.java
new file mode 100644
index 0000000..00be783
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/CircularIterator.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.common.utils;
+
+import java.util.Iterator;
+import java.util.List;
+
+public class CircularIterator<T> implements Iterator<T> {
+    int i = 0;
+    private List<T> list;
+
+    public CircularIterator(List<T> list) {
+        if (list.isEmpty()) {
+            throw new IllegalArgumentException("CircularIterator can only be used on non-empty lists");
+        }
+        this.list = list;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return true;
+    }
+
+    @Override
+    public T next() {
+        T next = list.get(i);
+        i = (i + 1) % list.size();
+        return next;
+    }
+
+    public T peek() {
+        return list.get(i);
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+}
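
A quick usage note for the iterator above (an illustrative snippet, not part of the commit): hasNext() is always true, so callers bound the loop themselves and can use peek() to look at the next element without advancing.

    import org.apache.kafka.common.utils.CircularIterator;

    import java.util.Arrays;

    public class CircularIteratorExample {
        public static void main(String[] args) {
            CircularIterator<String> it = new CircularIterator<>(Arrays.asList("a", "b", "c"));
            for (int i = 0; i < 5; i++)
                System.out.print(it.next() + " ");   // prints: a b c a b
            System.out.println("peek=" + it.peek()); // does not advance, so this prints c
        }
    }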

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/config/copycat-distributed.properties
----------------------------------------------------------------------
diff --git a/config/copycat-distributed.properties b/config/copycat-distributed.properties
index b122413..2ea5b73 100644
--- a/config/copycat-distributed.properties
+++ b/config/copycat-distributed.properties
@@ -18,6 +18,8 @@
 # These are defaults. This file just demonstrates how to override some settings.
 bootstrap.servers=localhost:9092
 
+group.id=copycat-cluster
+
 # The converters specify the format of data in Kafka and how to translate it into Copycat data. Every Copycat user will
 # need to configure these based on the format they want their data in when loaded from or stored into Kafka
 key.converter=org.apache.kafka.copycat.json.JsonConverter

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
index 9ea459c..6dfe4a7 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
@@ -38,6 +38,7 @@ import java.util.Properties;
 public class FileStreamSinkTask extends SinkTask {
     private static final Logger log = LoggerFactory.getLogger(FileStreamSinkTask.class);
 
+    private String filename;
     private PrintStream outputStream;
 
     public FileStreamSinkTask() {
@@ -45,12 +46,13 @@ public class FileStreamSinkTask extends SinkTask {
 
     // for testing
     public FileStreamSinkTask(PrintStream outputStream) {
+        filename = null;
         this.outputStream = outputStream;
     }
 
     @Override
     public void start(Properties props) {
-        String filename = props.getProperty(FileStreamSinkConnector.FILE_CONFIG);
+        filename = props.getProperty(FileStreamSinkConnector.FILE_CONFIG);
         if (filename == null) {
             outputStream = System.out;
         } else {
@@ -65,16 +67,24 @@ public class FileStreamSinkTask extends SinkTask {
     @Override
     public void put(Collection<SinkRecord> sinkRecords) {
         for (SinkRecord record : sinkRecords) {
+            log.trace("Writing line to {}: {}", logFilename(), record.value());
             outputStream.println(record.value());
         }
     }
 
     @Override
     public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        log.trace("Flushing output stream for {}", logFilename());
         outputStream.flush();
     }
 
     @Override
     public void stop() {
+        if (outputStream != System.out)
+            outputStream.close();
+    }
+
+    private String logFilename() {
+        return filename == null ? "stdout" : filename;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
index cf71be3..f2249d0 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
@@ -56,7 +56,7 @@ public class FileStreamSourceTask extends SourceTask {
         }
         topic = props.getProperty(FileStreamSourceConnector.TOPIC_CONFIG);
         if (topic == null)
-            throw new CopycatException("ConsoleSourceTask config missing topic setting");
+            throw new CopycatException("FileStreamSourceTask config missing topic setting");
     }
 
     @Override
@@ -88,6 +88,7 @@ public class FileStreamSourceTask extends SourceTask {
                     streamOffset = 0L;
                 }
                 reader = new BufferedReader(new InputStreamReader(stream));
+                log.debug("Opened {} for reading", logFilename());
             } catch (FileNotFoundException e) {
                 log.warn("Couldn't find file for FileStreamSourceTask, sleeping to wait for it to be created");
                 synchronized (this) {
@@ -113,6 +114,7 @@ public class FileStreamSourceTask extends SourceTask {
             int nread = 0;
             while (readerCopy.ready()) {
                 nread = readerCopy.read(buffer, offset, buffer.length - offset);
+                log.trace("Read {} bytes from {}", nread, logFilename());
 
                 if (nread > 0) {
                     offset += nread;
@@ -126,6 +128,7 @@ public class FileStreamSourceTask extends SourceTask {
                     do {
                         line = extractLine();
                         if (line != null) {
+                            log.trace("Read a line from {}", logFilename());
                             if (records == null)
                                 records = new ArrayList<>();
                             records.add(new SourceRecord(offsetKey(filename), offsetValue(streamOffset), topic, VALUE_SCHEMA, line));
@@ -183,10 +186,12 @@ public class FileStreamSourceTask extends SourceTask {
         log.trace("Stopping");
         synchronized (this) {
             try {
-                stream.close();
-                log.trace("Closed input stream");
+                if (stream != null && stream != System.in) {
+                    stream.close();
+                    log.trace("Closed input stream");
+                }
             } catch (IOException e) {
-                log.error("Failed to close ConsoleSourceTask stream: ", e);
+                log.error("Failed to close FileStreamSourceTask stream: ", e);
             }
             this.notify();
         }
@@ -199,4 +204,8 @@ public class FileStreamSourceTask extends SourceTask {
     private Map<String, Long> offsetValue(Long pos) {
         return Collections.singletonMap(POSITION_FIELD, pos);
     }
+
+    private String logFilename() {
+        return filename == null ? "stdin" : filename;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
index b0230b2..0ff6e81 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
@@ -48,8 +48,8 @@ public class CopycatDistributed {
         Properties workerProps;
         Properties connectorProps;
 
-        if (args.length < 2) {
-            log.info("Usage: CopycatDistributed worker.properties connector1.properties [connector2.properties ...]");
+        if (args.length < 1) {
+            log.info("Usage: CopycatDistributed worker.properties [connector1.properties connector2.properties ...]");
             System.exit(1);
         }
 
@@ -58,8 +58,7 @@ public class CopycatDistributed {
 
         WorkerConfig workerConfig = new WorkerConfig(workerProps);
         Worker worker = new Worker(workerConfig, new KafkaOffsetBackingStore());
-        DistributedHerder herder = new DistributedHerder(worker);
-        herder.configure(workerConfig.originals());
+        DistributedHerder herder = new DistributedHerder(worker, workerConfig.originals());
         final Copycat copycat = new Copycat(worker, herder);
         copycat.start();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
index 767c88b..2242299 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
@@ -59,7 +59,7 @@ public class ConnectorConfig extends AbstractConfig {
     static {
         config = new ConfigDef()
                 .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC)
-                .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC)
+                .define(CONNECTOR_CLASS_CONFIG, Type.CLASS, Importance.HIGH, CONNECTOR_CLASS_DOC)
                 .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, Importance.HIGH, TASKS_MAX_DOC)
                 .define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java
new file mode 100644
index 0000000..be97879
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * <p>
+ * Configuration options for Tasks. These only include Copycat system-level configuration
+ * options.
+ * </p>
+ */
+public class TaskConfig extends AbstractConfig {
+
+    public static final String TASK_CLASS_CONFIG = "task.class";
+    private static final String TASK_CLASS_DOC =
+            "Name of the class for this task. Must be a subclass of org.apache.kafka.copycat.connector.Task";
+
+    private static ConfigDef config;
+
+    static {
+        config = new ConfigDef()
+                .define(TASK_CLASS_CONFIG, Type.CLASS, Importance.HIGH, TASK_CLASS_DOC);
+    }
+
+    public TaskConfig() {
+        this(new HashMap<String, String>());
+    }
+
+    public TaskConfig(Map<String, ?> props) {
+        super(config, props);
+    }
+}
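
For orientation, a minimal example of how the new config class is used (an illustrative snippet, not from this patch; the task class name is just a stand-in for any Task implementation on the classpath):

    import org.apache.kafka.copycat.runtime.TaskConfig;

    import java.util.Collections;
    import java.util.Map;

    public class TaskConfigExample {
        public static void main(String[] args) {
            // task.class is the only Copycat-level key; connector-specific settings pass through untouched.
            Map<String, String> props = Collections.singletonMap(
                    TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.copycat.file.FileStreamSourceTask");
            TaskConfig config = new TaskConfig(props);
            // Type.CLASS means the value is resolved to a Class, which the Worker then instantiates.
            Class<?> taskClass = config.getClass(TaskConfig.TASK_CLASS_CONFIG);
            System.out.println("Would instantiate: " + taskClass.getName());
        }
    }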

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
index 0fdab4c..b37e49f 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
@@ -17,12 +17,15 @@
 
 package org.apache.kafka.copycat.runtime;
 
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.connector.Connector;
+import org.apache.kafka.copycat.connector.ConnectorContext;
 import org.apache.kafka.copycat.connector.Task;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.sink.SinkTask;
@@ -33,8 +36,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 /**
  * <p>
@@ -55,6 +60,7 @@ public class Worker {
     private Converter internalKeyConverter;
     private Converter internalValueConverter;
     private OffsetBackingStore offsetBackingStore;
+    private HashMap<String, Connector> connectors = new HashMap<>();
     private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
     private KafkaProducer<byte[], byte[]> producer;
     private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
@@ -106,6 +112,17 @@ public class Worker {
         long started = time.milliseconds();
         long limit = started + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
 
+        for (Map.Entry<String, Connector> entry : connectors.entrySet()) {
+            Connector conn = entry.getValue();
+            log.warn("Shutting down connector {} uncleanly; herder should have shut down connectors before the" +
+                    "Worker is stopped.", conn);
+            try {
+                conn.stop();
+            } catch (CopycatException e) {
+                log.error("Error while shutting down connector " + conn, e);
+            }
+        }
+
         for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
             WorkerTask task = entry.getValue();
             log.warn("Shutting down task {} uncleanly; herder should have shut down "
@@ -134,15 +151,106 @@ public class Worker {
     }
 
     /**
+     * Add a new connector.
+     * @param connConfig connector configuration
+     * @param ctx context for the connector
+     */
+    public void addConnector(ConnectorConfig connConfig, ConnectorContext ctx) {
+        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+        Class<?> maybeConnClass = connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        log.info("Creating connector {} of type {}", connName, maybeConnClass.getName());
+
+        Class<? extends Connector> connClass;
+        try {
+            connClass = maybeConnClass.asSubclass(Connector.class);
+        } catch (ClassCastException e) {
+            throw new CopycatException("Specified class is not a subclass of Connector: " + maybeConnClass.getName());
+        }
+
+        if (connectors.containsKey(connName))
+            throw new CopycatException("Connector with name " + connName + " already exists");
+
+        final Connector connector = instantiateConnector(connClass);
+        connector.initialize(ctx);
+        try {
+            Map<String, Object> originals = connConfig.originals();
+            Properties props = new Properties();
+            props.putAll(originals);
+            connector.start(props);
+        } catch (CopycatException e) {
+            throw new CopycatException("Connector threw an exception while starting", e);
+        }
+
+        connectors.put(connName, connector);
+
+        log.info("Finished creating connector {}", connName);
+    }
+
+    private static Connector instantiateConnector(Class<? extends Connector> connClass) {
+        try {
+            return Utils.newInstance(connClass);
+        } catch (Throwable t) {
+            // Catches normal exceptions due to instantiation errors as well as any runtime errors that
+            // may be caused by user code
+            throw new CopycatException("Failed to create connector instance", t);
+        }
+    }
+
+    public Map<ConnectorTaskId, Map<String, String>> reconfigureConnectorTasks(String connName, int maxTasks, List<String> sinkTopics) {
+        log.trace("Reconfiguring connector tasks for {}", connName);
+
+        Connector connector = connectors.get(connName);
+        if (connector == null)
+            throw new CopycatException("Connector " + connName + " not found in this worker.");
+
+        Map<ConnectorTaskId, Map<String, String>> result = new HashMap<>();
+        String taskClassName = connector.taskClass().getName();
+        int index = 0;
+        for (Properties taskProps : connector.taskConfigs(maxTasks)) {
+            ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
+            index++;
+            Map<String, String> taskConfig = Utils.propsToStringMap(taskProps);
+            taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName);
+            if (sinkTopics != null)
+                taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, ","));
+            result.put(taskId, taskConfig);
+        }
+        return result;
+    }
+
+    public void stopConnector(String connName) {
+        log.info("Stopping connector {}", connName);
+
+        Connector connector = connectors.get(connName);
+        if (connector == null)
+            throw new CopycatException("Connector " + connName + " not found in this worker.");
+
+        try {
+            connector.stop();
+        } catch (CopycatException e) {
+            log.error("Error shutting down connector {}: ", connector, e);
+        }
+
+        connectors.remove(connName);
+
+        log.info("Stopped connector {}", connName);
+    }
+
+    /**
+     * Get the IDs of the connectors currently running in this worker.
+     */
+    public Set<String> connectorNames() {
+        return connectors.keySet();
+    }
+
+    /**
      * Add a new task.
      * @param id Globally unique ID for this task.
-     * @param taskClassName name of the {@link org.apache.kafka.copycat.connector.Task}
-     *                      class to instantiate. Must be a subclass of either
-     *                      {@link org.apache.kafka.copycat.source.SourceTask} or
-     *                      {@link org.apache.kafka.copycat.sink.SinkTask}.
-     * @param props configuration options for the task
+     * @param taskConfig the parsed task configuration
      */
-    public void addTask(ConnectorTaskId id, String taskClassName, Properties props) {
+    public void addTask(ConnectorTaskId id, TaskConfig taskConfig) {
+        log.info("Creating task {}", id);
+
         if (tasks.containsKey(id)) {
             String msg = "Task already exists in this worker; the herder should not have requested "
                     + "that this : " + id;
@@ -150,7 +258,7 @@ public class Worker {
             throw new CopycatException(msg);
         }
 
-        final Task task = instantiateTask(taskClassName);
+        final Task task = instantiateTask(taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class));
 
         // Decide which type of worker task we need based on the type of task.
         final WorkerTask workerTask;
@@ -171,20 +279,30 @@ public class Worker {
 
         // Start the task before modifying any state; any exceptions are caught higher up the
         // call chain and there's no cleanup to do here
+        Properties props = new Properties();
+        props.putAll(taskConfig.originals());
         workerTask.start(props);
+        if (task instanceof SourceTask) {
+            WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask;
+            sourceTaskOffsetCommitter.schedule(id, workerSourceTask);
+        }
         tasks.put(id, workerTask);
     }
 
-    private static Task instantiateTask(String taskClassName) {
+    private static Task instantiateTask(Class<? extends Task> taskClass) {
         try {
-            return Utils.newInstance(Class.forName(taskClassName).asSubclass(Task.class));
-        } catch (ClassNotFoundException e) {
+            return Utils.newInstance(taskClass);
+        } catch (KafkaException e) {
             throw new CopycatException("Task class not found", e);
         }
     }
 
     public void stopTask(ConnectorTaskId id) {
+        log.info("Stopping task {}", id);
+
         WorkerTask task = getTask(id);
+        if (task instanceof WorkerSourceTask)
+            sourceTaskOffsetCommitter.remove(id);
         task.stop();
         if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG)))
             log.error("Graceful stop of task {} failed.", task);
@@ -192,6 +310,13 @@ public class Worker {
         tasks.remove(id);
     }
 
+    /**
+     * Get the IDs of the tasks currently running in this worker.
+     */
+    public Set<ConnectorTaskId> taskIds() {
+        return tasks.keySet();
+    }
+
     private WorkerTask getTask(ConnectorTaskId id) {
         WorkerTask task = tasks.get(id);
         if (task == null) {
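
Taken together, the new connector-level methods give the herder a simple lifecycle to drive. A sketch using only the signatures from the hunks above (the ConnectorConfig and ConnectorContext are assumed to come from the herder, and error handling is omitted):

    import org.apache.kafka.copycat.connector.ConnectorContext;
    import org.apache.kafka.copycat.runtime.ConnectorConfig;
    import org.apache.kafka.copycat.runtime.TaskConfig;
    import org.apache.kafka.copycat.runtime.Worker;
    import org.apache.kafka.copycat.util.ConnectorTaskId;

    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;

    public class WorkerLifecycleSketch {
        // Start a connector and however many tasks it wants, up to maxTasks. Pass null sinkTopics for sources.
        static void startConnector(Worker worker, ConnectorConfig connConfig, ConnectorContext ctx,
                                   String connName, int maxTasks, List<String> sinkTopics) {
            worker.addConnector(connConfig, ctx);
            Map<ConnectorTaskId, Map<String, String>> taskProps =
                    worker.reconfigureConnectorTasks(connName, maxTasks, sinkTopics);
            for (Map.Entry<ConnectorTaskId, Map<String, String>> entry : taskProps.entrySet())
                worker.addTask(entry.getKey(), new TaskConfig(entry.getValue()));
        }

        // Tear down; for brevity this stops every task on the worker, not just this connector's.
        static void stopEverything(Worker worker, String connName) {
            for (ConnectorTaskId id : new HashSet<>(worker.taskIds()))
                worker.stopTask(id);
            worker.stopConnector(connName);
        }
    }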

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java
index 719dd09..a46141e 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java
@@ -19,10 +19,8 @@ package org.apache.kafka.copycat.runtime.distributed;
 
 import org.apache.kafka.copycat.util.ConnectorTaskId;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -30,6 +28,10 @@ import java.util.Set;
  * An immutable snapshot of the configuration state of connectors and tasks in a Copycat cluster.
  */
 public class ClusterConfigState {
+    public static final ClusterConfigState EMPTY = new ClusterConfigState(-1, Collections.<String, Integer>emptyMap(),
+            Collections.<String, Map<String, String>>emptyMap(), Collections.<ConnectorTaskId, Map<String, String>>emptyMap(),
+            Collections.<String>emptySet());
+
     private final long offset;
     private final Map<String, Integer> connectorTaskCounts;
     private final Map<String, Map<String, String>> connectorConfigs;
@@ -60,8 +62,8 @@ public class ClusterConfigState {
     /**
      * Get the set of connector names in this configuration
      */
-    public Collection<String> connectors() {
-        return connectorTaskCounts.keySet();
+    public Set<String> connectors() {
+        return connectorConfigs.keySet();
     }
 
     /**
@@ -83,19 +85,29 @@ public class ClusterConfigState {
     }
 
     /**
+     * Get the number of tasks assigned for the given connector.
+     * @param connectorName name of the connector to look up tasks for
+     * @return the number of tasks
+     */
+    public int taskCount(String connectorName) {
+        Integer count = connectorTaskCounts.get(connectorName);
+        return count == null ? 0 : count;
+    }
+
+    /**
      * Get the current set of task IDs for the specified connector.
      * @param connectorName the name of the connector to look up task configs for
      * @return the current set of connector task IDs
      */
-    public Collection<ConnectorTaskId> tasks(String connectorName) {
+    public Set<ConnectorTaskId> tasks(String connectorName) {
         if (inconsistentConnectors.contains(connectorName))
-            return Collections.EMPTY_LIST;
+            return Collections.emptySet();
 
         Integer numTasks = connectorTaskCounts.get(connectorName);
         if (numTasks == null)
-            throw new IllegalArgumentException("Connector does not exist in current configuration.");
+            return Collections.emptySet();
 
-        List<ConnectorTaskId> taskIds = new ArrayList<>();
+        Set<ConnectorTaskId> taskIds = new HashSet<>();
         for (int taskIndex = 0; taskIndex < numTasks; taskIndex++) {
             ConnectorTaskId taskId = new ConnectorTaskId(connectorName, taskIndex);
             taskIds.add(taskId);
@@ -119,4 +131,14 @@ public class ClusterConfigState {
         return inconsistentConnectors;
     }
 
+    @Override
+    public String toString() {
+        return "ClusterConfigState{" +
+                "offset=" + offset +
+                ", connectorTaskCounts=" + connectorTaskCounts +
+                ", connectorConfigs=" + connectorConfigs +
+                ", taskConfigs=" + taskConfigs +
+                ", inconsistentConnectors=" + inconsistentConnectors +
+                '}';
+    }
 }
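
As a usage sketch (not part of the patch), the snippet below builds a small ClusterConfigState and exercises the accessors changed above, in particular the new behavior of returning empty sets instead of throwing for unknown connectors. The constructor argument order is inferred from the EMPTY constant; the connector name and offset are illustrative.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.copycat.runtime.distributed.ClusterConfigState;
    import org.apache.kafka.copycat.util.ConnectorTaskId;

    public class ClusterConfigStateSketch {
        public static void main(String[] args) {
            Map<String, Integer> taskCounts = Collections.singletonMap("source-test1", 2);
            Map<String, Map<String, String>> connectorConfigs =
                    Collections.singletonMap("source-test1", new HashMap<String, String>());
            Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();

            ClusterConfigState state = new ClusterConfigState(
                    5L, taskCounts, connectorConfigs, taskConfigs, Collections.<String>emptySet());

            System.out.println(state.connectors());               // e.g. [source-test1]
            System.out.println(state.taskCount("source-test1"));  // 2
            System.out.println(state.tasks("source-test1"));      // two task ids for source-test1
            System.out.println(state.tasks("unknown"));           // [] rather than an exception
        }
    }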

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/CopycatProtocol.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/CopycatProtocol.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/CopycatProtocol.java
new file mode 100644
index 0000000..a450b1d
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/CopycatProtocol.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class implements the protocol for Copycat workers in a group. It includes the format of worker state used when
+ * joining the group and distributing assignments, and the format of assignments of connectors and tasks to workers.
+ */
+public class CopycatProtocol {
+    public static final String VERSION_KEY_NAME = "version";
+    public static final String CONFIG_OFFSET_KEY_NAME = "config-offset";
+    public static final String CONNECTOR_KEY_NAME = "connector";
+    public static final String LEADER_KEY_NAME = "leader";
+    public static final String ERROR_KEY_NAME = "error";
+    public static final String TASKS_KEY_NAME = "tasks";
+    public static final String ASSIGNMENT_KEY_NAME = "assignment";
+    public static final int CONNECTOR_TASK = -1;
+
+    public static final short COPYCAT_PROTOCOL_V0 = 0;
+    public static final Schema COPYCAT_PROTOCOL_HEADER_SCHEMA = new Schema(
+            new Field(VERSION_KEY_NAME, Type.INT16));
+    private static final Struct COPYCAT_PROTOCOL_HEADER_V0 = new Struct(COPYCAT_PROTOCOL_HEADER_SCHEMA)
+            .set(VERSION_KEY_NAME, COPYCAT_PROTOCOL_V0);
+
+    public static final Schema CONFIG_STATE_V0 = new Schema(
+            new Field(CONFIG_OFFSET_KEY_NAME, Type.INT64));
+    // Assignments for each worker are a set of connectors and tasks. These are categorized by connector ID. A sentinel
+    // task ID (CONNECTOR_TASK) is used to indicate the connector itself (i.e. that the assignment includes
+    // responsibility for running the Connector instance in addition to any tasks it generates).
+    public static final Schema CONNECTOR_ASSIGNMENT_V0 = new Schema(
+            new Field(CONNECTOR_KEY_NAME, Type.STRING),
+            new Field(TASKS_KEY_NAME, new ArrayOf(Type.INT32)));
+    public static final Schema ASSIGNMENT_V0 = new Schema(
+            new Field(ERROR_KEY_NAME, Type.INT16),
+            new Field(LEADER_KEY_NAME, Type.STRING),
+            new Field(CONFIG_OFFSET_KEY_NAME, Type.INT64),
+            new Field(ASSIGNMENT_KEY_NAME, new ArrayOf(CONNECTOR_ASSIGNMENT_V0)));
+
+    public static ByteBuffer serializeMetadata(ConfigState configState) {
+        Struct struct = new Struct(CONFIG_STATE_V0);
+        struct.set(CONFIG_OFFSET_KEY_NAME, configState.offset());
+        ByteBuffer buffer = ByteBuffer.allocate(COPYCAT_PROTOCOL_HEADER_V0.sizeOf() + CONFIG_STATE_V0.sizeOf(struct));
+        COPYCAT_PROTOCOL_HEADER_V0.writeTo(buffer);
+        CONFIG_STATE_V0.write(buffer, struct);
+        buffer.flip();
+        return buffer;
+    }
+
+    public static ConfigState deserializeMetadata(ByteBuffer buffer) {
+        Struct header = (Struct) COPYCAT_PROTOCOL_HEADER_SCHEMA.read(buffer);
+        Short version = header.getShort(VERSION_KEY_NAME);
+        checkVersionCompatibility(version);
+        Struct struct = (Struct) CONFIG_STATE_V0.read(buffer);
+        long configOffset = struct.getLong(CONFIG_OFFSET_KEY_NAME);
+        return new ConfigState(configOffset);
+    }
+
+    public static ByteBuffer serializeAssignment(Assignment assignment) {
+        Struct struct = new Struct(ASSIGNMENT_V0);
+        struct.set(ERROR_KEY_NAME, assignment.error());
+        struct.set(LEADER_KEY_NAME, assignment.leader());
+        struct.set(CONFIG_OFFSET_KEY_NAME, assignment.offset());
+        List<Struct> taskAssignments = new ArrayList<>();
+        for (Map.Entry<String, List<Integer>> connectorEntry : assignment.asMap().entrySet()) {
+            Struct taskAssignment = new Struct(CONNECTOR_ASSIGNMENT_V0);
+            taskAssignment.set(CONNECTOR_KEY_NAME, connectorEntry.getKey());
+            List<Integer> tasks = connectorEntry.getValue();
+            taskAssignment.set(TASKS_KEY_NAME, tasks.toArray());
+            taskAssignments.add(taskAssignment);
+        }
+        struct.set(ASSIGNMENT_KEY_NAME, taskAssignments.toArray());
+
+        ByteBuffer buffer = ByteBuffer.allocate(COPYCAT_PROTOCOL_HEADER_V0.sizeOf() + ASSIGNMENT_V0.sizeOf(struct));
+        COPYCAT_PROTOCOL_HEADER_V0.writeTo(buffer);
+        ASSIGNMENT_V0.write(buffer, struct);
+        buffer.flip();
+        return buffer;
+    }
+
+    public static Assignment deserializeAssignment(ByteBuffer buffer) {
+        Struct header = (Struct) COPYCAT_PROTOCOL_HEADER_SCHEMA.read(buffer);
+        Short version = header.getShort(VERSION_KEY_NAME);
+        checkVersionCompatibility(version);
+        Struct struct = (Struct) ASSIGNMENT_V0.read(buffer);
+        short error = struct.getShort(ERROR_KEY_NAME);
+        String leader = struct.getString(LEADER_KEY_NAME);
+        long offset = struct.getLong(CONFIG_OFFSET_KEY_NAME);
+        List<String> connectorIds = new ArrayList<>();
+        List<ConnectorTaskId> taskIds = new ArrayList<>();
+        for (Object structObj : struct.getArray(ASSIGNMENT_KEY_NAME)) {
+            Struct assignment = (Struct) structObj;
+            String connector = assignment.getString(CONNECTOR_KEY_NAME);
+            for (Object taskIdObj : assignment.getArray(TASKS_KEY_NAME)) {
+                Integer taskId = (Integer) taskIdObj;
+                if (taskId == CONNECTOR_TASK)
+                    connectorIds.add(connector);
+                else
+                    taskIds.add(new ConnectorTaskId(connector, taskId));
+            }
+        }
+        return new Assignment(error, leader, offset, connectorIds, taskIds);
+    }
+
+    public static class ConfigState {
+        private final long offset;
+
+        public ConfigState(long offset) {
+            this.offset = offset;
+        }
+
+        public long offset() {
+            return offset;
+        }
+
+        @Override
+        public String toString() {
+            return "ConfigState{" +
+                    "offset=" + offset +
+                    '}';
+        }
+    }
+
+    public static class Assignment {
+        public static final short NO_ERROR = 0;
+        // Configuration offsets mismatched in a way that the leader could not resolve. Workers should read to the end
+        // of the config log and try to re-join
+        public static final short CONFIG_MISMATCH = 1;
+
+        private final short error;
+        private final String leader;
+        private final long offset;
+        private final List<String> connectorIds;
+        private final List<ConnectorTaskId> taskIds;
+
+        /**
+         * Create an assignment indicating responsibility for the given connector instances and task Ids.
+         * @param connectorIds list of connectors that the worker should instantiate and run
+         * @param taskIds list of task IDs that the worker should instantiate and run
+         */
+        public Assignment(short error, String leader, long configOffset,
+                          List<String> connectorIds, List<ConnectorTaskId> taskIds) {
+            this.error = error;
+            this.leader = leader;
+            this.offset = configOffset;
+            this.taskIds = taskIds;
+            this.connectorIds = connectorIds;
+        }
+
+        public short error() {
+            return error;
+        }
+
+        public String leader() {
+            return leader;
+        }
+
+        public boolean failed() {
+            return error != NO_ERROR;
+        }
+
+        public long offset() {
+            return offset;
+        }
+
+        public List<String> connectors() {
+            return connectorIds;
+        }
+
+        public List<ConnectorTaskId> tasks() {
+            return taskIds;
+        }
+
+        @Override
+        public String toString() {
+            return "Assignment{" +
+                    "error=" + error +
+                    ", leader='" + leader + '\'' +
+                    ", offset=" + offset +
+                    ", connectorIds=" + connectorIds +
+                    ", taskIds=" + taskIds +
+                    '}';
+        }
+
+        private Map<String, List<Integer>> asMap() {
+            // Using LinkedHashMap preserves the ordering, which is helpful for tests and debugging
+            Map<String, List<Integer>> taskMap = new LinkedHashMap<>();
+            for (String connectorId : new HashSet<>(connectorIds)) {
+                List<Integer> connectorTasks = taskMap.get(connectorId);
+                if (connectorTasks == null) {
+                    connectorTasks = new ArrayList<>();
+                    taskMap.put(connectorId, connectorTasks);
+                }
+                connectorTasks.add(CONNECTOR_TASK);
+            }
+            for (ConnectorTaskId taskId : taskIds) {
+                String connectorId = taskId.connector();
+                List<Integer> connectorTasks = taskMap.get(connectorId);
+                if (connectorTasks == null) {
+                    connectorTasks = new ArrayList<>();
+                    taskMap.put(connectorId, connectorTasks);
+                }
+                connectorTasks.add(taskId.task());
+            }
+            return taskMap;
+        }
+    }
+
+    private static void checkVersionCompatibility(short version) {
+        // check for invalid versions
+        if (version < COPYCAT_PROTOCOL_V0)
+            throw new SchemaException("Unsupported subscription version: " + version);
+
+        // otherwise, assume versions can be parsed as V0
+    }
+
+}
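
A quick round-trip sketch (not part of the patch) of the V0 assignment format defined above: serialize an Assignment and read it back, showing how the CONNECTOR_TASK sentinel keeps connector and task assignments in one array per connector. The leader id and config offset are made-up values.

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    import org.apache.kafka.copycat.runtime.distributed.CopycatProtocol;
    import org.apache.kafka.copycat.runtime.distributed.CopycatProtocol.Assignment;
    import org.apache.kafka.copycat.util.ConnectorTaskId;

    public class AssignmentRoundTripSketch {
        public static void main(String[] args) {
            Assignment original = new Assignment(
                    Assignment.NO_ERROR,
                    "worker-1",                                  // assumed leader id
                    42L,                                         // assumed config offset
                    Arrays.asList("source-test1"),               // run the Connector instance itself
                    Arrays.asList(new ConnectorTaskId("source-test1", 0),
                                  new ConnectorTaskId("source-test1", 1)));

            ByteBuffer serialized = CopycatProtocol.serializeAssignment(original);
            Assignment restored = CopycatProtocol.deserializeAssignment(serialized);

            // The connector itself was encoded with the CONNECTOR_TASK sentinel, so it comes
            // back via connectors() while the numbered tasks come back via tasks().
            System.out.println(restored.connectors());
            System.out.println(restored.tasks());
        }
    }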

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
index 5273658..bf3229d 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
@@ -17,304 +17,627 @@
 
 package org.apache.kafka.copycat.runtime.distributed;
 
+import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.copycat.connector.Connector;
+import org.apache.kafka.copycat.connector.ConnectorContext;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.runtime.ConnectorConfig;
 import org.apache.kafka.copycat.runtime.Herder;
 import org.apache.kafka.copycat.runtime.HerderConnectorContext;
+import org.apache.kafka.copycat.runtime.TaskConfig;
 import org.apache.kafka.copycat.runtime.Worker;
 import org.apache.kafka.copycat.sink.SinkConnector;
-import org.apache.kafka.copycat.sink.SinkTask;
 import org.apache.kafka.copycat.storage.KafkaConfigStorage;
 import org.apache.kafka.copycat.util.Callback;
 import org.apache.kafka.copycat.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * Distributed "herder" that coordinates with other workers to spread work across multiple processes.
+ * <p>
+ *     Distributed "herder" that coordinates with other workers to spread work across multiple processes.
+ * </p>
+ * <p>
+ *     Under the hood, this is implemented as a group managed by Kafka's group membership facilities (i.e. the generalized
+ *     group/consumer coordinator). Each instance of DistributedHerder joins the group and indicates what its current
+ *     configuration state is (where it is in the configuration log). The group coordinator selects one member to take
+ *     this information and assign each instance a subset of the active connectors & tasks to execute. This assignment
+ *     is currently performed in a simple round-robin fashion, but this is not guaranteed -- the herder may also choose
+ *     to, e.g., use a sticky assignment to avoid the usual start/stop costs associated with connectors and tasks. Once
+ *     an assignment is received, the DistributedHerder simply runs its assigned connectors and tasks in a Worker.
+ * </p>
+ * <p>
+ *     In addition to distributing work, the DistributedHerder uses the leader determined during the work assignment
+ *     to select a leader for this generation of the group who is responsible for other tasks that can only be performed
+ *     by a single node at a time. Most importantly, this includes writing updated configurations for connectors and tasks
+ *     (and therefore also creating, destroying, and scaling connectors up and down).
+ * </p>
  */
-public class DistributedHerder implements Herder {
+public class DistributedHerder implements Herder, Runnable {
     private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class);
 
-    private Worker worker;
-    private KafkaConfigStorage configStorage;
+    private final Worker worker;
+    private final KafkaConfigStorage configStorage;
     private ClusterConfigState configState;
-    private HashMap<String, ConnectorState> connectors = new HashMap<>();
 
-    public DistributedHerder(Worker worker) {
-        this.worker = worker;
-        this.configStorage = new KafkaConfigStorage(worker.getInternalValueConverter(),
-                new ConnectorConfigCallback(), new TaskConfigCallback());
+    private final int workerSyncTimeoutMs;
+    private final int workerUnsyncBackoffMs;
+
+    private final WorkerGroupMember member;
+    private final AtomicBoolean stopping;
+    private final CountDownLatch stopLatch = new CountDownLatch(1);
+
+    // Track enough information about the current membership state to be able to determine which requests via the API
+    // and from other nodes are safe to process
+    private boolean rebalanceResolved;
+    private CopycatProtocol.Assignment assignment;
+
+    // To handle most external requests, like creating or destroying a connector, we can use a generic request where
+    // the caller specifies all the code that should be executed.
+    private final Queue<HerderRequest> requests = new LinkedBlockingDeque<>();
+    // Config updates can be collected and applied together when possible. Also, we need to take care to rebalance when
+    // needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits).
+    private final Set<String> connectorConfigUpdates = new HashSet<>();
+    private boolean needsReconfigRebalance;
+
+    public DistributedHerder(Worker worker, Map<String, ?> configs) {
+        this(worker, configs, null, null);
     }
 
-    // Public for testing (mock KafkaConfigStorage)
-    public DistributedHerder(Worker worker, KafkaConfigStorage configStorage) {
+    // public for testing
+    public DistributedHerder(Worker worker, Map<String, ?> configs, KafkaConfigStorage configStorage, WorkerGroupMember member) {
         this.worker = worker;
-        this.configStorage = configStorage;
+        if (configStorage != null) {
+            // For testing. Assume configuration has already been performed
+            this.configStorage = configStorage;
+        } else {
+            this.configStorage = new KafkaConfigStorage(worker.getInternalValueConverter(), connectorConfigCallback(), taskConfigCallback());
+            this.configStorage.configure(configs);
+        }
+        configState = ClusterConfigState.EMPTY;
+
+        DistributedHerderConfig config = new DistributedHerderConfig(configs);
+        this.workerSyncTimeoutMs = config.getInt(DistributedHerderConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
+        this.workerUnsyncBackoffMs = config.getInt(DistributedHerderConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG);
+
+        this.member = member != null ? member : new WorkerGroupMember(config, this.configStorage, rebalanceListener());
+        stopping = new AtomicBoolean(false);
+
+        rebalanceResolved = true; // False when we still need to follow up after a rebalance (e.g., to start assigned tasks)
+        needsReconfigRebalance = false;
     }
 
-    public synchronized void configure(Map<String, ?> configs) {
-        configStorage.configure(configs);
+    @Override
+    public void start() {
+        Thread thread = new Thread(this);
+        thread.start();
     }
 
-    public synchronized void start() {
-        log.info("Herder starting");
+    public void run() {
+        try {
+            log.info("Herder starting");
 
-        configStorage.start();
+            configStorage.start();
 
-        log.info("Restoring connectors from stored configs");
-        restoreConnectors();
+            log.info("Herder started");
 
-        log.info("Herder started");
+            while (!stopping.get()) {
+                tick();
+            }
+
+            halt();
+
+            log.info("Herder stopped");
+        } finally {
+            stopLatch.countDown();
+        }
     }
 
-    public synchronized void stop() {
-        log.info("Herder stopping");
+    // public for testing
+    public void tick() {
+        // The main loop does two primary things: 1) drive the group membership protocol, responding to rebalance events
+        // as they occur, and 2) handle external requests targeted at the leader. All the "real" work of the herder is
+        // performed in this thread, which keeps synchronization straightforward at the cost of some operations possibly
+        // blocking this thread (especially those in callbacks due to rebalance events).
 
-        // There's no coordination/hand-off to do here since this is all standalone. Instead, we
-        // should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all
-        // the tasks.
-        for (Map.Entry<String, ConnectorState> entry : connectors.entrySet()) {
-            ConnectorState state = entry.getValue();
-            stopConnector(state);
+        try {
+            member.ensureActive();
+            // Ensure we're in a good state in our group. If not, return; everything should already be set up to rejoin
+            if (!handleRebalanceCompleted()) return;
+        } catch (ConsumerWakeupException e) {
+            // May be due to a request from another thread, or might be stopping. If the latter, we need to check the
+            // flag immediately. If the former, we need to re-run the ensureActive call since we can't handle requests
+            // unless we're in the group.
+            return;
         }
-        connectors.clear();
 
-        if (configStorage != null) {
-            configStorage.stop();
-            configStorage = null;
+        // Process any external requests
+        while (!requests.isEmpty()) {
+            HerderRequest request = requests.poll();
+            try {
+                request.callback().onCompletion(null, request.action().call());
+            } catch (Throwable t) {
+                request.callback().onCompletion(t, null);
+            }
         }
 
-        log.info("Herder stopped");
-    }
+        // Process any configuration updates
+        synchronized (this) {
+            if (needsReconfigRebalance || !connectorConfigUpdates.isEmpty()) {
+                // Connector reconfigs only need local updates since there is no coordination between workers required.
+                // However, if connectors were added or removed, work needs to be rebalanced since we have more work
+                // items to distribute among workers.
+                ClusterConfigState newConfigState = configStorage.snapshot();
+                if (!newConfigState.connectors().equals(configState.connectors()))
+                    needsReconfigRebalance = true;
+                configState = newConfigState;
+                if (needsReconfigRebalance) {
+                    // Task reconfigs require a rebalance. Request the rebalance, clean out state, and then restart
+                    // this loop, which will then ensure the rebalance occurs without any other requests being
+                    // processed until it completes.
+                    member.requestRejoin();
+                    // Any connector config updates will be addressed during the rebalance too
+                    connectorConfigUpdates.clear();
+                    needsReconfigRebalance = false;
+                    return;
+                } else if (!connectorConfigUpdates.isEmpty()) {
+                    // If we only have connector config updates, we can just bounce the updated connectors that are
+                    // currently assigned to this worker.
+                    Set<String> localConnectors = worker.connectorNames();
+                    for (String connectorName : connectorConfigUpdates) {
+                        if (!localConnectors.contains(connectorName))
+                            continue;
+                        worker.stopConnector(connectorName);
+                        // The update may be a deletion, so verify we actually need to restart the connector
+                        if (configState.connectors().contains(connectorName))
+                            startConnector(connectorName);
+                    }
+                    connectorConfigUpdates.clear();
+                }
+            }
+        }
 
-    @Override
-    public synchronized void addConnector(Map<String, String> connectorProps,
-                                          Callback<String> callback) {
+        // Let the group take any actions it needs to
         try {
-            // Ensure the config is written to storage first
-            ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
-            String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
-            configStorage.putConnectorConfig(connName, connectorProps);
+            member.poll(Long.MAX_VALUE);
+            // Ensure we're in a good state in our group. If not, return; everything should already be set up to rejoin
+            if (!handleRebalanceCompleted()) return;
+        } catch (ConsumerWakeupException e) { // FIXME should not be ConsumerWakeupException
+            // Ignore. Just indicates we need to check the exit flag, for requested actions, etc.
+        }
+    }
 
-            ConnectorState connState = createConnector(connConfig);
-            if (callback != null)
-                callback.onCompletion(null, connState.name);
-            // This should always be a new job, create jobs from scratch
-            createConnectorTasks(connState);
-        } catch (CopycatException e) {
-            if (callback != null)
-                callback.onCompletion(e, null);
+    // public for testing
+    public void halt() {
+        synchronized (this) {
+            // Clean up any connectors and tasks that are still running.
+            log.info("Stopping connectors and tasks that are still assigned to this worker.");
+            for (String connName : new HashSet<>(worker.connectorNames())) {
+                try {
+                    worker.stopConnector(connName);
+                } catch (Throwable t) {
+                    log.error("Failed to shut down connector " + connName, t);
+                }
+            }
+            for (ConnectorTaskId taskId : new HashSet<>(worker.taskIds())) {
+                try {
+                    worker.stopTask(taskId);
+                } catch (Throwable t) {
+                    log.error("Failed to shut down task " + taskId, t);
+                }
+            }
+
+            member.stop();
+
+            // Explicitly fail any outstanding requests so they actually get a response and get an understandable reason
+            // for their failure
+            while (!requests.isEmpty()) {
+                HerderRequest request = requests.poll();
+                request.callback().onCompletion(new CopycatException("Worker is shutting down"), null);
+            }
+
+            if (configStorage != null)
+                configStorage.stop();
         }
     }
 
     @Override
-    public synchronized void deleteConnector(String name, Callback<Void> callback) {
-        try {
-            destroyConnector(name);
-            if (callback != null)
-                callback.onCompletion(null, null);
-        } catch (CopycatException e) {
-            if (callback != null)
-                callback.onCompletion(e, null);
+    public void stop() {
+        log.info("Herder stopping");
+
+        stopping.set(true);
+        member.wakeup();
+        while (stopLatch.getCount() > 0) {
+            try {
+                stopLatch.await();
+            } catch (InterruptedException e) {
+                // ignore, should not happen
+            }
         }
     }
 
     @Override
-    public synchronized void requestTaskReconfiguration(String connName) {
-        ConnectorState state = connectors.get(connName);
-        if (state == null) {
-            log.error("Task that requested reconfiguration does not exist: {}", connName);
+    public synchronized void addConnector(final Map<String, String> connectorProps,
+                                          final Callback<String> callback) {
+        final ConnectorConfig connConfig;
+        final String connName;
+        try {
+            connConfig = new ConnectorConfig(connectorProps);
+            connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+        } catch (Throwable t) {
+            if (callback != null)
+                callback.onCompletion(t, null);
             return;
         }
-        updateConnectorTasks(state);
+
+        log.debug("Submitting connector config {}", connName);
+
+        requests.add(new HerderRequest(
+                new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        if (!isLeader())
+                            throw new NotLeaderException("Only the leader can add connectors.");
+
+                        log.debug("Submitting connector config {}", connName);
+                        configStorage.putConnectorConfig(connName, connectorProps);
+
+                        return null;
+                    }
+                },
+                new Callback<Void>() {
+                    @Override
+                    public void onCompletion(Throwable error, Void result) {
+                        if (callback == null) return;
+
+                        if (error != null)
+                            callback.onCompletion(error, null);
+                        else
+                            callback.onCompletion(null, connName);
+                    }
+                }));
+        member.wakeup();
     }
 
-    // Creates and configures the connector. Does not setup any tasks
-    private ConnectorState createConnector(ConnectorConfig connConfig) {
-        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
-        String className = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
-        log.info("Creating connector {} of type {}", connName, className);
-        int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);
-        List<String> topics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG); // Sinks only
-        Properties configs = connConfig.unusedProperties();
-
-        if (connectors.containsKey(connName)) {
-            log.error("Ignoring request to create connector due to conflicting connector name");
-            throw new CopycatException("Connector with name " + connName + " already exists");
-        }
+    @Override
+    public synchronized void deleteConnector(final String connName, final Callback<Void> callback) {
+        log.debug("Submitting connector config deletion {}", connName);
+
+        requests.add(new HerderRequest(
+                new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        if (!isLeader())
+                            throw new NotLeaderException("Only the leader can delete connectors.");
+
+                        log.debug("Submitting null connector config {}", connName);
+                        configStorage.putConnectorConfig(connName, null);
+                        return null;
+                    }
+                },
+                new Callback<Void>() {
+                    @Override
+                    public void onCompletion(Throwable error, Void result) {
+                        if (callback != null)
+                            callback.onCompletion(error, null);
+                    }
+                }
+        ));
+        member.wakeup();
+    }
 
-        final Connector connector;
-        try {
-            connector = instantiateConnector(className);
-        } catch (Throwable t) {
-            // Catches normal exceptions due to instantiation errors as well as any runtime errors that
-            // may be caused by user code
-            throw new CopycatException("Failed to create connector instance", t);
-        }
-        connector.initialize(new HerderConnectorContext(this, connName));
-        try {
-            connector.start(configs);
-        } catch (CopycatException e) {
-            throw new CopycatException("Connector threw an exception while starting", e);
-        }
-        ConnectorState state = new ConnectorState(connName, connector, maxTasks, topics);
-        connectors.put(connName, state);
+    @Override
+    public synchronized void requestTaskReconfiguration(final String connName) {
+        requests.add(new HerderRequest(
+                new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        reconfigureConnector(connName);
+                        return null;
+                    }
+                }
+        ));
+        member.wakeup();
+    }
 
-        log.info("Finished creating connector {}", connName);
 
-        return state;
+    private boolean isLeader() {
+        return assignment != null && member.memberId().equals(assignment.leader());
     }
 
-    private static Connector instantiateConnector(String className) {
-        try {
-            return Utils.newInstance(className, Connector.class);
-        } catch (ClassNotFoundException e) {
-            throw new CopycatException("Couldn't instantiate connector class", e);
+    /**
+     * Handle post-assignment operations, either trying to resolve issues that kept assignment from completing or
+     * getting this node in sync and starting its assigned work.
+     *
+     * @return false if we couldn't finish
+     */
+    private boolean handleRebalanceCompleted() {
+        if (this.rebalanceResolved)
+            return true;
+
+        rebalanceResolved = true;
+
+        // We need to handle a variety of cases after a rebalance:
+        // 1. Assignment failed
+        //  1a. We are the leader for the round. We will be leader again if we rejoin now, so we need to catch up before
+        //      even attempting to. If we can't, we should drop out of the group because we will block everyone from making
+        //      progress. We can back off and try rejoining later.
+        //  1b. We are not the leader. We might need to catch up. If we're already caught up we can rejoin immediately,
+        //      otherwise, we just want to wait indefinitely to catch up and rejoin whenever we're finally ready.
+        // 2. Assignment succeeded.
+        //  2a. We are caught up on configs. Awesome! We can proceed to run our assigned work.
+        //  2b. We need to try to catch up. We can do this potentially indefinitely because if it takes too long, we'll
+        //      be kicked out of the group anyway due to lack of heartbeats.
+
+        boolean needsReadToEnd = false;
+        long syncConfigsTimeoutMs = Long.MAX_VALUE;
+        boolean needsRejoin = false;
+        if (assignment.failed()) {
+            needsRejoin = true;
+            if (isLeader()) {
+                log.warn("Join group completed, but assignment failed and we are the leader. Reading to end of config and retrying.");
+                needsReadToEnd = true;
+                syncConfigsTimeoutMs = workerSyncTimeoutMs;
+            } else if (configState.offset() < assignment.offset()) {
+                log.warn("Join group completed, but assignment failed and we lagging. Reading to end of config and retrying.");
+                needsReadToEnd = true;
+            } else {
+                log.warn("Join group completed, but assignment failed. We were up to date, so just retrying.");
+            }
+        } else {
+            if (configState.offset() < assignment.offset()) {
+                log.warn("Catching up to assignment's config offset.");
+                needsReadToEnd = true;
+            }
         }
-    }
 
-    private void destroyConnector(String connName) {
-        log.info("Destroying connector {}", connName);
-        ConnectorState state = connectors.get(connName);
-        if (state == null) {
-            log.error("Failed to destroy connector {} because it does not exist", connName);
-            throw new CopycatException("Connector does not exist");
+        if (needsReadToEnd) {
+            // Force exiting this method to avoid creating any connectors/tasks and require immediate rejoining if
+            // we timed out. This should only happen if we were the leader and didn't finish quickly enough, in which
+            // case we've waited a long time and should have already left the group OR the timeout should have been
+            // very long and not having finished also indicates we've waited longer than the session timeout.
+            if (!readConfigToEnd(syncConfigsTimeoutMs))
+                needsRejoin = true;
         }
 
-        stopConnector(state);
-        configStorage.putConnectorConfig(state.name, null);
-        connectors.remove(state.name);
+        if (needsRejoin) {
+            member.requestRejoin();
+            return false;
+        }
 
-        log.info("Finished destroying connector {}", connName);
+        // Should still validate that they match since we may have gone *past* the required offset, in which case we
+        // should *not* start any tasks and rejoin
+        if (configState.offset() != assignment.offset()) {
+            log.info("Current config state offset {} does not match group assignment {}. Forcing rebalance.", configState.offset(), assignment.offset());
+            member.requestRejoin();
+            return false;
+        }
+
+        startWork();
+
+        return true;
     }
 
-    // Stops a connectors tasks, then the connector
-    private void stopConnector(ConnectorState state) {
-        removeConnectorTasks(state);
+    /**
+     * Try to read to the end of the config log within the given timeout
+     * @param timeoutMs maximum time to wait to sync to the end of the log
+     * @return true if successful, false if timed out
+     */
+    private boolean readConfigToEnd(long timeoutMs) {
+        log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset());
         try {
-            state.connector.stop();
-        } catch (CopycatException e) {
-            log.error("Error shutting down connector {}: ", state.connector, e);
+            configStorage.readToEnd().get(timeoutMs, TimeUnit.MILLISECONDS);
+            configState = configStorage.snapshot();
+            log.info("Finished reading to end of log and updated config snapshot, new config log offset: {}", configState.offset());
+            return true;
+        } catch (TimeoutException e) {
+            log.warn("Didn't reach end of config log quickly enough", e);
+            // TODO: With explicit leave group support, it would be good to explicitly leave the group *before* this
+            // backoff since it'll be longer than the session timeout
+            if (isLeader())
+                backoff(workerUnsyncBackoffMs);
+            return false;
+        } catch (InterruptedException | ExecutionException e) {
+            throw new CopycatException("Error trying to catch up after assignment", e);
         }
     }
 
-    private void createConnectorTasks(ConnectorState state) {
-        String taskClassName = state.connector.taskClass().getName();
-
-        log.info("Creating tasks for connector {} of type {}", state.name, taskClassName);
-
-        List<Properties> taskConfigs = state.connector.taskConfigs(state.maxTasks);
-
-        // Generate the final configs, including framework provided settings
-        Map<ConnectorTaskId, Properties> taskProps = new HashMap<>();
-        for (int i = 0; i < taskConfigs.size(); i++) {
-            ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
-            Properties config = taskConfigs.get(i);
-            // TODO: This probably shouldn't be in the Herder. It's nice to have Copycat ensure the list of topics
-            // is automatically provided to tasks since it is required by the framework, but this
-            String subscriptionTopics = Utils.join(state.inputTopics, ",");
-            if (state.connector instanceof SinkConnector) {
-                // Make sure we don't modify the original since the connector may reuse it internally
-                Properties configForSink = new Properties();
-                configForSink.putAll(config);
-                configForSink.setProperty(SinkTask.TOPICS_CONFIG, subscriptionTopics);
-                config = configForSink;
-            }
-            taskProps.put(taskId, config);
-        }
+    private void backoff(long ms) {
+        Utils.sleep(ms);
+    }
 
-        // And initiate the tasks
-        for (int i = 0; i < taskConfigs.size(); i++) {
-            ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
-            Properties config = taskProps.get(taskId);
+    private void startWork() {
+        // Start assigned connectors and tasks
+        log.info("Starting connectors and tasks using config offset {}", assignment.offset());
+        for (String connectorName : assignment.connectors()) {
             try {
-                worker.addTask(taskId, taskClassName, config);
-                // We only need to store the task IDs so we can clean up.
-                state.tasks.add(taskId);
-            } catch (Throwable e) {
-                log.error("Failed to add task {}: ", taskId, e);
-                // Swallow this so we can continue updating the rest of the tasks
-                // FIXME what's the proper response? Kill all the tasks? Consider this the same as a task
-                // that died after starting successfully.
+                startConnector(connectorName);
+            } catch (ConfigException e) {
+                log.error("Couldn't instantiate connector " + connectorName + " because it has an invalid connector " +
+                        "configuration. This connector will not execute until reconfigured.", e);
             }
         }
-    }
-
-    private void removeConnectorTasks(ConnectorState state) {
-        Iterator<ConnectorTaskId> taskIter = state.tasks.iterator();
-        while (taskIter.hasNext()) {
-            ConnectorTaskId taskId = taskIter.next();
+        for (ConnectorTaskId taskId : assignment.tasks()) {
             try {
-                worker.stopTask(taskId);
-                taskIter.remove();
-            } catch (CopycatException e) {
-                log.error("Failed to stop task {}: ", taskId, e);
-                // Swallow this so we can continue stopping the rest of the tasks
-                // FIXME: Forcibly kill the task?
+                log.info("Starting task {}", taskId);
+                Map<String, String> configs = configState.taskConfig(taskId);
+                TaskConfig taskConfig = new TaskConfig(configs);
+                worker.addTask(taskId, taskConfig);
+            } catch (ConfigException e) {
+                log.error("Couldn't instantiate task " + taskId + " because it has an invalid task " +
+                        "configuration. This task will not execute until reconfigured.", e);
             }
         }
     }
 
-    private void updateConnectorTasks(ConnectorState state) {
-        removeConnectorTasks(state);
-        createConnectorTasks(state);
+    // Helper for starting a connector with the given name, which will extract & parse the config, generate the
+    // connector context, and add it to the worker. This needs to be called from within the main worker thread for this herder.
+    private void startConnector(String connectorName) {
+        log.info("Starting connector {}", connectorName);
+        Map<String, String> configs = configState.connectorConfig(connectorName);
+        ConnectorConfig connConfig = new ConnectorConfig(configs);
+        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+        ConnectorContext ctx = new HerderConnectorContext(DistributedHerder.this, connName);
+        worker.addConnector(connConfig, ctx);
+
+        // Immediately request configuration since this could be a brand new connector. However, also only update those
+        // task configs if they are actually different from the existing ones to avoid unnecessary updates when this is
+        // just restoring an existing connector.
+        reconfigureConnector(connName);
     }
 
-    private void restoreConnectors() {
-        configState = configStorage.snapshot();
-        Collection<String> connNames = configState.connectors();
-        for (String connName : connNames) {
-            log.info("Restoring connector {}", connName);
-            Map<String, String> connProps = configState.connectorConfig(connName);
-            ConnectorConfig connConfig = new ConnectorConfig(connProps);
-            ConnectorState connState = createConnector(connConfig);
-            // Because this coordinator is standalone, connectors are only restored when this process
-            // starts and we know there can't be any existing tasks. So in this special case we're able
-            // to just create the tasks rather than having to check for existing tasks and sort out
-            // whether they need to be reconfigured.
-            createConnectorTasks(connState);
+    // Updates configurations for a connector by requesting them from the connector, filling in parameters provided
+    // by the system, and then checking whether any configs have actually changed before submitting the new configs to storage
+    private void reconfigureConnector(String connName) {
+        Map<String, String> configs = configState.connectorConfig(connName);
+        ConnectorConfig connConfig = new ConnectorConfig(configs);
+
+        List<String> sinkTopics = null;
+        if (SinkConnector.class.isAssignableFrom(connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG)))
+            sinkTopics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG);
+
+        Map<ConnectorTaskId, Map<String, String>> taskProps
+                = worker.reconfigureConnectorTasks(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), sinkTopics);
+        boolean changed = false;
+        int currentNumTasks = configState.taskCount(connName);
+        if (taskProps.size() != currentNumTasks) {
+            log.debug("Change in connector task count from {} to {}, writing updated task configurations", currentNumTasks, taskProps.size());
+            changed = true;
+        } else {
+            for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfig : taskProps.entrySet()) {
+                if (!taskConfig.getValue().equals(configState.taskConfig(taskConfig.getKey()))) {
+                    log.debug("Change in task configurations, writing updated task configurations");
+                    changed = true;
+                    break;
+                }
+            }
+        }
+        if (changed) {
+            // FIXME: Configs should only be written by the leader to avoid conflicts due to zombies. However, until the
+            // REST API is available to forward this request, we need to do this on the worker that generates the config
+            configStorage.putTaskConfigs(taskProps);
         }
     }
 
 
+    private class HerderRequest {
+        private final Callable<Void> action;
+        private final Callback<Void> callback;
 
-    private static class ConnectorState {
-        public String name;
-        public Connector connector;
-        public int maxTasks;
-        public List<String> inputTopics;
-        Set<ConnectorTaskId> tasks;
+        public HerderRequest(Callable<Void> action, Callback<Void> callback) {
+            this.action = action;
+            this.callback = callback;
+        }
 
-        public ConnectorState(String name, Connector connector, int maxTasks,
-                              List<String> inputTopics) {
-            this.name = name;
-            this.connector = connector;
-            this.maxTasks = maxTasks;
-            this.inputTopics = inputTopics;
-            this.tasks = new HashSet<>();
+        public HerderRequest(Callable<Void> action) {
+            this.action = action;
+            this.callback = DEFAULT_CALLBACK;
         }
-    }
 
-    private class ConnectorConfigCallback implements Callback<String> {
-        @Override
-        public void onCompletion(Throwable error, String result) {
-            configState = configStorage.snapshot();
-            // FIXME
+        public Callable<Void> action() {
+            return action;
+        }
+
+        public Callback<Void> callback() {
+            return callback;
         }
     }
 
-    private class TaskConfigCallback implements Callback<List<ConnectorTaskId>> {
+    private static final Callback<Void> DEFAULT_CALLBACK = new Callback<Void>() {
         @Override
-        public void onCompletion(Throwable error, List<ConnectorTaskId> result) {
-            configState = configStorage.snapshot();
-            // FIXME
+        public void onCompletion(Throwable error, Void result) {
+            if (error != null)
+                log.error("HerderRequest's action threw an exception: ", error);
         }
+    };
+
+
+    // Config callbacks are triggered from the KafkaConfigStorage thread
+    private Callback<String> connectorConfigCallback() {
+        return new Callback<String>() {
+            @Override
+            public void onCompletion(Throwable error, String connector) {
+                log.debug("Connector {} config updated", connector);
+                // Stage the update and wake up the work thread. Connector config *changes* only need the one connector
+                // to be bounced. However, this callback may also indicate a connector *addition*, which does require
+                // a rebalance, so we need to be careful about what operation we request.
+                synchronized (DistributedHerder.this) {
+                    connectorConfigUpdates.add(connector);
+                }
+                member.wakeup();
+            }
+        };
     }
 
+    private Callback<List<ConnectorTaskId>> taskConfigCallback() {
+        return new Callback<List<ConnectorTaskId>>() {
+            @Override
+            public void onCompletion(Throwable error, List<ConnectorTaskId> tasks) {
+                log.debug("Tasks {} configs updated", tasks);
+                // Stage the update and wake up the work thread. No need to record the set of tasks here because task reconfigs
+                // always need a rebalance to ensure offsets get committed.
+                // TODO: As an optimization, some task config updates could avoid a rebalance. In particular, single-task
+                // connectors clearly don't need any coordination.
+                synchronized (DistributedHerder.this) {
+                    needsReconfigRebalance = true;
+                }
+                member.wakeup();
+            }
+        };
+    }
+
+    // Rebalances are triggered internally from the group member, so these are always executed in the work thread.
+    private WorkerRebalanceListener rebalanceListener() {
+        return new WorkerRebalanceListener() {
+            @Override
+            public void onAssigned(CopycatProtocol.Assignment assignment) {
+                // This callback just logs the assignment and saves it. The actual response is handled in the main loop,
+                // which ensures the group member's rebalancing logic can complete, that potentially long-running steps to
+                // catch up (or back off if we fail) are not executed in a callback, and that we'll be able to invoke other
+                // group membership actions (e.g., we may need to explicitly leave the group if we cannot handle the
+                // assigned tasks).
+                log.info("Joined group and got assignment: {}", assignment);
+                DistributedHerder.this.assignment = assignment;
+                rebalanceResolved = false;
+            }
 
+            @Override
+            public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
+                log.info("Rebalance started");
+
+                // Note that since we don't reset the assignment, we don't revoke leadership here. During a rebalance,
+                // it is still important to have a leader that can write configs, offsets, etc.
+
+                // TODO: Parallelize this. We should be able to request all connectors and tasks to stop, then wait on all of
+                // them to finish
+                // TODO: Technically we don't have to stop connectors at all until we know they've really been removed from
+                // this worker. Instead, we can let them continue to run but buffer any update requests (which should be
+                // rare anyway). This would avoid a steady stream of start/stop, which probably also includes lots of
+                // unnecessary repeated connections to the source/sink system.
+                for (String connectorName : connectors)
+                    worker.stopConnector(connectorName);
+                // TODO: We need to at least commit task offsets, but if we could commit offsets & pause them instead of
+                // stopping them then state could continue to be reused when the task remains on this worker. For example,
+                // this would avoid having to close a connection and then reopen it when the task is assigned back to this
+                // worker again.
+                for (ConnectorTaskId taskId : tasks)
+                    worker.stopTask(taskId);
+            }
+        };
+    }
 }
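
Finally, a caller-side sketch (not part of the patch) of the asynchronous Herder API used above: submit a connector config and handle the result in the callback. Only addConnector(), Callback, and the ConnectorConfig keys come from this commit; the herder instance and connector class are assumptions, and in the distributed herder the request only succeeds on the leader.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.copycat.runtime.ConnectorConfig;
    import org.apache.kafka.copycat.runtime.Herder;
    import org.apache.kafka.copycat.util.Callback;

    public class AddConnectorSketch {
        // Assumes `herder` is an already-started DistributedHerder.
        public static void submit(Herder herder) {
            Map<String, String> props = new HashMap<>();
            props.put(ConnectorConfig.NAME_CONFIG, "source-test1");
            props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.example.MySourceConnector"); // hypothetical
            props.put(ConnectorConfig.TASKS_MAX_CONFIG, "2");

            herder.addConnector(props, new Callback<String>() {
                @Override
                public void onCompletion(Throwable error, String connName) {
                    if (error != null)
                        System.err.println("Failed to submit connector config: " + error);
                    else
                        System.out.println("Submitted connector " + connName);
                }
            });
        }
    }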


[2/3] kafka git commit: KAFKA-2371: Add distributed support for Copycat.

Posted by gw...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
new file mode 100644
index 0000000..bd2ba56
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
+
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+
+public class DistributedHerderConfig extends AbstractConfig {
+    private static final ConfigDef CONFIG;
+
+    /*
+     * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS
+     * THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
+     */
+
+    /**
+     * <code>group.id</code>
+     */
+    public static final String GROUP_ID_CONFIG = "group.id";
+    private static final String GROUP_ID_DOC = "A unique string that identifies the Copycat cluster group this worker belongs to.";
+
+    /**
+     * <code>session.timeout.ms</code>
+     */
+    public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
+    private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect failures when using Kafka's group management facilities.";
+
+    /**
+     * <code>heartbeat.interval.ms</code>
+     */
+    public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
+    private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the group coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the worker's session stays active and to facilitate rebalancing when new members join or leave the group. The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
+
+    /**
+     * <code>worker.sync.timeout.ms</code>
+     */
+    public static final String WORKER_SYNC_TIMEOUT_MS_CONFIG = "worker.sync.timeout.ms";
+    private static final String WORKER_SYNC_TIMEOUT_MS_DOC = "When the worker is out of sync with other workers and needs" +
+            " to resynchronize configurations, wait up to this amount of time before giving up, leaving the group, and" +
+            " waiting a backoff period before rejoining.";
+
+    /**
+     * <code>worker.unsync.backoff.ms</code>
+     */
+    public static final String WORKER_UNSYNC_BACKOFF_MS_CONFIG = "worker.unsync.backoff.ms";
+    private static final String WORKER_UNSYNC_BACKOFF_MS_DOC = "When the worker is out of sync with other workers and " +
+            "fails to catch up within worker.sync.timeout.ms, leave the Copycat cluster for this long before rejoining.";
+    public static final int WORKER_UNSYNC_BACKOFF_MS_DEFAULT = 5 * 60 * 1000;
+
+    static {
+        CONFIG = new ConfigDef()
+                .define(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                        ConfigDef.Type.LIST,
+                        ConfigDef.Importance.HIGH,
+                        CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
+                .define(GROUP_ID_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, GROUP_ID_DOC)
+                .define(SESSION_TIMEOUT_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        30000,
+                        ConfigDef.Importance.HIGH,
+                        SESSION_TIMEOUT_MS_DOC)
+                .define(HEARTBEAT_INTERVAL_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        3000,
+                        ConfigDef.Importance.HIGH,
+                        HEARTBEAT_INTERVAL_MS_DOC)
+                .define(CommonClientConfigs.METADATA_MAX_AGE_CONFIG,
+                        ConfigDef.Type.LONG,
+                        5 * 60 * 1000,
+                        atLeast(0),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.METADATA_MAX_AGE_DOC)
+                .define(CommonClientConfigs.CLIENT_ID_CONFIG,
+                        ConfigDef.Type.STRING,
+                        "",
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.CLIENT_ID_DOC)
+                .define(CommonClientConfigs.SEND_BUFFER_CONFIG,
+                        ConfigDef.Type.INT,
+                        128 * 1024,
+                        atLeast(0),
+                        ConfigDef.Importance.MEDIUM,
+                        CommonClientConfigs.SEND_BUFFER_DOC)
+                .define(CommonClientConfigs.RECEIVE_BUFFER_CONFIG,
+                        ConfigDef.Type.INT,
+                        32 * 1024,
+                        atLeast(0),
+                        ConfigDef.Importance.MEDIUM,
+                        CommonClientConfigs.RECEIVE_BUFFER_DOC)
+                .define(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        50L,
+                        atLeast(0L),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
+                .define(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        100L,
+                        atLeast(0L),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
+                .define(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        30000,
+                        atLeast(0),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
+                .define(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG,
+                        ConfigDef.Type.INT,
+                        2,
+                        atLeast(1),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
+                .define(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
+                        ConfigDef.Type.LIST,
+                        "",
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, ConfigDef.Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+                .define(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigDef.Type.CLASS, SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, ConfigDef.Importance.LOW, SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
+                .define(SSLConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_PROTOCOL_DOC)
+                .define(SSLConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_PROVIDER_DOC, false)
+                .define(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.LOW, SSLConfigs.SSL_CIPHER_SUITES_DOC, false)
+                .define(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SSLConfigs.DEFAULT_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC)
+                .define(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_KEYSTORE_TYPE_DOC)
+                .define(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
+                .define(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
+                .define(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_KEY_PASSWORD_DOC, false)
+                .define(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC)
+                .define(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
+                .define(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
+                .define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
+                .define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
+                .define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
+                .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
+                .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
+                .define(SaslConfigs.SASL_KAFKA_SERVER_REALM, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SaslConfigs.SASL_KAFKA_SERVER_DOC, false)
+                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
+                .define(SaslConfigs.AUTH_TO_LOCAL, ConfigDef.Type.LIST, SaslConfigs.DEFAULT_AUTH_TO_LOCAL, ConfigDef.Importance.MEDIUM, SaslConfigs.AUTH_TO_LOCAL_DOC)
+                .define(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        40 * 1000,
+                        atLeast(0),
+                        ConfigDef.Importance.MEDIUM,
+                        CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
+                        /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
+                .define(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        9 * 60 * 1000,
+                        ConfigDef.Importance.MEDIUM,
+                        CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
+                .define(WORKER_SYNC_TIMEOUT_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        3000,
+                        ConfigDef.Importance.MEDIUM,
+                        WORKER_SYNC_TIMEOUT_MS_DOC)
+                .define(WORKER_UNSYNC_BACKOFF_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        WORKER_UNSYNC_BACKOFF_MS_DEFAULT,
+                        ConfigDef.Importance.MEDIUM,
+                        WORKER_UNSYNC_BACKOFF_MS_DOC);
+    }
+
+    DistributedHerderConfig(Map<?, ?> props) {
+        super(CONFIG, props);
+    }
+
+}

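Of the settings above, only bootstrap.servers and group.id have no default, so a minimal
configuration is small. A sketch (the class below is illustrative and not part of the patch;
it lives in the same package because the DistributedHerderConfig constructor is package-private,
which is how DistributedHerder itself constructs it):

    package org.apache.kafka.copycat.runtime.distributed;

    import java.util.HashMap;
    import java.util.Map;

    public class DistributedHerderConfigExample {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("bootstrap.servers", "localhost:9092");  // required, no default
            props.put("group.id", "copycat-cluster");          // required, no default

            DistributedHerderConfig config = new DistributedHerderConfig(props);
            // Everything else falls back to the defaults defined above.
            System.out.println(config.getInt(DistributedHerderConfig.SESSION_TIMEOUT_MS_CONFIG));    // 30000
            System.out.println(config.getInt(DistributedHerderConfig.HEARTBEAT_INTERVAL_MS_CONFIG)); // 3000
        }
    }
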
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java
new file mode 100644
index 0000000..ce8fba5
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.copycat.errors.CopycatException;
+
+/**
+ * Indicates an operation was not permitted because it can only be performed on the leader and this worker is not currently
+ * the leader.
+ */
+public class NotLeaderException extends CopycatException {
+    public NotLeaderException(String s) {
+        super(s);
+    }
+
+    public NotLeaderException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public NotLeaderException(Throwable throwable) {
+        super(throwable);
+    }
+}

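A hedged sketch of the intended usage, e.g. rejecting a leader-only operation such as writing
connector configs; isLeader() and configStorage are illustrative names and not part of this patch:

    // Hypothetical leader-only guard on a config write path.
    public void putConnectorConfig(String connName, Map<String, String> connConfig) {
        if (!isLeader())
            throw new NotLeaderException("Connector configurations can only be written by the leader, and this worker is not the leader");
        configStorage.putConnectorConfig(connName, connConfig);
    }
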
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
new file mode 100644
index 0000000..c70ed4f
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.CircularIterator;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.copycat.storage.KafkaConfigStorage;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class manages the coordination process with the Kafka group coordinator on the broker for managing Copycat assignments to workers.
+ */
+public final class WorkerCoordinator extends AbstractCoordinator implements Closeable {
+    private static final Logger log = LoggerFactory.getLogger(WorkerCoordinator.class);
+
+    // Currently Copycat doesn't support multiple task assignment strategies, so we currently just fill in a default value
+    public static final String DEFAULT_SUBPROTOCOL = "default";
+
+    private final KafkaConfigStorage configStorage;
+    private CopycatProtocol.Assignment assignmentSnapshot;
+    private final CopycatWorkerCoordinatorMetrics sensors;
+    private ClusterConfigState configSnapshot;
+    private final WorkerRebalanceListener listener;
+
+    private boolean rejoinRequested;
+
+    /**
+     * Initialize the coordination manager.
+     */
+    public WorkerCoordinator(ConsumerNetworkClient client,
+                             String groupId,
+                             int sessionTimeoutMs,
+                             int heartbeatIntervalMs,
+                             Metrics metrics,
+                             String metricGrpPrefix,
+                             Map<String, String> metricTags,
+                             Time time,
+                             long requestTimeoutMs,
+                             long retryBackoffMs,
+                             KafkaConfigStorage configStorage,
+                             WorkerRebalanceListener listener) {
+        super(client,
+                groupId,
+                sessionTimeoutMs,
+                heartbeatIntervalMs,
+                metrics,
+                metricGrpPrefix,
+                metricTags,
+                time,
+                requestTimeoutMs,
+                retryBackoffMs);
+        this.configStorage = configStorage;
+        this.assignmentSnapshot = null;
+        this.sensors = new CopycatWorkerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
+        this.listener = listener;
+        this.rejoinRequested = false;
+    }
+
+    public void requestRejoin() {
+        rejoinRequested = true;
+    }
+
+    @Override
+    public String protocolType() {
+        return "copycat";
+    }
+
+    @Override
+    public LinkedHashMap<String, ByteBuffer> metadata() {
+        LinkedHashMap<String, ByteBuffer> metadata = new LinkedHashMap<>();
+        configSnapshot = configStorage.snapshot();
+        CopycatProtocol.ConfigState configState = new CopycatProtocol.ConfigState(configSnapshot.offset());
+        metadata.put(DEFAULT_SUBPROTOCOL, CopycatProtocol.serializeMetadata(configState));
+        return metadata;
+    }
+
+    @Override
+    protected void onJoin(int generation, String memberId, String protocol, ByteBuffer memberAssignment) {
+        assignmentSnapshot = CopycatProtocol.deserializeAssignment(memberAssignment);
+        // At this point we always consider ourselves to be a member of the cluster, even if there was an assignment
+        // error (the leader couldn't make the assignment) or we are behind the config and cannot yet work on our assigned
+        // tasks. It's the responsibility of the code driving this process to decide how to react (e.g. trying to get
+        // up to date, try to rejoin again, leaving the group and backing off, etc.).
+        rejoinRequested = false;
+        listener.onAssigned(assignmentSnapshot);
+    }
+
+    @Override
+    protected Map<String, ByteBuffer> doSync(String leaderId, String protocol, Map<String, ByteBuffer> allMemberMetadata) {
+        log.debug("Performing task assignment");
+
+        Map<String, CopycatProtocol.ConfigState> allConfigs = new HashMap<>();
+        for (Map.Entry<String, ByteBuffer> entry : allMemberMetadata.entrySet())
+            allConfigs.put(entry.getKey(), CopycatProtocol.deserializeMetadata(entry.getValue()));
+
+        long maxOffset = findMaxMemberConfigOffset(allConfigs);
+        Long leaderOffset = ensureLeaderConfig(maxOffset);
+        if (leaderOffset == null)
+            return fillAssignmentsAndSerialize(allConfigs.keySet(), CopycatProtocol.Assignment.CONFIG_MISMATCH,
+                    leaderId, maxOffset, new HashMap<String, List<String>>(), new HashMap<String, List<ConnectorTaskId>>());
+        return performTaskAssignment(leaderId, leaderOffset, allConfigs);
+    }
+
+    private long findMaxMemberConfigOffset(Map<String, CopycatProtocol.ConfigState> allConfigs) {
+        // The new config offset is the maximum seen by any member. We always perform assignment using this offset,
+        // even if some members have fallen behind. The config offset used to generate the assignment is included in
+        // the response so members that have fallen behind will not use the assignment until they have caught up.
+        Long maxOffset = null;
+        for (Map.Entry<String, CopycatProtocol.ConfigState> stateEntry : allConfigs.entrySet()) {
+            long memberRootOffset = stateEntry.getValue().offset();
+            if (maxOffset == null)
+                maxOffset = memberRootOffset;
+            else
+                maxOffset = Math.max(maxOffset, memberRootOffset);
+        }
+
+        log.debug("Max config offset root: {}, local snapshot config offsets root: {}",
+                maxOffset, configSnapshot.offset());
+        return maxOffset;
+    }
+
+    private Long ensureLeaderConfig(long maxOffset) {
+        // If this leader is behind some other members, we can't do assignment
+        if (configSnapshot.offset() < maxOffset) {
+            // We might be able to take a new snapshot to catch up immediately and avoid another round of syncing here.
+            // Alternatively, if this node has already passed the maximum reported by any other member of the group, it
+            // is also safe to use this newer state.
+            ClusterConfigState updatedSnapshot = configStorage.snapshot();
+            if (updatedSnapshot.offset() < maxOffset) {
+                log.info("Was selected to perform assignments, but do not have the latest config found in the sync request. " +
+                        "Returning an empty configuration to trigger re-sync.");
+                return null;
+            } else {
+                configSnapshot = updatedSnapshot;
+                return configSnapshot.offset();
+            }
+        }
+
+        return maxOffset;
+    }
+
+    private Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset, Map<String, CopycatProtocol.ConfigState> allConfigs) {
+        Map<String, List<String>> connectorAssignments = new HashMap<>();
+        Map<String, List<ConnectorTaskId>> taskAssignments = new HashMap<>();
+
+        // Perform round-robin task assignment
+        CircularIterator<String> memberIt = new CircularIterator<>(sorted(allConfigs.keySet()));
+        for (String connectorId : sorted(configSnapshot.connectors())) {
+            String connectorAssignedTo = memberIt.next();
+            log.trace("Assigning connector {} to {}", connectorId, connectorAssignedTo);
+            List<String> memberConnectors = connectorAssignments.get(connectorAssignedTo);
+            if (memberConnectors == null) {
+                memberConnectors = new ArrayList<>();
+                connectorAssignments.put(connectorAssignedTo, memberConnectors);
+            }
+            memberConnectors.add(connectorId);
+
+            for (ConnectorTaskId taskId : sorted(configSnapshot.tasks(connectorId))) {
+                String taskAssignedTo = memberIt.next();
+                log.trace("Assigning task {} to {}", taskId, taskAssignedTo);
+                List<ConnectorTaskId> memberTasks = taskAssignments.get(taskAssignedTo);
+                if (memberTasks == null) {
+                    memberTasks = new ArrayList<>();
+                    taskAssignments.put(taskAssignedTo, memberTasks);
+                }
+                memberTasks.add(taskId);
+            }
+        }
+
+        return fillAssignmentsAndSerialize(allConfigs.keySet(), CopycatProtocol.Assignment.NO_ERROR,
+                leaderId, maxOffset, connectorAssignments, taskAssignments);
+    }
+
+    private Map<String, ByteBuffer> fillAssignmentsAndSerialize(Collection<String> members,
+                                                                short error,
+                                                                String leaderId,
+                                                                long maxOffset,
+                                                                Map<String, List<String>> connectorAssignments,
+                                                                Map<String, List<ConnectorTaskId>> taskAssignments) {
+
+        Map<String, ByteBuffer> groupAssignment = new HashMap<>();
+        for (String member : members) {
+            List<String> connectors = connectorAssignments.get(member);
+            if (connectors == null)
+                connectors = Collections.emptyList();
+            List<ConnectorTaskId> tasks = taskAssignments.get(member);
+            if (tasks == null)
+                tasks = Collections.emptyList();
+            CopycatProtocol.Assignment assignment = new CopycatProtocol.Assignment(error, leaderId, maxOffset, connectors, tasks);
+            log.debug("Assignment: {} -> {}", member, assignment);
+            groupAssignment.put(member, CopycatProtocol.serializeAssignment(assignment));
+        }
+        log.debug("Finished assignment");
+        return groupAssignment;
+    }
+
+    @Override
+    protected void onLeave(int generation, String memberId) {
+        log.debug("Revoking previous assignment {}", assignmentSnapshot);
+        if (assignmentSnapshot != null && !assignmentSnapshot.failed())
+            listener.onRevoked(assignmentSnapshot.leader(), assignmentSnapshot.connectors(), assignmentSnapshot.tasks());
+    }
+
+    @Override
+    public boolean needRejoin() {
+        return super.needRejoin() || (assignmentSnapshot == null || assignmentSnapshot.failed()) || rejoinRequested;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    public String memberId() {
+        return this.memberId;
+    }
+
+    private class CopycatWorkerCoordinatorMetrics {
+        public final Metrics metrics;
+        public final String metricGrpName;
+
+        public CopycatWorkerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
+            this.metrics = metrics;
+            this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
+
+            Measurable numConnectors = new Measurable() {
+                public double measure(MetricConfig config, long now) {
+                    return assignmentSnapshot.connectors().size();
+                }
+            };
+
+            Measurable numTasks = new Measurable() {
+                public double measure(MetricConfig config, long now) {
+                    return assignmentSnapshot.tasks().size();
+                }
+            };
+
+            metrics.addMetric(new MetricName("assigned-connectors",
+                            this.metricGrpName,
+                            "The number of connector instances currently assigned to this worker",
+                            tags),
+                    numConnectors);
+            metrics.addMetric(new MetricName("assigned-tasks",
+                            this.metricGrpName,
+                            "The number of tasks currently assigned to this worker",
+                            tags),
+                    numTasks);
+        }
+    }
+
+    private static <T extends Comparable<T>> List<T> sorted(Collection<T> members) {
+        List<T> res = new ArrayList<>(members);
+        Collections.sort(res);
+        return res;
+    }
+
+}

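The assignment performed by the leader in performTaskAssignment() is a simple round robin over
the sorted member IDs: each connector is handed to the next member, then each of that connector's
tasks continues around the same circle. A standalone sketch of the same idea, using a plain index
in place of CircularIterator so it runs without any Kafka internals (member and connector names
are made up):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.TreeSet;

    public class RoundRobinAssignmentSketch {
        public static void main(String[] args) {
            List<String> members = Arrays.asList("worker-1", "worker-2");  // member IDs, already sorted
            Map<String, Integer> connectorTaskCounts = new HashMap<>();    // connector name -> task count
            connectorTaskCounts.put("source-a", 2);
            connectorTaskCounts.put("sink-b", 3);

            Map<String, List<String>> assignments = new TreeMap<>();
            for (String member : members)
                assignments.put(member, new ArrayList<String>());

            int next = 0;
            for (String connector : new TreeSet<>(connectorTaskCounts.keySet())) {
                // The connector instance itself is assigned first...
                assignments.get(members.get(next++ % members.size())).add(connector);
                // ...then each of its tasks, continuing around the same circular order of members.
                for (int task = 0; task < connectorTaskCounts.get(connector); task++)
                    assignments.get(members.get(next++ % members.size())).add(connector + "-task" + task);
            }

            System.out.println(assignments);
            // Prints: {worker-1=[sink-b, sink-b-task1, source-a, source-a-task1],
            //          worker-2=[sink-b-task0, sink-b-task2, source-a-task0]}
        }
    }
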
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
new file mode 100644
index 0000000..f8cabaa
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.copycat.storage.KafkaConfigStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class manages the coordination process with brokers for the copycat cluster group membership. It ties together
+ * the Coordinator, which implements the group member protocol, with all the other pieces needed to drive the connection
+ * to the group coordinator broker. This isolates all the networking to a single thread managed by this class, with
+ * higher level operations in response to group membership events being handled by the herder.
+ */
+public class WorkerGroupMember {
+    private static final Logger log = LoggerFactory.getLogger(WorkerGroupMember.class);
+
+    private static final AtomicInteger COPYCAT_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
+    private static final String JMX_PREFIX = "kafka.copycat";
+
+    private final Time time;
+    private final String clientId;
+    private final ConsumerNetworkClient client;
+    private final Metrics metrics;
+    private final Metadata metadata;
+    private final long retryBackoffMs;
+    private final WorkerCoordinator coordinator;
+
+    private boolean stopped = false;
+
+    public WorkerGroupMember(DistributedHerderConfig config, KafkaConfigStorage configStorage, WorkerRebalanceListener listener) {
+        try {
+            this.time = new SystemTime();
+
+            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
+                    .timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
+            String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
+            clientId = clientIdConfig.length() <= 0 ? "copycat-" + COPYCAT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
+            List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
+            reporters.add(new JmxReporter(JMX_PREFIX));
+            this.metrics = new Metrics(metricConfig, reporters, time);
+            this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
+            this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG));
+            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
+            this.metadata.update(Cluster.bootstrap(addresses), 0);
+            String metricGrpPrefix = "copycat";
+            Map<String, String> metricsTags = new LinkedHashMap<>();
+            metricsTags.put("client-id", clientId);
+            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
+            NetworkClient netClient = new NetworkClient(
+                    new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags, channelBuilder),
+                    this.metadata,
+                    clientId,
+                    100, // a fixed large enough value will suffice
+                    config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG),
+                    config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG),
+                    config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
+                    config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG), time);
+            this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
+            this.coordinator = new WorkerCoordinator(this.client,
+                    config.getString(DistributedHerderConfig.GROUP_ID_CONFIG),
+                    config.getInt(DistributedHerderConfig.SESSION_TIMEOUT_MS_CONFIG),
+                    config.getInt(DistributedHerderConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
+                    metrics,
+                    metricGrpPrefix,
+                    metricsTags,
+                    this.time,
+                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
+                    retryBackoffMs,
+                    configStorage,
+                    listener);
+
+            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
+            log.debug("Copycat group member created");
+        } catch (Throwable t) {
+            // call close methods if internal objects are already constructed
+            // this is to prevent resource leak. see KAFKA-2121
+            stop(true);
+            // now propagate the exception
+            throw new KafkaException("Failed to construct Copycat worker group member", t);
+        }
+    }
+
+    public void stop() {
+        if (stopped) return;
+        stop(false);
+    }
+
+    public void ensureActive() {
+        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureActiveGroup();
+    }
+
+    public void poll(long timeout) {
+        if (timeout < 0)
+            throw new IllegalArgumentException("Timeout must not be negative");
+
+        // poll for new data until the timeout expires
+        long remaining = timeout;
+        while (remaining >= 0) {
+            long start = time.milliseconds();
+            coordinator.ensureCoordinatorKnown();
+            coordinator.ensureActiveGroup();
+            client.poll(remaining);
+            remaining -= time.milliseconds() - start;
+        }
+    }
+
+    /**
+     * Interrupt any running poll() calls, causing a ConsumerWakeupException to be thrown in the thread invoking that method.
+     */
+    public void wakeup() {
+        this.client.wakeup();
+    }
+
+    /**
+     * Get the member ID of this worker in the group of workers.
+     *
+     * This ID is the unique member ID automatically generated.
+     *
+     * @return the member ID
+     */
+    public String memberId() {
+        return coordinator.memberId();
+    }
+
+    public void requestRejoin() {
+        coordinator.requestRejoin();
+    }
+
+    private void stop(boolean swallowException) {
+        log.trace("Stopping the Copycat group member.");
+        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
+        this.stopped = true;
+        ClientUtils.closeQuietly(coordinator, "coordinator", firstException);
+        ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
+        ClientUtils.closeQuietly(client, "consumer network client", firstException);
+        AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
+        if (firstException.get() != null && !swallowException)
+            throw new KafkaException("Failed to stop the Copycat group member", firstException.get());
+        else
+            log.debug("The Copycat group member has stopped.");
+    }
+}

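A rough sketch of the life cycle a herder drives through this class: construct it, then alternate
ensureActive() and poll() until shutdown, and interrupt a blocking poll() from another thread with
wakeup(). The config, configStorage, listener, and shuttingDown flag are assumed to exist and are
not defined by this patch:

    final WorkerGroupMember member = new WorkerGroupMember(config, configStorage, listener);
    try {
        while (!shuttingDown) {
            member.ensureActive();  // find the coordinator and join (or rejoin) the group as needed
            member.poll(1000L);     // drive heartbeats and other coordinator I/O for up to one second
        }
    } finally {
        member.stop();              // closes the coordinator, network client, and metrics
    }

    // From another thread, e.g. on shutdown or when new work arrives for the herder:
    member.wakeup();
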
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerRebalanceListener.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerRebalanceListener.java
new file mode 100644
index 0000000..c9d2ed2
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerRebalanceListener.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+
+import java.util.Collection;
+
+/**
+ * Listener for rebalance events in the worker group.
+ */
+public interface WorkerRebalanceListener {
+    /**
+     * Invoked when a new assignment is created by joining the Copycat worker group. This is invoked for both successful
+     * and unsuccessful assignments.
+     */
+    void onAssigned(CopycatProtocol.Assignment assignment);
+
+    /**
+     * Invoked when a rebalance operation starts, revoking ownership for the set of connectors and tasks.
+     */
+    void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks);
+}

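A minimal sketch of an implementation that only records the current assignment, e.g. for a test
(the DistributedHerder listener earlier in this commit additionally stops revoked connectors and
tasks); illustrative only and not part of the patch:

    package org.apache.kafka.copycat.runtime.distributed;

    import org.apache.kafka.copycat.util.ConnectorTaskId;

    import java.util.Collection;

    public class RecordingRebalanceListener implements WorkerRebalanceListener {
        private volatile CopycatProtocol.Assignment currentAssignment;

        @Override
        public void onAssigned(CopycatProtocol.Assignment assignment) {
            // Invoked for failed assignments too, so callers should check assignment.failed() before acting.
            currentAssignment = assignment;
            System.out.println("Assigned: " + assignment);
        }

        @Override
        public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
            System.out.println("Revoked " + connectors.size() + " connectors and " + tasks.size()
                    + " tasks; leader was " + leader);
            currentAssignment = null;
        }

        public CopycatProtocol.Assignment assignment() {
            return currentAssignment;
        }
    }
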
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
index d5670fd..167ee60 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
@@ -17,20 +17,23 @@
 
 package org.apache.kafka.copycat.runtime.standalone;
 
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.copycat.connector.Connector;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.runtime.ConnectorConfig;
 import org.apache.kafka.copycat.runtime.Herder;
 import org.apache.kafka.copycat.runtime.HerderConnectorContext;
+import org.apache.kafka.copycat.runtime.TaskConfig;
 import org.apache.kafka.copycat.runtime.Worker;
-import org.apache.kafka.copycat.sink.SinkConnector;
-import org.apache.kafka.copycat.sink.SinkTask;
-import org.apache.kafka.copycat.util.*;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
 
 /**
  * Single process, in-memory "herder". Useful for a standalone copycat process.
@@ -56,11 +59,8 @@ public class StandaloneHerder implements Herder {
         // There's no coordination/hand-off to do here since this is all standalone. Instead, we
         // should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all
         // the tasks.
-        for (Map.Entry<String, ConnectorState> entry : connectors.entrySet()) {
-            ConnectorState state = entry.getValue();
-            stopConnector(state);
-        }
-        connectors.clear();
+        for (String connName : new HashSet<>(connectors.keySet()))
+            stopConnector(connName);
 
         log.info("Herder stopped");
     }
@@ -69,11 +69,14 @@ public class StandaloneHerder implements Herder {
     public synchronized void addConnector(Map<String, String> connectorProps,
                                           Callback<String> callback) {
         try {
-            ConnectorState connState = createConnector(connectorProps);
+            ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
+            String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+            worker.addConnector(connConfig, new HerderConnectorContext(this, connName));
+            connectors.put(connName, new ConnectorState(connConfig));
             if (callback != null)
-                callback.onCompletion(null, connState.name);
+                callback.onCompletion(null, connName);
             // This should always be a new job, create jobs from scratch
-            createConnectorTasks(connState);
+            createConnectorTasks(connName);
         } catch (CopycatException e) {
             if (callback != null)
                 callback.onCompletion(e, null);
@@ -81,9 +84,9 @@ public class StandaloneHerder implements Herder {
     }
 
     @Override
-    public synchronized void deleteConnector(String name, Callback<Void> callback) {
+    public synchronized void deleteConnector(String connName, Callback<Void> callback) {
         try {
-            destroyConnector(name);
+            stopConnector(connName);
             if (callback != null)
                 callback.onCompletion(null, null);
         } catch (CopycatException e) {
@@ -94,114 +97,35 @@ public class StandaloneHerder implements Herder {
 
     @Override
     public synchronized void requestTaskReconfiguration(String connName) {
-        ConnectorState state = connectors.get(connName);
-        if (state == null) {
+        if (!worker.connectorNames().contains(connName)) {
             log.error("Task that requested reconfiguration does not exist: {}", connName);
             return;
         }
-        updateConnectorTasks(state);
-    }
-
-    // Creates and configures the connector. Does not setup any tasks
-    private ConnectorState createConnector(Map<String, String> connectorProps) {
-        ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
-        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
-        String className = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
-        log.info("Creating connector {} of type {}", connName, className);
-        int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);
-        List<String> topics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG); // Sinks only
-        Properties configs = connConfig.unusedProperties();
-
-        if (connectors.containsKey(connName)) {
-            log.error("Ignoring request to create connector due to conflicting connector name");
-            throw new CopycatException("Connector with name " + connName + " already exists");
-        }
-
-        final Connector connector;
-        try {
-            connector = instantiateConnector(className);
-        } catch (Throwable t) {
-            // Catches normal exceptions due to instantiation errors as well as any runtime errors that
-            // may be caused by user code
-            throw new CopycatException("Failed to create connector instance", t);
-        }
-        connector.initialize(new HerderConnectorContext(this, connName));
-        try {
-            connector.start(configs);
-        } catch (CopycatException e) {
-            throw new CopycatException("Connector threw an exception while starting", e);
-        }
-        ConnectorState state = new ConnectorState(connName, connector, maxTasks, topics);
-        connectors.put(connName, state);
-
-        log.info("Finished creating connector {}", connName);
-
-        return state;
-    }
-
-    private static Connector instantiateConnector(String className) {
-        try {
-            return Utils.newInstance(className, Connector.class);
-        } catch (ClassNotFoundException e) {
-            throw new CopycatException("Couldn't instantiate connector class", e);
-        }
-    }
-
-    private void destroyConnector(String connName) {
-        log.info("Destroying connector {}", connName);
-        ConnectorState state = connectors.get(connName);
-        if (state == null) {
-            log.error("Failed to destroy connector {} because it does not exist", connName);
-            throw new CopycatException("Connector does not exist");
-        }
-
-        stopConnector(state);
-        connectors.remove(state.name);
-
-        log.info("Finished destroying connector {}", connName);
+        updateConnectorTasks(connName);
     }
 
     // Stops a connector's tasks, then the connector
-    private void stopConnector(ConnectorState state) {
-        removeConnectorTasks(state);
+    private void stopConnector(String connName) {
+        removeConnectorTasks(connName);
         try {
-            state.connector.stop();
+            worker.stopConnector(connName);
+            connectors.remove(connName);
         } catch (CopycatException e) {
-            log.error("Error shutting down connector {}: ", state.connector, e);
+            log.error("Error shutting down connector {}: ", connName, e);
         }
     }
 
-    private void createConnectorTasks(ConnectorState state) {
-        String taskClassName = state.connector.taskClass().getName();
-
-        log.info("Creating tasks for connector {} of type {}", state.name, taskClassName);
-
-        List<Properties> taskConfigs = state.connector.taskConfigs(state.maxTasks);
-
-        // Generate the final configs, including framework provided settings
-        Map<ConnectorTaskId, Properties> taskProps = new HashMap<>();
-        for (int i = 0; i < taskConfigs.size(); i++) {
-            ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
-            Properties config = taskConfigs.get(i);
-            // TODO: This probably shouldn't be in the Herder. It's nice to have Copycat ensure the list of topics
-            // is automatically provided to tasks since it is required by the framework, but this
-            String subscriptionTopics = Utils.join(state.inputTopics, ",");
-            if (state.connector instanceof SinkConnector) {
-                // Make sure we don't modify the original since the connector may reuse it internally
-                Properties configForSink = new Properties();
-                configForSink.putAll(config);
-                configForSink.setProperty(SinkTask.TOPICS_CONFIG, subscriptionTopics);
-                config = configForSink;
-            }
-            taskProps.put(taskId, config);
-        }
+    private void createConnectorTasks(String connName) {
+        ConnectorState state = connectors.get(connName);
+        Map<ConnectorTaskId, Map<String, String>> taskConfigs = worker.reconfigureConnectorTasks(connName,
+                state.config.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
+                state.config.getList(ConnectorConfig.TOPICS_CONFIG));
 
-        // And initiate the tasks
-        for (int i = 0; i < taskConfigs.size(); i++) {
-            ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
-            Properties config = taskProps.get(taskId);
+        for (Map.Entry<ConnectorTaskId, Map<String, String>> taskEntry : taskConfigs.entrySet()) {
+            ConnectorTaskId taskId = taskEntry.getKey();
+            TaskConfig config = new TaskConfig(taskEntry.getValue());
             try {
-                worker.addTask(taskId, taskClassName, config);
+                worker.addTask(taskId, config);
                 // We only need to store the task IDs so we can clean up.
                 state.tasks.add(taskId);
             } catch (Throwable e) {
@@ -213,7 +137,8 @@ public class StandaloneHerder implements Herder {
         }
     }
 
-    private void removeConnectorTasks(ConnectorState state) {
+    private void removeConnectorTasks(String connName) {
+        ConnectorState state = connectors.get(connName);
         Iterator<ConnectorTaskId> taskIter = state.tasks.iterator();
         while (taskIter.hasNext()) {
             ConnectorTaskId taskId = taskIter.next();
@@ -228,25 +153,18 @@ public class StandaloneHerder implements Herder {
         }
     }
 
-    private void updateConnectorTasks(ConnectorState state) {
-        removeConnectorTasks(state);
-        createConnectorTasks(state);
+    private void updateConnectorTasks(String connName) {
+        removeConnectorTasks(connName);
+        createConnectorTasks(connName);
     }
 
 
     private static class ConnectorState {
-        public String name;
-        public Connector connector;
-        public int maxTasks;
-        public List<String> inputTopics;
+        public ConnectorConfig config;
         Set<ConnectorTaskId> tasks;
 
-        public ConnectorState(String name, Connector connector, int maxTasks,
-                              List<String> inputTopics) {
-            this.name = name;
-            this.connector = connector;
-            this.maxTasks = maxTasks;
-            this.inputTopics = inputTopics;
+        public ConnectorState(ConnectorConfig config) {
+            this.config = config;
             this.tasks = new HashSet<>();
         }
     }

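With this refactor the standalone herder only tracks the ConnectorConfig and task IDs and delegates
all lifecycle work to the Worker. A hedged sketch of how a caller drives it, assuming herder is a
started StandaloneHerder backed by a running Worker (as the standalone Copycat entry point sets up);
the connector class and its extra properties are illustrative:

    Map<String, String> connectorProps = new HashMap<>();
    connectorProps.put("name", "local-file-sink");         // ConnectorConfig.NAME_CONFIG
    connectorProps.put("connector.class", "org.apache.kafka.copycat.file.FileStreamSinkConnector");
    connectorProps.put("tasks.max", "1");                  // ConnectorConfig.TASKS_MAX_CONFIG
    connectorProps.put("topics", "copycat-test");          // ConnectorConfig.TOPICS_CONFIG (sinks only)

    herder.addConnector(connectorProps, new Callback<String>() {
        @Override
        public void onCompletion(Throwable error, String connectorName) {
            if (error != null)
                error.printStackTrace();
            else
                System.out.println("Created connector " + connectorName);
        }
    });

    // Removing the connector later stops its tasks first and then the connector itself:
    herder.deleteConnector("local-file-sink", null);
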
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java
index 366bf13..fb4f70d 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java
@@ -44,6 +44,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -226,6 +227,7 @@ public class KafkaConfigStorage {
         consumerProps.putAll(configs);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 
         configLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback);
     }
@@ -271,9 +273,14 @@ public class KafkaConfigStorage {
      * @param properties the configuration to write
      */
     public void putConnectorConfig(String connector, Map<String, String> properties) {
-        Struct copycatConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
-        copycatConfig.put("properties", properties);
-        byte[] serializedConfig = converter.fromCopycatData(topic, CONNECTOR_CONFIGURATION_V0, copycatConfig);
+        byte[] serializedConfig;
+        if (properties == null) {
+            serializedConfig = null;
+        } else {
+            Struct copycatConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
+            copycatConfig.put("properties", properties);
+            serializedConfig = converter.fromCopycatData(topic, CONNECTOR_CONFIGURATION_V0, copycatConfig);
+        }
 
         try {
             configLog.send(CONNECTOR_KEY(connector), serializedConfig);
@@ -349,6 +356,14 @@ public class KafkaConfigStorage {
         }
     }
 
+    public Future<Void> readToEnd() {
+        return configLog.readToEnd();
+    }
+
+    public void readToEnd(Callback<Void> cb) {
+        configLog.readToEnd(cb);
+    }
+
     private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
                                                               Map<String, Object> consumerProps, Callback<ConsumerRecord<String, byte[]>> consumedCallback) {
         return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
@@ -369,22 +384,29 @@ public class KafkaConfigStorage {
                 log.error("Failed to convert config data to Copycat format: ", e);
                 return;
             }
-            offset = record.offset();
+            // Make the recorded offset match the API used for positions in the consumer -- return the offset of the
+            // *next record*, not the last one consumed.
+            offset = record.offset() + 1;
 
             if (record.key().startsWith(CONNECTOR_PREFIX)) {
                 String connectorName = record.key().substring(CONNECTOR_PREFIX.length());
                 synchronized (lock) {
-                    // Connector configs can be applied and callbacks invoked immediately
-                    if (!(value.value() instanceof Map)) {
-                        log.error("Found connector configuration (" + record.key() + ") in wrong format: " + value.value().getClass());
-                        return;
-                    }
-                    Object newConnectorConfig = ((Map<String, Object>) value.value()).get("properties");
-                    if (!(newConnectorConfig instanceof Map)) {
-                        log.error("Invalid data for connector config: properties filed should be a Map but is " + newConnectorConfig.getClass());
-                        return;
+                    if (value.value() == null) {
+                        // Connector deletion will be written as a null value
+                        connectorConfigs.remove(connectorName);
+                    } else {
+                        // Connector configs can be applied and callbacks invoked immediately
+                        if (!(value.value() instanceof Map)) {
+                            log.error("Found connector configuration (" + record.key() + ") in wrong format: " + value.value().getClass());
+                            return;
+                        }
+                        Object newConnectorConfig = ((Map<String, Object>) value.value()).get("properties");
+                        if (!(newConnectorConfig instanceof Map)) {
+                            log.error("Invalid data for connector config: properties field should be a Map but is " + newConnectorConfig.getClass());
+                            return;
+                        }
+                        connectorConfigs.put(connectorName, (Map<String, String>) newConnectorConfig);
                     }
-                    connectorConfigs.put(connectorName, (Map<String, String>) newConnectorConfig);
                 }
                 if (!starting)
                     connectorConfigCallback.onCompletion(null, connectorName);
@@ -445,8 +467,7 @@ public class KafkaConfigStorage {
 
                     Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);
 
-                    Object newTaskCountObj = ((Map<String, Object>) value.value()).get("tasks");
-                    Integer newTaskCount = (Integer) newTaskCountObj;
+                    int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));
 
                     // Validate the configs we're supposed to update to ensure we're getting a complete configuration
                     // update of all tasks that are expected based on the number of tasks in the commit message.
@@ -542,5 +563,16 @@ public class KafkaConfigStorage {
                 return false;
         return true;
     }
+
+    // Convert an integer value extracted from a schemaless struct to an int. This handles potentially different
+    // encodings by different Converters.
+    private static int intValue(Object value) {
+        if (value instanceof Integer)
+            return (int) value;
+        else if (value instanceof Long)
+            return (int) (long) value;
+        else
+            throw new CopycatException("Expected integer value to be either Integer or Long");
+    }
 }
 

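Note on the null-handling added to putConnectorConfig() above: writing a null value for the connector key follows Kafka's usual log-compaction tombstone convention, and the read side now interprets a null value as "connector deleted". A minimal standalone sketch of that convention using only the plain producer API; the topic name, key format, and broker address below are illustrative assumptions, not the exact values KafkaConfigStorage uses internally.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TombstoneSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            // Same durability setting the config storage producer uses.
            props.put(ProducerConfig.ACKS_CONFIG, "all");

            Producer<String, byte[]> producer = new KafkaProducer<>(props);
            try {
                // A null value is a tombstone: consumers see the deletion, and log
                // compaction eventually drops the key entirely. Topic and key here
                // are hypothetical placeholders.
                producer.send(new ProducerRecord<String, byte[]>(
                        "copycat-config-topic", "connector-my-connector", null)).get();
            } finally {
                producer.close();
            }
        }
    }
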
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
index b5af1fe..b270368 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
@@ -68,11 +68,13 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
         producerProps.putAll(configs);
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
 
         Map<String, Object> consumerProps = new HashMap<>();
         consumerProps.putAll(configs);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 
         offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback);
     }

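The two settings added here pair a fully-acknowledged producer (acks=all) with a consumer that never commits offsets, presumably because the backing log is re-read in full on startup rather than resumed from committed positions. A rough sketch of reading a log with those semantics; the topic name, partition, and broker address are assumptions for illustration, not what KafkaBasedLog does internally.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class OffsetLogReadSketch {
        public static void main(String[] args) {
            Map<String, Object> consumerProps = new HashMap<>();
            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            // Never commit: committed offsets are not used to resume this log.
            consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            // With manual assignment and no committed offsets, start from the beginning.
            consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
            try {
                consumer.assign(Arrays.asList(new TopicPartition("copycat-offsets", 0)));
                ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<byte[], byte[]> record : records)
                    System.out.println("offset " + record.offset() + ", value bytes: "
                            + (record.value() == null ? 0 : record.value().length));
            } finally {
                consumer.close();
            }
        }
    }
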
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
index 683c634..e3e498c 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
  * Unique ID for a single task. It includes a unique connector ID and a task ID that is unique within
  * the connector.
  */
-public class ConnectorTaskId implements Serializable {
+public class ConnectorTaskId implements Serializable, Comparable<ConnectorTaskId> {
     private final String connector;
     private final int task;
 
@@ -68,4 +68,12 @@ public class ConnectorTaskId implements Serializable {
     public String toString() {
         return connector + '-' + task;
     }
+
+    @Override
+    public int compareTo(ConnectorTaskId o) {
+        int connectorCmp = connector.compareTo(o.connector);
+        if (connectorCmp != 0)
+            return connectorCmp;
+        return ((Integer) task).compareTo(o.task);
+    }
 }

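Implementing Comparable gives ConnectorTaskId a natural ordering (connector name first, then task number), so collections of task IDs can be sorted deterministically, e.g. before assigning work across members. A small illustrative sketch; the connector names are arbitrary.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    import org.apache.kafka.copycat.util.ConnectorTaskId;

    public class TaskIdOrderingSketch {
        public static void main(String[] args) {
            List<ConnectorTaskId> ids = new ArrayList<>(Arrays.asList(
                    new ConnectorTaskId("sink-test", 1),
                    new ConnectorTaskId("source-test", 0),
                    new ConnectorTaskId("sink-test", 0)));
            Collections.sort(ids);
            // Ordered by connector name, then task number; toString() is connector + '-' + task:
            // [sink-test-0, sink-test-1, source-test-0]
            System.out.println(ids);
        }
    }
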
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
index 4a30992..19e1462 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
@@ -19,11 +19,19 @@ package org.apache.kafka.copycat.runtime;
 
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.connector.Connector;
+import org.apache.kafka.copycat.connector.ConnectorContext;
+import org.apache.kafka.copycat.connector.Task;
 import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.sink.SinkTask;
 import org.apache.kafka.copycat.source.SourceRecord;
 import org.apache.kafka.copycat.source.SourceTask;
-import org.apache.kafka.copycat.storage.*;
+import org.apache.kafka.copycat.storage.Converter;
+import org.apache.kafka.copycat.storage.OffsetBackingStore;
+import org.apache.kafka.copycat.storage.OffsetStorageReader;
+import org.apache.kafka.copycat.storage.OffsetStorageWriter;
 import org.apache.kafka.copycat.util.ConnectorTaskId;
 import org.apache.kafka.copycat.util.MockTime;
 import org.apache.kafka.copycat.util.ThreadedTest;
@@ -36,16 +44,24 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(Worker.class)
 @PowerMockIgnore("javax.management.*")
 public class WorkerTest extends ThreadedTest {
 
-    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private static final String CONNECTOR_ID = "test-connector";
+    private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);
+
     private WorkerConfig config;
     private Worker worker;
     private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);
@@ -65,6 +81,146 @@ public class WorkerTest extends ThreadedTest {
     }
 
     @Test
+    public void testAddRemoveConnector() throws Exception {
+        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
+        EasyMock.expectLastCall();
+        offsetBackingStore.start();
+        EasyMock.expectLastCall();
+
+        // Create
+        Connector connector = PowerMock.createMock(Connector.class);
+        ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
+
+        PowerMock.mockStatic(Worker.class);
+        PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector);
+
+        Properties props = new Properties();
+        props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
+        props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName());
+
+        connector.initialize(ctx);
+        EasyMock.expectLastCall();
+        connector.start(props);
+        EasyMock.expectLastCall();
+
+        // Remove
+        connector.stop();
+        EasyMock.expectLastCall();
+
+        offsetBackingStore.stop();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker.start();
+
+        ConnectorConfig config = new ConnectorConfig(Utils.propsToStringMap(props));
+        assertEquals(Collections.emptySet(), worker.connectorNames());
+        worker.addConnector(config, ctx);
+        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
+        try {
+            worker.addConnector(config, ctx);
+            fail("Should have thrown exception when trying to add connector with same name.");
+        } catch (CopycatException e) {
+            // expected
+        }
+        worker.stopConnector(CONNECTOR_ID);
+        assertEquals(Collections.emptySet(), worker.connectorNames());
+        // Nothing should be left, so this should effectively be a nop
+        worker.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = CopycatException.class)
+    public void testStopInvalidConnector() {
+        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
+        EasyMock.expectLastCall();
+        offsetBackingStore.start();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker.start();
+
+        worker.stopConnector(CONNECTOR_ID);
+    }
+
+    @Test
+    public void testReconfigureConnectorTasks() throws Exception {
+        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
+        EasyMock.expectLastCall();
+        offsetBackingStore.start();
+        EasyMock.expectLastCall();
+
+        // Create
+        Connector connector = PowerMock.createMock(Connector.class);
+        ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
+
+        PowerMock.mockStatic(Worker.class);
+        PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector);
+
+        Properties props = new Properties();
+        props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
+        props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName());
+
+        connector.initialize(ctx);
+        EasyMock.expectLastCall();
+        connector.start(props);
+        EasyMock.expectLastCall();
+
+        // Reconfigure
+        EasyMock.<Class<? extends Task>>expect(connector.taskClass()).andReturn(TestSourceTask.class);
+        Properties taskProps = new Properties();
+        taskProps.setProperty("foo", "bar");
+        EasyMock.expect(connector.taskConfigs(2)).andReturn(Arrays.asList(taskProps, taskProps));
+
+        // Remove
+        connector.stop();
+        EasyMock.expectLastCall();
+
+        offsetBackingStore.stop();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker.start();
+
+        ConnectorConfig config = new ConnectorConfig(Utils.propsToStringMap(props));
+        assertEquals(Collections.emptySet(), worker.connectorNames());
+        worker.addConnector(config, ctx);
+        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
+        try {
+            worker.addConnector(config, ctx);
+            fail("Should have thrown exception when trying to add connector with same name.");
+        } catch (CopycatException e) {
+            // expected
+        }
+        Map<ConnectorTaskId, Map<String, String>> taskConfigs = worker.reconfigureConnectorTasks(CONNECTOR_ID, 2, Arrays.asList("foo", "bar"));
+        Properties expectedTaskProps = new Properties();
+        expectedTaskProps.setProperty("foo", "bar");
+        expectedTaskProps.setProperty(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
+        expectedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, "foo,bar");
+        assertEquals(2, taskConfigs.size());
+        assertEquals(expectedTaskProps, taskConfigs.get(new ConnectorTaskId(CONNECTOR_ID, 0)));
+        assertEquals(expectedTaskProps, taskConfigs.get(new ConnectorTaskId(CONNECTOR_ID, 1)));
+        worker.stopConnector(CONNECTOR_ID);
+        assertEquals(Collections.emptySet(), worker.connectorNames());
+        // Nothing should be left, so this should effectively be a nop
+        worker.stop();
+
+        PowerMock.verifyAll();
+    }
+
+
+    @Test
     public void testAddRemoveTask() throws Exception {
         offsetBackingStore.configure(EasyMock.anyObject(Map.class));
         EasyMock.expectLastCall();
@@ -78,7 +234,7 @@ public class WorkerTest extends ThreadedTest {
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
 
         PowerMock.mockStatic(Worker.class);
-        PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName()).andReturn(task);
+        PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
 
         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
@@ -91,6 +247,7 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.anyObject(Time.class))
                 .andReturn(workerTask);
         Properties origProps = new Properties();
+        origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
         workerTask.start(origProps);
         EasyMock.expectLastCall();
 
@@ -108,8 +265,11 @@ public class WorkerTest extends ThreadedTest {
 
         worker = new Worker(new MockTime(), config, offsetBackingStore);
         worker.start();
-        worker.addTask(taskId, TestSourceTask.class.getName(), origProps);
+        assertEquals(Collections.emptySet(), worker.taskIds());
+        worker.addTask(taskId, new TaskConfig(Utils.propsToStringMap(origProps)));
+        assertEquals(new HashSet<>(Arrays.asList(taskId)), worker.taskIds());
         worker.stopTask(taskId);
+        assertEquals(Collections.emptySet(), worker.taskIds());
         // Nothing should be left, so this should effectively be a nop
         worker.stop();
 
@@ -128,7 +288,7 @@ public class WorkerTest extends ThreadedTest {
         worker = new Worker(new MockTime(), config, offsetBackingStore);
         worker.start();
 
-        worker.stopTask(taskId);
+        worker.stopTask(TASK_ID);
     }
 
     @Test
@@ -143,10 +303,10 @@ public class WorkerTest extends ThreadedTest {
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
 
         PowerMock.mockStatic(Worker.class);
-        PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName()).andReturn(task);
+        PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
 
         PowerMock.expectNew(
-                WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
+                WorkerSourceTask.class, EasyMock.eq(TASK_ID), EasyMock.eq(task),
                 EasyMock.anyObject(Converter.class),
                 EasyMock.anyObject(Converter.class),
                 EasyMock.anyObject(KafkaProducer.class),
@@ -156,6 +316,7 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.anyObject(Time.class))
                 .andReturn(workerTask);
         Properties origProps = new Properties();
+        origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
         workerTask.start(origProps);
         EasyMock.expectLastCall();
 
@@ -174,13 +335,35 @@ public class WorkerTest extends ThreadedTest {
 
         worker = new Worker(new MockTime(), config, offsetBackingStore);
         worker.start();
-        worker.addTask(taskId, TestSourceTask.class.getName(), origProps);
+        worker.addTask(TASK_ID, new TaskConfig(Utils.propsToStringMap(origProps)));
         worker.stop();
 
         PowerMock.verifyAll();
     }
 
 
+    private static class TestConnector extends Connector {
+        @Override
+        public void start(Properties props) {
+
+        }
+
+        @Override
+        public Class<? extends Task> taskClass() {
+            return null;
+        }
+
+        @Override
+        public List<Properties> taskConfigs(int maxTasks) {
+            return null;
+        }
+
+        @Override
+        public void stop() {
+
+        }
+    }
+
     private static class TestSourceTask extends SourceTask {
         public TestSourceTask() {
         }