You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/10/26 19:40:34 UTC
[10/12] mesos git commit: Relocated MesosContainerizer specific files
to the correct location.
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/isolators/network/port_mapping.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/network/port_mapping.cpp b/src/slave/containerizer/isolators/network/port_mapping.cpp
deleted file mode 100644
index e6bb75e..0000000
--- a/src/slave/containerizer/isolators/network/port_mapping.cpp
+++ /dev/null
@@ -1,3792 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <limits.h>
-#include <string.h>
-#include <unistd.h>
-
-#include <iostream>
-#include <vector>
-
-#include <glog/logging.h>
-
-#include <mesos/mesos.hpp>
-
-#include <process/collect.hpp>
-#include <process/defer.hpp>
-#include <process/io.hpp>
-#include <process/pid.hpp>
-#include <process/subprocess.hpp>
-
-#include <stout/error.hpp>
-#include <stout/foreach.hpp>
-#include <stout/fs.hpp>
-#include <stout/hashset.hpp>
-#include <stout/json.hpp>
-#include <stout/lambda.hpp>
-#include <stout/mac.hpp>
-#include <stout/multihashmap.hpp>
-#include <stout/numify.hpp>
-#include <stout/os.hpp>
-#include <stout/option.hpp>
-#include <stout/path.hpp>
-#include <stout/protobuf.hpp>
-#include <stout/result.hpp>
-#include <stout/stringify.hpp>
-#include <stout/strings.hpp>
-#include <stout/utils.hpp>
-
-#include <stout/os/exists.hpp>
-#include <stout/os/stat.hpp>
-
-#include "common/status_utils.hpp"
-
-#include "linux/fs.hpp"
-#include "linux/ns.hpp"
-
-#include "linux/routing/route.hpp"
-#include "linux/routing/utils.hpp"
-
-#include "linux/routing/diagnosis/diagnosis.hpp"
-
-#include "linux/routing/filter/basic.hpp"
-#include "linux/routing/filter/icmp.hpp"
-#include "linux/routing/filter/ip.hpp"
-
-#include "linux/routing/handle.hpp"
-
-#include "linux/routing/link/link.hpp"
-
-#include "linux/routing/queueing/fq_codel.hpp"
-#include "linux/routing/queueing/htb.hpp"
-#include "linux/routing/queueing/ingress.hpp"
-#include "linux/routing/queueing/statistics.hpp"
-
-#include "mesos/resources.hpp"
-
-#include "slave/constants.hpp"
-
-#include "slave/containerizer/isolators/network/port_mapping.hpp"
-
-using namespace mesos::internal;
-
-using namespace process;
-
-using namespace routing;
-using namespace routing::filter;
-using namespace routing::queueing;
-using namespace routing::queueing::statistics;
-
-using std::cerr;
-using std::cout;
-using std::dec;
-using std::endl;
-using std::hex;
-using std::list;
-using std::ostringstream;
-using std::set;
-using std::sort;
-using std::string;
-using std::vector;
-
-using filter::ip::PortRange;
-
-using mesos::slave::ContainerLimitation;
-using mesos::slave::ContainerPrepareInfo;
-using mesos::slave::ContainerState;
-using mesos::slave::Isolator;
-
// Older glibc headers may not define MNT_DETACH, so supply the value
// (2) ourselves when it is missing.
#if !defined(MNT_DETACH)
#define MNT_DETACH 2
#endif
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
// Smallest number of ephemeral ports that a container should have.
static constexpr uint16_t MIN_EPHEMERAL_PORTS_SIZE = 16;
-
// Linux traffic control is a combination of queueing disciplines,
// filters and classes organized as a tree for the ingress (rx) and
// egress (tx) flows for each interface. Each container provides two
-// networking interfaces, a virtual eth0 and a loopback interface. The
-// flow of packets from the external network to container is shown
-// below:
-//
-// +----------------------+----------------------+
-// | Container |
-// |----------------------|----------------------|
-// | eth0 | lo |
-// +----------------------+----------------------+
-// ^ | ^ |
-// [3] | | [4] | |
-// | | [7] +-----------+ [10]
-// | |
-// | | [8] +-----------+ [9]
-// [2] | | [5] | |
-// | v v v
-// +----------------------+----------------------+
-// | veth0 | lo |
-// +----------------------|----------------------+
-// | Host |
-// |----------------------|----------------------|
-// | eth0 |
-// +----------------------+----------------------|
-// ^ |
-// [1] | | [6]
-// | v
-//
-// Traffic flowing from outside the network into a container enters
-// the system via the host ingress interface [1] and is routed based
-// on destination port to the outbound interface for the matching
-// container [2], which forwards the packet to the container's inbound
-// virtual interface. Outbound traffic destined for the external
-// network flows along the reverse path [4,5,6]. Loopback traffic is
-// directed to the corresponding Ethernet interface, either [7,10] or
-// [8,9] where the same destination port routing can be applied as to
-// external traffic. We use traffic control filters at several of the
-// interfaces to create these packet paths.
-//
-// Linux provides only a very simple topology for ingress interfaces.
-// A root is provided on a fixed handle (handle::INGRESS_ROOT) under
-// which a single qdisc can be installed, with handle ingress::HANDLE.
-// Traffic control filters can then be attached to the ingress qdisc.
-// We install one or more ingress filters on the host eth0 [1] to
-// direct traffic to the correct container, and on the container
-// virtual eth0 [5] to direct traffic to other containers or out of
-// the box. Since we know the ip port assignments for each container,
-// we can direct traffic directly to the appropriate container.
-// However, for ICMP and ARP traffic where no equivalent to a port
-// exists, we send a copy of the packet to every container and rely on
-// the network stack to drop unexpected packets.
-//
-// We install a Hierarchical Token Bucket (HTB) qdisc and class to
-// limit the outbound traffic bandwidth as the egress qdisc inside the
-// container [4] and then add a fq_codel qdisc to limit head of line
-// blocking on the egress filter. The egress traffic control chain is
-// thus:
-//
-// root device: handle::EGRESS_ROOT ->
-// htb egress qdisc: CONTAINER_TX_HTB_HANDLE ->
-// htb rate limiting class: CONTAINER_TX_HTB_CLASS_ID ->
-// buffer-bloat reduction: FQ_CODEL
-constexpr Handle CONTAINER_TX_HTB_HANDLE = Handle(1, 0);
-constexpr Handle CONTAINER_TX_HTB_CLASS_ID =
- Handle(CONTAINER_TX_HTB_HANDLE, 1);
-
-
-// Finally we create a second fq_codel qdisc on the public interface
-// of the host [6] to reduce performance interference between
-// containers. We create independent flows for each container, and
-// one for the host, which ensures packets from each container are
-// guaranteed fair access to the host interface. This egress traffic
-// control chain for the host interface is thus:
-//
-// root device: handle::EGRESS_ROOT ->
-// buffer-bloat reduction: FQ_CODEL
-constexpr Handle HOST_TX_FQ_CODEL_HANDLE = Handle(1, 0);
-
-
// Primary priority assigned to each kind of filter.
static constexpr uint8_t ARP_FILTER_PRIORITY = 1;
static constexpr uint8_t ICMP_FILTER_PRIORITY = 2;
static constexpr uint8_t IP_FILTER_PRIORITY = 3;
static constexpr uint8_t DEFAULT_FILTER_PRIORITY = 4;
-
-
// Secondary priorities, used to order filters that share a primary
// priority.
static constexpr uint8_t HIGH = 1;
static constexpr uint8_t NORMAL = 2;
static constexpr uint8_t LOW = 3;
-
-
// Flow IDs on the host eth0 egress (see MESOS-2422). The host's own
// traffic has a reserved flow. ARP and ICMP traffic from containers is
// light, so both deliberately share one flow (hence the same value).
// Per-container flows start at CONTAINER_MIN_FLOWID.
static constexpr uint16_t HOST_FLOWID = 1;
static constexpr uint16_t ARP_FLOWID = 2;
static constexpr uint16_t ICMP_FLOWID = 2;
static constexpr uint16_t CONTAINER_MIN_FLOWID = 3;
-
-
-// The well known ports. Used for sanity check.
-static Interval<uint16_t> WELL_KNOWN_PORTS()
-{
- return (Bound<uint16_t>::closed(0), Bound<uint16_t>::open(1024));
-}
-
-
-/////////////////////////////////////////////////
-// Helper functions for the isolator.
-/////////////////////////////////////////////////
-
// Returns the largest power of two that is <= x (0 when x is 0).
static uint32_t roundDownToPowerOfTwo(uint32_t x)
{
  // Smear the most significant set bit into every lower bit position,
  // e.g. 0b00101101 -> 0b00111111. Shifting by 1, 2, 4, 8 and 16 covers
  // all 32 bits, and the zeros above the MSB are never turned into 1s.
  for (unsigned shift = 1; shift < 32; shift *= 2) {
    x |= x >> shift;
  }

  // Keep only the top bit: 0b00111111 - 0b00011111 == 0b00100000.
  return x - (x >> 1);
}
-
-
-// Returns the name of the host end of the virtual ethernet pair for a
-// given container. The kernel restricts link name to 16 characters or
-// less, so we cannot put container ID into the device name. Instead,
-// we use the pid of the executor process forked by the slave to
-// uniquely name the device for each container. It's safe because we
-// cannot have two active containers having the same pid for the
-// executor process.
-static string veth(pid_t pid)
-{
- return PORT_MAPPING_VETH_PREFIX() + stringify(pid);
-}
-
-
-// Extracts the pid from the given veth name.
-static Option<pid_t> getPidFromVeth(const string& veth)
-{
- if (strings::startsWith(veth, PORT_MAPPING_VETH_PREFIX())) {
- Try<pid_t> pid = numify<pid_t>(
- strings::remove(veth, PORT_MAPPING_VETH_PREFIX(), strings::PREFIX));
-
- if (pid.isSome()) {
- return pid.get();
- }
- }
-
- return None();
-}
-
-
-// Extracts the container ID from the symlink that points to the
-// network namespace handle. The following is the layout of the bind
-// mount root and bind mount symlink root:
-// <PORT_MAPPING_BIND_MOUNT_ROOT()>
-// |--- 3945 (pid) <-|
-// |
-// <PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT()> |
-// |--- ecf293e7-e6e8-4cbc-aaee-4d6c958aa276 --|
-// (symlink: container ID -> pid)
-static Try<ContainerID> getContainerIdFromSymlink(const string& symlink)
-{
- if (!os::stat::islink(symlink)) {
- return Error("Not a symlink");
- }
-
- string _containerId = Path(symlink).basename();
-
- ContainerID containerId;
- containerId.set_value(_containerId);
-
- return containerId;
-}
-
-
-// Extracts the pid from the network namespace handle. Returns None if
-// the handle is clearly not created by us.
-static Result<pid_t> getPidFromNamespaceHandle(const string& handle)
-{
- if (os::stat::islink(handle)) {
- return Error("Not expecting a symlink");
- }
-
- string _pid = Path(handle).basename();
-
- Try<pid_t> pid = numify<pid_t>(_pid);
- if (pid.isError()) {
- return None();
- }
-
- return pid.get();
-}
-
-
-// Extracts the pid from the symlink that points to the network
-// namespace handle. Returns None if it's a dangling symlink.
-static Result<pid_t> getPidFromSymlink(const string& symlink)
-{
- if (!os::stat::islink(symlink)) {
- return Error("Not a symlink");
- }
-
- Result<string> target = os::realpath(symlink);
- if (target.isError()) {
- return Error("Failed to follow the symlink: " + target.error());
- } else if (target.isNone()) {
- // This is a dangling symlink.
- return None();
- }
-
- return getPidFromNamespaceHandle(target.get());
-}
-
-
-static string getSymlinkPath(const ContainerID& containerId)
-{
- return path::join(
- PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT(),
- stringify(containerId));
-}
-
-
-static string getNamespaceHandlePath(pid_t pid)
-{
- return path::join(
- PORT_MAPPING_BIND_MOUNT_ROOT(),
- stringify(pid));
-}
-
-
-// Converts from value ranges to interval set.
-static IntervalSet<uint16_t> getIntervalSet(const Value::Ranges& ranges)
-{
- IntervalSet<uint16_t> set;
-
- for (int i = 0; i < ranges.range_size(); i++) {
- set += (Bound<uint16_t>::closed(ranges.range(i).begin()),
- Bound<uint16_t>::closed(ranges.range(i).end()));
- }
-
- return set;
-}
-
-/////////////////////////////////////////////////
-// Implementation for PortMappingUpdate.
-/////////////////////////////////////////////////
-
-const char* PortMappingUpdate::NAME = "update";
-
-
-PortMappingUpdate::Flags::Flags()
-{
- add(ð0_name,
- "eth0_name",
- "The name of the public network interface (e.g., eth0)");
-
- add(&lo_name,
- "lo_name",
- "The name of the loopback network interface (e.g., lo)");
-
- add(&pid,
- "pid",
- "The pid of the process whose namespaces we will enter");
-
- add(&ports_to_add,
- "ports_to_add",
- "A collection of port ranges (formatted as a JSON object)\n"
- "for which to add IP filters. E.g.,\n"
- "--ports_to_add={\"range\":[{\"begin\":4,\"end\":8}]}");
-
- add(&ports_to_remove,
- "ports_to_remove",
- "A collection of port ranges (formatted as a JSON object)\n"
- "for which to remove IP filters. E.g.,\n"
- "--ports_to_remove={\"range\":[{\"begin\":4,\"end\":8}]}");
-}
-
-
-// The following two helper functions allow us to convert from a
-// collection of port ranges to a JSON object and vice versa. They
-// will be used for the port mapping update operation.
-template <typename Iterable>
-JSON::Object json(const Iterable& ranges)
-{
- Value::Ranges values;
- foreach (const PortRange& range, ranges) {
- Value::Range value;
- value.set_begin(range.begin());
- value.set_end(range.end());
-
- values.add_range()->CopyFrom(value);
- }
- return JSON::Protobuf(values);
-}
-
-
-static Try<vector<PortRange>> parse(const JSON::Object& object)
-{
- Try<Value::Ranges> parsing = protobuf::parse<Value::Ranges>(object);
- if (parsing.isError()) {
- return Error("Failed to parse JSON: " + parsing.error());
- }
-
- vector<PortRange> ranges;
- Value::Ranges values = parsing.get();
- for (int i = 0; i < values.range_size(); i++) {
- const Value::Range& value = values.range(i);
- Try<PortRange> range = PortRange::fromBeginEnd(value.begin(), value.end());
- if (range.isError()) {
- return Error("Invalid port range: " + range.error());
- }
-
- ranges.push_back(range.get());
- }
- return ranges;
-}
-
-
-// Helper function to set up IP filters inside the container for a
-// given port range.
-static Try<Nothing> addContainerIPFilters(
- const PortRange& range,
- const string& eth0,
- const string& lo)
-{
- // Add an IP packet filter on lo such that local traffic inside a
- // container will not be redirected to eth0.
- Try<bool> loTerminal = filter::ip::create(
- lo,
- ingress::HANDLE,
- ip::Classifier(None(), None(), None(), range),
- Priority(IP_FILTER_PRIORITY, HIGH),
- action::Terminal());
-
- if (loTerminal.isError()) {
- return Error(
- "Failed to create an IP packet filter on " + lo +
- " which stops packets from being sent to " + eth0 +
- ": " + loTerminal.error());
- } else if (!loTerminal.get()) {
- return Error(
- "The IP packet filter on " + lo +
- " which stops packets from being sent to " +
- eth0 + " already exists");
- }
-
- // Add an IP packet filter (for loopback IP) from eth0 to lo to
- // redirect all loopback IP traffic to lo.
- Try<bool> eth0ToLoLoopback = filter::ip::create(
- eth0,
- ingress::HANDLE,
- ip::Classifier(
- None(),
- net::IPNetwork::LOOPBACK_V4().address(),
- None(),
- range),
- Priority(IP_FILTER_PRIORITY, NORMAL),
- action::Redirect(lo));
-
- if (eth0ToLoLoopback.isError()) {
- return Error(
- "Failed to create an IP packet filter (for loopback IP) from " +
- eth0 + " to " + lo + ": " + eth0ToLoLoopback.error());
- } else if (!eth0ToLoLoopback.get()) {
- return Error(
- "The IP packet filter (for loopback IP) from " +
- eth0 + " to " + lo + " already exists");
- }
-
- return Nothing();
-}
-
-
-// Helper function to remove IP filters inside the container for a
-// given port range.
-static Try<Nothing> removeContainerIPFilters(
- const PortRange& range,
- const string& eth0,
- const string& lo)
-{
- // Remove the 'terminal' IP packet filter on lo.
- Try<bool> loTerminal = filter::ip::remove(
- lo,
- ingress::HANDLE,
- ip::Classifier(None(), None(), None(), range));
-
- if (loTerminal.isError()) {
- return Error(
- "Failed to remove the IP packet filter on " + lo +
- " which stops packets from being sent to " + eth0 +
- ": " + loTerminal.error());
- } else if (!loTerminal.get()) {
- return Error(
- "The IP packet filter on " + lo +
- " which stops packets from being sent to " + eth0 +
- " does not exist");
- }
-
- // Remove the IP packet filter (for loopback IP) from eth0 to lo.
- Try<bool> eth0ToLoLoopback = filter::ip::remove(
- eth0,
- ingress::HANDLE,
- ip::Classifier(
- None(),
- net::IPNetwork::LOOPBACK_V4().address(),
- None(),
- range));
-
- if (eth0ToLoLoopback.isError()) {
- return Error(
- "Failed to remove the IP packet filter (for loopback IP) from " +
- eth0 + " to " + lo + ": " + eth0ToLoLoopback.error());
- } else if (!eth0ToLoLoopback.get()) {
- return Error(
- "The IP packet filter (for loopback IP) from " +
- eth0 + " to " + lo + " does not exist");
- }
-
- return Nothing();
-}
-
-
-int PortMappingUpdate::execute()
-{
- if (flags.help) {
- cerr << "Usage: " << name() << " [OPTIONS]" << endl << endl
- << "Supported options:" << endl
- << flags.usage();
- return 0;
- }
-
- if (flags.eth0_name.isNone()) {
- cerr << "The public interface name (e.g., eth0) is not specified" << endl;
- return 1;
- }
-
- if (flags.lo_name.isNone()) {
- cerr << "The loopback interface name (e.g., lo) is not specified" << endl;
- return 1;
- }
-
- if (flags.pid.isNone()) {
- cerr << "The pid is not specified" << endl;
- return 1;
- }
-
- if (flags.ports_to_add.isNone() && flags.ports_to_remove.isNone()) {
- cerr << "Nothing to update" << endl;
- return 1;
- }
-
- Option<vector<PortRange>> portsToAdd;
- Option<vector<PortRange>> portsToRemove;
-
- if (flags.ports_to_add.isSome()) {
- Try<vector<PortRange>> parsing = parse(flags.ports_to_add.get());
- if (parsing.isError()) {
- cerr << "Parsing 'ports_to_add' failed: " << parsing.error() << endl;
- return 1;
- }
- portsToAdd = parsing.get();
- }
-
- if (flags.ports_to_remove.isSome()) {
- Try<vector<PortRange>> parsing = parse(flags.ports_to_remove.get());
- if (parsing.isError()) {
- cerr << "Parsing 'ports_to_remove' failed: " << parsing.error() << endl;
- return 1;
- }
- portsToRemove = parsing.get();
- }
-
- // Enter the network namespace.
- Try<Nothing> setns = ns::setns(flags.pid.get(), "net");
- if (setns.isError()) {
- cerr << "Failed to enter the network namespace of pid " << flags.pid.get()
- << ": " << setns.error() << endl;
- return 1;
- }
-
- // Update IP packet filters.
- const string eth0 = flags.eth0_name.get();
- const string lo = flags.lo_name.get();
-
- if (portsToAdd.isSome()) {
- foreach (const PortRange& range, portsToAdd.get()) {
- Try<Nothing> add = addContainerIPFilters(range, eth0, lo);
- if (add.isError()) {
- cerr << "Failed to add IP filters: " << add.error() << endl;
- return 1;
- }
- }
- }
-
- if (portsToRemove.isSome()) {
- foreach (const PortRange& range, portsToRemove.get()) {
- Try<Nothing> remove = removeContainerIPFilters(range, eth0, lo);
- if (remove.isError()) {
- cerr << "Failed to remove IP filters: " << remove.error() << endl;
- return 1;
- }
- }
- }
-
- return 0;
-}
-
-/////////////////////////////////////////////////
-// Implementation for PortMappingStatistics.
-/////////////////////////////////////////////////
-
-const char* PortMappingStatistics::NAME = "statistics";
-
-
-PortMappingStatistics::Flags::Flags()
-{
- add(ð0_name,
- "eth0_name",
- "The name of the public network interface (e.g., eth0)");
-
- add(&pid,
- "pid",
- "The pid of the process whose namespaces we will enter");
-
- add(&enable_socket_statistics_summary,
- "enable_socket_statistics_summary",
- "Whether to collect socket statistics summary for this container\n",
- false);
-
- add(&enable_socket_statistics_details,
- "enable_socket_statistics_details",
- "Whether to collect socket statistics details (e.g., TCP RTT)\n"
- "for this container.",
- false);
-}
-
-
-// A helper that copies the traffic control statistics from the
-// statistics hashmap into the ResourceStatistics protocol buffer.
-static void addTrafficControlStatistics(
- const string& id,
- const hashmap<string, uint64_t>& statistics,
- ResourceStatistics* result)
-{
- TrafficControlStatistics *tc = result->add_net_traffic_control_statistics();
-
- tc->set_id(id);
-
- // TODO(pbrett) Use protobuf reflection here.
- if (statistics.contains(BACKLOG)) {
- tc->set_backlog(statistics.at(BACKLOG));
- }
- if (statistics.contains(BYTES)) {
- tc->set_bytes(statistics.at(BYTES));
- }
- if (statistics.contains(DROPS)) {
- tc->set_drops(statistics.at(DROPS));
- }
- if (statistics.contains(OVERLIMITS)) {
- tc->set_overlimits(statistics.at(OVERLIMITS));
- }
- if (statistics.contains(PACKETS)) {
- tc->set_packets(statistics.at(PACKETS));
- }
- if (statistics.contains(QLEN)) {
- tc->set_qlen(statistics.at(QLEN));
- }
- if (statistics.contains(RATE_BPS)) {
- tc->set_ratebps(statistics.at(RATE_BPS));
- }
- if (statistics.contains(RATE_PPS)) {
- tc->set_ratepps(statistics.at(RATE_PPS));
- }
- if (statistics.contains(REQUEUES)) {
- tc->set_requeues(statistics.at(REQUEUES));
- }
-}
-
-
-int PortMappingStatistics::execute()
-{
- if (flags.help) {
- cerr << "Usage: " << name() << " [OPTIONS]" << endl << endl
- << "Supported options:" << endl
- << flags.usage();
- return 0;
- }
-
- if (flags.pid.isNone()) {
- cerr << "The pid is not specified" << endl;
- return 1;
- }
-
- if (flags.eth0_name.isNone()) {
- cerr << "The public interface name (e.g., eth0) is not specified" << endl;
- return 1;
- }
-
- // Enter the network namespace.
- Try<Nothing> setns = ns::setns(flags.pid.get(), "net");
- if (setns.isError()) {
- // This could happen if the executor exits before this function is
- // invoked. We do not log here to avoid spurious logging.
- return 1;
- }
-
- ResourceStatistics result;
-
- // NOTE: We use a dummy value here since this field will be cleared
- // before the result is sent to the containerizer.
- result.set_timestamp(0);
-
- if (flags.enable_socket_statistics_summary) {
- // Collections for socket statistics summary are below.
-
- // For TCP, get the number of ACTIVE and TIME_WAIT connections,
- // from reading /proc/net/sockstat (/proc/net/sockstat6 for IPV6).
- // This is not as expensive in the kernel because only counter
- // values are accessed instead of a dump of all the sockets.
- // Example output:
-
- // $ cat /proc/net/sockstat
- // sockets: used 1391
- // TCP: inuse 33 orphan 0 tw 0 alloc 37 mem 6
- // UDP: inuse 15 mem 7
- // UDPLITE: inuse 0
- // RAW: inuse 0
- // FRAG: inuse 0 memory 0
-
- Try<string> value = os::read("/proc/net/sockstat");
- if (value.isError()) {
- cerr << "Failed to read /proc/net/sockstat: " << value.error() << endl;
- return 1;
- }
-
- foreach (const string& line, strings::tokenize(value.get(), "\n")) {
- if (!strings::startsWith(line, "TCP")) {
- continue;
- }
-
- vector<string> tokens = strings::tokenize(line, " ");
- for (size_t i = 0; i < tokens.size(); i++) {
- if (tokens[i] == "inuse") {
- if (i + 1 >= tokens.size()) {
- cerr << "Unexpected output from /proc/net/sockstat" << endl;
- // Be a bit forgiving here here since the /proc file
- // output format can change, though not very likely.
- continue;
- }
-
- // Set number of active TCP connections.
- Try<size_t> inuse = numify<size_t>(tokens[i+1]);
- if (inuse.isError()) {
- cerr << "Failed to parse the number of tcp connections in use: "
- << inuse.error() << endl;
- continue;
- }
-
- result.set_net_tcp_active_connections(inuse.get());
- } else if (tokens[i] == "tw") {
- if (i + 1 >= tokens.size()) {
- cerr << "Unexpected output from /proc/net/sockstat" << endl;
- // Be a bit forgiving here here since the /proc file
- // output format can change, though not very likely.
- continue;
- }
-
- // Set number of TIME_WAIT TCP connections.
- Try<size_t> tw = numify<size_t>(tokens[i+1]);
- if (tw.isError()) {
- cerr << "Failed to parse the number of tcp connections in"
- << " TIME_WAIT: " << tw.error() << endl;
- continue;
- }
-
- result.set_net_tcp_time_wait_connections(tw.get());
- }
- }
- }
- }
-
- if (flags.enable_socket_statistics_details) {
- // Collections for socket statistics details are below.
-
- // NOTE: If the underlying library uses the older version of
- // kernel API, the family argument passed in may not be honored.
- Try<vector<diagnosis::socket::Info>> infos =
- diagnosis::socket::infos(AF_INET, diagnosis::socket::state::ALL);
-
- if (infos.isError()) {
- cerr << "Failed to retrieve the socket information" << endl;
- return 1;
- }
-
- vector<uint32_t> RTTs;
- foreach (const diagnosis::socket::Info& info, infos.get()) {
- // We double check on family regardless.
- if (info.family != AF_INET) {
- continue;
- }
-
- // We consider all sockets that have non-zero rtt value.
- if (info.tcpInfo.isSome() && info.tcpInfo.get().tcpi_rtt != 0) {
- RTTs.push_back(info.tcpInfo.get().tcpi_rtt);
- }
- }
-
- // Only print to stdout when we have results.
- if (RTTs.size() > 0) {
- std::sort(RTTs.begin(), RTTs.end());
-
- // NOTE: The size of RTTs is usually within 1 million so we
- // don't need to worry about overflow here.
- // TODO(jieyu): Right now, we choose to use "Nearest rank" for
- // simplicity. Consider directly using the Statistics abstraction
- // which computes "Linear interpolation between closest ranks".
- // http://en.wikipedia.org/wiki/Percentile
- size_t p50 = RTTs.size() * 50 / 100;
- size_t p90 = RTTs.size() * 90 / 100;
- size_t p95 = RTTs.size() * 95 / 100;
- size_t p99 = RTTs.size() * 99 / 100;
-
- result.set_net_tcp_rtt_microsecs_p50(RTTs[p50]);
- result.set_net_tcp_rtt_microsecs_p90(RTTs[p90]);
- result.set_net_tcp_rtt_microsecs_p95(RTTs[p95]);
- result.set_net_tcp_rtt_microsecs_p99(RTTs[p99]);
- }
- }
-
- // Collect traffic statistics for the container from the container
- // virtual interface and export them in JSON.
- const string& eth0 = flags.eth0_name.get();
-
- // Overlimits are reported on the HTB qdisc at the egress root.
- Result<hashmap<string, uint64_t>> statistics =
- htb::statistics(eth0, EGRESS_ROOT);
-
- if (statistics.isSome()) {
- addTrafficControlStatistics(
- NET_ISOLATOR_BW_LIMIT,
- statistics.get(),
- &result);
- } else if (statistics.isNone()) {
- // Traffic control statistics are only available when the
- // container is created on a slave when the egress rate limit is
- // on (i.e., egress_rate_limit_per_container flag is set). We
- // can't just test for that flag here however, since the slave may
- // have been restarted with different flags since the container
- // was created. It is also possible that isolator statistics are
- // unavailable because we the container is in the process of being
- // created or destroy. Hence we do not report a lack of network
- // statistics as an error.
- } else if (statistics.isError()) {
- cerr << "Failed to get htb qdisc statistics on " << eth0
- << " in namespace " << flags.pid.get() << endl;
- }
-
- // Drops due to the bandwidth limit should be reported at the leaf.
- statistics = fq_codel::statistics(eth0, CONTAINER_TX_HTB_CLASS_ID);
- if (statistics.isSome()) {
- addTrafficControlStatistics(
- NET_ISOLATOR_BLOAT_REDUCTION,
- statistics.get(),
- &result);
- } else if (statistics.isNone()) {
- // See discussion on network isolator statistics above.
- } else if (statistics.isError()) {
- cerr << "Failed to get fq_codel qdisc statistics on " << eth0
- << " in namespace " << flags.pid.get() << endl;
- }
-
- cout << stringify(JSON::Protobuf(result));
- return 0;
-}
-
-
-/////////////////////////////////////////////////
-// Implementation for the isolator.
-/////////////////////////////////////////////////
-
-PortMappingIsolatorProcess::Metrics::Metrics()
- : adding_eth0_ip_filters_errors(
- "port_mapping/adding_eth0_ip_filters_errors"),
- adding_eth0_ip_filters_already_exist(
- "port_mapping/adding_eth0_ip_filters_already_exist"),
- adding_eth0_egress_filters_errors(
- "port_mapping/adding_eth0_egress_filters_errors"),
- adding_eth0_egress_filters_already_exist(
- "port_mapping/adding_eth0_egress_filters_already_exist"),
- adding_lo_ip_filters_errors(
- "port_mapping/adding_lo_ip_filters_errors"),
- adding_lo_ip_filters_already_exist(
- "port_mapping/adding_lo_ip_filters_already_exist"),
- adding_veth_ip_filters_errors(
- "port_mapping/adding_veth_ip_filters_errors"),
- adding_veth_ip_filters_already_exist(
- "port_mapping/adding_veth_ip_filters_already_exist"),
- adding_veth_icmp_filters_errors(
- "port_mapping/adding_veth_icmp_filters_errors"),
- adding_veth_icmp_filters_already_exist(
- "port_mapping/adding_veth_icmp_filters_already_exist"),
- adding_veth_arp_filters_errors(
- "port_mapping/adding_veth_arp_filters_errors"),
- adding_veth_arp_filters_already_exist(
- "port_mapping/adding_veth_arp_filters_already_exist"),
- adding_eth0_icmp_filters_errors(
- "port_mapping/adding_eth0_icmp_filters_errors"),
- adding_eth0_icmp_filters_already_exist(
- "port_mapping/adding_eth0_icmp_filters_already_exist"),
- adding_eth0_arp_filters_errors(
- "port_mapping/adding_eth0_arp_filters_errors"),
- adding_eth0_arp_filters_already_exist(
- "port_mapping/adding_eth0_arp_filters_already_exist"),
- removing_eth0_ip_filters_errors(
- "port_mapping/removing_eth0_ip_filters_errors"),
- removing_eth0_ip_filters_do_not_exist(
- "port_mapping/removing_eth0_ip_filters_do_not_exist"),
- removing_eth0_egress_filters_errors(
- "port_mapping/removing_eth0_egress_filters_errors"),
- removing_eth0_egress_filters_do_not_exist(
- "port_mapping/removinging_eth0_egress_filters_do_not_exist"),
- removing_lo_ip_filters_errors(
- "port_mapping/removing_lo_ip_filters_errors"),
- removing_lo_ip_filters_do_not_exist(
- "port_mapping/removing_lo_ip_filters_do_not_exist"),
- removing_veth_ip_filters_errors(
- "port_mapping/removing_veth_ip_filters_errors"),
- removing_veth_ip_filters_do_not_exist(
- "port_mapping/removing_veth_ip_filters_do_not_exist"),
- removing_eth0_icmp_filters_errors(
- "port_mapping/removing_eth0_icmp_filters_errors"),
- removing_eth0_icmp_filters_do_not_exist(
- "port_mapping/removing_eth0_icmp_filters_do_not_exist"),
- removing_eth0_arp_filters_errors(
- "port_mapping/removing_eth0_arp_filters_errors"),
- removing_eth0_arp_filters_do_not_exist(
- "port_mapping/removing_eth0_arp_filters_do_not_exist"),
- updating_eth0_icmp_filters_errors(
- "port_mapping/updating_eth0_icmp_filters_errors"),
- updating_eth0_icmp_filters_already_exist(
- "port_mapping/updating_eth0_icmp_filters_already_exist"),
- updating_eth0_icmp_filters_do_not_exist(
- "port_mapping/updating_eth0_icmp_filters_do_not_exist"),
- updating_eth0_arp_filters_errors(
- "port_mapping/updating_eth0_arp_filters_errors"),
- updating_eth0_arp_filters_already_exist(
- "port_mapping/updating_eth0_arp_filters_already_exist"),
- updating_eth0_arp_filters_do_not_exist(
- "port_mapping/updating_eth0_arp_filters_do_not_exist"),
- updating_container_ip_filters_errors(
- "port_mapping/updating_container_ip_filters_errors")
-{
- process::metrics::add(adding_eth0_ip_filters_errors);
- process::metrics::add(adding_eth0_ip_filters_already_exist);
- process::metrics::add(adding_lo_ip_filters_errors);
- process::metrics::add(adding_lo_ip_filters_already_exist);
- process::metrics::add(adding_veth_ip_filters_errors);
- process::metrics::add(adding_veth_ip_filters_already_exist);
- process::metrics::add(adding_veth_icmp_filters_errors);
- process::metrics::add(adding_veth_icmp_filters_already_exist);
- process::metrics::add(adding_veth_arp_filters_errors);
- process::metrics::add(adding_veth_arp_filters_already_exist);
- process::metrics::add(adding_eth0_icmp_filters_errors);
- process::metrics::add(adding_eth0_icmp_filters_already_exist);
- process::metrics::add(adding_eth0_arp_filters_errors);
- process::metrics::add(adding_eth0_arp_filters_already_exist);
- process::metrics::add(removing_eth0_ip_filters_errors);
- process::metrics::add(removing_eth0_ip_filters_do_not_exist);
- process::metrics::add(removing_lo_ip_filters_errors);
- process::metrics::add(removing_lo_ip_filters_do_not_exist);
- process::metrics::add(removing_veth_ip_filters_errors);
- process::metrics::add(removing_veth_ip_filters_do_not_exist);
- process::metrics::add(removing_eth0_icmp_filters_errors);
- process::metrics::add(removing_eth0_icmp_filters_do_not_exist);
- process::metrics::add(removing_eth0_arp_filters_errors);
- process::metrics::add(removing_eth0_arp_filters_do_not_exist);
- process::metrics::add(updating_eth0_icmp_filters_errors);
- process::metrics::add(updating_eth0_icmp_filters_already_exist);
- process::metrics::add(updating_eth0_icmp_filters_do_not_exist);
- process::metrics::add(updating_eth0_arp_filters_errors);
- process::metrics::add(updating_eth0_arp_filters_already_exist);
- process::metrics::add(updating_eth0_arp_filters_do_not_exist);
- process::metrics::add(updating_container_ip_filters_errors);
-}
-
-
-// Unregisters, in the same order, every metric that the Metrics
-// constructor registers via process::metrics::add().
-PortMappingIsolatorProcess::Metrics::~Metrics()
-{
-  // Metrics for adding packet filters.
-  process::metrics::remove(adding_eth0_ip_filters_errors);
-  process::metrics::remove(adding_eth0_ip_filters_already_exist);
-  process::metrics::remove(adding_lo_ip_filters_errors);
-  process::metrics::remove(adding_lo_ip_filters_already_exist);
-  process::metrics::remove(adding_veth_ip_filters_errors);
-  process::metrics::remove(adding_veth_ip_filters_already_exist);
-  process::metrics::remove(adding_veth_icmp_filters_errors);
-  process::metrics::remove(adding_veth_icmp_filters_already_exist);
-  process::metrics::remove(adding_veth_arp_filters_errors);
-  process::metrics::remove(adding_veth_arp_filters_already_exist);
-  process::metrics::remove(adding_eth0_icmp_filters_errors);
-  process::metrics::remove(adding_eth0_icmp_filters_already_exist);
-  process::metrics::remove(adding_eth0_arp_filters_errors);
-  process::metrics::remove(adding_eth0_arp_filters_already_exist);
-
-  // Metrics for removing packet filters.
-  process::metrics::remove(removing_eth0_ip_filters_errors);
-  process::metrics::remove(removing_eth0_ip_filters_do_not_exist);
-  process::metrics::remove(removing_lo_ip_filters_errors);
-  process::metrics::remove(removing_lo_ip_filters_do_not_exist);
-  process::metrics::remove(removing_veth_ip_filters_errors);
-  process::metrics::remove(removing_veth_ip_filters_do_not_exist);
-  process::metrics::remove(removing_eth0_icmp_filters_errors);
-  process::metrics::remove(removing_eth0_icmp_filters_do_not_exist);
-  process::metrics::remove(removing_eth0_arp_filters_errors);
-  process::metrics::remove(removing_eth0_arp_filters_do_not_exist);
-
-  // Metrics for updating packet filters.
-  process::metrics::remove(updating_eth0_icmp_filters_errors);
-  process::metrics::remove(updating_eth0_icmp_filters_already_exist);
-  process::metrics::remove(updating_eth0_icmp_filters_do_not_exist);
-  process::metrics::remove(updating_eth0_arp_filters_errors);
-  process::metrics::remove(updating_eth0_arp_filters_already_exist);
-  process::metrics::remove(updating_eth0_arp_filters_do_not_exist);
-  process::metrics::remove(updating_container_ip_filters_errors);
-}
-
-
-// Creates the port mapping network isolator.
-//
-// The steps are, in order:
-//   1. Verify that the host can run the isolator: root privileges,
-//      network namespace support, a working routing library, and the
-//      'tc'/'ip' commands.
-//   2. Derive the non-ephemeral ('ports') and ephemeral
-//      ('ephemeral_ports') port sets from '--resources' and
-//      sanity-check them.
-//   3. Resolve the public (eth0) and loopback (lo) interfaces.
-//   4. Prepare the host:
-//      - copy eth0's MAC address and MTU onto lo;
-//      - install ingress qdiscs (plus an fq_codel egress qdisc on
-//        eth0 if requested);
-//      - set several sysctls under /proc/sys/net/ipv4/conf;
-//      - snapshot selected host network sysctls;
-//      - create the bind mount roots used for network namespace
-//        handles.
-//
-// Returns an Error describing the first failing step. NOTE: host
-// changes made before a failing step are not rolled back here.
-Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& flags)
-{
-  // Check for root permission.
-  if (geteuid() != 0) {
-    return Error("Using network isolator requires root permissions");
-  }
-
-  // Verify that the network namespace is available by checking the
-  // existence of the network namespace handle of the current process.
-  if (ns::namespaces().count("net") == 0) {
-    return Error(
-        "Using network isolator requires network namespace. "
-        "Make sure your kernel is newer than 3.4");
-  }
-
-  // Check the routing library.
-  Try<Nothing> check = routing::check();
-  if (check.isError()) {
-    return Error(
-        "Routing library check failed: " +
-        check.error());
-  }
-
-  // Check the availability of a few Linux commands that we will use.
-  // We use the blocking os::shell here because 'create' will only be
-  // invoked during initialization.
-  Try<string> checkCommandTc = os::shell("tc filter show");
-  if (checkCommandTc.isError()) {
-    return Error("Check command 'tc' failed: " + checkCommandTc.error());
-  }
-
-  Try<string> checkCommandIp = os::shell("ip link show");
-  if (checkCommandIp.isError()) {
-    return Error("Check command 'ip' failed: " + checkCommandIp.error());
-  }
-
-  Try<Resources> resources = Resources::parse(
-      flags.resources.getOrElse(""),
-      flags.default_role);
-
-  if (resources.isError()) {
-    return Error("Failed to parse --resources: " + resources.error());
-  }
-
-  // Get 'ports' resource from 'resources' flag. These ports will be
-  // treated as non-ephemeral ports.
-  IntervalSet<uint16_t> nonEphemeralPorts;
-  if (resources.get().ports().isSome()) {
-    nonEphemeralPorts = getIntervalSet(resources.get().ports().get());
-  }
-
-  // Get 'ephemeral_ports' resource from 'resources' flag. These ports
-  // will be allocated to each container as ephemeral ports.
-  IntervalSet<uint16_t> ephemeralPorts;
-  if (resources.get().ephemeral_ports().isSome()) {
-    ephemeralPorts = getIntervalSet(resources.get().ephemeral_ports().get());
-  }
-
-  // Each container requires at least one ephemeral port for slave
-  // executor communication. If no 'ephemeral_ports' resource is
-  // found, we will return error.
-  if (ephemeralPorts.empty()) {
-    return Error("Ephemeral ports are not specified");
-  }
-
-  // Sanity check to make sure that the ephemeral ports specified do
-  // not intersect with the specified non-ephemeral ports.
-  if (ephemeralPorts.intersects(nonEphemeralPorts)) {
-    return Error(
-        "The specified ephemeral ports " + stringify(ephemeralPorts) +
-        " intersect with the specified non-ephemeral ports " +
-        stringify(nonEphemeralPorts));
-  }
-
-  // This is a sanity check to make sure that the ephemeral ports
-  // specified do not intersect with the well known ports.
-  if (ephemeralPorts.intersects(WELL_KNOWN_PORTS())) {
-    return Error(
-        "The specified ephemeral ports " + stringify(ephemeralPorts) +
-        " intersect with well known ports " + stringify(WELL_KNOWN_PORTS()));
-  }
-
-  // Obtain the host ephemeral port range by reading the proc file
-  // system ('ip_local_port_range').
-  Try<string> value = os::read("/proc/sys/net/ipv4/ip_local_port_range");
-  if (value.isError()) {
-    return Error("Failed to read host ip_local_port_range: " + value.error());
-  }
-
-  // The file is expected to contain exactly two tab-separated numbers:
-  // the first and the last port of the host's ephemeral range.
-  vector<string> split = strings::split(strings::trim(value.get()), "\t");
-  if (split.size() != 2) {
-    return Error(
-        "Unexpected format from host ip_local_port_range: " + value.get());
-  }
-
-  Try<uint16_t> begin = numify<uint16_t>(split[0]);
-  if (begin.isError()) {
-    return Error(
-        "Failed to parse the begin of host ip_local_port_range: " + split[0]);
-  }
-
-  Try<uint16_t> end = numify<uint16_t>(split[1]);
-  if (end.isError()) {
-    return Error(
-        "Failed to parse the end of host ip_local_port_range: " + split[1]);
-  }
-
-  // NOTE: The parenthesized comma below is stout's Bound 'operator,',
-  // which builds the closed interval [begin, end]; it is not the
-  // built-in comma operator.
-  Interval<uint16_t> hostEphemeralPorts =
-    (Bound<uint16_t>::closed(begin.get()),
-     Bound<uint16_t>::closed(end.get()));
-
-  // Sanity check to make sure the specified ephemeral ports do not
-  // intersect with the ephemeral ports used by the host.
-  if (ephemeralPorts.intersects(hostEphemeralPorts)) {
-    return Error(
-        "The specified ephemeral ports " + stringify(ephemeralPorts) +
-        " intersect with the ephemeral ports used by the host " +
-        stringify(hostEphemeralPorts));
-  }
-
-  // TODO(chzhcn): Cross check ephemeral ports with used ports on the
-  // host (e.g., using port scan).
-
-  // Initialize the ephemeral ports allocator.
-
-  // In theory, any positive integer can be broken up into a few
-  // numbers that are power of 2 aligned. We choose to not allow this
-  // for now so that each container has a fixed (one) number of
-  // filters for ephemeral ports. This makes it easy to debug and
-  // infer performance.
-  if (roundDownToPowerOfTwo(flags.ephemeral_ports_per_container) !=
-      flags.ephemeral_ports_per_container) {
-    return Error(
-        "The number of ephemeral ports for each container (" +
-        stringify(flags.ephemeral_ports_per_container) +
-        ") is not a power of 2");
-  }
-
-  if (ephemeralPorts.size() < flags.ephemeral_ports_per_container) {
-    return Error(
-        "Network Isolator is given ephemeral ports of size: " +
-        stringify(ephemeralPorts.size()) + ", but asked to allocate " +
-        stringify(flags.ephemeral_ports_per_container) +
-        " ephemeral ports for a container");
-  }
-
-  if (flags.ephemeral_ports_per_container < MIN_EPHEMERAL_PORTS_SIZE) {
-    return Error(
-        "Each container has only " +
-        stringify(flags.ephemeral_ports_per_container) +
-        " ephemeral ports. The minimum required is: " +
-        stringify(MIN_EPHEMERAL_PORTS_SIZE));
-  }
-
-  Owned<EphemeralPortsAllocator> ephemeralPortsAllocator(
-      new EphemeralPortsAllocator(
-          ephemeralPorts,
-          flags.ephemeral_ports_per_container));
-
-  // Get the name of the public interface (e.g., eth0). If it is not
-  // specified, try to derive its name from the routing library.
-  Result<string> eth0 = link::eth0();
-  if (flags.eth0_name.isSome()) {
-    eth0 = flags.eth0_name.get();
-
-    // Check if the given public interface exists.
-    Try<bool> hostEth0Exists = link::exists(eth0.get());
-    if (hostEth0Exists.isError()) {
-      return Error(
-          "Failed to check if " + eth0.get() + " exists: " +
-          hostEth0Exists.error());
-    } else if (!hostEth0Exists.get()) {
-      return Error("The public interface " + eth0.get() + " does not exist");
-    }
-  } else if (!eth0.isSome()) {
-    // eth0 is not specified in the flag and we did not get a valid
-    // eth0 from the library.
-    return Error(
-        "Network Isolator failed to find a public interface: " + eth0.error());
-  }
-
-  LOG(INFO) << "Using " << eth0.get() << " as the public interface";
-
-  // Get the name of the loopback interface. If it is not specified,
-  // try to derive its name based on the loopback IP address.
-  Result<string> lo = link::lo();
-  if (flags.lo_name.isSome()) {
-    lo = flags.lo_name.get();
-
-    // Check if the given loopback interface exists.
-    Try<bool> hostLoExists = link::exists(lo.get());
-    if (hostLoExists.isError()) {
-      return Error(
-          "Failed to check if " + lo.get() + " exists: " +
-          hostLoExists.error());
-    } else if (!hostLoExists.get()) {
-      return Error("The loopback interface " + lo.get() + " does not exist");
-    }
-  } else if (!lo.isSome()) {
-    // lo is not specified in the flag and we did not get a valid
-    // lo from the library.
-    return Error(
-        "Network Isolator failed to find a loopback interface: " + lo.error());
-  }
-
-  LOG(INFO) << "Using " << lo.get() << " as the loopback interface";
-
-  // If egress rate limit is provided, do a sanity check that it is
-  // not greater than the host physical link speed.
-  Option<Bytes> egressRateLimitPerContainer;
-  if (flags.egress_rate_limit_per_container.isSome()) {
-    // Read host physical link speed from /sys/class/net/eth0/speed.
-    // This value is in MBits/s.
-    const string speedPath = path::join("/sys/class/net", eth0.get(), "speed");
-
-    Try<string> value = os::read(speedPath);
-    if (value.isError()) {
-      return Error("Failed to read " + speedPath + ": " + value.error());
-    }
-
-    // The content of this file comes from the NIC driver, so a value
-    // we cannot parse is reported as an error rather than aborting
-    // the slave.
-    const string speed = strings::trim(value.get());
-    Try<uint64_t> hostLinkSpeed = numify<uint64_t>(speed);
-    if (hostLinkSpeed.isError()) {
-      return Error(
-          "Failed to parse the link speed '" + speed + "' of " +
-          eth0.get() + ": " + hostLinkSpeed.error());
-    }
-
-    // It could be possible that the nic driver doesn't support
-    // reporting physical link speed. In that case, report error.
-    //
-    // An unknown speed shows up as 0xFFFFFFFF. If the driver prints
-    // it as "-1" instead, the unsigned parse may wrap it to an even
-    // larger value (NOTE(review): depends on numify's lexical_cast
-    // semantics). Rejecting every value >= 0xFFFFFFFF catches both
-    // cases. It also guarantees that the Bytes/s conversion below
-    // cannot overflow uint64_t.
-    if (hostLinkSpeed.get() >= 0xFFFFFFFF) {
-      return Error(
-          "Network Isolator failed to determine link speed for " + eth0.get());
-    }
-
-    // Convert host link speed to Bytes/s for comparison.
-    const uint64_t hostLinkSpeedBytes = hostLinkSpeed.get() * 1000000 / 8;
-
-    if (hostLinkSpeedBytes <
-        flags.egress_rate_limit_per_container.get().bytes()) {
-      return Error(
-          "The given egress traffic limit for containers " +
-          stringify(flags.egress_rate_limit_per_container.get().bytes()) +
-          " Bytes/s is greater than the host link speed " +
-          stringify(hostLinkSpeedBytes) + " Bytes/s");
-    }
-
-    if (flags.egress_rate_limit_per_container.get() != Bytes(0)) {
-      egressRateLimitPerContainer = flags.egress_rate_limit_per_container.get();
-    } else {
-      LOG(WARNING) << "Ignoring the given zero egress rate limit";
-    }
-  }
-
-  // Get the host IP network, MAC and default gateway.
-  Result<net::IPNetwork> hostIPNetwork =
-    net::IPNetwork::fromLinkDevice(eth0.get(), AF_INET);
-
-  if (!hostIPNetwork.isSome()) {
-    return Error(
-        "Failed to get the public IP network of " + eth0.get() + ": " +
-        (hostIPNetwork.isError() ?
-            hostIPNetwork.error() :
-            "does not have an IPv4 network"));
-  }
-
-  Result<net::MAC> hostMAC = net::mac(eth0.get());
-  if (!hostMAC.isSome()) {
-    return Error(
-        "Failed to get the MAC address of " + eth0.get() + ": " +
-        (hostMAC.isError() ? hostMAC.error() : "does not have a MAC address"));
-  }
-
-  Result<net::IP> hostDefaultGateway = route::defaultGateway();
-  if (!hostDefaultGateway.isSome()) {
-    return Error(
-        "Failed to get the default gateway of the host: " +
-        (hostDefaultGateway.isError() ? hostDefaultGateway.error()
-        : "The default gateway of the host does not exist"));
-  }
-
-  // Set the MAC address of the host loopback interface (lo) so that
-  // it matches that of the host public interface (eth0). A fairly
-  // recent kernel patch is needed for this operation to succeed:
-  // https://git.kernel.org/cgit/linux/kernel/git/davem/net.git/:
-  // 25f929fbff0d1bcebf2e92656d33025cd330cbf8
-  Try<bool> setHostLoMAC = link::setMAC(lo.get(), hostMAC.get());
-  if (setHostLoMAC.isError()) {
-    return Error(
-        "Failed to set the MAC address of " + lo.get() +
-        ": " + setHostLoMAC.error());
-  }
-
-  // Set the MTU of the host loopback interface (lo) so that it
-  // matches that of the host public interface (eth0).
-  Result<unsigned int> hostEth0MTU = link::mtu(eth0.get());
-  if (hostEth0MTU.isError()) {
-    return Error(
-        "Failed to get the MTU of " + eth0.get() +
-        ": " + hostEth0MTU.error());
-  }
-
-  // The host public interface should exist since we just checked it.
-  CHECK_SOME(hostEth0MTU);
-
-  Try<bool> setHostLoMTU = link::setMTU(lo.get(), hostEth0MTU.get());
-  if (setHostLoMTU.isError()) {
-    return Error(
-        "Failed to set the MTU of " + lo.get() +
-        ": " + setHostLoMTU.error());
-  }
-
-  // Prepare the ingress queueing disciplines on host public interface
-  // (eth0) and host loopback interface (lo).
-  Try<bool> createHostEth0IngressQdisc = ingress::create(eth0.get());
-  if (createHostEth0IngressQdisc.isError()) {
-    return Error(
-        "Failed to create the ingress qdisc on " + eth0.get() +
-        ": " + createHostEth0IngressQdisc.error());
-  }
-
-  set<uint16_t> freeFlowIds;
-  if (flags.egress_unique_flow_per_container) {
-    // Prepare a fq_codel queueing discipline on host public interface
-    // (eth0) for egress flow classification.
-    //
-    // TODO(cwang): Maybe we can continue when some other egress qdisc
-    // exists because this is not a necessary qdisc for network
-    // isolation, but we don't want inconsistency, so we just fail in
-    // this case. See details in MESOS-2370.
-    Try<bool> createHostEth0EgressQdisc = fq_codel::create(
-        eth0.get(),
-        EGRESS_ROOT,
-        HOST_TX_FQ_CODEL_HANDLE);
-    if (createHostEth0EgressQdisc.isError()) {
-      return Error(
-          "Failed to create the egress qdisc on " + eth0.get() +
-          ": " + createHostEth0EgressQdisc.error());
-    }
-
-    // TODO(cwang): Make sure DEFAULT_FLOWS is large enough so that
-    // it's unlikely to run out of free flow IDs.
-    for (uint16_t i = CONTAINER_MIN_FLOWID; i < fq_codel::DEFAULT_FLOWS; i++) {
-      freeFlowIds.insert(i);
-    }
-  }
-
-  Try<bool> createHostLoQdisc = ingress::create(lo.get());
-  if (createHostLoQdisc.isError()) {
-    return Error(
-        "Failed to create the ingress qdisc on " + lo.get() +
-        ": " + createHostLoQdisc.error());
-  }
-
-  // Enable 'route_localnet' on host loopback interface (lo). This
-  // enables the use of 127.0.0.1/8 for local routing purpose. This
-  // feature only exists on kernel 3.6 or newer.
-  const string loRouteLocalnet =
-    path::join("/proc/sys/net/ipv4/conf", lo.get(), "route_localnet");
-
-  if (!os::exists(loRouteLocalnet)) {
-    // TODO(jieyu): Consider supporting running the isolator if this
-    // feature is not available. We need to conditionally disable
-    // routing for 127.0.0.1/8, and ask the tasks to use the public IP
-    // for container to container and container to host communication.
-    return Error("The kernel does not support 'route_localnet'");
-  }
-
-  Try<Nothing> write = os::write(loRouteLocalnet, "1");
-  if (write.isError()) {
-    return Error(
-        "Failed to enable route_localnet for " + lo.get() +
-        ": " + write.error());
-  }
-
-  // We disable 'rp_filter' and 'send_redirects' for host loopback
-  // interface (lo) to work around a kernel bug, which was only
-  // recently addressed in upstream in the following 3 commits.
-  // https://git.kernel.org/cgit/linux/kernel/git/davem/net.git/:
-  //   6a662719c9868b3d6c7d26b3a085f0cd3cc15e64
-  //   0d5edc68739f1c1e0519acbea1d3f0c1882a15d7
-  //   e374c618b1465f0292047a9f4c244bd71ab5f1f0
-  // The workaround ensures packets don't get dropped at lo.
-  write = os::write("/proc/sys/net/ipv4/conf/all/rp_filter", "0");
-  if (write.isError()) {
-    return Error(
-        "Failed to disable rp_filter for all: " + write.error());
-  }
-
-  write = os::write(path::join(
-      "/proc/sys/net/ipv4/conf", lo.get(), "rp_filter"), "0");
-  if (write.isError()) {
-    return Error(
-        "Failed to disable rp_filter for " + lo.get() +
-        ": " + write.error());
-  }
-
-  write = os::write("/proc/sys/net/ipv4/conf/all/send_redirects", "0");
-  if (write.isError()) {
-    return Error(
-        "Failed to disable send_redirects for all: " + write.error());
-  }
-
-  write = os::write(path::join(
-      "/proc/sys/net/ipv4/conf", lo.get(), "send_redirects"), "0");
-  if (write.isError()) {
-    return Error(
-        "Failed to disable send_redirects for " + lo.get() +
-        ": " + write.error());
-  }
-
-  // We need to enable accept_local on host loopback interface (lo)
-  // for kernels older than 3.6. Refer to the following:
-  // https://git.kernel.org/cgit/linux/kernel/git/davem/net.git/:
-  //   7a9bc9b81a5bc6e44ebc80ef781332e4385083f2
-  // https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt
-  write = os::write(path::join(
-      "/proc/sys/net/ipv4/conf", lo.get(), "accept_local"), "1");
-  if (write.isError()) {
-    return Error(
-        "Failed to enable accept_local for " + lo.get() +
-        ": " + write.error());
-  }
-
-  // Reading host network configurations. Each container will match
-  // these configurations.
-  hashset<string> procs;
-
-  // TODO(jieyu): The following is a partial list of all the
-  // configurations. In the future, we may want to expose these
-  // configurations using ContainerInfo.
-
-  // The kernel will use a default value for the following
-  // configurations inside a container. Therefore, we need to set them
-  // in the container to match that on the host.
-  procs.insert("/proc/sys/net/core/somaxconn");
-
-  // As of kernel 3.10, the following configurations are shared
-  // between host and containers, and therefore are not required to be
-  // set in containers. We keep them here just in case the kernel
-  // changes in the future.
-  procs.insert("/proc/sys/net/core/netdev_max_backlog");
-  procs.insert("/proc/sys/net/core/rmem_max");
-  procs.insert("/proc/sys/net/core/wmem_max");
-  procs.insert("/proc/sys/net/ipv4/tcp_keepalive_time");
-  procs.insert("/proc/sys/net/ipv4/tcp_keepalive_intvl");
-  procs.insert("/proc/sys/net/ipv4/tcp_keepalive_probes");
-  procs.insert("/proc/sys/net/ipv4/tcp_max_syn_backlog");
-  procs.insert("/proc/sys/net/ipv4/tcp_rmem");
-  procs.insert("/proc/sys/net/ipv4/tcp_retries2");
-  procs.insert("/proc/sys/net/ipv4/tcp_synack_retries");
-  procs.insert("/proc/sys/net/ipv4/tcp_wmem");
-  procs.insert("/proc/sys/net/ipv4/neigh/default/gc_thresh1");
-  procs.insert("/proc/sys/net/ipv4/neigh/default/gc_thresh2");
-  procs.insert("/proc/sys/net/ipv4/neigh/default/gc_thresh3");
-
-  // Configurations that cannot be read are silently skipped.
-  hashmap<string, string> hostNetworkConfigurations;
-  foreach (const string& proc, procs) {
-    Try<string> value = os::read(proc);
-    if (value.isSome()) {
-      LOG(INFO) << proc << " = '" << strings::trim(value.get()) << "'";
-      hostNetworkConfigurations[proc] = strings::trim(value.get());
-    }
-  }
-
-  // Self bind mount PORT_MAPPING_BIND_MOUNT_ROOT(). Since we use a
-  // new mount namespace for each container, for this mount point, we
-  // set '--make-rshared' on the host and set '--make-rslave' inside
-  // each container. This is important because when we unmount the
-  // network namespace handles on the host, those handles will be
-  // unmounted in the containers as well, but NOT vice versa.
-
-  // We first create the bind mount directory if it does not exist.
-  Try<Nothing> mkdir = os::mkdir(PORT_MAPPING_BIND_MOUNT_ROOT());
-  if (mkdir.isError()) {
-    return Error(
-        "Failed to create the bind mount root directory at " +
-        PORT_MAPPING_BIND_MOUNT_ROOT() + ": " + mkdir.error());
-  }
-
-  // Now, check '/proc/mounts' to see if
-  // PORT_MAPPING_BIND_MOUNT_ROOT() has already been self mounted.
-  Try<fs::MountTable> mountTable = fs::MountTable::read("/proc/mounts");
-  if (mountTable.isError()) {
-    return Error(
-        "Failed to read the mount table at '/proc/mounts': " +
-        mountTable.error());
-  }
-
-  Option<fs::MountTable::Entry> bindMountRoot;
-  foreach (const fs::MountTable::Entry& entry, mountTable.get().entries) {
-    if (entry.dir == PORT_MAPPING_BIND_MOUNT_ROOT()) {
-      bindMountRoot = entry;
-    }
-  }
-
-  // Self bind mount PORT_MAPPING_BIND_MOUNT_ROOT().
-  if (bindMountRoot.isNone()) {
-    // NOTE: Instead of using fs::mount to perform the bind mount, we
-    // use the shell command here because the syscall 'mount' does not
-    // update the mount table (i.e., /etc/mtab), which could cause
-    // issues for the shell command 'mount --make-rslave' inside the
-    // container. It's OK to use the blocking os::shell here because
-    // 'create' will only be invoked during initialization.
-    Try<string> mount = os::shell(
-        "mount --bind %s %s",
-        PORT_MAPPING_BIND_MOUNT_ROOT().c_str(),
-        PORT_MAPPING_BIND_MOUNT_ROOT().c_str());
-
-    if (mount.isError()) {
-      return Error(
-          "Failed to self bind mount '" + PORT_MAPPING_BIND_MOUNT_ROOT() +
-          "': " + mount.error());
-    }
-  }
-
-  // Mark the mount point PORT_MAPPING_BIND_MOUNT_ROOT() as
-  // recursively shared.
-  Try<string> mountShared = os::shell(
-      "mount --make-rshared %s",
-      PORT_MAPPING_BIND_MOUNT_ROOT().c_str());
-
-  if (mountShared.isError()) {
-    return Error(
-        "Failed to mark '" + PORT_MAPPING_BIND_MOUNT_ROOT() +
-        "' as recursively shared: " + mountShared.error());
-  }
-
-  // Create the network namespace handle symlink directory if it does
-  // not exist. It is used to host network namespace handle symlinks
-  // whose basename is a container ID. This allows us to recover
-  // container IDs for orphan containers (i.e., not known by the
-  // slave). This is introduced in 0.23.0.
-  mkdir = os::mkdir(PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT());
-  if (mkdir.isError()) {
-    return Error(
-        "Failed to create the bind mount symlink root directory at " +
-        PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT() + ": " + mkdir.error());
-  }
-
-  return new MesosIsolator(Owned<MesosIsolatorProcess>(
-      new PortMappingIsolatorProcess(
-          flags,
-          eth0.get(),
-          lo.get(),
-          hostMAC.get(),
-          hostIPNetwork.get(),
-          hostEth0MTU.get(),
-          hostDefaultGateway.get(),
-          hostNetworkConfigurations,
-          egressRateLimitPerContainer,
-          nonEphemeralPorts,
-          ephemeralPortsAllocator,
-          freeFlowIds)));
-}
-
-
-// Recovers the isolator's state after a slave restart.
-//
-// Candidate container pids are derived from the names of the veth
-// links on the host (via getPidFromVeth). Stale network namespace
-// handles and handle symlinks whose veth is gone are removed (best
-// effort).
-//
-// Containers in 'states' whose pid has a veth are recovered into
-// 'infos'; the others are recorded as 'unmanaged'. Any remaining pids
-// are orphans:
-//   - An orphan whose pid maps to exactly one container ID that is
-//     listed in 'orphans' is recovered into 'infos'. It is later
-//     destroyed by the containerizer through the normal cleanup path.
-//   - Every other orphan is cleaned up immediately.
-//
-// On failure, every Info recovered so far into 'infos' is deleted,
-// 'infos' is cleared (so it never holds dangling pointers), and a
-// Failure is returned.
-Future<Nothing> PortMappingIsolatorProcess::recover(
-    const list<ContainerState>& states,
-    const hashset<ContainerID>& orphans)
-{
-  // Extract pids from virtual device names (veth). This tells us
-  // about all the potential live containers on this slave.
-  Try<set<string>> links = net::links();
-  if (links.isError()) {
-    return Failure("Failed to get all the links: " + links.error());
-  }
-
-  hashset<pid_t> pids;
-  foreach (const string& name, links.get()) {
-    Option<pid_t> pid = getPidFromVeth(name);
-    // Not all links follow the naming: mesos{pid}, so we simply
-    // continue, e.g., eth0.
-    if (pid.isNone()) {
-      continue;
-    } else if (pids.contains(pid.get())) {
-      return Failure("Two virtual devices have the same name '" + name + "'");
-    }
-
-    pids.insert(pid.get());
-  }
-
-  // Scan the bind mount root to cleanup all stale network namespace
-  // handles that do not have an active veth associated with.
-  Try<list<string>> entries = os::ls(PORT_MAPPING_BIND_MOUNT_ROOT());
-  if (entries.isError()) {
-    return Failure(
-        "Failed to list bind mount root '" +
-        PORT_MAPPING_BIND_MOUNT_ROOT() +
-        "': " + entries.error());
-  }
-
-  foreach (const string& entry, entries.get()) {
-    const string path = path::join(PORT_MAPPING_BIND_MOUNT_ROOT(), entry);
-
-    // NOTE: We expect all regular files whose names are numbers under
-    // the bind mount root are network namespace handles.
-    Result<pid_t> pid = getPidFromNamespaceHandle(path);
-    if (pid.isError()) {
-      return Failure(
-          "Failed to get pid from network namespace handle '" +
-          path + "': " + pid.error());
-    } else if (pid.isNone()) {
-      // We ignore files that are clearly not network namespace
-      // handles created by us. It's likely that those are created by
-      // users or other tools.
-      LOG(WARNING) << "Unrecognized network namespace handle '" << path << "'";
-      continue;
-    }
-
-    // We cleanup the network namespace handle if the associated
-    // containers have clearly exited (i.e., the veth has gone). The
-    // cleanup here is best effort.
-    if (!pids.contains(pid.get())) {
-      LOG(INFO) << "Removing stale network namespace handle '" << path << "'";
-
-      Try<Nothing> unmount = fs::unmount(path, MNT_DETACH);
-      if (unmount.isError()) {
-        LOG(WARNING) << "Failed to unmount stale network namespace handle '"
-                     << path << "': " << unmount.error();
-      }
-
-      Try<Nothing> rm = os::rm(path);
-      if (rm.isError()) {
-        LOG(WARNING) << "Failed to remove stale network namespace handle '"
-                     << path << "': " << rm.error();
-      }
-    }
-  }
-
-  // Scan the bind mount symlink root for container IDs. This allows us
-  // to recover container IDs for orphan containers (i.e., not known
-  // by the slave). This is introduced in 0.23.0.
-  entries = os::ls(PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT());
-  if (entries.isError()) {
-    return Failure(
-        "Failed to list bind mount symlink root '" +
-        PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT() +
-        "': " + entries.error());
-  }
-
-  // This map stores the mapping between pids and container IDs
-  // recovered from the bind mount root that have valid veth links. We
-  // use a multihashmap here because multiple container IDs can map to
-  // the same pid if the removal of a symlink fails in '_cleanup()'
-  // and the pid is reused by a new container.
-  multihashmap<pid_t, ContainerID> linkers;
-
-  foreach (const string& entry, entries.get()) {
-    const string path =
-      path::join(PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT(), entry);
-
-    // We only create symlinks in this directory and assume
-    // non-symlink files are created by other users or tools,
-    // therefore will be ignored.
-    if (!os::stat::islink(path)) {
-      LOG(WARNING) << "Ignored non-symlink file '" << path
-                   << "' under bind mount symlink root '"
-                   << PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT() << "'";
-      continue;
-    }
-
-    // NOTE: We expect all symlinks under the bind mount symlink root
-    // to be container ID symlinks.
-
-    Try<ContainerID> containerId = getContainerIdFromSymlink(path);
-    if (containerId.isError()) {
-      return Failure(
-          "Failed to get container ID from network namespace handle symlink '" +
-          path + "': " + containerId.error());
-    }
-
-    Result<pid_t> pid = getPidFromSymlink(path);
-    if (pid.isError()) {
-      return Failure(
-          "Failed to get pid from network namespace handle symlink '" + path +
-          "': " + pid.error());
-    }
-
-    // We remove the symlink if it's dangling or the associated
-    // containers have clearly exited (i.e., the veth has gone). The
-    // cleanup here is best effort.
-    if (pid.isNone() || !pids.contains(pid.get())) {
-      LOG(INFO) << "Removing stale network namespace handle symlink '"
-                << path << "'";
-
-      Try<Nothing> rm = os::rm(path);
-      if (rm.isError()) {
-        LOG(WARNING) << "Failed to remove stale network namespace handle "
-                     << "symlink '" << path << "': " << rm.error();
-      }
-    } else {
-      LOG(INFO) << "Discovered network namespace handle symlink "
-                << containerId.get() << " -> " << pid.get();
-
-      linkers.put(pid.get(), containerId.get());
-    }
-  }
-
-  // If multiple container IDs point to the same pid, we remove both
-  // symlinks for safety (as if we cannot derive the container ID for
-  // orphans, which is OK because it'll be treated the same as those
-  // containers that are created by older (pre 0.23.0) versions). Note
-  // that it's possible that multiple container IDs map to the same
-  // pid if the removal of a symlink fails in '_cleanup()' and the pid
-  // is reused by a new container.
-  foreach (pid_t pid, linkers.keys()) {
-    list<ContainerID> containerIds = linkers.get(pid);
-    if (containerIds.size() > 1) {
-      foreach (const ContainerID& containerId, containerIds) {
-        const string linker = getSymlinkPath(containerId);
-
-        LOG(WARNING) << "Removing duplicated network namespace handle symlink '"
-                     << linker << "'";
-
-        Try<Nothing> rm = os::rm(linker);
-        if (rm.isError()) {
-          LOG(WARNING) << "Failed to remove duplicated network namespace "
-                       << "handle symlink '" << linker << "': " << rm.error();
-        }
-      }
-
-      linkers.remove(pid);
-    }
-  }
-
-  // Now, actually recover the isolator from slave's state.
-  foreach (const ContainerState& state, states) {
-    const ContainerID& containerId = state.container_id();
-    pid_t pid = state.pid();
-
-    VLOG(1) << "Recovering network isolator for container "
-            << containerId << " with pid " << pid;
-
-    if (!pids.contains(pid)) {
-      // There are two possible cases here:
-      //
-      // 1) The container was launched by the slave with network
-      //    isolation disabled, so the pid could not be found in the
-      //    device names in the system.
-      //
-      // 2) The container was launched by the slave with network
-      //    isolation enabled, but veth is removed (because the
-      //    corresponding container is destroyed), but the slave
-      //    restarts before it is able to write the sentinel file.
-      //
-      // In both cases, we treat the container as unmanaged. For case
-      // (2), it's safe to do so because the container has already
-      // been destroyed.
-      VLOG(1) << "Skipped recovery for container " << containerId
-              << " with pid " << pid << " as either it was not managed by "
-              << "the network isolator or it has already been destroyed";
-
-      unmanaged.insert(containerId);
-      continue;
-    }
-
-    Try<Info*> recover = _recover(pid);
-    if (recover.isError()) {
-      // Free what was recovered so far and drop the now-dangling
-      // pointers from 'infos'.
-      foreachvalue (Info* recovered, infos) {
-        delete recovered;
-      }
-      infos.clear();
-
-      return Failure(
-          "Failed to recover container " + stringify(containerId) +
-          " with pid " + stringify(pid) + ": " + recover.error());
-    }
-
-    infos[containerId] = recover.get();
-
-    // Remove the successfully recovered pid.
-    pids.erase(pid);
-  }
-
-  // Recover orphans. Known orphans will be destroyed by containerizer
-  // using the normal cleanup path (refer to MESOS-2367 for details).
-  // Unknown orphans will be cleaned up immediately. The recovery will
-  // fail if there is some unknown orphan that cannot be cleaned up.
-  vector<Info*> unknownOrphans;
-
-  foreach (pid_t pid, pids) {
-    Try<Info*> recover = _recover(pid);
-    if (recover.isError()) {
-      foreachvalue (Info* recovered, infos) {
-        delete recovered;
-      }
-      infos.clear();
-
-      foreach (Info* orphan, unknownOrphans) {
-        delete orphan;
-      }
-
-      return Failure(
-          "Failed to recover orphaned container with pid " +
-          stringify(pid) + ": " + recover.error());
-    }
-
-    const list<ContainerID> containerIds = linkers.get(pid);
-    if (containerIds.size() == 1) {
-      const ContainerID containerId = containerIds.front();
-      CHECK(!infos.contains(containerId));
-
-      if (orphans.contains(containerId)) {
-        infos[containerId] = recover.get();
-        continue;
-      }
-    }
-
-    unknownOrphans.push_back(recover.get());
-  }
-
-  foreach (Info* info, unknownOrphans) {
-    CHECK_SOME(info->pid);
-    pid_t pid = info->pid.get();
-
-    Option<ContainerID> containerId;
-    const list<ContainerID> containerIds = linkers.get(pid);
-    if (containerIds.size() == 1) {
-      containerId = containerIds.front();
-    }
-
-    // NOTE: If 'infos' is empty (means there is no regular container
-    // or known orphan), the '_cleanup' below will remove the ICMP and
-    // ARP packet filters on host eth0. This will cause subsequent
-    // calls to '_cleanup' for unknown orphans to fail. However, this
-    // is OK because when slave restarts and tries to recover again,
-    // it'll try to remove the remaining unknown orphans.
-    // TODO(jieyu): Consider call '_cleanup' for all the unknown
-    // orphans before returning even if error occurs.
-    Try<Nothing> cleanup = _cleanup(info, containerId);
-    if (cleanup.isError()) {
-      foreachvalue (Info* recovered, infos) {
-        delete recovered;
-      }
-      infos.clear();
-
-      // TODO(jieyu): Also delete 'info' in unknownOrphans. Notice
-      // that some 'info' in unknownOrphans might have already been
-      // deleted in '_cleanup' above.
-
-      return Failure(
-          "Failed to cleanup orphaned container with pid " +
-          stringify(pid) + ": " + cleanup.error());
-    }
-  }
-
-  // TODO(cwang): Consider removing unrecognized flow classifiers from
-  // host eth0 egress.
-
-  LOG(INFO) << "Network isolator recovery complete";
-
-  return Nothing();
-}
-
-
-Try<PortMappingIsolatorProcess::Info*>
-PortMappingIsolatorProcess::_recover(pid_t pid)
-{
- // Get all the IP filters on veth.
- // NOTE: We only look at veth devices to recover port ranges
- // assigned to each container. That's the reason why we need to make
- // sure that we add filters to veth before adding filters to host
- // eth0 and host lo. Also, we need to make sure we remove filters
- // from host eth0 and host lo before removing filters from veth.
- Result<vector<ip::Classifier>> vethIngressClassifiers =
- ip::classifiers(veth(pid), ingress::HANDLE);
-
- if (vethIngressClassifiers.isError()) {
- return Error(
- "Failed to get all the IP filters on " + veth(pid) +
- ": " + vethIngressClassifiers.error());
- } else if (vethIngressClassifiers.isNone()) {
- return Error(
- "Failed to get all the IP filters on " + veth(pid) +
- ": link does not exist");
- }
-
- hashmap<PortRange, uint16_t> flowIds;
-
- if (flags.egress_unique_flow_per_container) {
- // Get all egress IP flow classifiers on eth0.
- Result<vector<filter::Filter<ip::Classifier>>> eth0EgressFilters =
- ip::filters(eth0, HOST_TX_FQ_CODEL_HANDLE);
-
- if (eth0EgressFilters.isError()) {
- return Error(
- "Failed to get all the IP flow classifiers on " + eth0 +
- ": " + eth0EgressFilters.error());
- } else if (eth0EgressFilters.isNone()) {
- return Error(
- "Failed to get all the IP flow classifiers on " + eth0 +
- ": link does not exist");
- }
-
- // Construct a port range to flow ID mapping from host eth0
- // egress. This map will be used later.
- foreach (const filter::Filter<ip::Classifier>& filter,
- eth0EgressFilters.get()) {
- const Option<PortRange> sourcePorts = filter.classifier.sourcePorts;
- const Option<Handle> classid = filter.classid;
-
- if (sourcePorts.isNone()) {
- return Error("Missing source ports for filters on egress of " + eth0);
- }
-
- if (classid.isNone()) {
- return Error("Missing classid for filters on egress of " + eth0);
- }
-
- if (flowIds.contains(sourcePorts.get())) {
- return Error(
- "Duplicated port range " + stringify(sourcePorts.get()) +
- " detected on egress of " + eth0);
- }
-
- flowIds[sourcePorts.get()] = classid.get().secondary();
- }
- }
-
- IntervalSet<uint16_t> nonEphemeralPorts;
- IntervalSet<uint16_t> ephemeralPorts;
- Option<uint16_t> flowId;
-
- foreach (const ip::Classifier& classifier, vethIngressClassifiers.get()) {
- const Option<PortRange> sourcePorts = classifier.sourcePorts;
- const Option<PortRange> destinationPorts = classifier.destinationPorts;
-
- // All the IP filters on veth used by us only have source ports.
- if (sourcePorts.isNone() || destinationPorts.isSome()) {
- return Error("Unexpected IP filter detected on " + veth(pid));
- }
-
- if (flowIds.contains(sourcePorts.get())) {
- if (flowId.isNone()) {
- flowId = flowIds.get(sourcePorts.get());
- } else if (flowId != flowIds.get(sourcePorts.get())) {
- return Error(
- "A container is associated with multiple flows "
- "on egress of " + eth0);
- }
- } else if (flowId.isSome()) {
- // This is the case where some port range of a container is
- // assigned to a flow while some isn't. This could happen if
- // slave crashes while those filters are created. However, this
- // is OK for us because packets by default go to the host flow.
- LOG(WARNING) << "Container port range " << sourcePorts.get()
- << " does not have flow id " << flowId.get()
- << " assigned";
- }
-
- Interval<uint16_t> ports =
- (Bound<uint16_t>::closed(sourcePorts.get().begin()),
- Bound<uint16_t>::closed(sourcePorts.get().end()));
-
- if (managedNonEphemeralPorts.contains(ports)) {
- nonEphemeralPorts += ports;
- } else if (ephemeralPortsAllocator->isManaged(ports)) {
- // We have duplicate here because we have two IP filters with
- // the same ephemeral port range (one for eth0 and one for lo).
- // But we should never have two intersecting port ranges.
- if (!ephemeralPorts.contains(ports) && ephemeralPorts.intersects(ports)) {
- return Error("Unexpected intersected ephemeral port ranges");
- }
-
- ephemeralPorts += ports;
- } else {
- return Error("Unexpected IP filter detected on " + veth(pid));
- }
- }
-
- Info* info = NULL;
-
- if (ephemeralPorts.empty()) {
- // NOTE: This is possible because the slave may crash while
- // calling 'isolate()', leaving a partially isolated container. To
- // clean up this partially isolated container, we still create an
- // Info struct here and let the 'cleanup' function clean it up
- // later.
- LOG(WARNING) << "No ephemeral ports found for container with pid "
- << stringify(pid) << ". This could happen if slave crashes "
- << "while isolating a container";
-
- info = new Info(nonEphemeralPorts, Interval<uint16_t>(), pid);
- } else {
- if (ephemeralPorts.intervalCount() != 1) {
- return Error("Each container should have only one ephemeral port range");
- }
-
- // Tell the allocator that this ephemeral port range is used.
- ephemeralPortsAllocator->allocate(*ephemeralPorts.begin());
-
- info = new Info(nonEphemeralPorts, *ephemeralPorts.begin(), pid);
-
- VLOG(1) << "Recovered network isolator for container with pid " << pid
- << " non-ephemeral port ranges " << nonEphemeralPorts
- << " and ephemeral port range " << *ephemeralPorts.begin();
- }
-
- if (flowId.isSome()) {
- freeFlowIds.erase(flowId.get());
- info->flowId = flowId.get();
- }
-
- return CHECK_NOTNULL(info);
-}
-
-
-Future<Option<ContainerPrepareInfo>> PortMappingIsolatorProcess::prepare(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const string& directory,
- const Option<string>& user)
-{
- if (unmanaged.contains(containerId)) {
- return Failure("Asked to prepare an unmanaged container");
- }
-
- if (infos.contains(containerId)) {
- return Failure("Container has already been prepared");
- }
-
- Resources resources(executorInfo.resources());
-
- IntervalSet<uint16_t> nonEphemeralPorts;
-
- if (resources.ports().isSome()) {
- nonEphemeralPorts = getIntervalSet(resources.ports().get());
-
- // Sanity check to make sure that the assigned non-ephemeral ports
- // for the container are part of the non-ephemeral ports specified
- // by the slave.
- if (!managedNonEphemeralPorts.contains(nonEphemeralPorts)) {
- return Failure(
- "Some non-ephemeral ports specified in " +
- stringify(nonEphemeralPorts) +
- " are not managed by the slave");
- }
- }
-
- // TODO(jieyu): For now, we simply ignore the 'ephemeral_ports'
- // specified in the executor info. However, this behavior needs to
- // be changed once the master can make default allocations for
- // ephemeral ports.
- if (resources.ephemeral_ports().isSome()) {
- LOG(WARNING) << "Ignoring the specified ephemeral_ports '"
- << resources.ephemeral_ports().get()
- << "' for container" << containerId
- << " of executor " << executorInfo.executor_id();
- }
-
- // Allocate the ephemeral ports used by this container.
- Try<Interval<uint16_t>> ephemeralPorts = ephemeralPortsAllocator->allocate();
- if (ephemeralPorts.isError()) {
- return Failure(
- "Failed to allocate ephemeral ports: " + ephemeralPorts.error());
- }
-
- infos[containerId] = new Info(nonEphemeralPorts, ephemeralPorts.get());
-
- LOG(INFO) << "Using non-ephemeral ports " << nonEphemeralPorts
- << " and ephemeral ports " << ephemeralPorts.get()
- << " for container " << containerId << " of executor "
- << executorInfo.executor_id();
-
- ContainerPrepareInfo prepareInfo;
- prepareInfo.add_commands()->set_value(scripts(infos[containerId]));
-
- // NOTE: the port mapping isolator itself doesn't require mount
- // namespace. However, if mount namespace is enabled because of
- // other isolators, we need to set mount sharing accordingly for
- // PORT_MAPPING_BIND_MOUNT_ROOT to avoid races described in
- // MESOS-1558. So we turn on mount namespace here for consistency.
- prepareInfo.set_namespaces(CLONE_NEWNET | CLONE_NEWNS);
-
- return prepareInfo;
-}
-
-
-Future<Nothing> PortMappingIsolatorProcess::isolate(
- const ContainerID& containerId,
- pid_t pid)
-{
- if (unmanaged.contains(containerId)) {
- return Failure("Asked to isolate an unmanaged container");
- }
-
- if (!infos.contains(containerId)) {
- return Failure("Unknown container");
- }
-
- Info* info = CHECK_NOTNULL(infos[containerId]);
-
- if (info->pid.isSome()) {
- return Failure("The container has already been isolated");
- }
-
- info->pid = pid;
-
- if (flags.egress_unique_flow_per_container) {
- info->flowId = getNextFlowId();
- }
-
- // Bind mount the network namespace handle of the process 'pid' to a
- // directory to hold an extra reference to the network namespace
- // which will be released in 'cleanup'. By holding the extra
- // reference, the network namespace will not be destroyed even if
- // the process 'pid' is gone, which allows us to explicitly control
- // the network namespace life cycle.
- const string source = path::join("/proc", stringify(pid), "ns", "net");
- const string target = getNamespaceHandlePath(pid);
-
- Try<Nothing> touch = os::touch(target);
- if (touch.isError()) {
- return Failure("Failed to create the bind mount point: " + touch.error());
- }
-
- Try<Nothing> mount = fs::mount(source, target, None(), MS_BIND, NULL);
- if (mount.isError()) {
- return Failure(
- "Failed to mount the network namespace handle from '" +
- source + "' to '" + target + "': " + mount.error());
- }
-
- LOG(INFO) << "Bind mounted '" << source << "' to '" << target
- << "' for container " << containerId;
-
- // Since 0.23.0, we create a symlink to the network namespace handle
- // using the container ID. This serves two purposes. First, it
- // allows us to recover the container ID later when slave restarts
- // even if slave's checkpointed meta data is deleted. Second, it
- // makes the debugging easier. See MESOS-2528 for details.
- const string linker = getSymlinkPath(containerId);
- Try<Nothing> symlink = ::fs::symlink(target, linker);
- if (symlink.isError()) {
- return Failure(
- "Failed to symlink the network namespace handle '" +
- linker + "' -> '" + target + "': " + symlink.error());
- }
-
- LOG(INFO) << "Created network namespace handle symlink '"
- << linker << "' -> '" << target << "'";
-
- // Create a virtual ethernet pair for this container.
- Try<bool> createVethPair = link::create(veth(pid), eth0, pid);
- if (createVethPair.isError()) {
- return Failure(
- "Failed to create virtual ethernet pair: " +
- createVethPair.error());
- }
-
- // Disable IPv6 for veth as IPv6 packets won't be forwarded anyway.
- const string disableIPv6 =
- path::join("/proc/sys/net/ipv6/conf", veth(pid), "disable_ipv6");
-
- if (os::exists(disableIPv6)) {
- Try<Nothing> write = os::write(disableIPv6, "1");
- if (write.isError()) {
- return Failure(
- "Failed to disable IPv6 for " + veth(pid) +
- ": " + write.error());
- }
- }
-
- // Sets the MAC address of veth to match the MAC address of the host
- // public interface (eth0).
- Try<bool> setVethMAC = link::setMAC(veth(pid), hostMAC);
- if (setVethMAC.isError()) {
- return Failure(
- "Failed to set the MAC address of " + veth(pid) +
- ": " + setVethMAC.error());
- }
-
- // Prepare the ingress queueing disciplines on veth.
- Try<bool> createQdisc = ingress::create(veth(pid));
- if (createQdisc.isError()) {
- return Failure(
- "Failed to create the ingress qdisc on " + veth(pid) +
- ": " + createQdisc.error());
- }
-
- // Veth device should exist since we just created it.
- CHECK(createQdisc.get());
-
- // For each port range, add a set of IP packet filters to properly
- // redirect IP traffic to/from containers.
- foreach (const PortRange& range,
- getPortRanges(info->nonEphemeralPorts + info->ephemeralPorts)) {
- if (info->flowId.isSome()) {
- LOG(INFO) << "Adding IP packet filters with ports " << range
- << " with flow ID " << info->flowId.get()
- << " for container " << containerId;
- } else {
- LOG(INFO) << "Adding IP packet filters with ports " << range
- << " for container " << containerId;
- }
-
- Try<Nothing> add = addHostIPFilters(range, info->flowId, veth(pid));
- if (add.isError()) {
- return Failure(
- "Failed to add IP packet filter with ports " +
- stringify(range) + " for container with pid " +
- stringify(pid) + ": " + add.error());
- }
- }
-
- // Relay ICMP packets from veth of the container to host eth0.
- Try<bool> icmpVethToEth0 = filter::icmp::create(
- veth(pid),
- ingress::HANDLE,
- icmp::Classifier(None()),
- Priority(ICMP_FILTER_PRIORITY, NORMAL),
- action::Redirect(eth0));
-
- if (icmpVethToEth0.isError()) {
- ++metrics.adding_veth_icmp_filters_errors;
-
- return Failure(
- "Failed to create an ICMP packet filter from " + veth(pid) +
- " to host " + eth0 + ": " + icmpVethToEth0.error());
- } else if (!icmpVethToEth0.get()) {
- ++metrics.adding_veth_icmp_filters_already_exist;
-
- return Failure(
- "The ICMP packet filter from " + veth(pid) +
- " to host " + eth0 + " already exists");
- }
-
- // Relay ARP packets from veth of the container to host eth0.
- Try<bool> arpVethToEth0 = filter::basic::create(
- veth(pid),
- ingress::HANDLE,
- ETH_P_ARP,
- Priority(ARP_FILTER_PRIORITY, NORMAL),
- action::Redirect(eth0));
-
- if (arpVethToEth0.isError()) {
- ++metrics.adding_veth_arp_filters_errors;
-
- return Failure(
- "Failed to create an ARP packet filter from " + veth(pid) +
- " to host " + eth0 + ": " + arpVethToEth0.error());
- } else if (!arpVethToEth0.get()) {
- ++metrics.adding_veth_arp_filters_already_exist;
-
- return Failure(
- "The ARP packet filter from " + veth(pid) +
- " to host " + eth0 + " already exists");
- }
-
- // Setup filters for ICMP and ARP packets. We mirror ICMP and ARP
- // packets from host eth0 to veths of all the containers. We also
- // setup flow classifiers for host eth0 egress.
- set<string> targets;
- foreachvalue (Info* info, infos) {
- if (info->pid.isSome()) {
- targets.insert(veth(info->pid.get()));
- }
- }
-
- if (targets.size() == 1) {
- // We just create the first container in which case we should
- // create filters for ICMP and ARP packets.
-
- // Create a new ICMP filter on host eth0 ingress for mirroring
- // packets from host eth0 to veth.
- Try<bool> icmpEth0ToVeth = filter::icmp::create(
- eth0,
- ingress::HANDLE,
- icmp::Classifier(hostIPNetwork.address()),
- Priority(ICMP_FILTER_PRIORITY, NORMAL),
- action::Mirror(targets));
-
- if (icmpEth0ToVeth.isError()) {
- ++metrics.adding_eth0_icmp_filters_errors;
-
- return Failure(
- "Failed to create an ICMP packet filter from host " + eth0 +
- " to " + veth(pid) + ": " + icmpEth0ToVeth.error());
- } else if (!icmpEth0ToVeth.get()) {
- ++metrics.adding_eth0_icmp_filters_already_exist;
-
- return Failure(
- "The ICMP packet filter on host " + eth0 + " already exists");
- }
-
- // Create a new ARP filter on host eth0 ingress for mirroring
- // packets from host eth0 to veth.
- Try<bool> arpEth0ToVeth = filter::basic::create(
- eth0,
- ingress::HANDLE,
- ETH_P_ARP,
- Priority(ARP_FILTER_PRIORITY, NORMAL),
- action::Mirror(targets));
-
- if (arpEth0ToVeth.isError()) {
- ++metrics.adding_eth0_arp_filters_errors;
-
- return Failure(
- "Failed to create an ARP packet filter from host " + eth0 +
- " to " + veth(pid) + ": " + arpEth0ToVeth.error());
- } else if (!arpEth0ToVeth.get()) {
- ++metrics.adding_eth0_arp_filters_already_exist;
-
- return Failure(
- "The ARP packet filter on host " + eth0 + " already exists");
- }
-
- if (flags.egress_unique_flow_per_container) {
- // Create a new ICMP filter on host eth0 egress for classifying
- // packets into a reserved flow.
- Try<bool> icmpEth0Egress = filter::icmp::create(
- eth0,
- HOST_TX_FQ_CODEL_HANDLE,
- icmp::Classifier(None()),
- Priority(ICMP_FILTER_PRIORITY, NORMAL),
- Handle(HOST_TX_FQ_CODEL_HANDLE, ICMP_FLOWID));
-
- if (icmpEth0Egress.isError()) {
- ++metrics.adding_eth0_egress_filters_errors;
-
- return Failure(
- "Failed to create the ICMP flow classifier on host " +
- eth0 + ": " + icmpEth0Egress.error());
- } else if (!icmpEth0Egress.get()) {
- ++metrics.adding_eth0_egress_filters_already_exist;
-
- return Failure(
- "The ICMP flow classifier on host " + eth0 + " already exists");
- }
-
- // Create a new ARP filter on host eth0 egress for classifying
- // packets into a reserved flow.
- Try<bool> arpEth0Egress = filter::basic::create(
- eth0,
- HOST_TX_FQ_CODEL_HANDLE,
- ETH_P_ARP,
- Priority(ARP_FILTER_PRIORITY, NORMAL),
- Handle(HOST_TX_FQ_CODEL_HANDLE, ARP_FLOWID));
-
- if (arpEth0Egress.isError()) {
- ++metrics.adding_eth0_egress_filters_errors;
-
- return Failure(
- "Failed to create the ARP flow classifier on host " +
- eth0 + ": " + arpEth0Egress.error());
- } else if (!arpEth0Egress.get()) {
- ++metrics.adding_eth0_egress_filters_already_exist;
-
- return Failure(
- "The ARP flow classifier on host " + eth0 + " already exists");
- }
-
- // Rest of the host packets go to a reserved flow.
- Try<bool> defaultEth0Egress = filter::basic::create(
- eth0,
- HOST_TX_FQ_CODEL_HANDLE,
- ETH_P_ALL,
- Priority(DEFAULT_FILTER_PRIORITY, NORMAL),
- Handle(HOST_TX_FQ_CODEL_HANDLE, HOST_FLOWID));
-
- if (defaultEth0Egress.isError()) {
- ++metrics.adding_eth0_egress_fi
<TRUNCATED>