You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/09/07 12:23:17 UTC
kafka git commit: KAFKA-5777; Add ducktape integration for Trogdor
Repository: kafka
Updated Branches:
refs/heads/trunk 329d5fa64 -> 4065ffb3e
KAFKA-5777; Add ducktape integration for Trogdor
Author: Colin P. Mccabe <cm...@confluent.io>
Reviewers: Rajini Sivaram <ra...@googlemail.com>
Closes #3726 from cmccabe/KAFKA-5777
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4065ffb3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4065ffb3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4065ffb3
Branch: refs/heads/trunk
Commit: 4065ffb3e1a82f71578811a0a329cdc90e5e1567
Parents: 329d5fa
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Thu Sep 7 13:23:03 2017 +0100
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Thu Sep 7 13:23:03 2017 +0100
----------------------------------------------------------------------
tests/kafkatest/services/trogdor/__init__.py | 14 +
tests/kafkatest/services/trogdor/fault_spec.py | 45 ++++
.../trogdor/network_partition_fault_spec.py | 54 ++++
.../services/trogdor/no_op_fault_spec.py | 41 +++
.../services/trogdor/templates/log4j.properties | 23 ++
tests/kafkatest/services/trogdor/trogdor.py | 255 +++++++++++++++++++
tests/kafkatest/tests/tools/trogdor_test.py | 97 +++++++
tests/kafkatest/utils/__init__.py | 2 +-
tests/kafkatest/utils/util.py | 9 +
.../org/apache/kafka/trogdor/agent/Agent.java | 50 +++-
.../kafka/trogdor/basic/BasicPlatform.java | 13 +-
.../kafka/trogdor/coordinator/Coordinator.java | 32 ++-
.../kafka/trogdor/coordinator/NodeManager.java | 6 +
.../kafka/trogdor/fault/AbstractFault.java | 106 ++++++++
.../apache/kafka/trogdor/fault/DoneState.java | 47 ++++
.../org/apache/kafka/trogdor/fault/Fault.java | 28 +-
.../apache/kafka/trogdor/fault/FaultState.java | 36 ++-
.../trogdor/fault/NetworkPartitionFault.java | 46 +---
.../apache/kafka/trogdor/fault/NoOpFault.java | 43 +---
.../kafka/trogdor/fault/PendingState.java | 30 +++
.../kafka/trogdor/fault/RunningState.java | 38 +++
.../kafka/trogdor/fault/SendingState.java | 64 +++++
.../apache/kafka/trogdor/rest/FaultDataMap.java | 2 +-
.../apache/kafka/trogdor/agent/AgentTest.java | 37 ++-
.../trogdor/coordinator/CoordinatorTest.java | 18 +-
25 files changed, 993 insertions(+), 143 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tests/kafkatest/services/trogdor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/__init__.py b/tests/kafkatest/services/trogdor/__init__.py
new file mode 100644
index 0000000..ec20143
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tests/kafkatest/services/trogdor/fault_spec.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/fault_spec.py b/tests/kafkatest/services/trogdor/fault_spec.py
new file mode 100644
index 0000000..9768765
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/fault_spec.py
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+
+class FaultSpec(object):
+    """
+    The base class for a fault specification.
+
+    MAX_DURATION_MS    The longest duration we should use for a fault specification.
+    """
+
+    MAX_DURATION_MS = 10000000
+
+    def __init__(self, start_ms, duration_ms):
+        """
+        Create a new fault specification.
+
+        :param start_ms: The start time in milliseconds since the epoch.
+        :param duration_ms: The duration in milliseconds.
+        """
+        self.start_ms = start_ms
+        self.duration_ms = duration_ms
+
+    def message(self):
+        """
+        Return a message suitable for sending to the Trogdor daemon; subclasses must override this.
+        """
+        raise NotImplementedError("%s must implement message()" % type(self).__name__)
+
+    def __str__(self):
+        return json.dumps(self.message())
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tests/kafkatest/services/trogdor/network_partition_fault_spec.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/network_partition_fault_spec.py b/tests/kafkatest/services/trogdor/network_partition_fault_spec.py
new file mode 100644
index 0000000..deb5c56
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/network_partition_fault_spec.py
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.trogdor.fault_spec import FaultSpec
+
+
+class NetworkPartitionFaultSpec(FaultSpec):
+    """
+    Describes a network partition fault.
+
+    While active, such a fault splits the cluster's network into groups of nodes
+    ("partitions") whose members cannot talk to members of any other group.
+    """
+
+    def __init__(self, start_ms, duration_ms, partitions):
+        """
+        Build a NetworkPartitionFaultSpec.
+
+        :param start_ms: The start time, as described in fault_spec.py
+        :param duration_ms: How long the fault lasts, in milliseconds.
+        :param partitions: A list of lists, one inner list per partition.
+                           Each inner list may mix node name strings and
+                           ClusterNode objects; both are stored as names.
+        """
+        super(NetworkPartitionFaultSpec, self).__init__(start_ms, duration_ms)
+        # Keep only node names so that message() contains plain, serializable values.
+        self.partitions = [
+            [NetworkPartitionFaultSpec._node_name(member) for member in group]
+            for group in partitions
+        ]
+
+    @staticmethod
+    def _node_name(member):
+        return member if isinstance(member, basestring) else member.name
+
+    def message(self):
+        return {
+            "class": "org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec",
+            "startMs": self.start_ms,
+            "durationMs": self.duration_ms,
+            "partitions": self.partitions,
+        }
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tests/kafkatest/services/trogdor/no_op_fault_spec.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/no_op_fault_spec.py b/tests/kafkatest/services/trogdor/no_op_fault_spec.py
new file mode 100644
index 0000000..82e9713
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/no_op_fault_spec.py
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.trogdor.fault_spec import FaultSpec
+
+
+class NoOpFaultSpec(FaultSpec):
+    """
+    The specification for a no-op fault.
+
+    No-op faults are used to test the fault injector. They don't do anything,
+    but must be propagated to all fault injector daemons.
+    """
+
+    def __init__(self, start_ms, duration_ms):
+        """
+        Create a new NoOpFaultSpec.
+
+        :param start_ms: The start time, as described in fault_spec.py
+        :param duration_ms: The duration in milliseconds.
+        """
+        super(NoOpFaultSpec, self).__init__(start_ms, duration_ms)
+
+    def message(self):
+        return {
+            "class": "org.apache.kafka.trogdor.fault.NoOpFaultSpec",
+            "startMs": self.start_ms,
+            "durationMs": self.duration_ms,
+        }
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tests/kafkatest/services/trogdor/templates/log4j.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/templates/log4j.properties b/tests/kafkatest/services/trogdor/templates/log4j.properties
new file mode 100644
index 0000000..252668e
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/templates/log4j.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=DEBUG, mylogger
+log4j.logger.kafka=DEBUG
+log4j.logger.org.apache.kafka=DEBUG
+log4j.logger.org.eclipse=INFO
+log4j.appender.mylogger=org.apache.log4j.FileAppender
+log4j.appender.mylogger.File={{ log_path }}
+log4j.appender.mylogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.mylogger.layout.ConversionPattern=[%d] %p %m (%c)%n
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tests/kafkatest/services/trogdor/trogdor.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/trogdor.py b/tests/kafkatest/services/trogdor/trogdor.py
new file mode 100644
index 0000000..8b05e99
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/trogdor.py
@@ -0,0 +1,255 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os.path
+import requests
+from requests.adapters import HTTPAdapter
+from requests.packages.urllib3 import Retry
+
+from ducktape.services.service import Service
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+
+
+class TrogdorService(KafkaPathResolverMixin, Service):
+    """
+    A ducktape service for running the trogdor fault injection daemons.
+
+    Attributes:
+        PERSISTENT_ROOT                 The root filesystem path to store service files under.
+        COORDINATOR_STDOUT_STDERR       The path where we store the coordinator's stdout/stderr output.
+        AGENT_STDOUT_STDERR             The path where we store the agent's stdout/stderr output.
+        COORDINATOR_LOG                 The path where we store the coordinator's log4j output.
+        AGENT_LOG                       The path where we store the agent's log4j output.
+        AGENT_LOG4J_PROPERTIES          The path to the agent log4j.properties file for log config.
+        COORDINATOR_LOG4J_PROPERTIES    The path to the coordinator log4j.properties file for log config.
+        CONFIG_PATH                     The path to the trogdor configuration file.
+        DEFAULT_AGENT_PORT              The default port to use for trogdor_agent daemons.
+        DEFAULT_COORDINATOR_PORT        The default port to use for trogdor_coordinator daemons.
+        REQUEST_TIMEOUT                 The request timeout in seconds to use for REST requests.
+        REQUEST_HEADERS                 The request headers to use when communicating with trogdor.
+    """
+
+    PERSISTENT_ROOT = "/mnt/trogdor"
+    COORDINATOR_STDOUT_STDERR = os.path.join(PERSISTENT_ROOT, "trogdor-coordinator-stdout-stderr.log")
+    AGENT_STDOUT_STDERR = os.path.join(PERSISTENT_ROOT, "trogdor-agent-stdout-stderr.log")
+    COORDINATOR_LOG = os.path.join(PERSISTENT_ROOT, "trogdor-coordinator.log")
+    AGENT_LOG = os.path.join(PERSISTENT_ROOT, "trogdor-agent.log")
+    COORDINATOR_LOG4J_PROPERTIES = os.path.join(PERSISTENT_ROOT, "trogdor-coordinator-log4j.properties")
+    AGENT_LOG4J_PROPERTIES = os.path.join(PERSISTENT_ROOT, "trogdor-agent-log4j.properties")
+    CONFIG_PATH = os.path.join(PERSISTENT_ROOT, "trogdor.conf")
+    DEFAULT_AGENT_PORT = 8888
+    DEFAULT_COORDINATOR_PORT = 8889
+    REQUEST_TIMEOUT = 5
+    REQUEST_HEADERS = {"Content-type": "application/json"}
+
+    logs = {
+        "trogdor_coordinator_stdout_stderr": {
+            "path": COORDINATOR_STDOUT_STDERR,
+            "collect_default": True},
+        "trogdor_agent_stdout_stderr": {
+            "path": AGENT_STDOUT_STDERR,
+            "collect_default": True},
+        "trogdor_coordinator_log": {
+            "path": COORDINATOR_LOG,
+            "collect_default": True},
+        "trogdor_agent_log": {
+            "path": AGENT_LOG,
+            "collect_default": True},
+    }
+
+    def __init__(self, context, agent_nodes, agent_port=DEFAULT_AGENT_PORT,
+                 coordinator_port=DEFAULT_COORDINATOR_PORT):
+        """
+        Create a Trogdor service.
+
+        :param context: The test context.
+        :param agent_nodes: The nodes to run the agents on (must be non-empty; owned by the caller).
+        :param agent_port: The port to use for the trogdor_agent daemons.
+        :param coordinator_port: The port to use for the trogdor_coordinator daemons.
+        """
+        if len(agent_nodes) == 0:
+            raise RuntimeError("You must supply at least one node to run the service on.")
+        # Validate before Service.__init__ so a bad call does not leave a coordinator node held.
+        Service.__init__(self, context, num_nodes=1)
+        self.coordinator_node = self.nodes[0]
+        self.nodes.extend(agent_nodes)
+        self.agent_port = agent_port
+        self.coordinator_port = coordinator_port
+
+    def free(self):
+        # We only want to deallocate the coordinator node, not the agent nodes. So we
+        # change self.nodes to include only the coordinator node, and then invoke
+        # the base class' free method.
+        if self.coordinator_node is not None:
+            self.nodes = [self.coordinator_node]
+            self.coordinator_node = None
+            Service.free(self)
+
+    def _create_config_dict(self):
+        """
+        Create a dictionary with the Trogdor configuration.
+
+        :return: The configuration dictionary.
+        """
+        dict_nodes = {}
+        for node in self.nodes:
+            dict_nodes[node.name] = {
+                "hostname": node.account.ssh_hostname,
+                "trogdor.agent.port": self.agent_port,
+            }
+        dict_nodes[self.coordinator_node.name]["trogdor.coordinator.port"] = self.coordinator_port
+        return {
+            "platform": "org.apache.kafka.trogdor.basic.BasicPlatform",
+            "nodes": dict_nodes,
+        }
+
+    def start_node(self, node):
+        node.account.mkdirs(TrogdorService.PERSISTENT_ROOT)
+
+        # Create the configuration file on the node.
+        config_str = json.dumps(self._create_config_dict(), indent=2)
+        self.logger.info("Creating configuration file %s with %s" % (TrogdorService.CONFIG_PATH, config_str))
+        node.account.create_file(TrogdorService.CONFIG_PATH, config_str)
+
+        if self.is_coordinator(node):
+            self._start_coordinator_node(node)
+        else:
+            self._start_agent_node(node)
+
+    def _start_coordinator_node(self, node):
+        node.account.create_file(TrogdorService.COORDINATOR_LOG4J_PROPERTIES,
+                                 self.render('log4j.properties',
+                                             log_path=TrogdorService.COORDINATOR_LOG))
+        self._start_trogdor_daemon("coordinator", TrogdorService.COORDINATOR_STDOUT_STDERR,
+                                   TrogdorService.COORDINATOR_LOG4J_PROPERTIES,
+                                   TrogdorService.COORDINATOR_LOG, node)
+        self.logger.info("Started trogdor coordinator on %s." % node.name)
+
+    def _start_agent_node(self, node):
+        node.account.create_file(TrogdorService.AGENT_LOG4J_PROPERTIES,
+                                 self.render('log4j.properties',
+                                             log_path=TrogdorService.AGENT_LOG))
+        self._start_trogdor_daemon("agent", TrogdorService.AGENT_STDOUT_STDERR,
+                                   TrogdorService.AGENT_LOG4J_PROPERTIES,
+                                   TrogdorService.AGENT_LOG, node)
+        self.logger.info("Started trogdor agent on %s." % node.name)
+
+    def _start_trogdor_daemon(self, daemon_name, stdout_stderr_capture_path,
+                              log4j_properties_path, log_path, node):
+        cmd = "export KAFKA_LOG4J_OPTS='-Dlog4j.configuration=file:%s'; " % log4j_properties_path
+        cmd += "%s %s --%s.config %s --node-name %s 1>> %s 2>> %s &" % \
+               (self.path.script("trogdor.sh", node),
+                daemon_name,
+                daemon_name,
+                TrogdorService.CONFIG_PATH,
+                node.name,
+                stdout_stderr_capture_path,
+                stdout_stderr_capture_path)
+        node.account.ssh(cmd)
+        with node.account.monitor_log(log_path) as monitor:
+            monitor.wait_until("Starting main service thread.", timeout_sec=30, backoff_sec=.25,
+                               err_msg=("%s on %s didn't finish startup" % (daemon_name, node.name)))
+
+    def wait_node(self, node, timeout_sec=None):
+        if self.is_coordinator(node):
+            return len(node.account.java_pids(self.coordinator_class_name())) == 0
+        else:
+            return len(node.account.java_pids(self.agent_class_name())) == 0
+
+    def stop_node(self, node):
+        """Halt trogdor processes on this node."""
+        if self.is_coordinator(node):
+            node.account.kill_java_processes(self.coordinator_class_name())
+        else:
+            node.account.kill_java_processes(self.agent_class_name())
+
+    def clean_node(self, node):
+        """Clean up persistent state on this node - e.g. service logs, configuration files etc."""
+        self.stop_node(node)
+        node.account.ssh("rm -rf -- %s" % TrogdorService.PERSISTENT_ROOT)
+
+    def _coordinator_url(self, path):
+        return "http://%s:%d/coordinator/%s" % \
+               (self.coordinator_node.account.ssh_hostname, self.coordinator_port, path)
+
+    def request_session(self):
+        """
+        Create a new session that retries failed requests (up to 4 times); callers should close it.
+        """
+        session = requests.Session()
+        session.mount('http://',
+                      HTTPAdapter(max_retries=Retry(total=4, backoff_factor=0.3)))
+        return session
+
+    def _coordinator_put(self, path, message):
+        """
+        Make a PUT request to the Trogdor coordinator.
+
+        :param path: The URL path to use.
+        :param message: The message object to send.
+        :return: The decoded JSON response. Raises requests.HTTPError on an error status.
+        """
+        url = self._coordinator_url(path)
+        self.logger.info("PUT %s %s" % (url, message))
+        with self.request_session() as session:
+            response = session.put(url, json=message, timeout=TrogdorService.REQUEST_TIMEOUT,
+                                   headers=TrogdorService.REQUEST_HEADERS)
+        response.raise_for_status()
+        return response.json()
+
+    def _coordinator_get(self, path, message):
+        """
+        Make a GET request to the Trogdor coordinator.
+
+        :param path: The URL path to use.
+        :param message: The message object to send.
+        :return: The decoded JSON response. Raises requests.HTTPError on an error status.
+        """
+        url = self._coordinator_url(path)
+        self.logger.info("GET %s %s" % (url, message))
+        with self.request_session() as session:
+            response = session.get(url, json=message, timeout=TrogdorService.REQUEST_TIMEOUT,
+                                   headers=TrogdorService.REQUEST_HEADERS)
+        response.raise_for_status()
+        return response.json()
+
+    def create_fault(self, id, spec):
+        """
+        Create a new fault.
+
+        :param id: The fault id.
+        :param spec: The fault spec.
+        """
+        self._coordinator_put("fault", {"id": id, "spec": spec.message()})
+
+    def get_faults(self):
+        """
+        Get the faults which are on the coordinator.
+
+        :returns: A map of fault id strings to fault data objects.
+                  Fault data objects contain a 'spec' field with the spec
+                  and a 'state' field with the state.
+        """
+        return self._coordinator_get("faults", {})
+
+    def is_coordinator(self, node):
+        return node == self.coordinator_node
+
+    def agent_class_name(self):
+        return "org.apache.kafka.trogdor.agent.Agent"
+
+    def coordinator_class_name(self):
+        return "org.apache.kafka.trogdor.coordinator.Coordinator"
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tests/kafkatest/tests/tools/trogdor_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/tools/trogdor_test.py b/tests/kafkatest/tests/tools/trogdor_test.py
new file mode 100644
index 0000000..026ecaf
--- /dev/null
+++ b/tests/kafkatest/tests/tools/trogdor_test.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.trogdor.network_partition_fault_spec import NetworkPartitionFaultSpec
+
+from ducktape.cluster.cluster_spec import ClusterSpec
+from ducktape.mark.resource import cluster
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.trogdor.fault_spec import FaultSpec
+from kafkatest.services.trogdor.no_op_fault_spec import NoOpFaultSpec
+from kafkatest.services.trogdor.trogdor import TrogdorService
+from kafkatest.utils import node_is_reachable
+
+
+class TrogdorTest(Test):
+    """
+    Tests the Trogdor fault injection daemon in isolation.
+    """
+
+    def __init__(self, test_context):
+        super(TrogdorTest, self).__init__(test_context)
+
+    def set_up_trogdor(self, num_agent_nodes):
+        self.agent_nodes = self.test_context.cluster.alloc(ClusterSpec.simple_linux(num_agent_nodes))
+        self.trogdor = TrogdorService(context=self.test_context, agent_nodes=self.agent_nodes)
+        for agent_node in self.agent_nodes:
+            agent_node.account.logger = self.trogdor.logger
+        self.trogdor.start()
+
+    def setUp(self):
+        self.trogdor = None
+        self.agent_nodes = None
+
+    def tearDown(self):
+        if self.trogdor is not None:
+            self.trogdor.stop()
+            self.trogdor = None
+        if self.agent_nodes is not None:
+            self.test_context.cluster.free(self.agent_nodes)
+            self.agent_nodes = None
+
+    @cluster(num_nodes=4)
+    def test_trogdor_service(self):
+        """
+        Test that we can bring up Trogdor and create a no-op fault.
+        """
+        self.set_up_trogdor(3)
+        spec = NoOpFaultSpec(0, FaultSpec.MAX_DURATION_MS)
+        self.trogdor.create_fault("myfault", spec)
+        def check_for_faults():
+            faults = self.trogdor.get_faults()
+            self.logger.info("faults = %s" % faults)
+            return "myfault" in faults
+        wait_until(check_for_faults,  # pass the predicate itself; a lambda returning it is always truthy
+                   timeout_sec=10, backoff_sec=.2, err_msg="Failed to read back myfault.")
+
+    @cluster(num_nodes=4)
+    def test_network_partition_fault(self):
+        """
+        Test that the network partition fault results in a true network partition between nodes.
+        """
+        self.set_up_trogdor(3)
+        spec = NetworkPartitionFaultSpec(0, FaultSpec.MAX_DURATION_MS,
+                                         [[self.agent_nodes[0]], self.agent_nodes[1:]])
+        assert 2 == len(spec.partitions)
+        assert [self.agent_nodes[0].name] == spec.partitions[0]
+        assert [self.agent_nodes[1].name, self.agent_nodes[2].name] == spec.partitions[1]
+        self.trogdor.create_fault("partition0", spec)
+        def verify_nodes_partitioned():
+            if node_is_reachable(self.agent_nodes[0], self.agent_nodes[1]):
+                return False
+            if node_is_reachable(self.agent_nodes[1], self.agent_nodes[0]):
+                return False
+            if node_is_reachable(self.agent_nodes[2], self.agent_nodes[0]):
+                return False
+            return True
+        wait_until(verify_nodes_partitioned,  # must actually be called on every poll
+                   timeout_sec=10, backoff_sec=.2, err_msg="Failed to verify that the nodes were partitioned.")
+        if not node_is_reachable(self.agent_nodes[0], self.agent_nodes[0]):
+            raise RuntimeError("Node 0 must be reachable from itself.")
+        if not node_is_reachable(self.agent_nodes[1], self.agent_nodes[2]):
+            raise RuntimeError("Node 2 must be reachable from node 1.")
+        if not node_is_reachable(self.agent_nodes[2], self.agent_nodes[1]):
+            raise RuntimeError("Node 1 must be reachable from node 2.")
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tests/kafkatest/utils/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/utils/__init__.py b/tests/kafkatest/utils/__init__.py
index 6618016..8c473bf 100644
--- a/tests/kafkatest/utils/__init__.py
+++ b/tests/kafkatest/utils/__init__.py
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from util import kafkatest_version, is_version, is_int, is_int_with_prefix
+from util import kafkatest_version, is_version, is_int, is_int_with_prefix, node_is_reachable
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tests/kafkatest/utils/util.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/utils/util.py b/tests/kafkatest/utils/util.py
index d82903a..dd20273 100644
--- a/tests/kafkatest/utils/util.py
+++ b/tests/kafkatest/utils/util.py
@@ -103,3 +103,12 @@ def is_int_with_prefix(msg):
"prefix dot integer value, but one of the two parts (before or after dot) "
"are not integers. Message: %s" % (msg))
+def node_is_reachable(src_node, dst_node):
+    """
+    Returns True if dst_node accepts a TCP connection on port 22 when probed from src_node.
+
+    :param src_node: The source node to check reachability from.
+    :param dst_node: The destination node to check reachability to.
+    :return: True only if dst is reachable from src (the `nc -z` probe run on src exits with status 0).
+    """
+    return 0 == src_node.account.ssh("nc -w 3 -z %s 22" % dst_node.account.hostname, allow_fail=True)
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
index 0cb01ad..0ddf4c1 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
@@ -29,7 +29,7 @@ import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.fault.Fault;
import org.apache.kafka.trogdor.fault.FaultSet;
import org.apache.kafka.trogdor.fault.FaultSpec;
-import org.apache.kafka.trogdor.fault.FaultState;
+import org.apache.kafka.trogdor.fault.RunningState;
import org.apache.kafka.trogdor.rest.AgentFaultsResponse;
import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest;
import org.apache.kafka.trogdor.rest.JsonRestServer;
@@ -140,7 +140,8 @@ public final class Agent {
Iterator<Fault> running = runningFaults.iterateByEnd();
while (running.hasNext()) {
Fault fault = running.next();
- long endMs = fault.spec().startMs() + fault.spec().durationMs();
+ RunningState state = (RunningState) fault.state();
+ long endMs = state.startedMs() + fault.spec().durationMs();
if (now < endMs) {
nextWakeMs = Math.min(nextWakeMs, endMs);
break;
@@ -154,7 +155,7 @@ public final class Agent {
for (Fault fault: toStart) {
try {
log.debug("Activating fault " + fault);
- fault.activate(platform);
+ fault.activate(now, platform);
started.add(fault);
} catch (Throwable e) {
log.error("Error activating fault " + fault.id(), e);
@@ -164,7 +165,7 @@ public final class Agent {
for (Fault fault: toEnd) {
try {
log.debug("Deactivating fault " + fault);
- fault.deactivate(platform);
+ fault.deactivate(now, platform);
} catch (Throwable e) {
log.error("Error deactivating fault " + fault.id(), e);
} finally {
@@ -200,8 +201,33 @@ public final class Agent {
} finally {
log.info("AgentRunnable shutting down.");
restServer.stop();
+ int numDeactivated = deactivateRunningFaults();
+ log.info("AgentRunnable deactivated {} fault(s).", numDeactivated);
+ }
+ }
+ }
+
+ private int deactivateRunningFaults() {
+ long now = time.milliseconds();
+ int numDeactivated = 0; // counts only faults whose deactivate() returned normally
+ lock.lock();
+ try {
+ for (Iterator<Fault> iter = runningFaults.iterateByStart(); iter.hasNext(); ) {
+ Fault fault = iter.next();
+ try {
+ iter.remove();
+ fault.deactivate(now, platform);
+ numDeactivated++;
+ } catch (Throwable e) { // Throwable, matching the activate/deactivate loops in this class
+ log.error("Got exception while deactivating {}", fault, e);
+ } finally {
+ doneFaults.add(fault);
+ }
}
+ } finally {
+ lock.unlock();
}
+ return numDeactivated;
}
/**
@@ -257,9 +283,9 @@ public final class Agent {
Map<String, AgentFaultsResponse.FaultData> faultData = new TreeMap<>();
lock.lock();
try {
- updateFaultsResponse(faultData, pendingFaults, FaultState.PENDING);
- updateFaultsResponse(faultData, runningFaults, FaultState.RUNNING);
- updateFaultsResponse(faultData, doneFaults, FaultState.DONE);
+ updateFaultsResponse(faultData, pendingFaults);
+ updateFaultsResponse(faultData, runningFaults);
+ updateFaultsResponse(faultData, doneFaults);
} finally {
lock.unlock();
}
@@ -267,12 +293,12 @@ public final class Agent {
}
private void updateFaultsResponse(Map<String, AgentFaultsResponse.FaultData> faultData,
- FaultSet faultSet, FaultState state) {
+ FaultSet faultSet) {
for (Iterator<Fault> iter = faultSet.iterateByStart();
iter.hasNext(); ) {
Fault fault = iter.next();
AgentFaultsResponse.FaultData data =
- new AgentFaultsResponse.FaultData(fault.spec(), state);
+ new AgentFaultsResponse.FaultData(fault.spec(), fault.state());
faultData.put(fault.id(), data);
}
}
@@ -326,12 +352,14 @@ public final class Agent {
JsonRestServer restServer =
new JsonRestServer(Node.Util.getTrogdorAgentPort(platform.curNode()));
AgentRestResource resource = new AgentRestResource();
- Agent agent = new Agent(platform, Time.SYSTEM, restServer, resource);
+ final Agent agent = new Agent(platform, Time.SYSTEM, restServer, resource);
restServer.start(resource);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
- log.error("Agent shutting down...");
+ log.error("Running shutdown hook...");
+ agent.beginShutdown();
+ agent.waitForShutdown();
}
});
agent.waitForShutdown();
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java b/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
index 9bd2cb9..85270cd 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
@@ -23,6 +23,8 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.Topology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
@@ -30,6 +32,8 @@ import java.io.IOException;
* Defines a cluster topology
*/
public class BasicPlatform implements Platform {
+ private static final Logger log = LoggerFactory.getLogger(BasicPlatform.class);
+
private final Node curNode;
private final BasicTopology topology;
private final CommandRunner commandRunner;
@@ -41,7 +45,14 @@ public class BasicPlatform implements Platform {
public static class ShellCommandRunner implements CommandRunner {
@Override
public String run(Node curNode, String[] command) throws IOException {
- return Shell.execCommand(command);
+ try {
+ String result = Shell.execCommand(command);
+ log.info("RUN: {}. RESULT: [{}]", Utils.join(command, " "), result);
+ return result;
+ } catch (RuntimeException | IOException e) { // log the failed command, then rethrow the same exception to the caller
+ log.info("RUN: {}. ERROR: [{}]", Utils.join(command, " "), e.getMessage());
+ throw e;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
index 2759a36..8f3563b 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
@@ -27,10 +27,11 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.fault.DoneState;
import org.apache.kafka.trogdor.fault.Fault;
import org.apache.kafka.trogdor.fault.FaultSet;
import org.apache.kafka.trogdor.fault.FaultSpec;
-import org.apache.kafka.trogdor.fault.FaultState;
+import org.apache.kafka.trogdor.fault.SendingState;
import org.apache.kafka.trogdor.rest.CoordinatorFaultsResponse;
import org.apache.kafka.trogdor.rest.CreateCoordinatorFaultRequest;
import org.apache.kafka.trogdor.rest.JsonRestServer;
@@ -115,9 +116,9 @@ public final class Coordinator {
private final FaultSet pendingFaults = new FaultSet();
/**
- * The set of faults which have been sent to the agents.
+ * The set of faults which have been sent to the NodeManagers.
*/
- private final FaultSet doneFaults = new FaultSet();
+ private final FaultSet processedFaults = new FaultSet();
class CoordinatorRunnable implements Runnable {
@Override
@@ -152,13 +153,13 @@ public final class Coordinator {
}
toStart.add(fault);
iter.remove();
- doneFaults.add(fault);
+ processedFaults.add(fault);
}
} finally {
lock.unlock();
}
for (Fault fault: toStart) {
- startFault(fault);
+ startFault(now, fault); // now is used to timestamp the DoneState of a fault with no nodes to send to
}
}
} catch (Throwable t) {
@@ -209,7 +210,7 @@ public final class Coordinator {
return this.restServer.port();
}
- private void startFault(Fault fault) {
+ private void startFault(long now, Fault fault) {
Set<String> affectedNodes = fault.targetNodes(platform.topology());
Set<NodeManager> affectedManagers = new HashSet<>();
Set<String> nonexistentNodes = new HashSet<>();
@@ -229,6 +230,11 @@ public final class Coordinator {
}
log.info("Applying fault {} on {} node(s): {}", fault.id(),
nodeNames.size(), Utils.join(nodeNames, ", "));
+ if (nodeNames.isEmpty()) { // no nodes to send to, so the fault is done right away
+ fault.setState(new DoneState(now, ""));
+ } else {
+ fault.setState(new SendingState(nodeNames)); // set before enqueueing: NodeManager casts the state to SendingState after each send
+ }
for (NodeManager nodeManager : affectedManagers) {
nodeManager.enqueueFault(fault);
}
@@ -261,8 +267,8 @@ public final class Coordinator {
Map<String, CoordinatorFaultsResponse.FaultData> faultData = new TreeMap<>();
lock.lock();
try {
- getFaultsImpl(faultData, pendingFaults, FaultState.PENDING);
- getFaultsImpl(faultData, doneFaults, FaultState.DONE);
+ getFaultsImpl(faultData, pendingFaults);
+ getFaultsImpl(faultData, processedFaults);
} finally {
lock.unlock();
}
@@ -270,12 +276,12 @@ public final class Coordinator {
}
private void getFaultsImpl(Map<String, CoordinatorFaultsResponse.FaultData> faultData,
- FaultSet faultSet, FaultState state) {
+ FaultSet faultSet) {
for (Iterator<Fault> iter = faultSet.iterateByStart();
iter.hasNext(); ) {
Fault fault = iter.next();
CoordinatorFaultsResponse.FaultData data =
- new CoordinatorFaultsResponse.FaultData(fault.spec(), state);
+ new CoordinatorFaultsResponse.FaultData(fault.spec(), fault.state()); // each fault now carries its own state
faultData.put(fault.id(), data);
}
}
@@ -330,13 +336,15 @@ public final class Coordinator {
JsonRestServer restServer = new JsonRestServer(
Node.Util.getTrogdorCoordinatorPort(platform.curNode()));
CoordinatorRestResource resource = new CoordinatorRestResource();
- Coordinator coordinator = new Coordinator(platform, Time.SYSTEM,
+ final Coordinator coordinator = new Coordinator(platform, Time.SYSTEM, // final so the anonymous shutdown-hook class below can capture it
restServer, resource);
restServer.start(resource);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
- log.error("Coordinator shutting down...");
+ log.error("Running shutdown hook...");
+ coordinator.beginShutdown();
+ coordinator.waitForShutdown();
}
});
coordinator.waitForShutdown();
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
index 04d714c..ee71190 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
@@ -22,7 +22,9 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.trogdor.agent.AgentClient;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.fault.DoneState;
import org.apache.kafka.trogdor.fault.Fault;
+import org.apache.kafka.trogdor.fault.SendingState;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest;
import org.slf4j.Logger;
@@ -187,6 +189,10 @@ class NodeManager {
} finally {
lock.unlock();
}
+ SendingState state = (SendingState) fault.state(); // Coordinator#startFault sets a SendingState before enqueueing the fault here
+ if (state.completeSend(node.name())) { // true once every scheduled node has been sent the fault
+ fault.setState(new DoneState(now, ""));
+ }
return true;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFault.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFault.java
new file mode 100644
index 0000000..2d63b82
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFault.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.Topology;
+
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Base class for faults. It holds the fault's immutable id and spec, and tracks
+ * its current FaultState, which activate() and deactivate() update.
+ */
+public abstract class AbstractFault implements Fault {
+ private final String id;
+ private final FaultSpec spec;
+ private FaultState state; // guarded by this: only accessed through the synchronized state()/setState()
+
+ public AbstractFault(String id, FaultSpec spec) {
+ this.id = id;
+ this.spec = spec;
+ this.state = new PendingState();
+ }
+
+ @Override
+ public final String id() {
+ return id;
+ }
+
+ @Override
+ public final FaultSpec spec() {
+ return spec;
+ }
+
+ @Override
+ public synchronized FaultState state() {
+ return state;
+ }
+
+ @Override
+ public synchronized void setState(FaultState state) {
+ this.state = state;
+ }
+
+ @Override
+ public final void activate(long now, Platform platform) throws Exception {
+ try {
+ handleActivation(now, platform);
+ setState(new RunningState(now));
+ } catch (Exception e) {
+ setState(new DoneState(now, errorString(e)));
+ throw e;
+ }
+ }
+
+ /**
+ * Perform the fault-specific activation work.
+ * If this throws, the fault moves straight to DoneState.
+ */
+ protected abstract void handleActivation(long now, Platform platform) throws Exception;
+
+ @Override
+ public final void deactivate(long now, Platform platform) throws Exception {
+ try {
+ handleDeactivation(now, platform);
+ setState(new DoneState(now, ""));
+ } catch (Exception e) {
+ setState(new DoneState(now, errorString(e)));
+ throw e;
+ }
+ }
+
+ /**
+ * Perform the fault-specific deactivation work.
+ * The fault moves to DoneState whether or not this throws.
+ */
+ protected abstract void handleDeactivation(long now, Platform platform) throws Exception;
+
+ /**
+ * Describe a failure for DoneState. Successful completion is recorded with an
+ * empty error string, so this never returns null or "": an exception that has
+ * no message is described by its class name instead.
+ */
+ private static String errorString(Exception e) {
+ String message = e.getMessage();
+ if (message == null || message.isEmpty()) {
+ return e.getClass().getName();
+ }
+ return message;
+ }
+
+ @Override
+ public abstract Set<String> targetNodes(Topology topology);
+
+ /**
+ * Faults are equal when they have the same concrete class, id and spec.
+ * The mutable state is deliberately left out, so that a fault's hash code
+ * does not change as it moves from pending to running to done.
+ */
+ @Override
+ public final boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ AbstractFault that = (AbstractFault) o;
+ return Objects.equals(id, that.id) &&
+ JsonUtil.toJsonString(spec).equals(JsonUtil.toJsonString(that.spec));
+ }
+
+ @Override
+ public final int hashCode() {
+ return Objects.hash(id, JsonUtil.toJsonString(spec));
+ }
+
+ @Override
+ public final String toString() {
+ return getClass().getSimpleName() + "(id=" + id +
+ ", spec=" + JsonUtil.toJsonString(spec) +
+ ", state=" + JsonUtil.toJsonString(state()) +
+ ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/fault/DoneState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/DoneState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/DoneState.java
new file mode 100644
index 0000000..222caf0
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/DoneState.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * The state a fault is in on the agent or controller when it is completed,
+ * either normally or with an error.
+ */
+public class DoneState extends FaultState {
+ private final long doneMs; // time in ms at which the fault was marked done
+ private final String errorStr; // "" when the fault completed normally; otherwise the error that ended it
+
+ @JsonCreator
+ public DoneState(@JsonProperty("doneMs") long doneMs,
+ @JsonProperty("errorStr") String errorStr) {
+ this.doneMs = doneMs;
+ this.errorStr = errorStr;
+ }
+
+ @JsonProperty
+ public long doneMs() {
+ return doneMs;
+ }
+
+ @JsonProperty
+ public String errorStr() {
+ return errorStr;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java
index 9f1a19a..e44d56a 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java
@@ -34,17 +34,37 @@ public interface Fault {
FaultSpec spec();
/**
- * Activate the fault.
+ * Get the current fault state. Thread-safe.
*/
- void activate(Platform platform) throws Exception;
+ FaultState state();
/**
- * Deactivate the fault.
+ * Set the current fault state. Thread-safe.
*/
- void deactivate(Platform platform) throws Exception;
+ void setState(FaultState state);
+
+ /**
+ * Activate the fault. Transitions into RunningState on success, or into DoneState if activation fails.
+ *
+ * @param now The current time in ms.
+ * @param platform The platform to use.
+ */
+ void activate(long now, Platform platform) throws Exception;
+
+ /**
+ * Deactivate the fault. Transitions into DoneState whether or not deactivation succeeds.
+ *
+ * @param now The current time in ms.
+ * @param platform The platform to use.
+ */
+ void deactivate(long now, Platform platform) throws Exception;
/**
* Get the nodes which this fault is targetting.
+ *
+ * @param topology The topology to use.
+ *
+ * @return A set of target node names.
*/
Set<String> targetNodes(Topology topology);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java
index bec0792..cba8419 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java
@@ -17,11 +17,35 @@
package org.apache.kafka.trogdor.fault;
-import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.kafka.trogdor.common.JsonUtil;
+import java.util.Objects;
-@JsonFormat(shape = JsonFormat.Shape.STRING)
-public enum FaultState {
- PENDING,
- RUNNING,
- DONE
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
+ include = JsonTypeInfo.As.PROPERTY,
+ property = "stateName")
+@JsonSubTypes({
+ @JsonSubTypes.Type(value = DoneState.class, name = "done"),
+ @JsonSubTypes.Type(value = PendingState.class, name = "pending"),
+ @JsonSubTypes.Type(value = RunningState.class, name = "running"),
+ @JsonSubTypes.Type(value = SendingState.class, name = "sending")
+ })
+public abstract class FaultState { // concrete states are told apart in JSON by the "stateName" property
+ @Override
+ public final boolean equals(Object o) { // equality is based on the JSON form returned by toString()
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ return toString().equals(o.toString());
+ }
+
+ @Override
+ public final int hashCode() {
+ return Objects.hashCode(toString());
+ }
+
+ @Override
+ public final String toString() {
+ return JsonUtil.toJsonString(this);
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java
index 7524af1..cf3270a 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java
@@ -17,7 +17,6 @@
package org.apache.kafka.trogdor.fault;
-import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.Topology;
@@ -28,23 +27,20 @@ import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
-import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
-public class NetworkPartitionFault implements Fault {
+public class NetworkPartitionFault extends AbstractFault {
private static final Logger log = LoggerFactory.getLogger(NetworkPartitionFault.class);
- private final String id;
- private final NetworkPartitionFaultSpec spec;
private final List<Set<String>> partitions;
public NetworkPartitionFault(String id, FaultSpec spec) {
- this.id = id;
- this.spec = (NetworkPartitionFaultSpec) spec;
+ super(id, spec);
+ NetworkPartitionFaultSpec faultSpec = (NetworkPartitionFaultSpec) spec; // throws ClassCastException for any other spec type
this.partitions = new ArrayList<>();
HashSet<String> prevNodes = new HashSet<>();
- for (List<String> partition : this.spec.partitions()) {
+ for (List<String> partition : faultSpec.partitions()) {
for (String nodeName : partition) {
if (prevNodes.contains(nodeName)) {
throw new RuntimeException("Node " + nodeName +
@@ -57,23 +53,13 @@ public class NetworkPartitionFault implements Fault {
}
@Override
- public String id() {
- return id;
- }
-
- @Override
- public FaultSpec spec() {
- return spec;
- }
-
- @Override
- public void activate(Platform platform) throws Exception {
+ protected void handleActivation(long now, Platform platform) throws Exception { // AbstractFault#activate records the resulting state
log.info("Activating NetworkPartitionFault...");
runIptablesCommands(platform, "-A");
}
@Override
- public void deactivate(Platform platform) throws Exception {
+ protected void handleDeactivation(long now, Platform platform) throws Exception { // AbstractFault#deactivate records the resulting state
log.info("Deactivating NetworkPartitionFault...");
runIptablesCommands(platform, "-D");
}
@@ -107,24 +93,4 @@ public class NetworkPartitionFault implements Fault {
}
return targetNodes;
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- NetworkPartitionFault that = (NetworkPartitionFault) o;
- return Objects.equals(id, that.id) &&
- Objects.equals(spec, that.spec) &&
- Objects.equals(partitions, that.partitions);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(id, spec, partitions);
- }
-
- @Override
- public String toString() {
- return "NoOpFault(id=" + id + ", spec=" + JsonUtil.toJsonString(spec) + ")";
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java
index c7ac4de..70b4965 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java
@@ -17,7 +17,6 @@
package org.apache.kafka.trogdor.fault;
-import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.Topology;
@@ -26,37 +25,22 @@ import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
-public class NoOpFault implements Fault {
+public class NoOpFault extends AbstractFault {
private static final Logger log = LoggerFactory.getLogger(NoOpFault.class);
- private final String id;
- private final FaultSpec spec;
-
public NoOpFault(String id, FaultSpec spec) {
- this.id = id;
- this.spec = spec;
- }
-
- @Override
- public String id() {
- return id;
+ super(id, spec); // AbstractFault starts every fault in PendingState
}
@Override
- public FaultSpec spec() {
- return spec;
- }
-
- @Override
- public void activate(Platform platform) {
+ protected void handleActivation(long now, Platform platform) throws Exception { // only logs; AbstractFault#activate then moves the fault to RunningState
log.info("Activating NoOpFault...");
}
@Override
- public void deactivate(Platform platform) {
+ protected void handleDeactivation(long now, Platform platform) throws Exception { // only logs; AbstractFault#deactivate then moves the fault to DoneState
log.info("Deactivating NoOpFault...");
}
@@ -70,23 +54,4 @@ public class NoOpFault implements Fault {
}
return set;
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- NoOpFault that = (NoOpFault) o;
- return Objects.equals(id, that.id) &&
- Objects.equals(spec, that.spec);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(id, spec);
- }
-
- @Override
- public String toString() {
- return "NoOpFault(id=" + id + ", spec=" + JsonUtil.toJsonString(spec) + ")";
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java
new file mode 100644
index 0000000..57c8e88
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+
+/**
+ * The state a fault is in on the agent or controller when we haven't yet done
+ * anything with it.
+ */
+public class PendingState extends FaultState {
+ @JsonCreator
+ public PendingState() { // no fields, so every PendingState has the same JSON form and therefore compares equal
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java
new file mode 100644
index 0000000..1b81bf5
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * The state a fault is in on the agent when it is running.
+ */
+public class RunningState extends FaultState {
+ private final long startedMs; // time in ms at which the fault started running (AbstractFault uses the time passed to activate())
+
+ @JsonCreator
+ public RunningState(@JsonProperty("startedMs") long startedMs) {
+ this.startedMs = startedMs;
+ }
+
+ @JsonProperty
+ public long startedMs() {
+ return startedMs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java
new file mode 100644
index 0000000..edfbed2
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * The state a fault is in on the controller when it is scheduled to be sent to several agents.
+ * nodeNames() and completeSend() synchronize on this object, so they may be called from different threads.
+ */
+public class SendingState extends FaultState {
+ /**
+ * Maps each target node name to whether the fault has been sent to it yet.
+ */
+ private final TreeMap<String, Boolean> nodes;
+
+ /**
+ * The number of target nodes which have not received the fault yet.
+ */
+ private int remainingNodes;
+
+ @JsonCreator
+ public SendingState(@JsonProperty("nodeNames") Set<String> nodeNames) {
+ this.nodes = new TreeMap<>();
+ for (String nodeName : nodeNames) {
+ nodes.put(nodeName, false);
+ }
+ remainingNodes = nodes.size();
+ }
+
+ /**
+ * Get the names of the nodes this fault is being sent to.
+ *
+ * @return A copy of the node names. Returning a copy rather than the live
+ * key set keeps callers from removing nodes, which would leave
+ * remainingNodes out of sync with the map.
+ */
+ @JsonProperty
+ public synchronized Set<String> nodeNames() {
+ return new TreeSet<>(nodes.keySet());
+ }
+
+ /**
+ * Complete a send operation.
+ *
+ * @param nodeName The name of the node we sent to.
+ * @return True if there are no more send operations left.
+ * @throws IllegalArgumentException If the fault was not scheduled on this node.
+ * @throws IllegalStateException If this node already received the fault.
+ */
+ public synchronized boolean completeSend(String nodeName) {
+ Boolean alreadySent = nodes.get(nodeName);
+ if (alreadySent == null) {
+ throw new IllegalArgumentException("Node " + nodeName + " was not supposed to " +
+ "receive this fault. The fault was scheduled on nodes: " +
+ Utils.join(nodes.keySet(), ", "));
+ }
+ if (alreadySent) {
+ throw new IllegalStateException("Node " + nodeName + " already received this fault.");
+ }
+ nodes.put(nodeName, true);
+ remainingNodes--;
+ return remainingNodes == 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java
index 773d519..b2f7c91 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java
@@ -38,7 +38,7 @@ public class FaultDataMap {
@JsonCreator
public FaultData(@JsonProperty("spec") FaultSpec spec,
- @JsonProperty("status") FaultState state) {
+ @JsonProperty("state") FaultState state) { // renamed from "status"; NOTE(review): presumably must match the name the state getter serializes under - verify
this.spec = spec;
this.state = state;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
index c587e44..53ef849 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
@@ -24,8 +24,9 @@ import org.apache.kafka.trogdor.basic.BasicPlatform;
import org.apache.kafka.trogdor.basic.BasicTopology;
import org.apache.kafka.trogdor.common.ExpectedFaults;
import org.apache.kafka.trogdor.common.Node;
-import org.apache.kafka.trogdor.fault.FaultState;
+import org.apache.kafka.trogdor.fault.DoneState;
import org.apache.kafka.trogdor.fault.NoOpFaultSpec;
+import org.apache.kafka.trogdor.fault.RunningState;
import org.apache.kafka.trogdor.rest.AgentFaultsResponse;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest;
@@ -129,36 +130,32 @@ public class AgentTest {
final NoOpFaultSpec fooSpec = new NoOpFaultSpec(10, 2);
client.putFault(new CreateAgentFaultRequest("foo", fooSpec));
- new ExpectedFaults().addFault("foo", FaultState.RUNNING).waitFor(client);
+ new ExpectedFaults().addFault("foo", new RunningState(0)).waitFor(client);
+
+ time.sleep(3); // t=3: foo is expected to be done at 3 below
+ new ExpectedFaults().addFault("foo", new DoneState(3, "")).waitFor(client);
final NoOpFaultSpec barSpec = new NoOpFaultSpec(20, 3);
client.putFault(new CreateAgentFaultRequest("bar", barSpec));
- time.sleep(11);
new ExpectedFaults().
- addFault("foo", FaultState.RUNNING).
- addFault("bar", FaultState.RUNNING).
+ addFault("foo", new DoneState(3, "")).
+ addFault("bar", new RunningState(3)).
waitFor(client);
- final NoOpFaultSpec bazSpec = new NoOpFaultSpec(1, 11);
+ time.sleep(4); // t=7: bar is expected to be done at 7 below
+ final NoOpFaultSpec bazSpec = new NoOpFaultSpec(1, 2);
client.putFault(new CreateAgentFaultRequest("baz", bazSpec));
new ExpectedFaults().
- addFault("foo", FaultState.RUNNING).
- addFault("bar", FaultState.RUNNING).
- addFault("baz", FaultState.RUNNING).
- waitFor(client);
-
- time.sleep(2);
- new ExpectedFaults().
- addFault("foo", FaultState.DONE).
- addFault("bar", FaultState.RUNNING).
- addFault("baz", FaultState.DONE).
+ addFault("foo", new DoneState(3, "")).
+ addFault("bar", new DoneState(7, "")).
+ addFault("baz", new RunningState(7)).
waitFor(client);
- time.sleep(100);
+ time.sleep(3); // t=10: baz is expected to be done at 10 below
new ExpectedFaults().
- addFault("foo", FaultState.DONE).
- addFault("bar", FaultState.DONE).
- addFault("baz", FaultState.DONE).
+ addFault("foo", new DoneState(3, "")).
+ addFault("bar", new DoneState(7, "")).
+ addFault("baz", new DoneState(10, "")).
waitFor(client);
agent.beginShutdown();
http://git-wip-us.apache.org/repos/asf/kafka/blob/4065ffb3/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
index d7a8fa0..75109d2 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -27,9 +27,11 @@ import org.apache.kafka.trogdor.common.CapturingCommandRunner;
import org.apache.kafka.trogdor.common.ExpectedFaults;
import org.apache.kafka.trogdor.common.MiniTrogdorCluster;
-import org.apache.kafka.trogdor.fault.FaultState;
+import org.apache.kafka.trogdor.fault.DoneState;
import org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec;
import org.apache.kafka.trogdor.fault.NoOpFaultSpec;
+import org.apache.kafka.trogdor.fault.PendingState;
+import org.apache.kafka.trogdor.fault.RunningState;
import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
import org.apache.kafka.trogdor.rest.CreateCoordinatorFaultRequest;
import org.junit.Rule;
@@ -69,16 +71,16 @@ public class CoordinatorTest {
build()) {
new ExpectedFaults().waitFor(cluster.coordinatorClient());
- NoOpFaultSpec noOpFaultSpec = new NoOpFaultSpec(10, 2);
+ NoOpFaultSpec noOpFaultSpec = new NoOpFaultSpec(1, 2);
cluster.coordinatorClient().putFault(
new CreateCoordinatorFaultRequest("fault1", noOpFaultSpec));
new ExpectedFaults().
- addFault("fault1", noOpFaultSpec, FaultState.PENDING).
+ addFault("fault1", noOpFaultSpec, new PendingState()).
waitFor(cluster.coordinatorClient());
- time.sleep(11);
+ time.sleep(2); // fault1 is expected to be done at t=2 below
new ExpectedFaults().
- addFault("fault1", noOpFaultSpec, FaultState.DONE).
+ addFault("fault1", noOpFaultSpec, new DoneState(2, "")).
waitFor(cluster.coordinatorClient());
}
}
@@ -99,7 +101,7 @@ public class CoordinatorTest {
NoOpFaultSpec noOpFaultSpec = new NoOpFaultSpec(10, 2);
coordinatorClient.putFault(new CreateCoordinatorFaultRequest("fault1", noOpFaultSpec));
new ExpectedFaults().
- addFault("fault1", noOpFaultSpec, FaultState.PENDING).
+ addFault("fault1", noOpFaultSpec, new PendingState()).
waitFor(coordinatorClient);
new ExpectedFaults().
waitFor(agentClient1).
@@ -107,10 +109,10 @@ public class CoordinatorTest {
time.sleep(10);
new ExpectedFaults().
- addFault("fault1", noOpFaultSpec, FaultState.DONE).
+ addFault("fault1", noOpFaultSpec, new DoneState(10, "")).
waitFor(coordinatorClient);
new ExpectedFaults().
- addFault("fault1", noOpFaultSpec, FaultState.RUNNING).
+ addFault("fault1", noOpFaultSpec, new RunningState(10)).
waitFor(agentClient1).
waitFor(agentClient2);
}