Posted to commits@kafka.apache.org by rs...@apache.org on 2017/09/07 12:23:17 UTC

kafka git commit: KAFKA-5777; Add ducktape integration for Trogdor

Repository: kafka
Updated Branches:
  refs/heads/trunk 329d5fa64 -> 4065ffb3e


KAFKA-5777; Add ducktape integration for Trogdor

Author: Colin P. Mccabe <cm...@confluent.io>

Reviewers: Rajini Sivaram <ra...@googlemail.com>

Closes #3726 from cmccabe/KAFKA-5777


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4065ffb3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4065ffb3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4065ffb3

Branch: refs/heads/trunk
Commit: 4065ffb3e1a82f71578811a0a329cdc90e5e1567
Parents: 329d5fa
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Thu Sep 7 13:23:03 2017 +0100
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Thu Sep 7 13:23:03 2017 +0100

----------------------------------------------------------------------
 tests/kafkatest/services/trogdor/__init__.py    |  14 +
 tests/kafkatest/services/trogdor/fault_spec.py  |  45 ++++
 .../trogdor/network_partition_fault_spec.py     |  54 ++++
 .../services/trogdor/no_op_fault_spec.py        |  41 +++
 .../services/trogdor/templates/log4j.properties |  23 ++
 tests/kafkatest/services/trogdor/trogdor.py     | 255 +++++++++++++++++++
 tests/kafkatest/tests/tools/trogdor_test.py     |  97 +++++++
 tests/kafkatest/utils/__init__.py               |   2 +-
 tests/kafkatest/utils/util.py                   |   9 +
 .../org/apache/kafka/trogdor/agent/Agent.java   |  50 +++-
 .../kafka/trogdor/basic/BasicPlatform.java      |  13 +-
 .../kafka/trogdor/coordinator/Coordinator.java  |  32 ++-
 .../kafka/trogdor/coordinator/NodeManager.java  |   6 +
 .../kafka/trogdor/fault/AbstractFault.java      | 106 ++++++++
 .../apache/kafka/trogdor/fault/DoneState.java   |  47 ++++
 .../org/apache/kafka/trogdor/fault/Fault.java   |  28 +-
 .../apache/kafka/trogdor/fault/FaultState.java  |  36 ++-
 .../trogdor/fault/NetworkPartitionFault.java    |  46 +---
 .../apache/kafka/trogdor/fault/NoOpFault.java   |  43 +---
 .../kafka/trogdor/fault/PendingState.java       |  30 +++
 .../kafka/trogdor/fault/RunningState.java       |  38 +++
 .../kafka/trogdor/fault/SendingState.java       |  64 +++++
 .../apache/kafka/trogdor/rest/FaultDataMap.java |   2 +-
 .../apache/kafka/trogdor/agent/AgentTest.java   |  37 ++-
 .../trogdor/coordinator/CoordinatorTest.java    |  18 +-
 25 files changed, 993 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tests/kafkatest/services/trogdor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/__init__.py b/tests/kafkatest/services/trogdor/__init__.py
new file mode 100644
index 0000000..ec20143
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tests/kafkatest/services/trogdor/fault_spec.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/fault_spec.py b/tests/kafkatest/services/trogdor/fault_spec.py
new file mode 100644
index 0000000..9768765
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/fault_spec.py
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+
+class FaultSpec(object):
+    """
+    The base class for a fault specification.
+
+    MAX_DURATION_MS         The longest duration we should use for a fault specification.
+    """
+
+    MAX_DURATION_MS=10000000
+
+    def __init__(self, start_ms, duration_ms):
+        """
+        Create a new fault specification.
+
+        :param start_ms:        The start time in milliseconds since the epoch.
+        :param duration_ms:     The duration in milliseconds.
+        """
+        self.start_ms = start_ms
+        self.duration_ms = duration_ms
+
+    def message(self):
+        """
+        Return a message suitable for sending to the Trogdor daemon.
+        """
+        raise NotImplementedError
+
+    def __str__(self):
+        return json.dumps(self.message())
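
A minimal sketch (not part of the patch) of how a concrete spec is expected to
fill in message(); the ExampleFaultSpec name and its "class" value below are
hypothetical, invented purely to illustrate the contract:

    from kafkatest.services.trogdor.fault_spec import FaultSpec

    class ExampleFaultSpec(FaultSpec):
        # Hypothetical subclass; the real ones added by this patch are
        # NoOpFaultSpec and NetworkPartitionFaultSpec.
        def message(self):
            return {
                "class": "org.apache.kafka.trogdor.fault.ExampleFaultSpec",  # invented name
                "startMs": self.start_ms,
                "durationMs": self.duration_ms,
            }

    spec = ExampleFaultSpec(start_ms=0, duration_ms=FaultSpec.MAX_DURATION_MS)
    print(spec)  # __str__ returns json.dumps(spec.message())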

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tests/kafkatest/services/trogdor/network_partition_fault_spec.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/network_partition_fault_spec.py b/tests/kafkatest/services/trogdor/network_partition_fault_spec.py
new file mode 100644
index 0000000..deb5c56
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/network_partition_fault_spec.py
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.trogdor.fault_spec import FaultSpec
+
+
+class NetworkPartitionFaultSpec(FaultSpec):
+    """
+    The specification for a network partition fault.
+
+    Network partition faults fracture the network into different partitions
+    that cannot communicate with each other.
+    """
+
+    def __init__(self, start_ms, duration_ms, partitions):
+        """
+        Create a new NetworkPartitionFaultSpec.
+
+        :param start_ms:        The start time, as described in fault_spec.py
+        :param duration_ms:     The duration in milliseconds.
+        :param partitions:      An array of arrays describing the partitions.
+                                The inner arrays may contain either node names,
+                                or ClusterNode objects.
+        """
+        super(NetworkPartitionFaultSpec, self).__init__(start_ms, duration_ms)
+        self.partitions = []
+        for partition in partitions:
+            nodes = []
+            for obj in partition:
+                if isinstance(obj, basestring):
+                    nodes.append(obj)
+                else:
+                    nodes.append(obj.name)
+            self.partitions.append(nodes)
+
+    def message(self):
+        return {
+            "class": "org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec",
+            "startMs": self.start_ms,
+            "durationMs": self.duration_ms,
+            "partitions": self.partitions,
+        }
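
For orientation (not from the diff), building a spec from plain node names
produces a message shaped like the following; the node names are made up:

    from kafkatest.services.trogdor.fault_spec import FaultSpec
    from kafkatest.services.trogdor.network_partition_fault_spec import NetworkPartitionFaultSpec

    # Partition "node1" away from "node2" and "node3" (hypothetical names).
    spec = NetworkPartitionFaultSpec(0, FaultSpec.MAX_DURATION_MS,
                                     [["node1"], ["node2", "node3"]])
    spec.message()
    # => {"class": "org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec",
    #     "startMs": 0, "durationMs": 10000000,
    #     "partitions": [["node1"], ["node2", "node3"]]}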

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tests/kafkatest/services/trogdor/no_op_fault_spec.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/no_op_fault_spec.py b/tests/kafkatest/services/trogdor/no_op_fault_spec.py
new file mode 100644
index 0000000..82e9713
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/no_op_fault_spec.py
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.trogdor.fault_spec import FaultSpec
+
+
+class NoOpFaultSpec(FaultSpec):
+    """
+    The specification for a no-op fault.
+
+    No-op faults are used to test the fault injector.  They don't do anything,
+    but must be propagated to all fault injector daemons.
+    """
+
+    def __init__(self, start_ms, duration_ms):
+        """
+        Create a new NoOpFaultSpec.
+
+        :param start_ms:        The start time, as described in fault_spec.py
+        :param duration_ms:     The duration in milliseconds.
+        """
+        super(NoOpFaultSpec, self).__init__(start_ms, duration_ms)
+
+    def message(self):
+        return {
+            "class": "org.apache.kafka.trogdor.fault.NoOpFaultSpec",
+            "startMs": self.start_ms,
+            "durationMs": self.duration_ms,
+        }
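
Likewise, a quick sketch (example values only) of the no-op spec's wire form:

    from kafkatest.services.trogdor.fault_spec import FaultSpec
    from kafkatest.services.trogdor.no_op_fault_spec import NoOpFaultSpec

    spec = NoOpFaultSpec(0, FaultSpec.MAX_DURATION_MS)
    spec.message()
    # => {"class": "org.apache.kafka.trogdor.fault.NoOpFaultSpec",
    #     "startMs": 0, "durationMs": 10000000}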

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tests/kafkatest/services/trogdor/templates/log4j.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/templates/log4j.properties b/tests/kafkatest/services/trogdor/templates/log4j.properties
new file mode 100644
index 0000000..252668e
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/templates/log4j.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=DEBUG, mylogger
+log4j.logger.kafka=DEBUG
+log4j.logger.org.apache.kafka=DEBUG
+log4j.logger.org.eclipse=INFO
+log4j.appender.mylogger=org.apache.log4j.FileAppender
+log4j.appender.mylogger.File={{ log_path }}
+log4j.appender.mylogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.mylogger.layout.ConversionPattern=[%d] %p %m (%c)%n

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tests/kafkatest/services/trogdor/trogdor.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/trogdor.py b/tests/kafkatest/services/trogdor/trogdor.py
new file mode 100644
index 0000000..8b05e99
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/trogdor.py
@@ -0,0 +1,255 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os.path
+import requests
+from requests.adapters import HTTPAdapter
+from requests.packages.urllib3 import Retry
+
+from ducktape.services.service import Service
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+
+
+class TrogdorService(KafkaPathResolverMixin, Service):
+    """
+    A ducktape service for running the trogdor fault injection daemons.
+
+    Attributes:
+        PERSISTENT_ROOT                 The root filesystem path to store service files under.
+        COORDINATOR_STDOUT_STDERR       The path where we store the coordinator's stdout/stderr output.
+        AGENT_STDOUT_STDERR             The path where we store the agent's stdout/stderr output.
+        COORDINATOR_LOG                 The path where we store the coordinator's log4j output.
+        AGENT_LOG                       The path where we store the agent's log4j output.
+        AGENT_LOG4J_PROPERTIES          The path to the agent log4j.properties file for log config.
+        COORDINATOR_LOG4J_PROPERTIES    The path to the coordinator log4j.properties file for log config.
+        CONFIG_PATH                     The path to the trogdor configuration file.
+        DEFAULT_AGENT_PORT              The default port to use for trogdor_agent daemons.
+        DEFAULT_COORDINATOR_PORT        The default port to use for trogdor_coordinator daemons.
+        REQUEST_TIMEOUT                 The request timeout in seconds to use for REST requests.
+        REQUEST_HEADERS                 The request headers to use when communicating with trogdor.
+    """
+
+    PERSISTENT_ROOT="/mnt/trogdor"
+    COORDINATOR_STDOUT_STDERR = os.path.join(PERSISTENT_ROOT, "trogdor-coordinator-stdout-stderr.log")
+    AGENT_STDOUT_STDERR = os.path.join(PERSISTENT_ROOT, "trogdor-agent-stdout-stderr.log")
+    COORDINATOR_LOG = os.path.join(PERSISTENT_ROOT, "trogdor-coordinator.log")
+    AGENT_LOG = os.path.join(PERSISTENT_ROOT, "trogdor-agent.log")
+    COORDINATOR_LOG4J_PROPERTIES = os.path.join(PERSISTENT_ROOT, "trogdor-coordinator-log4j.properties")
+    AGENT_LOG4J_PROPERTIES = os.path.join(PERSISTENT_ROOT, "trogdor-agent-log4j.properties")
+    CONFIG_PATH = os.path.join(PERSISTENT_ROOT, "trogdor.conf")
+    DEFAULT_AGENT_PORT=8888
+    DEFAULT_COORDINATOR_PORT=8889
+    REQUEST_TIMEOUT=5
+    REQUEST_HEADERS = {"Content-type": "application/json"}
+
+    logs = {
+        "trogdor_coordinator_stdout_stderr": {
+            "path": COORDINATOR_STDOUT_STDERR,
+            "collect_default": True},
+        "trogdor_agent_stdout_stderr": {
+            "path": AGENT_STDOUT_STDERR,
+            "collect_default": True},
+        "trogdor_coordinator_log": {
+            "path": COORDINATOR_LOG,
+            "collect_default": True},
+        "trogdor_agent_log": {
+            "path": AGENT_LOG,
+            "collect_default": True},
+    }
+
+    def __init__(self, context, agent_nodes, agent_port=DEFAULT_AGENT_PORT,
+                 coordinator_port=DEFAULT_COORDINATOR_PORT):
+        """
+        Create a Trogdor service.
+
+        :param context:             The test context.
+        :param agent_nodes:         The nodes to run the agents on.
+        :param agent_port:          The port to use for the trogdor_agent daemons.
+        :param coordinator_port:    The port to use for the trogdor_coordinator daemons.
+        """
+        Service.__init__(self, context, num_nodes=1)
+        self.coordinator_node = self.nodes[0]
+        if len(agent_nodes) == 0:
+            raise RuntimeError("You must supply at least one agent node to run the service on.")
+        for agent_node in agent_nodes:
+            self.nodes.append(agent_node)
+        self.agent_port = agent_port
+        self.coordinator_port = coordinator_port
+
+    def free(self):
+        # We only want to deallocate the coordinator node, not the agent nodes.  So we
+        # change self.nodes to include only the coordinator node, and then invoke
+        # the base class' free method.
+        if self.coordinator_node is not None:
+            self.nodes = [self.coordinator_node]
+            self.coordinator_node = None
+            Service.free(self)
+
+    def _create_config_dict(self):
+        """
+        Create a dictionary with the Trogdor configuration.
+
+        :return:            The configuration dictionary.
+        """
+        dict_nodes = {}
+        for node in self.nodes:
+            dict_nodes[node.name] = {
+                "hostname": node.account.ssh_hostname,
+                "trogdor.agent.port": self.agent_port,
+            }
+        dict_nodes[self.coordinator_node.name]["trogdor.coordinator.port"] = self.coordinator_port
+        return {
+            "platform": "org.apache.kafka.trogdor.basic.BasicPlatform",
+            "nodes": dict_nodes,
+        }
+
+    def start_node(self, node):
+        node.account.mkdirs(TrogdorService.PERSISTENT_ROOT)
+
+        # Create the configuration file on the node.
+        config_str = json.dumps(self._create_config_dict(), indent=2)
+        self.logger.info("Creating configuration file %s with %s" % (TrogdorService.CONFIG_PATH, config_str))
+        node.account.create_file(TrogdorService.CONFIG_PATH, config_str)
+
+        if self.is_coordinator(node):
+            self._start_coordinator_node(node)
+        else:
+            self._start_agent_node(node)
+
+    def _start_coordinator_node(self, node):
+        node.account.create_file(TrogdorService.COORDINATOR_LOG4J_PROPERTIES,
+                                 self.render('log4j.properties',
+                                             log_path=TrogdorService.COORDINATOR_LOG))
+        self._start_trogdor_daemon("coordinator", TrogdorService.COORDINATOR_STDOUT_STDERR,
+                                   TrogdorService.COORDINATOR_LOG4J_PROPERTIES,
+                                   TrogdorService.COORDINATOR_LOG, node)
+        self.logger.info("Started trogdor coordinator on %s." % node.name)
+
+    def _start_agent_node(self, node):
+        node.account.create_file(TrogdorService.AGENT_LOG4J_PROPERTIES,
+                                 self.render('log4j.properties',
+                                             log_path=TrogdorService.AGENT_LOG))
+        self._start_trogdor_daemon("agent", TrogdorService.AGENT_STDOUT_STDERR,
+                                   TrogdorService.AGENT_LOG4J_PROPERTIES,
+                                   TrogdorService.AGENT_LOG, node)
+        self.logger.info("Started trogdor agent on %s." % node.name)
+
+    def _start_trogdor_daemon(self, daemon_name, stdout_stderr_capture_path,
+                              log4j_properties_path, log_path, node):
+        cmd = "export KAFKA_LOG4J_OPTS='-Dlog4j.configuration=file:%s'; " % log4j_properties_path
+        cmd += "%s %s --%s.config %s --node-name %s 1>> %s 2>> %s &" % \
+               (self.path.script("trogdor.sh", node),
+                daemon_name,
+                daemon_name,
+                TrogdorService.CONFIG_PATH,
+                node.name,
+                stdout_stderr_capture_path,
+                stdout_stderr_capture_path)
+        node.account.ssh(cmd)
+        with node.account.monitor_log(log_path) as monitor:
+            monitor.wait_until("Starting main service thread.", timeout_sec=30, backoff_sec=.25,
+                               err_msg=("%s on %s didn't finish startup" % (daemon_name, node.name)))
+
+    def wait_node(self, node, timeout_sec=None):
+        if self.is_coordinator(node):
+            return len(node.account.java_pids(self.coordinator_class_name())) == 0
+        else:
+            return len(node.account.java_pids(self.agent_class_name())) == 0
+
+    def stop_node(self, node):
+        """Halt trogdor processes on this node."""
+        if self.is_coordinator(node):
+            node.account.kill_java_processes(self.coordinator_class_name())
+        else:
+            node.account.kill_java_processes(self.agent_class_name())
+
+    def clean_node(self, node):
+        """Clean up persistent state on this node - e.g. service logs, configuration files etc."""
+        self.stop_node(node)
+        node.account.ssh("rm -rf -- %s" % TrogdorService.PERSISTENT_ROOT)
+
+    def _coordinator_url(self, path):
+        return "http://%s:%d/coordinator/%s" % \
+               (self.coordinator_node.account.ssh_hostname, self.coordinator_port, path)
+
+    def request_session(self):
+        """
+        Creates a new request session which will retry for a while.
+        """
+        session = requests.Session()
+        session.mount('http://',
+                      HTTPAdapter(max_retries=Retry(total=4, backoff_factor=0.3)))
+        return session
+
+    def _coordinator_put(self, path, message):
+        """
+        Make a PUT request to the Trogdor coordinator.
+
+        :param path:            The URL path to use.
+        :param message:         The message object to send.
+        :return:                The response as an object.
+        """
+        url = self._coordinator_url(path)
+        self.logger.info("PUT %s %s" % (url, message))
+        response = self.request_session().put(url, json=message,
+                                              timeout=TrogdorService.REQUEST_TIMEOUT,
+                                              headers=TrogdorService.REQUEST_HEADERS)
+        response.raise_for_status()
+        return response.json()
+
+    def _coordinator_get(self, path, message):
+        """
+        Make a GET request to the Trogdor coordinator.
+
+        :param path:            The URL path to use.
+        :param message:         The message object to send.
+        :return:                The response as an object.
+        """
+        url = self._coordinator_url(path)
+        self.logger.info("GET %s %s" % (url, message))
+        response = self.request_session().get(url, json=message,
+                                              timeout=TrogdorService.REQUEST_TIMEOUT,
+                                              headers=TrogdorService.REQUEST_HEADERS)
+        response.raise_for_status()
+        return response.json()
+
+    def create_fault(self, id, spec):
+        """
+        Create a new fault.
+
+        :param id:          The fault id.
+        :param spec:        The fault spec.
+        """
+        self._coordinator_put("fault", { "id": id, "spec": spec.message()})
+
+    def get_faults(self):
+        """
+        Get the faults which are on the coordinator.
+
+        :returns:           A map of fault id strings to fault data objects.
+                            Fault data objects contain a 'spec' field with the spec
+                            and a 'state' field with the state.
+        """
+        return self._coordinator_get("faults", {})
+
+    def is_coordinator(self, node):
+        return node == self.coordinator_node
+
+    def agent_class_name(self):
+        return "org.apache.kafka.trogdor.agent.Agent"
+
+    def coordinator_class_name(self):
+        return "org.apache.kafka.trogdor.coordinator.Coordinator"
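
As a rough sketch (not part of the patch) of what ends up in trogdor.conf, the
dict returned by _create_config_dict() for a coordinator node plus one agent
node looks roughly like the following; the node names and hostnames are
invented for the example:

    {
        "platform": "org.apache.kafka.trogdor.basic.BasicPlatform",
        "nodes": {
            "trogdor-coordinator-0": {              # hypothetical coordinator node name
                "hostname": "worker1",              # hypothetical hostname
                "trogdor.agent.port": 8888,         # DEFAULT_AGENT_PORT
                "trogdor.coordinator.port": 8889,   # DEFAULT_COORDINATOR_PORT
            },
            "worker2": {                            # hypothetical agent node
                "hostname": "worker2",
                "trogdor.agent.port": 8888,
            },
        },
    }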

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tests/kafkatest/tests/tools/trogdor_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/tools/trogdor_test.py b/tests/kafkatest/tests/tools/trogdor_test.py
new file mode 100644
index 0000000..026ecaf
--- /dev/null
+++ b/tests/kafkatest/tests/tools/trogdor_test.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.trogdor.network_partition_fault_spec import NetworkPartitionFaultSpec
+
+from ducktape.cluster.cluster_spec import ClusterSpec
+from ducktape.mark.resource import cluster
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.trogdor.fault_spec import FaultSpec
+from kafkatest.services.trogdor.no_op_fault_spec import NoOpFaultSpec
+from kafkatest.services.trogdor.trogdor import TrogdorService
+from kafkatest.utils import node_is_reachable
+
+
+class TrogdorTest(Test):
+    """
+    Tests the Trogdor fault injection daemon in isolation.
+    """
+
+    def __init__(self, test_context):
+        super(TrogdorTest, self).__init__(test_context)
+
+    def set_up_trogdor(self, num_agent_nodes):
+        self.agent_nodes = self.test_context.cluster.alloc(ClusterSpec.simple_linux(num_agent_nodes))
+        self.trogdor = TrogdorService(context=self.test_context, agent_nodes=self.agent_nodes)
+        for agent_node in self.agent_nodes:
+            agent_node.account.logger = self.trogdor.logger
+        self.trogdor.start()
+
+    def setUp(self):
+        self.trogdor = None
+        self.agent_nodes = None
+
+    def tearDown(self):
+        if self.trogdor is not None:
+            self.trogdor.stop()
+            self.trogdor = None
+        if self.agent_nodes is not None:
+            self.test_context.cluster.free(self.agent_nodes)
+            self.agent_nodes = None
+
+    @cluster(num_nodes=4)
+    def test_trogdor_service(self):
+        """
+        Test that we can bring up Trogdor and create a no-op fault.
+        """
+        self.set_up_trogdor(3)
+        spec = NoOpFaultSpec(0, FaultSpec.MAX_DURATION_MS)
+        self.trogdor.create_fault("myfault", spec)
+        def check_for_faults():
+            faults = self.trogdor.get_faults()
+            self.logger.info("faults = %s" % faults)
+            return "myfault" in faults
+        wait_until(lambda: check_for_faults(),
+                   timeout_sec=10, backoff_sec=.2, err_msg="Failed to read back myfault.")
+
+    @cluster(num_nodes=4)
+    def test_network_partition_fault(self):
+        """
+        Test that the network partition fault results in a true network partition between nodes.
+        """
+        self.set_up_trogdor(3)
+        spec = NetworkPartitionFaultSpec(0, FaultSpec.MAX_DURATION_MS,
+                                            [[self.agent_nodes[0]], self.agent_nodes[1:]])
+        assert 2 == len(spec.partitions)
+        assert [self.agent_nodes[0].name] == spec.partitions[0]
+        assert [self.agent_nodes[1].name, self.agent_nodes[2].name] == spec.partitions[1]
+        self.trogdor.create_fault("partition0", spec)
+        def verify_nodes_partitioned():
+            if node_is_reachable(self.agent_nodes[0], self.agent_nodes[1]):
+                return False
+            if node_is_reachable(self.agent_nodes[1], self.agent_nodes[0]):
+                return False
+            if node_is_reachable(self.agent_nodes[2], self.agent_nodes[0]):
+                return False
+            return True
+        wait_until(lambda: verify_nodes_partitioned(),
+                   timeout_sec=10, backoff_sec=.2, err_msg="Failed to verify that the nodes were partitioned.")
+        if not node_is_reachable(self.agent_nodes[0], self.agent_nodes[0]):
+            raise RuntimeError("Node 0 must be reachable from itself.")
+        if not node_is_reachable(self.agent_nodes[1], self.agent_nodes[2]):
+            raise RuntimeError("Node 2 must be reachable from node 1.")
+        if not node_is_reachable(self.agent_nodes[2], self.agent_nodes[1]):
+            raise RuntimeError("Node 1 must be reachable from node 2.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tests/kafkatest/utils/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/utils/__init__.py b/tests/kafkatest/utils/__init__.py
index 6618016..8c473bf 100644
--- a/tests/kafkatest/utils/__init__.py
+++ b/tests/kafkatest/utils/__init__.py
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from util import kafkatest_version, is_version, is_int, is_int_with_prefix
+from util import kafkatest_version, is_version, is_int, is_int_with_prefix, node_is_reachable

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tests/kafkatest/utils/util.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/utils/util.py b/tests/kafkatest/utils/util.py
index d82903a..dd20273 100644
--- a/tests/kafkatest/utils/util.py
+++ b/tests/kafkatest/utils/util.py
@@ -103,3 +103,12 @@ def is_int_with_prefix(msg):
                         "prefix dot integer value, but one of the two parts (before or after dot) "
                         "are not integers. Message: %s" % (msg))
 
+def node_is_reachable(src_node, dst_node):
+    """
+    Returns True if a node is reachable from another node.
+
+    :param src_node:        The source node to check reachability from.
+    :param dst_node:        The destination node to check reachability to.
+    :return:                True only if dst_node is reachable from src_node.
+    """
+    return 0 == src_node.account.ssh("nc -w 3 -z %s 22" % dst_node.account.hostname, allow_fail=True)
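
A small usage sketch (not from the patch): the helper pairs naturally with
ducktape's wait_until, e.g. to block until connectivity between two allocated
nodes comes back; node_a and node_b below are assumed to be ClusterNode
objects already allocated by the test:

    from ducktape.utils.util import wait_until
    from kafkatest.utils import node_is_reachable

    wait_until(lambda: node_is_reachable(node_a, node_b),
               timeout_sec=30, backoff_sec=1,
               err_msg="node_b never became reachable from node_a")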

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
index 0cb01ad..0ddf4c1 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
@@ -29,7 +29,7 @@ import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.fault.Fault;
 import org.apache.kafka.trogdor.fault.FaultSet;
 import org.apache.kafka.trogdor.fault.FaultSpec;
-import org.apache.kafka.trogdor.fault.FaultState;
+import org.apache.kafka.trogdor.fault.RunningState;
 import org.apache.kafka.trogdor.rest.AgentFaultsResponse;
 import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
@@ -140,7 +140,8 @@ public final class Agent {
                         Iterator<Fault> running = runningFaults.iterateByEnd();
                         while (running.hasNext()) {
                             Fault fault = running.next();
-                            long endMs = fault.spec().startMs() + fault.spec().durationMs();
+                            RunningState state = (RunningState) fault.state();
+                            long endMs = state.startedMs() + fault.spec().durationMs();
                             if (now < endMs) {
                                 nextWakeMs = Math.min(nextWakeMs, endMs);
                                 break;
@@ -154,7 +155,7 @@ public final class Agent {
                     for (Fault fault: toStart) {
                         try {
                             log.debug("Activating fault " + fault);
-                            fault.activate(platform);
+                            fault.activate(now, platform);
                             started.add(fault);
                         } catch (Throwable e) {
                             log.error("Error activating fault " + fault.id(), e);
@@ -164,7 +165,7 @@ public final class Agent {
                     for (Fault fault: toEnd) {
                         try {
                             log.debug("Deactivating fault " + fault);
-                            fault.deactivate(platform);
+                            fault.deactivate(now, platform);
                         } catch (Throwable e) {
                             log.error("Error deactivating fault " + fault.id(), e);
                         } finally {
@@ -200,8 +201,33 @@ public final class Agent {
             } finally {
                 log.info("AgentRunnable shutting down.");
                 restServer.stop();
+                int numDeactivated = deactivateRunningFaults();
+                log.info("AgentRunnable deactivated {} fault(s).", numDeactivated);
+            }
+        }
+    }
+
+    private int deactivateRunningFaults() {
+        long now = time.milliseconds();
+        int numDeactivated = 0;
+        lock.lock();
+        try {
+            for (Iterator<Fault> iter = runningFaults.iterateByStart(); iter.hasNext(); ) {
+                Fault fault = iter.next();
+                try {
+                    numDeactivated++;
+                    iter.remove();
+                    fault.deactivate(now, platform);
+                } catch (Exception e) {
+                    log.error("Got exception while deactivating {}", fault, e);
+                } finally {
+                    doneFaults.add(fault);
+                }
             }
+        } finally {
+            lock.unlock();
         }
+        return numDeactivated;
     }
 
     /**
@@ -257,9 +283,9 @@ public final class Agent {
         Map<String, AgentFaultsResponse.FaultData> faultData = new TreeMap<>();
         lock.lock();
         try {
-            updateFaultsResponse(faultData, pendingFaults, FaultState.PENDING);
-            updateFaultsResponse(faultData, runningFaults, FaultState.RUNNING);
-            updateFaultsResponse(faultData, doneFaults, FaultState.DONE);
+            updateFaultsResponse(faultData, pendingFaults);
+            updateFaultsResponse(faultData, runningFaults);
+            updateFaultsResponse(faultData, doneFaults);
         } finally {
             lock.unlock();
         }
@@ -267,12 +293,12 @@ public final class Agent {
     }
 
     private void updateFaultsResponse(Map<String, AgentFaultsResponse.FaultData> faultData,
-                                      FaultSet faultSet, FaultState state) {
+                                      FaultSet faultSet) {
         for (Iterator<Fault> iter = faultSet.iterateByStart();
                 iter.hasNext(); ) {
             Fault fault = iter.next();
             AgentFaultsResponse.FaultData data =
-                new AgentFaultsResponse.FaultData(fault.spec(), state);
+                new AgentFaultsResponse.FaultData(fault.spec(), fault.state());
             faultData.put(fault.id(), data);
         }
     }
@@ -326,12 +352,14 @@ public final class Agent {
         JsonRestServer restServer =
             new JsonRestServer(Node.Util.getTrogdorAgentPort(platform.curNode()));
         AgentRestResource resource = new AgentRestResource();
-        Agent agent = new Agent(platform, Time.SYSTEM, restServer, resource);
+        final Agent agent = new Agent(platform, Time.SYSTEM, restServer, resource);
         restServer.start(resource);
         Runtime.getRuntime().addShutdownHook(new Thread() {
             @Override
             public void run() {
-                log.error("Agent shutting down...");
+                log.error("Running shutdown hook...");
+                agent.beginShutdown();
+                agent.waitForShutdown();
             }
         });
         agent.waitForShutdown();

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java b/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
index 9bd2cb9..85270cd 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
@@ -23,6 +23,8 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.Topology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -30,6 +32,8 @@ import java.io.IOException;
  * Defines a cluster topology
  */
 public class BasicPlatform implements Platform {
+    private static final Logger log = LoggerFactory.getLogger(BasicPlatform.class);
+
     private final Node curNode;
     private final BasicTopology topology;
     private final CommandRunner commandRunner;
@@ -41,7 +45,14 @@ public class BasicPlatform implements Platform {
     public static class ShellCommandRunner implements CommandRunner {
         @Override
         public String run(Node curNode, String[] command) throws IOException {
-            return Shell.execCommand(command);
+            try {
+                String result = Shell.execCommand(command);
+                log.info("RUN: {}. RESULT: [{}]", Utils.join(command, " "), result);
+                return result;
+            } catch (RuntimeException | IOException e) {
+                log.info("RUN: {}. ERROR: [{}]", Utils.join(command, " "), e.getMessage());
+                throw e;
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
index 2759a36..8f3563b 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
@@ -27,10 +27,11 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.fault.DoneState;
 import org.apache.kafka.trogdor.fault.Fault;
 import org.apache.kafka.trogdor.fault.FaultSet;
 import org.apache.kafka.trogdor.fault.FaultSpec;
-import org.apache.kafka.trogdor.fault.FaultState;
+import org.apache.kafka.trogdor.fault.SendingState;
 import org.apache.kafka.trogdor.rest.CoordinatorFaultsResponse;
 import org.apache.kafka.trogdor.rest.CreateCoordinatorFaultRequest;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
@@ -115,9 +116,9 @@ public final class Coordinator {
     private final FaultSet pendingFaults = new FaultSet();
 
     /**
-     * The set of faults which have been sent to the agents.
+     * The set of faults which have been sent to the NodeManagers.
      */
-    private final FaultSet doneFaults = new FaultSet();
+    private final FaultSet processedFaults = new FaultSet();
 
     class CoordinatorRunnable implements Runnable {
         @Override
@@ -152,13 +153,13 @@ public final class Coordinator {
                             }
                             toStart.add(fault);
                             iter.remove();
-                            doneFaults.add(fault);
+                            processedFaults.add(fault);
                         }
                     } finally {
                         lock.unlock();
                     }
                     for (Fault fault: toStart) {
-                        startFault(fault);
+                        startFault(now, fault);
                     }
                 }
             } catch (Throwable t) {
@@ -209,7 +210,7 @@ public final class Coordinator {
         return this.restServer.port();
     }
 
-    private void startFault(Fault fault) {
+    private void startFault(long now, Fault fault) {
         Set<String> affectedNodes = fault.targetNodes(platform.topology());
         Set<NodeManager> affectedManagers = new HashSet<>();
         Set<String> nonexistentNodes = new HashSet<>();
@@ -229,6 +230,11 @@ public final class Coordinator {
         }
         log.info("Applying fault {} on {} node(s): {}", fault.id(),
                 nodeNames.size(), Utils.join(nodeNames, ", "));
+        if (nodeNames.isEmpty()) {
+            fault.setState(new DoneState(now, ""));
+        } else {
+            fault.setState(new SendingState(nodeNames));
+        }
         for (NodeManager nodeManager : affectedManagers) {
             nodeManager.enqueueFault(fault);
         }
@@ -261,8 +267,8 @@ public final class Coordinator {
         Map<String, CoordinatorFaultsResponse.FaultData> faultData = new TreeMap<>();
         lock.lock();
         try {
-            getFaultsImpl(faultData, pendingFaults, FaultState.PENDING);
-            getFaultsImpl(faultData, doneFaults, FaultState.DONE);
+            getFaultsImpl(faultData, pendingFaults);
+            getFaultsImpl(faultData, processedFaults);
         } finally {
             lock.unlock();
         }
@@ -270,12 +276,12 @@ public final class Coordinator {
     }
 
     private void getFaultsImpl(Map<String, CoordinatorFaultsResponse.FaultData> faultData,
-                               FaultSet faultSet, FaultState state) {
+                               FaultSet faultSet) {
         for (Iterator<Fault> iter = faultSet.iterateByStart();
              iter.hasNext(); ) {
             Fault fault = iter.next();
             CoordinatorFaultsResponse.FaultData data =
-                new CoordinatorFaultsResponse.FaultData(fault.spec(), state);
+                new CoordinatorFaultsResponse.FaultData(fault.spec(), fault.state());
             faultData.put(fault.id(), data);
         }
     }
@@ -330,13 +336,15 @@ public final class Coordinator {
         JsonRestServer restServer = new JsonRestServer(
             Node.Util.getTrogdorCoordinatorPort(platform.curNode()));
         CoordinatorRestResource resource = new CoordinatorRestResource();
-        Coordinator coordinator = new Coordinator(platform, Time.SYSTEM,
+        final Coordinator coordinator = new Coordinator(platform, Time.SYSTEM,
             restServer, resource);
         restServer.start(resource);
         Runtime.getRuntime().addShutdownHook(new Thread() {
             @Override
             public void run() {
-                log.error("Coordinator shutting down...");
+                log.error("Running shutdown hook...");
+                coordinator.beginShutdown();
+                coordinator.waitForShutdown();
             }
         });
         coordinator.waitForShutdown();

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
index 04d714c..ee71190 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
@@ -22,7 +22,9 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.trogdor.agent.AgentClient;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.fault.DoneState;
 import org.apache.kafka.trogdor.fault.Fault;
+import org.apache.kafka.trogdor.fault.SendingState;
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest;
 import org.slf4j.Logger;
@@ -187,6 +189,10 @@ class NodeManager {
         } finally {
             lock.unlock();
         }
+        SendingState state = (SendingState) fault.state();
+        if (state.completeSend(node.name())) {
+            fault.setState(new DoneState(now, ""));
+        }
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFault.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFault.java
new file mode 100644
index 0000000..2d63b82
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFault.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.Topology;
+
+import java.util.Objects;
+import java.util.Set;
+
+public abstract class AbstractFault implements Fault {
+    private final String id;
+    private final FaultSpec spec;
+    private FaultState state;
+
+    public AbstractFault(String id, FaultSpec spec) {
+        this.id = id;
+        this.spec = spec;
+        this.state = new PendingState();
+    }
+
+    @Override
+    public final String id() {
+        return id;
+    }
+
+    @Override
+    public final FaultSpec spec() {
+        return spec;
+    }
+
+    @Override
+    public synchronized FaultState state() {
+        return state;
+    }
+
+    @Override
+    public synchronized void setState(FaultState state) {
+        this.state = state;
+    }
+
+    @Override
+    public final void activate(long now, Platform platform) throws Exception {
+        try {
+            handleActivation(now, platform);
+            setState(new RunningState(now));
+        } catch (Exception e) {
+            setState(new DoneState(now, e.getMessage()));
+            throw e;
+        }
+    }
+
+    protected abstract void handleActivation(long now, Platform platform) throws Exception;
+
+    @Override
+    public final void deactivate(long now, Platform platform) throws Exception {
+        try {
+            handleDeactivation(now, platform);
+            setState(new DoneState(now, ""));
+        } catch (Exception e) {
+            setState(new DoneState(now, e.getMessage()));
+            throw e;
+        }
+    }
+
+    protected abstract void handleDeactivation(long now, Platform platform) throws Exception;
+
+    @Override
+    public abstract Set<String> targetNodes(Topology topology);
+
+    @Override
+    public final boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        return toString().equals(o.toString());
+    }
+
+    @Override
+    public final int hashCode() {
+        return Objects.hashCode(toString());
+    }
+
+    @Override
+    public final String toString() {
+        return getClass().getSimpleName() + "(id=" + id +
+            ", spec=" + JsonUtil.toJsonString(spec) +
+            ", state=" + JsonUtil.toJsonString(state()) +
+            ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/fault/DoneState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/DoneState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/DoneState.java
new file mode 100644
index 0000000..222caf0
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/DoneState.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * The state a fault is in on the agent or coordinator when it is completed,
+ * either normally or with an error.
+ */
+public class DoneState extends FaultState {
+    private final long doneMs;
+    private final String errorStr;
+
+    @JsonCreator
+    public DoneState(@JsonProperty("doneMs") long doneMs,
+                     @JsonProperty("errorStr") String errorStr) {
+        this.doneMs = doneMs;
+        this.errorStr = errorStr;
+    }
+
+    @JsonProperty
+    public long doneMs() {
+        return doneMs;
+    }
+
+    @JsonProperty
+    public String errorStr() {
+        return errorStr;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java
index 9f1a19a..e44d56a 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java
@@ -34,17 +34,37 @@ public interface Fault {
     FaultSpec spec();
 
     /**
-     * Activate the fault.
+     * Get the current fault state.  Thread-safe.
      */
-    void activate(Platform platform) throws Exception;
+    FaultState state();
 
     /**
-     * Deactivate the fault.
+     * Set the current fault state.  Thread-safe.
      */
-    void deactivate(Platform platform) throws Exception;
+    void setState(FaultState state);
+
+    /**
+     * Activate the fault.  Will transition into RunningState or DoneState.
+     *
+     * @param now           The current time in ms.
+     * @param platform      The platform to use.
+     */
+    void activate(long now, Platform platform) throws Exception;
+
+    /**
+     * Deactivate the fault.  Will transition into DoneState.
+     *
+     * @param now           The current time in ms.
+     * @param platform      The platform to use.
+     */
+    void deactivate(long now, Platform platform) throws Exception;
 
     /**
     * Get the nodes which this fault is targeting.
+     *
+     * @param topology      The topology to use.
+     *
+     * @return              A set of target node names.
      */
     Set<String> targetNodes(Topology topology);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java
index bec0792..cba8419 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java
@@ -17,11 +17,35 @@
 
 package org.apache.kafka.trogdor.fault;
 
-import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.kafka.trogdor.common.JsonUtil;
+import java.util.Objects;
 
-@JsonFormat(shape = JsonFormat.Shape.STRING)
-public enum FaultState {
-    PENDING,
-    RUNNING,
-    DONE
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
+    include = JsonTypeInfo.As.PROPERTY,
+    property = "stateName")
+@JsonSubTypes({
+        @JsonSubTypes.Type(value = DoneState.class, name = "done"),
+        @JsonSubTypes.Type(value = PendingState.class, name = "pending"),
+        @JsonSubTypes.Type(value = RunningState.class, name = "running"),
+        @JsonSubTypes.Type(value = SendingState.class, name = "sending")
+    })
+public abstract class FaultState {
+    @Override
+    public final boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        return toString().equals(o.toString());
+    }
+
+    @Override
+    public final int hashCode() {
+        return Objects.hashCode(toString());
+    }
+
+    @Override
+    public final String toString() {
+        return JsonUtil.toJsonString(this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java
index 7524af1..cf3270a 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.trogdor.fault;
 
-import org.apache.kafka.trogdor.common.JsonUtil;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.Topology;
@@ -28,23 +27,20 @@ import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Objects;
 import java.util.Set;
 import java.util.TreeSet;
 
-public class NetworkPartitionFault implements Fault {
+public class NetworkPartitionFault extends AbstractFault {
     private static final Logger log = LoggerFactory.getLogger(NetworkPartitionFault.class);
 
-    private final String id;
-    private final NetworkPartitionFaultSpec spec;
     private final List<Set<String>> partitions;
 
     public NetworkPartitionFault(String id, FaultSpec spec) {
-        this.id = id;
-        this.spec = (NetworkPartitionFaultSpec) spec;
+        super(id, spec);
+        NetworkPartitionFaultSpec faultSpec = (NetworkPartitionFaultSpec) spec;
         this.partitions = new ArrayList<>();
         HashSet<String> prevNodes = new HashSet<>();
-        for (List<String> partition : this.spec.partitions()) {
+        for (List<String> partition : faultSpec.partitions()) {
             for (String nodeName : partition) {
                 if (prevNodes.contains(nodeName)) {
                     throw new RuntimeException("Node " + nodeName +
@@ -57,23 +53,13 @@ public class NetworkPartitionFault implements Fault {
     }
 
     @Override
-    public String id() {
-        return id;
-    }
-
-    @Override
-    public FaultSpec spec() {
-        return spec;
-    }
-
-    @Override
-    public void activate(Platform platform) throws Exception {
+    protected void handleActivation(long now, Platform platform) throws Exception {
         log.info("Activating NetworkPartitionFault...");
         runIptablesCommands(platform, "-A");
     }
 
     @Override
-    public void deactivate(Platform platform) throws Exception {
+    protected void handleDeactivation(long now, Platform platform) throws Exception {
         log.info("Deactivating NetworkPartitionFault...");
         runIptablesCommands(platform, "-D");
     }
@@ -107,24 +93,4 @@ public class NetworkPartitionFault implements Fault {
         }
         return targetNodes;
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        NetworkPartitionFault that = (NetworkPartitionFault) o;
-        return Objects.equals(id, that.id) &&
-            Objects.equals(spec, that.spec) &&
-            Objects.equals(partitions, that.partitions);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(id, spec, partitions);
-    }
-
-    @Override
-    public String toString() {
-        return "NoOpFault(id=" + id + ", spec=" + JsonUtil.toJsonString(spec) + ")";
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java
index c7ac4de..70b4965 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.trogdor.fault;
 
-import org.apache.kafka.trogdor.common.JsonUtil;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.Topology;
@@ -26,37 +25,22 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 
-public class NoOpFault implements Fault {
+public class NoOpFault extends AbstractFault {
     private static final Logger log = LoggerFactory.getLogger(NoOpFault.class);
 
-    private final String id;
-    private final FaultSpec spec;
-
     public NoOpFault(String id, FaultSpec spec) {
-        this.id = id;
-        this.spec = spec;
-    }
-
-    @Override
-    public String id() {
-        return id;
+        super(id, spec);
     }
 
     @Override
-    public FaultSpec spec() {
-        return spec;
-    }
-
-    @Override
-    public void activate(Platform platform) {
+    protected void handleActivation(long now, Platform platform) throws Exception {
         log.info("Activating NoOpFault...");
     }
 
     @Override
-    public void deactivate(Platform platform) {
+    protected void handleDeactivation(long now, Platform platform) throws Exception {
         log.info("Deactivating NoOpFault...");
     }
 
@@ -70,23 +54,4 @@ public class NoOpFault implements Fault {
         }
         return set;
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        NoOpFault that = (NoOpFault) o;
-        return Objects.equals(id, that.id) &&
-            Objects.equals(spec, that.spec);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(id, spec);
-    }
-
-    @Override
-    public String toString() {
-        return "NoOpFault(id=" + id + ", spec=" + JsonUtil.toJsonString(spec) + ")";
-    }
 }
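
Both fault implementations now delegate id(), spec(), state tracking, and the equals/hashCode/toString boilerplate to AbstractFault and only supply the activation hooks. A minimal sketch of a hypothetical new fault written against this pattern, assuming handleActivation, handleDeactivation and targetNodes are the only members a subclass must provide (the targetNodes signature is inferred from the NoOpFault override above and is not shown in this hunk):

    package org.apache.kafka.trogdor.fault;

    import org.apache.kafka.trogdor.common.Platform;
    import org.apache.kafka.trogdor.common.Topology;

    import java.util.Collections;
    import java.util.Set;

    // Hypothetical fault used only to illustrate the AbstractFault template.
    public class ExampleFault extends AbstractFault {
        public ExampleFault(String id, FaultSpec spec) {
            super(id, spec);
        }

        @Override
        protected void handleActivation(long now, Platform platform) throws Exception {
            // inject the fault here, e.g. by running a command through the platform
        }

        @Override
        protected void handleDeactivation(long now, Platform platform) throws Exception {
            // undo whatever handleActivation did
        }

        @Override
        public Set<String> targetNodes(Topology topology) {
            return Collections.singleton("node0");   // placeholder node name
        }
    }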

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java
new file mode 100644
index 0000000..57c8e88
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+
+/**
+ * The state a fault is in on the agent or controller when we haven't yet done
+ * anything with it.
+ */
+public class PendingState extends FaultState {
+    @JsonCreator
+    public PendingState() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java
new file mode 100644
index 0000000..1b81bf5
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * The state a fault is in on the agent when it is running.
+ */
+public class RunningState extends FaultState {
+    private final long startedMs;
+
+    @JsonCreator
+    public RunningState(@JsonProperty("startedMs") long startedMs) {
+        this.startedMs = startedMs;
+    }
+
+    @JsonProperty
+    public long startedMs() {
+        return startedMs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java
new file mode 100644
index 0000000..edfbed2
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.TreeMap;
+import java.util.Set;
+
+/**
+ * The state a fault is in on the controller when it is scheduled to be sent to several agents.
+ */
+public class SendingState extends FaultState {
+    private final TreeMap<String, Boolean> nodes;
+    private int remainingNodes;
+
+    public SendingState(@JsonProperty("nodeNames") Set<String> nodeNames) {
+        this.nodes = new TreeMap<>();
+        for (String nodeName : nodeNames) {
+            nodes.put(nodeName, false);
+        }
+        remainingNodes = nodeNames.size();
+    }
+
+    @JsonProperty
+    public synchronized Set<String> nodeNames() {
+        return nodes.keySet();
+    }
+
+    /**
+     * Complete a send operation.
+     *
+     * @param nodeName      The name of the node we sent to.
+     * @return              True if there are no more send operations left.
+     */
+    public synchronized boolean completeSend(String nodeName) {
+        if (!nodes.containsKey(nodeName)) {
+            throw new RuntimeException("Node " + nodeName + " was not supposed to " +
+                "receive this fault.  The fault was scheduled on nodes: " +
+                Utils.join(nodes.keySet(), ", "));
+        }
+        if (nodes.put(nodeName, true)) {
+            throw new RuntimeException("Node " + nodeName + " already received this fault.");
+        }
+        remainingNodes--;
+        return remainingNodes == 0;
+    }
+}
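
SendingState gives the coordinator a countdown over the agents that still need to receive a fault: completeSend() marks a node as acknowledged and returns true only once every scheduled node has done so, and it throws on unknown or duplicate acknowledgements. A brief usage sketch (illustrative only):

    package org.apache.kafka.trogdor.fault;

    import java.util.Arrays;
    import java.util.HashSet;

    public class SendingStateExample {
        public static void main(String[] args) {
            SendingState state = new SendingState(
                new HashSet<>(Arrays.asList("agent0", "agent1")));

            System.out.println(state.completeSend("agent0"));   // false: agent1 still pending
            System.out.println(state.completeSend("agent1"));   // true: all sends complete

            // Acknowledging the same node twice, or a node that was never scheduled,
            // throws a RuntimeException.
        }
    }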

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java
index 773d519..b2f7c91 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java
@@ -38,7 +38,7 @@ public class FaultDataMap {
 
         @JsonCreator
         public FaultData(@JsonProperty("spec") FaultSpec spec,
-                @JsonProperty("status") FaultState state) {
+                @JsonProperty("state") FaultState state) {
             this.spec = spec;
             this.state = state;
         }
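
The renamed @JsonProperty binds the creator parameter to "state", matching the name the FaultState is presumably serialized under, so a FaultData round-trips through JSON intact. A hedged sketch of how this could be exercised, assuming FaultData is a static nested class of FaultDataMap and that JsonUtil.toJsonString is usable here:

    package org.apache.kafka.trogdor.rest;

    import org.apache.kafka.trogdor.common.JsonUtil;
    import org.apache.kafka.trogdor.fault.NoOpFaultSpec;
    import org.apache.kafka.trogdor.fault.PendingState;

    public class FaultDataJsonExample {
        public static void main(String[] args) {
            // Spec arguments follow the usage in the tests below.
            FaultDataMap.FaultData data =
                new FaultDataMap.FaultData(new NoOpFaultSpec(10, 2), new PendingState());
            // Output should look roughly like:
            // {"spec":{...},"state":{"stateName":"pending"}}
            System.out.println(JsonUtil.toJsonString(data));
        }
    }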

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
index c587e44..53ef849 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
@@ -24,8 +24,9 @@ import org.apache.kafka.trogdor.basic.BasicPlatform;
 import org.apache.kafka.trogdor.basic.BasicTopology;
 import org.apache.kafka.trogdor.common.ExpectedFaults;
 import org.apache.kafka.trogdor.common.Node;
-import org.apache.kafka.trogdor.fault.FaultState;
+import org.apache.kafka.trogdor.fault.DoneState;
 import org.apache.kafka.trogdor.fault.NoOpFaultSpec;
+import org.apache.kafka.trogdor.fault.RunningState;
 import org.apache.kafka.trogdor.rest.AgentFaultsResponse;
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest;
@@ -129,36 +130,32 @@ public class AgentTest {
 
         final NoOpFaultSpec fooSpec = new NoOpFaultSpec(10, 2);
         client.putFault(new CreateAgentFaultRequest("foo", fooSpec));
-        new ExpectedFaults().addFault("foo", FaultState.RUNNING).waitFor(client);
+        new ExpectedFaults().addFault("foo", new RunningState(0)).waitFor(client);
+
+        time.sleep(3);
+        new ExpectedFaults().addFault("foo", new DoneState(3, "")).waitFor(client);
 
         final NoOpFaultSpec barSpec = new NoOpFaultSpec(20, 3);
         client.putFault(new CreateAgentFaultRequest("bar", barSpec));
-        time.sleep(11);
         new ExpectedFaults().
-            addFault("foo", FaultState.RUNNING).
-            addFault("bar", FaultState.RUNNING).
+            addFault("foo", new DoneState(3, "")).
+            addFault("bar", new RunningState(3)).
             waitFor(client);
 
-        final NoOpFaultSpec bazSpec = new NoOpFaultSpec(1, 11);
+        time.sleep(4);
+        final NoOpFaultSpec bazSpec = new NoOpFaultSpec(1, 2);
         client.putFault(new CreateAgentFaultRequest("baz", bazSpec));
         new ExpectedFaults().
-            addFault("foo", FaultState.RUNNING).
-            addFault("bar", FaultState.RUNNING).
-            addFault("baz", FaultState.RUNNING).
-            waitFor(client);
-
-        time.sleep(2);
-        new ExpectedFaults().
-            addFault("foo", FaultState.DONE).
-            addFault("bar", FaultState.RUNNING).
-            addFault("baz", FaultState.DONE).
+            addFault("foo", new DoneState(3, "")).
+            addFault("bar", new DoneState(7, "")).
+            addFault("baz", new RunningState(7)).
             waitFor(client);
 
-        time.sleep(100);
+        time.sleep(3);
         new ExpectedFaults().
-            addFault("foo", FaultState.DONE).
-            addFault("bar", FaultState.DONE).
-            addFault("baz", FaultState.DONE).
+            addFault("foo", new DoneState(3, "")).
+            addFault("bar", new DoneState(7, "")).
+            addFault("baz", new DoneState(10, "")).
             waitFor(client);
 
         agent.beginShutdown();

http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
index d7a8fa0..75109d2 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -27,9 +27,11 @@ import org.apache.kafka.trogdor.common.CapturingCommandRunner;
 import org.apache.kafka.trogdor.common.ExpectedFaults;
 import org.apache.kafka.trogdor.common.MiniTrogdorCluster;
 
-import org.apache.kafka.trogdor.fault.FaultState;
+import org.apache.kafka.trogdor.fault.DoneState;
 import org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec;
 import org.apache.kafka.trogdor.fault.NoOpFaultSpec;
+import org.apache.kafka.trogdor.fault.PendingState;
+import org.apache.kafka.trogdor.fault.RunningState;
 import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateCoordinatorFaultRequest;
 import org.junit.Rule;
@@ -69,16 +71,16 @@ public class CoordinatorTest {
                 build()) {
             new ExpectedFaults().waitFor(cluster.coordinatorClient());
 
-            NoOpFaultSpec noOpFaultSpec = new NoOpFaultSpec(10, 2);
+            NoOpFaultSpec noOpFaultSpec = new NoOpFaultSpec(1, 2);
             cluster.coordinatorClient().putFault(
                 new CreateCoordinatorFaultRequest("fault1", noOpFaultSpec));
             new ExpectedFaults().
-                addFault("fault1", noOpFaultSpec, FaultState.PENDING).
+                addFault("fault1", noOpFaultSpec, new PendingState()).
                 waitFor(cluster.coordinatorClient());
 
-            time.sleep(11);
+            time.sleep(2);
             new ExpectedFaults().
-                addFault("fault1", noOpFaultSpec, FaultState.DONE).
+                addFault("fault1", noOpFaultSpec, new DoneState(2, "")).
                 waitFor(cluster.coordinatorClient());
         }
     }
@@ -99,7 +101,7 @@ public class CoordinatorTest {
             NoOpFaultSpec noOpFaultSpec = new NoOpFaultSpec(10, 2);
             coordinatorClient.putFault(new CreateCoordinatorFaultRequest("fault1", noOpFaultSpec));
             new ExpectedFaults().
-                addFault("fault1", noOpFaultSpec, FaultState.PENDING).
+                addFault("fault1", noOpFaultSpec, new PendingState()).
                 waitFor(coordinatorClient);
             new ExpectedFaults().
                 waitFor(agentClient1).
@@ -107,10 +109,10 @@ public class CoordinatorTest {
 
             time.sleep(10);
             new ExpectedFaults().
-                addFault("fault1", noOpFaultSpec, FaultState.DONE).
+                addFault("fault1", noOpFaultSpec, new DoneState(10, "")).
                 waitFor(coordinatorClient);
             new ExpectedFaults().
-                addFault("fault1", noOpFaultSpec, FaultState.RUNNING).
+                addFault("fault1", noOpFaultSpec, new RunningState(10)).
                 waitFor(agentClient1).
                 waitFor(agentClient2);
         }