You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2019/06/21 15:30:27 UTC

[kafka] branch trunk updated: KAFKA-8519 Add trogdor action to slow down a network (#6912)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d7a5e31  KAFKA-8519 Add trogdor action to slow down a network (#6912)
d7a5e31 is described below

commit d7a5e31ca2f694fe0f5aaf8536ea30400fc19d3e
Author: David Arthur <mu...@gmail.com>
AuthorDate: Fri Jun 21 11:30:05 2019 -0400

    KAFKA-8519 Add trogdor action to slow down a network (#6912)
    
    This adds a new Trogdor fault spec for inducing network latency on a network device for system testing. It works much like the existing network partition fault spec, except that it induces the fault by running the Linux `tc` (traffic control) utility.
---
 .../trogdor/degraded_network_fault_spec.py         |  37 +++++++
 .../kafkatest/tests/core/round_trip_fault_test.py  |  13 +++
 .../trogdor/fault/DegradedNetworkFaultSpec.java    |  77 ++++++++++++++
 .../trogdor/fault/DegradedNetworkFaultWorker.java  | 112 +++++++++++++++++++++
 4 files changed, 239 insertions(+)

diff --git a/tests/kafkatest/services/trogdor/degraded_network_fault_spec.py b/tests/kafkatest/services/trogdor/degraded_network_fault_spec.py
new file mode 100644
index 0000000..450a787
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/degraded_network_fault_spec.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.trogdor.task_spec import TaskSpec
+
+
+class DegradedNetworkFaultSpec(TaskSpec):
+    """
+    The specification for a network degradation fault.
+
+    Degrades the network so that traffic on a subset of nodes has higher latency
+    """
+
+    def __init__(self, start_ms, duration_ms, node_specs):
+        """
+        Create a new NetworkDegradeFaultSpec.
+
+        :param start_ms:        The start time, as described in task_spec.py
+        :param duration_ms:     The duration in milliseconds.
+        :param node_latencies:  A dict of node name to desired latency
+        :param network_device:  The name of the network device
+        """
+        super(DegradedNetworkFaultSpec, self).__init__(start_ms, duration_ms)
+        self.message["class"] = "org.apache.kafka.trogdor.fault.DegradedNetworkFaultSpec"
+        self.message["nodeSpecs"] = node_specs
diff --git a/tests/kafkatest/tests/core/round_trip_fault_test.py b/tests/kafkatest/tests/core/round_trip_fault_test.py
index 8bee6a1..64e9941 100644
--- a/tests/kafkatest/tests/core/round_trip_fault_test.py
+++ b/tests/kafkatest/tests/core/round_trip_fault_test.py
@@ -16,6 +16,7 @@
 import time
 from ducktape.tests.test import Test
 from kafkatest.services.trogdor.network_partition_fault_spec import NetworkPartitionFaultSpec
+from kafkatest.services.trogdor.degraded_network_fault_spec import DegradedNetworkFaultSpec
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.trogdor.process_stop_fault_spec import ProcessStopFaultSpec
 from kafkatest.services.trogdor.round_trip_workload import RoundTripWorkloadService, RoundTripWorkloadSpec
@@ -92,3 +93,15 @@ class RoundTripFaultTest(Test):
         workload1.wait_for_done(timeout_sec=600)
         stop1.stop()
         stop1.wait_for_done()
+
+    def test_produce_consume_with_latency(self):
+        workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
+        time.sleep(2)
+        node_specs = {}
+        for node in self.kafka.nodes + self.zk.nodes:
+            node_specs[node.name] = {"latencyMs": 500, "networkDevice": "eth0"}
+        spec = DegradedNetworkFaultSpec(0, 60000, node_specs)
+        slow1 = self.trogdor.create_task("slow1", spec)
+        workload1.wait_for_done(timeout_sec=600)
+        slow1.stop()
+        slow1.wait_for_done()
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/DegradedNetworkFaultSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/DegradedNetworkFaultSpec.java
new file mode 100644
index 0000000..719d00b
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/DegradedNetworkFaultSpec.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskController;
+import org.apache.kafka.trogdor.task.TaskSpec;
+import org.apache.kafka.trogdor.task.TaskWorker;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Task spec for a fault that adds artificial network latency on a set of agent nodes.
+ *
+ * Each entry of {@code nodeSpecs} maps an agent node name to the settings applied on that node.
+ * The controller targets exactly the nodes named there, and each of them runs a
+ * {@link DegradedNetworkFaultWorker}.
+ */
+public class DegradedNetworkFaultSpec extends TaskSpec {
+
+    /**
+     * Per-node degradation settings.
+     */
+    public static class NodeDegradeSpec {
+        private final String networkDevice;
+        private final int latencyMs;
+
+        /**
+         * @param networkDevice the network device to degrade; null is normalized to the empty string
+         * @param latencyMs     the latency to add, in milliseconds
+         */
+        public NodeDegradeSpec(
+                @JsonProperty("networkDevice") String networkDevice,
+                @JsonProperty("latencyMs") int latencyMs) {
+            if (networkDevice == null) {
+                this.networkDevice = "";
+            } else {
+                this.networkDevice = networkDevice;
+            }
+            this.latencyMs = latencyMs;
+        }
+
+        @JsonProperty("networkDevice")
+        public String networkDevice() {
+            return networkDevice;
+        }
+
+        @JsonProperty("latencyMs")
+        public int latencyMs() {
+            return latencyMs;
+        }
+    }
+
+    // Read-only view; never null (an absent "nodeSpecs" becomes an empty map).
+    private final Map<String, NodeDegradeSpec> nodeSpecs;
+
+    @JsonCreator
+    public DegradedNetworkFaultSpec(@JsonProperty("startMs") long startMs,
+                                    @JsonProperty("durationMs") long durationMs,
+                                    @JsonProperty("nodeSpecs") Map<String, NodeDegradeSpec> nodeSpecs) {
+        super(startMs, durationMs);
+        if (nodeSpecs == null) {
+            this.nodeSpecs = Collections.emptyMap();
+        } else {
+            this.nodeSpecs = Collections.unmodifiableMap(nodeSpecs);
+        }
+    }
+
+    /**
+     * Returns a controller whose target nodes are the keys of {@code nodeSpecs}.
+     */
+    @Override
+    public TaskController newController(String id) {
+        return topology -> nodeSpecs.keySet();
+    }
+
+    @Override
+    public TaskWorker newTaskWorker(String id) {
+        return new DegradedNetworkFaultWorker(id, nodeSpecs);
+    }
+
+    @JsonProperty("nodeSpecs")
+    public Map<String, NodeDegradeSpec> nodeSpecs() {
+        return nodeSpecs;
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/DegradedNetworkFaultWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/DegradedNetworkFaultWorker.java
new file mode 100644
index 0000000..1ccf4cf
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/DegradedNetworkFaultWorker.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import com.fasterxml.jackson.databind.node.TextNode;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.trogdor.common.Node;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.NetworkInterface;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Uses the Linux utility {@code tc} (traffic control) to simulate latency on a specified network device.
+ *
+ * On start, a {@code netem} qdisc is added as the root qdisc of each target device. On stop, the root
+ * qdisc of each target device is deleted. If a node's spec names no device, every non-loopback
+ * interface on the node is targeted.
+ */
+public class DegradedNetworkFaultWorker implements TaskWorker {
+
+    private static final Logger log = LoggerFactory.getLogger(DegradedNetworkFaultWorker.class);
+
+    private final String id;
+    private final Map<String, DegradedNetworkFaultSpec.NodeDegradeSpec> nodeSpecs;
+    private WorkerStatusTracker status;
+
+    public DegradedNetworkFaultWorker(String id, Map<String, DegradedNetworkFaultSpec.NodeDegradeSpec> nodeSpecs) {
+        this.id = id;
+        this.nodeSpecs = nodeSpecs;
+    }
+
+    /**
+     * Adds latency to every target device of the current node, if that node has a spec.
+     *
+     * @throws RuntimeException if the node's latencyMs is negative; no tc command is run in that case
+     */
+    @Override
+    public void start(Platform platform, WorkerStatusTracker status, KafkaFutureImpl<String> haltFuture) throws Exception {
+        log.info("Activating DegradedNetworkFaultWorker {}.", id);
+        this.status = status;
+        this.status.update(new TextNode("enabling traffic control " + id));
+        Node curNode = platform.curNode();
+        DegradedNetworkFaultSpec.NodeDegradeSpec nodeSpec = nodeSpecs.get(curNode.name());
+        if (nodeSpec != null) {
+            // Validate once, before resolving devices, so an invalid spec is rejected even when
+            // no target devices are found on this node.
+            if (nodeSpec.latencyMs() < 0) {
+                throw new RuntimeException("Expected a non-negative value for latencyMs, but got " + nodeSpec.latencyMs());
+            }
+            for (String device : devicesForSpec(nodeSpec)) {
+                enableTrafficControl(platform, device, nodeSpec.latencyMs());
+            }
+        }
+        this.status.update(new TextNode("enabled traffic control " + id));
+    }
+
+    /**
+     * Removes the root qdisc from every target device of the current node, if that node has a spec.
+     */
+    @Override
+    public void stop(Platform platform) throws Exception {
+        log.info("Deactivating DegradedNetworkFaultWorker {}.", id);
+        this.status.update(new TextNode("disabling traffic control " + id));
+        Node curNode = platform.curNode();
+        DegradedNetworkFaultSpec.NodeDegradeSpec nodeSpec = nodeSpecs.get(curNode.name());
+        if (nodeSpec != null) {
+            for (String device : devicesForSpec(nodeSpec)) {
+                disableTrafficControl(platform, device);
+            }
+        }
+        this.status.update(new TextNode("disabled traffic control " + id));
+    }
+
+    /**
+     * Returns the devices to degrade: the spec's device if one is set, otherwise every
+     * non-loopback interface on this machine.
+     */
+    private Set<String> devicesForSpec(DegradedNetworkFaultSpec.NodeDegradeSpec nodeSpec) throws Exception {
+        Set<String> devices = new HashSet<>();
+        if (nodeSpec.networkDevice().isEmpty()) {
+            // Older JDKs may return null instead of an enumeration when no interfaces are found;
+            // Collections.list(null) would throw a NullPointerException.
+            java.util.Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
+            if (interfaces != null) {
+                for (NetworkInterface networkInterface : Collections.list(interfaces)) {
+                    if (!networkInterface.isLoopback()) {
+                        devices.add(networkInterface.getName());
+                    }
+                }
+            }
+            if (devices.isEmpty()) {
+                log.warn("DegradedNetworkFaultWorker {} found no non-loopback network interfaces.", id);
+            }
+        } else {
+            devices.add(nodeSpec.networkDevice());
+        }
+        return devices;
+    }
+
+    /**
+     * Adds a netem root qdisc that delays traffic by delayMs, with a jitter of sqrt(delayMs)
+     * (at least 1 ms) drawn from a normal distribution.
+     */
+    private void enableTrafficControl(Platform platform, String networkDevice, int delayMs) throws IOException {
+        int deviationMs = Math.max(1, (int) Math.sqrt(delayMs));
+        platform.runCommand(new String[] {
+            "sudo", "tc", "qdisc", "add", "dev", networkDevice, "root", "netem",
+            "delay", String.format("%dms", delayMs), String.format("%dms", deviationMs), "distribution", "normal"
+        });
+    }
+
+    /**
+     * Deletes the root qdisc of the given device.
+     */
+    private void disableTrafficControl(Platform platform, String networkDevice) throws IOException {
+        platform.runCommand(new String[] {
+            "sudo", "tc", "qdisc", "del", "dev", networkDevice, "root"
+        });
+    }
+}