You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/11/03 09:38:05 UTC

[2/4] kafka git commit: KAFKA-6060; Add workload generation capabilities to Trogdor

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSpec.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSpec.java
deleted file mode 100644
index e15c4e9..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSpec.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.kafka.common.utils.Utils;
-
-
-/**
- * The specification for a fault.
- */
-@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS,
-              include = JsonTypeInfo.As.PROPERTY,
-              property = "class")
-public interface FaultSpec {
-    class Util {
-        private static final String SPEC_STRING = "Spec";
-
-        public static Fault createFault(String faultId, FaultSpec faultSpec) throws ClassNotFoundException {
-            String faultSpecClassName = faultSpec.getClass().getName();
-            if (!faultSpecClassName.endsWith(SPEC_STRING)) {
-                throw new RuntimeException("FaultSpec class name must end with " + SPEC_STRING);
-            }
-            String faultClassName = faultSpecClassName.substring(0,
-                    faultSpecClassName.length() - SPEC_STRING.length());
-            return Utils.newParameterizedInstance(faultClassName,
-                String.class, faultId,
-                FaultSpec.class, faultSpec);
-        }
-    }
-
-    /**
-     * Get the start time of this fault in ms.
-     */
-    @JsonProperty
-    long startMs();
-
-    /**
-     * Get the duration of this fault in ms.
-     */
-    @JsonProperty
-    long durationMs();
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java
deleted file mode 100644
index cba8419..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.kafka.trogdor.common.JsonUtil;
-import java.util.Objects;
-
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
-    include = JsonTypeInfo.As.PROPERTY,
-    property = "stateName")
-@JsonSubTypes({
-        @JsonSubTypes.Type(value = DoneState.class, name = "done"),
-        @JsonSubTypes.Type(value = PendingState.class, name = "pending"),
-        @JsonSubTypes.Type(value = RunningState.class, name = "running"),
-        @JsonSubTypes.Type(value = SendingState.class, name = "sending")
-    })
-public abstract class FaultState {
-    @Override
-    public final boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        return toString().equals(o.toString());
-    }
-
-    @Override
-    public final int hashCode() {
-        return Objects.hashCode(toString());
-    }
-
-    @Override
-    public final String toString() {
-        return JsonUtil.toJsonString(this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java
deleted file mode 100644
index cf3270a..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import org.apache.kafka.trogdor.common.Node;
-import org.apache.kafka.trogdor.common.Platform;
-import org.apache.kafka.trogdor.common.Topology;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-public class NetworkPartitionFault extends AbstractFault {
-    private static final Logger log = LoggerFactory.getLogger(NetworkPartitionFault.class);
-
-    private final List<Set<String>> partitions;
-
-    public NetworkPartitionFault(String id, FaultSpec spec) {
-        super(id, spec);
-        NetworkPartitionFaultSpec faultSpec = (NetworkPartitionFaultSpec) spec;
-        this.partitions = new ArrayList<>();
-        HashSet<String> prevNodes = new HashSet<>();
-        for (List<String> partition : faultSpec.partitions()) {
-            for (String nodeName : partition) {
-                if (prevNodes.contains(nodeName)) {
-                    throw new RuntimeException("Node " + nodeName +
-                        " appears in more than one partition.");
-                }
-                prevNodes.add(nodeName);
-                this.partitions.add(new HashSet<String>(partition));
-            }
-        }
-    }
-
-    @Override
-    protected void handleActivation(long now, Platform platform) throws Exception {
-        log.info("Activating NetworkPartitionFault...");
-        runIptablesCommands(platform, "-A");
-    }
-
-    @Override
-    protected void handleDeactivation(long now, Platform platform) throws Exception {
-        log.info("Deactivating NetworkPartitionFault...");
-        runIptablesCommands(platform, "-D");
-    }
-
-    private void runIptablesCommands(Platform platform, String iptablesAction) throws Exception {
-        Node curNode = platform.curNode();
-        Topology topology = platform.topology();
-        TreeSet<String> toBlock = new TreeSet<>();
-        for (Set<String> partition : partitions) {
-            if (!partition.contains(curNode.name())) {
-                for (String nodeName : partition) {
-                    toBlock.add(nodeName);
-                }
-            }
-        }
-        for (String nodeName : toBlock) {
-            Node node = topology.node(nodeName);
-            InetAddress addr = InetAddress.getByName(node.hostname());
-            platform.runCommand(new String[] {
-                "sudo", "iptables", iptablesAction, "INPUT", "-p", "tcp", "-s",
-                addr.getHostAddress(), "-j", "DROP", "-m", "comment", "--comment", nodeName
-            });
-        }
-    }
-
-    @Override
-    public Set<String> targetNodes(Topology topology) {
-        Set<String> targetNodes = new HashSet<>();
-        for (Set<String> partition : partitions) {
-            targetNodes.addAll(partition);
-        }
-        return targetNodes;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultController.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultController.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultController.java
new file mode 100644
index 0000000..d90534f
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultController.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import org.apache.kafka.trogdor.common.Topology;
+import org.apache.kafka.trogdor.task.TaskController;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class NetworkPartitionFaultController implements TaskController {
+    private final List<Set<String>> partitionSets;
+
+    public NetworkPartitionFaultController(List<Set<String>> partitionSets) {
+        this.partitionSets = partitionSets;
+    }
+
+    @Override
+    public Set<String> targetNodes(Topology topology) {
+        Set<String> targetNodes = new HashSet<>();
+        for (Set<String> partitionSet : partitionSets) {
+            targetNodes.addAll(partitionSet);
+        }
+        return targetNodes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java
index d734dce..7b9ccc4 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java
@@ -19,15 +19,19 @@ package org.apache.kafka.trogdor.fault;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.task.TaskController;
+import org.apache.kafka.trogdor.task.TaskSpec;
+import org.apache.kafka.trogdor.task.TaskWorker;
 
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Objects;
+import java.util.Set;
 
 /**
  * The specification for a fault that creates a network partition.
  */
-public class NetworkPartitionFaultSpec extends AbstractFaultSpec {
+public class NetworkPartitionFaultSpec extends TaskSpec {
     private final List<List<String>> partitions;
 
     @JsonCreator
@@ -44,22 +48,28 @@ public class NetworkPartitionFaultSpec extends AbstractFaultSpec {
     }
 
     @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        NetworkPartitionFaultSpec that = (NetworkPartitionFaultSpec) o;
-        return Objects.equals(startMs(), that.startMs()) &&
-            Objects.equals(durationMs(), that.durationMs()) &&
-            Objects.equals(partitions, that.partitions);
+    public TaskController newController(String id) {
+        return new NetworkPartitionFaultController(partitionSets());
     }
 
     @Override
-    public int hashCode() {
-        return Objects.hash(startMs(), durationMs(), partitions);
+    public TaskWorker newTaskWorker(String id) {
+        return new NetworkPartitionFaultWorker(id, partitionSets());
     }
 
-    @Override
-    public String toString() {
-        return JsonUtil.toJsonString(this);
+    /** Returns one node-name set per partition; throws if a node is in more than one. */
+    private List<Set<String>> partitionSets() {
+        List<Set<String>> partitionSets = new ArrayList<>();
+        HashSet<String> prevNodes = new HashSet<>();
+        for (List<String> partition : this.partitions()) {
+            for (String nodeName : partition) {
+                if (!prevNodes.add(nodeName)) {
+                    throw new RuntimeException("Node " + nodeName +
+                        " appears in more than one partition.");
+                }
+            }
+            partitionSets.add(new HashSet<>(partition));
+        }
+        return partitionSets;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java
new file mode 100644
index 0000000..787c5e0
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.trogdor.common.Node;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.Topology;
+import org.apache.kafka.trogdor.task.TaskWorker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class NetworkPartitionFaultWorker implements TaskWorker {
+    private static final Logger log = LoggerFactory.getLogger(NetworkPartitionFaultWorker.class);
+
+    private final String id;
+
+    private final List<Set<String>> partitionSets;
+
+    public NetworkPartitionFaultWorker(String id, List<Set<String>> partitionSets) {
+        this.id = id;
+        this.partitionSets = partitionSets;
+    }
+
+    @Override
+    public void start(Platform platform, AtomicReference<String> status,
+                      KafkaFutureImpl<String> errorFuture) throws Exception {
+        log.info("Activating NetworkPartitionFault {}.", id);
+        runIptablesCommands(platform, "-A");
+    }
+
+    @Override
+    public void stop(Platform platform) throws Exception {
+        log.info("Deactivating NetworkPartitionFault {}.", id);
+        runIptablesCommands(platform, "-D");
+    }
+
+    /** Runs "iptables <action>" ("-A" on start, "-D" on stop) for a rule dropping inbound
+     *  TCP from each node in a partition that does not contain the current node. */
+    private void runIptablesCommands(Platform platform, String iptablesAction) throws Exception {
+        Node curNode = platform.curNode();
+        Topology topology = platform.topology();
+        TreeSet<String> toBlock = new TreeSet<>();
+        for (Set<String> partitionSet : partitionSets) {
+            if (!partitionSet.contains(curNode.name())) {
+                toBlock.addAll(partitionSet);
+            }
+        }
+        for (String nodeName : toBlock) {
+            Node node = topology.node(nodeName);
+            InetAddress addr = InetAddress.getByName(node.hostname());
+            platform.runCommand(new String[] {
+                "sudo", "iptables", iptablesAction, "INPUT", "-p", "tcp", "-s",
+                addr.getHostAddress(), "-j", "DROP", "-m", "comment", "--comment", nodeName
+            });
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java
deleted file mode 100644
index 70b4965..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import org.apache.kafka.trogdor.common.Node;
-import org.apache.kafka.trogdor.common.Platform;
-import org.apache.kafka.trogdor.common.Topology;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-public class NoOpFault extends AbstractFault {
-    private static final Logger log = LoggerFactory.getLogger(NoOpFault.class);
-
-    public NoOpFault(String id, FaultSpec spec) {
-        super(id, spec);
-    }
-
-    @Override
-    protected void handleActivation(long now, Platform platform) throws Exception {
-        log.info("Activating NoOpFault...");
-    }
-
-    @Override
-    protected void handleDeactivation(long now, Platform platform) throws Exception {
-        log.info("Deactivating NoOpFault...");
-    }
-
-    @Override
-    public Set<String> targetNodes(Topology topology) {
-        Set<String> set = new HashSet<>();
-        for (Map.Entry<String, Node> entry : topology.nodes().entrySet()) {
-            if (Node.Util.getTrogdorAgentPort(entry.getValue()) > 0) {
-                set.add(entry.getKey());
-            }
-        }
-        return set;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFaultSpec.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFaultSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFaultSpec.java
deleted file mode 100644
index 1d4b94d..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFaultSpec.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.Objects;
-
-/**
- * The specification for a fault that does nothing.
- *
- * This fault type exists mainly to test the fault injection system.
- */
-public class NoOpFaultSpec extends AbstractFaultSpec {
-    @JsonCreator
-    public NoOpFaultSpec(@JsonProperty("startMs") long startMs,
-                         @JsonProperty("durationMs") long durationMs) {
-        super(startMs, durationMs);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        NoOpFaultSpec that = (NoOpFaultSpec) o;
-        return Objects.equals(startMs(), that.startMs()) &&
-            Objects.equals(durationMs(), that.durationMs());
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(startMs(), durationMs());
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java
deleted file mode 100644
index 57c8e88..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-/**
- * The state a fault is in on the agent or controller when we haven't yet done
- * anything with it.
- */
-public class PendingState extends FaultState {
-    @JsonCreator
-    public PendingState() {
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java
deleted file mode 100644
index 1b81bf5..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- * The state a fault is in on the agent when it is running.
- */
-public class RunningState extends FaultState {
-    private final long startedMs;
-
-    @JsonCreator
-    public RunningState(@JsonProperty("startedMs") long startedMs) {
-        this.startedMs = startedMs;
-    }
-
-    @JsonProperty
-    public long startedMs() {
-        return startedMs;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java
deleted file mode 100644
index edfbed2..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.common.utils.Utils;
-
-import java.util.TreeMap;
-import java.util.Set;
-
-/**
- * The state a fault is in on the controller when it is scheduled to be sent to several agents.
- */
-public class SendingState extends FaultState {
-    private final TreeMap<String, Boolean> nodes;
-    private int remainingNodes;
-
-    public SendingState(@JsonProperty("nodeNames") Set<String> nodeNames) {
-        this.nodes = new TreeMap<>();
-        for (String nodeName : nodeNames) {
-            nodes.put(nodeName, false);
-        }
-        remainingNodes = nodeNames.size();
-    }
-
-    @JsonProperty
-    public synchronized Set<String> nodeNames() {
-        return nodes.keySet();
-    }
-
-    /**
-     * Complete a send operation.
-     *
-     * @param nodeName      The name of the node we sent to.
-     * @return              True if there are no more send operations left.
-     */
-    public synchronized boolean completeSend(String nodeName) {
-        if (!nodes.containsKey(nodeName)) {
-            throw new RuntimeException("Node " + nodeName + " was not to supposed to " +
-                "receive this fault.  The fault was scheduled on nodes: " +
-                Utils.join(nodes.keySet(), ", "));
-        }
-        if (nodes.put(nodeName, true)) {
-            throw new RuntimeException("Node " + nodeName + " already received this fault.");
-        }
-        remainingNodes--;
-        return remainingNodes == 0;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentFaultsResponse.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentFaultsResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentFaultsResponse.java
deleted file mode 100644
index a1b5246..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentFaultsResponse.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.rest;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.JsonUtil;
-
-import java.util.Map;
-
-/**
- * Response to GET /faults
- */
-public class AgentFaultsResponse extends FaultDataMap {
-    @JsonCreator
-    public AgentFaultsResponse(@JsonProperty("faults") Map<String, FaultData> faults) {
-        super(faults);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        AgentFaultsResponse that = (AgentFaultsResponse) o;
-        return super.equals(that);
-    }
-
-    @Override
-    public int hashCode() {
-        return super.hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return JsonUtil.toJsonString(this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java
index 8e32f87..77b4bfb 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java
@@ -19,41 +19,30 @@ package org.apache.kafka.trogdor.rest;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.JsonUtil;
 
-import java.util.Objects;
+import java.util.TreeMap;
 
 /**
- * The status of the Trogdor agent.
+ * A response from the Trogdor agent about the worker states and specifications.
  */
-public class AgentStatusResponse {
-    private final long startTimeMs;
+public class AgentStatusResponse extends Message {
+    private final long serverStartMs;
+    private final TreeMap<String, WorkerState> workers;
 
     @JsonCreator
-    public AgentStatusResponse(@JsonProperty("startTimeMs") long startTimeMs) {
-        this.startTimeMs = startTimeMs;
+    public AgentStatusResponse(@JsonProperty("serverStartMs") long serverStartMs,
+            @JsonProperty("workers") TreeMap<String, WorkerState> workers) {
+        this.serverStartMs = serverStartMs;
+        this.workers = workers;
     }
 
     @JsonProperty
-    public long startTimeMs() {
-        return startTimeMs;
+    public long serverStartMs() {
+        return serverStartMs;
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        AgentStatusResponse that = (AgentStatusResponse) o;
-        return Objects.equals(startTimeMs, that.startTimeMs);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(startTimeMs);
-    }
-
-    @Override
-    public String toString() {
-        return JsonUtil.toJsonString(this);
+    @JsonProperty
+    public TreeMap<String, WorkerState> workers() {
+        return workers;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorFaultsResponse.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorFaultsResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorFaultsResponse.java
deleted file mode 100644
index df26274..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorFaultsResponse.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.rest;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.JsonUtil;
-
-import java.util.Map;
-
-/**
- * Response to GET /faults
- */
-public class CoordinatorFaultsResponse extends FaultDataMap {
-    @JsonCreator
-    public CoordinatorFaultsResponse(@JsonProperty("faults") Map<String, FaultData> faults) {
-        super(faults);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        CoordinatorFaultsResponse that = (CoordinatorFaultsResponse) o;
-        return super.equals(that);
-    }
-
-    @Override
-    public int hashCode() {
-        return super.hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return JsonUtil.toJsonString(this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorShutdownRequest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorShutdownRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorShutdownRequest.java
new file mode 100644
index 0000000..1aacaaf
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorShutdownRequest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A request to the Trogdor coordinator to shut down.
+ */
+public class CoordinatorShutdownRequest extends Message {
+    private final boolean stopAgents;
+
+    @JsonCreator
+    public CoordinatorShutdownRequest(@JsonProperty("stopAgents") boolean stopAgents) {
+        this.stopAgents = stopAgents;
+    }
+
+    @JsonProperty
+    public boolean stopAgents() {
+        return stopAgents;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorStatusResponse.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorStatusResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorStatusResponse.java
index 348e310..8840d29 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorStatusResponse.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorStatusResponse.java
@@ -19,41 +19,20 @@ package org.apache.kafka.trogdor.rest;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.JsonUtil;
-
-import java.util.Objects;
 
 /**
- * The status of the Trogdor coordinator.
+ * A status response from the Trogdor coordinator.
  */
-public class CoordinatorStatusResponse {
-    private final long startTimeMs;
+public class CoordinatorStatusResponse extends Message {
+    private final long serverStartMs;
 
     @JsonCreator
-    public CoordinatorStatusResponse(@JsonProperty("startTimeMs") long startTimeMs) {
-        this.startTimeMs = startTimeMs;
+    public CoordinatorStatusResponse(@JsonProperty("serverStartMs") long serverStartMs) {
+        this.serverStartMs = serverStartMs;
     }
 
     @JsonProperty
-    public long startTimeMs() {
-        return startTimeMs;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        CoordinatorStatusResponse that = (CoordinatorStatusResponse) o;
-        return Objects.equals(startTimeMs, that.startTimeMs);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(startTimeMs);
-    }
-
-    @Override
-    public String toString() {
-        return JsonUtil.toJsonString(this);
+    public long serverStartMs() {
+        return serverStartMs;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateAgentFaultRequest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateAgentFaultRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateAgentFaultRequest.java
deleted file mode 100644
index 6e772d9..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateAgentFaultRequest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.rest;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.JsonUtil;
-import org.apache.kafka.trogdor.fault.FaultSpec;
-
-import java.util.Objects;
-
-/**
- * A request to the Trogdor agent to create a fault.
- */
-public class CreateAgentFaultRequest {
-    private final String id;
-    private final FaultSpec spec;
-
-    @JsonCreator
-    public CreateAgentFaultRequest(@JsonProperty("id") String id,
-            @JsonProperty("spec") FaultSpec spec) {
-        this.id = id;
-        this.spec = spec;
-    }
-
-    @JsonProperty
-    public String id() {
-        return id;
-    }
-
-    @JsonProperty
-    public FaultSpec spec() {
-        return spec;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        CreateAgentFaultRequest that = (CreateAgentFaultRequest) o;
-        return Objects.equals(id, that.id) &&
-               Objects.equals(spec, that.spec);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(id, spec);
-    }
-
-    @Override
-    public String toString() {
-        return JsonUtil.toJsonString(this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateCoordinatorFaultRequest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateCoordinatorFaultRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateCoordinatorFaultRequest.java
deleted file mode 100644
index ec00cf3..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateCoordinatorFaultRequest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.rest;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.JsonUtil;
-import org.apache.kafka.trogdor.fault.FaultSpec;
-
-import java.util.Objects;
-
-/**
- * A request to the Trogdor coordinator to create a fault.
- */
-public class CreateCoordinatorFaultRequest {
-    private final String id;
-    private final FaultSpec spec;
-
-    @JsonCreator
-    public CreateCoordinatorFaultRequest(@JsonProperty("id") String id,
-            @JsonProperty("spec") FaultSpec spec) {
-        this.id = id;
-        this.spec = spec;
-    }
-
-    @JsonProperty
-    public String id() {
-        return id;
-    }
-
-    @JsonProperty
-    public FaultSpec spec() {
-        return spec;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        CreateCoordinatorFaultRequest that = (CreateCoordinatorFaultRequest) o;
-        return Objects.equals(id, that.id) &&
-               Objects.equals(spec, that.spec);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(id, spec);
-    }
-
-    @Override
-    public String toString() {
-        return JsonUtil.toJsonString(this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskRequest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskRequest.java
new file mode 100644
index 0000000..d463e36
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskRequest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * A request to the Trogdor coordinator to create a task.
+ */
+public class CreateTaskRequest extends Message {
+    private final String id;
+    private final TaskSpec spec;
+
+    @JsonCreator
+    public CreateTaskRequest(@JsonProperty("id") String id,
+            @JsonProperty("spec") TaskSpec spec) {
+        this.id = id;
+        this.spec = spec;
+    }
+
+    @JsonProperty
+    public String id() {
+        return id;
+    }
+
+    @JsonProperty
+    public TaskSpec spec() {
+        return spec;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskResponse.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskResponse.java
new file mode 100644
index 0000000..54ea0f2
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskResponse.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * A response from the Trogdor coordinator about creating a task.
+ */
+public class CreateTaskResponse extends Message {
+    private final TaskSpec spec;
+
+    @JsonCreator
+    public CreateTaskResponse(@JsonProperty("spec") TaskSpec spec) {
+        this.spec = spec;
+    }
+
+    @JsonProperty
+    public TaskSpec spec() {
+        return spec;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java
new file mode 100644
index 0000000..9f6e8dc
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * A request to the Trogdor agent to create a worker.
+ */
+public class CreateWorkerRequest extends Message {
+    private final String id;
+    private final TaskSpec spec;
+
+    @JsonCreator
+    public CreateWorkerRequest(@JsonProperty("id") String id,
+            @JsonProperty("spec") TaskSpec spec) {
+        this.id = id;
+        this.spec = spec;
+    }
+
+    @JsonProperty
+    public String id() {
+        return id;
+    }
+
+    @JsonProperty
+    public TaskSpec spec() {
+        return spec;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerResponse.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerResponse.java
new file mode 100644
index 0000000..9e068ec
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerResponse.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * A response from the Trogdor agent about creating a worker.
+ */
+public class CreateWorkerResponse extends Message {
+    private final TaskSpec spec;
+
+    @JsonCreator
+    public CreateWorkerResponse(@JsonProperty("spec") TaskSpec spec) {
+        this.spec = spec;
+    }
+
+    @JsonProperty
+    public TaskSpec spec() {
+        return spec;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java
deleted file mode 100644
index b2f7c91..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.rest;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.JsonUtil;
-import org.apache.kafka.trogdor.fault.FaultSpec;
-import org.apache.kafka.trogdor.fault.FaultState;
-
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * Response to GET /faults
- */
-public class FaultDataMap {
-    private final Map<String, FaultData> faults;
-
-    public static class FaultData  {
-        private final FaultSpec spec;
-        private final FaultState state;
-
-        @JsonCreator
-        public FaultData(@JsonProperty("spec") FaultSpec spec,
-                @JsonProperty("state") FaultState state) {
-            this.spec = spec;
-            this.state = state;
-        }
-
-        @JsonProperty
-        public FaultSpec spec() {
-            return spec;
-        }
-
-        @JsonProperty
-        public FaultState state() {
-            return state;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            FaultData that = (FaultData) o;
-            return Objects.equals(spec, that.spec) &&
-                Objects.equals(state, that.state);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(spec, state);
-        }
-    }
-
-    @JsonCreator
-    public FaultDataMap(@JsonProperty("faults") Map<String, FaultData> faults) {
-        this.faults = faults;
-    }
-
-    @JsonProperty
-    public Map<String, FaultData> faults() {
-        return faults;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        FaultDataMap that = (FaultDataMap) o;
-        return Objects.equals(faults, that.faults);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hashCode(faults);
-    }
-
-    @Override
-    public String toString() {
-        return JsonUtil.toJsonString(this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
index 1b23a9e..e61b7fe 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 
 import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.common.ThreadUtils;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
@@ -43,6 +44,10 @@ import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Embedded server for the REST API that provides the control plane for Trogdor.
@@ -50,7 +55,9 @@ import java.nio.charset.StandardCharsets;
 public class JsonRestServer {
     private static final Logger log = LoggerFactory.getLogger(JsonRestServer.class);
 
-    private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MS = 2 * 1000;
+    private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MS = 100;
+
+    private final ScheduledExecutorService shutdownExecutor;
 
     private final Server jettyServer;
 
@@ -63,6 +70,8 @@ public class JsonRestServer {
      *                          0 to use a random port.
      */
     public JsonRestServer(int port) {
+        this.shutdownExecutor = Executors.newSingleThreadScheduledExecutor(
+            ThreadUtils.createThreadFactory("JsonRestServerCleanupExecutor", false));
         this.jettyServer = new Server();
         this.connector = new ServerConnector(jettyServer);
         if (port > 0) {
@@ -78,7 +87,6 @@ public class JsonRestServer {
      */
     public void start(Object... resources) {
         log.info("Starting REST server");
-
         ResourceConfig resourceConfig = new ResourceConfig();
         resourceConfig.register(new JacksonJsonProvider(JsonUtil.JSON_SERDE));
         for (Object resource : resources) {
@@ -119,17 +127,37 @@ public class JsonRestServer {
         return connector.getLocalPort();
     }
 
-    public void stop() {
-        log.info("Stopping REST server");
+    /**
+     * Initiate shutdown, but do not wait for it to complete.
+     */
+    public void beginShutdown() {
+        if (!shutdownExecutor.isShutdown()) {
+            shutdownExecutor.submit(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    try {
+                        log.info("Stopping REST server");
+                        jettyServer.stop();
+                        jettyServer.join();
+                        log.info("REST server stopped");
+                    } catch (Exception e) {
+                        log.error("Unable to stop REST server", e);
+                    } finally {
+                        jettyServer.destroy();
+                    }
+                    shutdownExecutor.shutdown();
+                    return null;
+                }
+            });
+        }
+    }
 
-        try {
-            jettyServer.stop();
-            jettyServer.join();
-            log.info("REST server stopped");
-        } catch (Exception e) {
-            log.error("Unable to stop REST server", e);
-        } finally {
-            jettyServer.destroy();
+    /**
+     * Wait for shutdown to complete.  May be called prior to beginShutdown.
+     */
+    public void waitForShutdown() throws InterruptedException {
+        while (!shutdownExecutor.isShutdown()) {
+            shutdownExecutor.awaitTermination(1, TimeUnit.DAYS);
         }
     }
 
@@ -197,6 +225,24 @@ public class JsonRestServer {
         }
     }
 
+    public static <T> HttpResponse<T> httpRequest(String url, String method,
+            Object requestBodyData, TypeReference<T> responseFormat, int maxTries)
+            throws IOException, InterruptedException {
+        IOException exc = null;
+        for (int tries = 0; tries < maxTries; tries++) {
+            if (tries > 0) {
+                Thread.sleep(tries > 1 ? 10 : 2);
+            }
+            try {
+                return httpRequest(url, method, requestBodyData, responseFormat);
+            } catch (IOException e) {
+                log.info("{} {}: error: {}", method, url, e.getMessage());
+                exc = e;
+            }
+        }
+        throw exc;
+    }
+
     public static class HttpResponse<T> {
         private final T body;
         private final ErrorResponse error;

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/Message.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/Message.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/Message.java
new file mode 100644
index 0000000..c2ee840
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/Message.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import org.apache.kafka.trogdor.common.JsonUtil;
+
+import java.util.Objects;
+
+public abstract class Message {
+    @Override
+    public final boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        return Objects.equals(toString(), o.toString());
+    }
+
+    @Override
+    public final int hashCode() {
+        return toString().hashCode();
+    }
+
+    @Override
+    public final String toString() {
+        return JsonUtil.toJsonString(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java
new file mode 100644
index 0000000..3287801
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A request to stop a task.  NOTE(review): this comment used to say the request goes to the Trogdor agent, but StopTaskResponse is documented as coming from the coordinator -- presumably this is sent to the coordinator; confirm.
+ */
+public class StopTaskRequest extends Message {
+    private final String id; // Presumably the id of the task to stop -- verify against the caller.
+
+    @JsonCreator
+    public StopTaskRequest(@JsonProperty("id") String id) { // Bound from the "id" JSON property; stored as-is, no validation here.
+        this.id = id;
+    }
+
+    @JsonProperty
+    public String id() {
+        return id;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskResponse.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskResponse.java
new file mode 100644
index 0000000..f344dc9
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskResponse.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * A response from the Trogdor coordinator about stopping a task.  Carries a single TaskSpec under the "spec" JSON property.
+ */
+public class StopTaskResponse extends Message {
+    private final TaskSpec spec; // Presumably the spec of the task that was asked to stop -- TODO confirm.
+
+    @JsonCreator
+    public StopTaskResponse(@JsonProperty("spec") TaskSpec spec) { // Stored as-is; no validation here.
+        this.spec = spec;
+    }
+
+    @JsonProperty
+    public TaskSpec spec() {
+        return spec;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java
new file mode 100644
index 0000000..54c689a
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A request to the Trogdor agent to stop a worker.  Carries a single string under the "id" JSON property.
+ */
+public class StopWorkerRequest extends Message {
+    private final String id; // Presumably the id of the worker to stop -- verify against the caller.
+
+    @JsonCreator
+    public StopWorkerRequest(@JsonProperty("id") String id) { // Stored as-is; no validation here.
+        this.id = id;
+    }
+
+    @JsonProperty
+    public String id() {
+        return id;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerResponse.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerResponse.java
new file mode 100644
index 0000000..7d5b468
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerResponse.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * A response from the Trogdor agent about stopping a worker.  Carries a single TaskSpec under the "spec" JSON property.
+ */
+public class StopWorkerResponse extends Message {
+    private final TaskSpec spec; // Presumably the spec of the worker that was asked to stop -- TODO confirm.
+
+    @JsonCreator
+    public StopWorkerResponse(@JsonProperty("spec") TaskSpec spec) { // Stored as-is; no validation here.
+        this.spec = spec;
+    }
+
+    @JsonProperty
+    public TaskSpec spec() {
+        return spec;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java
new file mode 100644
index 0000000..536d3f2
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * The state a task is in once it's done.  Adds the start and completion times, an error string and a cancelled flag to the spec passed to TaskState.
+ */
+public class TaskDone extends TaskState {
+    /**
+     * The time on the coordinator when the task was started.
+     */
+    private final long startedMs;
+
+    /**
+     * The time on the coordinator when the task was completed.
+     */
+    private final long doneMs;
+
+    /**
+     * Empty if the task completed without error; the error message otherwise.
+     */
+    private final String error;
+
+    /**
+     * True if the task was manually cancelled, rather than terminating itself.
+     */
+    private final boolean cancelled;
+
+    @JsonCreator
+    public TaskDone(@JsonProperty("spec") TaskSpec spec, // Each argument is bound from the JSON property of the same name.
+            @JsonProperty("startedMs") long startedMs,
+            @JsonProperty("doneMs") long doneMs,
+            @JsonProperty("error") String error,
+            @JsonProperty("cancelled") boolean cancelled) {
+        super(spec);
+        this.startedMs = startedMs;
+        this.doneMs = doneMs;
+        this.error = error;
+        this.cancelled = cancelled;
+    }
+
+    @JsonProperty
+    public long startedMs() {
+        return startedMs;
+    }
+
+    @JsonProperty
+    public long doneMs() {
+        return doneMs;
+    }
+
+    @JsonProperty
+    public String error() {
+        return error;
+    }
+
+    @JsonProperty
+    public boolean cancelled() {
+        return cancelled;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java
new file mode 100644
index 0000000..b0162d3
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * The state for a task which is still pending.  Adds no fields of its own; it only forwards the TaskSpec to the TaskState constructor.
+ */
+public class TaskPending extends TaskState {
+    @JsonCreator
+    public TaskPending(@JsonProperty("spec") TaskSpec spec) { // Bound from the "spec" JSON property.
+        super(spec);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java
new file mode 100644
index 0000000..bff3676
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * The state for a task which is being run by the agent.  Adds the start time to the spec passed to TaskState.
+ */
+public class TaskRunning extends TaskState {
+    /**
+     * The time on the agent when the task was started.
+     */
+    private final long startedMs;
+
+    @JsonCreator
+    public TaskRunning(@JsonProperty("spec") TaskSpec spec, // Each argument is bound from the JSON property of the same name.
+            @JsonProperty("startedMs") long startedMs) {
+        super(spec);
+        this.startedMs = startedMs;
+    }
+
+    @JsonProperty
+    public long startedMs() {
+        return startedMs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
new file mode 100644
index 0000000..28b6108
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * The state which a task is in on the Coordinator.  Serialized polymorphically: the "state" JSON property carries the subtype name (PENDING, RUNNING, STOPPING or DONE) declared in the annotations below.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
+    include = JsonTypeInfo.As.PROPERTY,
+    property = "state")
+@JsonSubTypes({
+        @JsonSubTypes.Type(value = TaskPending.class, name = "PENDING"),
+        @JsonSubTypes.Type(value = TaskRunning.class, name = "RUNNING"),
+        @JsonSubTypes.Type(value = TaskStopping.class, name = "STOPPING"),
+        @JsonSubTypes.Type(value = TaskDone.class, name = "DONE")
+    })
+public abstract class TaskState extends Message {
+    private final TaskSpec spec; // The spec supplied by the subclass constructor; exposed as the "spec" property.
+
+    public TaskState(TaskSpec spec) { // Stored as-is; no validation here.
+        this.spec = spec;
+    }
+
+    @JsonProperty
+    public TaskSpec spec() {
+        return spec;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java
new file mode 100644
index 0000000..4446b75
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * The state for a task which is being stopped on the coordinator.
+ */
+public class TaskStopping extends TaskState {
+    /**
+     * Presumably the time when the task was started.  NOTE(review): the original comment read "The time on the agent when the task was received", which matches neither the field name nor the coordinator-side class comment -- confirm the intended meaning and clock.
+     */
+    private final long startedMs;
+
+    @JsonCreator
+    public TaskStopping(@JsonProperty("spec") TaskSpec spec, // Each argument is bound from the JSON property of the same name.
+            @JsonProperty("startedMs") long startedMs) {
+        super(spec);
+        this.startedMs = startedMs;
+    }
+
+    @JsonProperty
+    public long startedMs() {
+        return startedMs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksResponse.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksResponse.java
new file mode 100644
index 0000000..d3b415b
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksResponse.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.TreeMap;
+
+/**
+ * The response to /coordinator/tasks.  Wraps a sorted map from String keys (presumably task ids -- TODO confirm) to TaskState values.
+ */
+public class TasksResponse extends Message {
+    private final TreeMap<String, TaskState> tasks; // The map reference passed to the constructor; not copied.
+
+    @JsonCreator
+    public TasksResponse(@JsonProperty("tasks") TreeMap<String, TaskState> tasks) { // Stored as-is: no null check and no defensive copy.
+        this.tasks = tasks;
+    }
+
+    @JsonProperty
+    public TreeMap<String, TaskState> tasks() {
+        return tasks; // Returns the internal map itself, so caller mutations are visible through this object.
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
new file mode 100644
index 0000000..0f46b25
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * The state a worker is in once it's done.  Adds the start and completion times, a status string and an error string to the spec passed to WorkerState.
+ */
+public class WorkerDone extends WorkerState {
+    /**
+     * The time on the agent when the task was started.
+     */
+    private final long startedMs;
+
+    /**
+     * The time on the agent when the task was completed.
+     */
+    private final long doneMs;
+
+    /**
+     * The task status.  The format will depend on the type of task that is
+     * being run.
+     */
+    private final String status;
+
+    /**
+     * Empty if the task completed without error; the error message otherwise.
+     */
+    private final String error;
+
+    @JsonCreator
+    public WorkerDone(@JsonProperty("spec") TaskSpec spec, // Each argument is bound from the JSON property of the same name.
+            @JsonProperty("startedMs") long startedMs,
+            @JsonProperty("doneMs") long doneMs,
+            @JsonProperty("status") String status,
+            @JsonProperty("error") String error) {
+        super(spec);
+        this.startedMs = startedMs;
+        this.doneMs = doneMs;
+        this.status = status;
+        this.error = error;
+    }
+
+    @JsonProperty
+    @Override
+    public long startedMs() { // Overrides a method inherited from WorkerState (declared outside this class).
+        return startedMs;
+    }
+
+    @JsonProperty
+    public long doneMs() {
+        return doneMs;
+    }
+
+    @JsonProperty
+    @Override
+    public String status() { // Overrides a method inherited from WorkerState (declared outside this class).
+        return status;
+    }
+
+    @JsonProperty
+    public String error() {
+        return error;
+    }
+
+    @Override
+    public boolean done() {
+        return true; // A WorkerDone always reports itself as done.
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
new file mode 100644
index 0000000..d3e3565
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * When we're in the process of sending a TaskSpec to the Agent, the Worker is regarded
+ * as being in WorkerReceiving state.  Adds no fields of its own; it only forwards the TaskSpec to the WorkerState constructor.
+ */
+public final class WorkerReceiving extends WorkerState {
+    @JsonCreator
+    public WorkerReceiving(@JsonProperty("spec") TaskSpec spec) { // Bound from the "spec" JSON property.
+        super(spec);
+    }
+}