You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2019/06/21 15:30:27 UTC
[kafka] branch trunk updated: KAFKA-8519 Add trogdor action to slow
down a network (#6912)
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d7a5e31 KAFKA-8519 Add trogdor action to slow down a network (#6912)
d7a5e31 is described below
commit d7a5e31ca2f694fe0f5aaf8536ea30400fc19d3e
Author: David Arthur <mu...@gmail.com>
AuthorDate: Fri Jun 21 11:30:05 2019 -0400
KAFKA-8519 Add trogdor action to slow down a network (#6912)
This adds a new Trogdor fault spec for inducing network latency on a network device for system testing. It operates very similarly to the existing network partition spec by executing the `tc` Linux utility.
---
.../trogdor/degraded_network_fault_spec.py | 37 +++++++
.../kafkatest/tests/core/round_trip_fault_test.py | 13 +++
.../trogdor/fault/DegradedNetworkFaultSpec.java | 77 ++++++++++++++
.../trogdor/fault/DegradedNetworkFaultWorker.java | 112 +++++++++++++++++++++
4 files changed, 239 insertions(+)
diff --git a/tests/kafkatest/services/trogdor/degraded_network_fault_spec.py b/tests/kafkatest/services/trogdor/degraded_network_fault_spec.py
new file mode 100644
index 0000000..450a787
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/degraded_network_fault_spec.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.trogdor.task_spec import TaskSpec
+
+
+class DegradedNetworkFaultSpec(TaskSpec):
+ """
+ The specification for a network degradation fault.
+
+ Degrades the network so that traffic on a subset of nodes has higher latency
+ """
+
+ def __init__(self, start_ms, duration_ms, node_specs):
+ """
+ Create a new DegradedNetworkFaultSpec.
+
+ :param start_ms: The start time, as described in task_spec.py
+ :param duration_ms: The duration in milliseconds.
+ :param node_specs: A dict of node name to that node's degradation settings, itself a
+ dict with "latencyMs" and "networkDevice" keys; sent as the "nodeSpecs" field
+ """
+ super(DegradedNetworkFaultSpec, self).__init__(start_ms, duration_ms)
+ self.message["class"] = "org.apache.kafka.trogdor.fault.DegradedNetworkFaultSpec"
+ self.message["nodeSpecs"] = node_specs
diff --git a/tests/kafkatest/tests/core/round_trip_fault_test.py b/tests/kafkatest/tests/core/round_trip_fault_test.py
index 8bee6a1..64e9941 100644
--- a/tests/kafkatest/tests/core/round_trip_fault_test.py
+++ b/tests/kafkatest/tests/core/round_trip_fault_test.py
@@ -16,6 +16,7 @@
import time
from ducktape.tests.test import Test
from kafkatest.services.trogdor.network_partition_fault_spec import NetworkPartitionFaultSpec
+from kafkatest.services.trogdor.degraded_network_fault_spec import DegradedNetworkFaultSpec
from kafkatest.services.kafka import KafkaService
from kafkatest.services.trogdor.process_stop_fault_spec import ProcessStopFaultSpec
from kafkatest.services.trogdor.round_trip_workload import RoundTripWorkloadService, RoundTripWorkloadSpec
@@ -92,3 +93,15 @@ class RoundTripFaultTest(Test):
workload1.wait_for_done(timeout_sec=600)
stop1.stop()
stop1.wait_for_done()
+
+ def test_produce_consume_with_latency(self):
+ workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
+ time.sleep(2)  # pause before creating the fault task (presumably so the workload is already running)
+ node_specs = {}
+ for node in self.kafka.nodes + self.zk.nodes:
+ node_specs[node.name] = {"latencyMs": 500, "networkDevice": "eth0"}  # same settings for every node of self.kafka and self.zk
+ spec = DegradedNetworkFaultSpec(0, 60000, node_specs)  # start_ms=0, duration_ms=60000
+ slow1 = self.trogdor.create_task("slow1", spec)
+ workload1.wait_for_done(timeout_sec=600)
+ slow1.stop()  # stop the fault task once the workload has finished, then wait for it below
+ slow1.wait_for_done()
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/DegradedNetworkFaultSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/DegradedNetworkFaultSpec.java
new file mode 100644
index 0000000..719d00b
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/DegradedNetworkFaultSpec.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskController;
+import org.apache.kafka.trogdor.task.TaskSpec;
+import org.apache.kafka.trogdor.task.TaskWorker;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Task spec for a fault that degrades the network on selected nodes.
+ *
+ * Each entry of {@code nodeSpecs} maps a node name to the network device and the latency
+ * configured for that node. The map is handed unchanged to {@link DegradedNetworkFaultWorker}.
+ */
+public class DegradedNetworkFaultSpec extends TaskSpec {
+
+    /**
+     * Per-node settings: the network device to act on and the latency, in milliseconds.
+     */
+    public static class NodeDegradeSpec {
+        private final String networkDevice;
+        private final int latencyMs;
+
+        public NodeDegradeSpec(
+                @JsonProperty("networkDevice") String networkDevice,
+                @JsonProperty("latencyMs") int latencyMs) {
+            // A missing device name is stored as the empty string, never as null.
+            if (networkDevice == null) {
+                this.networkDevice = "";
+            } else {
+                this.networkDevice = networkDevice;
+            }
+            this.latencyMs = latencyMs;
+        }
+
+        /** The configured device name; empty when none was given. */
+        @JsonProperty("networkDevice")
+        public String networkDevice() {
+            return networkDevice;
+        }
+
+        /** The configured latency in milliseconds. */
+        @JsonProperty("latencyMs")
+        public int latencyMs() {
+            return latencyMs;
+        }
+    }
+
+    private final Map<String, NodeDegradeSpec> nodeSpecs;
+
+    @JsonCreator
+    public DegradedNetworkFaultSpec(@JsonProperty("startMs") long startMs,
+                                    @JsonProperty("durationMs") long durationMs,
+                                    @JsonProperty("nodeSpecs") Map<String, NodeDegradeSpec> nodeSpecs) {
+        super(startMs, durationMs);
+        // An absent map becomes an empty one; a supplied map is wrapped read-only (not copied).
+        if (nodeSpecs == null) {
+            this.nodeSpecs = Collections.emptyMap();
+        } else {
+            this.nodeSpecs = Collections.unmodifiableMap(nodeSpecs);
+        }
+    }
+
+    /**
+     * Builds a controller that answers with the node names used as keys of {@code nodeSpecs},
+     * whatever topology it is given.
+     */
+    @Override
+    public TaskController newController(String id) {
+        return topology -> nodeSpecs.keySet();
+    }
+
+    /** Builds the worker for this spec, passing along the task id and the per-node settings. */
+    @Override
+    public TaskWorker newTaskWorker(String id) {
+        return new DegradedNetworkFaultWorker(id, nodeSpecs);
+    }
+
+    /** The per-node settings, keyed by node name. */
+    @JsonProperty("nodeSpecs")
+    public Map<String, NodeDegradeSpec> nodeSpecs() {
+        return nodeSpecs;
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/DegradedNetworkFaultWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/DegradedNetworkFaultWorker.java
new file mode 100644
index 0000000..1ccf4cf
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/DegradedNetworkFaultWorker.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import com.fasterxml.jackson.databind.node.TextNode;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.trogdor.common.Node;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.NetworkInterface;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Uses the Linux utility {@code tc} (traffic control) to simulate latency on a specified network device.
+ *
+ * On start, the worker looks up the settings for the node it is running on and adds a
+ * {@code netem} delay to each selected device; on stop, it deletes the root qdisc of those
+ * devices again. A node without an entry in {@code nodeSpecs} is left untouched.
+ */
+public class DegradedNetworkFaultWorker implements TaskWorker {
+
+    private static final Logger log = LoggerFactory.getLogger(DegradedNetworkFaultWorker.class);
+
+    private final String id;
+    private final Map<String, DegradedNetworkFaultSpec.NodeDegradeSpec> nodeSpecs;
+    // Assigned in start() and reused by stop() to report progress.
+    private WorkerStatusTracker status;
+
+    /**
+     * @param id        the task id, used in log and status messages
+     * @param nodeSpecs the per-node settings, keyed by node name
+     */
+    public DegradedNetworkFaultWorker(String id, Map<String, DegradedNetworkFaultSpec.NodeDegradeSpec> nodeSpecs) {
+        this.id = id;
+        this.nodeSpecs = nodeSpecs;
+    }
+
+    /**
+     * Adds the configured delay to each selected device of the current node.
+     *
+     * @throws RuntimeException if the configured latency for this node is negative
+     * @throws Exception        if the devices cannot be listed or a {@code tc} command fails
+     */
+    @Override
+    public void start(Platform platform, WorkerStatusTracker status, KafkaFutureImpl<String> haltFuture) throws Exception {
+        log.info("Activating DegradedNetworkFaultWorker {}.", id);
+        this.status = status;
+        this.status.update(new TextNode("enabling traffic control " + id));
+        Node curNode = platform.curNode();
+        DegradedNetworkFaultSpec.NodeDegradeSpec nodeSpec = nodeSpecs.get(curNode.name());
+        if (nodeSpec != null) {
+            // Validate once, before any device is touched. Zero is accepted; only negative values are rejected.
+            if (nodeSpec.latencyMs() < 0) {
+                throw new RuntimeException("Expected a non-negative value for latencyMs, but got " + nodeSpec.latencyMs());
+            }
+            for (String device : devicesForSpec(nodeSpec)) {
+                enableTrafficControl(platform, device, nodeSpec.latencyMs());
+            }
+        }
+        this.status.update(new TextNode("enabled traffic control " + id));
+    }
+
+    /**
+     * Deletes the root qdisc of each selected device of the current node.
+     *
+     * @throws Exception if the devices cannot be listed or a {@code tc} command fails
+     */
+    @Override
+    public void stop(Platform platform) throws Exception {
+        log.info("Deactivating DegradedNetworkFaultWorker {}.", id);
+        this.status.update(new TextNode("disabling traffic control " + id));
+        Node curNode = platform.curNode();
+        DegradedNetworkFaultSpec.NodeDegradeSpec nodeSpec = nodeSpecs.get(curNode.name());
+        if (nodeSpec != null) {
+            for (String device : devicesForSpec(nodeSpec)) {
+                disableTrafficControl(platform, device);
+            }
+        }
+        this.status.update(new TextNode("disabled traffic control " + id));
+    }
+
+    /**
+     * Resolves the devices a node spec applies to: the named device when one is given,
+     * otherwise every non-loopback interface reported by {@link NetworkInterface}.
+     */
+    private Set<String> devicesForSpec(DegradedNetworkFaultSpec.NodeDegradeSpec nodeSpec) throws Exception {
+        Set<String> devices = new HashSet<>();
+        if (nodeSpec.networkDevice().isEmpty()) {
+            for (NetworkInterface networkInterface : Collections.list(NetworkInterface.getNetworkInterfaces())) {
+                if (!networkInterface.isLoopback()) {
+                    devices.add(networkInterface.getName());
+                }
+            }
+        } else {
+            devices.add(nodeSpec.networkDevice());
+        }
+        return devices;
+    }
+
+    /**
+     * Runs {@code sudo tc qdisc add dev <device> root netem delay <delay>ms <deviation>ms distribution normal}.
+     * The deviation is the integer square root of the delay, but at least 1 ms.
+     */
+    private void enableTrafficControl(Platform platform, String networkDevice, int delayMs) throws IOException {
+        int deviationMs = Math.max(1, (int) Math.sqrt(delayMs));
+        // Plain concatenation keeps the digits ASCII; String.format("%d") localizes digits per the default locale.
+        platform.runCommand(new String[] {
+            "sudo", "tc", "qdisc", "add", "dev", networkDevice, "root", "netem",
+            "delay", delayMs + "ms", deviationMs + "ms", "distribution", "normal"
+        });
+    }
+
+    /**
+     * Runs {@code sudo tc qdisc del dev <device> root}.
+     */
+    private void disableTrafficControl(Platform platform, String networkDevice) throws IOException {
+        platform.runCommand(new String[] {
+            "sudo", "tc", "qdisc", "del", "dev", networkDevice, "root"
+        });
+    }
+}