You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/10/30 23:00:13 UTC

[2/4] kafka git commit: KAFKA-2369: Add REST API for Copycat.

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResource.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResource.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResource.java
new file mode 100644
index 0000000..823155e
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResource.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.rest.resources;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.kafka.copycat.runtime.ConnectorConfig;
+import org.apache.kafka.copycat.runtime.Herder;
+import org.apache.kafka.copycat.runtime.distributed.NotLeaderException;
+import org.apache.kafka.copycat.runtime.rest.RestServer;
+import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.copycat.runtime.rest.entities.CreateConnectorRequest;
+import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.copycat.runtime.rest.errors.CopycatRestException;
+import org.apache.kafka.copycat.util.FutureCallback;
+
+import javax.servlet.ServletContext;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+@Path("/connectors")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+public class ConnectorsResource {
+    // TODO: This should not be so long. However, due to potentially long rebalances that may have to wait a full
+    // session timeout to complete, during which we cannot serve some requests. Ideally we could reduce this, but
+    // we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases,
+    // but currently a worker simply leaving the group can take this long as well.
+    private static final long REQUEST_TIMEOUT_MS = 90 * 1000;
+
+    private final Herder herder;
+    @javax.ws.rs.core.Context
+    private ServletContext context;
+
+    public ConnectorsResource(Herder herder) {
+        this.herder = herder;
+    }
+
+    @GET
+    @Path("/")
+    public Collection<String> listConnectors() throws Throwable {
+        FutureCallback<Collection<String>> cb = new FutureCallback<>();
+        herder.connectors(cb);
+        return completeOrForwardRequest(cb, "/connectors", "GET", null, new TypeReference<Collection<String>>() {
+        });
+    }
+
+    @POST
+    @Path("/")
+    public Response createConnector(final CreateConnectorRequest createRequest) throws Throwable {
+        String name = createRequest.name();
+        Map<String, String> configs = createRequest.config();
+        if (!configs.containsKey(ConnectorConfig.NAME_CONFIG))
+            configs.put(ConnectorConfig.NAME_CONFIG, name);
+
+        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
+        herder.putConnectorConfig(name, configs, false, cb);
+        Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors", "POST", createRequest,
+                new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator());
+        return Response.created(URI.create("/connectors/" + name)).entity(info.result()).build();
+    }
+
+    @GET
+    @Path("/{connector}")
+    public ConnectorInfo getConnector(final @PathParam("connector") String connector) throws Throwable {
+        FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
+        herder.connectorInfo(connector, cb);
+        return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null, new TypeReference<ConnectorInfo>() {
+        });
+    }
+
+    @GET
+    @Path("/{connector}/config")
+    public Map<String, String> getConnectorConfig(final @PathParam("connector") String connector) throws Throwable {
+        FutureCallback<Map<String, String>> cb = new FutureCallback<>();
+        herder.connectorConfig(connector, cb);
+        return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null, new TypeReference<Map<String, String>>() {
+        });
+    }
+
+    @PUT
+    @Path("/{connector}/config")
+    public Response putConnectorConfig(final @PathParam("connector") String connector,
+                                   final Map<String, String> connectorConfig) throws Throwable {
+        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
+        herder.putConnectorConfig(connector, connectorConfig, true, cb);
+        Herder.Created<ConnectorInfo> createdInfo = completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
+                "PUT", connectorConfig, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator());
+        Response.ResponseBuilder response;
+        if (createdInfo.created())
+            response = Response.created(URI.create("/connectors/" + connector));
+        else
+            response = Response.ok();
+        return response.entity(createdInfo.result()).build();
+    }
+
+    @GET
+    @Path("/{connector}/tasks")
+    public List<TaskInfo> getTaskConfigs(final @PathParam("connector") String connector) throws Throwable {
+        FutureCallback<List<TaskInfo>> cb = new FutureCallback<>();
+        herder.taskConfigs(connector, cb);
+        return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", null, new TypeReference<List<TaskInfo>>() {
+        });
+    }
+
+    @POST
+    @Path("/{connector}/tasks")
+    public void putTaskConfigs(final @PathParam("connector") String connector,
+                               final List<Map<String, String>> taskConfigs) throws Throwable {
+        FutureCallback<Void> cb = new FutureCallback<>();
+        herder.putTaskConfigs(connector, taskConfigs, cb);
+        completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", taskConfigs);
+    }
+
+    @DELETE
+    @Path("/{connector}")
+    public void destroyConnector(final @PathParam("connector") String connector) throws Throwable {
+        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
+        herder.putConnectorConfig(connector, null, true, cb);
+        completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", null);
+    }
+
+    // Wait for a FutureCallback to complete and return its result. If it fails with NotLeaderException, forward the
+    // request to the leader and translate that response; timeouts and interrupts are reported as HTTP 500.
+    private <T, U> T completeOrForwardRequest(
+            FutureCallback<T> cb, String path, String method, Object body, TypeReference<U> resultType,
+            Translator<T, U> translator) throws Throwable {
+        try {
+            return cb.get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof NotLeaderException) {
+                NotLeaderException notLeaderError = (NotLeaderException) e.getCause();
+                return translator.translate(RestServer.httpRequest(RestServer.urlJoin(notLeaderError.leaderUrl(), path), method, body, resultType));
+            }
+            throw e.getCause(); // any other failure: surface the underlying error to the caller
+        } catch (TimeoutException e) {
+            // This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server
+            // error is the best option
+            throw new CopycatRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request timed out");
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the interrupt flag so the calling thread can still observe it
+            throw new CopycatRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request interrupted");
+        }
+    }
+
+    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, Object body, TypeReference<T> resultType) throws Throwable {
+        return completeOrForwardRequest(cb, path, method, body, resultType, new IdentityTranslator<T>());
+    }
+
+    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, Object body) throws Throwable {
+        return completeOrForwardRequest(cb, path, method, body, null, new IdentityTranslator<T>());
+    }
+
+    private interface Translator<T, U> {
+        T translate(RestServer.HttpResponse<U> response);
+    }
+
+    private static class IdentityTranslator<T> implements Translator<T, T> { // static: uses no enclosing-instance state
+        @Override
+        public T translate(RestServer.HttpResponse<T> response) {
+            return response.body(); // forwarded body already has the caller's result type
+        }
+    }
+
+    private static class CreatedConnectorInfoTranslator implements Translator<Herder.Created<ConnectorInfo>, ConnectorInfo> { // static: no enclosing state used
+        @Override
+        public Herder.Created<ConnectorInfo> translate(RestServer.HttpResponse<ConnectorInfo> response) {
+            boolean created = response.status() == 201; // 201 status from the forwarded request marks a newly created connector
+            return new Herder.Created<>(created, response.body());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/RootResource.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/RootResource.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/RootResource.java
new file mode 100644
index 0000000..d012c5b
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/RootResource.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.rest.resources;
+
+import org.apache.kafka.copycat.runtime.rest.entities.ServerInfo;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+@Path("/")
+@Produces(MediaType.APPLICATION_JSON)
+public class RootResource {
+
+    @GET
+    @Path("/")
+    public ServerInfo serverInfo() {
+        return new ServerInfo();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java
new file mode 100644
index 0000000..246d36d
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.standalone;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.copycat.runtime.WorkerConfig;
+
+import java.util.Properties;
+
+public class StandaloneConfig extends WorkerConfig {
+    private static final ConfigDef CONFIG;
+
+    static {
+        CONFIG = baseConfigDef();
+    }
+
+    public StandaloneConfig(Properties props) {
+        super(CONFIG, props);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
index 167ee60..24a789a 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
@@ -17,22 +17,27 @@
 
 package org.apache.kafka.copycat.runtime.standalone;
 
+import org.apache.kafka.copycat.errors.AlreadyExistsException;
 import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.errors.NotFoundException;
 import org.apache.kafka.copycat.runtime.ConnectorConfig;
 import org.apache.kafka.copycat.runtime.Herder;
 import org.apache.kafka.copycat.runtime.HerderConnectorContext;
 import org.apache.kafka.copycat.runtime.TaskConfig;
 import org.apache.kafka.copycat.runtime.Worker;
+import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.copycat.util.Callback;
 import org.apache.kafka.copycat.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 
 /**
@@ -41,7 +46,7 @@ import java.util.Set;
 public class StandaloneHerder implements Herder {
     private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class);
 
-    private Worker worker;
+    private final Worker worker;
     private HashMap<String, ConnectorState> connectors = new HashMap<>();
 
     public StandaloneHerder(Worker worker) {
@@ -59,40 +64,95 @@ public class StandaloneHerder implements Herder {
         // There's no coordination/hand-off to do here since this is all standalone. Instead, we
         // should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all
         // the tasks.
-        for (String connName : new HashSet<>(connectors.keySet()))
-            stopConnector(connName);
+        for (String connName : new HashSet<>(connectors.keySet())) {
+            removeConnectorTasks(connName);
+            try {
+                worker.stopConnector(connName);
+            } catch (CopycatException e) {
+                log.error("Error shutting down connector {}: ", connName, e);
+            }
+        }
+        connectors.clear();
 
         log.info("Herder stopped");
     }
 
     @Override
-    public synchronized void addConnector(Map<String, String> connectorProps,
-                                          Callback<String> callback) {
-        try {
-            ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
-            String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
-            worker.addConnector(connConfig, new HerderConnectorContext(this, connName));
-            connectors.put(connName, new ConnectorState(connConfig));
-            if (callback != null)
-                callback.onCompletion(null, connName);
-            // This should always be a new job, create jobs from scratch
-            createConnectorTasks(connName);
-        } catch (CopycatException e) {
-            if (callback != null)
-                callback.onCompletion(e, null);
+    public synchronized void connectors(Callback<Collection<String>> callback) {
+        callback.onCompletion(null, new ArrayList<>(connectors.keySet()));
+    }
+
+    @Override
+    public synchronized void connectorInfo(String connName, Callback<ConnectorInfo> callback) {
+        ConnectorState state = connectors.get(connName);
+        if (state == null) {
+            callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
+            return;
         }
+        callback.onCompletion(null, createConnectorInfo(state));
+    }
+
+    private ConnectorInfo createConnectorInfo(ConnectorState state) {
+        if (state == null)
+            return null;
+
+        List<ConnectorTaskId> taskIds = new ArrayList<>();
+        for (int i = 0; i < state.taskConfigs.size(); i++)
+            taskIds.add(new ConnectorTaskId(state.name, i));
+        return new ConnectorInfo(state.name, state.configOriginals, taskIds);
     }
 
     @Override
-    public synchronized void deleteConnector(String connName, Callback<Void> callback) {
+    public void connectorConfig(String connName, final Callback<Map<String, String>> callback) {
+        // Subset of connectorInfo, so piggy back on that implementation
+        connectorInfo(connName, new Callback<ConnectorInfo>() {
+            @Override
+            public void onCompletion(Throwable error, ConnectorInfo result) {
+                if (error != null) {
+                    callback.onCompletion(error, null);
+                    return;
+                }
+                callback.onCompletion(null, result.config());
+            }
+        });
+    }
+
+    @Override
+    public synchronized void putConnectorConfig(String connName, final Map<String, String> config,
+                                                boolean allowReplace,
+                                                final Callback<Created<ConnectorInfo>> callback) {
         try {
-            stopConnector(connName);
-            if (callback != null)
-                callback.onCompletion(null, null);
+            boolean created = false;
+            if (connectors.containsKey(connName)) {
+                if (!allowReplace) {
+                    callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null);
+                    return;
+                }
+                if (config == null) // Deletion, kill tasks as well
+                    removeConnectorTasks(connName);
+                worker.stopConnector(connName);
+                if (config == null)
+                    connectors.remove(connName);
+            } else {
+                if (config == null) {
+                    // Deletion, must already exist
+                    callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
+                    return;
+                }
+                created = true;
+            }
+            if (config != null) {
+                startConnector(config);
+                updateConnectorTasks(connName);
+            }
+            if (config != null)
+                callback.onCompletion(null, new Created<>(created, createConnectorInfo(connectors.get(connName))));
+            else
+                callback.onCompletion(null, new Created<ConnectorInfo>(false, null));
         } catch (CopycatException e) {
-            if (callback != null)
-                callback.onCompletion(e, null);
+            callback.onCompletion(e, null);
         }
+
     }
 
     @Override
@@ -104,68 +164,109 @@ public class StandaloneHerder implements Herder {
         updateConnectorTasks(connName);
     }
 
-    // Stops a connectors tasks, then the connector
-    private void stopConnector(String connName) {
-        removeConnectorTasks(connName);
-        try {
-            worker.stopConnector(connName);
-            connectors.remove(connName);
-        } catch (CopycatException e) {
-            log.error("Error shutting down connector {}: ", connName, e);
+    @Override
+    public synchronized void taskConfigs(String connName, Callback<List<TaskInfo>> callback) {
+        ConnectorState state = connectors.get(connName);
+        if (state == null) {
+            callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
+            return;
         }
+
+        List<TaskInfo> result = new ArrayList<>();
+        for (int i = 0; i < state.taskConfigs.size(); i++) {
+            TaskInfo info = new TaskInfo(new ConnectorTaskId(connName, i), state.taskConfigs.get(i));
+            result.add(info);
+        }
+        callback.onCompletion(null, result);
     }
 
-    private void createConnectorTasks(String connName) {
+    @Override
+    public void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback) {
+        throw new UnsupportedOperationException("Copycat in standalone mode does not support externally setting task configurations.");
+    }
+
+    /**
+     * Start a connector in the worker and record its state.
+     * @param connectorProps new connector configuration
+     * @return the connector name
+     */
+    private String startConnector(Map<String, String> connectorProps) {
+        ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
+        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
         ConnectorState state = connectors.get(connName);
-        Map<ConnectorTaskId, Map<String, String>> taskConfigs = worker.reconfigureConnectorTasks(connName,
+        worker.addConnector(connConfig, new HerderConnectorContext(this, connName));
+        if (state == null) {
+            connectors.put(connName, new ConnectorState(connectorProps, connConfig));
+        } else {
+            state.configOriginals = connectorProps;
+            state.config = connConfig;
+        }
+        return connName;
+    }
+
+
+    private List<Map<String, String>> recomputeTaskConfigs(String connName) {
+        ConnectorState state = connectors.get(connName);
+        return worker.connectorTaskConfigs(connName,
                 state.config.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
                 state.config.getList(ConnectorConfig.TOPICS_CONFIG));
+    }
 
-        for (Map.Entry<ConnectorTaskId, Map<String, String>> taskEntry : taskConfigs.entrySet()) {
-            ConnectorTaskId taskId = taskEntry.getKey();
-            TaskConfig config = new TaskConfig(taskEntry.getValue());
+    private void createConnectorTasks(String connName) {
+        ConnectorState state = connectors.get(connName);
+        int index = 0;
+        for (Map<String, String> taskConfigMap : state.taskConfigs) {
+            ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
+            TaskConfig config = new TaskConfig(taskConfigMap);
             try {
                 worker.addTask(taskId, config);
-                // We only need to store the task IDs so we can clean up.
-                state.tasks.add(taskId);
             } catch (Throwable e) {
                 log.error("Failed to add task {}: ", taskId, e);
                 // Swallow this so we can continue updating the rest of the tasks
                 // FIXME what's the proper response? Kill all the tasks? Consider this the same as a task
                 // that died after starting successfully.
             }
+            index++;
         }
     }
 
     private void removeConnectorTasks(String connName) {
         ConnectorState state = connectors.get(connName);
-        Iterator<ConnectorTaskId> taskIter = state.tasks.iterator();
-        while (taskIter.hasNext()) {
-            ConnectorTaskId taskId = taskIter.next();
+        for (int i = 0; i < state.taskConfigs.size(); i++) {
+            ConnectorTaskId taskId = new ConnectorTaskId(connName, i);
             try {
                 worker.stopTask(taskId);
-                taskIter.remove();
             } catch (CopycatException e) {
                 log.error("Failed to stop task {}: ", taskId, e);
                 // Swallow this so we can continue stopping the rest of the tasks
                 // FIXME: Forcibly kill the task?
             }
         }
+        state.taskConfigs = new ArrayList<>();
     }
 
     private void updateConnectorTasks(String connName) {
-        removeConnectorTasks(connName);
-        createConnectorTasks(connName);
+        List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);
+        ConnectorState state = connectors.get(connName);
+        if (!newTaskConfigs.equals(state.taskConfigs)) {
+            removeConnectorTasks(connName);
+            state.taskConfigs = newTaskConfigs;
+            createConnectorTasks(connName);
+        }
     }
 
 
     private static class ConnectorState {
+        public String name;
+        public Map<String, String> configOriginals;
         public ConnectorConfig config;
-        Set<ConnectorTaskId> tasks;
+        List<Map<String, String>> taskConfigs;
 
-        public ConnectorState(ConnectorConfig config) {
+        public ConnectorState(Map<String, String> configOriginals, ConnectorConfig config) {
+            this.name = config.getString(ConnectorConfig.NAME_CONFIG);
+            this.configOriginals = configOriginals;
             this.config = config;
-            this.tasks = new HashSet<>();
+            this.taskConfigs = new ArrayList<>();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
index e3e498c..d4cf824 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
@@ -17,6 +17,9 @@
 
 package org.apache.kafka.copycat.util;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
 import java.io.Serializable;
 
 /**
@@ -27,15 +30,18 @@ public class ConnectorTaskId implements Serializable, Comparable<ConnectorTaskId
     private final String connector;
     private final int task;
 
-    public ConnectorTaskId(String job, int task) {
-        this.connector = job;
+    @JsonCreator
+    public ConnectorTaskId(@JsonProperty("connector") String connector, @JsonProperty("task") int task) {
+        this.connector = connector;
         this.task = task;
     }
 
+    @JsonProperty
     public String connector() {
         return connector;
     }
 
+    @JsonProperty
     public int task() {
         return task;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java
index 6bf3885..862adf9 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java
@@ -70,7 +70,8 @@ public abstract class ConvertingFutureCallback<U, T> implements Callback<U>, Fut
     @Override
     public T get(long l, TimeUnit timeUnit)
             throws InterruptedException, ExecutionException, TimeoutException {
-        finishedLatch.await(l, timeUnit);
+        if (!finishedLatch.await(l, timeUnit))
+            throw new TimeoutException("Timed out waiting for future");
         return result();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
index 61e04b6..269482c 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
@@ -23,6 +23,10 @@ public class FutureCallback<T> extends ConvertingFutureCallback<T, T> {
         super(underlying);
     }
 
+    public FutureCallback() {
+        super(null);
+    }
+
     @Override
     public T convert(T result) {
         return result;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index d33f846..e5e5b85 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -20,10 +20,10 @@ package org.apache.kafka.copycat.runtime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.copycat.cli.WorkerConfig;
 import org.apache.kafka.copycat.data.Schema;
 import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.copycat.sink.SinkRecord;
 import org.apache.kafka.copycat.sink.SinkTask;
 import org.apache.kafka.copycat.sink.SinkTaskContext;
@@ -101,7 +101,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
         workerProps.setProperty("internal.key.converter.schemas.enable", "false");
         workerProps.setProperty("internal.value.converter.schemas.enable", "false");
-        workerConfig = new WorkerConfig(workerProps);
+        workerConfig = new StandaloneConfig(workerProps);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
                 taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
index 13d5228..566391d 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
@@ -22,8 +22,8 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.copycat.cli.WorkerConfig;
 import org.apache.kafka.copycat.data.Schema;
+import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.copycat.source.SourceRecord;
 import org.apache.kafka.copycat.source.SourceTask;
 import org.apache.kafka.copycat.source.SourceTaskContext;
@@ -96,7 +96,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
         workerProps.setProperty("internal.key.converter.schemas.enable", "false");
         workerProps.setProperty("internal.value.converter.schemas.enable", "false");
-        config = new WorkerConfig(workerProps);
+        config = new StandaloneConfig(workerProps);
         producerCallbacks = EasyMock.newCapture();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
index 19e1462..05015a4 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
@@ -20,11 +20,11 @@ package org.apache.kafka.copycat.runtime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.copycat.cli.WorkerConfig;
 import org.apache.kafka.copycat.connector.Connector;
 import org.apache.kafka.copycat.connector.ConnectorContext;
 import org.apache.kafka.copycat.connector.Task;
 import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.copycat.sink.SinkTask;
 import org.apache.kafka.copycat.source.SourceRecord;
 import org.apache.kafka.copycat.source.SourceTask;
@@ -77,7 +77,7 @@ public class WorkerTest extends ThreadedTest {
         workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
         workerProps.setProperty("internal.key.converter.schemas.enable", "false");
         workerProps.setProperty("internal.value.converter.schemas.enable", "false");
-        config = new WorkerConfig(workerProps);
+        config = new StandaloneConfig(workerProps);
     }
 
     @Test
@@ -203,14 +203,14 @@ public class WorkerTest extends ThreadedTest {
         } catch (CopycatException e) {
             // expected
         }
-        Map<ConnectorTaskId, Map<String, String>> taskConfigs = worker.reconfigureConnectorTasks(CONNECTOR_ID, 2, Arrays.asList("foo", "bar"));
+        List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, 2, Arrays.asList("foo", "bar"));
         Properties expectedTaskProps = new Properties();
         expectedTaskProps.setProperty("foo", "bar");
         expectedTaskProps.setProperty(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
         expectedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, "foo,bar");
         assertEquals(2, taskConfigs.size());
-        assertEquals(expectedTaskProps, taskConfigs.get(new ConnectorTaskId(CONNECTOR_ID, 0)));
-        assertEquals(expectedTaskProps, taskConfigs.get(new ConnectorTaskId(CONNECTOR_ID, 1)));
+        assertEquals(expectedTaskProps, taskConfigs.get(0));
+        assertEquals(expectedTaskProps, taskConfigs.get(1));
         worker.stopConnector(CONNECTOR_ID);
         assertEquals(Collections.emptySet(), worker.connectorNames());
         // Nothing should be left, so this should effectively be a nop

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
index c8b4874..8f28f5f 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
@@ -19,15 +19,22 @@ package org.apache.kafka.copycat.runtime.distributed;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.copycat.connector.ConnectorContext;
+import org.apache.kafka.copycat.errors.AlreadyExistsException;
 import org.apache.kafka.copycat.runtime.ConnectorConfig;
+import org.apache.kafka.copycat.runtime.Herder;
 import org.apache.kafka.copycat.runtime.TaskConfig;
 import org.apache.kafka.copycat.runtime.Worker;
+import org.apache.kafka.copycat.runtime.WorkerConfig;
+import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.copycat.source.SourceConnector;
 import org.apache.kafka.copycat.source.SourceTask;
 import org.apache.kafka.copycat.storage.KafkaConfigStorage;
 import org.apache.kafka.copycat.util.Callback;
 import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.FutureCallback;
 import org.apache.kafka.copycat.util.TestFuture;
+import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.junit.Before;
@@ -40,57 +47,86 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.TimeoutException;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(DistributedHerder.class)
 @PowerMockIgnore("javax.management.*")
 public class DistributedHerderTest {
-    private static final Map<String, String> HERDER_CONFIG = new HashMap<>();
+    private static final Properties HERDER_CONFIG = new Properties();
     static {
         HERDER_CONFIG.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic");
         HERDER_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        HERDER_CONFIG.put(DistributedHerderConfig.GROUP_ID_CONFIG, "test-copycat-group");
+        HERDER_CONFIG.put(DistributedConfig.GROUP_ID_CONFIG, "test-copycat-group");
+        // The WorkerConfig base class has some required settings without defaults
+        HERDER_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.copycat.json.JsonConverter");
+        HERDER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.copycat.json.JsonConverter");
+        HERDER_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.copycat.json.JsonConverter");
+        HERDER_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.copycat.json.JsonConverter");
     }
+    private static final String MEMBER_URL = "memberUrl";
 
     private static final String CONN1 = "sourceA";
-    private static final String CONN2 = "sourceA";
+    private static final String CONN2 = "sourceB";
     private static final ConnectorTaskId TASK0 = new ConnectorTaskId(CONN1, 0);
     private static final ConnectorTaskId TASK1 = new ConnectorTaskId(CONN1, 1);
     private static final ConnectorTaskId TASK2 = new ConnectorTaskId(CONN1, 2);
     private static final Integer MAX_TASKS = 3;
-    private static final Map<String, String> CONNECTOR_CONFIG = new HashMap<>();
+    private static final Map<String, String> CONN1_CONFIG = new HashMap<>();
+    static {
+        CONN1_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN1);
+        CONN1_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString());
+        CONN1_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+        CONN1_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName());
+    }
+    private static final Map<String, String> CONN1_CONFIG_UPDATED = new HashMap<>(CONN1_CONFIG);
     static {
-        CONNECTOR_CONFIG.put(ConnectorConfig.NAME_CONFIG, "sourceA");
-        CONNECTOR_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString());
-        CONNECTOR_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
-        CONNECTOR_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName());
+        CONN1_CONFIG_UPDATED.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar,baz");
+    }
+    private static final Map<String, String> CONN2_CONFIG = new HashMap<>();
+    static {
+        CONN2_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN2);
+        CONN2_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString());
+        CONN2_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+        CONN2_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName());
     }
     private static final Map<String, String> TASK_CONFIG = new HashMap<>();
     static {
         TASK_CONFIG.put(TaskConfig.TASK_CLASS_CONFIG, BogusSourceTask.class.getName());
     }
-    private static final HashMap<ConnectorTaskId, Map<String, String>> TASK_CONFIGS = new HashMap<>();
+    private static final List<Map<String, String>> TASK_CONFIGS = new ArrayList<>();
     static {
-        TASK_CONFIGS.put(TASK0, TASK_CONFIG);
-        TASK_CONFIGS.put(TASK1, TASK_CONFIG);
-        TASK_CONFIGS.put(TASK2, TASK_CONFIG);
+        TASK_CONFIGS.add(TASK_CONFIG);
+        TASK_CONFIGS.add(TASK_CONFIG);
+        TASK_CONFIGS.add(TASK_CONFIG);
+    }
+    private static final HashMap<ConnectorTaskId, Map<String, String>> TASK_CONFIGS_MAP = new HashMap<>();
+    static {
+        TASK_CONFIGS_MAP.put(TASK0, TASK_CONFIG);
+        TASK_CONFIGS_MAP.put(TASK1, TASK_CONFIG);
+        TASK_CONFIGS_MAP.put(TASK2, TASK_CONFIG);
     }
     private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
-            Collections.singletonMap(CONN1, CONNECTOR_CONFIG), TASK_CONFIGS, Collections.<String>emptySet());
+            Collections.singletonMap(CONN1, CONN1_CONFIG), TASK_CONFIGS_MAP, Collections.<String>emptySet());
+    private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), TASK_CONFIGS_MAP, Collections.<String>emptySet());
 
     @Mock private KafkaConfigStorage configStorage;
     @Mock private WorkerGroupMember member;
     private DistributedHerder herder;
     @Mock private Worker worker;
-    @Mock private Callback<String> createCallback;
-    @Mock private Callback<Void> destroyCallback;
+    @Mock private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback;
 
     private Callback<String> connectorConfigCallback;
     private Callback<List<ConnectorTaskId>> taskConfigCallback;
@@ -101,7 +137,7 @@ public class DistributedHerderTest {
         worker = PowerMock.createMock(Worker.class);
 
         herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff"},
-                worker, HERDER_CONFIG, configStorage, member);
+                new DistributedConfig(HERDER_CONFIG), worker, configStorage, member, MEMBER_URL);
         connectorConfigCallback = Whitebox.invokeMethod(herder, "connectorConfigCallback");
         taskConfigCallback = Whitebox.invokeMethod(herder, "taskConfigCallback");
         rebalanceListener = Whitebox.invokeMethod(herder, "rebalanceListener");
@@ -115,7 +151,7 @@ public class DistributedHerderTest {
         expectPostRebalanceCatchup(SNAPSHOT);
         worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
         PowerMock.expectLastCall();
-        EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject());
         PowerMock.expectLastCall();
         member.poll(EasyMock.anyInt());
@@ -156,9 +192,11 @@ public class DistributedHerderTest {
 
         member.wakeup();
         PowerMock.expectLastCall();
-        configStorage.putConnectorConfig(CONN1, CONNECTOR_CONFIG);
+        // CONN2 is new, should succeed
+        configStorage.putConnectorConfig(CONN2, CONN2_CONFIG);
         PowerMock.expectLastCall();
-        createCallback.onCompletion(null, CONN1);
+        ConnectorInfo info = new ConnectorInfo(CONN2, CONN2_CONFIG, Collections.<ConnectorTaskId>emptyList());
+        putConnectorCallback.onCompletion(null, new Herder.Created<>(true, info));
         PowerMock.expectLastCall();
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -166,7 +204,30 @@ public class DistributedHerderTest {
 
         PowerMock.replayAll();
 
-        herder.addConnector(CONNECTOR_CONFIG, createCallback);
+        herder.putConnectorConfig(CONN2, CONN2_CONFIG, false, putConnectorCallback);
+        herder.tick();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCreateConnectorAlreadyExists() throws Exception {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+        // CONN1 already exists
+        putConnectorCallback.onCompletion(EasyMock.<AlreadyExistsException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+        // No further action -- the request is rejected, so no write to the config storage is expected
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONN1, CONN1_CONFIG, false, putConnectorCallback);
         herder.tick();
 
         PowerMock.verifyAll();
@@ -180,14 +241,14 @@ public class DistributedHerderTest {
         expectPostRebalanceCatchup(SNAPSHOT);
         worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
         PowerMock.expectLastCall();
-        EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
 
         // And delete the connector
         member.wakeup();
         PowerMock.expectLastCall();
         configStorage.putConnectorConfig(CONN1, null);
         PowerMock.expectLastCall();
-        destroyCallback.onCompletion(null, null);
+        putConnectorCallback.onCompletion(null, new Herder.Created<ConnectorInfo>(false, null));
         PowerMock.expectLastCall();
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -195,7 +256,7 @@ public class DistributedHerderTest {
 
         PowerMock.replayAll();
 
-        herder.deleteConnector(CONN1, destroyCallback);
+        herder.putConnectorConfig(CONN1, null, true, putConnectorCallback);
         herder.tick();
 
         PowerMock.verifyAll();
@@ -224,7 +285,7 @@ public class DistributedHerderTest {
                 CopycatProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
         worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
         PowerMock.expectLastCall();
-        EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
@@ -250,7 +311,7 @@ public class DistributedHerderTest {
         expectPostRebalanceCatchup(SNAPSHOT);
         worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
         PowerMock.expectLastCall();
-        EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
@@ -263,7 +324,7 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
         worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
         PowerMock.expectLastCall();
-        EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
@@ -322,7 +383,7 @@ public class DistributedHerderTest {
         TestFuture<Void> readToEndFuture = new TestFuture<>();
         readToEndFuture.resolveOnGet(new TimeoutException());
         EasyMock.expect(configStorage.readToEnd()).andReturn(readToEndFuture);
-        PowerMock.expectPrivate(herder, "backoff", DistributedHerderConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT);
+        PowerMock.expectPrivate(herder, "backoff", DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT);
         member.requestRejoin();
 
         // After backoff, restart the process and this time succeed
@@ -331,7 +392,7 @@ public class DistributedHerderTest {
 
         worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
         PowerMock.expectLastCall();
-        EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject());
         PowerMock.expectLastCall();
         member.poll(EasyMock.anyInt());
@@ -346,6 +407,123 @@ public class DistributedHerderTest {
     }
 
     @Test
+    public void testAccessors() throws Exception {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
+
+
+        member.wakeup();
+        PowerMock.expectLastCall().anyTimes();
+        // list connectors, get connector info, get connector config, get task configs
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+
+        PowerMock.replayAll();
+
+        FutureCallback<Collection<String>> listConnectorsCb = new FutureCallback<>();
+        herder.connectors(listConnectorsCb);
+        FutureCallback<ConnectorInfo> connectorInfoCb = new FutureCallback<>();
+        herder.connectorInfo(CONN1, connectorInfoCb);
+        FutureCallback<Map<String, String>> connectorConfigCb = new FutureCallback<>();
+        herder.connectorConfig(CONN1, connectorConfigCb);
+        FutureCallback<List<TaskInfo>> taskConfigsCb = new FutureCallback<>();
+        herder.taskConfigs(CONN1, taskConfigsCb);
+
+        herder.tick();
+        assertTrue(listConnectorsCb.isDone());
+        assertEquals(Collections.singleton(CONN1), listConnectorsCb.get());
+        assertTrue(connectorInfoCb.isDone());
+        ConnectorInfo info = new ConnectorInfo(CONN1, CONN1_CONFIG, Arrays.asList(TASK0, TASK1, TASK2));
+        assertEquals(info, connectorInfoCb.get());
+        assertTrue(connectorConfigCb.isDone());
+        assertEquals(CONN1_CONFIG, connectorConfigCb.get());
+        assertTrue(taskConfigsCb.isDone());
+        assertEquals(Arrays.asList(
+                        new TaskInfo(TASK0, TASK_CONFIG),
+                        new TaskInfo(TASK1, TASK_CONFIG),
+                        new TaskInfo(TASK2, TASK_CONFIG)),
+                taskConfigsCb.get());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPutConnectorConfig() throws Exception {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
+        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+        PowerMock.expectLastCall();
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+
+        // First tick: get connector config
+        member.wakeup();
+        PowerMock.expectLastCall().anyTimes();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Poll loop for second round of calls
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        configStorage.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED);
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                // Simulate response to writing config + waiting until end of log to be read
+                connectorConfigCallback.onCompletion(null, CONN1);
+                return null;
+            }
+        });
+        // As a result of reconfig, should need to update snapshot. With only connector updates, we'll just restart
+        // connector without rebalance
+        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG);
+        worker.stopConnector(CONN1);
+        PowerMock.expectLastCall();
+        Capture<ConnectorConfig> capturedUpdatedConfig = EasyMock.newCapture();
+        worker.addConnector(EasyMock.capture(capturedUpdatedConfig), EasyMock.<ConnectorContext>anyObject());
+        PowerMock.expectLastCall();
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Third tick just to read the config
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        // Should pick up original config
+        FutureCallback<Map<String, String>> connectorConfigCb = new FutureCallback<>();
+        herder.connectorConfig(CONN1, connectorConfigCb);
+        herder.tick();
+        assertTrue(connectorConfigCb.isDone());
+        assertEquals(CONN1_CONFIG, connectorConfigCb.get());
+
+        // Apply new config.
+        FutureCallback<Herder.Created<ConnectorInfo>> putConfigCb = new FutureCallback<>();
+        herder.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED, true, putConfigCb);
+        herder.tick();
+        assertTrue(putConfigCb.isDone());
+        ConnectorInfo updatedInfo = new ConnectorInfo(CONN1, CONN1_CONFIG_UPDATED, Arrays.asList(TASK0, TASK1, TASK2));
+        assertEquals(new Herder.Created<>(false, updatedInfo), putConfigCb.get());
+
+        // Check config again to validate change
+        connectorConfigCb = new FutureCallback<>();
+        herder.connectorConfig(CONN1, connectorConfigCb);
+        herder.tick();
+        assertTrue(connectorConfigCb.isDone());
+        assertEquals(CONN1_CONFIG_UPDATED, connectorConfigCb.get());
+        // The config passed to Worker should reflect the updated topics list
+        assertEquals(Arrays.asList("foo", "bar", "baz"),
+                capturedUpdatedConfig.getValue().getList(ConnectorConfig.TOPICS_CONFIG));
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testInconsistentConfigs() throws Exception {
         // FIXME: if we have inconsistent configs, we need to request forced reconfig + write of the connector's task configs
         // This requires inter-worker communication, so needs the REST API
@@ -366,7 +544,7 @@ public class DistributedHerderTest {
                 if (revokedConnectors != null)
                     rebalanceListener.onRevoked("leader", revokedConnectors, revokedTasks);
                 CopycatProtocol.Assignment assignment = new CopycatProtocol.Assignment(
-                        error, "leader", offset, assignedConnectors, assignedTasks);
+                        error, "leader", "leaderUrl", offset, assignedConnectors, assignedTasks);
                 rebalanceListener.onAssigned(assignment);
                 return null;
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
index 2278045..ca53674 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
@@ -55,6 +55,9 @@ import static org.junit.Assert.assertFalse;
 
 public class WorkerCoordinatorTest {
 
+    private static final String LEADER_URL = "leaderUrl:8083";
+    private static final String MEMBER_URL = "memberUrl:8083";
+
     private String connectorId = "connector";
     private String connectorId2 = "connector2";
     private ConnectorTaskId taskId0 = new ConnectorTaskId(connectorId, 0);
@@ -104,6 +107,7 @@ public class WorkerCoordinatorTest {
                 time,
                 requestTimeoutMs,
                 retryBackoffMs,
+                LEADER_URL,
                 configStorage,
                 rebalanceListener);
 
@@ -147,7 +151,7 @@ public class WorkerCoordinatorTest {
 
         LinkedHashMap<String, ByteBuffer> serialized = coordinator.metadata();
         assertEquals(1, serialized.size());
-        CopycatProtocol.ConfigState state = CopycatProtocol.deserializeMetadata(serialized.get(WorkerCoordinator.DEFAULT_SUBPROTOCOL));
+        CopycatProtocol.WorkerState state = CopycatProtocol.deserializeMetadata(serialized.get(WorkerCoordinator.DEFAULT_SUBPROTOCOL));
         assertEquals(1, state.offset());
 
         PowerMock.verifyAll();
@@ -322,8 +326,8 @@ public class WorkerCoordinatorTest {
 
         Map<String, ByteBuffer> configs = new HashMap<>();
         // Mark everyone as in sync with configState1
-        configs.put("leader", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
-        configs.put("member", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
+        configs.put("leader", CopycatProtocol.serializeMetadata(new CopycatProtocol.WorkerState(LEADER_URL, 1L)));
+        configs.put("member", CopycatProtocol.serializeMetadata(new CopycatProtocol.WorkerState(MEMBER_URL, 1L)));
         Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
 
         // configState1 has 1 connector, 1 task
@@ -358,8 +362,8 @@ public class WorkerCoordinatorTest {
 
         Map<String, ByteBuffer> configs = new HashMap<>();
         // Mark everyone as in sync with configState1
-        configs.put("leader", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
-        configs.put("member", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
+        configs.put("leader", CopycatProtocol.serializeMetadata(new CopycatProtocol.WorkerState(LEADER_URL, 1L)));
+        configs.put("member", CopycatProtocol.serializeMetadata(new CopycatProtocol.WorkerState(MEMBER_URL, 1L)));
         Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
 
         // configState2 has 2 connector, 3 tasks and should trigger round robin assignment
@@ -390,7 +394,10 @@ public class WorkerCoordinatorTest {
                                            Map<String, Long> configOffsets, short error) {
         Map<String, ByteBuffer> metadata = new HashMap<>();
         for (Map.Entry<String, Long> configStateEntry : configOffsets.entrySet()) {
-            ByteBuffer buf = CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(configStateEntry.getValue()));
+            // We need a member URL, but it doesn't matter for the purposes of this test. Just set it to the member ID
+            String memberUrl = configStateEntry.getKey();
+            long configOffset = configStateEntry.getValue();
+            ByteBuffer buf = CopycatProtocol.serializeMetadata(new CopycatProtocol.WorkerState(memberUrl, configOffset));
             metadata.put(configStateEntry.getKey(), buf);
         }
         return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, memberId, metadata).toStruct();
@@ -403,7 +410,7 @@ public class WorkerCoordinatorTest {
 
     private Struct syncGroupResponse(short assignmentError, String leader, long configOffset, List<String> connectorIds,
                                      List<ConnectorTaskId> taskIds, short error) {
-        CopycatProtocol.Assignment assignment = new CopycatProtocol.Assignment(assignmentError, leader, configOffset, connectorIds, taskIds);
+        CopycatProtocol.Assignment assignment = new CopycatProtocol.Assignment(assignmentError, leader, LEADER_URL, configOffset, connectorIds, taskIds);
         ByteBuffer buf = CopycatProtocol.serializeAssignment(assignment);
         return new SyncGroupResponse(error, buf).toStruct();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResourceTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResourceTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResourceTest.java
new file mode 100644
index 0000000..c987092
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResourceTest.java
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.rest.resources;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.kafka.copycat.errors.AlreadyExistsException;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.errors.NotFoundException;
+import org.apache.kafka.copycat.runtime.ConnectorConfig;
+import org.apache.kafka.copycat.runtime.Herder;
+import org.apache.kafka.copycat.runtime.distributed.NotLeaderException;
+import org.apache.kafka.copycat.runtime.rest.RestServer;
+import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.copycat.runtime.rest.entities.CreateConnectorRequest;
+import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(RestServer.class)
+@PowerMockIgnore("javax.management.*")
+public class ConnectorsResourceTest {
+    // LEADER_URL deliberately ends with '/'. The URLs expected by the forwarding tests are written out literally instead
+    // of being built from LEADER_URL, which checks that URL construction avoids "//" (it would mess up REST routing)
+    private static final String LEADER_URL = "http://leader:8083/";
+    private static final String CONNECTOR_NAME = "test";
+    private static final String CONNECTOR2_NAME = "test2";
+    private static final Map<String, String> CONNECTOR_CONFIG = new HashMap<>(); // filled in by the static block below
+    static {
+        CONNECTOR_CONFIG.put("name", CONNECTOR_NAME);
+        CONNECTOR_CONFIG.put("sample_config", "test_config");
+    }
+    private static final List<ConnectorTaskId> CONNECTOR_TASK_NAMES = Arrays.asList( // ids of CONNECTOR_NAME's tasks 0 and 1
+            new ConnectorTaskId(CONNECTOR_NAME, 0),
+            new ConnectorTaskId(CONNECTOR_NAME, 1)
+    );
+    private static final List<Map<String, String>> TASK_CONFIGS = new ArrayList<>(); // one config map per task
+    static {
+        TASK_CONFIGS.add(Collections.singletonMap("config", "value"));
+        TASK_CONFIGS.add(Collections.singletonMap("config", "other_value"));
+    }
+    private static final List<TaskInfo> TASK_INFOS = new ArrayList<>(); // pairs task id i with TASK_CONFIGS.get(i)
+    static {
+        TASK_INFOS.add(new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 0), TASK_CONFIGS.get(0)));
+        TASK_INFOS.add(new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 1), TASK_CONFIGS.get(1)));
+    }
+
+    // herder is a PowerMock-managed mock (@Mock); setUp() passes it to the resource under test
+    @Mock
+    private Herder herder;
+    private ConnectorsResource connectorsResource;
+
+    @Before
+    public void setUp() throws NoSuchMethodException {
+        final java.lang.reflect.Method httpRequest = RestServer.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class);
+        PowerMock.mockStatic(RestServer.class, httpRequest); // static mocking is requested for this one method only
+        this.connectorsResource = new ConnectorsResource(this.herder);
+    }
+
+    @Test
+    public void testListConnectors() throws Throwable {
+        // The herder mock completes the captured callback with both connector names
+        final Capture<Callback<Collection<String>>> herderCallback = Capture.newInstance();
+        herder.connectors(EasyMock.capture(herderCallback));
+        expectAndCallbackResult(herderCallback, Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME));
+        PowerMock.replayAll();
+
+        // Ordering isn't guaranteed, so compare as sets
+        final HashSet<String> expectedNames = new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME));
+        final HashSet<String> actualNames = new HashSet<>(connectorsResource.listConnectors());
+        assertEquals(expectedNames, actualNames);
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testListConnectorsNotLeader() throws Throwable {
+        final Capture<Callback<Collection<String>>> herderCallback = Capture.newInstance();
+        herder.connectors(EasyMock.capture(herderCallback));
+        expectAndCallbackNotLeaderException(herderCallback);
+        // The GET should then be forwarded to the leader; note the single '/' before "connectors" in the expected URL
+        EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors"), EasyMock.eq("GET"),
+                EasyMock.isNull(), EasyMock.anyObject(TypeReference.class)))
+                .andReturn(new RestServer.HttpResponse<>(200, new HashMap<String, List<String>>(), Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)));
+        PowerMock.replayAll();
+
+        // Ordering isn't guaranteed, so compare as sets
+        final HashSet<String> expectedNames = new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME));
+        final HashSet<String> actualNames = new HashSet<>(connectorsResource.listConnectors());
+        assertEquals(expectedNames, actualNames);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = CopycatException.class)
+    public void testListConnectorsNotSynced() throws Throwable {
+        // The herder mock fails the captured callback; the failure must surface to the caller as a CopycatException
+        final Capture<Callback<Collection<String>>> herderCallback = Capture.newInstance();
+        final CopycatException notSynced = new CopycatException("not synced");
+        herder.connectors(EasyMock.capture(herderCallback));
+        expectAndCallbackException(herderCallback, notSynced);
+        PowerMock.replayAll();
+
+        connectorsResource.listConnectors(); // throws
+    }
+
+    @Test
+    public void testCreateConnector() throws Throwable {
+        final CreateConnectorRequest request = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
+        final ConnectorInfo createdInfo = new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES);
+
+        // Expect the request's config to be handed to the herder with the boolean argument set to false
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> herderCallback = Capture.newInstance();
+        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(request.config()), EasyMock.eq(false), EasyMock.capture(herderCallback));
+        expectAndCallbackResult(herderCallback, new Herder.Created<>(true, createdInfo));
+        PowerMock.replayAll();
+
+        connectorsResource.createConnector(request);
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCreateConnectorNotLeader() throws Throwable {
+        final CreateConnectorRequest request = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
+
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> herderCallback = Capture.newInstance();
+        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(request.config()), EasyMock.eq(false), EasyMock.capture(herderCallback));
+        expectAndCallbackNotLeaderException(herderCallback);
+        // The same request body should then be forwarded to the leader as a POST
+        EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors"), EasyMock.eq("POST"), EasyMock.eq(request), EasyMock.<TypeReference>anyObject()))
+                .andReturn(new RestServer.HttpResponse<>(201, new HashMap<String, List<String>>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES)));
+        PowerMock.replayAll();
+
+        // Only the interactions are checked; the value returned by createConnector is not inspected here
+        connectorsResource.createConnector(request);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = AlreadyExistsException.class)
+    public void testCreateConnectorExists() throws Throwable {
+        CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
+        expectAndCallbackException(cb, new AlreadyExistsException("already exists"));
+        PowerMock.replayAll();
+        // createConnector is expected to throw, so anything after it would never run: verify the mocks from finally
+        try {
+            connectorsResource.createConnector(body);
+        } finally {
+            PowerMock.verifyAll();
+        }
+    }
+
+    @Test
+    public void testDeleteConnector() throws Throwable {
+        // Deletion should reach the herder as putConnectorConfig with a null config and the boolean argument set to true
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> herderCallback = Capture.newInstance();
+        final Herder.Created<ConnectorInfo> noResult = null;
+        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.<Map<String, String>>isNull(), EasyMock.eq(true), EasyMock.capture(herderCallback));
+        expectAndCallbackResult(herderCallback, noResult);
+        PowerMock.replayAll();
+
+        connectorsResource.destroyConnector(CONNECTOR_NAME);
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testDeleteConnectorNotLeader() throws Throwable {
+        final String forwardedUrl = "http://leader:8083/connectors/" + CONNECTOR_NAME;
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> herderCallback = Capture.newInstance();
+        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.<Map<String, String>>isNull(), EasyMock.eq(true), EasyMock.capture(herderCallback));
+        expectAndCallbackNotLeaderException(herderCallback);
+        // The DELETE should then be forwarded to the leader, with no request body and no response type
+        EasyMock.expect(RestServer.httpRequest(forwardedUrl, "DELETE", null, null))
+                .andReturn(new RestServer.HttpResponse<>(204, new HashMap<String, List<String>>(), null));
+        PowerMock.replayAll();
+
+        connectorsResource.destroyConnector(CONNECTOR_NAME);
+
+        PowerMock.verifyAll();
+    }
+
+    // Not found exceptions should pass through to caller so they can be processed for 404s
+    @Test(expected = NotFoundException.class)
+    public void testDeleteConnectorNotFound() throws Throwable {
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.<Map<String, String>>isNull(), EasyMock.eq(true), EasyMock.capture(cb));
+        expectAndCallbackException(cb, new NotFoundException("not found"));
+        PowerMock.replayAll();
+        try {
+            connectorsResource.destroyConnector(CONNECTOR_NAME); // expected to throw NotFoundException
+        } finally {
+            PowerMock.verifyAll(); // in finally: a statement placed after the throwing call would never run
+        }
+    }
+
+    @Test
+    public void testGetConnector() throws Throwable {
+        final ConnectorInfo herderInfo = new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES);
+        final Capture<Callback<ConnectorInfo>> herderCallback = Capture.newInstance();
+        herder.connectorInfo(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(herderCallback));
+        expectAndCallbackResult(herderCallback, herderInfo);
+        PowerMock.replayAll();
+
+        // Compare against a separately constructed reference value, i.e. by equality rather than identity
+        final ConnectorInfo expectedInfo = new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES);
+        assertEquals(expectedInfo, connectorsResource.getConnector(CONNECTOR_NAME));
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testGetConnectorConfig() throws Throwable {
+        // The herder mock completes the captured callback with CONNECTOR_CONFIG
+        final Capture<Callback<Map<String, String>>> herderCallback = Capture.newInstance();
+        herder.connectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(herderCallback));
+        expectAndCallbackResult(herderCallback, CONNECTOR_CONFIG);
+        PowerMock.replayAll();
+
+        final Map<String, String> returnedConfig = connectorsResource.getConnectorConfig(CONNECTOR_NAME);
+        assertEquals(CONNECTOR_CONFIG, returnedConfig);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = NotFoundException.class)
+    public void testGetConnectorConfigConnectorNotFound() throws Throwable {
+        final Capture<Callback<Map<String, String>>> cb = Capture.newInstance();
+        herder.connectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
+        expectAndCallbackException(cb, new NotFoundException("not found"));
+        PowerMock.replayAll();
+        try {
+            connectorsResource.getConnectorConfig(CONNECTOR_NAME); // expected to throw NotFoundException
+        } finally {
+            PowerMock.verifyAll(); // in finally: a statement placed after the throwing call would never run
+        }
+    }
+
+    @Test
+    public void testPutConnectorConfig() throws Throwable {
+        final ConnectorInfo updatedInfo = new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES);
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> herderCallback = Capture.newInstance();
+        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(CONNECTOR_CONFIG), EasyMock.eq(true), EasyMock.capture(herderCallback));
+        expectAndCallbackResult(herderCallback, new Herder.Created<>(false, updatedInfo));
+        PowerMock.replayAll();
+
+        // Only the herder interaction is checked; the value returned by putConnectorConfig is not inspected here
+        connectorsResource.putConnectorConfig(CONNECTOR_NAME, CONNECTOR_CONFIG);
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testGetConnectorTaskConfigs() throws Throwable {
+        // The herder mock completes the captured callback with TASK_INFOS
+        final Capture<Callback<List<TaskInfo>>> herderCallback = Capture.newInstance();
+        herder.taskConfigs(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(herderCallback));
+        expectAndCallbackResult(herderCallback, TASK_INFOS);
+        PowerMock.replayAll();
+
+        final List<TaskInfo> returnedInfos = connectorsResource.getTaskConfigs(CONNECTOR_NAME);
+        assertEquals(TASK_INFOS, returnedInfos);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = NotFoundException.class)
+    public void testGetConnectorTaskConfigsConnectorNotFound() throws Throwable {
+        final Capture<Callback<List<TaskInfo>>> cb = Capture.newInstance();
+        herder.taskConfigs(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
+        expectAndCallbackException(cb, new NotFoundException("connector not found"));
+        PowerMock.replayAll();
+        try {
+            connectorsResource.getTaskConfigs(CONNECTOR_NAME); // expected to throw NotFoundException
+        } finally {
+            PowerMock.verifyAll(); // in finally: a statement placed after the throwing call would never run
+        }
+    }
+
+    @Test
+    public void testPutConnectorTaskConfigs() throws Throwable {
+        // The herder mock completes the captured callback without an error; the callback carries no value (Void)
+        final Capture<Callback<Void>> herderCallback = Capture.newInstance();
+        final Void nothing = null;
+        herder.putTaskConfigs(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(TASK_CONFIGS), EasyMock.capture(herderCallback));
+        expectAndCallbackResult(herderCallback, nothing);
+        PowerMock.replayAll();
+
+        connectorsResource.putTaskConfigs(CONNECTOR_NAME, TASK_CONFIGS);
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = NotFoundException.class)
+    public void testPutConnectorTaskConfigsConnectorNotFound() throws Throwable {
+        final Capture<Callback<Void>> cb = Capture.newInstance();
+        herder.putTaskConfigs(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(TASK_CONFIGS), EasyMock.capture(cb));
+        expectAndCallbackException(cb, new NotFoundException("not found"));
+        PowerMock.replayAll();
+        try {
+            connectorsResource.putTaskConfigs(CONNECTOR_NAME, TASK_CONFIGS); // expected to throw NotFoundException
+        } finally {
+            PowerMock.verifyAll(); // in finally: a statement placed after the throwing call would never run
+        }
+    }
+
+    // Makes the last recorded mock call complete its captured callback with no error and the given value
+    private <T> void expectAndCallbackResult(final Capture<Callback<T>> cb, final T value) {
+        expectCallbackCompletion(cb, null, value);
+    }
+    // Makes the last recorded mock call complete its captured callback with the error t and a null result
+    private <T> void expectAndCallbackException(final Capture<Callback<T>> cb, final Throwable t) {
+        expectCallbackCompletion(cb, t, null);
+    }
+    // Same as expectAndCallbackException, with a NotLeaderException that carries LEADER_URL
+    private <T> void expectAndCallbackNotLeaderException(final Capture<Callback<T>> cb) {
+        expectAndCallbackException(cb, new NotLeaderException("not leader test", LEADER_URL));
+    }
+
+    // Shared answer: when the last recorded call is replayed, invoke the captured callback with (error, result)
+    private <T> void expectCallbackCompletion(final Capture<Callback<T>> cb, final Throwable error, final T result) {
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+            @Override
+            public Void answer() throws Throwable {
+                cb.getValue().onCompletion(error, result);
+                return null;
+            }
+        });
+    }
+}