You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/12/01 22:42:30 UTC

kafka git commit: KAFKA-3008: Parallel start and stop of connectors and tasks in Connect

Repository: kafka
Updated Branches:
  refs/heads/trunk 88d8508b8 -> b65f9a777


KAFKA-3008: Parallel start and stop of connectors and tasks in Connect

Author: Konstantine Karantasis <ko...@confluent.io>
Author: Konstantine Karantasis <k....@gmail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>, Shikhar Bhushan <sh...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1788 from kkonstantine/KAFKA-3008-Parallel-start-and-stop-of-connectors-and-tasks


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b65f9a77
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b65f9a77
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b65f9a77

Branch: refs/heads/trunk
Commit: b65f9a777d46fbe4edfed8a4c7216dd1e741be53
Parents: 88d8508
Author: Konstantine Karantasis <ko...@confluent.io>
Authored: Thu Dec 1 14:42:15 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Dec 1 14:42:15 2016 -0800

----------------------------------------------------------------------
 .../runtime/SourceTaskOffsetCommitter.java      | 104 +++++-----
 .../apache/kafka/connect/runtime/Worker.java    | 173 ++++++++++++-----
 .../kafka/connect/runtime/WorkerTask.java       |   2 +-
 .../runtime/distributed/DistributedHerder.java  | 178 +++++++++++++----
 .../runtime/SourceTaskOffsetCommitterTest.java  | 194 +++++++++++++++++++
 .../kafka/connect/runtime/WorkerTest.java       |  14 --
 .../distributed/DistributedHerderTest.java      |  59 +++---
 .../standalone/StandaloneHerderTest.java        |  12 +-
 8 files changed, 539 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
index c7f869e..acc2d0d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
@@ -22,8 +22,10 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -44,13 +46,22 @@ import java.util.concurrent.TimeUnit;
 class SourceTaskOffsetCommitter {
     private static final Logger log = LoggerFactory.getLogger(SourceTaskOffsetCommitter.class);
 
-    private WorkerConfig config;
-    private ScheduledExecutorService commitExecutorService = null;
-    private final HashMap<ConnectorTaskId, ScheduledCommitTask> committers = new HashMap<>();
+    private final WorkerConfig config;
+    private final ScheduledExecutorService commitExecutorService;
+    private final ConcurrentMap<ConnectorTaskId, ScheduledFuture<?>> committers;
 
-    SourceTaskOffsetCommitter(WorkerConfig config) {
+    // visible for testing
+    SourceTaskOffsetCommitter(WorkerConfig config,
+                              ScheduledExecutorService commitExecutorService,
+                              ConcurrentMap<ConnectorTaskId, ScheduledFuture<?>> committers) {
         this.config = config;
-        commitExecutorService = Executors.newSingleThreadScheduledExecutor();
+        this.commitExecutorService = commitExecutorService;
+        this.committers = committers;
+    }
+
+    public SourceTaskOffsetCommitter(WorkerConfig config) {
+        this(config, Executors.newSingleThreadScheduledExecutor(),
+                new ConcurrentHashMap<ConnectorTaskId, ScheduledFuture<?>>());
     }
 
     public void close(long timeoutMs) {
@@ -65,72 +76,45 @@ class SourceTaskOffsetCommitter {
     }
 
     public void schedule(final ConnectorTaskId id, final WorkerSourceTask workerTask) {
-        synchronized (committers) {
-            long commitIntervalMs = config.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
-            ScheduledFuture<?> commitFuture = commitExecutorService.schedule(new Runnable() {
-                @Override
-                public void run() {
-                    commit(id, workerTask);
-                }
-            }, commitIntervalMs, TimeUnit.MILLISECONDS);
-            committers.put(id, new ScheduledCommitTask(commitFuture));
-        }
+        long commitIntervalMs = config.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+        ScheduledFuture<?> commitFuture = commitExecutorService.scheduleWithFixedDelay(new Runnable() {
+            @Override
+            public void run() {
+                commit(workerTask);
+            }
+        }, commitIntervalMs, commitIntervalMs, TimeUnit.MILLISECONDS);
+        committers.put(id, commitFuture);
     }
 
     public void remove(ConnectorTaskId id) {
-        final ScheduledCommitTask task;
-        synchronized (committers) {
-            task = committers.remove(id);
-            task.cancelled = true;
-            task.commitFuture.cancel(false);
-        }
-        if (task.finishedLatch != null) {
-            try {
-                task.finishedLatch.await();
-            } catch (InterruptedException e) {
-                throw new ConnectException("Unexpected interruption in SourceTaskOffsetCommitter.", e);
-            }
-        }
-    }
+        final ScheduledFuture<?> task = committers.remove(id);
+        if (task == null)
+            return;
 
-    private void commit(ConnectorTaskId id, WorkerSourceTask workerTask) {
-        final ScheduledCommitTask task;
-        synchronized (committers) {
-            task = committers.get(id);
-            if (task == null || task.cancelled)
-                return;
-            task.finishedLatch = new CountDownLatch(1);
+        try {
+            task.cancel(false);
+            if (!task.isDone())
+                task.get();
+        } catch (CancellationException e) {
+            // ignore
+            log.trace("Offset commit thread was cancelled by another thread while removing connector task with id: {}", id);
+        } catch (ExecutionException | InterruptedException e) {
+            throw new ConnectException("Unexpected interruption in SourceTaskOffsetCommitter while removing task with id: " + id, e);
         }
+    }
 
+    private void commit(WorkerSourceTask workerTask) {
+        log.debug("Committing offsets for {}", workerTask);
         try {
-            log.debug("Committing offsets for {}", workerTask);
-            boolean success = workerTask.commitOffsets();
-            if (!success) {
-                log.error("Failed to commit offsets for {}", workerTask);
+            if (workerTask.commitOffsets()) {
+                return;
             }
+            log.error("Failed to commit offsets for {}", workerTask);
         } catch (Throwable t) {
             // We're very careful about exceptions here since any uncaught exceptions in the commit
             // thread would cause the fixed interval schedule on the ExecutorService to stop running
             // for that task
             log.error("Unhandled exception when committing {}: ", workerTask, t);
-        } finally {
-            synchronized (committers) {
-                task.finishedLatch.countDown();
-                if (!task.cancelled)
-                    schedule(id, workerTask);
-            }
-        }
-    }
-
-    private static class ScheduledCommitTask {
-        ScheduledFuture<?> commitFuture;
-        boolean cancelled;
-        CountDownLatch finishedLatch;
-
-        ScheduledCommitTask(ScheduledFuture<?> commitFuture) {
-            this.commitFuture = commitFuture;
-            this.cancelled = false;
-            this.finishedLatch = null;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 1265f9e..c575d92 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -40,10 +40,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -72,8 +73,8 @@ public class Worker {
     private final OffsetBackingStore offsetBackingStore;
     private final Map<String, Object> producerProps;
 
-    private HashMap<String, WorkerConnector> connectors = new HashMap<>();
-    private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
+    private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>();
+    private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
     private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
 
     public Worker(String workerId, Time time, ConnectorFactory connectorFactory, WorkerConfig config, OffsetBackingStore offsetBackingStore) {
@@ -109,6 +110,9 @@ public class Worker {
         producerProps.putAll(config.originalsWithPrefix("producer."));
     }
 
+    /**
+     * Start worker.
+     */
     public void start() {
         log.info("Worker starting");
 
@@ -118,6 +122,9 @@ public class Worker {
         log.info("Worker started");
     }
 
+    /**
+     * Stop worker.
+     */
     public void stop() {
         log.info("Worker stopping");
 
@@ -142,6 +149,16 @@ public class Worker {
         log.info("Worker stopped");
     }
 
+    /**
+     * Start a connector managed by this worker.
+     *
+     * @param connName the connector name.
+     * @param connProps the properties of the connector.
+     * @param ctx the connector runtime context.
+     * @param statusListener a listener for the runtime status transitions of the connector.
+     * @param initialState the initial state of the connector.
+     * @return true if the connector started successfully.
+     */
     public boolean startConnector(
             String connName,
             Map<String, String> connProps,
@@ -168,18 +185,36 @@ public class Worker {
             return false;
         }
 
-        connectors.put(connName, workerConnector);
+        WorkerConnector existing = connectors.putIfAbsent(connName, workerConnector);
+        if (existing != null)
+            throw new ConnectException("Connector with name " + connName + " already exists");
 
         log.info("Finished creating connector {}", connName);
         return true;
     }
 
-    /* Now that the configuration doesn't contain the actual class name, we need to be able to tell the herder whether a connector is a Sink */
+    /**
+     * Return true if the connector associated with this worker is a sink connector.
+     *
+     * @param connName the connector name.
+     * @return true if the connector belongs to the worker and is a sink connector.
+     * @throws ConnectException if the worker does not manage a connector with the given name.
+     */
     public boolean isSinkConnector(String connName) {
         WorkerConnector workerConnector = connectors.get(connName);
+        if (workerConnector == null)
+            throw new ConnectException("Connector " + connName + " not found in this worker.");
         return workerConnector.isSinkConnector();
     }
 
+    /**
+     * Get a list of updated task properties for the tasks of this connector.
+     *
+     * @param connName the connector name.
+     * @param maxTasks the maxinum number of tasks.
+     * @param sinkTopics a list of sink topics.
+     * @return a list of updated tasks properties.
+     */
     public List<Map<String, String>> connectorTaskConfigs(String connName, int maxTasks, List<String> sinkTopics) {
         log.trace("Reconfiguring connector tasks for {}", connName);
 
@@ -200,31 +235,29 @@ public class Worker {
         return result;
     }
 
-    public void stopConnectors() {
-        stopConnectors(new HashSet<>(connectors.keySet()));
-    }
-
-    public Collection<String> stopConnectors(Collection<String> connectors) {
-        final List<String> stopped = new ArrayList<>(connectors.size());
-        for (String connector: connectors) {
-            if (stopConnector(connector)) {
-                stopped.add(connector);
-            }
-        }
-        return stopped;
+    private void stopConnectors() {
+        // Herder is responsible for stopping connectors. This is an internal method to sequentially
+        // stop connectors that have not explicitly been stopped.
+        for (String connector: connectors.keySet())
+            stopConnector(connector);
     }
 
+    /**
+     * Stop a connector managed by this worker.
+     *
+     * @param connName the connector name.
+     * @return true if the connector belonged to this worker and was successfully stopped.
+     */
     public boolean stopConnector(String connName) {
         log.info("Stopping connector {}", connName);
 
-        WorkerConnector connector = connectors.get(connName);
+        WorkerConnector connector = connectors.remove(connName);
         if (connector == null) {
             log.warn("Ignoring stop request for unowned connector {}", connName);
             return false;
         }
 
         connector.shutdown();
-        connectors.remove(connName);
 
         log.info("Stopped connector {}", connName);
         return true;
@@ -232,16 +265,34 @@ public class Worker {
 
     /**
      * Get the IDs of the connectors currently running in this worker.
+     *
+     * @return the set of connector IDs.
      */
     public Set<String> connectorNames() {
         return connectors.keySet();
     }
 
+    /**
+     * Return true if a connector with the given name is managed by this worker and is currently running.
+     *
+     * @param connName the connector name.
+     * @return true if the connector is running, false if the connector is not running or is not manages by this worker.
+     */
     public boolean isRunning(String connName) {
         WorkerConnector connector = connectors.get(connName);
         return connector != null && connector.isRunning();
     }
 
+    /**
+     * Start a task managed by this worker.
+     *
+     * @param id the task ID.
+     * @param connProps the connector properties.
+     * @param taskProps the tasks properties.
+     * @param statusListener a listener for the runtime status transitions of the task.
+     * @param initialState the initial state of the connector.
+     * @return true if the task started successfully.
+     */
     public boolean startTask(
             ConnectorTaskId id,
             Map<String, String> connProps,
@@ -282,11 +333,14 @@ public class Worker {
             return false;
         }
 
+        WorkerTask existing = tasks.putIfAbsent(id, workerTask);
+        if (existing != null)
+            throw new ConnectException("Task already exists in this worker: " + id);
+
         executor.submit(workerTask);
         if (workerTask instanceof WorkerSourceTask) {
             sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
         }
-        tasks.put(id, workerTask);
         return true;
     }
 
@@ -314,34 +368,38 @@ public class Worker {
         }
     }
 
-    public boolean stopAndAwaitTask(ConnectorTaskId id) {
-        return !stopAndAwaitTasks(Collections.singleton(id)).isEmpty();
-    }
+    private void stopTask(ConnectorTaskId taskId) {
+        WorkerTask task = tasks.get(taskId);
+        if (task == null) {
+            log.warn("Ignoring stop request for unowned task {}", taskId);
+            return;
+        }
 
-    public void stopAndAwaitTasks() {
-        stopAndAwaitTasks(new HashSet<>(tasks.keySet()));
+        log.info("Stopping task {}", task.id());
+        if (task instanceof WorkerSourceTask)
+            sourceTaskOffsetCommitter.remove(task.id());
+        task.stop();
     }
 
-    public Collection<ConnectorTaskId> stopAndAwaitTasks(Collection<ConnectorTaskId> ids) {
-        final List<ConnectorTaskId> stoppable = new ArrayList<>(ids.size());
+    private void stopTasks(Collection<ConnectorTaskId> ids) {
+        // Herder is responsible for stopping tasks. This is an internal method to sequentially
+        // stop the tasks that have not explicitly been stopped.
         for (ConnectorTaskId taskId : ids) {
-            final WorkerTask task = tasks.get(taskId);
-            if (task == null) {
-                log.warn("Ignoring stop request for unowned task {}", taskId);
-                continue;
-            }
-            stopTask(task);
-            stoppable.add(taskId);
+            stopTask(taskId);
         }
-        awaitStopTasks(stoppable);
-        return stoppable;
     }
 
-    private void stopTask(WorkerTask task) {
-        log.info("Stopping task {}", task.id());
-        if (task instanceof WorkerSourceTask)
-            sourceTaskOffsetCommitter.remove(task.id());
-        task.stop();
+    private void awaitStopTask(ConnectorTaskId taskId, long timeout) {
+        WorkerTask task = tasks.remove(taskId);
+        if (task == null) {
+            log.warn("Ignoring await stop request for non-present task {}", taskId);
+            return;
+        }
+
+        if (!task.awaitStop(timeout)) {
+            log.error("Graceful stop of task {} failed.", task.id());
+            task.cancel();
+        }
     }
 
     private void awaitStopTasks(Collection<ConnectorTaskId> ids) {
@@ -349,16 +407,35 @@ public class Worker {
         long deadline = now + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
         for (ConnectorTaskId id : ids) {
             long remaining = Math.max(0, deadline - time.milliseconds());
-            awaitStopTask(tasks.get(id), remaining);
+            awaitStopTask(id, remaining);
         }
     }
 
-    private void awaitStopTask(WorkerTask task, long timeout) {
-        if (!task.awaitStop(timeout)) {
-            log.error("Graceful stop of task {} failed.", task.id());
-            task.cancel();
-        }
-        tasks.remove(task.id());
+    /**
+     * Stop asynchronously all the worker's tasks and await their termination.
+     */
+    public void stopAndAwaitTasks() {
+        stopAndAwaitTasks(new ArrayList<>(tasks.keySet()));
+    }
+
+    /**
+     * Stop asynchronously a collection of tasks that belong to this worker and await their termination.
+     *
+     * @param ids the collection of tasks to be stopped.
+     */
+    public void stopAndAwaitTasks(Collection<ConnectorTaskId> ids) {
+        stopTasks(ids);
+        awaitStopTasks(ids);
+    }
+
+    /**
+     * Stop a task that belongs to this worker and await its termination.
+     *
+     * @param taskId the ID of the task to be stopped.
+     */
+    public void stopAndAwaitTask(ConnectorTaskId taskId) {
+        stopTask(taskId);
+        awaitStopTasks(Collections.singletonList(taskId));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 846ca95..2f2ebb5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -64,7 +64,7 @@ abstract class WorkerTask implements Runnable {
 
     /**
      * Initialize the task for execution.
-     * @param props initial configuration
+     * @param taskConfig initial configuration
      */
     public abstract void initialize(TaskConfig taskConfig);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 170c983..ce2e72a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -54,10 +54,11 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -98,6 +99,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class);
 
     private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
+    private static final int START_STOP_THREAD_POOL_SIZE = 8;
 
     private final Time time;
 
@@ -106,6 +108,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     private final int workerUnsyncBackoffMs;
 
     private final ExecutorService forwardRequestExecutor;
+    private final ExecutorService startAndStopExecutor;
     private final WorkerGroupMember member;
     private final AtomicBoolean stopping;
     private final CountDownLatch stopLatch = new CountDownLatch(1);
@@ -119,7 +122,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
     // To handle most external requests, like creating or destroying a connector, we can use a generic request where
     // the caller specifies all the code that should be executed.
-    private final Queue<HerderRequest> requests = new PriorityQueue<>();
+    private final NavigableSet<HerderRequest> requests = new ConcurrentSkipListSet<>();
     // Config updates can be collected and applied together when possible. Also, we need to take care to rebalance when
     // needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits).
     private Set<String> connectorConfigUpdates = new HashSet<>();
@@ -144,11 +147,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                       Worker worker,
                       String workerId,
                       StatusBackingStore statusBackingStore,
-                      ConfigBackingStore configStorage,
+                      ConfigBackingStore configBackingStore,
                       WorkerGroupMember member,
                       String restUrl,
                       Time time) {
-        super(worker, workerId, statusBackingStore, configStorage);
+        super(worker, workerId, statusBackingStore, configBackingStore);
 
         this.time = time;
         this.workerGroupId = config.getString(DistributedConfig.GROUP_ID_CONFIG);
@@ -156,6 +159,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         this.workerUnsyncBackoffMs = config.getInt(DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG);
         this.member = member != null ? member : new WorkerGroupMember(config, restUrl, this.configBackingStore, new RebalanceListener(), time);
         this.forwardRequestExecutor = Executors.newSingleThreadExecutor();
+        this.startAndStopExecutor = Executors.newFixedThreadPool(START_STOP_THREAD_POOL_SIZE);
 
         stopping = new AtomicBoolean(false);
         configState = ClusterConfigState.EMPTY;
@@ -222,17 +226,14 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         final long now = time.milliseconds();
         long nextRequestTimeoutMs = Long.MAX_VALUE;
         while (true) {
-            final HerderRequest next;
-            synchronized (this) {
-                next = requests.peek();
-                if (next == null) {
-                    break;
-                } else if (now >= next.at) {
-                    requests.poll();
-                } else {
-                    nextRequestTimeoutMs = next.at - now;
-                    break;
-                }
+            final HerderRequest next = peekWithoutException();
+            if (next == null) {
+                break;
+            } else if (now >= next.at) {
+                requests.pollFirst();
+            } else {
+                nextRequestTimeoutMs = next.at - now;
+                break;
             }
 
             try {
@@ -292,7 +293,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         try {
             member.poll(nextRequestTimeoutMs);
             // Ensure we're in a good state in our group. If not restart and everything should be setup to rejoin
-            if (!handleRebalanceCompleted()) return;
+            handleRebalanceCompleted();
         } catch (WakeupException e) { // FIXME should not be WakeupException
             // Ignore. Just indicates we need to check the exit flag, for requested actions, etc.
         }
@@ -338,17 +339,24 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     public void halt() {
         synchronized (this) {
             // Clean up any connectors and tasks that are still running.
-            log.info("Stopping connectors and tasks that are still assigned to the worker");
-            worker.stopConnectors();
-            worker.stopAndAwaitTasks();
+            log.info("Stopping connectors and tasks that are still assigned to this worker.");
+            List<Callable<Void>> callables = new ArrayList<>();
+            for (String connectorName : new ArrayList<>(worker.connectorNames())) {
+                callables.add(getConnectorStoppingCallable(connectorName));
+            }
+            for (ConnectorTaskId taskId : new ArrayList<>(worker.taskIds())) {
+                callables.add(getTaskStoppingCallable(taskId));
+            }
+            startAndStop(callables);
 
             member.stop();
 
-            // Explicitly fail any outstanding requests so they actually get a response and get an understandable reason
-            // for their failure
-            while (!requests.isEmpty()) {
-                HerderRequest request = requests.poll();
+            // Explicitly fail any outstanding requests so they actually get a response and get an
+            // understandable reason for their failure.
+            HerderRequest request = requests.pollFirst();
+            while (request != null) {
                 request.callback().onCompletion(new ConnectException("Worker is shutting down"), null);
+                request = requests.pollFirst();
             }
 
             stopServices();
@@ -370,9 +378,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         }
 
         forwardRequestExecutor.shutdown();
+        startAndStopExecutor.shutdown();
         try {
             if (!forwardRequestExecutor.awaitTermination(10000, TimeUnit.MILLISECONDS))
                 forwardRequestExecutor.shutdownNow();
+            if (!startAndStopExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS))
+                startAndStopExecutor.shutdownNow();
         } catch (InterruptedException e) {
             // ignore
         }
@@ -381,7 +392,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     }
 
     @Override
-    public synchronized void connectors(final Callback<Collection<String>> callback) {
+    public void connectors(final Callback<Collection<String>> callback) {
         log.trace("Submitting connector listing request");
 
         addRequest(
@@ -400,7 +411,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     }
 
     @Override
-    public synchronized void connectorInfo(final String connName, final Callback<ConnectorInfo> callback) {
+    public void connectorInfo(final String connName, final Callback<ConnectorInfo> callback) {
         log.trace("Submitting connector info request {}", connName);
 
         addRequest(
@@ -523,7 +534,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     }
 
     @Override
-    public synchronized void requestTaskReconfiguration(final String connName) {
+    public void requestTaskReconfiguration(final String connName) {
         log.trace("Submitting connector task reconfiguration request {}", connName);
 
         addRequest(
@@ -547,7 +558,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     }
 
     @Override
-    public synchronized void taskConfigs(final String connName, final Callback<List<TaskInfo>> callback) {
+    public void taskConfigs(final String connName, final Callback<List<TaskInfo>> callback) {
         log.trace("Submitting get task configuration request {}", connName);
 
         addRequest(
@@ -575,7 +586,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     }
 
     @Override
-    public synchronized void putTaskConfigs(final String connName, final List<Map<String, String>> configs, final Callback<Void> callback) {
+    public void putTaskConfigs(final String connName, final List<Map<String, String>> configs, final Callback<Void> callback) {
         log.trace("Submitting put task configuration request {}", connName);
 
         addRequest(
@@ -598,7 +609,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     }
 
     @Override
-    public synchronized void restartConnector(final String connName, final Callback<Void> callback) {
+    public void restartConnector(final String connName, final Callback<Void> callback) {
         addRequest(new Callable<Void>() {
             @Override
             public Void call() throws Exception {
@@ -631,7 +642,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     }
 
     @Override
-    public synchronized void restartTask(final ConnectorTaskId id, final Callback<Void> callback) {
+    public void restartTask(final ConnectorTaskId id, final Callback<Void> callback) {
         addRequest(new Callable<Void>() {
             @Override
             public Void call() throws Exception {
@@ -788,15 +799,26 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         Utils.sleep(ms);
     }
 
+    private void startAndStop(Collection<Callable<Void>> callables) {
+        try {
+            startAndStopExecutor.invokeAll(callables);
+        } catch (InterruptedException e) {
+            // ignore
+        }
+    }
+
     private void startWork() {
         // Start assigned connectors and tasks
         log.info("Starting connectors and tasks using config offset {}", assignment.offset());
+        List<Callable<Void>> callables = new ArrayList<>();
         for (String connectorName : assignment.connectors()) {
-            startConnector(connectorName);
+            callables.add(getConnectorStartingCallable(connectorName));
         }
+
         for (ConnectorTaskId taskId : assignment.tasks()) {
-            startTask(taskId);
+            callables.add(getTaskStartingCallable(taskId));
         }
+        startAndStop(callables);
         log.info("Finished starting connectors and tasks");
     }
 
@@ -811,12 +833,38 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         );
     }
 
+    private Callable<Void> getTaskStartingCallable(final ConnectorTaskId taskId) {
+        return new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                try {
+                    startTask(taskId);
+                } catch (Throwable t) {
+                    log.error("Couldn't instantiate task {} because it has an invalid task configuration. This task will not execute until reconfigured.",
+                            taskId, t);
+                    onFailure(taskId, t);
+                }
+                return null;
+            }
+        };
+    }
+
+    private Callable<Void> getTaskStoppingCallable(final ConnectorTaskId taskId) {
+        return new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                worker.stopAndAwaitTask(taskId);
+                return null;
+            }
+        };
+    }
+
     // Helper for starting a connector with the given name, which will extract & parse the config, generate connector
     // context and add to the worker. This needs to be called from within the main worker thread for this herder.
     private boolean startConnector(String connectorName) {
         log.info("Starting connector {}", connectorName);
         final Map<String, String> configProps = configState.connectorConfig(connectorName);
-        final ConnectorContext ctx = new HerderConnectorContext(DistributedHerder.this, connectorName);
+        final ConnectorContext ctx = new HerderConnectorContext(this, connectorName);
         final TargetState initialState = configState.targetState(connectorName);
         boolean started = worker.startConnector(connectorName, configProps, ctx, this, initialState);
 
@@ -829,6 +877,36 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         return started;
     }
 
+    private Callable<Void> getConnectorStartingCallable(final String connectorName) {
+        return new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                try {
+                    startConnector(connectorName);
+                } catch (Throwable t) {
+                    log.error("Couldn't instantiate connector " + connectorName + " because it has an invalid connector " +
+                            "configuration. This connector will not execute until reconfigured.", t);
+                    onFailure(connectorName, t);
+                }
+                return null;
+            }
+        };
+    }
+
+    private Callable<Void> getConnectorStoppingCallable(final String connectorName) {
+        return new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                try {
+                    worker.stopConnector(connectorName);
+                } catch (Throwable t) {
+                    log.error("Failed to shut down connector " + connectorName, t);
+                }
+                return null;
+            }
+        };
+    }
+
     private void reconfigureConnectorTasksWithRetry(final String connName) {
         reconfigureConnector(connName, new Callback<Void>() {
             @Override
@@ -941,10 +1019,20 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     private void addRequest(long delayMs, Callable<Void> action, Callback<Void> callback) {
         HerderRequest req = new HerderRequest(time.milliseconds() + delayMs, action, callback);
         requests.add(req);
-        if (requests.peek() == req)
+        if (peekWithoutException() == req)
             member.wakeup();
     }
 
+    private HerderRequest peekWithoutException() {
+        try {
+            return requests.isEmpty() ? null : requests.first();
+        } catch (NoSuchElementException e) {
+            // Ignore exception. Should be rare. Means that the collection became empty between
+            // checking the size and retrieving the first element.
+        }
+        return null;
+    }
+
     public class ConfigUpdateListener implements ConfigBackingStore.UpdateListener {
         @Override
         public void onConnectorConfigRemove(String connector) {
@@ -999,7 +1087,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         }
     }
 
-    private class HerderRequest implements Comparable<HerderRequest> {
+    private static class HerderRequest implements Comparable<HerderRequest> {
         private final long at;
         private final Callable<Void> action;
         private final Callback<Void> callback;
@@ -1020,7 +1108,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
         @Override
         public int compareTo(HerderRequest o) {
-            return Long.compare(at, o.at);
+            final int soonest = Long.compare(at, o.at);
+            // If tied, returning a positive value should respect insertion order.
+            return soonest != 0 ? soonest : 1;
         }
     }
 
@@ -1081,19 +1171,25 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             // it is still important to have a leader that can write configs, offsets, etc.
 
             if (rebalanceResolved) {
-                // TODO: Parallelize this. We should be able to request all connectors and tasks to stop, then wait on all of
-                // them to finish
                 // TODO: Technically we don't have to stop connectors at all until we know they've really been removed from
                 // this worker. Instead, we can let them continue to run but buffer any update requests (which should be
                 // rare anyway). This would avoid a steady stream of start/stop, which probably also includes lots of
                 // unnecessary repeated connections to the source/sink system.
-                worker.stopConnectors(connectors);
+                List<Callable<Void>> callables = new ArrayList<>();
+                for (final String connectorName : connectors) {
+                    callables.add(getConnectorStoppingCallable(connectorName));
+                }
 
                 // TODO: We need to at least commit task offsets, but if we could commit offsets & pause them instead of
                 // stopping them then state could continue to be reused when the task remains on this worker. For example,
                 // this would avoid having to close a connection and then reopen it when the task is assigned back to this
                 // worker again.
-                worker.stopAndAwaitTasks(tasks);
+                for (final ConnectorTaskId taskId : tasks) {
+                    callables.add(getTaskStoppingCallable(taskId));
+                }
+
+                // The actual timeout for graceful task stop is applied in worker's stopAndAwaitTask method.
+                startAndStop(callables);
 
                 // Ensure that all status updates have been pushed to the storage system before rebalancing.
                 // Otherwise, we may inadvertently overwrite the state with a stale value after the rebalance

http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
new file mode 100644
index 0000000..45125cc
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.ThreadedTest;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.easymock.EasyMock.eq;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({SourceTaskOffsetCommitter.class, LoggerFactory.class})
+public class SourceTaskOffsetCommitterTest extends ThreadedTest {
+    @Mock
+    private ScheduledExecutorService executor;
+    @Mock
+    private ConcurrentHashMap committers;
+    @Mock
+    private Logger mockLog;
+
+    private SourceTaskOffsetCommitter committer;
+
+    private static final long DEFAULT_OFFSET_COMMIT_INTERVAL_MS = 1000;
+
+    @Override
+    public void setup() {
+        super.setup();
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");
+        workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        workerProps.put("offset.flush.interval.ms",
+                Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
+        WorkerConfig config = new StandaloneConfig(workerProps);
+        committer = new SourceTaskOffsetCommitter(config, executor, committers);
+        Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog);
+    }
+
+    @Test
+    public void testSchedule() throws Exception {
+        Capture<Runnable> taskWrapper = EasyMock.newCapture();
+
+        ScheduledFuture commitFuture = PowerMock.createMock(ScheduledFuture.class);
+        EasyMock.expect(executor.scheduleWithFixedDelay(
+                EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
+                eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS))
+        ).andReturn(commitFuture);
+
+        ConnectorTaskId taskId = PowerMock.createMock(ConnectorTaskId.class);
+        WorkerSourceTask task = PowerMock.createMock(WorkerSourceTask.class);
+
+        EasyMock.expect(committers.put(taskId, commitFuture)).andReturn(null);
+
+        PowerMock.replayAll();
+
+        committer.schedule(taskId, task);
+        assertTrue(taskWrapper.hasCaptured());
+        assertNotNull(taskWrapper.getValue());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testClose() throws Exception {
+        long timeoutMs = 1000;
+
+        // Normal termination, where termination times out.
+        executor.shutdown();
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS)))
+                .andReturn(false);
+        mockLog.error(EasyMock.anyString());
+        PowerMock.expectLastCall();
+        PowerMock.replayAll();
+
+        committer.close(timeoutMs);
+
+        PowerMock.verifyAll();
+        PowerMock.resetAll();
+
+        // Termination interrupted
+        executor.shutdown();
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS)))
+                .andThrow(new InterruptedException());
+        PowerMock.replayAll();
+
+        committer.close(timeoutMs);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRemove() throws Exception {
+        ConnectorTaskId taskId = PowerMock.createMock(ConnectorTaskId.class);
+        ScheduledFuture task = PowerMock.createMock(ScheduledFuture.class);
+
+        // Try to remove a non-existing task
+        EasyMock.expect(committers.remove(taskId)).andReturn(null);
+        PowerMock.replayAll();
+
+        committer.remove(taskId);
+
+        PowerMock.verifyAll();
+        PowerMock.resetAll();
+
+        // Try to remove an existing task
+        EasyMock.expect(committers.remove(taskId)).andReturn(task);
+        EasyMock.expect(task.cancel(eq(false))).andReturn(false);
+        EasyMock.expect(task.isDone()).andReturn(false);
+        EasyMock.expect(task.get()).andReturn(null);
+        PowerMock.replayAll();
+
+        committer.remove(taskId);
+
+        PowerMock.verifyAll();
+        PowerMock.resetAll();
+
+        // Try to remove a cancelled task
+        EasyMock.expect(committers.remove(taskId)).andReturn(task);
+        EasyMock.expect(task.cancel(eq(false))).andReturn(false);
+        EasyMock.expect(task.isDone()).andReturn(false);
+        EasyMock.expect(task.get()).andThrow(new CancellationException());
+        mockLog.trace(EasyMock.anyString(), EasyMock.anyObject());
+        PowerMock.expectLastCall();
+        PowerMock.replayAll();
+
+        committer.remove(taskId);
+
+        PowerMock.verifyAll();
+        PowerMock.resetAll();
+
+        // Try to remove an interrupted task
+        EasyMock.expect(committers.remove(taskId)).andReturn(task);
+        EasyMock.expect(task.cancel(eq(false))).andReturn(false);
+        EasyMock.expect(task.isDone()).andReturn(false);
+        EasyMock.expect(task.get()).andThrow(new InterruptedException());
+        PowerMock.replayAll();
+
+        try {
+            committer.remove(taskId);
+            fail("Expected ConnectException to be raised");
+        } catch (ConnectException e) {
+            //ignore
+        }
+
+        PowerMock.verifyAll();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 97e29be..eac0520 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -425,20 +425,6 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expectLastCall();
 
         assertEquals(Collections.emptySet(), worker.taskIds());
-
-        assertFalse(worker.stopAndAwaitTask(TASK_ID));
-    }
-
-    @Test
-    public void testStopInvalidTask() {
-        expectStartStorage();
-
-        PowerMock.replayAll();
-
-        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
-        worker.start();
-
-        assertFalse(worker.stopAndAwaitTask(TASK_ID));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 1da4595..5be2044 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -41,7 +41,6 @@ import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
-import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -145,7 +144,7 @@ public class DistributedHerderTest {
 
     private static final String WORKER_ID = "localhost:8083";
 
-    @Mock private KafkaConfigBackingStore configStorage;
+    @Mock private ConfigBackingStore configBackingStore;
     @Mock private StatusBackingStore statusBackingStore;
     @Mock private WorkerGroupMember member;
     private MockTime time;
@@ -163,7 +162,7 @@ public class DistributedHerderTest {
         time = new MockTime();
 
         herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff", "updateDeletedConnectorStatus"},
-                new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, statusBackingStore, configStorage, member, MEMBER_URL, time);
+                new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, statusBackingStore, configBackingStore, member, MEMBER_URL, time);
 
         configUpdateListener = herder.new ConfigUpdateListener();
         rebalanceListener = herder.new RebalanceListener();
@@ -274,13 +273,15 @@ public class DistributedHerderTest {
 
     @Test
     public void testHaltCleansUpWorker() {
-        worker.stopConnectors();
-        PowerMock.expectLastCall();
-        worker.stopAndAwaitTasks();
+        EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONN1));
+        worker.stopConnector(CONN1);
+        PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.taskIds()).andReturn(Collections.singleton(TASK1));
+        worker.stopAndAwaitTask(TASK1);
         PowerMock.expectLastCall();
         member.stop();
         PowerMock.expectLastCall();
-        configStorage.stop();
+        configBackingStore.stop();
         PowerMock.expectLastCall();
         statusBackingStore.stop();
         PowerMock.expectLastCall();
@@ -312,7 +313,7 @@ public class DistributedHerderTest {
         EasyMock.expect(connectorMock.validate(CONN2_CONFIG)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
 
         // CONN2 is new, should succeed
-        configStorage.putConnectorConfig(CONN2, CONN2_CONFIG);
+        configBackingStore.putConnectorConfig(CONN2, CONN2_CONFIG);
         PowerMock.expectLastCall();
         ConnectorInfo info = new ConnectorInfo(CONN2, CONN2_CONFIG, Collections.<ConnectorTaskId>emptyList());
         putConnectorCallback.onCompletion(null, new Herder.Created<>(true, info));
@@ -494,7 +495,7 @@ public class DistributedHerderTest {
         // And delete the connector
         member.wakeup();
         PowerMock.expectLastCall();
-        configStorage.removeConnectorConfig(CONN1);
+        configBackingStore.removeConnectorConfig(CONN1);
         PowerMock.expectLastCall();
         putConnectorCallback.onCompletion(null, new Herder.Created<ConnectorInfo>(false, null));
         PowerMock.expectLastCall();
@@ -681,7 +682,7 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
 
         worker.stopAndAwaitTask(TASK0);
-        PowerMock.expectLastCall().andReturn(true);
+        PowerMock.expectLastCall();
         worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
@@ -817,7 +818,7 @@ public class DistributedHerderTest {
         member.ensureActive();
         PowerMock.expectLastCall();
         // Checks for config updates and starts rebalance
-        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
+        EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT);
         member.requestRejoin();
         PowerMock.expectLastCall();
         // Performs rebalance and gets new assignment
@@ -863,7 +864,7 @@ public class DistributedHerderTest {
         member.wakeup();
         member.ensureActive();
         PowerMock.expectLastCall();
-        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); // for this test, it doesn't matter if we use the same config snapshot
+        EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT); // for this test, it doesn't matter if we use the same config snapshot
         worker.stopConnector(CONN1);
         PowerMock.expectLastCall().andReturn(true);
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
@@ -906,7 +907,7 @@ public class DistributedHerderTest {
         member.ensureActive();
         PowerMock.expectLastCall();
 
-        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1);
+        EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1);
         PowerMock.expectLastCall();
 
         worker.setTargetState(CONN1, TargetState.PAUSED);
@@ -944,7 +945,7 @@ public class DistributedHerderTest {
         member.ensureActive();
         PowerMock.expectLastCall();
 
-        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
+        EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT);
         PowerMock.expectLastCall();
 
         // we expect reconfiguration after resuming
@@ -985,7 +986,7 @@ public class DistributedHerderTest {
         member.ensureActive();
         PowerMock.expectLastCall();
 
-        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
+        EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT);
         PowerMock.expectLastCall();
 
         member.poll(EasyMock.anyInt());
@@ -1022,7 +1023,7 @@ public class DistributedHerderTest {
         member.ensureActive();
         PowerMock.expectLastCall();
 
-        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1);
+        EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1);
         PowerMock.expectLastCall();
 
         worker.setTargetState(CONN1, TargetState.PAUSED);
@@ -1062,7 +1063,7 @@ public class DistributedHerderTest {
         member.ensureActive();
         PowerMock.expectLastCall();
 
-        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
+        EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT);
         PowerMock.expectLastCall();
 
         worker.setTargetState(CONN1, TargetState.STARTED);
@@ -1097,7 +1098,7 @@ public class DistributedHerderTest {
         member.ensureActive();
         PowerMock.expectLastCall();
         // Checks for config updates and starts rebalance
-        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
+        EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT);
         member.requestRejoin();
         PowerMock.expectLastCall();
         // Performs rebalance and gets new assignment
@@ -1128,7 +1129,7 @@ public class DistributedHerderTest {
                 ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(),
                 Collections.<ConnectorTaskId>emptyList());
         // Reading to end of log times out
-        configStorage.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+        configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
         EasyMock.expectLastCall().andThrow(new TimeoutException());
         member.maybeLeaveGroup();
         EasyMock.expectLastCall();
@@ -1230,7 +1231,7 @@ public class DistributedHerderTest {
         EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
         EasyMock.expect(connectorMock.validate(CONN1_CONFIG_UPDATED)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
 
-        configStorage.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED);
+        configBackingStore.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED);
         PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
             @Override
             public Object answer() throws Throwable {
@@ -1241,7 +1242,7 @@ public class DistributedHerderTest {
         });
         // As a result of reconfig, should need to update snapshot. With only connector updates, we'll just restart
         // connector without rebalance
-        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG);
+        EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG);
         worker.stopConnector(CONN1);
         PowerMock.expectLastCall().andReturn(true);
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
@@ -1318,13 +1319,15 @@ public class DistributedHerderTest {
         });
 
         if (revokedConnectors != null) {
-            worker.stopConnectors(revokedConnectors);
-            PowerMock.expectLastCall().andReturn(revokedConnectors);
+            for (String connector : revokedConnectors) {
+                worker.stopConnector(connector);
+                PowerMock.expectLastCall().andReturn(true);
+            }
         }
 
-        if (revokedTasks != null) {
-            worker.stopAndAwaitTasks(revokedTasks);
-            PowerMock.expectLastCall().andReturn(revokedTasks);
+        if (revokedTasks != null && !revokedTasks.isEmpty()) {
+            worker.stopAndAwaitTask(EasyMock.anyObject(ConnectorTaskId.class));
+            PowerMock.expectLastCall();
         }
 
         if (revokedConnectors != null) {
@@ -1337,9 +1340,9 @@ public class DistributedHerderTest {
     }
 
     private void expectPostRebalanceCatchup(final ClusterConfigState readToEndSnapshot) throws TimeoutException {
-        configStorage.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+        configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
         EasyMock.expectLastCall();
-        EasyMock.expect(configStorage.snapshot()).andReturn(readToEndSnapshot);
+        EasyMock.expect(configBackingStore.snapshot()).andReturn(readToEndSnapshot);
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 0bc3d5c..010c0b2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -66,7 +66,9 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -296,7 +298,7 @@ public class StandaloneHerderTest {
         expectConfigValidation(connectorConfig);
 
         worker.stopAndAwaitTask(taskId);
-        EasyMock.expectLastCall().andReturn(true);
+        EasyMock.expectLastCall();
 
         worker.startTask(taskId, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
         EasyMock.expectLastCall().andReturn(true);
@@ -321,7 +323,7 @@ public class StandaloneHerderTest {
         expectConfigValidation(connectorConfig);
 
         worker.stopAndAwaitTask(taskId);
-        EasyMock.expectLastCall().andReturn(true);
+        EasyMock.expectLastCall();
 
         worker.startTask(taskId, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
         EasyMock.expectLastCall().andReturn(false);
@@ -391,7 +393,7 @@ public class StandaloneHerderTest {
         expectConfigValidation(connConfig);
 
         // Validate accessors with 1 connector
-        listConnectorsCb.onCompletion(null, Collections.singleton(CONNECTOR_NAME));
+        listConnectorsCb.onCompletion(null, singleton(CONNECTOR_NAME));
         EasyMock.expectLastCall();
         ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
         connectorInfoCb.onCompletion(null, connInfo);
@@ -477,7 +479,7 @@ public class StandaloneHerderTest {
         PowerMock.replayAll();
 
         herder.putTaskConfigs(CONNECTOR_NAME,
-                Arrays.asList(Collections.singletonMap("config", "value")),
+                Arrays.asList(singletonMap("config", "value")),
                 cb);
 
         PowerMock.verifyAll();
@@ -513,7 +515,7 @@ public class StandaloneHerderTest {
     private void expectStop() {
         ConnectorTaskId task = new ConnectorTaskId(CONNECTOR_NAME, 0);
         worker.stopAndAwaitTasks(singletonList(task));
-        EasyMock.expectLastCall().andReturn(Collections.singleton(task));
+        EasyMock.expectLastCall();
         worker.stopConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall().andReturn(true);
     }