You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/10/26 19:40:30 UTC
[06/12] mesos git commit: Relocated MesosContainerizer specific files
to the correct location.
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
new file mode 100644
index 0000000..565f9cc
--- /dev/null
+++ b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
@@ -0,0 +1,3792 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <limits.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <iostream>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include <mesos/mesos.hpp>
+
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/io.hpp>
+#include <process/pid.hpp>
+#include <process/subprocess.hpp>
+
+#include <stout/error.hpp>
+#include <stout/foreach.hpp>
+#include <stout/fs.hpp>
+#include <stout/hashset.hpp>
+#include <stout/json.hpp>
+#include <stout/lambda.hpp>
+#include <stout/mac.hpp>
+#include <stout/multihashmap.hpp>
+#include <stout/numify.hpp>
+#include <stout/os.hpp>
+#include <stout/option.hpp>
+#include <stout/path.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/result.hpp>
+#include <stout/stringify.hpp>
+#include <stout/strings.hpp>
+#include <stout/utils.hpp>
+
+#include <stout/os/exists.hpp>
+#include <stout/os/stat.hpp>
+
+#include "common/status_utils.hpp"
+
+#include "linux/fs.hpp"
+#include "linux/ns.hpp"
+
+#include "linux/routing/route.hpp"
+#include "linux/routing/utils.hpp"
+
+#include "linux/routing/diagnosis/diagnosis.hpp"
+
+#include "linux/routing/filter/basic.hpp"
+#include "linux/routing/filter/icmp.hpp"
+#include "linux/routing/filter/ip.hpp"
+
+#include "linux/routing/handle.hpp"
+
+#include "linux/routing/link/link.hpp"
+
+#include "linux/routing/queueing/fq_codel.hpp"
+#include "linux/routing/queueing/htb.hpp"
+#include "linux/routing/queueing/ingress.hpp"
+#include "linux/routing/queueing/statistics.hpp"
+
+#include "mesos/resources.hpp"
+
+#include "slave/constants.hpp"
+
+#include "slave/containerizer/mesos/isolators/network/port_mapping.hpp"
+
+using namespace mesos::internal;
+
+using namespace process;
+
+using namespace routing;
+using namespace routing::filter;
+using namespace routing::queueing;
+using namespace routing::queueing::statistics;
+
+using std::cerr;
+using std::cout;
+using std::dec;
+using std::endl;
+using std::hex;
+using std::list;
+using std::ostringstream;
+using std::set;
+using std::sort;
+using std::string;
+using std::vector;
+
+using filter::ip::PortRange;
+
+using mesos::slave::ContainerLimitation;
+using mesos::slave::ContainerPrepareInfo;
+using mesos::slave::ContainerState;
+using mesos::slave::Isolator;
+
+// An old glibc might not have this symbol.
+#ifndef MNT_DETACH
+#define MNT_DETACH 2
+#endif
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// The minimum number of ephemeral ports a container should have.
+// NOTE(review): the place where this lower bound is enforced is not
+// visible in this part of the file; confirm before changing it.
+static const uint16_t MIN_EPHEMERAL_PORTS_SIZE = 16;
+
+// Linux traffic control is a combination of queueing disciplines,
+// filters and classes organized as a tree for the ingress (rx) and
+// egress (tx) flows for each interface. Each container provides two
+// networking interfaces, a virtual eth0 and a loopback interface. The
+// flow of packets from the external network to container is shown
+// below:
+//
+//        +----------------------+----------------------+
+//        |                  Container                  |
+//        |----------------------|----------------------|
+//        |         eth0         |          lo          |
+//        +----------------------+----------------------+
+//           ^   |         ^           |
+//       [3] |   | [4]     |           |
+//           |   |     [7] +-----------+ [10]
+//           |   |
+//           |   |     [8] +-----------+ [9]
+//       [2] |   | [5]     |           |
+//           |   v         v           v
+//        +----------------------+----------------------+
+//        |        veth0         |          lo          |
+//        +----------------------|----------------------+
+//        |                    Host                     |
+//        |----------------------|----------------------|
+//        |                    eth0                     |
+//        +----------------------+----------------------|
+//                             ^   |
+//                         [1] |   | [6]
+//                             |   v
+//
+// Traffic flowing from outside the network into a container enters
+// the system via the host ingress interface [1] and is routed based
+// on destination port to the outbound interface for the matching
+// container [2], which forwards the packet to the container's inbound
+// virtual interface. Outbound traffic destined for the external
+// network flows along the reverse path [4,5,6]. Loopback traffic is
+// directed to the corresponding Ethernet interface, either [7,10] or
+// [8,9] where the same destination port routing can be applied as to
+// external traffic. We use traffic control filters at several of the
+// interfaces to create these packet paths.
+//
+// Linux provides only a very simple topology for ingress interfaces.
+// A root is provided on a fixed handle (handle::INGRESS_ROOT) under
+// which a single qdisc can be installed, with handle ingress::HANDLE.
+// Traffic control filters can then be attached to the ingress qdisc.
+// We install one or more ingress filters on the host eth0 [1] to
+// direct traffic to the correct container, and on the container
+// virtual eth0 [5] to direct traffic to other containers or out of
+// the box. Since we know the ip port assignments for each container,
+// we can direct traffic directly to the appropriate container.
+// However, for ICMP and ARP traffic where no equivalent to a port
+// exists, we send a copy of the packet to every container and rely on
+// the network stack to drop unexpected packets.
+//
+// We install a Hierarchical Token Bucket (HTB) qdisc and class to
+// limit the outbound traffic bandwidth as the egress qdisc inside the
+// container [4] and then add a fq_codel qdisc to limit head of line
+// blocking on the egress filter. The egress traffic control chain is
+// thus:
+//
+// root device: handle::EGRESS_ROOT ->
+// htb egress qdisc: CONTAINER_TX_HTB_HANDLE ->
+// htb rate limiting class: CONTAINER_TX_HTB_CLASS_ID ->
+// buffer-bloat reduction: FQ_CODEL
+// Handle of the HTB egress qdisc inside the container.
+constexpr Handle CONTAINER_TX_HTB_HANDLE = Handle(1, 0);
+// Handle of the HTB rate limiting class. It is derived from the qdisc
+// handle above and the value 1 (presumably the minor number of the
+// class under that qdisc; confirm in linux/routing/handle.hpp).
+constexpr Handle CONTAINER_TX_HTB_CLASS_ID =
+ Handle(CONTAINER_TX_HTB_HANDLE, 1);
+
+
+// Finally we create a second fq_codel qdisc on the public interface
+// of the host [6] to reduce performance interference between
+// containers. We create independent flows for each container, and
+// one for the host, which ensures packets from each container are
+// guaranteed fair access to the host interface. This egress traffic
+// control chain for the host interface is thus:
+//
+// root device: handle::EGRESS_ROOT ->
+// buffer-bloat reduction: FQ_CODEL
+// NOTE: This is constructed from the same values as
+// CONTAINER_TX_HTB_HANDLE. According to the comments in this file,
+// that handle belongs to the egress qdisc inside the container while
+// this one belongs to the qdisc on the public interface of the host.
+constexpr Handle HOST_TX_FQ_CODEL_HANDLE = Handle(1, 0);
+
+
+// The primary priority used by each type of filter.
+// NOTE(review): presumably a smaller value means that the filter is
+// consulted earlier; confirm against the Priority abstraction used by
+// the routing library.
+static const uint8_t ARP_FILTER_PRIORITY = 1;
+static const uint8_t ICMP_FILTER_PRIORITY = 2;
+static const uint8_t IP_FILTER_PRIORITY = 3;
+static const uint8_t DEFAULT_FILTER_PRIORITY = 4;
+
+
+// The secondary priorities used by filters. They are combined with a
+// primary priority when a filter is created, e.g.,
+// Priority(IP_FILTER_PRIORITY, HIGH).
+static const uint8_t HIGH = 1;
+static const uint8_t NORMAL = 2;
+static const uint8_t LOW = 3;
+
+
+// We assign a separate flow on host eth0 egress for each container
+// (See MESOS-2422 for details). Host egress traffic is assigned to a
+// reserved flow (HOST_FLOWID). ARP and ICMP traffic from containers
+// are not heavy, so they can share the same flow.
+static const uint16_t HOST_FLOWID = 1;
+// NOTE: ARP_FLOWID and ICMP_FLOWID are intentionally equal since both
+// kinds of traffic share one flow (see above).
+static const uint16_t ARP_FLOWID = 2;
+static const uint16_t ICMP_FLOWID = 2;
+// Presumably the smallest flow id that is handed out to a container;
+// the allocation itself is not visible in this part of the file.
+static const uint16_t CONTAINER_MIN_FLOWID = 3;
+
+
+// The well known ports, i.e., the half-open interval [0, 1024). Used
+// for sanity check.
+static Interval<uint16_t> WELL_KNOWN_PORTS()
+{
+  const Bound<uint16_t> lower = Bound<uint16_t>::closed(0);
+  const Bound<uint16_t> upper = Bound<uint16_t>::open(1024);
+
+  // The comma operator turns the two bounds into an interval.
+  return (lower, upper);
+}
+
+
+/////////////////////////////////////////////////
+// Helper functions for the isolator.
+/////////////////////////////////////////////////
+
+// Given an integer x, returns the largest power of 2 that is not
+// greater than x. Returns 0 if x is 0.
+static uint32_t roundDownToPowerOfTwo(uint32_t x)
+{
+  // Smear the most significant set bit into every lower position,
+  // i.e., turn 00001XXX into 00001111. Each round doubles the number
+  // of leading 1s (1, 2, 4, 8, 16), which covers all 32 bits. The 0s
+  // to the left of the most significant set bit are never touched.
+  for (uint32_t shift = 1; shift < 32; shift <<= 1) {
+    x |= x >> shift;
+  }
+
+  // Clear everything but the most significant set bit:
+  // 00001111 ^ 00000111 == 00001000.
+  return x ^ (x >> 1);
+}
+
+
+// Builds the name of the host end of the virtual ethernet pair of a
+// container. Link names are limited to 16 characters or less by the
+// kernel, which rules out embedding the container ID. The pid of the
+// executor process forked by the slave is used instead: no two
+// active containers can share that pid, hence the name is unique.
+static string veth(pid_t pid)
+{
+  string name = PORT_MAPPING_VETH_PREFIX();
+  name += stringify(pid);
+
+  return name;
+}
+
+
+// The inverse of 'veth': recovers the pid from a veth name. Returns
+// None if the name does not start with the veth prefix or if what
+// follows the prefix is not a number.
+static Option<pid_t> getPidFromVeth(const string& veth)
+{
+  const string prefix = PORT_MAPPING_VETH_PREFIX();
+
+  if (!strings::startsWith(veth, prefix)) {
+    return None();
+  }
+
+  const Try<pid_t> parsed =
+    numify<pid_t>(strings::remove(veth, prefix, strings::PREFIX));
+
+  if (!parsed.isSome()) {
+    return None();
+  }
+
+  return parsed.get();
+}
+
+
+// Extracts the container ID from the symlink that points to the
+// network namespace handle: the container ID is the basename of the
+// symlink. Returns an error if the given path is not a symlink. The
+// bind mount root and the bind mount symlink root are laid out as
+// follows:
+//   <PORT_MAPPING_BIND_MOUNT_ROOT()>
+//      |--- 3945 (pid)                           <-|
+//                                                  |
+//   <PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT()>       |
+//      |--- ecf293e7-e6e8-4cbc-aaee-4d6c958aa276 --|
+//                      (symlink: container ID -> pid)
+static Try<ContainerID> getContainerIdFromSymlink(const string& symlink)
+{
+  const bool isSymlink = os::stat::islink(symlink);
+  if (!isSymlink) {
+    return Error("Not a symlink");
+  }
+
+  ContainerID containerId;
+  containerId.set_value(Path(symlink).basename());
+
+  return containerId;
+}
+
+
+// Extracts the pid from the network namespace handle: the pid is
+// the basename of the handle. Returns an error if the handle is a
+// symlink, and None if the basename is not a number, in which case
+// the handle is clearly not created by us.
+static Result<pid_t> getPidFromNamespaceHandle(const string& handle)
+{
+  if (os::stat::islink(handle)) {
+    return Error("Not expecting a symlink");
+  }
+
+  const Try<pid_t> parsed = numify<pid_t>(Path(handle).basename());
+
+  if (parsed.isSome()) {
+    return parsed.get();
+  }
+
+  return None();
+}
+
+
+// Follows the symlink that points to the network namespace handle
+// and extracts the pid from that handle. Returns an error if the
+// given path is not a symlink or cannot be followed, and None if the
+// symlink is dangling.
+static Result<pid_t> getPidFromSymlink(const string& symlink)
+{
+  if (!os::stat::islink(symlink)) {
+    return Error("Not a symlink");
+  }
+
+  const Result<string> resolved = os::realpath(symlink);
+
+  if (resolved.isError()) {
+    return Error("Failed to follow the symlink: " + resolved.error());
+  }
+
+  if (resolved.isNone()) {
+    // The target no longer exists, i.e., the symlink is dangling.
+    return None();
+  }
+
+  return getPidFromNamespaceHandle(resolved.get());
+}
+
+
+// Returns the path of the symlink for the given container, i.e., an
+// entry named after the container ID under the bind mount symlink
+// root.
+static string getSymlinkPath(const ContainerID& containerId)
+{
+  const string name = stringify(containerId);
+
+  return path::join(PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT(), name);
+}
+
+
+// Returns the path of the network namespace handle for the given
+// pid, i.e., an entry named after the pid under the bind mount root.
+static string getNamespaceHandlePath(pid_t pid)
+{
+  const string name = stringify(pid);
+
+  return path::join(PORT_MAPPING_BIND_MOUNT_ROOT(), name);
+}
+
+
+// Converts from value ranges to interval set. Every range is added
+// as the closed interval [begin, end]. NOTE: The begin and the end
+// of a range are converted to uint16_t without any range check.
+static IntervalSet<uint16_t> getIntervalSet(const Value::Ranges& ranges)
+{
+  IntervalSet<uint16_t> ports;
+
+  for (const Value::Range& range : ranges.range()) {
+    const Bound<uint16_t> lower = Bound<uint16_t>::closed(range.begin());
+    const Bound<uint16_t> upper = Bound<uint16_t>::closed(range.end());
+
+    ports += (lower, upper);
+  }
+
+  return ports;
+}
+
+/////////////////////////////////////////////////
+// Implementation for PortMappingUpdate.
+/////////////////////////////////////////////////
+
+// Presumably the name under which this operation is invoked as a
+// subcommand; confirm against the PortMappingUpdate declaration.
+const char* PortMappingUpdate::NAME = "update";
+
+
+// Registers the flags of the port mapping update operation: the
+// names of the public and the loopback interfaces, the pid of the
+// process whose namespaces will be entered, and the port ranges
+// (JSON formatted) for which IP filters are added or removed.
+PortMappingUpdate::Flags::Flags()
+{
+  add(&eth0_name,
+      "eth0_name",
+      "The name of the public network interface (e.g., eth0)");
+
+  add(&lo_name,
+      "lo_name",
+      "The name of the loopback network interface (e.g., lo)");
+
+  add(&pid,
+      "pid",
+      "The pid of the process whose namespaces we will enter");
+
+  add(&ports_to_add,
+      "ports_to_add",
+      "A collection of port ranges (formatted as a JSON object)\n"
+      "for which to add IP filters. E.g.,\n"
+      "--ports_to_add={\"range\":[{\"begin\":4,\"end\":8}]}");
+
+  add(&ports_to_remove,
+      "ports_to_remove",
+      "A collection of port ranges (formatted as a JSON object)\n"
+      "for which to remove IP filters. E.g.,\n"
+      "--ports_to_remove={\"range\":[{\"begin\":4,\"end\":8}]}");
+}
+
+
+// Converts a collection of port ranges to a JSON object by first
+// copying the ranges into a Value::Ranges protobuf. Together with
+// 'parse' (the opposite direction) this is used by the port mapping
+// update operation.
+template <typename Iterable>
+JSON::Object json(const Iterable& ranges)
+{
+  Value::Ranges values;
+
+  foreach (const PortRange& range, ranges) {
+    // Fill in the newly added protobuf range in place.
+    Value::Range* value = values.add_range();
+    value->set_begin(range.begin());
+    value->set_end(range.end());
+  }
+
+  return JSON::Protobuf(values);
+}
+
+
+// Converts a JSON object (the JSON form of a Value::Ranges protobuf)
+// to a collection of port ranges. Returns an error if the JSON
+// cannot be parsed or if one of the ranges is not a valid port
+// range.
+static Try<vector<PortRange>> parse(const JSON::Object& object)
+{
+  const Try<Value::Ranges> values = protobuf::parse<Value::Ranges>(object);
+  if (values.isError()) {
+    return Error("Failed to parse JSON: " + values.error());
+  }
+
+  vector<PortRange> result;
+
+  for (const Value::Range& value : values.get().range()) {
+    const Try<PortRange> range =
+      PortRange::fromBeginEnd(value.begin(), value.end());
+
+    if (range.isError()) {
+      return Error("Invalid port range: " + range.error());
+    }
+
+    result.push_back(range.get());
+  }
+
+  return result;
+}
+
+
+// Helper function to set up the IP filters inside the container for
+// a given port range:
+//   1. On 'lo': a terminal filter such that local traffic inside the
+//      container will not be redirected to 'eth0'.
+//   2. On 'eth0': a filter (for loopback IP) that redirects all
+//      loopback IP traffic to 'lo'.
+// Returns an error if a filter cannot be created or already exists.
+static Try<Nothing> addContainerIPFilters(
+    const PortRange& range,
+    const string& eth0,
+    const string& lo)
+{
+  // 1. The terminal filter on lo.
+  const ip::Classifier localClassifier =
+    ip::Classifier(None(), None(), None(), range);
+
+  const Try<bool> terminalCreated = filter::ip::create(
+      lo,
+      ingress::HANDLE,
+      localClassifier,
+      Priority(IP_FILTER_PRIORITY, HIGH),
+      action::Terminal());
+
+  if (terminalCreated.isError()) {
+    return Error(
+        "Failed to create an IP packet filter on " + lo +
+        " which stops packets from being sent to " + eth0 +
+        ": " + terminalCreated.error());
+  }
+
+  if (!terminalCreated.get()) {
+    return Error(
+        "The IP packet filter on " + lo +
+        " which stops packets from being sent to " +
+        eth0 + " already exists");
+  }
+
+  // 2. The redirect filter (for loopback IP) from eth0 to lo.
+  const ip::Classifier loopbackClassifier = ip::Classifier(
+      None(),
+      net::IPNetwork::LOOPBACK_V4().address(),
+      None(),
+      range);
+
+  const Try<bool> redirectCreated = filter::ip::create(
+      eth0,
+      ingress::HANDLE,
+      loopbackClassifier,
+      Priority(IP_FILTER_PRIORITY, NORMAL),
+      action::Redirect(lo));
+
+  if (redirectCreated.isError()) {
+    return Error(
+        "Failed to create an IP packet filter (for loopback IP) from " +
+        eth0 + " to " + lo + ": " + redirectCreated.error());
+  }
+
+  if (!redirectCreated.get()) {
+    return Error(
+        "The IP packet filter (for loopback IP) from " +
+        eth0 + " to " + lo + " already exists");
+  }
+
+  return Nothing();
+}
+
+
+// Helper function to remove the IP filters inside the container for
+// a given port range, i.e., the 'terminal' filter on 'lo' and the
+// filter (for loopback IP) from 'eth0' to 'lo'. Returns an error if
+// a filter cannot be removed or does not exist.
+static Try<Nothing> removeContainerIPFilters(
+    const PortRange& range,
+    const string& eth0,
+    const string& lo)
+{
+  // The 'terminal' IP packet filter on lo.
+  const ip::Classifier localClassifier =
+    ip::Classifier(None(), None(), None(), range);
+
+  const Try<bool> terminalRemoved =
+    filter::ip::remove(lo, ingress::HANDLE, localClassifier);
+
+  if (terminalRemoved.isError()) {
+    return Error(
+        "Failed to remove the IP packet filter on " + lo +
+        " which stops packets from being sent to " + eth0 +
+        ": " + terminalRemoved.error());
+  }
+
+  if (!terminalRemoved.get()) {
+    return Error(
+        "The IP packet filter on " + lo +
+        " which stops packets from being sent to " + eth0 +
+        " does not exist");
+  }
+
+  // The IP packet filter (for loopback IP) from eth0 to lo.
+  const ip::Classifier loopbackClassifier = ip::Classifier(
+      None(),
+      net::IPNetwork::LOOPBACK_V4().address(),
+      None(),
+      range);
+
+  const Try<bool> redirectRemoved =
+    filter::ip::remove(eth0, ingress::HANDLE, loopbackClassifier);
+
+  if (redirectRemoved.isError()) {
+    return Error(
+        "Failed to remove the IP packet filter (for loopback IP) from " +
+        eth0 + " to " + lo + ": " + redirectRemoved.error());
+  }
+
+  if (!redirectRemoved.get()) {
+    return Error(
+        "The IP packet filter (for loopback IP) from " +
+        eth0 + " to " + lo + " does not exist");
+  }
+
+  return Nothing();
+}
+
+
+// Validates the flags, parses the port ranges, enters the network
+// namespace of 'flags.pid' and then first adds and afterwards
+// removes the container IP filters for the requested port ranges.
+// Returns 0 on success (and after printing the usage) and 1 on the
+// first failure, which is reported on stderr.
+int PortMappingUpdate::execute()
+{
+  if (flags.help) {
+    cerr << "Usage: " << name() << " [OPTIONS]" << endl << endl
+         << "Supported options:" << endl
+         << flags.usage();
+    return 0;
+  }
+
+  // Check the mandatory flags.
+  if (flags.eth0_name.isNone()) {
+    cerr << "The public interface name (e.g., eth0) is not specified" << endl;
+    return 1;
+  }
+
+  if (flags.lo_name.isNone()) {
+    cerr << "The loopback interface name (e.g., lo) is not specified" << endl;
+    return 1;
+  }
+
+  if (flags.pid.isNone()) {
+    cerr << "The pid is not specified" << endl;
+    return 1;
+  }
+
+  if (flags.ports_to_add.isNone() && flags.ports_to_remove.isNone()) {
+    cerr << "Nothing to update" << endl;
+    return 1;
+  }
+
+  // Parse the port ranges before entering the namespace. A flag that
+  // is not specified leaves the corresponding collection empty.
+  vector<PortRange> rangesToAdd;
+  vector<PortRange> rangesToRemove;
+
+  if (flags.ports_to_add.isSome()) {
+    const Try<vector<PortRange>> parsed = parse(flags.ports_to_add.get());
+    if (parsed.isError()) {
+      cerr << "Parsing 'ports_to_add' failed: " << parsed.error() << endl;
+      return 1;
+    }
+
+    rangesToAdd = parsed.get();
+  }
+
+  if (flags.ports_to_remove.isSome()) {
+    const Try<vector<PortRange>> parsed = parse(flags.ports_to_remove.get());
+    if (parsed.isError()) {
+      cerr << "Parsing 'ports_to_remove' failed: " << parsed.error() << endl;
+      return 1;
+    }
+
+    rangesToRemove = parsed.get();
+  }
+
+  // Enter the network namespace.
+  const Try<Nothing> entered = ns::setns(flags.pid.get(), "net");
+  if (entered.isError()) {
+    cerr << "Failed to enter the network namespace of pid " << flags.pid.get()
+         << ": " << entered.error() << endl;
+    return 1;
+  }
+
+  // Update IP packet filters: additions first, then removals.
+  const string& eth0 = flags.eth0_name.get();
+  const string& lo = flags.lo_name.get();
+
+  for (const PortRange& range : rangesToAdd) {
+    const Try<Nothing> added = addContainerIPFilters(range, eth0, lo);
+    if (added.isError()) {
+      cerr << "Failed to add IP filters: " << added.error() << endl;
+      return 1;
+    }
+  }
+
+  for (const PortRange& range : rangesToRemove) {
+    const Try<Nothing> removed = removeContainerIPFilters(range, eth0, lo);
+    if (removed.isError()) {
+      cerr << "Failed to remove IP filters: " << removed.error() << endl;
+      return 1;
+    }
+  }
+
+  return 0;
+}
+
+/////////////////////////////////////////////////
+// Implementation for PortMappingStatistics.
+/////////////////////////////////////////////////
+
+// Presumably the name under which this operation is invoked as a
+// subcommand; confirm against the PortMappingStatistics declaration.
+const char* PortMappingStatistics::NAME = "statistics";
+
+
+// Registers the flags of the port mapping statistics operation: the
+// name of the public interface, the pid of the process whose
+// namespaces will be entered, and two switches (both default to
+// false) that enable the collection of the socket statistics summary
+// and of the socket statistics details.
+PortMappingStatistics::Flags::Flags()
+{
+  add(&eth0_name,
+      "eth0_name",
+      "The name of the public network interface (e.g., eth0)");
+
+  add(&pid,
+      "pid",
+      "The pid of the process whose namespaces we will enter");
+
+  add(&enable_socket_statistics_summary,
+      "enable_socket_statistics_summary",
+      "Whether to collect socket statistics summary for this container\n",
+      false);
+
+  add(&enable_socket_statistics_details,
+      "enable_socket_statistics_details",
+      "Whether to collect socket statistics details (e.g., TCP RTT)\n"
+      "for this container.",
+      false);
+}
+
+
+// A helper that appends a new TrafficControlStatistics entry with
+// the given id to 'result' and copies into it every traffic control
+// statistic that is present in the statistics hashmap. Statistics
+// that are absent from the hashmap are left unset.
+static void addTrafficControlStatistics(
+    const string& id,
+    const hashmap<string, uint64_t>& statistics,
+    ResourceStatistics* result)
+{
+  TrafficControlStatistics* tc = result->add_net_traffic_control_statistics();
+
+  tc->set_id(id);
+
+  // TODO(pbrett) Use protobuf reflection here.
+  const Option<uint64_t> backlog = statistics.get(BACKLOG);
+  if (backlog.isSome()) {
+    tc->set_backlog(backlog.get());
+  }
+
+  const Option<uint64_t> bytes = statistics.get(BYTES);
+  if (bytes.isSome()) {
+    tc->set_bytes(bytes.get());
+  }
+
+  const Option<uint64_t> drops = statistics.get(DROPS);
+  if (drops.isSome()) {
+    tc->set_drops(drops.get());
+  }
+
+  const Option<uint64_t> overlimits = statistics.get(OVERLIMITS);
+  if (overlimits.isSome()) {
+    tc->set_overlimits(overlimits.get());
+  }
+
+  const Option<uint64_t> packets = statistics.get(PACKETS);
+  if (packets.isSome()) {
+    tc->set_packets(packets.get());
+  }
+
+  const Option<uint64_t> qlen = statistics.get(QLEN);
+  if (qlen.isSome()) {
+    tc->set_qlen(qlen.get());
+  }
+
+  const Option<uint64_t> rateBps = statistics.get(RATE_BPS);
+  if (rateBps.isSome()) {
+    tc->set_ratebps(rateBps.get());
+  }
+
+  const Option<uint64_t> ratePps = statistics.get(RATE_PPS);
+  if (ratePps.isSome()) {
+    tc->set_ratepps(ratePps.get());
+  }
+
+  const Option<uint64_t> requeues = statistics.get(REQUEUES);
+  if (requeues.isSome()) {
+    tc->set_requeues(requeues.get());
+  }
+}
+
+
+// Enters the network namespace of 'flags.pid', collects the socket
+// statistics that are enabled through the flags as well as the
+// traffic control statistics of 'flags.eth0_name', and writes the
+// result to stdout as a JSON formatted ResourceStatistics. Returns 0
+// on success (and after printing the usage) and 1 on failure. Note
+// that a failure to get the traffic control statistics is reported
+// on stderr only and does not change the return value.
+int PortMappingStatistics::execute()
+{
+ if (flags.help) {
+ cerr << "Usage: " << name() << " [OPTIONS]" << endl << endl
+ << "Supported options:" << endl
+ << flags.usage();
+ return 0;
+ }
+
+ if (flags.pid.isNone()) {
+ cerr << "The pid is not specified" << endl;
+ return 1;
+ }
+
+ if (flags.eth0_name.isNone()) {
+ cerr << "The public interface name (e.g., eth0) is not specified" << endl;
+ return 1;
+ }
+
+ // Enter the network namespace.
+ Try<Nothing> setns = ns::setns(flags.pid.get(), "net");
+ if (setns.isError()) {
+ // This could happen if the executor exits before this function is
+ // invoked. We do not log here to avoid spurious logging.
+ return 1;
+ }
+
+ ResourceStatistics result;
+
+ // NOTE: We use a dummy value here since this field will be cleared
+ // before the result is sent to the containerizer.
+ result.set_timestamp(0);
+
+ if (flags.enable_socket_statistics_summary) {
+ // Collections for socket statistics summary are below.
+
+ // For TCP, get the number of ACTIVE and TIME_WAIT connections,
+ // from reading /proc/net/sockstat (/proc/net/sockstat6 for IPV6).
+ // This is not as expensive in the kernel because only counter
+ // values are accessed instead of a dump of all the sockets.
+ // Example output:
+
+ // $ cat /proc/net/sockstat
+ // sockets: used 1391
+ // TCP: inuse 33 orphan 0 tw 0 alloc 37 mem 6
+ // UDP: inuse 15 mem 7
+ // UDPLITE: inuse 0
+ // RAW: inuse 0
+ // FRAG: inuse 0 memory 0
+
+ Try<string> value = os::read("/proc/net/sockstat");
+ if (value.isError()) {
+ cerr << "Failed to read /proc/net/sockstat: " << value.error() << endl;
+ return 1;
+ }
+
+ // Only the line that starts with "TCP" is of interest. It is a
+ // sequence of "<name> <value>" pairs, of which we pick "inuse"
+ // and "tw".
+ foreach (const string& line, strings::tokenize(value.get(), "\n")) {
+ if (!strings::startsWith(line, "TCP")) {
+ continue;
+ }
+
+ vector<string> tokens = strings::tokenize(line, " ");
+ for (size_t i = 0; i < tokens.size(); i++) {
+ if (tokens[i] == "inuse") {
+ if (i + 1 >= tokens.size()) {
+ cerr << "Unexpected output from /proc/net/sockstat" << endl;
+ // Be a bit forgiving here since the /proc file output
+ // format can change, though not very likely.
+ continue;
+ }
+
+ // Set number of active TCP connections.
+ Try<size_t> inuse = numify<size_t>(tokens[i+1]);
+ if (inuse.isError()) {
+ cerr << "Failed to parse the number of tcp connections in use: "
+ << inuse.error() << endl;
+ continue;
+ }
+
+ result.set_net_tcp_active_connections(inuse.get());
+ } else if (tokens[i] == "tw") {
+ if (i + 1 >= tokens.size()) {
+ cerr << "Unexpected output from /proc/net/sockstat" << endl;
+ // Be a bit forgiving here since the /proc file output
+ // format can change, though not very likely.
+ continue;
+ }
+
+ // Set number of TIME_WAIT TCP connections.
+ Try<size_t> tw = numify<size_t>(tokens[i+1]);
+ if (tw.isError()) {
+ cerr << "Failed to parse the number of tcp connections in"
+ << " TIME_WAIT: " << tw.error() << endl;
+ continue;
+ }
+
+ result.set_net_tcp_time_wait_connections(tw.get());
+ }
+ }
+ }
+ }
+
+ if (flags.enable_socket_statistics_details) {
+ // Collections for socket statistics details are below.
+
+ // NOTE: If the underlying library uses the older version of
+ // kernel API, the family argument passed in may not be honored.
+ Try<vector<diagnosis::socket::Info>> infos =
+ diagnosis::socket::infos(AF_INET, diagnosis::socket::state::ALL);
+
+ // NOTE(review): Unlike the other failures reported by this
+ // function, the message below does not include 'infos.error()'.
+ if (infos.isError()) {
+ cerr << "Failed to retrieve the socket information" << endl;
+ return 1;
+ }
+
+ vector<uint32_t> RTTs;
+ foreach (const diagnosis::socket::Info& info, infos.get()) {
+ // We double check on family regardless.
+ if (info.family != AF_INET) {
+ continue;
+ }
+
+ // We consider all sockets that have non-zero rtt value.
+ if (info.tcpInfo.isSome() && info.tcpInfo.get().tcpi_rtt != 0) {
+ RTTs.push_back(info.tcpInfo.get().tcpi_rtt);
+ }
+ }
+
+ // Only print to stdout when we have results.
+ if (RTTs.size() > 0) {
+ std::sort(RTTs.begin(), RTTs.end());
+
+ // NOTE: The size of RTTs is usually within 1 million so we
+ // don't need to worry about overflow here.
+ // TODO(jieyu): Right now, we choose to use "Nearest rank" for
+ // simplicity. Consider directly using the Statistics abstraction
+ // which computes "Linear interpolation between closest ranks".
+ // http://en.wikipedia.org/wiki/Percentile
+ // Each index below is strictly smaller than RTTs.size() because
+ // of the integer division by 100.
+ size_t p50 = RTTs.size() * 50 / 100;
+ size_t p90 = RTTs.size() * 90 / 100;
+ size_t p95 = RTTs.size() * 95 / 100;
+ size_t p99 = RTTs.size() * 99 / 100;
+
+ result.set_net_tcp_rtt_microsecs_p50(RTTs[p50]);
+ result.set_net_tcp_rtt_microsecs_p90(RTTs[p90]);
+ result.set_net_tcp_rtt_microsecs_p95(RTTs[p95]);
+ result.set_net_tcp_rtt_microsecs_p99(RTTs[p99]);
+ }
+ }
+
+ // Collect traffic statistics for the container from the container
+ // virtual interface and export them in JSON.
+ const string& eth0 = flags.eth0_name.get();
+
+ // Overlimits are reported on the HTB qdisc at the egress root.
+ Result<hashmap<string, uint64_t>> statistics =
+ htb::statistics(eth0, EGRESS_ROOT);
+
+ if (statistics.isSome()) {
+ addTrafficControlStatistics(
+ NET_ISOLATOR_BW_LIMIT,
+ statistics.get(),
+ &result);
+ } else if (statistics.isNone()) {
+ // Traffic control statistics are only available when the
+ // container is created on a slave when the egress rate limit is
+ // on (i.e., egress_rate_limit_per_container flag is set). We
+ // can't just test for that flag here however, since the slave may
+ // have been restarted with different flags since the container
+ // was created. It is also possible that isolator statistics are
+ // unavailable because the container is in the process of being
+ // created or destroyed. Hence we do not report a lack of network
+ // statistics as an error.
+ } else if (statistics.isError()) {
+ cerr << "Failed to get htb qdisc statistics on " << eth0
+ << " in namespace " << flags.pid.get() << endl;
+ }
+
+ // Drops due to the bandwidth limit should be reported at the leaf.
+ statistics = fq_codel::statistics(eth0, CONTAINER_TX_HTB_CLASS_ID);
+ if (statistics.isSome()) {
+ addTrafficControlStatistics(
+ NET_ISOLATOR_BLOAT_REDUCTION,
+ statistics.get(),
+ &result);
+ } else if (statistics.isNone()) {
+ // See discussion on network isolator statistics above.
+ } else if (statistics.isError()) {
+ cerr << "Failed to get fq_codel qdisc statistics on " << eth0
+ << " in namespace " << flags.pid.get() << endl;
+ }
+
+ cout << stringify(JSON::Protobuf(result));
+ return 0;
+}
+
+
+/////////////////////////////////////////////////
+// Implementation for the isolator.
+/////////////////////////////////////////////////
+
+// Constructs the counters of the port mapping isolator, each under
+// the key "port_mapping/<member name>", and registers them with the
+// metrics library.
+//
+// NOTE(review): The four '*_eth0_egress_filters_*' counters are
+// constructed here but are not passed to 'process::metrics::add'
+// below, unlike all the other counters. If they are meant to be
+// exposed, they need to be added here and removed in the destructor.
+PortMappingIsolatorProcess::Metrics::Metrics()
+  : adding_eth0_ip_filters_errors(
+        "port_mapping/adding_eth0_ip_filters_errors"),
+    adding_eth0_ip_filters_already_exist(
+        "port_mapping/adding_eth0_ip_filters_already_exist"),
+    adding_eth0_egress_filters_errors(
+        "port_mapping/adding_eth0_egress_filters_errors"),
+    adding_eth0_egress_filters_already_exist(
+        "port_mapping/adding_eth0_egress_filters_already_exist"),
+    adding_lo_ip_filters_errors(
+        "port_mapping/adding_lo_ip_filters_errors"),
+    adding_lo_ip_filters_already_exist(
+        "port_mapping/adding_lo_ip_filters_already_exist"),
+    adding_veth_ip_filters_errors(
+        "port_mapping/adding_veth_ip_filters_errors"),
+    adding_veth_ip_filters_already_exist(
+        "port_mapping/adding_veth_ip_filters_already_exist"),
+    adding_veth_icmp_filters_errors(
+        "port_mapping/adding_veth_icmp_filters_errors"),
+    adding_veth_icmp_filters_already_exist(
+        "port_mapping/adding_veth_icmp_filters_already_exist"),
+    adding_veth_arp_filters_errors(
+        "port_mapping/adding_veth_arp_filters_errors"),
+    adding_veth_arp_filters_already_exist(
+        "port_mapping/adding_veth_arp_filters_already_exist"),
+    adding_eth0_icmp_filters_errors(
+        "port_mapping/adding_eth0_icmp_filters_errors"),
+    adding_eth0_icmp_filters_already_exist(
+        "port_mapping/adding_eth0_icmp_filters_already_exist"),
+    adding_eth0_arp_filters_errors(
+        "port_mapping/adding_eth0_arp_filters_errors"),
+    adding_eth0_arp_filters_already_exist(
+        "port_mapping/adding_eth0_arp_filters_already_exist"),
+    removing_eth0_ip_filters_errors(
+        "port_mapping/removing_eth0_ip_filters_errors"),
+    removing_eth0_ip_filters_do_not_exist(
+        "port_mapping/removing_eth0_ip_filters_do_not_exist"),
+    removing_eth0_egress_filters_errors(
+        "port_mapping/removing_eth0_egress_filters_errors"),
+    // NOTE: This key used to be misspelled as "removinging_...".
+    removing_eth0_egress_filters_do_not_exist(
+        "port_mapping/removing_eth0_egress_filters_do_not_exist"),
+    removing_lo_ip_filters_errors(
+        "port_mapping/removing_lo_ip_filters_errors"),
+    removing_lo_ip_filters_do_not_exist(
+        "port_mapping/removing_lo_ip_filters_do_not_exist"),
+    removing_veth_ip_filters_errors(
+        "port_mapping/removing_veth_ip_filters_errors"),
+    removing_veth_ip_filters_do_not_exist(
+        "port_mapping/removing_veth_ip_filters_do_not_exist"),
+    removing_eth0_icmp_filters_errors(
+        "port_mapping/removing_eth0_icmp_filters_errors"),
+    removing_eth0_icmp_filters_do_not_exist(
+        "port_mapping/removing_eth0_icmp_filters_do_not_exist"),
+    removing_eth0_arp_filters_errors(
+        "port_mapping/removing_eth0_arp_filters_errors"),
+    removing_eth0_arp_filters_do_not_exist(
+        "port_mapping/removing_eth0_arp_filters_do_not_exist"),
+    updating_eth0_icmp_filters_errors(
+        "port_mapping/updating_eth0_icmp_filters_errors"),
+    updating_eth0_icmp_filters_already_exist(
+        "port_mapping/updating_eth0_icmp_filters_already_exist"),
+    updating_eth0_icmp_filters_do_not_exist(
+        "port_mapping/updating_eth0_icmp_filters_do_not_exist"),
+    updating_eth0_arp_filters_errors(
+        "port_mapping/updating_eth0_arp_filters_errors"),
+    updating_eth0_arp_filters_already_exist(
+        "port_mapping/updating_eth0_arp_filters_already_exist"),
+    updating_eth0_arp_filters_do_not_exist(
+        "port_mapping/updating_eth0_arp_filters_do_not_exist"),
+    updating_container_ip_filters_errors(
+        "port_mapping/updating_container_ip_filters_errors")
+{
+  process::metrics::add(adding_eth0_ip_filters_errors);
+  process::metrics::add(adding_eth0_ip_filters_already_exist);
+  process::metrics::add(adding_lo_ip_filters_errors);
+  process::metrics::add(adding_lo_ip_filters_already_exist);
+  process::metrics::add(adding_veth_ip_filters_errors);
+  process::metrics::add(adding_veth_ip_filters_already_exist);
+  process::metrics::add(adding_veth_icmp_filters_errors);
+  process::metrics::add(adding_veth_icmp_filters_already_exist);
+  process::metrics::add(adding_veth_arp_filters_errors);
+  process::metrics::add(adding_veth_arp_filters_already_exist);
+  process::metrics::add(adding_eth0_icmp_filters_errors);
+  process::metrics::add(adding_eth0_icmp_filters_already_exist);
+  process::metrics::add(adding_eth0_arp_filters_errors);
+  process::metrics::add(adding_eth0_arp_filters_already_exist);
+  process::metrics::add(removing_eth0_ip_filters_errors);
+  process::metrics::add(removing_eth0_ip_filters_do_not_exist);
+  process::metrics::add(removing_lo_ip_filters_errors);
+  process::metrics::add(removing_lo_ip_filters_do_not_exist);
+  process::metrics::add(removing_veth_ip_filters_errors);
+  process::metrics::add(removing_veth_ip_filters_do_not_exist);
+  process::metrics::add(removing_eth0_icmp_filters_errors);
+  process::metrics::add(removing_eth0_icmp_filters_do_not_exist);
+  process::metrics::add(removing_eth0_arp_filters_errors);
+  process::metrics::add(removing_eth0_arp_filters_do_not_exist);
+  process::metrics::add(updating_eth0_icmp_filters_errors);
+  process::metrics::add(updating_eth0_icmp_filters_already_exist);
+  process::metrics::add(updating_eth0_icmp_filters_do_not_exist);
+  process::metrics::add(updating_eth0_arp_filters_errors);
+  process::metrics::add(updating_eth0_arp_filters_already_exist);
+  process::metrics::add(updating_eth0_arp_filters_do_not_exist);
+  process::metrics::add(updating_container_ip_filters_errors);
+}
+
+
+PortMappingIsolatorProcess::Metrics::~Metrics()
+{ // Unregisters, via process::metrics::remove, every counter that the constructor registered with process::metrics::add (same set, same order).
+ process::metrics::remove(adding_eth0_ip_filters_errors); // Counters for adding filters.
+ process::metrics::remove(adding_eth0_ip_filters_already_exist);
+ process::metrics::remove(adding_lo_ip_filters_errors);
+ process::metrics::remove(adding_lo_ip_filters_already_exist);
+ process::metrics::remove(adding_veth_ip_filters_errors);
+ process::metrics::remove(adding_veth_ip_filters_already_exist);
+ process::metrics::remove(adding_veth_icmp_filters_errors);
+ process::metrics::remove(adding_veth_icmp_filters_already_exist);
+ process::metrics::remove(adding_veth_arp_filters_errors);
+ process::metrics::remove(adding_veth_arp_filters_already_exist);
+ process::metrics::remove(adding_eth0_icmp_filters_errors);
+ process::metrics::remove(adding_eth0_icmp_filters_already_exist);
+ process::metrics::remove(adding_eth0_arp_filters_errors);
+ process::metrics::remove(adding_eth0_arp_filters_already_exist);
+ process::metrics::remove(removing_eth0_ip_filters_errors); // Counters for removing filters.
+ process::metrics::remove(removing_eth0_ip_filters_do_not_exist); // NOTE(review): the removing_eth0_egress_filters_* counters are initialized by the constructor but are neither added there nor removed here -- confirm whether that is intentional.
+ process::metrics::remove(removing_lo_ip_filters_errors);
+ process::metrics::remove(removing_lo_ip_filters_do_not_exist);
+ process::metrics::remove(removing_veth_ip_filters_errors);
+ process::metrics::remove(removing_veth_ip_filters_do_not_exist);
+ process::metrics::remove(removing_eth0_icmp_filters_errors);
+ process::metrics::remove(removing_eth0_icmp_filters_do_not_exist);
+ process::metrics::remove(removing_eth0_arp_filters_errors);
+ process::metrics::remove(removing_eth0_arp_filters_do_not_exist);
+ process::metrics::remove(updating_eth0_icmp_filters_errors); // Counters for updating filters.
+ process::metrics::remove(updating_eth0_icmp_filters_already_exist);
+ process::metrics::remove(updating_eth0_icmp_filters_do_not_exist);
+ process::metrics::remove(updating_eth0_arp_filters_errors);
+ process::metrics::remove(updating_eth0_arp_filters_already_exist);
+ process::metrics::remove(updating_eth0_arp_filters_do_not_exist);
+ process::metrics::remove(updating_container_ip_filters_errors);
+}
+
+
+// Factory for the port mapping network isolator.
+//
+// Validates the host environment (root permission, network namespace
+// support, routing library, 'tc' and 'ip' commands) and the relevant
+// slave flags (port resources, ephemeral ports per container, public
+// and loopback interface names, egress rate limit), then applies the
+// host side configuration the isolator relies on and returns a new
+// isolator wrapping a PortMappingIsolatorProcess.
+//
+// NOTE: This function modifies the host. It sets the MAC address and
+// the MTU of the loopback interface, creates queueing disciplines on
+// the public and the loopback interfaces, writes several entries
+// under /proc/sys/net/ipv4/conf and sets up the bind mount root.
+//
+// Returns an Error describing the first check or setup step that
+// failed.
+Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& flags)
+{
+  // Check for root permission.
+  if (geteuid() != 0) {
+    return Error("Using network isolator requires root permissions");
+  }
+
+  // Verify that the network namespace is available by checking the
+  // existence of the network namespace handle of the current process.
+  if (ns::namespaces().count("net") == 0) {
+    return Error(
+        "Using network isolator requires network namespace. "
+        "Make sure your kernel is newer than 3.4");
+  }
+
+  // Check the routing library.
+  Try<Nothing> check = routing::check();
+  if (check.isError()) {
+    return Error(
+        "Routing library check failed: " +
+        check.error());
+  }
+
+  // Check the availability of a few Linux commands that we will use.
+  // We use the blocking os::shell here because 'create' will only be
+  // invoked during initialization.
+  Try<string> checkCommandTc = os::shell("tc filter show");
+  if (checkCommandTc.isError()) {
+    return Error("Check command 'tc' failed: " + checkCommandTc.error());
+  }
+
+  Try<string> checkCommandIp = os::shell("ip link show");
+  if (checkCommandIp.isError()) {
+    return Error("Check command 'ip' failed: " + checkCommandIp.error());
+  }
+
+  Try<Resources> resources = Resources::parse(
+      flags.resources.getOrElse(""),
+      flags.default_role);
+
+  if (resources.isError()) {
+    return Error("Failed to parse --resources: " + resources.error());
+  }
+
+  // Get 'ports' resource from 'resources' flag. These ports will be
+  // treated as non-ephemeral ports.
+  IntervalSet<uint16_t> nonEphemeralPorts;
+  if (resources.get().ports().isSome()) {
+    nonEphemeralPorts = getIntervalSet(resources.get().ports().get());
+  }
+
+  // Get 'ephemeral_ports' resource from 'resources' flag. These ports
+  // will be allocated to each container as ephemeral ports.
+  IntervalSet<uint16_t> ephemeralPorts;
+  if (resources.get().ephemeral_ports().isSome()) {
+    ephemeralPorts = getIntervalSet(resources.get().ephemeral_ports().get());
+  }
+
+  // Each container requires at least one ephemeral port for slave
+  // executor communication. If no 'ephemeral_ports' resource is
+  // found, we will return error.
+  if (ephemeralPorts.empty()) {
+    return Error("Ephemeral ports are not specified");
+  }
+
+  // Sanity check to make sure that the ephemeral ports specified do
+  // not intersect with the specified non-ephemeral ports.
+  if (ephemeralPorts.intersects(nonEphemeralPorts)) {
+    return Error(
+        "The specified ephemeral ports " + stringify(ephemeralPorts) +
+        " intersect with the specified non-ephemeral ports " +
+        stringify(nonEphemeralPorts));
+  }
+
+  // This is a sanity check to make sure that the ephemeral ports
+  // specified do not intersect with the well known ports.
+  if (ephemeralPorts.intersects(WELL_KNOWN_PORTS())) {
+    return Error(
+        "The specified ephemeral ports " + stringify(ephemeralPorts) +
+        " intersect with well known ports " + stringify(WELL_KNOWN_PORTS()));
+  }
+
+  // Obtain the host ephemeral port range by reading the proc file
+  // system ('ip_local_port_range').
+  Try<string> value = os::read("/proc/sys/net/ipv4/ip_local_port_range");
+  if (value.isError()) {
+    return Error("Failed to read host ip_local_port_range: " + value.error());
+  }
+
+  // The two bounds are expected to be separated by a single tab.
+  vector<string> split = strings::split(strings::trim(value.get()), "\t");
+  if (split.size() != 2) {
+    return Error(
+        "Unexpected format from host ip_local_port_range: " + value.get());
+  }
+
+  Try<uint16_t> begin = numify<uint16_t>(split[0]);
+  if (begin.isError()) {
+    return Error(
+        "Failed to parse the begin of host ip_local_port_range: " + split[0]);
+  }
+
+  Try<uint16_t> end = numify<uint16_t>(split[1]);
+  if (end.isError()) {
+    return Error(
+        "Failed to parse the end of host ip_local_port_range: " + split[1]);
+  }
+
+  Interval<uint16_t> hostEphemeralPorts =
+    (Bound<uint16_t>::closed(begin.get()),
+     Bound<uint16_t>::closed(end.get()));
+
+  // Sanity check to make sure the specified ephemeral ports do not
+  // intersect with the ephemeral ports used by the host.
+  if (ephemeralPorts.intersects(hostEphemeralPorts)) {
+    return Error(
+        "The specified ephemeral ports " + stringify(ephemeralPorts) +
+        " intersect with the ephemeral ports used by the host " +
+        stringify(hostEphemeralPorts));
+  }
+
+  // TODO(chzhcn): Cross check ephemeral ports with used ports on the
+  // host (e.g., using port scan).
+
+  // Initialize the ephemeral ports allocator.
+
+  // In theory, any positive integer can be broken up into a few
+  // numbers that are power of 2 aligned. We choose to not allow this
+  // for now so that each container has a fixed (one) number of
+  // filters for ephemeral ports. This makes it easy to debug and
+  // infer performance.
+  if (roundDownToPowerOfTwo(flags.ephemeral_ports_per_container) !=
+      flags.ephemeral_ports_per_container) {
+    return Error(
+        "The number of ephemeral ports for each container (" +
+        stringify(flags.ephemeral_ports_per_container) +
+        ") is not a power of 2");
+  }
+
+  if (ephemeralPorts.size() < flags.ephemeral_ports_per_container) {
+    return Error(
+        "Network Isolator is given ephemeral ports of size: " +
+        stringify(ephemeralPorts.size()) + ", but asked to allocate " +
+        stringify(flags.ephemeral_ports_per_container) +
+        " ephemeral ports for a container");
+  }
+
+  if (flags.ephemeral_ports_per_container < MIN_EPHEMERAL_PORTS_SIZE) {
+    return Error(
+        "Each container has only " +
+        stringify(flags.ephemeral_ports_per_container) +
+        " ephemeral ports. The minimum required is: " +
+        stringify(MIN_EPHEMERAL_PORTS_SIZE));
+  }
+
+  Owned<EphemeralPortsAllocator> ephemeralPortsAllocator(
+      new EphemeralPortsAllocator(
+          ephemeralPorts,
+          flags.ephemeral_ports_per_container));
+
+  // Get the name of the public interface (e.g., eth0). If it is not
+  // specified, try to derive its name from the routing library.
+  Result<string> eth0 = link::eth0();
+  if (flags.eth0_name.isSome()) {
+    eth0 = flags.eth0_name.get();
+
+    // Check if the given public interface exists.
+    Try<bool> hostEth0Exists = link::exists(eth0.get());
+    if (hostEth0Exists.isError()) {
+      return Error(
+          "Failed to check if " + eth0.get() + " exists: " +
+          hostEth0Exists.error());
+    } else if (!hostEth0Exists.get()) {
+      return Error("The public interface " + eth0.get() + " does not exist");
+    }
+  } else if (!eth0.isSome()) {
+    // eth0 is not specified in the flag and we did not get a valid
+    // eth0 from the library. NOTE: 'eth0' can be either an error or
+    // none here, and 'error()' must only be called in the former case.
+    return Error(
+        "Network Isolator failed to find a public interface: " +
+        (eth0.isError() ? eth0.error() : "none was found"));
+  }
+
+  LOG(INFO) << "Using " << eth0.get() << " as the public interface";
+
+  // Get the name of the loopback interface. If it is not specified,
+  // try to derive its name based on the loopback IP address.
+  Result<string> lo = link::lo();
+  if (flags.lo_name.isSome()) {
+    lo = flags.lo_name;
+
+    // Check if the given loopback interface exists.
+    Try<bool> hostLoExists = link::exists(lo.get());
+    if (hostLoExists.isError()) {
+      return Error(
+          "Failed to check if " + lo.get() + " exists: " +
+          hostLoExists.error());
+    } else if (!hostLoExists.get()) {
+      return Error("The loopback interface " + lo.get() + " does not exist");
+    }
+  } else if (!lo.isSome()) {
+    // lo is not specified in the flag and we did not get a valid
+    // lo from the library. NOTE: 'lo' can be either an error or none
+    // here, and 'error()' must only be called in the former case.
+    return Error(
+        "Network Isolator failed to find a loopback interface: " +
+        (lo.isError() ? lo.error() : "none was found"));
+  }
+
+  LOG(INFO) << "Using " << lo.get() << " as the loopback interface";
+
+  // If egress rate limit is provided, do a sanity check that it is
+  // not greater than the host physical link speed.
+  Option<Bytes> egressRateLimitPerContainer;
+  if (flags.egress_rate_limit_per_container.isSome()) {
+    // Read host physical link speed from /sys/class/net/eth0/speed.
+    // This value is in MBits/s.
+    const string speedPath =
+      path::join("/sys/class/net", eth0.get(), "speed");
+
+    Try<string> value = os::read(speedPath);
+    if (value.isError()) {
+      return Error("Failed to read " + speedPath + ": " + value.error());
+    }
+
+    // Report an unparsable value as an error (like every other
+    // failure in this function) instead of aborting the process.
+    Try<uint64_t> hostLinkSpeed = numify<uint64_t>(strings::trim(value.get()));
+    if (hostLinkSpeed.isError()) {
+      return Error(
+          "Failed to parse the link speed of " + eth0.get() +
+          ": " + hostLinkSpeed.error());
+    }
+
+    // It could be possible that the nic driver doesn't support
+    // reporting physical link speed. In that case, report error.
+    if (hostLinkSpeed.get() == 0xFFFFFFFF) {
+      return Error(
+          "Network Isolator failed to determine link speed for " + eth0.get());
+    }
+
+    // Convert host link speed to Bytes/s for comparison.
+    if (hostLinkSpeed.get() * 1000000 / 8 <
+        flags.egress_rate_limit_per_container.get().bytes()) {
+      return Error(
+          "The given egress traffic limit for containers " +
+          stringify(flags.egress_rate_limit_per_container.get().bytes()) +
+          " Bytes/s is greater than the host link speed " +
+          stringify(hostLinkSpeed.get() * 1000000 / 8) + " Bytes/s");
+    }
+
+    if (flags.egress_rate_limit_per_container.get() != Bytes(0)) {
+      egressRateLimitPerContainer = flags.egress_rate_limit_per_container.get();
+    } else {
+      LOG(WARNING) << "Ignoring the given zero egress rate limit";
+    }
+  }
+
+  // Get the host IP network, MAC and default gateway.
+  Result<net::IPNetwork> hostIPNetwork =
+    net::IPNetwork::fromLinkDevice(eth0.get(), AF_INET);
+
+  if (!hostIPNetwork.isSome()) {
+    return Error(
+        "Failed to get the public IP network of " + eth0.get() + ": " +
+        (hostIPNetwork.isError() ?
+            hostIPNetwork.error() :
+            "does not have an IPv4 network"));
+  }
+
+  Result<net::MAC> hostMAC = net::mac(eth0.get());
+  if (!hostMAC.isSome()) {
+    return Error(
+        "Failed to get the MAC address of " + eth0.get() + ": " +
+        (hostMAC.isError() ? hostMAC.error() : "does not have a MAC address"));
+  }
+
+  Result<net::IP> hostDefaultGateway = route::defaultGateway();
+  if (!hostDefaultGateway.isSome()) {
+    return Error(
+        "Failed to get the default gateway of the host: " +
+        (hostDefaultGateway.isError() ? hostDefaultGateway.error()
+        : "The default gateway of the host does not exist"));
+  }
+
+  // Set the MAC address of the host loopback interface (lo) so that
+  // it matches that of the host public interface (eth0). A fairly
+  // recent kernel patch is needed for this operation to succeed:
+  // https://git.kernel.org/cgit/linux/kernel/git/davem/net.git/:
+  // 25f929fbff0d1bcebf2e92656d33025cd330cbf8
+  Try<bool> setHostLoMAC = link::setMAC(lo.get(), hostMAC.get());
+  if (setHostLoMAC.isError()) {
+    return Error(
+        "Failed to set the MAC address of " + lo.get() +
+        ": " + setHostLoMAC.error());
+  }
+
+  // Set the MTU of the host loopback interface (lo) so that it
+  // matches that of the host public interface (eth0).
+  Result<unsigned int> hostEth0MTU = link::mtu(eth0.get());
+  if (hostEth0MTU.isError()) {
+    return Error(
+        "Failed to get the MTU of " + eth0.get() +
+        ": " + hostEth0MTU.error());
+  }
+
+  // The host public interface should exist since we just checked it.
+  CHECK_SOME(hostEth0MTU);
+
+  Try<bool> setHostLoMTU = link::setMTU(lo.get(), hostEth0MTU.get());
+  if (setHostLoMTU.isError()) {
+    return Error(
+        "Failed to set the MTU of " + lo.get() +
+        ": " + setHostLoMTU.error());
+  }
+
+  // Prepare the ingress queueing disciplines on host public interface
+  // (eth0) and host loopback interface (lo).
+  Try<bool> createHostEth0IngressQdisc = ingress::create(eth0.get());
+  if (createHostEth0IngressQdisc.isError()) {
+    return Error(
+        "Failed to create the ingress qdisc on " + eth0.get() +
+        ": " + createHostEth0IngressQdisc.error());
+  }
+
+  set<uint16_t> freeFlowIds;
+  if (flags.egress_unique_flow_per_container) {
+    // Prepare a fq_codel queueing discipline on host public interface
+    // (eth0) for egress flow classification.
+    //
+    // TODO(cwang): Maybe we can continue when some other egress qdisc
+    // exists because this is not a necessary qdisc for network
+    // isolation, but we don't want inconsistency, so we just fail in
+    // this case. See details in MESOS-2370.
+    Try<bool> createHostEth0EgressQdisc = fq_codel::create(
+        eth0.get(),
+        EGRESS_ROOT,
+        HOST_TX_FQ_CODEL_HANDLE);
+    if (createHostEth0EgressQdisc.isError()) {
+      return Error(
+          "Failed to create the egress qdisc on " + eth0.get() +
+          ": " + createHostEth0EgressQdisc.error());
+    }
+
+    // TODO(cwang): Make sure DEFAULT_FLOWS is large enough so that
+    // it's unlikely to run out of free flow IDs.
+    for (uint16_t i = CONTAINER_MIN_FLOWID; i < fq_codel::DEFAULT_FLOWS; i++) {
+      freeFlowIds.insert(i);
+    }
+  }
+
+  Try<bool> createHostLoQdisc = ingress::create(lo.get());
+  if (createHostLoQdisc.isError()) {
+    return Error(
+        "Failed to create the ingress qdisc on " + lo.get() +
+        ": " + createHostLoQdisc.error());
+  }
+
+  // Enable 'route_localnet' on host loopback interface (lo). This
+  // enables the use of 127.0.0.1/8 for local routing purpose. This
+  // feature only exists on kernel 3.6 or newer.
+  const string loRouteLocalnet =
+    path::join("/proc/sys/net/ipv4/conf", lo.get(), "route_localnet");
+
+  if (!os::exists(loRouteLocalnet)) {
+    // TODO(jieyu): Consider supporting running the isolator if this
+    // feature is not available. We need to conditionally disable
+    // routing for 127.0.0.1/8, and ask the tasks to use the public IP
+    // for container to container and container to host communication.
+    return Error("The kernel does not support 'route_localnet'");
+  }
+
+  Try<Nothing> write = os::write(loRouteLocalnet, "1");
+  if (write.isError()) {
+    return Error(
+        "Failed to enable route_localnet for " + lo.get() +
+        ": " + write.error());
+  }
+
+  // We disable 'rp_filter' and 'send_redirects' for host loopback
+  // interface (lo) to work around a kernel bug, which was only
+  // recently addressed in upstream in the following 3 commits.
+  // https://git.kernel.org/cgit/linux/kernel/git/davem/net.git/:
+  // 6a662719c9868b3d6c7d26b3a085f0cd3cc15e64
+  // 0d5edc68739f1c1e0519acbea1d3f0c1882a15d7
+  // e374c618b1465f0292047a9f4c244bd71ab5f1f0
+  // The workaround ensures packets don't get dropped at lo.
+  write = os::write("/proc/sys/net/ipv4/conf/all/rp_filter", "0");
+  if (write.isError()) {
+    return Error(
+        "Failed to disable rp_filter for all: " + write.error());
+  }
+
+  write = os::write(path::join(
+      "/proc/sys/net/ipv4/conf", lo.get(), "rp_filter"), "0");
+  if (write.isError()) {
+    return Error(
+        "Failed to disable rp_filter for " + lo.get() +
+        ": " + write.error());
+  }
+
+  write = os::write("/proc/sys/net/ipv4/conf/all/send_redirects", "0");
+  if (write.isError()) {
+    return Error(
+        "Failed to disable send_redirects for all: " + write.error());
+  }
+
+  write = os::write(path::join(
+      "/proc/sys/net/ipv4/conf", lo.get(), "send_redirects"), "0");
+  if (write.isError()) {
+    return Error(
+        "Failed to disable send_redirects for " + lo.get() +
+        ": " + write.error());
+  }
+
+  // We need to enable accept_local on host loopback interface (lo)
+  // for kernels older than 3.6. Refer to the following:
+  // https://git.kernel.org/cgit/linux/kernel/git/davem/net.git/:
+  // 7a9bc9b81a5bc6e44ebc80ef781332e4385083f2
+  // https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt
+  write = os::write(path::join(
+      "/proc/sys/net/ipv4/conf", lo.get(), "accept_local"), "1");
+  if (write.isError()) {
+    return Error(
+        "Failed to enable accept_local for " + lo.get() +
+        ": " + write.error());
+  }
+
+  // Reading host network configurations. Each container will match
+  // these configurations.
+  hashset<string> procs;
+
+  // TODO(jieyu): The following is a partial list of all the
+  // configurations. In the future, we may want to expose these
+  // configurations using ContainerInfo.
+
+  // The kernel will use a default value for the following
+  // configurations inside a container. Therefore, we need to set them
+  // in the container to match that on the host.
+  procs.insert("/proc/sys/net/core/somaxconn");
+
+  // As of kernel 3.10, the following configurations are shared
+  // between host and containers, and therefore are not required to be
+  // set in containers. We keep them here just in case the kernel
+  // changes in the future.
+  procs.insert("/proc/sys/net/core/netdev_max_backlog");
+  procs.insert("/proc/sys/net/core/rmem_max");
+  procs.insert("/proc/sys/net/core/wmem_max");
+  procs.insert("/proc/sys/net/ipv4/tcp_keepalive_time");
+  procs.insert("/proc/sys/net/ipv4/tcp_keepalive_intvl");
+  procs.insert("/proc/sys/net/ipv4/tcp_keepalive_probes");
+  procs.insert("/proc/sys/net/ipv4/tcp_max_syn_backlog");
+  procs.insert("/proc/sys/net/ipv4/tcp_rmem");
+  procs.insert("/proc/sys/net/ipv4/tcp_retries2");
+  procs.insert("/proc/sys/net/ipv4/tcp_synack_retries");
+  procs.insert("/proc/sys/net/ipv4/tcp_wmem");
+  procs.insert("/proc/sys/net/ipv4/neigh/default/gc_thresh1");
+  procs.insert("/proc/sys/net/ipv4/neigh/default/gc_thresh2");
+  procs.insert("/proc/sys/net/ipv4/neigh/default/gc_thresh3");
+
+  // Entries that cannot be read on this host are silently skipped.
+  hashmap<string, string> hostNetworkConfigurations;
+  foreach (const string& proc, procs) {
+    Try<string> value = os::read(proc);
+    if (value.isSome()) {
+      LOG(INFO) << proc << " = '" << strings::trim(value.get()) << "'";
+      hostNetworkConfigurations[proc] = strings::trim(value.get());
+    }
+  }
+
+  // Self bind mount PORT_MAPPING_BIND_MOUNT_ROOT(). Since we use a
+  // new mount namespace for each container, for this mount point, we
+  // set '--make-rshared' on the host and set '--make-rslave' inside
+  // each container. This is important because when we unmount the
+  // network namespace handles on the host, those handles will be
+  // unmounted in the containers as well, but NOT vice versa.
+
+  // We first create the bind mount directory if it does not exist.
+  Try<Nothing> mkdir = os::mkdir(PORT_MAPPING_BIND_MOUNT_ROOT());
+  if (mkdir.isError()) {
+    return Error(
+        "Failed to create the bind mount root directory at " +
+        PORT_MAPPING_BIND_MOUNT_ROOT() + ": " + mkdir.error());
+  }
+
+  // Now, check '/proc/mounts' to see if
+  // PORT_MAPPING_BIND_MOUNT_ROOT() has already been self mounted.
+  Try<fs::MountTable> mountTable = fs::MountTable::read("/proc/mounts");
+  if (mountTable.isError()) {
+    return Error(
+        "Failed to read the mount table at '/proc/mounts': " +
+        mountTable.error());
+  }
+
+  Option<fs::MountTable::Entry> bindMountRoot;
+  foreach (const fs::MountTable::Entry& entry, mountTable.get().entries) {
+    if (entry.dir == PORT_MAPPING_BIND_MOUNT_ROOT()) {
+      bindMountRoot = entry;
+    }
+  }
+
+  // Self bind mount PORT_MAPPING_BIND_MOUNT_ROOT().
+  if (bindMountRoot.isNone()) {
+    // NOTE: Instead of using fs::mount to perform the bind mount, we
+    // use the shell command here because the syscall 'mount' does not
+    // update the mount table (i.e., /etc/mtab), which could cause
+    // issues for the shell command 'mount --make-rslave' inside the
+    // container. It's OK to use the blocking os::shell here because
+    // 'create' will only be invoked during initialization.
+    Try<string> mount = os::shell(
+        "mount --bind %s %s",
+        PORT_MAPPING_BIND_MOUNT_ROOT().c_str(),
+        PORT_MAPPING_BIND_MOUNT_ROOT().c_str());
+
+    if (mount.isError()) {
+      return Error(
+          "Failed to self bind mount '" + PORT_MAPPING_BIND_MOUNT_ROOT() +
+          "': " + mount.error());
+    }
+  }
+
+  // Mark the mount point PORT_MAPPING_BIND_MOUNT_ROOT() as
+  // recursively shared.
+  Try<string> mountShared = os::shell(
+      "mount --make-rshared %s",
+      PORT_MAPPING_BIND_MOUNT_ROOT().c_str());
+
+  if (mountShared.isError()) {
+    return Error(
+        "Failed to mark '" + PORT_MAPPING_BIND_MOUNT_ROOT() +
+        "' as recursively shared: " + mountShared.error());
+  }
+
+  // Create the network namespace handle symlink directory if it does
+  // not exist. It is used to host from network namespace handle
+  // symlinks whose basename is a container ID. This allows us to
+  // recover container IDs for orphan containers (i.e., not known by
+  // the slave). This is introduced in 0.23.0.
+  mkdir = os::mkdir(PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT());
+  if (mkdir.isError()) {
+    return Error(
+        "Failed to create the bind mount symlink root directory at " +
+        PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT() + ": " + mkdir.error());
+  }
+
+  return new MesosIsolator(Owned<MesosIsolatorProcess>(
+      new PortMappingIsolatorProcess(
+          flags,
+          eth0.get(),
+          lo.get(),
+          hostMAC.get(),
+          hostIPNetwork.get(),
+          hostEth0MTU.get(),
+          hostDefaultGateway.get(),
+          hostNetworkConfigurations,
+          egressRateLimitPerContainer,
+          nonEphemeralPorts,
+          ephemeralPortsAllocator,
+          freeFlowIds)));
+}
+
+
+Future<Nothing> PortMappingIsolatorProcess::recover(
+ const list<ContainerState>& states,
+ const hashset<ContainerID>& orphans)
+{
+ // Extract pids from virtual device names (veth). This tells us
+ // about all the potential live containers on this slave.
+ Try<set<string>> links = net::links();
+ if (links.isError()) {
+ return Failure("Failed to get all the links: " + links.error());
+ }
+
+ hashset<pid_t> pids;
+ foreach (const string& name, links.get()) {
+ Option<pid_t> pid = getPidFromVeth(name);
+ // Not all links follow the naming: mesos{pid}, so we simply
+ // continue, e.g., eth0.
+ if (pid.isNone()) {
+ continue;
+ } else if (pids.contains(pid.get())) {
+ return Failure("Two virtual devices have the same name '" + name + "'");
+ }
+
+ pids.insert(pid.get());
+ }
+
+ // Scan the bind mount root to cleanup all stale network namespace
+ // handles that do not have an active veth associated with.
+ Try<list<string>> entries = os::ls(PORT_MAPPING_BIND_MOUNT_ROOT());
+ if (entries.isError()) {
+ return Failure(
+ "Failed to list bind mount root '" +
+ PORT_MAPPING_BIND_MOUNT_ROOT() +
+ "': " + entries.error());
+ }
+
+ foreach (const string& entry, entries.get()) {
+ const string path = path::join(PORT_MAPPING_BIND_MOUNT_ROOT(), entry);
+
+ // NOTE: We expect all regular files whose names are numbers under
+ // the bind mount root are network namespace handles.
+ Result<pid_t> pid = getPidFromNamespaceHandle(path);
+ if (pid.isError()) {
+ return Failure(
+ "Failed to get pid from network namespace handle '" +
+ path + "': " + pid.error());
+ } else if (pid.isNone()) {
+ // We ignore files that are clearly not network namespace
+ // handles created by us. It's likely that those are created by
+ // users or other tools.
+ LOG(WARNING) << "Unrecognized network namespace handle '" << path << "'";
+ continue;
+ }
+
+ // We cleanup the network namespace handle if the associated
+ // containers have clearly exited (i.e., the veth has gone). The
+ // cleanup here is best effort.
+ if (!pids.contains(pid.get())) {
+ LOG(INFO) << "Removing stale network namespace handle '" << path << "'";
+
+ Try<Nothing> unmount = fs::unmount(path, MNT_DETACH);
+ if (unmount.isError()) {
+ LOG(WARNING) << "Failed to unmount stale network namespace handle '"
+ << path << "': " << unmount.error();
+ }
+
+ Try<Nothing> rm = os::rm(path);
+ if (rm.isError()) {
+ LOG(WARNING) << "Failed to remove stale network namespace handle '"
+ << path << "': " << rm.error();
+ }
+ }
+ }
+
+ // Scan the bind mount symlink root for container IDs. This allows us
+ // to recover container IDs for orphan containers (i.e., not known
+ // by the slave). This is introduced in 0.23.0.
+ entries = os::ls(PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT());
+ if (entries.isError()) {
+ return Failure(
+ "Failed to list bind mount symlink root '" +
+ PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT() +
+ "': " + entries.error());
+ }
+
+ // This map stores the mapping between pids and container IDs
+ // recovered from the bind mount root that have valid veth links. We
+ // use a multihashmap here because multiple container IDs can map to
+ // the same pid if the removal of a symlink fails in '_cleanup()'
+ // and the pid is reused by a new container.
+ multihashmap<pid_t, ContainerID> linkers;
+
+ foreach (const string& entry, entries.get()) {
+ const string path =
+ path::join(PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT(), entry);
+
+ // We only create symlinks in this directory and assume
+ // non-symlink files are created by other users or tools,
+ // therefore will be ignored.
+ if (!os::stat::islink(path)) {
+ LOG(WARNING) << "Ignored non-symlink file '" << path
+ << "' under bind mount symlink root '"
+ << PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT() << "'";
+ continue;
+ }
+
+ // NOTE: We expect all symlinks under the bind mount symlink root
+ // to be container ID symlinks.
+
+ Try<ContainerID> containerId = getContainerIdFromSymlink(path);
+ if (containerId.isError()) {
+ return Failure(
+ "Failed to get container ID from network namespace handle symlink '" +
+ path + "': " + containerId.error());
+ }
+
+ Result<pid_t> pid = getPidFromSymlink(path);
+ if (pid.isError()) {
+ return Failure(
+ "Failed to get pid from network namespace handle symlink '" + path +
+ "': " + pid.error());
+ }
+
+ // We remove the symlink if it's dangling or the associated
+ // containers have clearly exited (i.e., the veth has gone). The
+ // cleanup here is best effort.
+ if (pid.isNone() || !pids.contains(pid.get())) {
+ LOG(INFO) << "Removing stale network namespace handle symlink '"
+ << path << "'";
+
+ Try<Nothing> rm = os::rm(path);
+ if (rm.isError()) {
+ LOG(WARNING) << "Failed to remove stale network namespace handle "
+ << " symlink '" << path << "': " << rm.error();
+ }
+ } else {
+ LOG(INFO) << "Discovered network namespace handle symlink "
+ << containerId.get() << " -> " << pid.get();
+
+ linkers.put(pid.get(), containerId.get());
+ }
+ }
+
+ // If multiple container IDs point to the same pid, we remove both
+ // symlinks for safety (as if we cannot derive the container ID for
+ // orphans, which is OK because it'll be treated the same as those
+ // containers that are created by older (pre 0.23.0) versions). Note
+ // that it's possible that multiple container IDs map to the same
+ // pid if the removal of a symlink fails in '_cleanup()' and the pid
+ // is reused by a new container.
+ foreach (pid_t pid, linkers.keys()) {
+ list<ContainerID> containerIds = linkers.get(pid);
+ if (containerIds.size() > 1) {
+ foreach (const ContainerID& containerId, containerIds) {
+ const string linker = getSymlinkPath(containerId);
+
+ LOG(WARNING) << "Removing duplicated network namespace handle symlink '"
+ << linker << "'";
+
+ Try<Nothing> rm = os::rm(linker);
+ if (rm.isError()) {
+ LOG(WARNING) << "Failed to remove duplicated network namespace "
+ << "handle symlink '" << linker << "': " << rm.error();
+ }
+ }
+
+ linkers.remove(pid);
+ }
+ }
+
+ // Now, actually recover the isolator from slave's state.
+ foreach (const ContainerState& state, states) {
+ const ContainerID& containerId = state.container_id();
+ pid_t pid = state.pid();
+
+ VLOG(1) << "Recovering network isolator for container "
+ << containerId << " with pid " << pid;
+
+ if (!pids.contains(pid)) {
+ // There are two possible cases here:
+ //
+ // 1) The container was launched by the slave with network
+ // isolation disabled, so the pid could not be found in the
+ // device names in the system.
+ //
+ // 2) The container was launched by the slave with network
+ // isolation enabled, but veth is removed (because the
+ // corresponding container is destroyed), but the slave
+ // restarts before it is able to write the sentinel file.
+ //
+ // In both cases, we treat the container as unmanaged. For case
+ // (2), it's safe to do so because the container has already
+ // been destroyed.
+ VLOG(1) << "Skipped recovery for container " << containerId
+ << "with pid " << pid << " as either it was not managed by "
+ << "the network isolator or it has already been destroyed";
+
+ unmanaged.insert(containerId);
+ continue;
+ }
+
+ Try<Info*> recover = _recover(pid);
+ if (recover.isError()) {
+ foreachvalue (Info* info, infos) {
+ delete info;
+ }
+
+ return Failure(
+ "Failed to recover container " + stringify(containerId) +
+ " with pid " + stringify(pid) + ": " + recover.error());
+ }
+
+ infos[containerId] = recover.get();
+
+ // Remove the successfully recovered pid.
+ pids.erase(pid);
+ }
+
+ // Recover orphans. Known orphans will be destroyed by containerizer
+ // using the normal cleanup path (refer to MESOS-2367 for details).
+ // Unknown orphans will be cleaned up immediately. The recovery will
+ // fail if there is some unknown orphan that cannot be cleaned up.
+ vector<Info*> unknownOrphans;
+
+ foreach (pid_t pid, pids) {
+ Try<Info*> recover = _recover(pid);
+ if (recover.isError()) {
+ foreachvalue (Info* info, infos) {
+ delete info;
+ }
+ foreach (Info* info, unknownOrphans) {
+ delete info;
+ }
+
+ return Failure(
+ "Failed to recover orphaned container with pid " +
+ stringify(pid) + ": " + recover.error());
+ }
+
+ if (linkers.get(pid).size() == 1) {
+ const ContainerID containerId = linkers.get(pid).front();
+ CHECK(!infos.contains(containerId));
+
+ if (orphans.contains(containerId)) {
+ infos[containerId] = recover.get();
+ continue;
+ }
+ }
+
+ unknownOrphans.push_back(recover.get());
+ }
+
+ foreach (Info* info, unknownOrphans) {
+ CHECK_SOME(info->pid);
+ pid_t pid = info->pid.get();
+
+ Option<ContainerID> containerId;
+ if (linkers.get(pid).size() == 1) {
+ containerId = linkers.get(pid).front();
+ }
+
+ // NOTE: If 'infos' is empty (means there is no regular container
+ // or known orphan), the '_cleanup' below will remove the ICMP and
+ // ARP packet filters on host eth0. This will cause subsequent
+ // calls to '_cleanup' for unknown orphans to fail. However, this
+ // is OK because when slave restarts and tries to recover again,
+ // it'll try to remove the remaining unknown orphans.
+ // TODO(jieyu): Consider calling '_cleanup' for all the unknown
+ // orphans before returning even if an error occurs.
+ Try<Nothing> cleanup = _cleanup(info, containerId);
+ if (cleanup.isError()) {
+ foreachvalue (Info* info, infos) {
+ delete info;
+ }
+
+ // TODO(jieyu): Also delete 'info' in unknownOrphans. Notice
+ // that some 'info' in unknownOrphans might have already been
+ // deleted in '_cleanup' above.
+
+ return Failure(
+ "Failed to cleanup orphaned container with pid " +
+ stringify(pid) + ": " + cleanup.error());
+ }
+ }
+
+ // TODO(cwang): Consider removing unrecognized flow classifiers from
+ // host eth0 egress.
+
+ LOG(INFO) << "Network isolator recovery complete";
+
+ return Nothing();
+}
+
+
+// Rebuilds the 'Info' for the container identified by 'pid' from the
+// packet filters that are currently installed.
+//
+// The non-ephemeral and ephemeral port ranges are derived from the IP
+// filters found on the ingress of the container's veth. When
+// 'egress_unique_flow_per_container' is set, the flow ID is derived
+// from the flow classifiers found on the egress of host eth0. As a
+// side effect, a recovered ephemeral port range is marked as used in
+// the ephemeral ports allocator and a recovered flow ID is removed
+// from 'freeFlowIds'.
+//
+// Returns an 'Info' created with 'new', or an Error if the filters
+// cannot be read or do not have the expected shape.
+Try<PortMappingIsolatorProcess::Info*>
+PortMappingIsolatorProcess::_recover(pid_t pid)
+{
+  const string vethName = veth(pid);
+
+  // NOTE: The port ranges of a container are recovered from its veth
+  // only. This is why filters must be added to veth before they are
+  // added to host eth0 and host lo, and why filters must be removed
+  // from host eth0 and host lo before they are removed from veth.
+  Result<vector<ip::Classifier>> ingressClassifiers =
+    ip::classifiers(vethName, ingress::HANDLE);
+
+  if (ingressClassifiers.isError()) {
+    return Error(
+        "Failed to get all the IP filters on " + vethName +
+        ": " + ingressClassifiers.error());
+  }
+
+  if (ingressClassifiers.isNone()) {
+    return Error(
+        "Failed to get all the IP filters on " + vethName +
+        ": link does not exist");
+  }
+
+  // Port range to flow ID mapping built from the egress of host eth0.
+  // It stays empty unless each container gets its own egress flow.
+  hashmap<PortRange, uint16_t> flowIds;
+
+  if (flags.egress_unique_flow_per_container) {
+    Result<vector<filter::Filter<ip::Classifier>>> egressFilters =
+      ip::filters(eth0, HOST_TX_FQ_CODEL_HANDLE);
+
+    if (egressFilters.isError()) {
+      return Error(
+          "Failed to get all the IP flow classifiers on " + eth0 +
+          ": " + egressFilters.error());
+    }
+
+    if (egressFilters.isNone()) {
+      return Error(
+          "Failed to get all the IP flow classifiers on " + eth0 +
+          ": link does not exist");
+    }
+
+    foreach (const filter::Filter<ip::Classifier>& egressFilter,
+             egressFilters.get()) {
+      const Option<PortRange> range = egressFilter.classifier.sourcePorts;
+      const Option<Handle> handle = egressFilter.classid;
+
+      if (range.isNone()) {
+        return Error("Missing source ports for filters on egress of " + eth0);
+      }
+
+      if (handle.isNone()) {
+        return Error("Missing classid for filters on egress of " + eth0);
+      }
+
+      if (flowIds.contains(range.get())) {
+        return Error(
+            "Duplicated port range " + stringify(range.get()) +
+            " detected on egress of " + eth0);
+      }
+
+      flowIds[range.get()] = handle.get().secondary();
+    }
+  }
+
+  IntervalSet<uint16_t> nonEphemeralPorts;
+  IntervalSet<uint16_t> ephemeralPorts;
+  Option<uint16_t> flowId;
+
+  foreach (const ip::Classifier& classifier, ingressClassifiers.get()) {
+    const Option<PortRange> source = classifier.sourcePorts;
+    const Option<PortRange> destination = classifier.destinationPorts;
+
+    // The IP filters we install on veth match on source ports only.
+    if (!(source.isSome() && destination.isNone())) {
+      return Error("Unexpected IP filter detected on " + vethName);
+    }
+
+    const PortRange& range = source.get();
+
+    // Flow ID assigned to this port range on host eth0 egress, if any.
+    const Option<uint16_t> rangeFlowId = flowIds.get(range);
+
+    if (rangeFlowId.isSome()) {
+      if (flowId.isNone()) {
+        flowId = rangeFlowId;
+      } else if (flowId != rangeFlowId) {
+        return Error(
+            "A container is associated with multiple flows "
+            "on egress of " + eth0);
+      }
+    } else if (flowId.isSome()) {
+      // Some port ranges of this container are assigned to a flow
+      // while this one is not. This could happen if the slave crashed
+      // while the filters were being created. It is tolerated because
+      // packets go to the host flow by default.
+      LOG(WARNING) << "Container port range " << range
+                   << " does not have flow id " << flowId.get()
+                   << " assigned";
+    }
+
+    Interval<uint16_t> interval =
+      (Bound<uint16_t>::closed(range.begin()),
+       Bound<uint16_t>::closed(range.end()));
+
+    if (managedNonEphemeralPorts.contains(interval)) {
+      nonEphemeralPorts += interval;
+      continue;
+    }
+
+    if (!ephemeralPortsAllocator->isManaged(interval)) {
+      return Error("Unexpected IP filter detected on " + vethName);
+    }
+
+    // The same ephemeral port range shows up twice because there are
+    // two IP filters for it (one for eth0 and one for lo). An exact
+    // duplicate is therefore expected, but a partial overlap is not.
+    if (!ephemeralPorts.contains(interval) &&
+        ephemeralPorts.intersects(interval)) {
+      return Error("Unexpected intersected ephemeral port ranges");
+    }
+
+    ephemeralPorts += interval;
+  }
+
+  // Validate the ephemeral ports before anything gets allocated.
+  if (!ephemeralPorts.empty() && ephemeralPorts.intervalCount() != 1) {
+    return Error("Each container should have only one ephemeral port range");
+  }
+
+  Info* recovered = NULL;
+
+  if (!ephemeralPorts.empty()) {
+    // Mark the recovered ephemeral port range as used in the allocator.
+    ephemeralPortsAllocator->allocate(*ephemeralPorts.begin());
+
+    recovered = new Info(nonEphemeralPorts, *ephemeralPorts.begin(), pid);
+
+    VLOG(1) << "Recovered network isolator for container with pid " << pid
+            << " non-ephemeral port ranges " << nonEphemeralPorts
+            << " and ephemeral port range " << *ephemeralPorts.begin();
+  } else {
+    // NOTE: The slave may have crashed in the middle of 'isolate()',
+    // leaving a partially isolated container behind. An Info is still
+    // created for it so that the 'cleanup' function can clean it up
+    // later.
+    LOG(WARNING) << "No ephemeral ports found for container with pid "
+                 << stringify(pid) << ". This could happen if slave crashes "
+                 << "while isolating a container";
+
+    recovered = new Info(nonEphemeralPorts, Interval<uint16_t>(), pid);
+  }
+
+  if (flowId.isSome()) {
+    // The flow ID is in use, so it must not be handed out again.
+    freeFlowIds.erase(flowId.get());
+    recovered->flowId = flowId.get();
+  }
+
+  return CHECK_NOTNULL(recovered);
+}
+
+
+// Sets up the port mapping state for a container that is about to be
+// launched.
+//
+// The non-ephemeral ports are taken from the 'ports' resource of
+// 'executorInfo' and must be a subset of the non-ephemeral ports
+// managed by the slave. The ephemeral ports are always allocated by
+// the isolator; any 'ephemeral_ports' resource in 'executorInfo' is
+// ignored with a warning. The 'directory' and 'user' parameters are
+// not used by this isolator.
+//
+// On success, a new 'Info' is recorded in 'infos' for 'containerId'
+// and the returned ContainerPrepareInfo carries the command produced
+// by 'scripts()' together with the namespaces to clone. Fails if the
+// container is unmanaged, has already been prepared, asks for
+// non-ephemeral ports that are not managed by the slave, or if no
+// ephemeral ports can be allocated.
+Future<Option<ContainerPrepareInfo>> PortMappingIsolatorProcess::prepare(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user)
+{
+  if (unmanaged.contains(containerId)) {
+    return Failure("Asked to prepare an unmanaged container");
+  }
+
+  if (infos.contains(containerId)) {
+    return Failure("Container has already been prepared");
+  }
+
+  Resources resources(executorInfo.resources());
+
+  IntervalSet<uint16_t> nonEphemeralPorts;
+
+  if (resources.ports().isSome()) {
+    nonEphemeralPorts = getIntervalSet(resources.ports().get());
+
+    // Sanity check to make sure that the assigned non-ephemeral ports
+    // for the container are part of the non-ephemeral ports specified
+    // by the slave.
+    if (!managedNonEphemeralPorts.contains(nonEphemeralPorts)) {
+      return Failure(
+          "Some non-ephemeral ports specified in " +
+          stringify(nonEphemeralPorts) +
+          " are not managed by the slave");
+    }
+  }
+
+  // TODO(jieyu): For now, we simply ignore the 'ephemeral_ports'
+  // specified in the executor info. However, this behavior needs to
+  // be changed once the master can make default allocations for
+  // ephemeral ports.
+  if (resources.ephemeral_ports().isSome()) {
+    // NOTE: The trailing space after "container" keeps the container
+    // ID from being glued to the preceding word in the log output.
+    LOG(WARNING) << "Ignoring the specified ephemeral_ports '"
+                 << resources.ephemeral_ports().get()
+                 << "' for container " << containerId
+                 << " of executor " << executorInfo.executor_id();
+  }
+
+  // Allocate the ephemeral ports used by this container.
+  Try<Interval<uint16_t>> ephemeralPorts = ephemeralPortsAllocator->allocate();
+  if (ephemeralPorts.isError()) {
+    return Failure(
+        "Failed to allocate ephemeral ports: " + ephemeralPorts.error());
+  }
+
+  // The pid of the container is not known yet; it is filled in later
+  // by 'isolate()'.
+  infos[containerId] = new Info(nonEphemeralPorts, ephemeralPorts.get());
+
+  LOG(INFO) << "Using non-ephemeral ports " << nonEphemeralPorts
+            << " and ephemeral ports " << ephemeralPorts.get()
+            << " for container " << containerId << " of executor "
+            << executorInfo.executor_id();
+
+  ContainerPrepareInfo prepareInfo;
+  prepareInfo.add_commands()->set_value(scripts(infos[containerId]));
+
+  // NOTE: the port mapping isolator itself doesn't require mount
+  // namespace. However, if mount namespace is enabled because of
+  // other isolators, we need to set mount sharing accordingly for
+  // PORT_MAPPING_BIND_MOUNT_ROOT to avoid races described in
+  // MESOS-1558. So we turn on mount namespace here for consistency.
+  prepareInfo.set_namespaces(CLONE_NEWNET | CLONE_NEWNS);
+
+  return prepareInfo;
+}
+
+
+Future<Nothing> PortMappingIsolatorProcess::isolate(
+ const ContainerID& containerId,
+ pid_t pid)
+{
+ if (unmanaged.contains(containerId)) {
+ return Failure("Asked to isolate an unmanaged container");
+ }
+
+ if (!infos.contains(containerId)) {
+ return Failure("Unknown container");
+ }
+
+ Info* info = CHECK_NOTNULL(infos[containerId]);
+
+ if (info->pid.isSome()) {
+ return Failure("The container has already been isolated");
+ }
+
+ info->pid = pid;
+
+ if (flags.egress_unique_flow_per_container) {
+ info->flowId = getNextFlowId();
+ }
+
+ // Bind mount the network namespace handle of the process 'pid' to a
+ // directory to hold an extra reference to the network namespace
+ // which will be released in 'cleanup'. By holding the extra
+ // reference, the network namespace will not be destroyed even if
+ // the process 'pid' is gone, which allows us to explicitly control
+ // the network namespace life cycle.
+ const string source = path::join("/proc", stringify(pid), "ns", "net");
+ const string target = getNamespaceHandlePath(pid);
+
+ Try<Nothing> touch = os::touch(target);
+ if (touch.isError()) {
+ return Failure("Failed to create the bind mount point: " + touch.error());
+ }
+
+ Try<Nothing> mount = fs::mount(source, target, None(), MS_BIND, NULL);
+ if (mount.isError()) {
+ return Failure(
+ "Failed to mount the network namespace handle from '" +
+ source + "' to '" + target + "': " + mount.error());
+ }
+
+ LOG(INFO) << "Bind mounted '" << source << "' to '" << target
+ << "' for container " << containerId;
+
+ // Since 0.23.0, we create a symlink to the network namespace handle
+ // using the container ID. This serves two purposes. First, it
+ // allows us to recover the container ID later when slave restarts
+ // even if slave's checkpointed meta data is deleted. Second, it
+ // makes the debugging easier. See MESOS-2528 for details.
+ const string linker = getSymlinkPath(containerId);
+ Try<Nothing> symlink = ::fs::symlink(target, linker);
+ if (symlink.isError()) {
+ return Failure(
+ "Failed to symlink the network namespace handle '" +
+ linker + "' -> '" + target + "': " + symlink.error());
+ }
+
+ LOG(INFO) << "Created network namespace handle symlink '"
+ << linker << "' -> '" << target << "'";
+
+ // Create a virtual ethernet pair for this container.
+ Try<bool> createVethPair = link::create(veth(pid), eth0, pid);
+ if (createVethPair.isError()) {
+ return Failure(
+ "Failed to create virtual ethernet pair: " +
+ createVethPair.error());
+ }
+
+ // Disable IPv6 for veth as IPv6 packets won't be forwarded anyway.
+ const string disableIPv6 =
+ path::join("/proc/sys/net/ipv6/conf", veth(pid), "disable_ipv6");
+
+ if (os::exists(disableIPv6)) {
+ Try<Nothing> write = os::write(disableIPv6, "1");
+ if (write.isError()) {
+ return Failure(
+ "Failed to disable IPv6 for " + veth(pid) +
+ ": " + write.error());
+ }
+ }
+
+ // Sets the MAC address of veth to match the MAC address of the host
+ // public interface (eth0).
+ Try<bool> setVethMAC = link::setMAC(veth(pid), hostMAC);
+ if (setVethMAC.isError()) {
+ return Failure(
+ "Failed to set the MAC address of " + veth(pid) +
+ ": " + setVethMAC.error());
+ }
+
+ // Prepare the ingress queueing disciplines on veth.
+ Try<bool> createQdisc = ingress::create(veth(pid));
+ if (createQdisc.isError()) {
+ return Failure(
+ "Failed to create the ingress qdisc on " + veth(pid) +
+ ": " + createQdisc.error());
+ }
+
+ // Veth device should exist since we just created it.
+ CHECK(createQdisc.get());
+
+ // For each port range, add a set of IP packet filters to properly
+ // redirect IP traffic to/from containers.
+ foreach (const PortRange& range,
+ getPortRanges(info->nonEphemeralPorts + info->ephemeralPorts)) {
+ if (info->flowId.isSome()) {
+ LOG(INFO) << "Adding IP packet filters with ports " << range
+ << " with flow ID " << info->flowId.get()
+ << " for container " << containerId;
+ } else {
+ LOG(INFO) << "Adding IP packet filters with ports " << range
+ << " for container " << containerId;
+ }
+
+ Try<Nothing> add = addHostIPFilters(range, info->flowId, veth(pid));
+ if (add.isError()) {
+ return Failure(
+ "Failed to add IP packet filter with ports " +
+ stringify(range) + " for container with pid " +
+ stringify(pid) + ": " + add.error());
+ }
+ }
+
+ // Relay ICMP packets from veth of the container to host eth0.
+ Try<bool> icmpVethToEth0 = filter::icmp::create(
+ veth(pid),
+ ingress::HANDLE,
+ icmp::Classifier(None()),
+ Priority(ICMP_FILTER_PRIORITY, NORMAL),
+ action::Redirect(eth0));
+
+ if (icmpVethToEth0.isError()) {
+ ++metrics.adding_veth_icmp_filters_errors;
+
+ return Failure(
+ "Failed to create an ICMP packet filter from " + veth(pid) +
+ " to host " + eth0 + ": " + icmpVethToEth0.error());
+ } else if (!icmpVethToEth0.get()) {
+ ++metrics.adding_veth_icmp_filters_already_exist;
+
+ return Failure(
+ "The ICMP packet filter from " + veth(pid) +
+ " to host " + eth0 + " already exists");
+ }
+
+ // Relay ARP packets from veth of the container to host eth0.
+ Try<bool> arpVethToEth0 = filter::basic::create(
+ veth(pid),
+ ingress::HANDLE,
+ ETH_P_ARP,
+ Priority(ARP_FILTER_PRIORITY, NORMAL),
+ action::Redirect(eth0));
+
+ if (arpVethToEth0.isError()) {
+ ++metrics.adding_veth_arp_filters_errors;
+
+ return Failure(
+ "Failed to create an ARP packet filter from " + veth(pid) +
+ " to host " + eth0 + ": " + arpVethToEth0.error());
+ } else if (!arpVethToEth0.get()) {
+ ++metrics.adding_veth_arp_filters_already_exist;
+
+ return Failure(
+ "The ARP packet filter from " + veth(pid) +
+ " to host " + eth0 + " already exists");
+ }
+
+ // Setup filters for ICMP and ARP packets. We mirror ICMP and ARP
+ // packets from host eth0 to veths of all the containers. We also
+ // setup flow classifiers for host eth0 egress.
+ set<string> targets;
+ foreachvalue (Info* info, infos) {
+ if (info->pid.isSome()) {
+ targets.insert(veth(info->pid.get()));
+ }
+ }
+
+ if (targets.size() == 1) {
+ // We have just created the first container, in which case we
+ // should create the filters for ICMP and ARP packets.
+
+ // Create a new ICMP filter on host eth0 ingress for mirroring
+ // packets from host eth0 to veth.
+ Try<bool> icmpEth0ToVeth = filter::icmp::create(
+ eth0,
+ ingress::HANDLE,
+ icmp::Classifier(hostIPNetwork.address()),
+ Priority(ICMP_FILTER_PRIORITY, NORMAL),
+ action::Mirror(targets));
+
+ if (icmpEth0ToVeth.isError()) {
+ ++metrics.adding_eth0_icmp_filters_errors;
+
+ return Failure(
+ "Failed to create an ICMP packet filter from host " + eth0 +
+ " to " + veth(pid) + ": " + icmpEth0ToVeth.error());
+ } else if (!icmpEth0ToVeth.get()) {
+ ++metrics.adding_eth0_icmp_filters_already_exist;
+
+ return Failure(
+ "The ICMP packet filter on host " + eth0 + " already exists");
+ }
+
+ // Create a new ARP filter on host eth0 ingress for mirroring
+ // packets from host eth0 to veth.
+ Try<bool> arpEth0ToVeth = filter::basic::create(
+ eth0,
+ ingress::HANDLE,
+ ETH_P_ARP,
+ Priority(ARP_FILTER_PRIORITY, NORMAL),
+ action::Mirror(targets));
+
+ if (arpEth0ToVeth.isError()) {
+ ++metrics.adding_eth0_arp_filters_errors;
+
+ return Failure(
+ "Failed to create an ARP packet filter from host " + eth0 +
+ " to " + veth(pid) + ": " + arpEth0ToVeth.error());
+ } else if (!arpEth0ToVeth.get()) {
+ ++metrics.adding_eth0_arp_filters_already_exist;
+
+ return Failure(
+ "The ARP packet filter on host " + eth0 + " already exists");
+ }
+
+ if (flags.egress_unique_flow_per_container) {
+ // Create a new ICMP filter on host eth0 egress for classifying
+ // packets into a reserved flow.
+ Try<bool> icmpEth0Egress = filter::icmp::create(
+ eth0,
+ HOST_TX_FQ_CODEL_HANDLE,
+ icmp::Classifier(None()),
+ Priority(ICMP_FILTER_PRIORITY, NORMAL),
+ Handle(HOST_TX_FQ_CODEL_HANDLE, ICMP_FLOWID));
+
+ if (icmpEth0Egress.isError()) {
+ ++metrics.adding_eth0_egress_filters_errors;
+
+ return Failure(
+ "Failed to create the ICMP flow classifier on host " +
+ eth0 + ": " + icmpEth0Egress.error());
+ } else if (!icmpEth0Egress.get()) {
+ ++metrics.adding_eth0_egress_filters_already_exist;
+
+ return Failure(
+ "The ICMP flow classifier on host " + eth0 + " already exists");
+ }
+
+ // Create a new ARP filter on host eth0 egress for classifying
+ // packets into a reserved flow.
+ Try<bool> arpEth0Egress = filter::basic::create(
+ eth0,
+ HOST_TX_FQ_CODEL_HANDLE,
+ ETH_P_ARP,
+ Priority(ARP_FILTER_PRIORITY, NORMAL),
+ Handle(HOST_TX_FQ_CODEL_HANDLE, ARP_FLOWID));
+
+ if (arpEth0Egress.isError()) {
+ ++metrics.adding_eth0_egress_filters_errors;
+
+ return Failure(
+ "Failed to create the ARP flow classifier on host " +
+ eth0 + ": " + arpEth0Egress.error());
+ } else if (!arpEth0Egress.get()) {
+ ++metrics.adding_eth0_egress_filters_already_exist;
+
+ return Failure(
+ "The ARP flow classifier on host " + eth0 + " already exists");
+ }
+
+ // Rest of the host packets go to a reserved flow.
+ Try<bool> defaultEth0Egress = filter::basic::create(
+ eth0,
+ HOST_TX_FQ_CODEL_HANDLE,
+ ETH_P_ALL,
+ Priority(DEFAULT_FILTER_PRIORITY, NORMAL),
+ Handle(HOST_TX_FQ_CODEL_HANDLE, HOST_FLOWID));
+
+ if (defaultEth0Egress.isError()) {
+ ++met
<TRUNCATED>