You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/11/16 17:59:47 UTC

kafka git commit: KAFKA-5811; Add Kibosh integration for Trogdor and Ducktape

Repository: kafka
Updated Branches:
  refs/heads/trunk 53c5ccb33 -> d9cbc6b1a


KAFKA-5811; Add Kibosh integration for Trogdor and Ducktape

For ducktape: add Kibosh to the testing Dockerfile.
Create files_unreadable_fault_spec.py.

For trogdor: create FilesUnreadableFaultSpec.java.
Add a unit test of using the Kibosh service.

Author: Colin P. Mccabe <cm...@confluent.io>

Reviewers: Rajini Sivaram <ra...@googlemail.com>

Closes #4195 from cmccabe/KAFKA-5811


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d9cbc6b1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d9cbc6b1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d9cbc6b1

Branch: refs/heads/trunk
Commit: d9cbc6b1a28e2fb3f88137429e08c12201998bc7
Parents: 53c5ccb
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Thu Nov 16 17:59:24 2017 +0000
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Thu Nov 16 17:59:24 2017 +0000

----------------------------------------------------------------------
 checkstyle/suppressions.xml                     |   2 +-
 tests/docker/Dockerfile                         |   4 +
 .../trogdor/files_unreadable_fault_spec.py      |  58 ++++++
 tests/kafkatest/services/trogdor/kibosh.py      | 152 ++++++++++++++
 tests/kafkatest/tests/tools/kibosh_test.py      |  81 ++++++++
 .../trogdor/fault/FilesUnreadableFaultSpec.java |  82 ++++++++
 .../org/apache/kafka/trogdor/fault/Kibosh.java  | 199 +++++++++++++++++++
 .../trogdor/fault/KiboshFaultController.java    |  36 ++++
 .../kafka/trogdor/fault/KiboshFaultWorker.java  |  56 ++++++
 .../apache/kafka/trogdor/agent/AgentTest.java   |  78 ++++++++
 10 files changed, 747 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 7c32eb0..6218794 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -45,7 +45,7 @@
     <suppress checks="ClassDataAbstractionCoupling"
               files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|KafkaAdminClient).java"/>
     <suppress checks="ClassDataAbstractionCoupling"
-              files="(Errors|SaslAuthenticatorTest).java"/>
+              files="(Errors|SaslAuthenticatorTest|AgentTest).java"/>
 
     <suppress checks="BooleanExpressionComplexity"
               files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData).java"/>

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tests/docker/Dockerfile
----------------------------------------------------------------------
diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile
index 3ca9993..d855fe4 100644
--- a/tests/docker/Dockerfile
+++ b/tests/docker/Dockerfile
@@ -48,6 +48,10 @@ RUN mkdir -p "/opt/kafka-0.10.1.1" && curl -s "${MIRROR}kafka/0.10.1.1/kafka_2.1
 RUN mkdir -p "/opt/kafka-0.10.2.1" && curl -s "${MIRROR}kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.1"
 RUN mkdir -p "/opt/kafka-0.11.0.0" && curl -s "${MIRROR}kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.0"
 
+# Install Kibosh, a fault-injecting FUSE filesystem used by the Trogdor/ducktape tests.
+# "docker build" is non-interactive, so apt-get needs -y to proceed without a confirmation prompt;
+# refreshing the package index in the same layer avoids installing from stale lists.
+RUN apt-get update && apt-get install -y fuse
+# Kibosh is pinned to a known commit so that test images are reproducible.
+RUN cd /opt && git clone -q https://github.com/confluentinc/kibosh.git && cd "/opt/kibosh" && git reset --hard 399de967c5520e8fe6e32d4e5c1c55d71cd7f46c && mkdir "/opt/kibosh/build" && cd "/opt/kibosh/build" && ../configure && make -j 2
+
 # Set up the ducker user.
 RUN useradd -ms /bin/bash ducker && mkdir -p /home/ducker/ && rsync -aiq /root/.ssh/ /home/ducker/.ssh && chown -R ducker /home/ducker/ /mnt/ && echo 'ducker ALL=(ALL) NOPASSWD: ALL' >> /etc/sudoers
 USER ducker

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tests/kafkatest/services/trogdor/files_unreadable_fault_spec.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/files_unreadable_fault_spec.py b/tests/kafkatest/services/trogdor/files_unreadable_fault_spec.py
new file mode 100644
index 0000000..4f0540a
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/files_unreadable_fault_spec.py
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.trogdor.task_spec import TaskSpec
+
+
+class FilesUnreadableFaultSpec(TaskSpec):
+    """
+    The specification for a fault which makes files unreadable.
+    """
+
+    def __init__(self, start_ms, duration_ms, node_names, mount_path,
+                 prefix, error_code):
+        """
+        Create a new FilesUnreadableFaultSpec.
+
+        :param start_ms:        The start time, as described in task_spec.py
+        :param duration_ms:     The duration in milliseconds.
+        :param node_names:      The names of the node(s) to create the fault on.
+        :param mount_path:      The mount path.
+        :param prefix:          The prefix within the mount point to make unreadable.
+        :param error_code:      The error code to use.
+        """
+        super(FilesUnreadableFaultSpec, self).__init__(start_ms, duration_ms)
+        self.node_names = node_names
+        self.mount_path = mount_path
+        self.prefix = prefix
+        self.error_code = error_code
+
+    def message(self):
+        return {
+            "class": "org.apache.kafka.trogdor.fault.FilesUnreadableFaultSpec",
+            "startMs": self.start_ms,
+            "durationMs": self.duration_ms,
+            "nodeNames": self.node_names,
+            "mountPath": self.mount_path,
+            "prefix": self.prefix,
+            "errorCode": self.error_code,
+        }
+
+    def kibosh_message(self):
+        return {
+            "type": "unreadable",
+            "prefix": self.prefix,
+            "code": self.error_code,
+        }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tests/kafkatest/services/trogdor/kibosh.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/kibosh.py b/tests/kafkatest/services/trogdor/kibosh.py
new file mode 100644
index 0000000..1bd4224
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/kibosh.py
@@ -0,0 +1,152 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os.path
+
+from ducktape.services.service import Service
+from ducktape.utils import util
+
+
+class KiboshService(Service):
+    """
+    Kibosh is a fault-injecting FUSE filesystem.
+
+    Attributes:
+        INSTALL_ROOT                    The path of where Kibosh is installed.
+        BINARY_NAME                     The Kibosh binary name.
+        BINARY_PATH                     The path to the kibosh binary.
+    """
+    INSTALL_ROOT = "/opt/kibosh/build"
+    BINARY_NAME = "kibosh"
+    BINARY_PATH = os.path.join(INSTALL_ROOT, BINARY_NAME)
+
+    def __init__(self, context, nodes, target, mirror, persist="/mnt/kibosh"):
+        """
+        Create a Kibosh service.
+
+        :param context:             The TestContext object.
+        :param nodes:               The nodes to put the Kibosh FS on.  Kibosh allocates no
+                                    nodes of its own.
+        :param target:              The target directory, which Kibosh exports a view of.
+        :param mirror:              The mirror directory, where Kibosh injects faults.
+        :param persist:             Where the log files and pid files will be created.
+        """
+        Service.__init__(self, context, num_nodes=0)
+        if (len(nodes) == 0):
+            raise RuntimeError("You must supply at least one node to run the service on.")
+        for node in nodes:
+            self.nodes.append(node)
+
+        self.target = target
+        self.mirror = mirror
+        self.persist = persist
+
+        self.control_path = os.path.join(self.mirror, "kibosh_control")
+        self.pidfile_path = os.path.join(self.persist, "pidfile")
+        self.stdout_stderr_path = os.path.join(self.persist, "kibosh-stdout-stderr.log")
+        self.log_path = os.path.join(self.persist, "kibosh.log")
+        self.logs = {
+            "kibosh-stdout-stderr.log": {
+                "path": self.stdout_stderr_path,
+                "collect_default": True},
+            "kibosh.log": {
+                "path": self.log_path,
+                "collect_default": True}
+        }
+
+    def free(self):
+        """Clear the nodes list."""
+        # Because the filesystem runs on nodes which have been allocated by other services, those nodes
+        # are not deallocated here.
+        self.nodes = []
+        Service.free(self)
+
+    def kibosh_running(self, node):
+        return 0 == node.account.ssh("test -e '%s'" % self.control_path, allow_fail=True)
+
+    def start_node(self, node):
+        node.account.mkdirs(self.persist)
+        cmd = "sudo -E "
+        cmd += " %s" % KiboshService.BINARY_PATH
+        cmd += " --target %s" % self.target
+        cmd += " --pidfile %s" % self.pidfile_path
+        cmd += " --log %s" % self.log_path
+        cmd += " --verbose"
+        cmd += " %s" % self.mirror
+        cmd += " &> %s" % self.stdout_stderr_path
+        node.account.ssh(cmd)
+        util.wait_until(lambda: self.kibosh_running(node), 20, backoff_sec=.1,
+                        err_msg="Timed out waiting for kibosh to start on %s" % node.account.hostname)
+
+    def pids(self, node):
+        return [pid for pid in node.account.ssh_capture("test -e '%s' && test -e /proc/$(cat '%s')" %
+                                                        (self.pidfile_path, self.pidfile_path), allow_fail=True)]
+
+    def wait_node(self, node, timeout_sec=None):
+        return len(self.pids(node)) == 0
+
+    def kibosh_process_running(self, node):
+        pids = self.pids(node)
+        if len(pids) == 0:
+            return True
+        return False
+
+    def stop_node(self, node):
+        """Halt kibosh process(es) on this node."""
+        node.account.logger.debug("stop_node(%s): unmounting %s" % (node.name, self.mirror))
+        node.account.ssh("sudo fusermount -u %s" % self.mirror, allow_fail=True)
+        # Wait for the kibosh process to terminate.
+        try:
+            util.wait_until(lambda: self.kibosh_process_running(node), 20, backoff_sec=.1,
+                            err_msg="Timed out waiting for kibosh to stop on %s" % node.account.hostname)
+        except TimeoutError:
+            # If the process won't terminate, use kill -9 to shut it down.
+            node.account.logger.debug("stop_node(%s): killing the kibosh process managing %s" % (node.name, self.mirror))
+            node.account.ssh("sudo kill -9 %s" % (" ".join(self.pids(node))), allow_fail=True)
+            node.account.ssh("sudo fusermount -u %s" % self.mirror)
+            util.wait_until(lambda: self.kibosh_process_running(node), 20, backoff_sec=.1,
+                            err_msg="Timed out waiting for kibosh to stop on %s" % node.account.hostname)
+
+    def clean_node(self, node):
+        """Clean up persistent state on this node - e.g. service logs, configuration files etc."""
+        self.stop_node(node)
+        node.account.ssh("rm -rf -- %s" % self.persist)
+
+    def set_faults(self, node, specs):
+        """
+        Set the currently active faults.
+
+        :param node:        The node.
+        :param spec:        An array of FaultSpec objects describing the faults.
+        """
+        fault_array = [spec.kibosh_message() for spec in specs]
+        obj = { 'faults': fault_array }
+        obj_json = json.dumps(obj)
+        node.account.create_file(self.control_path, obj_json)
+
+    def get_fault_json(self, node):
+        """
+        Return a JSON string which contains the currently active faults.
+
+        :param node:        The node.
+
+        :returns:           The fault JSON describing the faults.
+        """
+        iter = node.account.ssh_capture("cat '%s'" % self.control_path)
+        text = ""
+        for line in iter:
+            text = "%s%s" % (text, line.rstrip("\r\n"))
+        return text

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tests/kafkatest/tests/tools/kibosh_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/tools/kibosh_test.py b/tests/kafkatest/tests/tools/kibosh_test.py
new file mode 100644
index 0000000..5844c27
--- /dev/null
+++ b/tests/kafkatest/tests/tools/kibosh_test.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.cluster.cluster_spec import ClusterSpec
+from ducktape.mark.resource import cluster
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.trogdor.task_spec import TaskSpec
+from kafkatest.services.trogdor.files_unreadable_fault_spec import FilesUnreadableFaultSpec
+from kafkatest.services.trogdor.kibosh import KiboshService
+import json
+
+
+class KiboshTest(Test):
+    TARGET = "/mnt/kibosh-target"
+    MIRROR = "/mnt/kibosh-mirror"
+
+    """
+    Tests the Kibosh fault injection filesystem in isolation.
+    """
+
+    def __init__(self, test_context):
+        super(KiboshTest, self).__init__(test_context)
+
+    def set_up_kibosh(self, num_nodes):
+        self.nodes = self.test_context.cluster.alloc(ClusterSpec.simple_linux(num_nodes))
+        for node in self.nodes:
+            node.account.logger = self.logger
+            node.account.ssh("mkdir -p -- %s %s" % (KiboshTest.TARGET, KiboshTest.MIRROR))
+        self.kibosh = KiboshService(self.test_context, self.nodes,
+                                    KiboshTest.TARGET, KiboshTest.MIRROR)
+        for node in self.nodes:
+            node.account.logger = self.kibosh.logger
+        self.kibosh.start()
+
+    def setUp(self):
+        self.kibosh = None
+        self.nodes = None
+
+    def tearDown(self):
+        if self.kibosh is not None:
+            self.kibosh.stop()
+            self.kibosh = None
+        for node in self.nodes:
+            node.account.ssh("rm -rf -- %s %s" % (KiboshTest.TARGET, KiboshTest.MIRROR))
+        if self.nodes is not None:
+            self.test_context.cluster.free(self.nodes)
+            self.nodes = None
+
+    @cluster(num_nodes=4)
+    def test_kibosh_service(self):
+        pass
+        """
+        Test that we can bring up kibosh and create a fault.
+        """
+        self.set_up_kibosh(3)
+        spec = FilesUnreadableFaultSpec(0, TaskSpec.MAX_DURATION_MS,
+                [self.nodes[0].name], KiboshTest.TARGET, "/foo", 12)
+        node = self.nodes[0]
+
+        def check(self, node):
+            fault_json = self.kibosh.get_fault_json(node)
+            expected_json = json.dumps({"faults": [spec.kibosh_message()]})
+            self.logger.info("Read back: [%s].  Expected: [%s]." % (fault_json, expected_json))
+            return fault_json == expected_json
+
+        self.kibosh.set_faults(node, [spec])
+        wait_until(lambda: check(self, node),
+                   timeout_sec=10, backoff_sec=.2, err_msg="Failed to read back fault array.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tools/src/main/java/org/apache/kafka/trogdor/fault/FilesUnreadableFaultSpec.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/FilesUnreadableFaultSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/FilesUnreadableFaultSpec.java
new file mode 100644
index 0000000..1fbf9d0
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/FilesUnreadableFaultSpec.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.fault.Kibosh.KiboshFilesUnreadableFaultSpec;
+import org.apache.kafka.trogdor.task.TaskController;
+import org.apache.kafka.trogdor.task.TaskSpec;
+import org.apache.kafka.trogdor.task.TaskWorker;
+
+import java.util.Set;
+
+/**
+ * The specification for a fault that makes files unreadable.
+ *
+ * The fault targets the nodes listed in {@code nodeNames}.  On each of them a
+ * {@link KiboshFaultWorker} installs a Kibosh "unreadable" fault for {@code prefix}, reporting
+ * {@code errorCode}, in the Kibosh filesystem mounted at {@code mountPath}.
+ */
+public class FilesUnreadableFaultSpec extends TaskSpec {
+    private final Set<String> nodeNames;
+    private final String mountPath;
+    private final String prefix;
+    private final int errorCode;
+
+    @JsonCreator
+    public FilesUnreadableFaultSpec(@JsonProperty("startMs") long startMs,
+                                    @JsonProperty("durationMs") long durationMs,
+                                    @JsonProperty("nodeNames") Set<String> nodeNames,
+                                    @JsonProperty("mountPath") String mountPath,
+                                    @JsonProperty("prefix") String prefix,
+                                    @JsonProperty("errorCode") int errorCode) {
+        super(startMs, durationMs);
+        this.errorCode = errorCode;
+        this.prefix = prefix;
+        this.mountPath = mountPath;
+        this.nodeNames = nodeNames;
+    }
+
+    /**
+     * @return the names of the nodes on which the fault is created.
+     */
+    @JsonProperty
+    public Set<String> nodeNames() {
+        return this.nodeNames;
+    }
+
+    /**
+     * @return the path at which the Kibosh filesystem is mounted.
+     */
+    @JsonProperty
+    public String mountPath() {
+        return this.mountPath;
+    }
+
+    /**
+     * @return the prefix, within the mount point, that is made unreadable.
+     */
+    @JsonProperty
+    public String prefix() {
+        return this.prefix;
+    }
+
+    /**
+     * @return the error code that failing accesses report.
+     */
+    @JsonProperty
+    public int errorCode() {
+        return this.errorCode;
+    }
+
+    /**
+     * Create a controller which targets exactly {@link #nodeNames()}.
+     */
+    @Override
+    public TaskController newController(String id) {
+        final KiboshFaultController controller = new KiboshFaultController(this.nodeNames);
+        return controller;
+    }
+
+    /**
+     * Create a worker which installs the corresponding Kibosh fault under {@link #mountPath()}.
+     */
+    @Override
+    public TaskWorker newTaskWorker(String id) {
+        final KiboshFilesUnreadableFaultSpec kiboshSpec =
+            new KiboshFilesUnreadableFaultSpec(this.prefix, this.errorCode);
+        return new KiboshFaultWorker(id, kiboshSpec, this.mountPath);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tools/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java
new file mode 100644
index 0000000..6fa1a4b
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.kafka.trogdor.common.JsonUtil;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * Manages the Kibosh fault-injection filesystems used by Trogdor on this host.
+ *
+ * Faults are activated and deactivated by rewriting the JSON control file
+ * ({@value #KIBOSH_CONTROL}) at the root of a Kibosh mount.  Access goes through the
+ * {@link #INSTANCE} singleton, so all callers in this JVM share one {@code KiboshProcess}
+ * object per (normalized) mount path.
+ */
+public final class Kibosh {
+    public static final Kibosh INSTANCE = new Kibosh();
+
+    /** The name of the control file at the root of a Kibosh mount. */
+    public static final String KIBOSH_CONTROL = "kibosh_control";
+
+    /**
+     * Base class for one fault entry in the Kibosh control file.
+     *
+     * The JSON "type" property selects the concrete subclass.  equals(), hashCode() and
+     * toString() are all defined by the JSON form, so two specs are equal exactly when they
+     * serialize to the same JSON.
+     */
+    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
+        include = JsonTypeInfo.As.PROPERTY,
+        property = "type")
+    @JsonSubTypes({
+            @JsonSubTypes.Type(value = KiboshFilesUnreadableFaultSpec.class, name = "unreadable"),
+        })
+    public static abstract class KiboshFaultSpec {
+        @Override
+        public final boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            return Objects.equals(toString(), o.toString());
+        }
+
+        @Override
+        public final int hashCode() {
+            return toString().hashCode();
+        }
+
+        @Override
+        public final String toString() {
+            return JsonUtil.toJsonString(this);
+        }
+    }
+
+    /**
+     * A Kibosh "unreadable" fault for files under {@code prefix} (within the mount point),
+     * reporting {@code errorCode}.
+     *
+     * NOTE(review): this serializes the error code under the JSON name "errorCode", whereas the
+     * Python kafkatest FilesUnreadableFaultSpec.kibosh_message() writes the same field to the
+     * same control file as "code".  Confirm which name Kibosh's control-file parser expects and
+     * make both sides agree.
+     */
+    public static class KiboshFilesUnreadableFaultSpec extends KiboshFaultSpec {
+        private final String prefix;
+        private final int errorCode;
+
+        @JsonCreator
+        public KiboshFilesUnreadableFaultSpec(@JsonProperty("prefix") String prefix,
+                                              @JsonProperty("errorCode") int errorCode) {
+            this.prefix = prefix;
+            this.errorCode = errorCode;
+        }
+
+        @JsonProperty
+        public String prefix() {
+            return prefix;
+        }
+
+        @JsonProperty
+        public int errorCode() {
+            return errorCode;
+        }
+    }
+
+    /**
+     * Reads and rewrites the control file of a single Kibosh mount.
+     *
+     * addFault() and removeFault() are synchronized so that read-modify-write cycles issued
+     * through this object do not interleave.
+     */
+    private static class KiboshProcess {
+        private final Path controlPath;
+
+        /**
+         * @param mountPath     The Kibosh mount point.
+         * @throws RuntimeException if the control file does not exist under mountPath.
+         */
+        KiboshProcess(String mountPath) {
+            this.controlPath = Paths.get(mountPath, KIBOSH_CONTROL);
+            if (!Files.exists(controlPath)) {
+                throw new RuntimeException("Can't find file " + controlPath);
+            }
+        }
+
+        /**
+         * Append a fault to the faults currently listed in the control file.
+         *
+         * @throws IOException if the control file cannot be read or written.
+         */
+        synchronized void addFault(KiboshFaultSpec toAdd) throws IOException {
+            KiboshControlFile file = KiboshControlFile.read(controlPath);
+            List<KiboshFaultSpec> faults = new ArrayList<>(file.faults());
+            faults.add(toAdd);
+            new KiboshControlFile(faults).write(controlPath);
+        }
+
+        /**
+         * Remove every fault equal to toRemove from the control file.
+         *
+         * @throws IOException if the control file cannot be read or written.
+         * @throws RuntimeException if no listed fault equals toRemove; the file is then left unchanged.
+         */
+        synchronized void removeFault(KiboshFaultSpec toRemove) throws IOException {
+            KiboshControlFile file = KiboshControlFile.read(controlPath);
+            List<KiboshFaultSpec> faults = new ArrayList<>();
+            boolean foundToRemove = false;
+            for (KiboshFaultSpec fault : file.faults()) {
+                if (fault.equals(toRemove)) {
+                    foundToRemove = true;
+                } else {
+                    faults.add(fault);
+                }
+            }
+            if (!foundToRemove) {
+                throw new RuntimeException("Failed to find fault " + toRemove + ". ");
+            }
+            new KiboshControlFile(faults).write(controlPath);
+        }
+    }
+
+    /**
+     * The JSON contents of a Kibosh control file: {@code {"faults": [...]}}.
+     *
+     * equals(), hashCode() and toString() are defined by the JSON form.
+     */
+    public static class KiboshControlFile {
+        private final List<KiboshFaultSpec> faults;
+
+        public static final KiboshControlFile EMPTY =
+            new KiboshControlFile(Collections.<KiboshFaultSpec>emptyList());
+
+        /**
+         * Parse the control file at controlPath.
+         *
+         * @throws IOException if the file cannot be read or parsed.
+         */
+        public static KiboshControlFile read(Path controlPath) throws IOException {
+            byte[] controlFileBytes = Files.readAllBytes(controlPath);
+            return JsonUtil.JSON_SERDE.readValue(controlFileBytes, KiboshControlFile.class);
+        }
+
+        @JsonCreator
+        public KiboshControlFile(@JsonProperty("faults") List<KiboshFaultSpec> faults) {
+            // A control file without a "faults" array (or with "faults": null) yields null here;
+            // treat it as "no faults" so addFault()/removeFault() don't fail with a NullPointerException.
+            this.faults = (faults == null) ? Collections.<KiboshFaultSpec>emptyList() : faults;
+        }
+
+        @JsonProperty
+        public List<KiboshFaultSpec> faults() {
+            return faults;
+        }
+
+        /**
+         * Overwrite the file at controlPath with this object's JSON form.
+         *
+         * @throws IOException if the file cannot be written.
+         */
+        public void write(Path controlPath) throws IOException {
+            Files.write(controlPath, JsonUtil.JSON_SERDE.writeValueAsBytes(this));
+        }
+
+        @Override
+        public final boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            return Objects.equals(toString(), o.toString());
+        }
+
+        @Override
+        public final int hashCode() {
+            return toString().hashCode();
+        }
+
+        @Override
+        public final String toString() {
+            return JsonUtil.toJsonString(this);
+        }
+    }
+
+    /** KiboshProcess objects, keyed by normalized mount path. */
+    private final TreeMap<String, KiboshProcess> processes = new TreeMap<>();
+
+    private Kibosh() {
+    }
+
+    /**
+     * Get or create a KiboshProcess object to manage the Kibosh process at a given path.
+     *
+     * @throws RuntimeException if a new object is needed and the mount has no control file.
+     */
+    private synchronized KiboshProcess findProcessObject(String mountPath) {
+        String path = Paths.get(mountPath).normalize().toString();
+        KiboshProcess process = processes.get(path);
+        if (process == null) {
+            process = new KiboshProcess(mountPath);
+            processes.put(path, process);
+        }
+        return process;
+    }
+
+    /**
+     * Add a new Kibosh fault to the mount at mountPath.
+     *
+     * @throws IOException if the control file cannot be read or written.
+     */
+    void addFault(String mountPath, KiboshFaultSpec spec) throws IOException {
+        KiboshProcess process = findProcessObject(mountPath);
+        process.addFault(spec);
+    }
+
+    /**
+     * Remove a Kibosh fault from the mount at mountPath.
+     *
+     * @throws IOException if the control file cannot be read or written.
+     */
+    void removeFault(String mountPath, KiboshFaultSpec spec) throws IOException {
+        KiboshProcess process = findProcessObject(mountPath);
+        process.removeFault(spec);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultController.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultController.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultController.java
new file mode 100644
index 0000000..140abf1
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultController.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import org.apache.kafka.trogdor.common.Topology;
+import org.apache.kafka.trogdor.task.TaskController;
+
+import java.util.Set;
+
+/**
+ * A controller for Kibosh faults that always targets a fixed set of nodes.
+ */
+public class KiboshFaultController implements TaskController {
+    /** The node names supplied at construction; returned unchanged by targetNodes(). */
+    private final Set<String> targets;
+
+    public KiboshFaultController(Set<String> nodeNames) {
+        targets = nodeNames;
+    }
+
+    /**
+     * Return the configured node names.  The topology is not consulted.
+     */
+    @Override
+    public Set<String> targetNodes(Topology topology) {
+        return targets;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java
new file mode 100644
index 0000000..629d15e
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.fault.Kibosh.KiboshFaultSpec;
+import org.apache.kafka.trogdor.task.TaskWorker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * {@link TaskWorker} that adds a Kibosh fault when started and removes it when stopped.
+ *
+ * <p>Both operations go through {@link Kibosh#INSTANCE} using the mount path
+ * supplied at construction time.
+ */
+public class KiboshFaultWorker implements TaskWorker {
+    private static final Logger log = LoggerFactory.getLogger(KiboshFaultWorker.class);
+
+    /** Task id; only used in log messages. */
+    private final String workerId;
+    /** The fault to add on start and remove on stop. */
+    private final KiboshFaultSpec faultSpec;
+    /** Mount path passed to Kibosh for both add and remove. */
+    private final String kiboshMountPath;
+
+    /**
+     * @param id        the task id
+     * @param spec      the Kibosh fault to apply
+     * @param mountPath the mount path handed to {@link Kibosh}
+     */
+    public KiboshFaultWorker(String id, KiboshFaultSpec spec, String mountPath) {
+        workerId = id;
+        faultSpec = spec;
+        kiboshMountPath = mountPath;
+    }
+
+    /**
+     * Adds the fault through {@code Kibosh.INSTANCE.addFault}. The {@code status}
+     * reference and {@code errorFuture} are left untouched.
+     */
+    @Override
+    public void start(Platform platform, AtomicReference<String> status,
+                      KafkaFutureImpl<String> errorFuture) throws Exception {
+        String faultKind = faultSpec.getClass().getSimpleName();
+        log.info("Activating {} {}: {}.", faultKind, workerId, faultSpec);
+        Kibosh.INSTANCE.addFault(kiboshMountPath, faultSpec);
+    }
+
+    /**
+     * Removes the fault through {@code Kibosh.INSTANCE.removeFault}.
+     */
+    @Override
+    public void stop(Platform platform) throws Exception {
+        String faultKind = faultSpec.getClass().getSimpleName();
+        log.info("Deactivating {} {}: {}.", faultKind, workerId, faultSpec);
+        Kibosh.INSTANCE.removeFault(kiboshMountPath, faultSpec);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
index 342fefc..b5fa001 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
@@ -20,12 +20,18 @@ package org.apache.kafka.trogdor.agent;
 import org.apache.kafka.common.utils.MockScheduler;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Scheduler;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.trogdor.basic.BasicNode;
 import org.apache.kafka.trogdor.basic.BasicPlatform;
 import org.apache.kafka.trogdor.basic.BasicTopology;
 import org.apache.kafka.trogdor.common.ExpectedTasks;
 import org.apache.kafka.trogdor.common.ExpectedTasks.ExpectedTaskBuilder;
 import org.apache.kafka.trogdor.common.Node;
+import org.apache.kafka.trogdor.fault.FilesUnreadableFaultSpec;
+import org.apache.kafka.trogdor.fault.Kibosh;
+import org.apache.kafka.trogdor.fault.Kibosh.KiboshControlFile;
+import org.apache.kafka.trogdor.fault.Kibosh.KiboshFilesUnreadableFaultSpec;
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
 
 import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
@@ -36,10 +42,16 @@ import org.apache.kafka.trogdor.rest.WorkerDone;
 import org.apache.kafka.trogdor.rest.WorkerRunning;
 import org.apache.kafka.trogdor.task.NoOpTaskSpec;
 import org.apache.kafka.trogdor.task.SampleTaskSpec;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.rules.Timeout;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.TreeMap;
@@ -235,4 +247,90 @@ public class AgentTest {
                 build()).
             waitFor(client);
     }
+
+    /**
+     * Stand-in for a Kibosh mount: a temporary directory containing only the Kibosh
+     * control file, initialized to {@link KiboshControlFile#EMPTY}. Closing it deletes
+     * the directory.
+     */
+    private static class MockKibosh implements AutoCloseable {
+        private final File tempDir;
+        private final Path controlFile;
+
+        MockKibosh() throws IOException {
+            tempDir = TestUtils.tempDirectory();
+            controlFile = Paths.get(tempDir.toPath().toString(), Kibosh.KIBOSH_CONTROL);
+            KiboshControlFile.EMPTY.write(controlFile);
+        }
+
+        /** Reads and parses the current contents of the control file. */
+        KiboshControlFile read() throws IOException {
+            return KiboshControlFile.read(controlFile);
+        }
+
+        @Override
+        public void close() throws Exception {
+            Utils.delete(tempDir);
+        }
+    }
+
+    /**
+     * Runs two FilesUnreadableFaultSpec workers against a MockKibosh directory and checks
+     * that the Kibosh control file lists each fault while its worker is running and drops
+     * a fault again once its worker has been stopped.
+     */
+    @Test
+    public void testKiboshFaults() throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        MockScheduler scheduler = new MockScheduler(time);
+        Agent agent = createAgent(scheduler);
+        AgentClient client = new AgentClient(10, "localhost", agent.port());
+        new ExpectedTasks().waitFor(client);
+
+        try (MockKibosh mockKibosh = new MockKibosh()) {
+            Assert.assertEquals(KiboshControlFile.EMPTY, mockKibosh.read());
+            String tempDirPath = mockKibosh.tempDir.getPath();
+
+            // Start worker "foo": its fault should appear in the control file.
+            FilesUnreadableFaultSpec fooSpec = new FilesUnreadableFaultSpec(0, 900000,
+                Collections.singleton("myAgent"), tempDirPath, "/foo", 123);
+            client.createWorker(new CreateWorkerRequest("foo", fooSpec));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    workerState(new WorkerRunning(fooSpec, 0, "")).
+                    build()).
+                waitFor(client);
+            Assert.assertEquals(new KiboshControlFile(Collections.<Kibosh.KiboshFaultSpec>singletonList(
+                new KiboshFilesUnreadableFaultSpec("/foo", 123))), mockKibosh.read());
+
+            // Start worker "bar" too: both faults are expected, foo's first.
+            FilesUnreadableFaultSpec barSpec = new FilesUnreadableFaultSpec(0, 900000,
+                Collections.singleton("myAgent"), tempDirPath, "/bar", 456);
+            client.createWorker(new CreateWorkerRequest("bar", barSpec));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    workerState(new WorkerRunning(fooSpec, 0, "")).build()).
+                addTask(new ExpectedTaskBuilder("bar").
+                    workerState(new WorkerRunning(barSpec, 0, "")).build()).
+                waitFor(client);
+            // A plain list rather than double-brace initialization, which would declare an
+            // anonymous ArrayList subclass holding a reference to this test instance.
+            ArrayList<Kibosh.KiboshFaultSpec> fooAndBar = new ArrayList<>();
+            fooAndBar.add(new KiboshFilesUnreadableFaultSpec("/foo", 123));
+            fooAndBar.add(new KiboshFilesUnreadableFaultSpec("/bar", 456));
+            Assert.assertEquals(new KiboshControlFile(fooAndBar), mockKibosh.read());
+
+            // Stop worker "foo": only bar's fault should remain.
+            time.sleep(1);
+            client.stopWorker(new StopWorkerRequest("foo"));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    workerState(new WorkerDone(fooSpec, 0, 1, "", "")).build()).
+                addTask(new ExpectedTaskBuilder("bar").
+                    workerState(new WorkerRunning(barSpec, 0, "")).build()).
+                waitFor(client);
+            Assert.assertEquals(new KiboshControlFile(Collections.<Kibosh.KiboshFaultSpec>singletonList(
+                new KiboshFilesUnreadableFaultSpec("/bar", 456))), mockKibosh.read());
+        }
+    }
 };