You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/11/16 17:59:47 UTC
kafka git commit: KAFKA-5811; Add Kibosh integration for Trogdor and Ducktape
Repository: kafka
Updated Branches:
refs/heads/trunk 53c5ccb33 -> d9cbc6b1a
KAFKA-5811; Add Kibosh integration for Trogdor and Ducktape
For ducktape: add Kibosh to the testing Dockerfile.
Create files_unreadable_fault_spec.py.
For trogdor: create FilesUnreadableFaultSpec.java.
Add a unit test of using the Kibosh service.
Author: Colin P. Mccabe <cm...@confluent.io>
Reviewers: Rajini Sivaram <ra...@googlemail.com>
Closes #4195 from cmccabe/KAFKA-5811
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d9cbc6b1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d9cbc6b1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d9cbc6b1
Branch: refs/heads/trunk
Commit: d9cbc6b1a28e2fb3f88137429e08c12201998bc7
Parents: 53c5ccb
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Thu Nov 16 17:59:24 2017 +0000
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Thu Nov 16 17:59:24 2017 +0000
----------------------------------------------------------------------
checkstyle/suppressions.xml | 2 +-
tests/docker/Dockerfile | 4 +
.../trogdor/files_unreadable_fault_spec.py | 58 ++++++
tests/kafkatest/services/trogdor/kibosh.py | 152 ++++++++++++++
tests/kafkatest/tests/tools/kibosh_test.py | 81 ++++++++
.../trogdor/fault/FilesUnreadableFaultSpec.java | 82 ++++++++
.../org/apache/kafka/trogdor/fault/Kibosh.java | 199 +++++++++++++++++++
.../trogdor/fault/KiboshFaultController.java | 36 ++++
.../kafka/trogdor/fault/KiboshFaultWorker.java | 56 ++++++
.../apache/kafka/trogdor/agent/AgentTest.java | 78 ++++++++
10 files changed, 747 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 7c32eb0..6218794 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -45,7 +45,7 @@
<suppress checks="ClassDataAbstractionCoupling"
files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|KafkaAdminClient).java"/>
<suppress checks="ClassDataAbstractionCoupling"
- files="(Errors|SaslAuthenticatorTest).java"/>
+ files="(Errors|SaslAuthenticatorTest|AgentTest).java"/>
<suppress checks="BooleanExpressionComplexity"
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData).java"/>
http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tests/docker/Dockerfile
----------------------------------------------------------------------
diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile
index 3ca9993..d855fe4 100644
--- a/tests/docker/Dockerfile
+++ b/tests/docker/Dockerfile
@@ -48,6 +48,10 @@ RUN mkdir -p "/opt/kafka-0.10.1.1" && curl -s "${MIRROR}kafka/0.10.1.1/kafka_2.1
RUN mkdir -p "/opt/kafka-0.10.2.1" && curl -s "${MIRROR}kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.1"
RUN mkdir -p "/opt/kafka-0.11.0.0" && curl -s "${MIRROR}kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.0"
+# Install Kibosh, a fault-injecting FUSE filesystem (it needs the fuse package).
+# -y is required: docker build runs apt-get without a terminal, so a confirmation prompt would abort.
+RUN apt-get install -y fuse
+RUN cd /opt && git clone -q https://github.com/confluentinc/kibosh.git && cd "/opt/kibosh" && git reset --hard 399de967c5520e8fe6e32d4e5c1c55d71cd7f46c && mkdir "/opt/kibosh/build" && cd "/opt/kibosh/build" && ../configure && make -j 2
+
# Set up the ducker user.
RUN useradd -ms /bin/bash ducker && mkdir -p /home/ducker/ && rsync -aiq /root/.ssh/ /home/ducker/.ssh && chown -R ducker /home/ducker/ /mnt/ && echo 'ducker ALL=(ALL) NOPASSWD: ALL' >> /etc/sudoers
USER ducker
http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tests/kafkatest/services/trogdor/files_unreadable_fault_spec.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/files_unreadable_fault_spec.py b/tests/kafkatest/services/trogdor/files_unreadable_fault_spec.py
new file mode 100644
index 0000000..4f0540a
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/files_unreadable_fault_spec.py
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.trogdor.task_spec import TaskSpec
+
+
+class FilesUnreadableFaultSpec(TaskSpec):
+ """
+ The specification for a fault which makes files unreadable.
+ """
+
+ def __init__(self, start_ms, duration_ms, node_names, mount_path,
+ prefix, error_code):
+ """
+ Create a new FilesUnreadableFaultSpec.
+
+ :param start_ms: The start time, as described in task_spec.py
+ :param duration_ms: The duration in milliseconds.
+ :param node_names: The names of the node(s) to create the fault on.
+ :param mount_path: The mount path.
+ :param prefix: The prefix within the mount point to make unreadable.
+ :param error_code: The error code to use.
+ """
+ super(FilesUnreadableFaultSpec, self).__init__(start_ms, duration_ms)
+ self.node_names = node_names
+ self.mount_path = mount_path
+ self.prefix = prefix
+ self.error_code = error_code
+
+ def message(self):
+ return {
+ "class": "org.apache.kafka.trogdor.fault.FilesUnreadableFaultSpec",
+ "startMs": self.start_ms,
+ "durationMs": self.duration_ms,
+ "nodeNames": self.node_names,
+ "mountPath": self.mount_path,
+ "prefix": self.prefix,
+ "errorCode": self.error_code,
+ }
+
+ def kibosh_message(self):
+ return {
+ "type": "unreadable",
+ "prefix": self.prefix,
+ "code": self.error_code,
+ }
http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tests/kafkatest/services/trogdor/kibosh.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/kibosh.py b/tests/kafkatest/services/trogdor/kibosh.py
new file mode 100644
index 0000000..1bd4224
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/kibosh.py
@@ -0,0 +1,152 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os.path
+
+from ducktape.services.service import Service
+from ducktape.utils import util
+
+
+class KiboshService(Service):
+ """
+ Kibosh is a fault-injecting FUSE filesystem.
+
+ Attributes:
+ INSTALL_ROOT The path of where Kibosh is installed.
+ BINARY_NAME The Kibosh binary name.
+ BINARY_PATH The path to the kibosh binary.
+ """
+ INSTALL_ROOT = "/opt/kibosh/build"
+ BINARY_NAME = "kibosh"
+ BINARY_PATH = os.path.join(INSTALL_ROOT, BINARY_NAME)
+
+ def __init__(self, context, nodes, target, mirror, persist="/mnt/kibosh"):
+ """
+ Create a Kibosh service.
+
+ :param context: The TestContext object.
+ :param nodes: The nodes to put the Kibosh FS on. Kibosh allocates no
+ nodes of its own.
+ :param target: The target directory, which Kibosh exports a view of.
+ :param mirror: The mirror directory, where Kibosh injects faults.
+ :param persist: Where the log files and pid files will be created.
+ """
+ Service.__init__(self, context, num_nodes=0)
+ if (len(nodes) == 0):
+ raise RuntimeError("You must supply at least one node to run the service on.")
+ for node in nodes:
+ self.nodes.append(node)
+
+ self.target = target
+ self.mirror = mirror
+ self.persist = persist
+
+ self.control_path = os.path.join(self.mirror, "kibosh_control")
+ self.pidfile_path = os.path.join(self.persist, "pidfile")
+ self.stdout_stderr_path = os.path.join(self.persist, "kibosh-stdout-stderr.log")
+ self.log_path = os.path.join(self.persist, "kibosh.log")
+ self.logs = {
+ "kibosh-stdout-stderr.log": {
+ "path": self.stdout_stderr_path,
+ "collect_default": True},
+ "kibosh.log": {
+ "path": self.log_path,
+ "collect_default": True}
+ }
+
+ def free(self):
+ """Clear the nodes list."""
+ # Because the filesystem runs on nodes which have been allocated by other services, those nodes
+ # are not deallocated here.
+ self.nodes = []
+ Service.free(self)
+
+ def kibosh_running(self, node):
+ return 0 == node.account.ssh("test -e '%s'" % self.control_path, allow_fail=True)
+
+ def start_node(self, node):
+ node.account.mkdirs(self.persist)
+ cmd = "sudo -E "
+ cmd += " %s" % KiboshService.BINARY_PATH
+ cmd += " --target %s" % self.target
+ cmd += " --pidfile %s" % self.pidfile_path
+ cmd += " --log %s" % self.log_path
+ cmd += " --verbose"
+ cmd += " %s" % self.mirror
+ cmd += " &> %s" % self.stdout_stderr_path
+ node.account.ssh(cmd)
+ util.wait_until(lambda: self.kibosh_running(node), 20, backoff_sec=.1,
+ err_msg="Timed out waiting for kibosh to start on %s" % node.account.hostname)
+
+ def pids(self, node):
+ return [pid for pid in node.account.ssh_capture("test -e '%s' && test -e /proc/$(cat '%s')" %
+ (self.pidfile_path, self.pidfile_path), allow_fail=True)]
+
+ def wait_node(self, node, timeout_sec=None):
+ return len(self.pids(node)) == 0
+
+ def kibosh_process_running(self, node):
+ pids = self.pids(node)
+ if len(pids) == 0:
+ return True
+ return False
+
+ def stop_node(self, node):
+ """Halt kibosh process(es) on this node."""
+ node.account.logger.debug("stop_node(%s): unmounting %s" % (node.name, self.mirror))
+ node.account.ssh("sudo fusermount -u %s" % self.mirror, allow_fail=True)
+ # Wait for the kibosh process to terminate.
+ try:
+ util.wait_until(lambda: self.kibosh_process_running(node), 20, backoff_sec=.1,
+ err_msg="Timed out waiting for kibosh to stop on %s" % node.account.hostname)
+ except TimeoutError:
+ # If the process won't terminate, use kill -9 to shut it down.
+ node.account.logger.debug("stop_node(%s): killing the kibosh process managing %s" % (node.name, self.mirror))
+ node.account.ssh("sudo kill -9 %s" % (" ".join(self.pids(node))), allow_fail=True)
+ node.account.ssh("sudo fusermount -u %s" % self.mirror)
+ util.wait_until(lambda: self.kibosh_process_running(node), 20, backoff_sec=.1,
+ err_msg="Timed out waiting for kibosh to stop on %s" % node.account.hostname)
+
+ def clean_node(self, node):
+ """Clean up persistent state on this node - e.g. service logs, configuration files etc."""
+ self.stop_node(node)
+ node.account.ssh("rm -rf -- %s" % self.persist)
+
+ def set_faults(self, node, specs):
+ """
+ Set the currently active faults.
+
+ :param node: The node.
+ :param spec: An array of FaultSpec objects describing the faults.
+ """
+ fault_array = [spec.kibosh_message() for spec in specs]
+ obj = { 'faults': fault_array }
+ obj_json = json.dumps(obj)
+ node.account.create_file(self.control_path, obj_json)
+
+ def get_fault_json(self, node):
+ """
+ Return a JSON string which contains the currently active faults.
+
+ :param node: The node.
+
+ :returns: The fault JSON describing the faults.
+ """
+ iter = node.account.ssh_capture("cat '%s'" % self.control_path)
+ text = ""
+ for line in iter:
+ text = "%s%s" % (text, line.rstrip("\r\n"))
+ return text
http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tests/kafkatest/tests/tools/kibosh_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/tools/kibosh_test.py b/tests/kafkatest/tests/tools/kibosh_test.py
new file mode 100644
index 0000000..5844c27
--- /dev/null
+++ b/tests/kafkatest/tests/tools/kibosh_test.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.cluster.cluster_spec import ClusterSpec
+from ducktape.mark.resource import cluster
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.trogdor.task_spec import TaskSpec
+from kafkatest.services.trogdor.files_unreadable_fault_spec import FilesUnreadableFaultSpec
+from kafkatest.services.trogdor.kibosh import KiboshService
+import json
+
+
+class KiboshTest(Test):
+ TARGET = "/mnt/kibosh-target"
+ MIRROR = "/mnt/kibosh-mirror"
+
+ """
+ Tests the Kibosh fault injection filesystem in isolation.
+ """
+
+ def __init__(self, test_context):
+ super(KiboshTest, self).__init__(test_context)
+
+ def set_up_kibosh(self, num_nodes):
+ self.nodes = self.test_context.cluster.alloc(ClusterSpec.simple_linux(num_nodes))
+ for node in self.nodes:
+ node.account.logger = self.logger
+ node.account.ssh("mkdir -p -- %s %s" % (KiboshTest.TARGET, KiboshTest.MIRROR))
+ self.kibosh = KiboshService(self.test_context, self.nodes,
+ KiboshTest.TARGET, KiboshTest.MIRROR)
+ for node in self.nodes:
+ node.account.logger = self.kibosh.logger
+ self.kibosh.start()
+
+ def setUp(self):
+ self.kibosh = None
+ self.nodes = None
+
+ def tearDown(self):
+ if self.kibosh is not None:
+ self.kibosh.stop()
+ self.kibosh = None
+ for node in self.nodes:
+ node.account.ssh("rm -rf -- %s %s" % (KiboshTest.TARGET, KiboshTest.MIRROR))
+ if self.nodes is not None:
+ self.test_context.cluster.free(self.nodes)
+ self.nodes = None
+
+ @cluster(num_nodes=4)
+ def test_kibosh_service(self):
+ pass
+ """
+ Test that we can bring up kibosh and create a fault.
+ """
+ self.set_up_kibosh(3)
+ spec = FilesUnreadableFaultSpec(0, TaskSpec.MAX_DURATION_MS,
+ [self.nodes[0].name], KiboshTest.TARGET, "/foo", 12)
+ node = self.nodes[0]
+
+ def check(self, node):
+ fault_json = self.kibosh.get_fault_json(node)
+ expected_json = json.dumps({"faults": [spec.kibosh_message()]})
+ self.logger.info("Read back: [%s]. Expected: [%s]." % (fault_json, expected_json))
+ return fault_json == expected_json
+
+ self.kibosh.set_faults(node, [spec])
+ wait_until(lambda: check(self, node),
+ timeout_sec=10, backoff_sec=.2, err_msg="Failed to read back fault array.")
http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tools/src/main/java/org/apache/kafka/trogdor/fault/FilesUnreadableFaultSpec.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/FilesUnreadableFaultSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/FilesUnreadableFaultSpec.java
new file mode 100644
index 0000000..1fbf9d0
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/FilesUnreadableFaultSpec.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.fault.Kibosh.KiboshFilesUnreadableFaultSpec;
+import org.apache.kafka.trogdor.task.TaskController;
+import org.apache.kafka.trogdor.task.TaskSpec;
+import org.apache.kafka.trogdor.task.TaskWorker;
+
+import java.util.Set;
+
+/**
+ * The specification for a fault that makes files unreadable.
+ */
+public class FilesUnreadableFaultSpec extends TaskSpec {
+ private final Set<String> nodeNames;
+ private final String mountPath;
+ private final String prefix;
+ private final int errorCode;
+
+ @JsonCreator
+ public FilesUnreadableFaultSpec(@JsonProperty("startMs") long startMs,
+ @JsonProperty("durationMs") long durationMs,
+ @JsonProperty("nodeNames") Set<String> nodeNames,
+ @JsonProperty("mountPath") String mountPath,
+ @JsonProperty("prefix") String prefix,
+ @JsonProperty("errorCode") int errorCode) {
+ super(startMs, durationMs);
+ this.nodeNames = nodeNames;
+ this.mountPath = mountPath;
+ this.prefix = prefix;
+ this.errorCode = errorCode;
+ }
+
+ @JsonProperty
+ public Set<String> nodeNames() {
+ return nodeNames;
+ }
+
+ @JsonProperty
+ public String mountPath() {
+ return mountPath;
+ }
+
+ @JsonProperty
+ public String prefix() {
+ return prefix;
+ }
+
+ @JsonProperty
+ public int errorCode() {
+ return errorCode;
+ }
+
+ @Override
+ public TaskController newController(String id) {
+ return new KiboshFaultController(nodeNames);
+ }
+
+ @Override
+ public TaskWorker newTaskWorker(String id) {
+ return new KiboshFaultWorker(id,
+ new KiboshFilesUnreadableFaultSpec(prefix, errorCode), mountPath);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tools/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java
new file mode 100644
index 0000000..6fa1a4b
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.kafka.trogdor.common.JsonUtil;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.TreeMap;
+
+public final class Kibosh {
+ public static final Kibosh INSTANCE = new Kibosh();
+
+ public final static String KIBOSH_CONTROL = "kibosh_control";
+
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
+ include = JsonTypeInfo.As.PROPERTY,
+ property = "type")
+ @JsonSubTypes({
+ @JsonSubTypes.Type(value = KiboshFilesUnreadableFaultSpec.class, name = "unreadable"),
+ })
+ public static abstract class KiboshFaultSpec {
+ @Override
+ public final boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ return Objects.equals(toString(), o.toString());
+ }
+
+ @Override
+ public final int hashCode() {
+ return toString().hashCode();
+ }
+
+ @Override
+ public final String toString() {
+ return JsonUtil.toJsonString(this);
+ }
+ }
+
+ public static class KiboshFilesUnreadableFaultSpec extends KiboshFaultSpec {
+ private final String prefix;
+ private final int errorCode;
+
+ @JsonCreator
+ public KiboshFilesUnreadableFaultSpec(@JsonProperty("prefix") String prefix,
+ @JsonProperty("errorCode") int errorCode) {
+ this.prefix = prefix;
+ this.errorCode = errorCode;
+ }
+
+ @JsonProperty
+ public String prefix() {
+ return prefix;
+ }
+
+ @JsonProperty
+ public int errorCode() {
+ return errorCode;
+ }
+ }
+
+ private static class KiboshProcess {
+ private final Path controlPath;
+
+ KiboshProcess(String mountPath) {
+ this.controlPath = Paths.get(mountPath, KIBOSH_CONTROL);
+ if (!Files.exists(controlPath)) {
+ throw new RuntimeException("Can't find file " + controlPath);
+ }
+ }
+
+ synchronized void addFault(KiboshFaultSpec toAdd) throws IOException {
+ KiboshControlFile file = KiboshControlFile.read(controlPath);
+ List<KiboshFaultSpec> faults = new ArrayList<>(file.faults());
+ faults.add(toAdd);
+ new KiboshControlFile(faults).write(controlPath);
+ }
+
+ synchronized void removeFault(KiboshFaultSpec toRemove) throws IOException {
+ KiboshControlFile file = KiboshControlFile.read(controlPath);
+ List<KiboshFaultSpec> faults = new ArrayList<>();
+ boolean foundToRemove = false;
+ for (KiboshFaultSpec fault : file.faults()) {
+ if (fault.equals(toRemove)) {
+ foundToRemove = true;
+ } else {
+ faults.add(fault);
+ }
+ }
+ if (!foundToRemove) {
+ throw new RuntimeException("Failed to find fault " + toRemove + ". ");
+ }
+ new KiboshControlFile(faults).write(controlPath);
+ }
+ }
+
+ public static class KiboshControlFile {
+ private final List<KiboshFaultSpec> faults;
+
+ public final static KiboshControlFile EMPTY =
+ new KiboshControlFile(Collections.<KiboshFaultSpec>emptyList());
+
+ public static KiboshControlFile read(Path controlPath) throws IOException {
+ byte[] controlFileBytes = Files.readAllBytes(controlPath);
+ return JsonUtil.JSON_SERDE.readValue(controlFileBytes, KiboshControlFile.class);
+ }
+
+ @JsonCreator
+ public KiboshControlFile(@JsonProperty("faults") List<KiboshFaultSpec> faults) {
+ this.faults = faults;
+ }
+
+ @JsonProperty
+ public List<KiboshFaultSpec> faults() {
+ return faults;
+ }
+
+ public void write(Path controlPath) throws IOException {
+ Files.write(controlPath, JsonUtil.JSON_SERDE.writeValueAsBytes(this));
+ }
+
+ @Override
+ public final boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ return Objects.equals(toString(), o.toString());
+ }
+
+ @Override
+ public final int hashCode() {
+ return toString().hashCode();
+ }
+
+ @Override
+ public final String toString() {
+ return JsonUtil.toJsonString(this);
+ }
+ }
+
+ private final TreeMap<String, KiboshProcess> processes = new TreeMap<>();
+
+ private Kibosh() {
+ }
+
+ /**
+ * Get or create a KiboshProcess object to manage the Kibosh process at a given path.
+ */
+ private synchronized KiboshProcess findProcessObject(String mountPath) {
+ String path = Paths.get(mountPath).normalize().toString();
+ KiboshProcess process = processes.get(path);
+ if (process == null) {
+ process = new KiboshProcess(mountPath);
+ processes.put(path, process);
+ }
+ return process;
+ }
+
+ /**
+ * Add a new Kibosh fault.
+ */
+ void addFault(String mountPath, KiboshFaultSpec spec) throws IOException {
+ KiboshProcess process = findProcessObject(mountPath);
+ process.addFault(spec);
+ }
+
+ /**
+ * Remove a Kibosh fault.
+ */
+ void removeFault(String mountPath, KiboshFaultSpec spec) throws IOException {
+ KiboshProcess process = findProcessObject(mountPath);
+ process.removeFault(spec);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultController.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultController.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultController.java
new file mode 100644
index 0000000..140abf1
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultController.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import org.apache.kafka.trogdor.common.Topology;
+import org.apache.kafka.trogdor.task.TaskController;
+
+import java.util.Set;
+
+/**
+ * Controller for Kibosh-based faults: the fault targets exactly the configured nodes.
+ */
+public class KiboshFaultController implements TaskController {
+    /** The names of the nodes the fault targets. */
+    private final Set<String> targets;
+
+    public KiboshFaultController(Set<String> nodeNames) {
+        targets = nodeNames;
+    }
+
+    @Override
+    public Set<String> targetNodes(Topology topology) {
+        // The topology is not consulted; the spec names its nodes explicitly.
+        return targets;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java
new file mode 100644
index 0000000..629d15e
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.fault.Kibosh.KiboshFaultSpec;
+import org.apache.kafka.trogdor.task.TaskWorker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A Trogdor worker that adds a Kibosh fault when started and removes it when stopped.
+ */
+public class KiboshFaultWorker implements TaskWorker {
+    private static final Logger log = LoggerFactory.getLogger(KiboshFaultWorker.class);
+
+    /** The task id, used only for logging. */
+    private final String id;
+    /** The fault to add to, and later remove from, the Kibosh control file. */
+    private final KiboshFaultSpec spec;
+    /** The Kibosh mount whose control file is modified. */
+    private final String mountPath;
+
+    public KiboshFaultWorker(String id, KiboshFaultSpec spec, String mountPath) {
+        this.id = id;
+        this.spec = spec;
+        this.mountPath = mountPath;
+    }
+
+    @Override
+    public void start(Platform platform, AtomicReference<String> status,
+                      KafkaFutureImpl<String> errorFuture) throws Exception {
+        logState("Activating {} {}: {}.");
+        Kibosh.INSTANCE.addFault(mountPath, spec);
+    }
+
+    @Override
+    public void stop(Platform platform) throws Exception {
+        logState("Deactivating {} {}: {}.");
+        Kibosh.INSTANCE.removeFault(mountPath, spec);
+    }
+
+    /** Log the given format with the spec's class name, the task id, and the spec. */
+    private void logState(String format) {
+        log.info(format, spec.getClass().getSimpleName(), id, spec);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
index 342fefc..b5fa001 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
@@ -20,12 +20,18 @@ package org.apache.kafka.trogdor.agent;
import org.apache.kafka.common.utils.MockScheduler;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Scheduler;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
import org.apache.kafka.trogdor.basic.BasicNode;
import org.apache.kafka.trogdor.basic.BasicPlatform;
import org.apache.kafka.trogdor.basic.BasicTopology;
import org.apache.kafka.trogdor.common.ExpectedTasks;
import org.apache.kafka.trogdor.common.ExpectedTasks.ExpectedTaskBuilder;
import org.apache.kafka.trogdor.common.Node;
+import org.apache.kafka.trogdor.fault.FilesUnreadableFaultSpec;
+import org.apache.kafka.trogdor.fault.Kibosh;
+import org.apache.kafka.trogdor.fault.Kibosh.KiboshControlFile;
+import org.apache.kafka.trogdor.fault.Kibosh.KiboshFilesUnreadableFaultSpec;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
@@ -36,10 +42,16 @@ import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerRunning;
import org.apache.kafka.trogdor.task.NoOpTaskSpec;
import org.apache.kafka.trogdor.task.SampleTaskSpec;
+import org.junit.Assert;
import org.junit.Rule;
import org.junit.rules.Timeout;
import org.junit.Test;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.TreeMap;
@@ -235,4 +247,70 @@ public class AgentTest {
build()).
waitFor(client);
}
+
+ private static class MockKibosh implements AutoCloseable {
+ private final File tempDir;
+ private final Path controlFile;
+
+ MockKibosh() throws IOException {
+ tempDir = TestUtils.tempDirectory();
+ controlFile = Paths.get(tempDir.toPath().toString(), Kibosh.KIBOSH_CONTROL);
+ KiboshControlFile.EMPTY.write(controlFile);
+ }
+
+ KiboshControlFile read() throws IOException {
+ return KiboshControlFile.read(controlFile);
+ }
+
+ @Override
+ public void close() throws Exception {
+ Utils.delete(tempDir);
+ }
+ }
+
+    /**
+     * Starts two Kibosh faults through the agent, then stops one, checking the mock
+     * control file contents after each step.
+     */
+    @Test
+    public void testKiboshFaults() throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        MockScheduler scheduler = new MockScheduler(time);
+        Agent agent = createAgent(scheduler);
+        AgentClient client = new AgentClient(10, "localhost", agent.port());
+        new ExpectedTasks().waitFor(client);
+
+        try (MockKibosh mockKibosh = new MockKibosh()) {
+            String mountPath = mockKibosh.tempDir.getPath();
+            Assert.assertEquals(KiboshControlFile.EMPTY, mockKibosh.read());
+
+            // Starting "foo" must write its fault to the control file.
+            FilesUnreadableFaultSpec fooSpec = new FilesUnreadableFaultSpec(0, 900000,
+                Collections.singleton("myAgent"), mountPath, "/foo", 123);
+            client.createWorker(new CreateWorkerRequest("foo", fooSpec));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    workerState(new WorkerRunning(fooSpec, 0, "")).
+                    build()).
+                waitFor(client);
+            Assert.assertEquals(new KiboshControlFile(Collections.<Kibosh.KiboshFaultSpec>singletonList(
+                new KiboshFilesUnreadableFaultSpec("/foo", 123))), mockKibosh.read());
+
+            // Starting "bar" must append its fault after foo's.
+            FilesUnreadableFaultSpec barSpec = new FilesUnreadableFaultSpec(0, 900000,
+                Collections.singleton("myAgent"), mountPath, "/bar", 456);
+            client.createWorker(new CreateWorkerRequest("bar", barSpec));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    workerState(new WorkerRunning(fooSpec, 0, "")).build()).
+                addTask(new ExpectedTaskBuilder("bar").
+                    workerState(new WorkerRunning(barSpec, 0, "")).build()).
+                waitFor(client);
+            // Plain list, not double-brace initialization (which creates an anonymous subclass).
+            ArrayList<Kibosh.KiboshFaultSpec> bothFaults = new ArrayList<>();
+            bothFaults.add(new KiboshFilesUnreadableFaultSpec("/foo", 123));
+            bothFaults.add(new KiboshFilesUnreadableFaultSpec("/bar", 456));
+            Assert.assertEquals(new KiboshControlFile(bothFaults), mockKibosh.read());
+
+            // Stopping "foo" must remove only its fault.
+            time.sleep(1);
+            client.stopWorker(new StopWorkerRequest("foo"));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    workerState(new WorkerDone(fooSpec, 0, 1, "", "")).build()).
+                addTask(new ExpectedTaskBuilder("bar").
+                    workerState(new WorkerRunning(barSpec, 0, "")).build()).
+                waitFor(client);
+            Assert.assertEquals(new KiboshControlFile(Collections.<Kibosh.KiboshFaultSpec>singletonList(
+                new KiboshFilesUnreadableFaultSpec("/bar", 456))), mockKibosh.read());
+        }
+    }
};