Posted to commits@kafka.apache.org by rs...@apache.org on 2017/11/03 09:38:06 UTC

[3/4] kafka git commit: KAFKA-6060; Add workload generation capabilities to Trogdor

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java b/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
index 85270cd..6922c2e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.trogdor.basic;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.common.utils.Shell;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.trogdor.common.Node;
@@ -36,6 +37,7 @@ public class BasicPlatform implements Platform {
 
     private final Node curNode;
     private final BasicTopology topology;
+    private final Scheduler scheduler;
     private final CommandRunner commandRunner;
 
     public interface CommandRunner {
@@ -57,7 +59,7 @@ public class BasicPlatform implements Platform {
     }
 
     public BasicPlatform(String curNodeName, BasicTopology topology,
-                         CommandRunner commandRunner) {
+                         Scheduler scheduler, CommandRunner commandRunner) {
         this.curNode = topology.node(curNodeName);
         if (this.curNode == null) {
             throw new RuntimeException(String.format("No node named %s found " +
@@ -65,16 +67,18 @@ public class BasicPlatform implements Platform {
                 Utils.join(topology.nodes().keySet(), ",")));
         }
         this.topology = topology;
+        this.scheduler = scheduler;
         this.commandRunner = commandRunner;
     }
 
-    public BasicPlatform(String curNodeName, JsonNode configRoot) {
+    public BasicPlatform(String curNodeName, Scheduler scheduler, JsonNode configRoot) {
         JsonNode nodes = configRoot.get("nodes");
         if (nodes == null) {
             throw new RuntimeException("Expected to find a 'nodes' field " +
                 "in the root JSON configuration object");
         }
         this.topology = new BasicTopology(nodes);
+        this.scheduler = scheduler;
         this.curNode = topology.node(curNodeName);
         if (this.curNode == null) {
             throw new RuntimeException(String.format("No node named %s found " +
@@ -100,6 +104,11 @@ public class BasicPlatform implements Platform {
     }
 
     @Override
+    public Scheduler scheduler() {
+        return scheduler;
+    }
+
+    @Override
     public String runCommand(String[] command) throws IOException {
         return commandRunner.run(curNode, command);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/common/Platform.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/Platform.java b/tools/src/main/java/org/apache/kafka/trogdor/common/Platform.java
index 1177ace..cb20620 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/common/Platform.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/Platform.java
@@ -21,6 +21,8 @@ import com.fasterxml.jackson.databind.JsonNode;
 
 import java.io.File;
 import java.io.IOException;
+
+import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.common.utils.Utils;
 
 /**
@@ -47,6 +49,7 @@ public interface Platform {
             String platformName = platformNode.textValue();
             return Utils.newParameterizedInstance(platformName,
                 String.class, curNodeName,
+                Scheduler.class, Scheduler.SYSTEM,
                 JsonNode.class, root);
         }
     }
@@ -67,6 +70,11 @@ public interface Platform {
     Topology topology();
 
     /**
+     * Get the scheduler to use.
+     */
+    Scheduler scheduler();
+
+    /**
      * Run a command on this local node.
      *
      * Throws an exception if the command could not be run, or if the

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/common/ThreadUtils.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/ThreadUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/ThreadUtils.java
new file mode 100644
index 0000000..7e02856
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/ThreadUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.common;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Utilities for working with threads.
+ */
+public class ThreadUtils {
+    /**
+     * Create a new ThreadFactory.
+     *
+     * @param pattern       The pattern to use.  If this contains %d, it will be
+     *                      replaced with a thread number.  It should not contain more
+     *                      than one %d.
+     * @param daemon        True if we want daemon threads.
+     * @return              The new ThreadFactory.
+     */
+    public static ThreadFactory createThreadFactory(final String pattern,
+                                                    final boolean daemon) {
+        return new ThreadFactory() {
+            private final AtomicLong threadEpoch = new AtomicLong(0);
+
+            @Override
+            public Thread newThread(Runnable r) {
+                String threadName;
+                if (pattern.contains("%d")) {
+                    threadName = String.format(pattern, threadEpoch.addAndGet(1));
+                } else {
+                    threadName = pattern;
+                }
+                Thread thread = new Thread(r, threadName);
+                thread.setDaemon(daemon);
+                return thread;
+            }
+        };
+    }
+}
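
A short usage sketch of the new factory (the names here are illustrative, not from the
patch); NodeManager and TaskManager later in this commit use it the same way:

    // One dedicated, non-daemon thread named exactly as given:
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
        ThreadUtils.createThreadFactory("NodeManager(node0)", false));

    // With %d in the pattern, threads are numbered WorkerThread1, WorkerThread2, ...
    ThreadFactory daemonFactory = ThreadUtils.createThreadFactory("WorkerThread%d", true);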

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/common/Topology.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/Topology.java b/tools/src/main/java/org/apache/kafka/trogdor/common/Topology.java
index 94a5474..d48bbde 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/common/Topology.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/Topology.java
@@ -17,12 +17,30 @@
 
 package org.apache.kafka.trogdor.common;
 
+import java.util.HashSet;
+import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Set;
 
 /**
  * Defines a cluster topology
  */
 public interface Topology {
+    class Util {
+        /**
+         * Get the names of agent nodes in the topology.
+         */
+        public static Set<String> agentNodeNames(Topology topology) {
+            Set<String> set = new HashSet<>();
+            for (Map.Entry<String, Node> entry : topology.nodes().entrySet()) {
+                if (Node.Util.getTrogdorAgentPort(entry.getValue()) > 0) {
+                    set.add(entry.getKey());
+                }
+            }
+            return set;
+        }
+    }
+
     /**
      * Get the node with the given name.
      */
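
A hedged sketch of how the new helper combines with the existing Node.Util accessors and
the AgentClient changes later in this patch; the platform variable is assumed to be in
scope:

    for (String nodeName : Topology.Util.agentNodeNames(platform.topology())) {
        Node node = platform.topology().node(nodeName);
        // maxTries = 1, matching how NodeManager constructs its AgentClient below.
        AgentClient client = new AgentClient(1, node.hostname(),
            Node.Util.getTrogdorAgentPort(node));
        System.out.println(nodeName + ": " + client.status());
    }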

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
index 8f3563b..8c26d8d 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
@@ -22,34 +22,19 @@ import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import net.sourceforge.argparse4j.inf.Namespace;
 import org.apache.kafka.common.utils.Exit;
-import org.apache.kafka.common.utils.KafkaThread;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
-import org.apache.kafka.trogdor.fault.DoneState;
-import org.apache.kafka.trogdor.fault.Fault;
-import org.apache.kafka.trogdor.fault.FaultSet;
-import org.apache.kafka.trogdor.fault.FaultSpec;
-import org.apache.kafka.trogdor.fault.SendingState;
-import org.apache.kafka.trogdor.rest.CoordinatorFaultsResponse;
-import org.apache.kafka.trogdor.rest.CreateCoordinatorFaultRequest;
+import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
+import org.apache.kafka.trogdor.rest.CreateTaskRequest;
+import org.apache.kafka.trogdor.rest.CreateTaskResponse;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
+import org.apache.kafka.trogdor.rest.StopTaskRequest;
+import org.apache.kafka.trogdor.rest.StopTaskResponse;
+import org.apache.kafka.trogdor.rest.TasksResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
 import static net.sourceforge.argparse4j.impl.Arguments.store;
 
 /**
@@ -61,39 +46,14 @@ public final class Coordinator {
     private static final Logger log = LoggerFactory.getLogger(Coordinator.class);
 
     /**
-     * The clock to use for this coordinator.
-     */
-    private final Time time;
-
-    /**
-     * The start time in milliseconds.
+     * The start time of the Coordinator in milliseconds.
      */
     private final long startTimeMs;
 
     /**
-     * The platform.
+     * The task manager.
      */
-    private final Platform platform;
-
-    /**
-     * NodeManager objects for each node in the cluster.
-     */
-    private final Map<String, NodeManager> nodeManagers;
-
-    /**
-     * The lock protecting shutdown and faultQueue.
-     */
-    private final ReentrantLock lock = new ReentrantLock();
-
-    /**
-     * The condition variable which the coordinator thread waits on.
-     */
-    private final Condition cond = lock.newCondition();
-
-    /**
-     * The coordinator runnable.
-     */
-    private final CoordinatorRunnable runnable;
+    private final TaskManager taskManager;
 
     /**
      * The REST server.
@@ -101,83 +61,6 @@ public final class Coordinator {
     private final JsonRestServer restServer;
 
     /**
-     * The coordinator thread.
-     */
-    private final KafkaThread thread;
-
-    /**
-     * True if the server is shutting down.
-     */
-    private boolean shutdown = false;
-
-    /**
-     * The set of faults which have been scheduled.
-     */
-    private final FaultSet pendingFaults = new FaultSet();
-
-    /**
-     * The set of faults which have been sent to the NodeManagers.
-     */
-    private final FaultSet processedFaults = new FaultSet();
-
-    class CoordinatorRunnable implements Runnable {
-        @Override
-        public void run() {
-            log.info("Starting main service thread.");
-            try {
-                long nextWakeMs = 0;
-                while (true) {
-                    long now = time.milliseconds();
-                    List<Fault> toStart = new ArrayList<>();
-                    lock.lock();
-                    try {
-                        if (shutdown) {
-                            break;
-                        }
-                        if (nextWakeMs > now) {
-                            if (cond.await(nextWakeMs - now, TimeUnit.MILLISECONDS)) {
-                                log.trace("CoordinatorRunnable woke up early.");
-                            }
-                            now = time.milliseconds();
-                            if (shutdown) {
-                                break;
-                            }
-                        }
-                        nextWakeMs = now + (60L * 60L * 1000L);
-                        Iterator<Fault> iter = pendingFaults.iterateByStart();
-                        while (iter.hasNext()) {
-                            Fault fault = iter.next();
-                            if (now < fault.spec().startMs()) {
-                                nextWakeMs = Math.min(nextWakeMs, fault.spec().startMs());
-                                break;
-                            }
-                            toStart.add(fault);
-                            iter.remove();
-                            processedFaults.add(fault);
-                        }
-                    } finally {
-                        lock.unlock();
-                    }
-                    for (Fault fault: toStart) {
-                        startFault(now, fault);
-                    }
-                }
-            } catch (Throwable t) {
-                log.error("CoordinatorRunnable shutting down with exception", t);
-            } finally {
-                log.info("CoordinatorRunnable shutting down.");
-                restServer.stop();
-                for (NodeManager nodeManager : nodeManagers.values()) {
-                    nodeManager.beginShutdown();
-                }
-                for (NodeManager nodeManager : nodeManagers.values()) {
-                    nodeManager.waitForShutdown();
-                }
-            }
-        }
-    }
-
-    /**
      * Create a new Coordinator.
      *
      * @param platform      The platform object to use.
@@ -185,24 +68,11 @@ public final class Coordinator {
      * @param restServer    The REST server to use.
      * @param resource      The CoordinatorRestResource to use.
      */
-    public Coordinator(Platform platform, Time time, JsonRestServer restServer,
+    public Coordinator(Platform platform, Scheduler scheduler, JsonRestServer restServer,
                        CoordinatorRestResource resource) {
-        this.platform = platform;
-        this.time = time;
-        this.startTimeMs = time.milliseconds();
-        this.runnable = new CoordinatorRunnable();
+        this.startTimeMs = scheduler.time().milliseconds();
+        this.taskManager = new TaskManager(platform, scheduler);
         this.restServer = restServer;
-        this.nodeManagers = new HashMap<>();
-        for (Node node : platform.topology().nodes().values()) {
-            if (Node.Util.getTrogdorAgentPort(node) > 0) {
-                this.nodeManagers.put(node.name(), new NodeManager(time, node));
-            }
-        }
-        if (this.nodeManagers.isEmpty()) {
-            log.warn("No agent nodes configured.");
-        }
-        this.thread = new KafkaThread("TrogdorCoordinatorThread", runnable, false);
-        this.thread.start();
         resource.setCoordinator(this);
     }
 
@@ -210,94 +80,32 @@ public final class Coordinator {
         return this.restServer.port();
     }
 
-    private void startFault(long now, Fault fault) {
-        Set<String> affectedNodes = fault.targetNodes(platform.topology());
-        Set<NodeManager> affectedManagers = new HashSet<>();
-        Set<String> nonexistentNodes = new HashSet<>();
-        Set<String> nodeNames = new HashSet<>();
-        for (String affectedNode : affectedNodes) {
-            NodeManager nodeManager = nodeManagers.get(affectedNode);
-            if (nodeManager == null) {
-                nonexistentNodes.add(affectedNode);
-            } else {
-                affectedManagers.add(nodeManager);
-                nodeNames.add(affectedNode);
-            }
-        }
-        if (!nonexistentNodes.isEmpty()) {
-            log.warn("Fault {} refers to {} non-existent node(s): {}", fault.id(),
-                    nonexistentNodes.size(), Utils.join(nonexistentNodes, ", "));
-        }
-        log.info("Applying fault {} on {} node(s): {}", fault.id(),
-                nodeNames.size(), Utils.join(nodeNames, ", "));
-        if (nodeNames.isEmpty()) {
-            fault.setState(new DoneState(now, ""));
-        } else {
-            fault.setState(new SendingState(nodeNames));
-        }
-        for (NodeManager nodeManager : affectedManagers) {
-            nodeManager.enqueueFault(fault);
-        }
-    }
-
-    public void beginShutdown() {
-        lock.lock();
-        try {
-            this.shutdown = true;
-            cond.signalAll();
-        } finally {
-            lock.unlock();
-        }
+    public CoordinatorStatusResponse status() throws Exception {
+        return new CoordinatorStatusResponse(startTimeMs);
     }
 
-    public void waitForShutdown() {
-        try {
-            this.thread.join();
-        } catch (InterruptedException e) {
-            log.error("Interrupted while waiting for thread shutdown", e);
-            Thread.currentThread().interrupt();
-        }
+    public CreateTaskResponse createTask(CreateTaskRequest request) throws Exception {
+        return new CreateTaskResponse(taskManager.createTask(request.id(), request.spec()));
     }
 
-    public long startTimeMs() {
-        return startTimeMs;
+    public StopTaskResponse stopTask(StopTaskRequest request) throws Exception {
+        return new StopTaskResponse(taskManager.stopTask(request.id()));
     }
 
-    public CoordinatorFaultsResponse getFaults() {
-        Map<String, CoordinatorFaultsResponse.FaultData> faultData = new TreeMap<>();
-        lock.lock();
-        try {
-            getFaultsImpl(faultData, pendingFaults);
-            getFaultsImpl(faultData, processedFaults);
-        } finally {
-            lock.unlock();
-        }
-        return new CoordinatorFaultsResponse(faultData);
+    public TasksResponse tasks() throws Exception {
+        return taskManager.tasks();
     }
 
-    private void getFaultsImpl(Map<String, CoordinatorFaultsResponse.FaultData> faultData,
-                               FaultSet faultSet) {
-        for (Iterator<Fault> iter = faultSet.iterateByStart();
-             iter.hasNext(); ) {
-            Fault fault = iter.next();
-            CoordinatorFaultsResponse.FaultData data =
-                new CoordinatorFaultsResponse.FaultData(fault.spec(), fault.state());
-            faultData.put(fault.id(), data);
-        }
+    public void beginShutdown(boolean stopAgents) throws Exception {
+        restServer.beginShutdown();
+        taskManager.beginShutdown(stopAgents);
     }
 
-    public void createFault(CreateCoordinatorFaultRequest request) throws ClassNotFoundException {
-        lock.lock();
-        try {
-            Fault fault = FaultSpec.Util.createFault(request.id(), request.spec());
-            pendingFaults.add(fault);
-            cond.signalAll();
-        } finally {
-            lock.unlock();
-        }
+    public void waitForShutdown() throws Exception {
+        restServer.waitForShutdown();
+        taskManager.waitForShutdown();
     }
 
-
     public static void main(String[] args) throws Exception {
         ArgumentParser parser = ArgumentParsers
             .newArgumentParser("trogdor-coordinator")
@@ -336,15 +144,20 @@ public final class Coordinator {
         JsonRestServer restServer = new JsonRestServer(
             Node.Util.getTrogdorCoordinatorPort(platform.curNode()));
         CoordinatorRestResource resource = new CoordinatorRestResource();
-        final Coordinator coordinator = new Coordinator(platform, Time.SYSTEM,
+        log.info("Starting coordinator process.");
+        final Coordinator coordinator = new Coordinator(platform, Scheduler.SYSTEM,
             restServer, resource);
         restServer.start(resource);
         Runtime.getRuntime().addShutdownHook(new Thread() {
             @Override
             public void run() {
-                log.error("Running shutdown hook...");
-                coordinator.beginShutdown();
-                coordinator.waitForShutdown();
+                log.warn("Running coordinator shutdown hook.");
+                try {
+                    coordinator.beginShutdown(false);
+                    coordinator.waitForShutdown();
+                } catch (Exception e) {
+                    log.error("Got exception while running coordinator shutdown hook.", e);
+                }
             }
         });
         coordinator.waitForShutdown();

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
index 1137d08..821a76b 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
@@ -25,12 +25,15 @@ import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
 import net.sourceforge.argparse4j.inf.Namespace;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.trogdor.common.JsonUtil;
-import org.apache.kafka.trogdor.rest.CoordinatorFaultsResponse;
 import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
-import org.apache.kafka.trogdor.rest.CreateCoordinatorFaultRequest;
+import org.apache.kafka.trogdor.rest.CreateTaskRequest;
+import org.apache.kafka.trogdor.rest.CreateTaskResponse;
 import org.apache.kafka.trogdor.rest.Empty;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
+import org.apache.kafka.trogdor.rest.StopTaskRequest;
+import org.apache.kafka.trogdor.rest.StopTaskResponse;
+import org.apache.kafka.trogdor.rest.TasksResponse;
 
 import static net.sourceforge.argparse4j.impl.Arguments.store;
 import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
@@ -40,47 +43,64 @@ import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
  */
 public class CoordinatorClient {
     /**
+     * The maximum number of tries to make.
+     */
+    private final int maxTries;
+
+    /**
      * The URL target.
      */
     private final String target;
 
-    public CoordinatorClient(String host, int port) {
-        this(String.format("%s:%d", host, port));
+    public CoordinatorClient(int maxTries, String host, int port) {
+        this(maxTries, String.format("%s:%d", host, port));
     }
 
-    public CoordinatorClient(String target) {
+    public CoordinatorClient(int maxTries, String target) {
+        this.maxTries = maxTries;
         this.target = target;
     }
 
+    public int maxTries() {
+        return maxTries;
+    }
+
     private String url(String suffix) {
         return String.format("http://%s%s", target, suffix);
     }
 
-    public CoordinatorStatusResponse getStatus() throws Exception {
+    public CoordinatorStatusResponse status() throws Exception {
         HttpResponse<CoordinatorStatusResponse> resp =
             JsonRestServer.<CoordinatorStatusResponse>httpRequest(url("/coordinator/status"), "GET",
-                null, new TypeReference<CoordinatorStatusResponse>() { });
+                null, new TypeReference<CoordinatorStatusResponse>() { }, maxTries);
         return resp.body();
     }
 
-    public CoordinatorFaultsResponse getFaults() throws Exception {
-        HttpResponse<CoordinatorFaultsResponse> resp =
-            JsonRestServer.<CoordinatorFaultsResponse>httpRequest(url("/coordinator/faults"), "GET",
-                null, new TypeReference<CoordinatorFaultsResponse>() { });
+    public CreateTaskResponse createTask(CreateTaskRequest request) throws Exception {
+        HttpResponse<CreateTaskResponse> resp =
+            JsonRestServer.<CreateTaskResponse>httpRequest(url("/coordinator/task/create"), "POST",
+                request, new TypeReference<CreateTaskResponse>() { }, maxTries);
         return resp.body();
     }
 
-    public void putFault(CreateCoordinatorFaultRequest request) throws Exception {
-        HttpResponse<CreateCoordinatorFaultRequest> resp =
-            JsonRestServer.<CreateCoordinatorFaultRequest>httpRequest(url("/coordinator/fault"), "PUT",
-                request, new TypeReference<CreateCoordinatorFaultRequest>() { });
-        resp.body();
+    public StopTaskResponse stopTask(StopTaskRequest request) throws Exception {
+        HttpResponse<StopTaskResponse> resp =
+            JsonRestServer.<StopTaskResponse>httpRequest(url("/coordinator/task/stop"), "PUT",
+                request, new TypeReference<StopTaskResponse>() { }, maxTries);
+        return resp.body();
+    }
+
+    public TasksResponse tasks() throws Exception {
+        HttpResponse<TasksResponse> resp =
+            JsonRestServer.<TasksResponse>httpRequest(url("/coordinator/tasks"), "GET",
+                null, new TypeReference<TasksResponse>() { }, maxTries);
+        return resp.body();
     }
 
     public void shutdown() throws Exception {
         HttpResponse<Empty> resp =
             JsonRestServer.<Empty>httpRequest(url("/coordinator/shutdown"), "PUT",
-                null, new TypeReference<Empty>() { });
+                null, new TypeReference<Empty>() { }, maxTries);
         resp.body();
     }
 
@@ -102,17 +122,23 @@ public class CoordinatorClient {
             .type(Boolean.class)
             .dest("status")
             .help("Get coordinator status.");
-        actions.addArgument("--get-faults")
+        actions.addArgument("--show-tasks")
             .action(storeTrue())
             .type(Boolean.class)
-            .dest("get_faults")
-            .help("Get coordinator faults.");
-        actions.addArgument("--create-fault")
+            .dest("show_tasks")
+            .help("Show coordinator tasks.");
+        actions.addArgument("--create-task")
+            .action(store())
+            .type(String.class)
+            .dest("create_task")
+            .metavar("TASK_SPEC_JSON")
+            .help("Create a new task from a task spec.");
+        actions.addArgument("--stop-task")
             .action(store())
             .type(String.class)
-            .dest("create_fault")
-            .metavar("FAULT_JSON")
-            .help("Create a new fault.");
+            .dest("stop_task")
+            .metavar("TASK_ID")
+            .help("Stop a task.");
         actions.addArgument("--shutdown")
             .action(storeTrue())
             .type(Boolean.class)
@@ -132,17 +158,20 @@ public class CoordinatorClient {
             }
         }
         String target = res.getString("target");
-        CoordinatorClient client = new CoordinatorClient(target);
+        CoordinatorClient client = new CoordinatorClient(3, target);
         if (res.getBoolean("status")) {
             System.out.println("Got coordinator status: " +
-                JsonUtil.toPrettyJsonString(client.getStatus()));
-        } else if (res.getBoolean("get_faults")) {
-            System.out.println("Got coordinator faults: " +
-                JsonUtil.toPrettyJsonString(client.getFaults()));
-        } else if (res.getString("create_fault") != null) {
-            client.putFault(JsonUtil.JSON_SERDE.readValue(res.getString("create_fault"),
-                CreateCoordinatorFaultRequest.class));
-            System.out.println("Created fault.");
+                JsonUtil.toPrettyJsonString(client.status()));
+        } else if (res.getBoolean("show_tasks")) {
+            System.out.println("Got coordinator tasks: " +
+                JsonUtil.toPrettyJsonString(client.tasks()));
+        } else if (res.getString("create_task") != null) {
+            client.createTask(JsonUtil.JSON_SERDE.readValue(res.getString("create_task"),
+                CreateTaskRequest.class));
+            System.out.println("Created task.");
+        } else if (res.getString("stop_task") != null) {
+            client.stopTask(new StopTaskRequest(res.getString("stop_task")));
+            System.out.println("Stopped task.");
         } else if (res.getBoolean("shutdown")) {
             client.shutdown();
             System.out.println("Sent shutdown request.");
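
The reworked client can also be driven programmatically; a sketch mirroring the CLI
paths above (the host, port, task id, and spec JSON are placeholders):

    CoordinatorClient client = new CoordinatorClient(3, "localhost", 8889);
    System.out.println(JsonUtil.toPrettyJsonString(client.status()));

    // Create a task from a JSON-serialized CreateTaskRequest (placeholder content).
    String createTaskJson = "...";
    client.createTask(JsonUtil.JSON_SERDE.readValue(createTaskJson, CreateTaskRequest.class));

    System.out.println(JsonUtil.toPrettyJsonString(client.tasks()));
    client.stopTask(new StopTaskRequest("task0"));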

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
index 60357b8..7775dd0 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
@@ -16,14 +16,19 @@
  */
 package org.apache.kafka.trogdor.coordinator;
 
-import org.apache.kafka.trogdor.rest.CoordinatorFaultsResponse;
+import org.apache.kafka.trogdor.rest.CoordinatorShutdownRequest;
 import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
-import org.apache.kafka.trogdor.rest.CreateCoordinatorFaultRequest;
+import org.apache.kafka.trogdor.rest.CreateTaskRequest;
+import org.apache.kafka.trogdor.rest.CreateTaskResponse;
 import org.apache.kafka.trogdor.rest.Empty;
+import org.apache.kafka.trogdor.rest.StopTaskRequest;
+import org.apache.kafka.trogdor.rest.StopTaskResponse;
+import org.apache.kafka.trogdor.rest.TasksResponse;
 
 import javax.servlet.ServletContext;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.GET;
+import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
@@ -46,27 +51,32 @@ public class CoordinatorRestResource {
 
     @GET
     @Path("/status")
-    public CoordinatorStatusResponse getStatus() throws Throwable {
-        return new CoordinatorStatusResponse(coordinator().startTimeMs());
+    public CoordinatorStatusResponse status() throws Throwable {
+        return coordinator().status();
     }
 
-    @GET
-    @Path("/faults")
-    public CoordinatorFaultsResponse getCoordinatorFaults() throws Throwable {
-        return coordinator().getFaults();
+    @POST
+    @Path("/task/create")
+    public CreateTaskResponse createTask(CreateTaskRequest request) throws Throwable {
+        return coordinator().createTask(request);
     }
 
     @PUT
-    @Path("/fault")
-    public Empty putCoordinatorFault(CreateCoordinatorFaultRequest request) throws Throwable {
-        coordinator().createFault(request);
-        return Empty.INSTANCE;
+    @Path("/task/stop")
+    public StopTaskResponse stopTask(StopTaskRequest request) throws Throwable {
+        return coordinator().stopTask(request);
+    }
+
+    @GET
+    @Path("/tasks")
+    public TasksResponse tasks() throws Throwable {
+        return coordinator().tasks();
     }
 
     @PUT
     @Path("/shutdown")
-    public Empty shutdown() throws Throwable {
-        coordinator().beginShutdown();
+    public Empty beginShutdown(CoordinatorShutdownRequest request) throws Throwable {
+        coordinator().beginShutdown(request.stopAgents());
         return Empty.INSTANCE;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
index ee71190..aea9617 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
@@ -15,254 +15,315 @@
  * limitations under the License.
  */
 
+/*
+ * So, when a task comes in, it happens via createTask (the RPC backend).
+ * This starts a CreateTask on the main state change thread, and waits for it.
+ * That task checks the main task hash map, and returns back the existing task spec
+ * if there is something there.  If there is nothing there, it creates
+ * something new, and returns null.
+ * It also schedules a RunTask some time in the future on the main state change thread.
+ * We save the future from this in case we need to cancel it later, in a StopTask.
+ * If we can't create the TaskController for the task, we transition to DONE with an
+ * appropriate error message.
+ *
+ * RunTask actually starts the task which was created earlier.  This could
+ * happen an arbitrary amount of time after task creation (it is based on the
+ * task spec).  RunTask must operate only on PENDING tasks... if the task has been
+ * stopped, then we have nothing to do here.
+ * RunTask asks the TaskController for a list of all the names of nodes
+ * affected by this task.
+ * If this list contains nodes we don't know about, or zero nodes, we
+ * transition directly to DONE state with an appropriate error set.
+ * RunTask schedules CreateWorker Callables on all the affected worker nodes.
+ * These callables run in the context of the relevant NodeManager.
+ *
+ * CreateWorker calls the RPC of the same name for the agent.
+ * There is some complexity here due to retries.
+ */
+
 package org.apache.kafka.trogdor.coordinator;
 
-import org.apache.kafka.common.utils.KafkaThread;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.trogdor.agent.AgentClient;
 import org.apache.kafka.trogdor.common.Node;
-import org.apache.kafka.trogdor.common.Platform;
-import org.apache.kafka.trogdor.fault.DoneState;
-import org.apache.kafka.trogdor.fault.Fault;
-import org.apache.kafka.trogdor.fault.SendingState;
+import org.apache.kafka.trogdor.common.ThreadUtils;
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
-import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest;
+import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
+import org.apache.kafka.trogdor.rest.StopWorkerRequest;
+import org.apache.kafka.trogdor.rest.WorkerDone;
+import org.apache.kafka.trogdor.rest.WorkerReceiving;
+import org.apache.kafka.trogdor.rest.WorkerRunning;
+import org.apache.kafka.trogdor.rest.WorkerStarting;
+import org.apache.kafka.trogdor.rest.WorkerState;
+import org.apache.kafka.trogdor.task.TaskSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.net.ConnectException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
-class NodeManager {
+/**
+ * The NodeManager handles communicating with a specific agent node.
+ * Each NodeManager has its own ExecutorService which runs in a dedicated thread.
+ */
+public final class NodeManager {
     private static final Logger log = LoggerFactory.getLogger(NodeManager.class);
 
     /**
-     * The Time object used to fetch the current time.
+     * The normal number of milliseconds between heartbeats sent to the agent.
      */
-    private final Time time;
+    private static final long HEARTBEAT_DELAY_MS = 1000L;
 
-    /**
-     * The node which is being managed.
-     */
-    private final Node node;
+    class ManagedWorker {
+        private final String id;
+        private final TaskSpec spec;
+        private boolean shouldRun;
+        private WorkerState state;
 
-    /**
-     * The client for the node being managed.
-     */
-    private final AgentClient client;
+        ManagedWorker(String id, TaskSpec spec, boolean shouldRun, WorkerState state) {
+            this.id = id;
+            this.spec = spec;
+            this.shouldRun = shouldRun;
+            this.state = state;
+        }
 
-    /**
-     * The maximum amount of time to go without contacting the node.
-     */
-    private final long heartbeatMs;
+        void tryCreate() {
+            try {
+                client.createWorker(new CreateWorkerRequest(id, spec));
+            } catch (Throwable e) {
+                log.error("{}: error creating worker {}.", node.name(), id, e);
+            }
+        }
 
-    /**
-     * True if the NodeManager is shutting down.  Protected by the queueLock.
-     */
-    private boolean shutdown = false;
+        void tryStop() {
+            try {
+                client.stopWorker(new StopWorkerRequest(id));
+            } catch (Throwable e) {
+                log.error("{}: error stopping worker {}.", node.name(), id, e);
+            }
+        }
+    }
 
     /**
-     * The Node Manager runnable.
+     * The node which we are managing.
      */
-    private final NodeManagerRunnable runnable;
+    private final Node node;
 
     /**
-     * The Node Manager thread.
+     * The task manager.
      */
-    private final KafkaThread thread;
+    private final TaskManager taskManager;
 
     /**
-     * The lock protecting the NodeManager fields.
+     * A client for the Node's Agent.
      */
-    private final Lock lock = new ReentrantLock();
+    private final AgentClient client;
 
     /**
-     * The condition variable used to wake the thread when it is waiting for a
-     * queue or shutdown change.
+     * Maps task IDs to worker structures.
      */
-    private final Condition cond = lock.newCondition();
+    private final Map<String, ManagedWorker> workers;
 
     /**
-     * A queue of faults which should be sent to this node.  Protected by the lock.
+     * An executor service which manages the thread dedicated to this node.
      */
-    private final List<Fault> faultQueue = new ArrayList<>();
+    private final ScheduledExecutorService executor;
 
     /**
-     * The last time we successfully contacted the node.  Protected by the lock.
+     * The heartbeat runnable.
      */
-    private long lastContactMs = 0;
+    private final NodeHeartbeat heartbeat;
 
     /**
-     * The current status of this node.
+     * A future which can be used to cancel the periodic heartbeat task.
      */
-    public static class NodeStatus {
-        private final String nodeName;
-        private final long lastContactMs;
+    private ScheduledFuture<?> heartbeatFuture;
 
-        NodeStatus(String nodeName, long lastContactMs) {
-            this.nodeName = nodeName;
-            this.lastContactMs = lastContactMs;
-        }
-
-        public String nodeName() {
-            return nodeName;
-        }
+    NodeManager(Node node, TaskManager taskManager) {
+        this.node = node;
+        this.taskManager = taskManager;
+        this.client = new AgentClient(1, node.hostname(), Node.Util.getTrogdorAgentPort(node));
+        this.workers = new HashMap<>();
+        this.executor = Executors.newSingleThreadScheduledExecutor(
+            ThreadUtils.createThreadFactory("NodeManager(" + node.name() + ")",
+                false));
+        this.heartbeat = new NodeHeartbeat();
+        rescheduleNextHeartbeat(HEARTBEAT_DELAY_MS);
+    }
 
-        public long lastContactMs() {
-            return lastContactMs;
+    /**
+     * Reschedule the heartbeat runnable.
+     *
+     * @param initialDelayMs        The initial delay to use.
+     */
+    void rescheduleNextHeartbeat(long initialDelayMs) {
+        if (this.heartbeatFuture != null) {
+            this.heartbeatFuture.cancel(false);
         }
+        this.heartbeatFuture = this.executor.scheduleAtFixedRate(heartbeat,
+            initialDelayMs, HEARTBEAT_DELAY_MS, TimeUnit.MILLISECONDS);
     }
 
-    class NodeManagerRunnable implements Runnable {
+    /**
+     * The heartbeat runnable.
+     */
+    class NodeHeartbeat implements Runnable {
         @Override
         public void run() {
+            rescheduleNextHeartbeat(HEARTBEAT_DELAY_MS);
             try {
-                Fault fault = null;
-                long lastCommAttemptMs = 0;
-                while (true) {
-                    long now = time.milliseconds();
-                    if (fault != null) {
-                        lastCommAttemptMs = now;
-                        if (sendFault(now, fault)) {
-                            fault = null;
+                AgentStatusResponse agentStatus = null;
+                try {
+                    agentStatus = client.status();
+                } catch (ConnectException e) {
+                    log.error("{}: failed to get agent status: ConnectException {}", node.name(), e.getMessage());
+                    return;
+                } catch (Exception e) {
+                    log.error("{}: failed to get agent status", node.name(), e);
+                    // TODO: eventually think about putting tasks into a bad state as a result of
+                    // agents going down?
+                    return;
+                }
+                // Identify workers which we think should be running, but which do not appear
+                // in the agent's response.  We need to send createWorker requests for these.
+                for (Map.Entry<String, ManagedWorker> entry : workers.entrySet()) {
+                    String id = entry.getKey();
+                    if (!agentStatus.workers().containsKey(id)) {
+                        ManagedWorker worker = entry.getValue();
+                        if (worker.shouldRun) {
+                            worker.tryCreate();
                         }
                     }
-                    long nextCommAttemptMs = lastCommAttemptMs + heartbeatMs;
-                    if (now < nextCommAttemptMs) {
-                        lastCommAttemptMs = now;
-                        sendHeartbeat(now);
+                }
+                // Identify tasks which are running, but which we don't know about.
+                // Add these to the NodeManager as tasks that should not be running.
+                for (Map.Entry<String, WorkerState> entry : agentStatus.workers().entrySet()) {
+                    String id = entry.getKey();
+                    WorkerState state = entry.getValue();
+                    if (!workers.containsKey(id)) {
+                        log.warn("{}: scheduling unknown worker {} for stopping.", node.name(), id);
+                        workers.put(id, new ManagedWorker(id, state.spec(), false, state));
                     }
-                    long waitMs = Math.max(0L, nextCommAttemptMs - now);
-                    lock.lock();
-                    try {
-                        if (shutdown) {
-                            return;
-                        }
-                        try {
-                            cond.await(waitMs, TimeUnit.MILLISECONDS);
-                        } catch (InterruptedException e) {
-                            log.info("{}: NodeManagerRunnable got InterruptedException", node.name());
-                            Thread.currentThread().interrupt();
+                }
+                // Handle workers which need to be stopped.  Handle workers which have newly completed.
+                for (Map.Entry<String, WorkerState> entry : agentStatus.workers().entrySet()) {
+                    String id = entry.getKey();
+                    WorkerState state = entry.getValue();
+                    ManagedWorker worker = workers.get(id);
+                    if (state instanceof WorkerStarting || state instanceof WorkerRunning) {
+                        if (!worker.shouldRun) {
+                            worker.tryStop();
                         }
-                        if (fault == null) {
-                            if (!faultQueue.isEmpty()) {
-                                fault = faultQueue.remove(0);
+                    } else if (state instanceof WorkerDone) {
+                        if (!(worker.state instanceof WorkerDone)) {
+                            String error = ((WorkerDone) state).error();
+                            if (error.isEmpty()) {
+                                log.warn("{}: Worker {} finished with no error.", node.name(), id);
+                            } else {
+                                log.warn("{}: Worker {} finished with error '{}'", node.name(), id, error);
                             }
+                            taskManager.handleWorkerCompletion(node.name(), worker.id, error);
                         }
-                    } finally {
-                        lock.unlock();
                     }
+                    worker.state = state;
                 }
             } catch (Throwable e) {
-                log.warn("{}: exiting NodeManagerRunnable with exception", node.name(), e);
-            } finally {
+                log.error("{}: Unhandled exception in NodeHeartbeat", node.name(), e);
             }
         }
     }
 
-    NodeManager(Time time, Node node) {
-        this.time = time;
-        this.node = node;
-        this.client = new AgentClient(node.hostname(), Node.Util.getTrogdorAgentPort(node));
-        this.heartbeatMs = Node.Util.getIntConfig(node,
-                Platform.Config.TROGDOR_COORDINATOR_HEARTBEAT_MS,
-                Platform.Config.TROGDOR_COORDINATOR_HEARTBEAT_MS_DEFAULT);
-        this.runnable = new NodeManagerRunnable();
-        this.thread = new KafkaThread("NodeManagerThread(" + node.name() + ")", runnable, false);
-        this.thread.start();
+    /**
+     * Create a new worker.
+     *
+     * @param id                    The new worker id.
+     * @param spec                  The task specification to use with the new worker.
+     */
+    public void createWorker(String id, TaskSpec spec) {
+        executor.submit(new CreateWorker(id, spec));
     }
 
-    private boolean sendFault(long now, Fault fault) {
-        try {
-            client.putFault(new CreateAgentFaultRequest(fault.id(), fault.spec()));
-        } catch (Exception e) {
-            log.warn("{}: error sending fault to {}.", node.name(), client.target(), e);
-            return false;
-        }
-        lock.lock();
-        try {
-            lastContactMs = now;
-        } finally {
-            lock.unlock();
-        }
-        SendingState state = (SendingState) fault.state();
-        if (state.completeSend(node.name())) {
-            fault.setState(new DoneState(now, ""));
-        }
-        return true;
-    }
+    /**
+     * Starts a worker.
+     */
+    class CreateWorker implements Callable<Void> {
+        private final String id;
+        private final TaskSpec spec;
 
-    private void sendHeartbeat(long now) {
-        AgentStatusResponse status = null;
-        try {
-            status = client.getStatus();
-        } catch (Exception e) {
-            log.warn("{}: error sending heartbeat to {}.", node.name(), client.target(), e);
-            return;
+        CreateWorker(String id, TaskSpec spec) {
+            this.id = id;
+            this.spec = spec;
         }
-        lock.lock();
-        try {
-            lastContactMs = now;
-        } finally {
-            lock.unlock();
-        }
-        log.debug("{}: got heartbeat status {}.", node.name(), status);
-    }
 
-    public void beginShutdown() {
-        lock.lock();
-        try {
-            if (shutdown)
-                return;
-            log.trace("{}: beginning shutdown.", node.name());
-            shutdown = true;
-            cond.signalAll();
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public void waitForShutdown() {
-        log.trace("waiting for NodeManager({}) shutdown.", node.name());
-        try {
-            thread.join();
-        } catch (InterruptedException e) {
-            log.error("{}: Interrupted while waiting for thread shutdown", node.name(), e);
-            Thread.currentThread().interrupt();
+        @Override
+        public Void call() throws Exception {
+            ManagedWorker worker = workers.get(id);
+            if (worker != null) {
+                log.error("{}: there is already a worker for task {}.", node.name(), id);
+                return null;
+            }
+            log.info("{}: scheduling worker {} to start.", node.name(), id);
+            workers.put(id, new ManagedWorker(id, spec, true, new WorkerReceiving(spec)));
+            rescheduleNextHeartbeat(0);
+            return null;
         }
     }
 
     /**
-     * Get the current status of this node.
+     * Stop a worker.
      *
-     * @return                  The node status.
+     * @param id                    The id of the worker to stop.
      */
-    public NodeStatus status() {
-        lock.lock();
-        try {
-            return new NodeStatus(node.name(), lastContactMs);
-        } finally {
-            lock.unlock();
-        }
+    public void stopWorker(String id) {
+        executor.submit(new StopWorker(id));
     }
 
     /**
-     * Enqueue a new fault.
-     *
-     * @param fault             The fault to enqueue.
+     * Stops a worker.
      */
-    public void enqueueFault(Fault fault) {
-        lock.lock();
-        try {
-            log.trace("{}: added {} to fault queue.", node.name(), fault);
-            faultQueue.add(fault);
-            cond.signalAll();
-        } finally {
-            lock.unlock();
+    class StopWorker implements Callable<Void> {
+        private final String id;
+
+        StopWorker(String id) {
+            this.id = id;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            ManagedWorker worker = workers.get(id);
+            if (worker == null) {
+                log.error("{}: can't stop non-existent worker {}.", node.name(), id);
+                return null;
+            }
+            if (!worker.shouldRun) {
+                log.error("{}: The worker for task {} is already scheduled to stop.",
+                    node.name(), id);
+                return null;
+            }
+            log.info("{}: scheduling worker {} to stop.", node.name(), id);
+            worker.shouldRun = false;
+            rescheduleNextHeartbeat(0);
+            return null;
         }
     }
+
+    public void beginShutdown(boolean stopNode) {
+        executor.shutdownNow();
+        if (stopNode) {
+            try {
+                client.invokeShutdown();
+            } catch (Exception e) {
+                log.error("{}: Failed to send shutdown request", node.name(), e);
+            }
+        }
+    }
+
+    public void waitForShutdown() throws InterruptedException {
+        executor.awaitTermination(1, TimeUnit.DAYS);
+    }
 };
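
Putting the pieces together, a sketch of the coordinator-side calls described in the
comment at the top of this file; node, taskManager, and spec are placeholders, and in
the real flow TaskManager makes these calls rather than user code:

    NodeManager nodeManager = new NodeManager(node, taskManager);
    nodeManager.createWorker("task0", spec);   // schedules a CreateWorker on this node's thread
    // ... later, when a StopTask request arrives for the task:
    nodeManager.stopWorker("task0");           // schedules a StopWorker; the heartbeat delivers it
    nodeManager.beginShutdown(false);          // false: do not ask the agent itself to shut down
    nodeManager.waitForShutdown();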

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
new file mode 100644
index 0000000..547c9da
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.coordinator;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Scheduler;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.trogdor.common.Node;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.ThreadUtils;
+import org.apache.kafka.trogdor.rest.TaskDone;
+import org.apache.kafka.trogdor.rest.TaskPending;
+import org.apache.kafka.trogdor.rest.TaskRunning;
+import org.apache.kafka.trogdor.rest.TaskState;
+import org.apache.kafka.trogdor.rest.TaskStopping;
+import org.apache.kafka.trogdor.rest.TasksResponse;
+import org.apache.kafka.trogdor.task.TaskController;
+import org.apache.kafka.trogdor.task.TaskSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The TaskManager is responsible for managing tasks inside the Trogdor coordinator.
+ *
+ * The task manager has a single thread, managed by the executor.  We start, stop,
+ * and handle state changes to tasks by adding requests to the executor queue.
+ * Because the executor is single threaded, no locks are needed when accessing
+ * TaskManager data structures.
+ *
+ * The TaskManager maintains a state machine for each task.  Tasks begin in the
+ * PENDING state, waiting for their designated start time to arrive.
+ * When their time arrives, they transition to the RUNNING state.  In this state,
+ * the NodeManager will start them, and monitor them.
+ *
+ * The TaskManager does not handle communication with the agents.  This is handled
+ * by the NodeManagers.  There is one NodeManager per node being managed.
+ * See {@link org.apache.kafka.trogdor.coordinator.NodeManager} for details.
+ */
+public final class TaskManager {
+    private static final Logger log = LoggerFactory.getLogger(TaskManager.class);
+
+    /**
+     * The platform.
+     */
+    private final Platform platform;
+
+    /**
+     * The scheduler to use for this coordinator.
+     */
+    private final Scheduler scheduler;
+
+    /**
+     * The clock to use for this coordinator.
+     */
+    private final Time time;
+
+    /**
+     * A map of task IDs to Task objects.
+     */
+    private final Map<String, ManagedTask> tasks;
+
+    /**
+     * The executor used for handling Task state changes.
+     */
+    private final ScheduledExecutorService executor;
+
+    /**
+     * Maps node names to node managers.
+     */
+    private final Map<String, NodeManager> nodeManagers;
+
+    /**
+     * True if the TaskManager is shut down.
+     */
+    private AtomicBoolean shutdown = new AtomicBoolean(false);
+
+    TaskManager(Platform platform, Scheduler scheduler) {
+        this.platform = platform;
+        this.scheduler = scheduler;
+        this.time = scheduler.time();
+        this.tasks = new HashMap<>();
+        this.executor = Executors.newSingleThreadScheduledExecutor(
+            ThreadUtils.createThreadFactory("TaskManagerStateThread", false));
+        this.nodeManagers = new HashMap<>();
+        for (Node node : platform.topology().nodes().values()) {
+            if (Node.Util.getTrogdorAgentPort(node) > 0) {
+                this.nodeManagers.put(node.name(), new NodeManager(node, this));
+            }
+        }
+    }
+
+    enum ManagedTaskState {
+        PENDING,
+        RUNNING,
+        STOPPING,
+        DONE;
+    }
+
+    class ManagedTask {
+        /**
+         * The task id.
+         */
+        private final String id;
+
+        /**
+         * The task specification.
+         */
+        private final TaskSpec spec;
+
+        /**
+         * The task controller.
+         */
+        private final TaskController controller;
+
+        /**
+         * The task state.
+         */
+        private ManagedTaskState state;
+
+        /**
+         * The time when the task was started, or -1 if the task has not been started.
+         */
+        private long startedMs = -1;
+
+        /**
+         * The time when the task finished, or -1 if the task has not finished yet.
+         */
+        private long doneMs = -1;
+
+        /**
+         * True if the task was cancelled by a stop request.
+         */
+        boolean cancelled = false;
+
+        /**
+         * If there is a task start scheduled, this is a future which can
+         * be used to cancel it.
+         */
+        private Future<?> startFuture = null;
+
+        /**
+         * The names of the worker nodes involved with this task.
+         * Null if the task is not running.
+         */
+        private Set<String> workers = null;
+
+        /**
+         * The names of the worker nodes which are still running this task.
+         * Null if the task is not running.
+         */
+        private Set<String> activeWorkers = null;
+
+        /**
+         * If this is non-empty, a message describing how this task failed.
+         */
+        private String error = "";
+
+        ManagedTask(String id, TaskSpec spec, TaskController controller, ManagedTaskState state) {
+            this.id = id;
+            this.spec = spec;
+            this.controller = controller;
+            this.state = state;
+        }
+
+        void clearStartFuture() {
+            if (startFuture != null) {
+                startFuture.cancel(false);
+                startFuture = null;
+            }
+        }
+
+        long startDelayMs(long now) {
+            if (now > spec.startMs()) {
+                return 0;
+            }
+            return spec.startMs() - now;
+        }
+
+        TreeSet<String> findNodeNames() {
+            Set<String> nodeNames = controller.targetNodes(platform.topology());
+            TreeSet<String> validNodeNames = new TreeSet<>();
+            TreeSet<String> nonExistentNodeNames = new TreeSet<>();
+            for (String nodeName : nodeNames) {
+                if (nodeManagers.containsKey(nodeName)) {
+                    validNodeNames.add(nodeName);
+                } else {
+                    nonExistentNodeNames.add(nodeName);
+                }
+            }
+            if (!nonExistentNodeNames.isEmpty()) {
+                throw new KafkaException("Unknown node names: " +
+                        Utils.join(nonExistentNodeNames, ", "));
+            }
+            if (validNodeNames.isEmpty()) {
+                throw new KafkaException("No node names specified.");
+            }
+            return validNodeNames;
+        }
+
+        void maybeSetError(String newError) {
+            if (error.isEmpty()) {
+                error = newError;
+            }
+        }
+
+        TaskState taskState() {
+            switch (state) {
+                case PENDING:
+                    return new TaskPending(spec);
+                case RUNNING:
+                    return new TaskRunning(spec, startedMs);
+                case STOPPING:
+                    return new TaskStopping(spec, startedMs);
+                case DONE:
+                    return new TaskDone(spec, startedMs, doneMs, error, cancelled);
+            }
+            throw new RuntimeException("unreachable");
+        }
+    }
+
+    /**
+     * Create a task.
+     *
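+     * Creating a task is idempotent per ID: if a task with the given ID already
+     * exists, no new task is created and the existing task's specification is
+     * returned.  A rough example (the IDs and specs below are hypothetical):
+     * <pre>
+     *     taskManager.createTask("foo", specA);                  // creates task "foo" using specA
+     *     TaskSpec spec = taskManager.createTask("foo", specB);  // ignored; returns specA
+     * </pre>
+     *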
+     * @param id                    The ID of the task to create.
+     * @param spec                  The specification of the task to create.
+     *
+     * @return                      The specification of the task with the given ID.
+     *                              Note that if there was already a task with the given ID,
+     *                              this may be different from the specification that was
+     *                              requested.
+     */
+    public TaskSpec createTask(final String id, TaskSpec spec)
+            throws ExecutionException, InterruptedException {
+        final TaskSpec existingSpec = executor.submit(new CreateTask(id, spec)).get();
+        if (existingSpec != null) {
+            log.info("Ignoring request to create task {}, because there is already " +
+                "a task with that id.", id);
+            return existingSpec;
+        }
+        return spec;
+    }
+
+    /**
+     * Handles a request to create a new task.  Processed by the state change thread.
+     */
+    class CreateTask implements Callable<TaskSpec> {
+        private final String id;
+        private final TaskSpec spec;
+
+        CreateTask(String id, TaskSpec spec) {
+            this.id = id;
+            this.spec = spec;
+        }
+
+        @Override
+        public TaskSpec call() throws Exception {
+            ManagedTask task = tasks.get(id);
+            if (task != null) {
+                log.info("Task ID {} is already in use.", id);
+                return task.spec;
+            }
+            TaskController controller = null;
+            String failure = null;
+            try {
+                controller = spec.newController(id);
+            } catch (Throwable t) {
+                failure = "Failed to create TaskController: " + t.getMessage();
+            }
+            if (failure != null) {
+                log.info("Failed to create a new task {} with spec {}: {}",
+                    id, spec, failure);
+                task = new ManagedTask(id, spec, null, ManagedTaskState.DONE);
+                task.doneMs = time.milliseconds();
+                task.maybeSetError(failure);
+                tasks.put(id, task);
+                return null;
+            }
+            task = new ManagedTask(id, spec, controller, ManagedTaskState.PENDING);
+            tasks.put(id, task);
+            long delayMs = task.startDelayMs(time.milliseconds());
+            task.startFuture = scheduler.schedule(executor, new RunTask(task), delayMs);
+            log.info("Created a new task {} with spec {}, scheduled to start {} ms from now.",
+                    id, spec, delayMs);
+            return null;
+        }
+    }
+
+    /**
+     * Handles starting a task.  Processed by the state change thread.
+     */
+    class RunTask implements Callable<Void> {
+        private final ManagedTask task;
+
+        RunTask(ManagedTask task) {
+            this.task = task;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            task.clearStartFuture();
+            if (task.state != ManagedTaskState.PENDING) {
+                log.info("Can't start task {}, because it is already in state {}.",
+                    task.id, task.state);
+                return null;
+            }
+            TreeSet<String> nodeNames;
+            try {
+                nodeNames = task.findNodeNames();
+            } catch (Exception e) {
+                log.error("Unable to find nodes for task {}", task.id, e);
+                task.doneMs = time.milliseconds();
+                task.state = ManagedTaskState.DONE;
+                task.maybeSetError("Unable to find nodes for task: " + e.getMessage());
+                return null;
+            }
+            log.info("Running task {} on node(s): {}", task.id, Utils.join(nodeNames, ", "));
+            task.state = ManagedTaskState.RUNNING;
+            task.startedMs = time.milliseconds();
+            task.workers = nodeNames;
+            task.activeWorkers = new HashSet<>();
+            for (String workerName : task.workers) {
+                task.activeWorkers.add(workerName);
+                nodeManagers.get(workerName).createWorker(task.id, task.spec);
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Stop a task.
+     *
+     * @param id                    The ID of the task to stop.
+     * @return                      The specification of the task which was stopped, or null if there
+     *                              was no task found with the given ID.
+     */
+    public TaskSpec stopTask(final String id) throws ExecutionException, InterruptedException {
+        return executor.submit(new CancelTask(id)).get();
+    }
+
+    /**
+     * Handles cancelling a task.  Processed by the state change thread.
+     */
+    class CancelTask implements Callable<TaskSpec> {
+        private final String id;
+
+        CancelTask(String id) {
+            this.id = id;
+        }
+
+        @Override
+        public TaskSpec call() throws Exception {
+            ManagedTask task = tasks.get(id);
+            if (task == null) {
+                log.info("Can't cancel non-existent task {}.", id);
+                return null;
+            }
+            switch (task.state) {
+                case PENDING:
+                    task.cancelled = true;
+                    task.clearStartFuture();
+                    task.doneMs = time.milliseconds();
+                    task.state = ManagedTaskState.DONE;
+                    log.info("Stopped pending task {}.", id);
+                    break;
+                case RUNNING:
+                    task.cancelled = true;
+                    if (task.activeWorkers.isEmpty()) {
+                        log.info("Task {} is now complete with error: {}", id, task.error);
+                        task.doneMs = time.milliseconds();
+                        task.state = ManagedTaskState.DONE;
+                    } else {
+                        for (String workerName : task.activeWorkers) {
+                            nodeManagers.get(workerName).stopWorker(id);
+                        }
+                        log.info("Cancelling task {} on worker(s): {}", id, Utils.join(task.activeWorkers, ", "));
+                        task.state = ManagedTaskState.STOPPING;
+                    }
+                    break;
+                case STOPPING:
+                    log.info("Can't cancel task {} because it is already stopping.", id);
+                    break;
+                case DONE:
+                    log.info("Can't cancel task {} because it is already done.", id);
+                    break;
+            }
+            return task.spec;
+        }
+    }
+
+    /**
+     * A callback which a NodeManager makes to indicate that a worker has completed.
+     * The task will transition to DONE once all of its workers are done.
+     *
+     * @param nodeName      The name of the node on which the worker ran.
+     * @param id            The worker ID (identical to the task ID).
+     * @param error         An empty string if there was no error, or an error message.
+     */
+    public void handleWorkerCompletion(String nodeName, String id, String error) {
+        executor.submit(new HandleWorkerCompletion(nodeName, id, error));
+    }
+
+    class HandleWorkerCompletion implements Callable<Void> {
+        private final String nodeName;
+        private final String id;
+        private final String error;
+
+        HandleWorkerCompletion(String nodeName, String id, String error) {
+            this.nodeName = nodeName;
+            this.id = id;
+            this.error = error;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            ManagedTask task = tasks.get(id);
+            if (task == null) {
+                log.error("Can't handle completion of unknown worker {} on node {}",
+                    id, nodeName);
+                return null;
+            }
+            if ((task.state == ManagedTaskState.PENDING) || (task.state == ManagedTaskState.DONE)) {
+                log.error("Task {} got unexpected worker completion from {} while " +
+                    "in {} state.", id, nodeName, task.state);
+                return null;
+            }
+            boolean broadcastStop = false;
+            if (task.state == ManagedTaskState.RUNNING) {
+                task.state = ManagedTaskState.STOPPING;
+                broadcastStop = true;
+            }
+            task.maybeSetError(error);
+            task.activeWorkers.remove(nodeName);
+            if (task.activeWorkers.isEmpty()) {
+                task.doneMs = time.milliseconds();
+                task.state = ManagedTaskState.DONE;
+                log.info("Task {} is now complete on {} with error: {}", id,
+                    Utils.join(task.workers, ", "),
+                    task.error.isEmpty() ? "(none)" : task.error);
+            } else if (broadcastStop) {
+                log.info("Node {} stopped.  Stopping task {} on worker(s): {}",
+                    id, Utils.join(task.activeWorkers, ", "));
+                for (String workerName : task.activeWorkers) {
+                    nodeManagers.get(workerName).stopWorker(id);
+                }
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Get information about the tasks being managed.
+     */
+    public TasksResponse tasks() throws ExecutionException, InterruptedException {
+        return executor.submit(new GetTasksResponse()).get();
+    }
+
+    class GetTasksResponse implements Callable<TasksResponse> {
+        @Override
+        public TasksResponse call() throws Exception {
+            TreeMap<String, TaskState> states = new TreeMap<>();
+            for (ManagedTask task : tasks.values()) {
+                states.put(task.id, task.taskState());
+            }
+            return new TasksResponse(states);
+        }
+    }
+
+    /**
+     * Initiate shutdown, but do not wait for it to complete.
+     */
+    public void beginShutdown(boolean stopAgents) throws ExecutionException, InterruptedException {
+        if (shutdown.compareAndSet(false, true)) {
+            executor.submit(new Shutdown(stopAgents));
+        }
+    }
+
+    /**
+     * Wait for shutdown to complete.  May be called prior to beginShutdown.
+     */
+    public void waitForShutdown() throws ExecutionException, InterruptedException {
+        while (!executor.awaitTermination(1, TimeUnit.DAYS)) { }
+    }
+
+    class Shutdown implements Callable<Void> {
+        private final boolean stopAgents;
+
+        Shutdown(boolean stopAgents) {
+            this.stopAgents = stopAgents;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            log.info("Shutting down TaskManager{}.", stopAgents ? " and agents" : "");
+            for (NodeManager nodeManager : nodeManagers.values()) {
+                nodeManager.beginShutdown(stopAgents);
+            }
+            for (NodeManager nodeManager : nodeManagers.values()) {
+                nodeManager.waitForShutdown();
+            }
+            executor.shutdown();
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFault.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFault.java
deleted file mode 100644
index 2d63b82..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFault.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import org.apache.kafka.trogdor.common.JsonUtil;
-import org.apache.kafka.trogdor.common.Platform;
-import org.apache.kafka.trogdor.common.Topology;
-
-import java.util.Objects;
-import java.util.Set;
-
-public abstract class AbstractFault implements Fault {
-    private final String id;
-    private final FaultSpec spec;
-    private FaultState state;
-
-    public AbstractFault(String id, FaultSpec spec) {
-        this.id = id;
-        this.spec = spec;
-        this.state = new PendingState();
-    }
-
-    @Override
-    public final String id() {
-        return id;
-    }
-
-    @Override
-    public final FaultSpec spec() {
-        return spec;
-    }
-
-    @Override
-    public synchronized FaultState state() {
-        return state;
-    }
-
-    @Override
-    public synchronized void setState(FaultState state) {
-        this.state = state;
-    }
-
-    @Override
-    public final void activate(long now, Platform platform) throws Exception {
-        try {
-            handleActivation(now, platform);
-            setState(new RunningState(now));
-        } catch (Exception e) {
-            setState(new DoneState(now, e.getMessage()));
-            throw e;
-        }
-    }
-
-    protected abstract void handleActivation(long now, Platform platform) throws Exception;
-
-    @Override
-    public final void deactivate(long now, Platform platform) throws Exception {
-        try {
-            handleDeactivation(now, platform);
-            setState(new DoneState(now, ""));
-        } catch (Exception e) {
-            setState(new DoneState(now, e.getMessage()));
-            throw e;
-        }
-    }
-
-    protected abstract void handleDeactivation(long now, Platform platform) throws Exception;
-
-    @Override
-    public abstract Set<String> targetNodes(Topology topology);
-
-    @Override
-    public final boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        return toString().equals(o.toString());
-    }
-
-    @Override
-    public final int hashCode() {
-        return Objects.hashCode(toString());
-    }
-
-    @Override
-    public final String toString() {
-        return getClass().getSimpleName() + "(id=" + id +
-            ", spec=" + JsonUtil.toJsonString(spec) +
-            ", state=" + JsonUtil.toJsonString(state()) +
-            ")";
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFaultSpec.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFaultSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFaultSpec.java
deleted file mode 100644
index 5f551b5..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFaultSpec.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.JsonUtil;
-
-
-/**
- * A base class that can be used for FaultSpecs. 
- */
-public abstract class AbstractFaultSpec implements FaultSpec {
-    private final long startMs;
-    private final long durationMs;
-
-    protected AbstractFaultSpec(@JsonProperty("startMs") long startMs,
-                             @JsonProperty("durationMs") long durationMs) {
-        this.startMs = startMs;
-        this.durationMs = durationMs;
-    }
-
-    @JsonProperty
-    @Override
-    public long startMs() {
-        return startMs;
-    }
-
-    @JsonProperty
-    @Override
-    public long durationMs() {
-        return durationMs;
-    }
-
-    @Override
-    public String toString() {
-        return JsonUtil.toJsonString(this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/DoneState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/DoneState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/DoneState.java
deleted file mode 100644
index 222caf0..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/DoneState.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- * The state a fault is in on the agent or controller when it is completed,
- * either normally or with an error.
- */
-public class DoneState extends FaultState {
-    private final long doneMs;
-    private final String errorStr;
-
-    @JsonCreator
-    public DoneState(@JsonProperty("doneMs") long doneMs,
-                     @JsonProperty("errorStr") String errorStr) {
-        this.doneMs = doneMs;
-        this.errorStr = errorStr;
-    }
-
-    @JsonProperty
-    public long doneMs() {
-        return doneMs;
-    }
-
-    @JsonProperty
-    public String errorStr() {
-        return errorStr;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java
deleted file mode 100644
index e44d56a..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import org.apache.kafka.trogdor.common.Platform;
-import org.apache.kafka.trogdor.common.Topology;
-
-import java.util.Set;
-
-public interface Fault {
-    /**
-     * Get the ID of this fault.
-     */
-    String id();
-
-    /**
-     * Get the specification for this Fault.
-     */
-    FaultSpec spec();
-
-    /**
-     * Get the current fault state.  Thread-safe.
-     */
-    FaultState state();
-
-    /**
-     * Set the current fault state.  Thread-safe.
-     */
-    void setState(FaultState state);
-
-    /**
-     * Activate the fault.  Will transition into RunningState or DoneState.
-     *
-     * @param now           The current time in ms.
-     * @param platform      The platform to use.
-     */
-    void activate(long now, Platform platform) throws Exception;
-
-    /**
-     * Deactivate the fault.  Will transition into DoneState.
-     *
-     * @param now           The current time in ms.
-     * @param platform      The platform to use.
-     */
-    void deactivate(long now, Platform platform) throws Exception;
-
-    /**
-     * Get the nodes which this fault is targetting.
-     *
-     * @param topology      The topology to use.
-     *
-     * @return              A set of target node names.
-     */
-    Set<String> targetNodes(Topology topology);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java
deleted file mode 100644
index 63e5ff4..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.TreeMap;
-
-public class FaultSet {
-    private final static long NS_PER_MS = 1000000L;
-
-    /**
-     * Maps fault start times in nanoseconds to faults.
-     */
-    private final TreeMap<Long, Fault> byStart = new TreeMap<Long, Fault>();
-
-    /**
-     * Maps fault end times in nanoseconds to faults.
-     */
-    private final TreeMap<Long, Fault> byEnd = new TreeMap<Long, Fault>();
-
-    /**
-     * Return an iterator that iterates over the fault set in start time order.
-     */
-    public FaultSetIterator iterateByStart() {
-        return new FaultSetIterator(byStart);
-    }
-
-    /**
-     * Return an iterator that iterates over the fault set in end time order.
-     */
-    public FaultSetIterator iterateByEnd() {
-        return new FaultSetIterator(byEnd);
-    }
-
-    /**
-     * Add a new fault to the FaultSet.
-     */
-    public void add(Fault fault) {
-        insertUnique(byStart, fault.spec().startMs() * NS_PER_MS, fault);
-        long endMs = fault.spec().startMs() + fault.spec().durationMs();
-        insertUnique(byEnd, endMs * NS_PER_MS, fault);
-    }
-
-    /**
-     * Insert a new fault to a TreeMap.
-     *
-     * If there is already a fault with the given key, the fault will be stored
-     * with the next available key.
-     */
-    private void insertUnique(TreeMap<Long, Fault> map, long key, Fault fault) {
-        while (true) {
-            Fault existing = map.get(key);
-            if (existing == null) {
-                map.put(key, fault);
-                return;
-            } else if (existing == fault) {
-                return;
-            } else {
-                key++;
-            }
-        }
-    }
-
-    /**
-     * Remove a fault from the TreeMap.  The fault is removed by object equality.
-     */
-    public void remove(Fault fault) {
-        removeUnique(byStart, fault.spec().startMs() * NS_PER_MS, fault);
-        long endMs = fault.spec().startMs() + fault.spec().durationMs();
-        removeUnique(byEnd, endMs * NS_PER_MS, fault);
-    }
-
-    /**
-     * Helper function to remove a fault from a map.  We will search every
-     * element of the map equal to or higher than the given key.
-     */
-    private void removeUnique(TreeMap<Long, Fault> map, long key, Fault fault) {
-        while (true) {
-            Map.Entry<Long, Fault> existing = map.ceilingEntry(key);
-            if (existing == null) {
-                throw new NoSuchElementException("No such element as " + fault);
-            } else if (existing.getValue() == fault) {
-                map.remove(existing.getKey());
-                return;
-            } else {
-                key = existing.getKey() + 1;
-            }
-        }
-    }
-
-    /**
-     * An iterator over the FaultSet.
-     */
-    class FaultSetIterator implements Iterator<Fault> {
-        private final TreeMap<Long, Fault> map;
-        private Fault cur = null;
-        private long prevKey = -1;
-
-        FaultSetIterator(TreeMap<Long, Fault> map) {
-            this.map = map;
-        }
-
-        @Override
-        public boolean hasNext() {
-            Map.Entry<Long, Fault> entry = map.higherEntry(prevKey);
-            return entry != null;
-        }
-
-        @Override
-        public Fault next() {
-            Map.Entry<Long, Fault> entry = map.higherEntry(prevKey);
-            if (entry == null) {
-                throw new NoSuchElementException();
-            }
-            prevKey = entry.getKey();
-            cur = entry.getValue();
-            return cur;
-        }
-
-        @Override
-        public void remove() {
-            if (cur == null) {
-                throw new IllegalStateException();
-            }
-            FaultSet.this.remove(cur);
-            cur = null;
-        }
-    }
-};