You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/11/03 09:38:07 UTC

[4/4] kafka git commit: KAFKA-6060; Add workload generation capabilities to Trogdor

KAFKA-6060; Add workload generation capabilities to Trogdor

Previously, Trogdor only handled "Faults."  Now, Trogdor can handle
"Tasks" which may be either faults, or workloads to execute in the
background.

The Agent and Coordinator have been refactored from a
mutexes-and-condition-variables paradigm into a message passing
paradigm.  No locks are necessary, because only one thread can access
the task state or worker state.  This makes them a lot easier to reason
about.

The MockTime class can now handle mocking deferred message passing
(adding a message to an ExecutorService with a delay).  I added a
MockTimeTest.

MiniTrogdorCluster now starts up Agent and Coordinator classes in
paralle in order to minimize junit test time.

RPC messages now inherit from a common Message.java class.  This class
handles implementing serialization, equals, hashCode, etc.

Remove FaultSet, since it is no longer necessary.

Previously, if CoordinatorClient or AgentClient hit a networking
problem, they would throw an exception.  They now retry several times
before giving up.  Additionally, the REST RPCs to the Coordinator and
Agent have been changed to be idempotent.  If a response is lost, and
the request is resent, no harm will be done.

Author: Colin P. Mccabe <cm...@confluent.io>

Reviewers: Rajini Sivaram <ra...@googlemail.com>, Ismael Juma <is...@juma.me.uk>

Closes #4073 from cmccabe/KAFKA-6060


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4fac83ba
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4fac83ba
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4fac83ba

Branch: refs/heads/trunk
Commit: 4fac83ba1f80353e9544b15b95b8da9dc557041d
Parents: e4208b1
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Fri Nov 3 09:37:29 2017 +0000
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Fri Nov 3 09:37:29 2017 +0000

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   4 +
 .../apache/kafka/common/utils/Scheduler.java    |  49 ++
 .../kafka/common/utils/SystemScheduler.java     |  43 ++
 .../kafka/common/utils/MockScheduler.java       | 121 ++++
 .../org/apache/kafka/common/utils/MockTime.java |  22 +
 .../apache/kafka/common/utils/MockTimeTest.java |  50 ++
 gradle/findbugs-exclude.xml                     |   8 +
 tests/kafkatest/services/trogdor/fault_spec.py  |  45 --
 .../trogdor/network_partition_fault_spec.py     |   6 +-
 .../services/trogdor/no_op_fault_spec.py        |  41 --
 .../services/trogdor/no_op_task_spec.py         |  41 ++
 tests/kafkatest/services/trogdor/task_spec.py   |  45 ++
 tests/kafkatest/services/trogdor/trogdor.py     | 115 +++-
 tests/kafkatest/tests/tools/trogdor_test.py     |  21 +-
 .../org/apache/kafka/trogdor/agent/Agent.java   | 281 ++-------
 .../apache/kafka/trogdor/agent/AgentClient.java |  82 +--
 .../kafka/trogdor/agent/AgentRestResource.java  |  24 +-
 .../kafka/trogdor/agent/WorkerManager.java      | 574 +++++++++++++++++++
 .../kafka/trogdor/basic/BasicPlatform.java      |  13 +-
 .../apache/kafka/trogdor/common/Platform.java   |   8 +
 .../kafka/trogdor/common/ThreadUtils.java       |  55 ++
 .../apache/kafka/trogdor/common/Topology.java   |  18 +
 .../kafka/trogdor/coordinator/Coordinator.java  | 259 ++-------
 .../trogdor/coordinator/CoordinatorClient.java  |  95 +--
 .../coordinator/CoordinatorRestResource.java    |  38 +-
 .../kafka/trogdor/coordinator/NodeManager.java  | 407 +++++++------
 .../kafka/trogdor/coordinator/TaskManager.java  | 535 +++++++++++++++++
 .../kafka/trogdor/fault/AbstractFault.java      | 106 ----
 .../kafka/trogdor/fault/AbstractFaultSpec.java  |  53 --
 .../apache/kafka/trogdor/fault/DoneState.java   |  47 --
 .../org/apache/kafka/trogdor/fault/Fault.java   |  70 ---
 .../apache/kafka/trogdor/fault/FaultSet.java    | 146 -----
 .../apache/kafka/trogdor/fault/FaultSpec.java   |  59 --
 .../apache/kafka/trogdor/fault/FaultState.java  |  51 --
 .../trogdor/fault/NetworkPartitionFault.java    |  96 ----
 .../fault/NetworkPartitionFaultController.java  |  42 ++
 .../fault/NetworkPartitionFaultSpec.java        |  40 +-
 .../fault/NetworkPartitionFaultWorker.java      |  79 +++
 .../apache/kafka/trogdor/fault/NoOpFault.java   |  57 --
 .../kafka/trogdor/fault/NoOpFaultSpec.java      |  50 --
 .../kafka/trogdor/fault/PendingState.java       |  30 -
 .../kafka/trogdor/fault/RunningState.java       |  38 --
 .../kafka/trogdor/fault/SendingState.java       |  64 ---
 .../kafka/trogdor/rest/AgentFaultsResponse.java |  52 --
 .../kafka/trogdor/rest/AgentStatusResponse.java |  39 +-
 .../trogdor/rest/CoordinatorFaultsResponse.java |  52 --
 .../rest/CoordinatorShutdownRequest.java        |  38 ++
 .../trogdor/rest/CoordinatorStatusResponse.java |  35 +-
 .../trogdor/rest/CreateAgentFaultRequest.java   |  69 ---
 .../rest/CreateCoordinatorFaultRequest.java     |  69 ---
 .../kafka/trogdor/rest/CreateTaskRequest.java   |  47 ++
 .../kafka/trogdor/rest/CreateTaskResponse.java  |  39 ++
 .../kafka/trogdor/rest/CreateWorkerRequest.java |  47 ++
 .../trogdor/rest/CreateWorkerResponse.java      |  39 ++
 .../apache/kafka/trogdor/rest/FaultDataMap.java |  98 ----
 .../kafka/trogdor/rest/JsonRestServer.java      |  70 ++-
 .../org/apache/kafka/trogdor/rest/Message.java  |  41 ++
 .../kafka/trogdor/rest/StopTaskRequest.java     |  38 ++
 .../kafka/trogdor/rest/StopTaskResponse.java    |  39 ++
 .../kafka/trogdor/rest/StopWorkerRequest.java   |  38 ++
 .../kafka/trogdor/rest/StopWorkerResponse.java  |  39 ++
 .../org/apache/kafka/trogdor/rest/TaskDone.java |  80 +++
 .../apache/kafka/trogdor/rest/TaskPending.java  |  32 ++
 .../apache/kafka/trogdor/rest/TaskRunning.java  |  44 ++
 .../apache/kafka/trogdor/rest/TaskState.java    |  48 ++
 .../apache/kafka/trogdor/rest/TaskStopping.java |  44 ++
 .../kafka/trogdor/rest/TasksResponse.java       |  40 ++
 .../apache/kafka/trogdor/rest/WorkerDone.java   |  88 +++
 .../kafka/trogdor/rest/WorkerReceiving.java     |  33 ++
 .../kafka/trogdor/rest/WorkerRunning.java       |  64 +++
 .../kafka/trogdor/rest/WorkerStarting.java      |  32 ++
 .../apache/kafka/trogdor/rest/WorkerState.java  |  70 +++
 .../kafka/trogdor/rest/WorkerStopping.java      |  69 +++
 .../kafka/trogdor/task/NoOpTaskController.java  |  36 ++
 .../apache/kafka/trogdor/task/NoOpTaskSpec.java |  44 ++
 .../kafka/trogdor/task/NoOpTaskWorker.java      |  46 ++
 .../kafka/trogdor/task/TaskController.java      |  36 ++
 .../org/apache/kafka/trogdor/task/TaskSpec.java | 105 ++++
 .../apache/kafka/trogdor/task/TaskWorker.java   |  78 +++
 .../apache/kafka/trogdor/agent/AgentTest.java   | 220 ++++---
 .../kafka/trogdor/common/ExpectedFaults.java    | 193 -------
 .../kafka/trogdor/common/ExpectedTasks.java     | 206 +++++++
 .../trogdor/common/MiniTrogdorCluster.java      |  73 ++-
 .../trogdor/coordinator/CoordinatorTest.java    | 195 +++++--
 .../kafka/trogdor/fault/FaultSetTest.java       | 126 ----
 .../trogdor/task/SampleTaskController.java      |  29 +
 .../kafka/trogdor/task/SampleTaskSpec.java      |  56 ++
 .../kafka/trogdor/task/SampleTaskWorker.java    |  63 ++
 88 files changed, 4461 insertions(+), 2606 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index ddb13bc..403cae2 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -187,6 +187,10 @@
     <allow pkg="javax.servlet" />
     <allow pkg="javax.ws.rs" />
     <allow pkg="net.sourceforge.argparse4j" />
+    <allow pkg="org.apache.kafka.clients.admin" />
+    <allow pkg="org.apache.kafka.clients.consumer" exact-match="true"/>
+    <allow pkg="org.apache.kafka.clients.producer" exact-match="true"/>
+    <allow pkg="org.apache.kafka.common" />
     <allow pkg="org.apache.kafka.test"/>
     <allow pkg="org.apache.kafka.trogdor" />
     <allow pkg="org.apache.log4j" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/clients/src/main/java/org/apache/kafka/common/utils/Scheduler.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Scheduler.java b/clients/src/main/java/org/apache/kafka/common/utils/Scheduler.java
new file mode 100644
index 0000000..a8ada65
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Scheduler.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * An interface for scheduling tasks for the future.
+ *
+ * Implementations of this class should be thread-safe.
+ */
+public interface Scheduler {
+    Scheduler SYSTEM = new SystemScheduler();
+
+    /**
+     * Get the timekeeper associated with this scheduler.
+     */
+    Time time();
+
+    /**
+     * Schedule a callable to be executed in the future on a
+     * ScheduledExecutorService.  Note that the Callable may not be queued on
+     * the executor until the designated time arrives.
+     *
+     * @param executor      The executor to use.
+     * @param callable      The callable to execute.
+     * @param delayMs       The delay to use, in milliseconds.
+     * @param <T>           The return type of the callable.
+     * @return              A future which will complete when the callable is finished.
+     */
+    <T> Future<T> schedule(final ScheduledExecutorService executor,
+                           final Callable<T> callable, long delayMs);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/clients/src/main/java/org/apache/kafka/common/utils/SystemScheduler.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/SystemScheduler.java b/clients/src/main/java/org/apache/kafka/common/utils/SystemScheduler.java
new file mode 100644
index 0000000..c8c1148
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/SystemScheduler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A scheduler implementation that uses the system clock.
+ *
+ * Use Scheduler.SYSTEM instead of constructing an instance of this class.
+ */
+public class SystemScheduler implements Scheduler {
+    SystemScheduler() {
+    }
+
+    @Override
+    public Time time() {
+        return Time.SYSTEM;
+    }
+
+    @Override
+    public <T> Future<T> schedule(final ScheduledExecutorService executor,
+                                  final Callable<T> callable, long delayMs) {
+        return executor.schedule(callable, delayMs, TimeUnit.MILLISECONDS);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java b/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
new file mode 100644
index 0000000..ba5e1ed
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+
+public class MockScheduler implements Scheduler, MockTime.MockTimeListener {
+    private static final Logger log = LoggerFactory.getLogger(MockScheduler.class);
+
+    /**
+     * The MockTime object.
+     */
+    private final MockTime time;
+
+    /**
+     * Futures which are waiting for a specified wall-clock time to arrive.
+     */
+    private final TreeMap<Long, List<KafkaFutureImpl<Long>>> waiters = new TreeMap<>();
+
+    public MockScheduler(MockTime time) {
+        this.time = time;
+        time.addListener(this);
+    }
+
+    @Override
+    public Time time() {
+        return time;
+    }
+
+    @Override
+    public synchronized void tick() {
+        long timeMs = time.milliseconds();
+        while (true) {
+            Map.Entry<Long, List<KafkaFutureImpl<Long>>> entry = waiters.firstEntry();
+            if ((entry == null) || (entry.getKey() > timeMs)) {
+                break;
+            }
+            for (KafkaFutureImpl<Long> future : entry.getValue()) {
+                future.complete(timeMs);
+            }
+            waiters.remove(entry.getKey());
+        }
+    }
+
+    public synchronized void addWaiter(long delayMs, KafkaFutureImpl<Long> waiter) {
+        long timeMs = time.milliseconds();
+        if (delayMs <= 0) {
+            waiter.complete(timeMs);
+        } else {
+            long triggerTimeMs = timeMs + delayMs;
+            List<KafkaFutureImpl<Long>> futures = waiters.get(triggerTimeMs);
+            if (futures == null) {
+                futures = new ArrayList<>();
+                waiters.put(triggerTimeMs, futures);
+            }
+            futures.add(waiter);
+        }
+    }
+
+    @Override
+    public <T> Future<T> schedule(final ScheduledExecutorService executor,
+                                  final Callable<T> callable, long delayMs) {
+        final KafkaFutureImpl<T> future = new KafkaFutureImpl<>();
+        KafkaFutureImpl<Long> waiter = new KafkaFutureImpl<>();
+        waiter.thenApply(new KafkaFuture.Function<Long, Void>() {
+            @Override
+            public Void apply(final Long now) {
+                executor.submit(new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        // Note: it is possible that we'll execute Callable#call right after
+                        // the future is cancelled.  This is a valid sequence of events
+                        // that the author of the Callable needs to be able to handle.
+                        //
+                        // Note 2: If the future is cancelled, we will not remove the waiter
+                        // from this MockTime object.  This small bit of inefficiency is acceptable
+                        // in testing code (at least we aren't polling!)
+                        if (!future.isCancelled()) {
+                            try {
+                                log.trace("Invoking {} at {}", callable, now);
+                                future.complete(callable.call());
+                            } catch (Throwable throwable) {
+                                future.completeExceptionally(throwable);
+                            }
+                        }
+                        return null;
+                    }
+                });
+                return null;
+            }
+        });
+        log.trace("Scheduling {} for {} ms from now.", callable, delayMs);
+        addWaiter(delayMs, waiter);
+        return future;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
index be04aed..011eba2 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.utils;
 
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -24,6 +25,15 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class MockTime implements Time {
 
+    interface MockTimeListener {
+        void tick();
+    }
+
+    /**
+     * Listeners which are waiting for time changes.
+     */
+    private final CopyOnWriteArrayList<MockTimeListener> listeners = new CopyOnWriteArrayList<>();
+
     private final long autoTickMs;
 
     // Values from `nanoTime` and `currentTimeMillis` are not comparable, so we store them separately to allow tests
@@ -45,6 +55,10 @@ public class MockTime implements Time {
         this.autoTickMs = autoTickMs;
     }
 
+    public void addListener(MockTimeListener listener) {
+        listeners.add(listener);
+    }
+
     @Override
     public long milliseconds() {
         maybeSleep(autoTickMs);
@@ -71,6 +85,7 @@ public class MockTime implements Time {
     public void sleep(long ms) {
         timeMs.addAndGet(ms);
         highResTimeNs.addAndGet(TimeUnit.MILLISECONDS.toNanos(ms));
+        tick();
     }
 
     public void setCurrentTimeMs(long newMs) {
@@ -81,5 +96,12 @@ public class MockTime implements Time {
             throw new IllegalArgumentException("Setting the time to " + newMs + " while current time " + oldMs + " is newer; this is not allowed");
 
         highResTimeNs.set(TimeUnit.MILLISECONDS.toNanos(newMs));
+        tick();
+    }
+
+    private void tick() {
+        for (MockTimeListener listener : listeners) {
+            listener.tick();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/clients/src/test/java/org/apache/kafka/common/utils/MockTimeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockTimeTest.java b/clients/src/test/java/org/apache/kafka/common/utils/MockTimeTest.java
new file mode 100644
index 0000000..58bcb19
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MockTimeTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.Test;
+
+public class MockTimeTest {
+    private static final Logger log = LoggerFactory.getLogger(MockTimeTest.class);
+
+    @Rule
+    final public Timeout globalTimeout = Timeout.millis(120000);
+
+    @Test
+    public void testAdvanceClock() throws Exception {
+        MockTime time = new MockTime(0, 100, 200);
+        Assert.assertEquals(100, time.milliseconds());
+        Assert.assertEquals(200, time.nanoseconds());
+        time.sleep(1);
+        Assert.assertEquals(101, time.milliseconds());
+        Assert.assertEquals(1000200, time.nanoseconds());
+    }
+
+    @Test
+    public void testAutoTickMs() throws Exception {
+        MockTime time = new MockTime(1, 100, 200);
+        Assert.assertEquals(101, time.milliseconds());
+        Assert.assertEquals(2000200, time.nanoseconds());
+        Assert.assertEquals(103, time.milliseconds());
+        Assert.assertEquals(104, time.milliseconds());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/gradle/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/gradle/findbugs-exclude.xml b/gradle/findbugs-exclude.xml
index 9cb1db8..3bc3d39 100644
--- a/gradle/findbugs-exclude.xml
+++ b/gradle/findbugs-exclude.xml
@@ -319,4 +319,12 @@ For a detailed description of findbugs bug categories, see http://findbugs.sourc
         <Method name="main"/>
         <Bug pattern="IL_INFINITE_LOOP"/>
     </Match>
+
+    <Match>
+        <!-- Suppress a spurious warning about calling notify without modifying
+             other state under the monitor. -->
+        <Package name="org.apache.kafka.trogdor.workload"/>
+        <Source name="RoundTripWorker.java"/>
+        <Bug pattern="NN_NAKED_NOTIFY"/>
+    </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tests/kafkatest/services/trogdor/fault_spec.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/fault_spec.py b/tests/kafkatest/services/trogdor/fault_spec.py
deleted file mode 100644
index 9768765..0000000
--- a/tests/kafkatest/services/trogdor/fault_spec.py
+++ /dev/null
@@ -1,45 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import json
-
-
-class FaultSpec(object):
-    """
-    The base class for a fault specification.
-
-    MAX_DURATION_MS         The longest duration we should use for a fault specification.
-    """
-
-    MAX_DURATION_MS=10000000
-
-    def __init__(self, start_ms, duration_ms):
-        """
-        Create a new fault specification.
-
-        :param start_ms:        The start time in milliseconds since the epoch.
-        :param duration_ms:     The duration in milliseconds.
-        """
-        self.start_ms = start_ms
-        self.duration_ms = duration_ms
-
-    def message(self):
-        """
-        Return a message suitable for sending to the Trogdor daemon.
-        """
-        raise NotImplemented
-
-    def __str__(self):
-        return json.dumps(self.message())

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tests/kafkatest/services/trogdor/network_partition_fault_spec.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/network_partition_fault_spec.py b/tests/kafkatest/services/trogdor/network_partition_fault_spec.py
index deb5c56..91c731e 100644
--- a/tests/kafkatest/services/trogdor/network_partition_fault_spec.py
+++ b/tests/kafkatest/services/trogdor/network_partition_fault_spec.py
@@ -13,10 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from kafkatest.services.trogdor.fault_spec import FaultSpec
+from kafkatest.services.trogdor.task_spec import TaskSpec
 
 
-class NetworkPartitionFaultSpec(FaultSpec):
+class NetworkPartitionFaultSpec(TaskSpec):
     """
     The specification for a network partition fault.
 
@@ -28,7 +28,7 @@ class NetworkPartitionFaultSpec(FaultSpec):
         """
         Create a new NetworkPartitionFaultSpec.
 
-        :param start_ms:        The start time, as described in fault_spec.py
+        :param start_ms:        The start time, as described in task_spec.py
         :param duration_ms:     The duration in milliseconds.
         :param partitions:      An array of arrays describing the partitions.
                                 The inner arrays may contain either node names,

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tests/kafkatest/services/trogdor/no_op_fault_spec.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/no_op_fault_spec.py b/tests/kafkatest/services/trogdor/no_op_fault_spec.py
deleted file mode 100644
index 82e9713..0000000
--- a/tests/kafkatest/services/trogdor/no_op_fault_spec.py
+++ /dev/null
@@ -1,41 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from kafkatest.services.trogdor.fault_spec import FaultSpec
-
-
-class NoOpFaultSpec(FaultSpec):
-    """
-    The specification for a nop-op fault.
-
-    No-op faults are used to test the fault injector.  They don't do anything,
-    but must be propagated to all fault injector daemons.
-    """
-
-    def __init__(self, start_ms, duration_ms):
-        """
-        Create a new NoOpFault.
-
-        :param start_ms:        The start time, as described in fault_spec.py
-        :param duration_ms:     The duration in milliseconds.
-        """
-        super(NoOpFaultSpec, self).__init__(start_ms, duration_ms)
-
-    def message(self):
-        return {
-            "class": "org.apache.kafka.trogdor.fault.NoOpFaultSpec",
-            "startMs": self.start_ms,
-            "durationMs": self.duration_ms,
-        }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tests/kafkatest/services/trogdor/no_op_task_spec.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/no_op_task_spec.py b/tests/kafkatest/services/trogdor/no_op_task_spec.py
new file mode 100644
index 0000000..eb75264
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/no_op_task_spec.py
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.trogdor.task_spec import TaskSpec
+
+
+class NoOpTaskSpec(TaskSpec):
+    """
+    The specification for a nop-op task.
+
+    No-op faults are used to test Trogdor.  They don't do anything,
+    but must be propagated to all Trogdor agents.
+    """
+
+    def __init__(self, start_ms, duration_ms):
+        """
+        Create a new NoOpFault.
+
+        :param start_ms:        The start time, as described in task_spec.py
+        :param duration_ms:     The duration in milliseconds.
+        """
+        super(NoOpTaskSpec, self).__init__(start_ms, duration_ms)
+
+    def message(self):
+        return {
+            "class": "org.apache.kafka.trogdor.task.NoOpTaskSpec",
+            "startMs": self.start_ms,
+            "durationMs": self.duration_ms,
+        }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tests/kafkatest/services/trogdor/task_spec.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/task_spec.py b/tests/kafkatest/services/trogdor/task_spec.py
new file mode 100644
index 0000000..61a080a
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/task_spec.py
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+
+class TaskSpec(object):
+    """
+    The base class for a task specification.
+
+    MAX_DURATION_MS         The longest duration we should use for a task specification.
+    """
+
+    MAX_DURATION_MS=10000000
+
+    def __init__(self, start_ms, duration_ms):
+        """
+        Create a new task specification.
+
+        :param start_ms:        The target start time in milliseconds since the epoch.
+        :param duration_ms:     The duration in milliseconds.
+        """
+        self.start_ms = start_ms
+        self.duration_ms = duration_ms
+
+    def message(self):
+        """
+        Return a message suitable for sending to the Trogdor daemon.
+        """
+        raise NotImplemented
+
+    def __str__(self):
+        return json.dumps(self.message())

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tests/kafkatest/services/trogdor/trogdor.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/trogdor.py b/tests/kafkatest/services/trogdor/trogdor.py
index 8b05e99..a4fcfb5 100644
--- a/tests/kafkatest/services/trogdor/trogdor.py
+++ b/tests/kafkatest/services/trogdor/trogdor.py
@@ -20,6 +20,7 @@ from requests.adapters import HTTPAdapter
 from requests.packages.urllib3 import Retry
 
 from ducktape.services.service import Service
+from ducktape.utils.util import wait_until
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 
 
@@ -70,22 +71,29 @@ class TrogdorService(KafkaPathResolverMixin, Service):
             "collect_default": True},
     }
 
-    def __init__(self, context, agent_nodes, agent_port=DEFAULT_AGENT_PORT,
-                 coordinator_port=DEFAULT_COORDINATOR_PORT):
+
+    def __init__(self, context, agent_nodes=None, client_services=None,
+                 agent_port=DEFAULT_AGENT_PORT, coordinator_port=DEFAULT_COORDINATOR_PORT):
         """
         Create a Trogdor service.
 
         :param context:             The test context.
         :param agent_nodes:         The nodes to run the agents on.
+        :param client_services:     Services whose nodes we should run agents on.
         :param agent_port:          The port to use for the trogdor_agent daemons.
         :param coordinator_port:    The port to use for the trogdor_coordinator daemons.
         """
         Service.__init__(self, context, num_nodes=1)
         self.coordinator_node = self.nodes[0]
-        if (len(agent_nodes) == 0):
-            raise RuntimeError("You must supply at least one node to run the service on.")
-        for agent_node in agent_nodes:
-            self.nodes.append(agent_node)
+        if client_services is not None:
+            for client_service in client_services:
+                for node in client_service.nodes:
+                    self.nodes.append(node)
+        if agent_nodes is not None:
+            for agent_node in agent_nodes:
+                self.nodes.append(agent_node)
+        if (len(self.nodes) == 1):
+            raise RuntimeError("You must supply at least one agent node to run the service on.")
         self.agent_port = agent_port
         self.coordinator_port = coordinator_port
 
@@ -108,9 +116,12 @@ class TrogdorService(KafkaPathResolverMixin, Service):
         for node in self.nodes:
             dict_nodes[node.name] = {
                 "hostname": node.account.ssh_hostname,
-                "trogdor.agent.port": self.agent_port,
             }
-        dict_nodes[self.coordinator_node.name]["trogdor.coordinator.port"] = self.coordinator_port
+            if node.name == self.coordinator_node.name:
+                dict_nodes[node.name]["trogdor.coordinator.port"] = self.coordinator_port
+            else:
+                dict_nodes[node.name]["trogdor.agent.port"] = self.agent_port
+
         return {
             "platform": "org.apache.kafka.trogdor.basic.BasicPlatform",
             "nodes": dict_nodes,
@@ -160,7 +171,7 @@ class TrogdorService(KafkaPathResolverMixin, Service):
                 stdout_stderr_capture_path)
         node.account.ssh(cmd)
         with node.account.monitor_log(log_path) as monitor:
-            monitor.wait_until("Starting main service thread.", timeout_sec=30, backoff_sec=.25,
+            monitor.wait_until("Starting %s process." % daemon_name, timeout_sec=60, backoff_sec=.25,
                                err_msg=("%s on %s didn't finish startup" % (daemon_name, node.name)))
 
     def wait_node(self, node, timeout_sec=None):
@@ -194,6 +205,22 @@ class TrogdorService(KafkaPathResolverMixin, Service):
                       HTTPAdapter(max_retries=Retry(total=4, backoff_factor=0.3)))
         return session
 
+    def _coordinator_post(self, path, message):
+        """
+        Make a POST request to the Trogdor coordinator.
+
+        :param path:            The URL path to use.
+        :param message:         The message object to send.
+        :return:                The response as an object.
+        """
+        url = self._coordinator_url(path)
+        self.logger.info("POST %s %s" % (url, message))
+        response = self.request_session().post(url, json=message,
+                                               timeout=TrogdorService.REQUEST_TIMEOUT,
+                                               headers=TrogdorService.REQUEST_HEADERS)
+        response.raise_for_status()
+        return response.json()
+
     def _coordinator_put(self, path, message):
         """
         Make a PUT request to the Trogdor coordinator.
@@ -226,24 +253,33 @@ class TrogdorService(KafkaPathResolverMixin, Service):
         response.raise_for_status()
         return response.json()
 
-    def create_fault(self, id, spec):
+    def create_task(self, id, spec):
         """
-        Create a new fault.
+        Create a new task.
 
-        :param id:          The fault id.
-        :param spec:        The fault spec.
+        :param id:          The task id.
+        :param spec:        The task spec.
         """
-        self._coordinator_put("fault", { "id": id, "spec": spec.message()})
+        self._coordinator_post("task/create", { "id": id, "spec": spec.message()})
+        return TrogdorTask(id, self)
 
-    def get_faults(self):
+    def stop_task(self, id):
         """
-        Get the faults which are on the coordinator.
+        Stop a task.
 
-        :returns:           A map of fault id strings to fault data objects.
-                            Fault data objects contain a 'spec' field with the spec
+        :param id:          The task id.
+        """
+        self._coordinator_put("task/stop", { "id": id })
+
+    def tasks(self):
+        """
+        Get the tasks which are on the coordinator.
+
+        :returns:           A map of task id strings to task state objects.
+                            Task state objects contain a 'spec' field with the spec
                             and a 'state' field with the state.
         """
-        return self._coordinator_get("faults", {})
+        return self._coordinator_get("tasks", {})
 
     def is_coordinator(self, node):
         return node == self.coordinator_node
@@ -253,3 +289,44 @@ class TrogdorService(KafkaPathResolverMixin, Service):
 
     def coordinator_class_name(self):
         return "org.apache.kafka.trogdor.coordinator.Coordinator"
+
+class TrogdorTask(object):
+    PENDING_STATE = "PENDING"
+    RUNNING_STATE = "RUNNING"
+    STOPPING_STATE = "STOPPING"
+    DONE_STATE = "DONE"
+
+    def __init__(self, id, trogdor):
+        self.id = id
+        self.trogdor = trogdor
+
+    def done(self):
+        """
+        Check if this task is done.
+
+        :raises RuntimeError:       If the task encountered an error.
+        :returns:                   True if the task is in DONE_STATE;
+                                    False if it is in a different state.
+        """
+        task_state = self.trogdor.tasks()["tasks"][self.id]
+        if task_state is None:
+            raise RuntimeError("Coordinator did not know about %s." % self.id)
+        error = task_state.get("error")
+        if error is None or error == "":
+            return task_state["state"] == TrogdorTask.DONE_STATE
+        raise RuntimeError("Failed to gracefully stop %s: got task error: %s" % (self.id, error))
+
+    def stop(self):
+        """
+        Stop this task.
+
+        :raises RuntimeError:       If the task encountered an error.
+        """
+        if self.done():
+            return
+        self.trogdor.stop_task(self.id)
+
+    def wait_for_done(self, timeout_sec=360):
+        wait_until(lambda: self.done(),
+                   timeout_sec=timeout_sec,
+                   err_msg="%s failed to finish in the expected amount of time." % self.id)

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tests/kafkatest/tests/tools/trogdor_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/tools/trogdor_test.py b/tests/kafkatest/tests/tools/trogdor_test.py
index 026ecaf..44d00b2 100644
--- a/tests/kafkatest/tests/tools/trogdor_test.py
+++ b/tests/kafkatest/tests/tools/trogdor_test.py
@@ -19,8 +19,8 @@ from ducktape.cluster.cluster_spec import ClusterSpec
 from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
-from kafkatest.services.trogdor.fault_spec import FaultSpec
-from kafkatest.services.trogdor.no_op_fault_spec import NoOpFaultSpec
+from kafkatest.services.trogdor.task_spec import TaskSpec
+from kafkatest.services.trogdor.no_op_task_spec import NoOpTaskSpec
 from kafkatest.services.trogdor.trogdor import TrogdorService
 from kafkatest.utils import node_is_reachable
 
@@ -58,14 +58,15 @@ class TrogdorTest(Test):
         Test that we can bring up Trogdor and create a no-op fault.
         """
         self.set_up_trogdor(3)
-        spec = NoOpFaultSpec(0, FaultSpec.MAX_DURATION_MS)
-        self.trogdor.create_fault("myfault", spec)
-        def check_for_faults():
-            faults = self.trogdor.get_faults()
-            self.logger.info("faults = %s" % faults)
+        spec = NoOpTaskSpec(0, TaskSpec.MAX_DURATION_MS)
+        self.trogdor.create_task("myfault", spec)
+        def check_for_myfault():
+            faults = self.trogdor.tasks()["tasks"]
+            self.logger.info("tasks = %s" % faults)
             return "myfault" in faults
-        wait_until(lambda: check_for_faults,
+        wait_until(lambda: check_for_myfault,
                    timeout_sec=10, backoff_sec=.2, err_msg="Failed to read back myfault.")
+        self.trogdor.stop_task("myfault")
 
     @cluster(num_nodes=4)
     def test_network_partition_fault(self):
@@ -73,12 +74,12 @@ class TrogdorTest(Test):
         Test that the network partition fault results in a true network partition between nodes.
         """
         self.set_up_trogdor(3)
-        spec = NetworkPartitionFaultSpec(0, FaultSpec.MAX_DURATION_MS,
+        spec = NetworkPartitionFaultSpec(0, TaskSpec.MAX_DURATION_MS,
                                             [[self.agent_nodes[0]], self.agent_nodes[1:]])
         assert 2 == len(spec.partitions)
         assert [self.agent_nodes[0].name] == spec.partitions[0]
         assert [self.agent_nodes[1].name, self.agent_nodes[2].name] == spec.partitions[1]
-        self.trogdor.create_fault("partition0", spec)
+        self.trogdor.create_task("partition0", spec)
         def verify_nodes_partitioned():
             if node_is_reachable(self.agent_nodes[0], self.agent_nodes[1]):
                 return False

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
index 0ddf4c1..43334a1 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
@@ -22,68 +22,37 @@ import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import net.sourceforge.argparse4j.inf.Namespace;
 import org.apache.kafka.common.utils.Exit;
-import org.apache.kafka.common.utils.KafkaThread;
-import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
-import org.apache.kafka.trogdor.fault.Fault;
-import org.apache.kafka.trogdor.fault.FaultSet;
-import org.apache.kafka.trogdor.fault.FaultSpec;
-import org.apache.kafka.trogdor.fault.RunningState;
-import org.apache.kafka.trogdor.rest.AgentFaultsResponse;
-import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest;
+import org.apache.kafka.trogdor.rest.AgentStatusResponse;
+import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
+import org.apache.kafka.trogdor.rest.CreateWorkerResponse;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
+import org.apache.kafka.trogdor.rest.StopWorkerRequest;
+import org.apache.kafka.trogdor.rest.StopWorkerResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
 import static net.sourceforge.argparse4j.impl.Arguments.store;
 
 /**
  * The Trogdor agent.
  *
- * The agent process implements faults directly.
+ * The agent process runs tasks.
  */
 public final class Agent {
     private static final Logger log = LoggerFactory.getLogger(Agent.class);
 
     /**
-     * The clock to use for this agent.
-     */
-    private final Time time;
-
-    /**
      * The time at which this server was started.
      */
-    private final long startTimeMs;
-
-    /**
-     * The platform.
-     */
-    private final Platform platform;
+    private final long serverStartMs;
 
     /**
-     * The lock protecting shutdown and faultSet.
+     * The WorkerManager.
      */
-    private final ReentrantLock lock = new ReentrantLock();
-
-    /**
-     * The condition variable which the agent thread waits on.
-     */
-    private final Condition cond = lock.newCondition();
-
-    /**
-     * The agent runnable.
-     */
-    private final AgentRunnable runnable;
+    private final WorkerManager workerManager;
 
     /**
      * The REST server.
@@ -91,162 +60,18 @@ public final class Agent {
     private final JsonRestServer restServer;
 
     /**
-     * The agent thread.
-     */
-    private final KafkaThread thread;
-
-    /**
-     * The set of pending faults.
-     */
-    private final FaultSet pendingFaults = new FaultSet();
-
-    /**
-     * The set of faults which are running.
-     */
-    private final FaultSet runningFaults = new FaultSet();
-
-    /**
-     * The set of faults which are done.
-     */
-    private final FaultSet doneFaults = new FaultSet();
-
-    /**
-     * True if the server is shutting down.
-     */
-    private boolean shutdown = false;
-
-    class AgentRunnable implements Runnable {
-        @Override
-        public void run() {
-            log.info("Starting main service thread.");
-            try {
-                while (true) {
-                    List<Fault> toStart = new ArrayList<>();
-                    List<Fault> started = new ArrayList<>();
-                    List<Fault> toEnd = new ArrayList<>();
-                    List<Fault> ended = new ArrayList<>();
-                    long now = time.milliseconds();
-                    long nextWakeMs = now + (60L * 60L * 1000L);
-                    lock.lock();
-                    try {
-                        Iterator<Fault> pending = pendingFaults.iterateByStart();
-                        while (pending.hasNext()) {
-                            Fault fault = pending.next();
-                            toStart.add(fault);
-                            long endMs = fault.spec().startMs() + fault.spec().durationMs();
-                            nextWakeMs = Math.min(nextWakeMs, endMs);
-                            pending.remove();
-                        }
-                        Iterator<Fault> running = runningFaults.iterateByEnd();
-                        while (running.hasNext()) {
-                            Fault fault = running.next();
-                            RunningState state = (RunningState) fault.state();
-                            long endMs = state.startedMs() + fault.spec().durationMs();
-                            if (now < endMs) {
-                                nextWakeMs = Math.min(nextWakeMs, endMs);
-                                break;
-                            }
-                            toEnd.add(fault);
-                            running.remove();
-                        }
-                    } finally {
-                        lock.unlock();
-                    }
-                    for (Fault fault: toStart) {
-                        try {
-                            log.debug("Activating fault " + fault);
-                            fault.activate(now, platform);
-                            started.add(fault);
-                        } catch (Throwable e) {
-                            log.error("Error activating fault " + fault.id(), e);
-                            ended.add(fault);
-                        }
-                    }
-                    for (Fault fault: toEnd) {
-                        try {
-                            log.debug("Deactivating fault " + fault);
-                            fault.deactivate(now, platform);
-                        } catch (Throwable e) {
-                            log.error("Error deactivating fault " + fault.id(), e);
-                        } finally {
-                            ended.add(fault);
-                        }
-                    }
-                    lock.lock();
-                    try {
-                        for (Fault fault : started) {
-                            runningFaults.add(fault);
-                        }
-                        for (Fault fault : ended) {
-                            doneFaults.add(fault);
-                        }
-                        if (shutdown) {
-                            return;
-                        }
-                        if (nextWakeMs > now) {
-                            log.trace("Sleeping for {} ms", nextWakeMs - now);
-                            if (cond.await(nextWakeMs - now, TimeUnit.MILLISECONDS)) {
-                                log.trace("AgentRunnable woke up early");
-                            }
-                        }
-                        if (shutdown) {
-                            return;
-                        }
-                    } finally {
-                        lock.unlock();
-                    }
-                }
-            } catch (Throwable t) {
-                log.error("Unhandled exception in AgentRunnable", t);
-            } finally {
-                log.info("AgentRunnable shutting down.");
-                restServer.stop();
-                int numDeactivated = deactivateRunningFaults();
-                log.info("AgentRunnable deactivated {} fault(s).", numDeactivated);
-            }
-        }
-    }
-
-    private int deactivateRunningFaults() {
-        long now = time.milliseconds();
-        int numDeactivated = 0;
-        lock.lock();
-        try {
-            for (Iterator<Fault> iter = runningFaults.iterateByStart(); iter.hasNext(); ) {
-                Fault fault = iter.next();
-                try {
-                    numDeactivated++;
-                    iter.remove();
-                    fault.deactivate(now, platform);
-                } catch (Exception e) {
-                    log.error("Got exception while deactivating {}", fault, e);
-                } finally {
-                    doneFaults.add(fault);
-                }
-            }
-        } finally {
-            lock.unlock();
-        }
-        return numDeactivated;
-    }
-
-    /**
      * Create a new Agent.
      *
      * @param platform      The platform object to use.
-     * @param time          The timekeeper to use for this Agent.
+     * @param scheduler     The scheduler to use for this Agent.
      * @param restServer    The REST server to use.
      * @param resource      The AgentRestResoure to use.
      */
-    public Agent(Platform platform, Time time, JsonRestServer restServer,
-                 AgentRestResource resource) {
-        this.platform = platform;
-        this.time = time;
+    public Agent(Platform platform, Scheduler scheduler,
+                 JsonRestServer restServer, AgentRestResource resource) {
+        this.serverStartMs = scheduler.time().milliseconds();
+        this.workerManager = new WorkerManager(platform, scheduler);
         this.restServer = restServer;
-        this.startTimeMs = time.milliseconds();
-        this.runnable = new AgentRunnable();
-        this.thread = new KafkaThread("TrogdorAgentThread", runnable, false);
-        this.thread.start();
         resource.setAgent(this);
     }
 
@@ -254,64 +79,27 @@ public final class Agent {
         return this.restServer.port();
     }
 
-    public void beginShutdown() {
-        lock.lock();
-        try {
-            if (shutdown)
-                return;
-            this.shutdown = true;
-            cond.signalAll();
-        } finally {
-            lock.unlock();
-        }
+    public void beginShutdown() throws Exception {
+        restServer.beginShutdown();
+        workerManager.beginShutdown();
     }
 
-    public void waitForShutdown() {
-        try {
-            this.thread.join();
-        } catch (InterruptedException e) {
-            log.error("Interrupted while waiting for thread shutdown", e);
-            Thread.currentThread().interrupt();
-        }
+    public void waitForShutdown() throws Exception {
+        restServer.waitForShutdown();
+        workerManager.waitForShutdown();
     }
 
-    public long startTimeMs() {
-        return startTimeMs;
+    public AgentStatusResponse status() throws Exception {
+        return new AgentStatusResponse(serverStartMs, workerManager.workerStates());
     }
 
-    public AgentFaultsResponse faults() {
-        Map<String, AgentFaultsResponse.FaultData> faultData = new TreeMap<>();
-        lock.lock();
-        try {
-            updateFaultsResponse(faultData, pendingFaults);
-            updateFaultsResponse(faultData, runningFaults);
-            updateFaultsResponse(faultData, doneFaults);
-        } finally {
-            lock.unlock();
-        }
-        return new AgentFaultsResponse(faultData);
+    public CreateWorkerResponse createWorker(CreateWorkerRequest req) throws Exception {
+        workerManager.createWorker(req.id(), req.spec());
+        return new CreateWorkerResponse(req.spec());
     }
 
-    private void updateFaultsResponse(Map<String, AgentFaultsResponse.FaultData> faultData,
-                                      FaultSet faultSet) {
-        for (Iterator<Fault> iter = faultSet.iterateByStart();
-                iter.hasNext(); ) {
-            Fault fault = iter.next();
-            AgentFaultsResponse.FaultData data =
-                new AgentFaultsResponse.FaultData(fault.spec(), fault.state());
-            faultData.put(fault.id(), data);
-        }
-    }
-
-    public void createFault(CreateAgentFaultRequest request) throws ClassNotFoundException {
-        lock.lock();
-        try {
-            Fault fault = FaultSpec.Util.createFault(request.id(), request.spec());
-            pendingFaults.add(fault);
-            cond.signalAll();
-        } finally {
-            lock.unlock();
-        }
+    public StopWorkerResponse stopWorker(StopWorkerRequest req) throws Exception {
+        return new StopWorkerResponse(workerManager.stopWorker(req.id()));
     }
 
     public static void main(String[] args) throws Exception {
@@ -352,14 +140,19 @@ public final class Agent {
         JsonRestServer restServer =
             new JsonRestServer(Node.Util.getTrogdorAgentPort(platform.curNode()));
         AgentRestResource resource = new AgentRestResource();
-        final Agent agent = new Agent(platform, Time.SYSTEM, restServer, resource);
+        log.info("Starting agent process.");
+        final Agent agent = new Agent(platform, Scheduler.SYSTEM, restServer, resource);
         restServer.start(resource);
         Runtime.getRuntime().addShutdownHook(new Thread() {
             @Override
             public void run() {
-                log.error("Running shutdown hook...");
-                agent.beginShutdown();
-                agent.waitForShutdown();
+                log.warn("Running agent shutdown hook.");
+                try {
+                    agent.beginShutdown();
+                    agent.waitForShutdown();
+                } catch (Exception e) {
+                    log.error("Got exception while running agent shutdown hook.", e);
+                }
             }
         });
         agent.waitForShutdown();

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
index 928820b..9c06591 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
@@ -25,12 +25,14 @@ import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
 import net.sourceforge.argparse4j.inf.Namespace;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.trogdor.common.JsonUtil;
-import org.apache.kafka.trogdor.rest.AgentFaultsResponse;
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
-import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest;
+import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
+import org.apache.kafka.trogdor.rest.CreateWorkerResponse;
 import org.apache.kafka.trogdor.rest.Empty;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
+import org.apache.kafka.trogdor.rest.StopWorkerRequest;
+import org.apache.kafka.trogdor.rest.StopWorkerResponse;
 
 import static net.sourceforge.argparse4j.impl.Arguments.store;
 import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
@@ -40,51 +42,64 @@ import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
  */
 public class AgentClient {
     /**
+     * The maximum number of tries to make.
+     */
+    private final int maxTries;
+
+    /**
      * The URL target.
      */
     private final String target;
 
-    public AgentClient(String host, int port) {
-        this(String.format("%s:%d", host, port));
+    public AgentClient(int maxTries, String host, int port) {
+        this(maxTries, String.format("%s:%d", host, port));
     }
 
-    public AgentClient(String target) {
+    public AgentClient(int maxTries, String target) {
         this.target = target;
+        this.maxTries = maxTries;
     }
 
     public String target() {
         return target;
     }
 
+    public int maxTries() {
+        return maxTries;
+    }
+
     private String url(String suffix) {
         return String.format("http://%s%s", target, suffix);
     }
 
-    public AgentStatusResponse getStatus() throws Exception {
+    public AgentStatusResponse status() throws Exception {
         HttpResponse<AgentStatusResponse> resp =
             JsonRestServer.<AgentStatusResponse>httpRequest(url("/agent/status"), "GET",
-                null, new TypeReference<AgentStatusResponse>() { });
+                null, new TypeReference<AgentStatusResponse>() { }, maxTries);
         return resp.body();
     }
 
-    public AgentFaultsResponse getFaults() throws Exception {
-        HttpResponse<AgentFaultsResponse> resp =
-            JsonRestServer.<AgentFaultsResponse>httpRequest(url("/agent/faults"), "GET",
-                null, new TypeReference<AgentFaultsResponse>() { });
+    public CreateWorkerResponse createWorker(CreateWorkerRequest request) throws Exception {
+        HttpResponse<CreateWorkerResponse> resp =
+            JsonRestServer.<CreateWorkerResponse>httpRequest(
+                url("/agent/worker/create"), "POST",
+                request, new TypeReference<CreateWorkerResponse>() { }, maxTries);
         return resp.body();
     }
 
-    public void putFault(CreateAgentFaultRequest request) throws Exception {
-        HttpResponse<AgentFaultsResponse> resp =
-            JsonRestServer.<AgentFaultsResponse>httpRequest(url("/agent/fault"), "PUT",
-                request, new TypeReference<AgentFaultsResponse>() { });
-        resp.body();
+    public StopWorkerResponse stopWorker(StopWorkerRequest request) throws Exception {
+        HttpResponse<StopWorkerResponse> resp =
+            JsonRestServer.<StopWorkerResponse>httpRequest(url(
+                "/agent/worker/stop"), "PUT",
+                request, new TypeReference<StopWorkerResponse>() { }, maxTries);
+        return resp.body();
     }
 
     public void invokeShutdown() throws Exception {
         HttpResponse<Empty> resp =
-            JsonRestServer.<Empty>httpRequest(url("/agent/shutdown"), "PUT",
-                null, new TypeReference<Empty>() { });
+            JsonRestServer.<Empty>httpRequest(url(
+                "/agent/shutdown"), "PUT",
+                null, new TypeReference<Empty>() { }, maxTries);
         resp.body();
     }
 
@@ -106,16 +121,17 @@ public class AgentClient {
             .type(Boolean.class)
             .dest("status")
             .help("Get agent status.");
-        actions.addArgument("--get-faults")
-            .action(storeTrue())
-            .type(Boolean.class)
-            .dest("get_faults")
-            .help("Get agent faults.");
-        actions.addArgument("--create-fault")
+        actions.addArgument("--create-worker")
+            .action(store())
+            .type(String.class)
+            .dest("create_worker")
+            .metavar("SPEC_JSON")
+            .help("Create a new fault.");
+        actions.addArgument("--stop-worker")
             .action(store())
             .type(String.class)
-            .dest("create_fault")
-            .metavar("FAULT_JSON")
+            .dest("stop_worker")
+            .metavar("SPEC_JSON")
             .help("Create a new fault.");
         actions.addArgument("--shutdown")
             .action(storeTrue())
@@ -136,16 +152,14 @@ public class AgentClient {
             }
         }
         String target = res.getString("target");
-        AgentClient client = new AgentClient(target);
+        AgentClient client = new AgentClient(3, target);
         if (res.getBoolean("status")) {
             System.out.println("Got agent status: " +
-                JsonUtil.toPrettyJsonString(client.getStatus()));
-        } else if (res.getBoolean("get_faults")) {
-            System.out.println("Got agent faults: " +
-                JsonUtil.toPrettyJsonString(client.getFaults()));
-        } else if (res.getString("create_fault") != null) {
-            client.putFault(JsonUtil.JSON_SERDE.readValue(res.getString("create_fault"),
-                CreateAgentFaultRequest.class));
+                JsonUtil.toPrettyJsonString(client.status()));
+        } else if (res.getString("create_worker") != null) {
+            client.createWorker(JsonUtil.JSON_SERDE.
+                readValue(res.getString("create_worker"),
+                    CreateWorkerRequest.class));
             System.out.println("Created fault.");
         } else if (res.getBoolean("shutdown")) {
             client.invokeShutdown();

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java
index 690fa68..773c580 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java
@@ -16,14 +16,17 @@
  */
 package org.apache.kafka.trogdor.agent;
 
-import org.apache.kafka.trogdor.rest.AgentFaultsResponse;
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
-import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest;
+import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
+import org.apache.kafka.trogdor.rest.CreateWorkerResponse;
 import org.apache.kafka.trogdor.rest.Empty;
+import org.apache.kafka.trogdor.rest.StopWorkerRequest;
+import org.apache.kafka.trogdor.rest.StopWorkerResponse;
 
 import javax.servlet.ServletContext;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.GET;
+import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
@@ -47,20 +50,19 @@ public class AgentRestResource {
     @GET
     @Path("/status")
     public AgentStatusResponse getStatus() throws Throwable {
-        return new AgentStatusResponse(agent().startTimeMs());
+        return agent().status();
     }
 
-    @GET
-    @Path("/faults")
-    public AgentFaultsResponse getAgentFaults() throws Throwable {
-        return agent().faults();
+    @POST
+    @Path("/worker/create")
+    public CreateWorkerResponse createWorker(CreateWorkerRequest req) throws Throwable {
+        return agent().createWorker(req);
     }
 
     @PUT
-    @Path("/fault")
-    public Empty putAgentFault(CreateAgentFaultRequest request) throws Throwable {
-        agent().createFault(request);
-        return Empty.INSTANCE;
+    @Path("/worker/stop")
+    public StopWorkerResponse stopWorker(StopWorkerRequest req) throws Throwable {
+        return agent().stopWorker(req);
     }
 
     @PUT

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
new file mode 100644
index 0000000..3c03e1e
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
@@ -0,0 +1,574 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.agent;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Scheduler;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.ThreadUtils;
+import org.apache.kafka.trogdor.rest.WorkerDone;
+import org.apache.kafka.trogdor.rest.WorkerRunning;
+import org.apache.kafka.trogdor.rest.WorkerStarting;
+import org.apache.kafka.trogdor.rest.WorkerStopping;
+import org.apache.kafka.trogdor.rest.WorkerState;
+import org.apache.kafka.trogdor.task.TaskSpec;
+import org.apache.kafka.trogdor.task.TaskWorker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+public final class WorkerManager {
+    private static final Logger log = LoggerFactory.getLogger(WorkerManager.class);
+
+    /**
+     * The platform to use.
+     */
+    private final Platform platform;
+
+    /**
+     * The name of this node.
+     */
+    private final String nodeName;
+
+    /**
+     * The scheduler to use.
+     */
+    private final Scheduler scheduler;
+
+    /**
+     * The clock to use.
+     */
+    private final Time time;
+
+    /**
+     * A map of task IDs to Work objects.
+     */
+    private final Map<String, Worker> workers;
+
+    /**
+     * An ExecutorService used to schedule events in the future.
+     */
+    private final ScheduledExecutorService stateChangeExecutor;
+
+    /**
+     * An ExecutorService used to clean up TaskWorkers.
+     */
+    private final ScheduledExecutorService workerCleanupExecutor;
+
+    /**
+     * An ExecutorService to help with shutting down.
+     */
+    private final ScheduledExecutorService shutdownExecutor;
+
+    /**
+     * The shutdown manager.
+     */
+    private final ShutdownManager shutdownManager = new ShutdownManager();
+
+    /**
+     * The shutdown manager handles shutting down gracefully.
+     *
+     * We can shut down gracefully only when all the references handed out
+     * by the ShutdownManager has been closed, and the shutdown bit has
+     * been set.  RPC operations hold a reference for the duration of their
+     * execution, and so do Workers which have not been shut down.
+     * This prevents us from shutting down in the middle of an RPC, or with
+     * workers which are still running.
+     */
+    static class ShutdownManager {
+        private boolean shutdown = false;
+        private long refCount = 0;
+
+        class Reference implements AutoCloseable {
+            AtomicBoolean closed = new AtomicBoolean(false);
+
+            @Override
+            public void close() {
+                if (closed.compareAndSet(false, true)) {
+                    synchronized (ShutdownManager.this) {
+                        refCount--;
+                        if (shutdown && (refCount == 0)) {
+                            ShutdownManager.this.notifyAll();
+                        }
+                    }
+                }
+            }
+        }
+
+        synchronized Reference takeReference() {
+            if (shutdown) {
+                throw new KafkaException("WorkerManager is shut down.");
+            }
+            refCount++;
+            return new Reference();
+        }
+
+        synchronized boolean shutdown() {
+            if (shutdown) {
+                return false;
+            }
+            shutdown = true;
+            return true;
+        }
+
+        synchronized void waitForQuiescence() throws InterruptedException {
+            while ((!shutdown) || (refCount > 0)) {
+                wait();
+            }
+        }
+    }
+
+    WorkerManager(Platform platform, Scheduler scheduler) {
+        this.platform = platform;
+        this.nodeName = platform.curNode().name();
+        this.scheduler = scheduler;
+        this.time = scheduler.time();
+        this.workers = new HashMap<>();
+        this.stateChangeExecutor = Executors.newSingleThreadScheduledExecutor(
+                ThreadUtils.createThreadFactory("WorkerManagerStateThread", false));
+        this.workerCleanupExecutor = Executors.newScheduledThreadPool(1,
+            ThreadUtils.createThreadFactory("WorkerCleanupThread%d", false));
+        this.shutdownExecutor = Executors.newScheduledThreadPool(0,
+            ThreadUtils.createThreadFactory("WorkerManagerShutdownThread%d", false));
+    }
+
+    enum State {
+        STARTING,
+        CANCELLING,
+        RUNNING,
+        STOPPING,
+        DONE,
+    }
+
+    /**
+     * A worker which is being tracked.
+     */
+    class Worker {
+        /**
+         * The task ID.
+         */
+        private final String id;
+
+        /**
+         * The task specification.
+         */
+        private final TaskSpec spec;
+
+        /**
+         * The work which this worker is performing.
+         */
+        private final TaskWorker taskWorker;
+
+        /**
+         * The worker status.
+         */
+        private final AtomicReference<String> status = new AtomicReference<>("");
+
+        /**
+         * The time when this task was started.
+         */
+        private final long startedMs;
+
+        /**
+         * The work state.
+         */
+        private State state = State.STARTING;
+
+        /**
+         * The time when this task was completed, or -1 if it has not been.
+         */
+        private long doneMs = -1;
+
+        /**
+         * The worker error.
+         */
+        private String error = "";
+
+        /**
+         * If there is a task timeout scheduled, this is a future which can
+         * be used to cancel it.
+         */
+        private Future<TaskSpec> timeoutFuture = null;
+
+        /**
+         * A shutdown manager reference which will keep the WorkerManager
+         * alive for as long as this worker is alive.
+         */
+        private ShutdownManager.Reference reference;
+
+        Worker(String id, TaskSpec spec, long now) {
+            this.id = id;
+            this.spec = spec;
+            this.taskWorker = spec.newTaskWorker(id);
+            this.startedMs = now;
+            this.reference = shutdownManager.takeReference();
+        }
+
+        String id() {
+            return id;
+        }
+
+        TaskSpec spec() {
+            return spec;
+        }
+
+        WorkerState state() {
+            switch (state) {
+                case STARTING:
+                    return new WorkerStarting(spec);
+                case RUNNING:
+                    return new WorkerRunning(spec, startedMs, status.get());
+                case CANCELLING:
+                case STOPPING:
+                    return new WorkerStopping(spec, startedMs, status.get());
+                case DONE:
+                    return new WorkerDone(spec, startedMs, doneMs, status.get(), error);
+            }
+            throw new RuntimeException("unreachable");
+        }
+
+        void transitionToRunning() {
+            state = State.RUNNING;
+            timeoutFuture = scheduler.schedule(stateChangeExecutor,
+                new StopWorker(id), spec.durationMs());
+        }
+
+        void transitionToStopping() {
+            state = State.STOPPING;
+            if (timeoutFuture != null) {
+                timeoutFuture.cancel(false);
+                timeoutFuture = null;
+            }
+            workerCleanupExecutor.submit(new CleanupWorker(this));
+        }
+
+        void transitionToDone() {
+            state = State.DONE;
+            doneMs = time.milliseconds();
+            if (reference != null) {
+                reference.close();
+                reference = null;
+            }
+        }
+    }
+
+    public void createWorker(final String id, TaskSpec spec) throws Exception {
+        try (ShutdownManager.Reference ref = shutdownManager.takeReference()) {
+            final Worker worker = stateChangeExecutor.
+                submit(new CreateWorker(id, spec, time.milliseconds())).get();
+            if (worker == null) {
+                log.info("{}: Ignoring request to create worker {}, because there is already " +
+                    "a worker with that id.", nodeName, id);
+                return;
+            }
+            KafkaFutureImpl<String> haltFuture = new KafkaFutureImpl<>();
+            haltFuture.thenApply(new KafkaFuture.Function<String, Void>() {
+                @Override
+                public Void apply(String errorString) {
+                    if (errorString.isEmpty()) {
+                        log.info("{}: Worker {} is halting.", nodeName, id);
+                    } else {
+                        log.info("{}: Worker {} is halting with error {}", nodeName, id, errorString);
+                    }
+                    stateChangeExecutor.submit(
+                        new HandleWorkerHalting(worker, errorString, false));
+                    return null;
+                }
+            });
+            try {
+                worker.taskWorker.start(platform, worker.status, haltFuture);
+            } catch (Exception e) {
+                stateChangeExecutor.submit(new HandleWorkerHalting(worker,
+                    "worker.start() exception: " + e.getMessage(), true));
+            }
+            stateChangeExecutor.submit(new FinishCreatingWorker(worker));
+        }
+    }
+
+    /**
+     * Handles a request to create a new worker.  Processed by the state change thread.
+     */
+    class CreateWorker implements Callable<Worker> {
+        private final String id;
+        private final TaskSpec spec;
+        private final long now;
+
+        CreateWorker(String id, TaskSpec spec, long now) {
+            this.id = id;
+            this.spec = spec;
+            this.now = now;
+        }
+
+        @Override
+        public Worker call() throws Exception {
+            Worker worker = workers.get(id);
+            if (worker != null) {
+                log.info("{}: Task ID {} is already in use.", nodeName, id);
+                return null;
+            }
+            worker = new Worker(id, spec, now);
+            workers.put(id, worker);
+            log.info("{}: Created a new worker for task {} with spec {}", nodeName, id, spec);
+            return worker;
+        }
+    }
+
+    /**
+     * Finish creating a Worker.  Processed by the state change thread.
+     */
+    class FinishCreatingWorker implements Callable<Void> {
+        private final Worker worker;
+
+        FinishCreatingWorker(Worker worker) {
+            this.worker = worker;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            switch (worker.state) {
+                case CANCELLING:
+                    log.info("{}: Worker {} was cancelled while it was starting up.  " +
+                        "Transitioning to STOPPING.", nodeName, worker.id);
+                    worker.transitionToStopping();
+                    break;
+                case STARTING:
+                    log.info("{}: Worker {} is now RUNNING.  Scheduled to stop in {} ms.",
+                        nodeName, worker.id, worker.spec.durationMs());
+                    worker.transitionToRunning();
+                    break;
+                default:
+                    break;
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Handles a worker halting.  Processed by the state change thread.
+     */
+    class HandleWorkerHalting implements Callable<Void> {
+        private final Worker worker;
+        private final String failure;
+        private final boolean startupHalt;
+
+        HandleWorkerHalting(Worker worker, String failure, boolean startupHalt) {
+            this.worker = worker;
+            this.failure = failure;
+            this.startupHalt = startupHalt;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            if (worker.error.isEmpty()) {
+                worker.error = failure;
+            }
+            String verb = (worker.error.isEmpty()) ? "halting" :
+                "halting with error [" + worker.error + "]";
+            switch (worker.state) {
+                case STARTING:
+                    if (startupHalt) {
+                        log.info("{}: Worker {} {} during startup.  Transitioning to DONE.",
+                            nodeName, worker.id, verb);
+                        worker.transitionToDone();
+                    } else {
+                        log.info("{}: Worker {} {} during startup.  Transitioning to CANCELLING.",
+                            nodeName, worker.id, verb);
+                        worker.state = State.CANCELLING;
+                    }
+                    break;
+                case CANCELLING:
+                    log.info("{}: Cancelling worker {} {}.  ",
+                            nodeName, worker.id, verb);
+                    break;
+                case RUNNING:
+                    log.info("{}: Running worker {} {}.  Transitioning to STOPPING.",
+                        nodeName, worker.id, verb);
+                    worker.transitionToStopping();
+                    break;
+                case STOPPING:
+                    log.info("{}: Stopping worker {} {}.", nodeName, worker.id, verb);
+                    break;
+                case DONE:
+                    log.info("{}: Can't halt worker {} because it is already DONE.",
+                        nodeName, worker.id);
+                    break;
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Transitions a worker to WorkerDone.  Processed by the state change thread.
+     */
+    static class CompleteWorker implements Callable<Void> {
+        private final Worker worker;
+
+        private final String failure;
+
+        CompleteWorker(Worker worker, String failure) {
+            this.worker = worker;
+            this.failure = failure;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            if (worker.error.isEmpty() && !failure.isEmpty()) {
+                worker.error = failure;
+            }
+            worker.transitionToDone();
+            return null;
+        }
+    }
+
+    public TaskSpec stopWorker(String id) throws Exception {
+        try (ShutdownManager.Reference ref = shutdownManager.takeReference()) {
+            TaskSpec taskSpec = stateChangeExecutor.submit(new StopWorker(id)).get();
+            if (taskSpec == null) {
+                throw new KafkaException("No task found with id " + id);
+            }
+            return taskSpec;
+        }
+    }
+
+    /**
+     * Stops a worker.  Processed by the state change thread.
+     */
+    class StopWorker implements Callable<TaskSpec> {
+        private final String id;
+
+        StopWorker(String id) {
+            this.id = id;
+        }
+
+        @Override
+        public TaskSpec call() throws Exception {
+            Worker worker = workers.get(id);
+            if (worker == null) {
+                return null;
+            }
+            switch (worker.state) {
+                case STARTING:
+                    log.info("{}: Cancelling worker {} during its startup process.",
+                        nodeName, id);
+                    worker.state = State.CANCELLING;
+                    break;
+                case CANCELLING:
+                    log.info("{}: Can't stop worker {}, because it is already being " +
+                        "cancelled.", nodeName, id);
+                    break;
+                case RUNNING:
+                    log.info("{}: Stopping running worker {}.", nodeName, id);
+                    worker.transitionToStopping();
+                    break;
+                case STOPPING:
+                    log.info("{}: Can't stop worker {}, because it is already " +
+                            "stopping.", nodeName, id);
+                    break;
+                case DONE:
+                    log.debug("{}: Can't stop worker {}, because it is already done.",
+                        nodeName, id);
+                    break;
+            }
+            return worker.spec();
+        }
+    }
+
+    /**
+     * Cleans up the resources associated with a worker.  Processed by the worker
+     * cleanup thread pool.
+     */
+    class CleanupWorker implements Callable<Void> {
+        private final Worker worker;
+
+        CleanupWorker(Worker worker) {
+            this.worker = worker;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            String failure = "";
+            try {
+                worker.taskWorker.stop(platform);
+            } catch (Exception exception) {
+                log.error("{}: worker.stop() exception", nodeName, exception);
+                failure = exception.getMessage();
+            }
+            stateChangeExecutor.submit(new CompleteWorker(worker, failure));
+            return null;
+        }
+    }
+
+    public TreeMap<String, WorkerState> workerStates() throws Exception {
+        try (ShutdownManager.Reference ref = shutdownManager.takeReference()) {
+            return stateChangeExecutor.submit(new GetWorkerStates()).get();
+        }
+    }
+
+    class GetWorkerStates implements Callable<TreeMap<String, WorkerState>> {
+        @Override
+        public TreeMap<String, WorkerState> call() throws Exception {
+            TreeMap<String, WorkerState> workerMap = new TreeMap<>();
+            for (Worker worker : workers.values()) {
+                workerMap.put(worker.id(), worker.state());
+            }
+            return workerMap;
+        }
+    }
+
+    public void beginShutdown() throws Exception {
+        if (shutdownManager.shutdown()) {
+            shutdownExecutor.submit(new Shutdown());
+        }
+    }
+
+    public void waitForShutdown() throws Exception {
+        while (!shutdownExecutor.isShutdown()) {
+            shutdownExecutor.awaitTermination(1, TimeUnit.DAYS);
+        }
+    }
+
+    class Shutdown implements Callable<Void> {
+        @Override
+        public Void call() throws Exception {
+            log.info("{}: Shutting down WorkerManager.", platform.curNode().name());
+            for (Worker worker : workers.values()) {
+                stateChangeExecutor.submit(new StopWorker(worker.id));
+            }
+            shutdownManager.waitForQuiescence();
+            workerCleanupExecutor.shutdownNow();
+            stateChangeExecutor.shutdownNow();
+            workerCleanupExecutor.awaitTermination(1, TimeUnit.DAYS);
+            stateChangeExecutor.awaitTermination(1, TimeUnit.DAYS);
+            shutdownExecutor.shutdown();
+            return null;
+        }
+    }
+}