You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/10/26 19:40:34 UTC

[10/12] mesos git commit: Relocated MesosContainerizer specific files to the correct location.

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/isolators/network/port_mapping.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/network/port_mapping.cpp b/src/slave/containerizer/isolators/network/port_mapping.cpp
deleted file mode 100644
index e6bb75e..0000000
--- a/src/slave/containerizer/isolators/network/port_mapping.cpp
+++ /dev/null
@@ -1,3792 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <limits.h>
-#include <string.h>
-#include <unistd.h>
-
-#include <iostream>
-#include <vector>
-
-#include <glog/logging.h>
-
-#include <mesos/mesos.hpp>
-
-#include <process/collect.hpp>
-#include <process/defer.hpp>
-#include <process/io.hpp>
-#include <process/pid.hpp>
-#include <process/subprocess.hpp>
-
-#include <stout/error.hpp>
-#include <stout/foreach.hpp>
-#include <stout/fs.hpp>
-#include <stout/hashset.hpp>
-#include <stout/json.hpp>
-#include <stout/lambda.hpp>
-#include <stout/mac.hpp>
-#include <stout/multihashmap.hpp>
-#include <stout/numify.hpp>
-#include <stout/os.hpp>
-#include <stout/option.hpp>
-#include <stout/path.hpp>
-#include <stout/protobuf.hpp>
-#include <stout/result.hpp>
-#include <stout/stringify.hpp>
-#include <stout/strings.hpp>
-#include <stout/utils.hpp>
-
-#include <stout/os/exists.hpp>
-#include <stout/os/stat.hpp>
-
-#include "common/status_utils.hpp"
-
-#include "linux/fs.hpp"
-#include "linux/ns.hpp"
-
-#include "linux/routing/route.hpp"
-#include "linux/routing/utils.hpp"
-
-#include "linux/routing/diagnosis/diagnosis.hpp"
-
-#include "linux/routing/filter/basic.hpp"
-#include "linux/routing/filter/icmp.hpp"
-#include "linux/routing/filter/ip.hpp"
-
-#include "linux/routing/handle.hpp"
-
-#include "linux/routing/link/link.hpp"
-
-#include "linux/routing/queueing/fq_codel.hpp"
-#include "linux/routing/queueing/htb.hpp"
-#include "linux/routing/queueing/ingress.hpp"
-#include "linux/routing/queueing/statistics.hpp"
-
-#include "mesos/resources.hpp"
-
-#include "slave/constants.hpp"
-
-#include "slave/containerizer/isolators/network/port_mapping.hpp"
-
-using namespace mesos::internal;
-
-using namespace process;
-
-using namespace routing;
-using namespace routing::filter;
-using namespace routing::queueing;
-using namespace routing::queueing::statistics;
-
-using std::cerr;
-using std::cout;
-using std::dec;
-using std::endl;
-using std::hex;
-using std::list;
-using std::ostringstream;
-using std::set;
-using std::sort;
-using std::string;
-using std::vector;
-
-using filter::ip::PortRange;
-
-using mesos::slave::ContainerLimitation;
-using mesos::slave::ContainerPrepareInfo;
-using mesos::slave::ContainerState;
-using mesos::slave::Isolator;
-
-// An old glibc might not have this symbol.
-#ifndef MNT_DETACH
-#define MNT_DETACH 2
-#endif
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-// The minimum number of ephemeral ports a container should have.
-static const uint16_t MIN_EPHEMERAL_PORTS_SIZE = 16;
-
-// Linux traffic control is a combination of queueing disciplines,
-// filters and classes organized as a tree for the ingress (tx) and
-// egress (rx) flows for each interface. Each container provides two
-// networking interfaces, a virtual eth0 and a loopback interface. The
-// flow of packets from the external network to container is shown
-// below:
-//
-//   +----------------------+----------------------+
-//   |                   Container                 |
-//   |----------------------|----------------------|
-//   |       eth0           |          lo          |
-//   +----------------------+----------------------+
-//          ^   |         ^           |
-//      [3] |   | [4]     |           |
-//          |   |     [7] +-----------+ [10]
-//          |   |
-//          |   |     [8] +-----------+ [9]
-//      [2] |   | [5]     |           |
-//          |   v         v           v
-//   +----------------------+----------------------+
-//   |      veth0           |          lo          |
-//   +----------------------|----------------------+
-//   |                     Host                    |
-//   |----------------------|----------------------|
-//   |                    eth0                     |
-//   +----------------------+----------------------|
-//                    ^           |
-//                [1] |           | [6]
-//                    |           v
-//
-// Traffic flowing from outside the network into a container enters
-// the system via the host ingress interface [1] and is routed based
-// on destination port to the outbound interface for the matching
-// container [2], which forwards the packet to the container's inbound
-// virtual interface. Outbound traffic destined for the external
-// network flows along the reverse path [4,5,6]. Loopback traffic is
-// directed to the corresponding Ethernet interface, either [7,10] or
-// [8,9] where the same destination port routing can be applied as to
-// external traffic. We use traffic control filters at several of the
-// interfaces to create these packet paths.
-//
-// Linux provides only a very simple topology for ingress interfaces.
-// A root is provided on a fixed handle (handle::INGRESS_ROOT) under
-// which a single qdisc can be installed, with handle ingress::HANDLE.
-// Traffic control filters can then be attached to the ingress qdisc.
-// We install one or more ingress filters on the host eth0 [1] to
-// direct traffic to the correct container, and on the container
-// virtual eth0 [5] to direct traffic to other containers or out of
-// the box. Since we know the ip port assignments for each container,
-// we can direct traffic directly to the appropriate container.
-// However, for ICMP and ARP traffic where no equivalent to a port
-// exists, we send a copy of the packet to every container and rely on
-// the network stack to drop unexpected packets.
-//
-// We install a Hierarchical Token Bucket (HTB) qdisc and class to
-// limit the outbound traffic bandwidth as the egress qdisc inside the
-// container [4] and then add a fq_codel qdisc to limit head of line
-// blocking on the egress filter. The egress traffic control chain is
-// thus:
-//
-// root device: handle::EGRESS_ROOT ->
-//    htb egress qdisc: CONTAINER_TX_HTB_HANDLE ->
-//        htb rate limiting class: CONTAINER_TX_HTB_CLASS_ID ->
-//            buffer-bloat reduction: FQ_CODEL
-constexpr Handle CONTAINER_TX_HTB_HANDLE = Handle(1, 0);
-constexpr Handle CONTAINER_TX_HTB_CLASS_ID =
-    Handle(CONTAINER_TX_HTB_HANDLE, 1);
-
-
-// Finally we create a second fq_codel qdisc on the public interface
-// of the host [6] to reduce performance interference between
-// containers. We create independent flows for each container, and
-// one for the host, which ensures packets from each container are
-// guaranteed fair access to the host interface. This egress traffic
-// control chain for the host interface is thus:
-//
-// root device: handle::EGRESS_ROOT ->
-//    buffer-bloat reduction: FQ_CODEL
-constexpr Handle HOST_TX_FQ_CODEL_HANDLE = Handle(1, 0);
-
-
-// The primary priority used by each type of filter.
-static const uint8_t ARP_FILTER_PRIORITY = 1;
-static const uint8_t ICMP_FILTER_PRIORITY = 2;
-static const uint8_t IP_FILTER_PRIORITY = 3;
-static const uint8_t DEFAULT_FILTER_PRIORITY = 4;
-
-
-// The secondary priorities used by filters.
-static const uint8_t HIGH = 1;
-static const uint8_t NORMAL = 2;
-static const uint8_t LOW = 3;
-
-
-// We assign a separate flow on host eth0 egress for each container
-// (See MESOS-2422 for details). Host egress traffic is assigned to a
-// reserved flow (HOST_FLOWID). ARP and ICMP traffic from containers
-// are not heavy, so they can share the same flow.
-static const uint16_t HOST_FLOWID = 1;
-static const uint16_t ARP_FLOWID = 2;
-static const uint16_t ICMP_FLOWID = 2;
-static const uint16_t CONTAINER_MIN_FLOWID = 3;
-
-
-// The well known ports. Used for sanity check.
-static Interval<uint16_t> WELL_KNOWN_PORTS()
-{
-  return (Bound<uint16_t>::closed(0), Bound<uint16_t>::open(1024));
-}
-
-
-/////////////////////////////////////////////////
-// Helper functions for the isolator.
-/////////////////////////////////////////////////
-
-// Given an integer x, find the largest integer t such that t <= x and
-// t is aligned to power of 2.
-static uint32_t roundDownToPowerOfTwo(uint32_t x)
-{
-  // Mutate x from 00001XXX to 0x00001111.
-
-  // We know the MSB has to be a 1, so kill the LSB and make sure the
-  // first 2 most significant bits are 1s.
-  x = x | (x >> 1);
-
-  // Now that the 2 most significant bits are 1s, make sure the first
-  // 4 most significant bits are 1s, too.
-  x = x | (x >> 2);
-
-  // We keep going. Note that the 0s left to the MSB are never turned
-  // to 1s.
-  x = x | (x >> 4);
-  x = x | (x >> 8);
-
-  // Now we have covered all 32 bits.
-  x = x | (x >> 16);
-
-  // 0x00001111 - (0x00001111 >> 1)
-  return x - (x >> 1);
-}
-
-
-// Returns the name of the host end of the virtual ethernet pair for a
-// given container. The kernel restricts link name to 16 characters or
-// less, so we cannot put container ID into the device name. Instead,
-// we use the pid of the executor process forked by the slave to
-// uniquely name the device for each container. It's safe because we
-// cannot have two active containers having the same pid for the
-// executor process.
-static string veth(pid_t pid)
-{
-  return PORT_MAPPING_VETH_PREFIX() + stringify(pid);
-}
-
-
-// Extracts the pid from the given veth name.
-static Option<pid_t> getPidFromVeth(const string& veth)
-{
-  if (strings::startsWith(veth, PORT_MAPPING_VETH_PREFIX())) {
-    Try<pid_t> pid = numify<pid_t>(
-        strings::remove(veth, PORT_MAPPING_VETH_PREFIX(), strings::PREFIX));
-
-    if (pid.isSome()) {
-      return pid.get();
-    }
-  }
-
-  return None();
-}
-
-
-// Extracts the container ID from the symlink that points to the
-// network namespace handle. The following is the layout of the bind
-// mount root and bind mount symlink root:
-//  <PORT_MAPPING_BIND_MOUNT_ROOT()>
-//    |--- 3945 (pid)                           <-|
-//                                                |
-//  <PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT()>      |
-//    |--- ecf293e7-e6e8-4cbc-aaee-4d6c958aa276 --|
-//         (symlink: container ID -> pid)
-static Try<ContainerID> getContainerIdFromSymlink(const string& symlink)
-{
-  if (!os::stat::islink(symlink)) {
-    return Error("Not a symlink");
-  }
-
-  string _containerId = Path(symlink).basename();
-
-  ContainerID containerId;
-  containerId.set_value(_containerId);
-
-  return containerId;
-}
-
-
-// Extracts the pid from the network namespace handle. Returns None if
-// the handle is clearly not created by us.
-static Result<pid_t> getPidFromNamespaceHandle(const string& handle)
-{
-  if (os::stat::islink(handle)) {
-    return Error("Not expecting a symlink");
-  }
-
-  string _pid = Path(handle).basename();
-
-  Try<pid_t> pid = numify<pid_t>(_pid);
-  if (pid.isError()) {
-    return None();
-  }
-
-  return pid.get();
-}
-
-
-// Extracts the pid from the symlink that points to the network
-// namespace handle. Returns None if it's a dangling symlink.
-static Result<pid_t> getPidFromSymlink(const string& symlink)
-{
-  if (!os::stat::islink(symlink)) {
-    return Error("Not a symlink");
-  }
-
-  Result<string> target = os::realpath(symlink);
-  if (target.isError()) {
-    return Error("Failed to follow the symlink: " + target.error());
-  } else if (target.isNone()) {
-    // This is a dangling symlink.
-    return None();
-  }
-
-  return getPidFromNamespaceHandle(target.get());
-}
-
-
-static string getSymlinkPath(const ContainerID& containerId)
-{
-  return path::join(
-      PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT(),
-      stringify(containerId));
-}
-
-
-static string getNamespaceHandlePath(pid_t pid)
-{
-  return path::join(
-      PORT_MAPPING_BIND_MOUNT_ROOT(),
-      stringify(pid));
-}
-
-
-// Converts from value ranges to interval set.
-static IntervalSet<uint16_t> getIntervalSet(const Value::Ranges& ranges)
-{
-  IntervalSet<uint16_t> set;
-
-  for (int i = 0; i < ranges.range_size(); i++) {
-    set += (Bound<uint16_t>::closed(ranges.range(i).begin()),
-            Bound<uint16_t>::closed(ranges.range(i).end()));
-  }
-
-  return set;
-}
-
-/////////////////////////////////////////////////
-// Implementation for PortMappingUpdate.
-/////////////////////////////////////////////////
-
-const char* PortMappingUpdate::NAME = "update";
-
-
-PortMappingUpdate::Flags::Flags()
-{
-  add(&eth0_name,
-      "eth0_name",
-      "The name of the public network interface (e.g., eth0)");
-
-  add(&lo_name,
-      "lo_name",
-      "The name of the loopback network interface (e.g., lo)");
-
-  add(&pid,
-      "pid",
-      "The pid of the process whose namespaces we will enter");
-
-  add(&ports_to_add,
-      "ports_to_add",
-      "A collection of port ranges (formatted as a JSON object)\n"
-      "for which to add IP filters. E.g.,\n"
-      "--ports_to_add={\"range\":[{\"begin\":4,\"end\":8}]}");
-
-  add(&ports_to_remove,
-      "ports_to_remove",
-      "A collection of port ranges (formatted as a JSON object)\n"
-      "for which to remove IP filters. E.g.,\n"
-      "--ports_to_remove={\"range\":[{\"begin\":4,\"end\":8}]}");
-}
-
-
-// The following two helper functions allow us to convert from a
-// collection of port ranges to a JSON object and vice versa. They
-// will be used for the port mapping update operation.
-template <typename Iterable>
-JSON::Object json(const Iterable& ranges)
-{
-  Value::Ranges values;
-  foreach (const PortRange& range, ranges) {
-    Value::Range value;
-    value.set_begin(range.begin());
-    value.set_end(range.end());
-
-    values.add_range()->CopyFrom(value);
-  }
-  return JSON::Protobuf(values);
-}
-
-
-static Try<vector<PortRange>> parse(const JSON::Object& object)
-{
-  Try<Value::Ranges> parsing = protobuf::parse<Value::Ranges>(object);
-  if (parsing.isError()) {
-    return Error("Failed to parse JSON: " + parsing.error());
-  }
-
-  vector<PortRange> ranges;
-  Value::Ranges values = parsing.get();
-  for (int i = 0; i < values.range_size(); i++) {
-    const Value::Range& value = values.range(i);
-    Try<PortRange> range = PortRange::fromBeginEnd(value.begin(), value.end());
-    if (range.isError()) {
-      return Error("Invalid port range: " + range.error());
-    }
-
-    ranges.push_back(range.get());
-  }
-  return ranges;
-}
-
-
-// Helper function to set up IP filters inside the container for a
-// given port range.
-static Try<Nothing> addContainerIPFilters(
-    const PortRange& range,
-    const string& eth0,
-    const string& lo)
-{
-  // Add an IP packet filter on lo such that local traffic inside a
-  // container will not be redirected to eth0.
-  Try<bool> loTerminal = filter::ip::create(
-      lo,
-      ingress::HANDLE,
-      ip::Classifier(None(), None(), None(), range),
-      Priority(IP_FILTER_PRIORITY, HIGH),
-      action::Terminal());
-
-  if (loTerminal.isError()) {
-    return Error(
-        "Failed to create an IP packet filter on " + lo +
-        " which stops packets from being sent to " + eth0 +
-        ": " + loTerminal.error());
-  } else if (!loTerminal.get()) {
-    return Error(
-        "The IP packet filter on " + lo +
-        " which stops packets from being sent to " +
-        eth0 + " already exists");
-  }
-
-  // Add an IP packet filter (for loopback IP) from eth0 to lo to
-  // redirect all loopback IP traffic to lo.
-  Try<bool> eth0ToLoLoopback = filter::ip::create(
-      eth0,
-      ingress::HANDLE,
-      ip::Classifier(
-          None(),
-          net::IPNetwork::LOOPBACK_V4().address(),
-          None(),
-          range),
-      Priority(IP_FILTER_PRIORITY, NORMAL),
-      action::Redirect(lo));
-
-  if (eth0ToLoLoopback.isError()) {
-    return Error(
-        "Failed to create an IP packet filter (for loopback IP) from " +
-        eth0 + " to " + lo + ": " + eth0ToLoLoopback.error());
-  } else if (!eth0ToLoLoopback.get()) {
-    return Error(
-        "The IP packet filter (for loopback IP) from " +
-        eth0 + " to " + lo + " already exists");
-  }
-
-  return Nothing();
-}
-
-
-// Helper function to remove IP filters inside the container for a
-// given port range.
-static Try<Nothing> removeContainerIPFilters(
-    const PortRange& range,
-    const string& eth0,
-    const string& lo)
-{
-  // Remove the 'terminal' IP packet filter on lo.
-  Try<bool> loTerminal = filter::ip::remove(
-      lo,
-      ingress::HANDLE,
-      ip::Classifier(None(), None(), None(), range));
-
-  if (loTerminal.isError()) {
-    return Error(
-        "Failed to remove the IP packet filter on " + lo +
-        " which stops packets from being sent to " + eth0 +
-        ": " + loTerminal.error());
-  } else if (!loTerminal.get()) {
-    return Error(
-        "The IP packet filter on " + lo +
-        " which stops packets from being sent to " + eth0 +
-        " does not exist");
-  }
-
-  // Remove the IP packet filter (for loopback IP) from eth0 to lo.
-  Try<bool> eth0ToLoLoopback = filter::ip::remove(
-      eth0,
-      ingress::HANDLE,
-      ip::Classifier(
-          None(),
-          net::IPNetwork::LOOPBACK_V4().address(),
-          None(),
-          range));
-
-  if (eth0ToLoLoopback.isError()) {
-    return Error(
-        "Failed to remove the IP packet filter (for loopback IP) from " +
-        eth0 + " to " + lo + ": " + eth0ToLoLoopback.error());
-  } else if (!eth0ToLoLoopback.get()) {
-    return Error(
-        "The IP packet filter (for loopback IP) from " +
-        eth0 + " to " + lo + " does not exist");
-  }
-
-  return Nothing();
-}
-
-
-int PortMappingUpdate::execute()
-{
-  if (flags.help) {
-    cerr << "Usage: " << name() << " [OPTIONS]" << endl << endl
-         << "Supported options:" << endl
-         << flags.usage();
-    return 0;
-  }
-
-  if (flags.eth0_name.isNone()) {
-    cerr << "The public interface name (e.g., eth0) is not specified" << endl;
-    return 1;
-  }
-
-  if (flags.lo_name.isNone()) {
-    cerr << "The loopback interface name (e.g., lo) is not specified" << endl;
-    return 1;
-  }
-
-  if (flags.pid.isNone()) {
-    cerr << "The pid is not specified" << endl;
-    return 1;
-  }
-
-  if (flags.ports_to_add.isNone() && flags.ports_to_remove.isNone()) {
-    cerr << "Nothing to update" << endl;
-    return 1;
-  }
-
-  Option<vector<PortRange>> portsToAdd;
-  Option<vector<PortRange>> portsToRemove;
-
-  if (flags.ports_to_add.isSome()) {
-    Try<vector<PortRange>> parsing = parse(flags.ports_to_add.get());
-    if (parsing.isError()) {
-      cerr << "Parsing 'ports_to_add' failed: " << parsing.error() << endl;
-      return 1;
-    }
-    portsToAdd = parsing.get();
-  }
-
-  if (flags.ports_to_remove.isSome()) {
-    Try<vector<PortRange>> parsing = parse(flags.ports_to_remove.get());
-    if (parsing.isError()) {
-      cerr << "Parsing 'ports_to_remove' failed: " << parsing.error() << endl;
-      return 1;
-    }
-    portsToRemove = parsing.get();
-  }
-
-  // Enter the network namespace.
-  Try<Nothing> setns = ns::setns(flags.pid.get(), "net");
-  if (setns.isError()) {
-    cerr << "Failed to enter the network namespace of pid " << flags.pid.get()
-         << ": " << setns.error() << endl;
-    return 1;
-  }
-
-  // Update IP packet filters.
-  const string eth0 = flags.eth0_name.get();
-  const string lo = flags.lo_name.get();
-
-  if (portsToAdd.isSome()) {
-    foreach (const PortRange& range, portsToAdd.get()) {
-      Try<Nothing> add = addContainerIPFilters(range, eth0, lo);
-      if (add.isError()) {
-        cerr << "Failed to add IP filters: " << add.error() << endl;
-        return 1;
-      }
-    }
-  }
-
-  if (portsToRemove.isSome()) {
-    foreach (const PortRange& range, portsToRemove.get()) {
-      Try<Nothing> remove = removeContainerIPFilters(range, eth0, lo);
-      if (remove.isError()) {
-        cerr << "Failed to remove IP filters: " << remove.error() << endl;
-        return 1;
-      }
-    }
-  }
-
-  return 0;
-}
-
-/////////////////////////////////////////////////
-// Implementation for PortMappingStatistics.
-/////////////////////////////////////////////////
-
-const char* PortMappingStatistics::NAME = "statistics";
-
-
-PortMappingStatistics::Flags::Flags()
-{
-  add(&eth0_name,
-      "eth0_name",
-      "The name of the public network interface (e.g., eth0)");
-
-  add(&pid,
-      "pid",
-      "The pid of the process whose namespaces we will enter");
-
-  add(&enable_socket_statistics_summary,
-      "enable_socket_statistics_summary",
-      "Whether to collect socket statistics summary for this container\n",
-      false);
-
-  add(&enable_socket_statistics_details,
-      "enable_socket_statistics_details",
-      "Whether to collect socket statistics details (e.g., TCP RTT)\n"
-      "for this container.",
-      false);
-}
-
-
-// A helper that copies the traffic control statistics from the
-// statistics hashmap into the ResourceStatistics protocol buffer.
-static void addTrafficControlStatistics(
-    const string& id,
-    const hashmap<string, uint64_t>& statistics,
-    ResourceStatistics* result)
-{
-  TrafficControlStatistics *tc = result->add_net_traffic_control_statistics();
-
-  tc->set_id(id);
-
-  // TODO(pbrett) Use protobuf reflection here.
-  if (statistics.contains(BACKLOG)) {
-    tc->set_backlog(statistics.at(BACKLOG));
-  }
-  if (statistics.contains(BYTES)) {
-    tc->set_bytes(statistics.at(BYTES));
-  }
-  if (statistics.contains(DROPS)) {
-    tc->set_drops(statistics.at(DROPS));
-  }
-  if (statistics.contains(OVERLIMITS)) {
-    tc->set_overlimits(statistics.at(OVERLIMITS));
-  }
-  if (statistics.contains(PACKETS)) {
-    tc->set_packets(statistics.at(PACKETS));
-  }
-  if (statistics.contains(QLEN)) {
-    tc->set_qlen(statistics.at(QLEN));
-  }
-  if (statistics.contains(RATE_BPS)) {
-    tc->set_ratebps(statistics.at(RATE_BPS));
-  }
-  if (statistics.contains(RATE_PPS)) {
-    tc->set_ratepps(statistics.at(RATE_PPS));
-  }
-  if (statistics.contains(REQUEUES)) {
-    tc->set_requeues(statistics.at(REQUEUES));
-  }
-}
-
-
-int PortMappingStatistics::execute()
-{
-  if (flags.help) {
-    cerr << "Usage: " << name() << " [OPTIONS]" << endl << endl
-         << "Supported options:" << endl
-         << flags.usage();
-    return 0;
-  }
-
-  if (flags.pid.isNone()) {
-    cerr << "The pid is not specified" << endl;
-    return 1;
-  }
-
-  if (flags.eth0_name.isNone()) {
-    cerr << "The public interface name (e.g., eth0) is not specified" << endl;
-    return 1;
-  }
-
-  // Enter the network namespace.
-  Try<Nothing> setns = ns::setns(flags.pid.get(), "net");
-  if (setns.isError()) {
-    // This could happen if the executor exits before this function is
-    // invoked. We do not log here to avoid spurious logging.
-    return 1;
-  }
-
-  ResourceStatistics result;
-
-  // NOTE: We use a dummy value here since this field will be cleared
-  // before the result is sent to the containerizer.
-  result.set_timestamp(0);
-
-  if (flags.enable_socket_statistics_summary) {
-    // Collections for socket statistics summary are below.
-
-    // For TCP, get the number of ACTIVE and TIME_WAIT connections,
-    // from reading /proc/net/sockstat (/proc/net/sockstat6 for IPV6).
-    // This is not as expensive in the kernel because only counter
-    // values are accessed instead of a dump of all the sockets.
-    // Example output:
-
-    // $ cat /proc/net/sockstat
-    // sockets: used 1391
-    // TCP: inuse 33 orphan 0 tw 0 alloc 37 mem 6
-    // UDP: inuse 15 mem 7
-    // UDPLITE: inuse 0
-    // RAW: inuse 0
-    // FRAG: inuse 0 memory 0
-
-    Try<string> value = os::read("/proc/net/sockstat");
-    if (value.isError()) {
-      cerr << "Failed to read /proc/net/sockstat: " << value.error() << endl;
-      return 1;
-    }
-
-    foreach (const string& line, strings::tokenize(value.get(), "\n")) {
-      if (!strings::startsWith(line, "TCP")) {
-        continue;
-      }
-
-      vector<string> tokens = strings::tokenize(line, " ");
-      for (size_t i = 0; i < tokens.size(); i++) {
-        if (tokens[i] == "inuse") {
-          if (i + 1 >= tokens.size()) {
-            cerr << "Unexpected output from /proc/net/sockstat" << endl;
-            // Be a bit forgiving here here since the /proc file
-            // output format can change, though not very likely.
-            continue;
-          }
-
-          // Set number of active TCP connections.
-          Try<size_t> inuse = numify<size_t>(tokens[i+1]);
-          if (inuse.isError()) {
-            cerr << "Failed to parse the number of tcp connections in use: "
-                 << inuse.error() << endl;
-            continue;
-          }
-
-          result.set_net_tcp_active_connections(inuse.get());
-        } else if (tokens[i] == "tw") {
-          if (i + 1 >= tokens.size()) {
-            cerr << "Unexpected output from /proc/net/sockstat" << endl;
-            // Be a bit forgiving here here since the /proc file
-            // output format can change, though not very likely.
-            continue;
-          }
-
-          // Set number of TIME_WAIT TCP connections.
-          Try<size_t> tw = numify<size_t>(tokens[i+1]);
-          if (tw.isError()) {
-            cerr << "Failed to parse the number of tcp connections in"
-                 << " TIME_WAIT: " << tw.error() << endl;
-            continue;
-          }
-
-          result.set_net_tcp_time_wait_connections(tw.get());
-        }
-      }
-    }
-  }
-
-  if (flags.enable_socket_statistics_details) {
-    // Collections for socket statistics details are below.
-
-    // NOTE: If the underlying library uses the older version of
-    // kernel API, the family argument passed in may not be honored.
-    Try<vector<diagnosis::socket::Info>> infos =
-      diagnosis::socket::infos(AF_INET, diagnosis::socket::state::ALL);
-
-    if (infos.isError()) {
-      cerr << "Failed to retrieve the socket information" << endl;
-      return 1;
-    }
-
-    vector<uint32_t> RTTs;
-    foreach (const diagnosis::socket::Info& info, infos.get()) {
-      // We double check on family regardless.
-      if (info.family != AF_INET) {
-        continue;
-      }
-
-      // We consider all sockets that have non-zero rtt value.
-      if (info.tcpInfo.isSome() && info.tcpInfo.get().tcpi_rtt != 0) {
-        RTTs.push_back(info.tcpInfo.get().tcpi_rtt);
-      }
-    }
-
-    // Only print to stdout when we have results.
-    if (RTTs.size() > 0) {
-      std::sort(RTTs.begin(), RTTs.end());
-
-      // NOTE: The size of RTTs is usually within 1 million so we
-      // don't need to worry about overflow here.
-      // TODO(jieyu): Right now, we choose to use "Nearest rank" for
-      // simplicity. Consider directly using the Statistics abstraction
-      // which computes "Linear interpolation between closest ranks".
-      // http://en.wikipedia.org/wiki/Percentile
-      size_t p50 = RTTs.size() * 50 / 100;
-      size_t p90 = RTTs.size() * 90 / 100;
-      size_t p95 = RTTs.size() * 95 / 100;
-      size_t p99 = RTTs.size() * 99 / 100;
-
-      result.set_net_tcp_rtt_microsecs_p50(RTTs[p50]);
-      result.set_net_tcp_rtt_microsecs_p90(RTTs[p90]);
-      result.set_net_tcp_rtt_microsecs_p95(RTTs[p95]);
-      result.set_net_tcp_rtt_microsecs_p99(RTTs[p99]);
-    }
-  }
-
-  // Collect traffic statistics for the container from the container
-  // virtual interface and export them in JSON.
-  const string& eth0 = flags.eth0_name.get();
-
-  // Overlimits are reported on the HTB qdisc at the egress root.
-  Result<hashmap<string, uint64_t>> statistics =
-    htb::statistics(eth0, EGRESS_ROOT);
-
-  if (statistics.isSome()) {
-    addTrafficControlStatistics(
-        NET_ISOLATOR_BW_LIMIT,
-        statistics.get(),
-        &result);
-  } else if (statistics.isNone()) {
-    // Traffic control statistics are only available when the
-    // container is created on a slave when the egress rate limit is
-    // on (i.e., egress_rate_limit_per_container flag is set). We
-    // can't just test for that flag here however, since the slave may
-    // have been restarted with different flags since the container
-    // was created. It is also possible that isolator statistics are
-    // unavailable because we the container is in the process of being
-    // created or destroy. Hence we do not report a lack of network
-    // statistics as an error.
-  } else if (statistics.isError()) {
-    cerr << "Failed to get htb qdisc statistics on " << eth0
-         << " in namespace " << flags.pid.get() << endl;
-  }
-
-  // Drops due to the bandwidth limit should be reported at the leaf.
-  statistics = fq_codel::statistics(eth0, CONTAINER_TX_HTB_CLASS_ID);
-  if (statistics.isSome()) {
-    addTrafficControlStatistics(
-        NET_ISOLATOR_BLOAT_REDUCTION,
-        statistics.get(),
-        &result);
-  } else if (statistics.isNone()) {
-    // See discussion on network isolator statistics above.
-  } else if (statistics.isError()) {
-    cerr << "Failed to get fq_codel qdisc statistics on " << eth0
-         << " in namespace " << flags.pid.get() << endl;
-  }
-
-  cout << stringify(JSON::Protobuf(result));
-  return 0;
-}
-
-
-/////////////////////////////////////////////////
-// Implementation for the isolator.
-/////////////////////////////////////////////////
-
-PortMappingIsolatorProcess::Metrics::Metrics()
-  : adding_eth0_ip_filters_errors(
-        "port_mapping/adding_eth0_ip_filters_errors"),
-    adding_eth0_ip_filters_already_exist(
-        "port_mapping/adding_eth0_ip_filters_already_exist"),
-    adding_eth0_egress_filters_errors(
-        "port_mapping/adding_eth0_egress_filters_errors"),
-    adding_eth0_egress_filters_already_exist(
-        "port_mapping/adding_eth0_egress_filters_already_exist"),
-    adding_lo_ip_filters_errors(
-        "port_mapping/adding_lo_ip_filters_errors"),
-    adding_lo_ip_filters_already_exist(
-        "port_mapping/adding_lo_ip_filters_already_exist"),
-    adding_veth_ip_filters_errors(
-        "port_mapping/adding_veth_ip_filters_errors"),
-    adding_veth_ip_filters_already_exist(
-        "port_mapping/adding_veth_ip_filters_already_exist"),
-    adding_veth_icmp_filters_errors(
-        "port_mapping/adding_veth_icmp_filters_errors"),
-    adding_veth_icmp_filters_already_exist(
-        "port_mapping/adding_veth_icmp_filters_already_exist"),
-    adding_veth_arp_filters_errors(
-        "port_mapping/adding_veth_arp_filters_errors"),
-    adding_veth_arp_filters_already_exist(
-        "port_mapping/adding_veth_arp_filters_already_exist"),
-    adding_eth0_icmp_filters_errors(
-        "port_mapping/adding_eth0_icmp_filters_errors"),
-    adding_eth0_icmp_filters_already_exist(
-        "port_mapping/adding_eth0_icmp_filters_already_exist"),
-    adding_eth0_arp_filters_errors(
-        "port_mapping/adding_eth0_arp_filters_errors"),
-    adding_eth0_arp_filters_already_exist(
-        "port_mapping/adding_eth0_arp_filters_already_exist"),
-    removing_eth0_ip_filters_errors(
-        "port_mapping/removing_eth0_ip_filters_errors"),
-    removing_eth0_ip_filters_do_not_exist(
-        "port_mapping/removing_eth0_ip_filters_do_not_exist"),
-    removing_eth0_egress_filters_errors(
-        "port_mapping/removing_eth0_egress_filters_errors"),
-    removing_eth0_egress_filters_do_not_exist(
-        "port_mapping/removinging_eth0_egress_filters_do_not_exist"),
-    removing_lo_ip_filters_errors(
-        "port_mapping/removing_lo_ip_filters_errors"),
-    removing_lo_ip_filters_do_not_exist(
-        "port_mapping/removing_lo_ip_filters_do_not_exist"),
-    removing_veth_ip_filters_errors(
-        "port_mapping/removing_veth_ip_filters_errors"),
-    removing_veth_ip_filters_do_not_exist(
-        "port_mapping/removing_veth_ip_filters_do_not_exist"),
-    removing_eth0_icmp_filters_errors(
-        "port_mapping/removing_eth0_icmp_filters_errors"),
-    removing_eth0_icmp_filters_do_not_exist(
-        "port_mapping/removing_eth0_icmp_filters_do_not_exist"),
-    removing_eth0_arp_filters_errors(
-        "port_mapping/removing_eth0_arp_filters_errors"),
-    removing_eth0_arp_filters_do_not_exist(
-        "port_mapping/removing_eth0_arp_filters_do_not_exist"),
-    updating_eth0_icmp_filters_errors(
-        "port_mapping/updating_eth0_icmp_filters_errors"),
-    updating_eth0_icmp_filters_already_exist(
-        "port_mapping/updating_eth0_icmp_filters_already_exist"),
-    updating_eth0_icmp_filters_do_not_exist(
-        "port_mapping/updating_eth0_icmp_filters_do_not_exist"),
-    updating_eth0_arp_filters_errors(
-        "port_mapping/updating_eth0_arp_filters_errors"),
-    updating_eth0_arp_filters_already_exist(
-        "port_mapping/updating_eth0_arp_filters_already_exist"),
-    updating_eth0_arp_filters_do_not_exist(
-        "port_mapping/updating_eth0_arp_filters_do_not_exist"),
-    updating_container_ip_filters_errors(
-        "port_mapping/updating_container_ip_filters_errors")
-{
-  process::metrics::add(adding_eth0_ip_filters_errors);
-  process::metrics::add(adding_eth0_ip_filters_already_exist);
-  process::metrics::add(adding_lo_ip_filters_errors);
-  process::metrics::add(adding_lo_ip_filters_already_exist);
-  process::metrics::add(adding_veth_ip_filters_errors);
-  process::metrics::add(adding_veth_ip_filters_already_exist);
-  process::metrics::add(adding_veth_icmp_filters_errors);
-  process::metrics::add(adding_veth_icmp_filters_already_exist);
-  process::metrics::add(adding_veth_arp_filters_errors);
-  process::metrics::add(adding_veth_arp_filters_already_exist);
-  process::metrics::add(adding_eth0_icmp_filters_errors);
-  process::metrics::add(adding_eth0_icmp_filters_already_exist);
-  process::metrics::add(adding_eth0_arp_filters_errors);
-  process::metrics::add(adding_eth0_arp_filters_already_exist);
-  process::metrics::add(removing_eth0_ip_filters_errors);
-  process::metrics::add(removing_eth0_ip_filters_do_not_exist);
-  process::metrics::add(removing_lo_ip_filters_errors);
-  process::metrics::add(removing_lo_ip_filters_do_not_exist);
-  process::metrics::add(removing_veth_ip_filters_errors);
-  process::metrics::add(removing_veth_ip_filters_do_not_exist);
-  process::metrics::add(removing_eth0_icmp_filters_errors);
-  process::metrics::add(removing_eth0_icmp_filters_do_not_exist);
-  process::metrics::add(removing_eth0_arp_filters_errors);
-  process::metrics::add(removing_eth0_arp_filters_do_not_exist);
-  process::metrics::add(updating_eth0_icmp_filters_errors);
-  process::metrics::add(updating_eth0_icmp_filters_already_exist);
-  process::metrics::add(updating_eth0_icmp_filters_do_not_exist);
-  process::metrics::add(updating_eth0_arp_filters_errors);
-  process::metrics::add(updating_eth0_arp_filters_already_exist);
-  process::metrics::add(updating_eth0_arp_filters_do_not_exist);
-  process::metrics::add(updating_container_ip_filters_errors);
-}
-
-
-PortMappingIsolatorProcess::Metrics::~Metrics()
-{
-  process::metrics::remove(adding_eth0_ip_filters_errors);
-  process::metrics::remove(adding_eth0_ip_filters_already_exist);
-  process::metrics::remove(adding_lo_ip_filters_errors);
-  process::metrics::remove(adding_lo_ip_filters_already_exist);
-  process::metrics::remove(adding_veth_ip_filters_errors);
-  process::metrics::remove(adding_veth_ip_filters_already_exist);
-  process::metrics::remove(adding_veth_icmp_filters_errors);
-  process::metrics::remove(adding_veth_icmp_filters_already_exist);
-  process::metrics::remove(adding_veth_arp_filters_errors);
-  process::metrics::remove(adding_veth_arp_filters_already_exist);
-  process::metrics::remove(adding_eth0_icmp_filters_errors);
-  process::metrics::remove(adding_eth0_icmp_filters_already_exist);
-  process::metrics::remove(adding_eth0_arp_filters_errors);
-  process::metrics::remove(adding_eth0_arp_filters_already_exist);
-  process::metrics::remove(removing_eth0_ip_filters_errors);
-  process::metrics::remove(removing_eth0_ip_filters_do_not_exist);
-  process::metrics::remove(removing_lo_ip_filters_errors);
-  process::metrics::remove(removing_lo_ip_filters_do_not_exist);
-  process::metrics::remove(removing_veth_ip_filters_errors);
-  process::metrics::remove(removing_veth_ip_filters_do_not_exist);
-  process::metrics::remove(removing_eth0_icmp_filters_errors);
-  process::metrics::remove(removing_eth0_icmp_filters_do_not_exist);
-  process::metrics::remove(removing_eth0_arp_filters_errors);
-  process::metrics::remove(removing_eth0_arp_filters_do_not_exist);
-  process::metrics::remove(updating_eth0_icmp_filters_errors);
-  process::metrics::remove(updating_eth0_icmp_filters_already_exist);
-  process::metrics::remove(updating_eth0_icmp_filters_do_not_exist);
-  process::metrics::remove(updating_eth0_arp_filters_errors);
-  process::metrics::remove(updating_eth0_arp_filters_already_exist);
-  process::metrics::remove(updating_eth0_arp_filters_do_not_exist);
-  process::metrics::remove(updating_container_ip_filters_errors);
-}
-
-
-Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& flags)
-{
-  // Check for root permission.
-  if (geteuid() != 0) {
-    return Error("Using network isolator requires root permissions");
-  }
-
-  // Verify that the network namespace is available by checking the
-  // existence of the network namespace handle of the current process.
-  if (ns::namespaces().count("net") == 0) {
-    return Error(
-        "Using network isolator requires network namespace. "
-        "Make sure your kernel is newer than 3.4");
-  }
-
-  // Check the routing library.
-  Try<Nothing> check = routing::check();
-  if (check.isError()) {
-    return Error(
-        "Routing library check failed: " +
-        check.error());
-  }
-
-  // Check the availability of a few Linux commands that we will use.
-  // We use the blocking os::shell here because 'create' will only be
-  // invoked during initialization.
-  Try<string> checkCommandTc = os::shell("tc filter show");
-  if (checkCommandTc.isError()) {
-    return Error("Check command 'tc' failed: " + checkCommandTc.error());
-  }
-
-  Try<string> checkCommandIp = os::shell("ip link show");
-  if (checkCommandIp.isError()) {
-    return Error("Check command 'ip' failed: " + checkCommandIp.error());
-  }
-
-  Try<Resources> resources = Resources::parse(
-      flags.resources.getOrElse(""),
-      flags.default_role);
-
-  if (resources.isError()) {
-    return Error("Failed to parse --resources: " + resources.error());
-  }
-
-  // Get 'ports' resource from 'resources' flag. These ports will be
-  // treated as non-ephemeral ports.
-  IntervalSet<uint16_t> nonEphemeralPorts;
-  if (resources.get().ports().isSome()) {
-    nonEphemeralPorts = getIntervalSet(resources.get().ports().get());
-  }
-
-  // Get 'ephemeral_ports' resource from 'resources' flag. These ports
-  // will be allocated to each container as ephemeral ports.
-  IntervalSet<uint16_t> ephemeralPorts;
-  if (resources.get().ephemeral_ports().isSome()) {
-    ephemeralPorts = getIntervalSet(resources.get().ephemeral_ports().get());
-  }
-
-  // Each container requires at least one ephemeral port for slave
-  // executor communication. If no 'ephemeral_ports' resource is
-  // found, we will return error.
-  if (ephemeralPorts.empty()) {
-    return Error("Ephemeral ports are not specified");
-  }
-
-  // Sanity check to make sure that the ephemeral ports specified do
-  // not intersect with the specified non-ephemeral ports.
-  if (ephemeralPorts.intersects(nonEphemeralPorts)) {
-    return Error(
-        "The specified ephemeral ports " + stringify(ephemeralPorts) +
-        " intersect with the specified non-ephemeral ports " +
-        stringify(nonEphemeralPorts));
-  }
-
-  // This is a sanity check to make sure that the ephemeral ports
-  // specified do not intersect with the well known ports.
-  if (ephemeralPorts.intersects(WELL_KNOWN_PORTS())) {
-    return Error(
-        "The specified ephemeral ports " + stringify(ephemeralPorts) +
-        " intersect with well known ports " + stringify(WELL_KNOWN_PORTS()));
-  }
-
-  // Obtain the host ephemeral port range by reading the proc file
-  // system ('ip_local_port_range').
-  Try<string> value = os::read("/proc/sys/net/ipv4/ip_local_port_range");
-  if (value.isError()) {
-    return Error("Failed to read host ip_local_port_range: " + value.error());
-  }
-
-  vector<string> split = strings::split(strings::trim(value.get()), "\t");
-  if (split.size() != 2) {
-    return Error(
-        "Unexpected format from host ip_local_port_range: " + value.get());
-  }
-
-  Try<uint16_t> begin = numify<uint16_t>(split[0]);
-  if (begin.isError()) {
-    return Error(
-        "Failed to parse the begin of host ip_local_port_range: " + split[0]);
-  }
-
-  Try<uint16_t> end = numify<uint16_t>(split[1]);
-  if (end.isError()) {
-    return Error(
-        "Failed to parse the end of host ip_local_port_range: " + split[1]);
-  }
-
-  Interval<uint16_t> hostEphemeralPorts =
-    (Bound<uint16_t>::closed(begin.get()),
-     Bound<uint16_t>::closed(end.get()));
-
-  // Sanity check to make sure the specified ephemeral ports do not
-  // intersect with the ephemeral ports used by the host.
-  if (ephemeralPorts.intersects(hostEphemeralPorts)) {
-    return Error(
-        "The specified ephemeral ports " + stringify(ephemeralPorts) +
-        " intersect with the ephemeral ports used by the host " +
-        stringify(hostEphemeralPorts));
-  }
-
-  // TODO(chzhcn): Cross check ephemeral ports with used ports on the
-  // host (e.g., using port scan).
-
-  // Initialize the ephemeral ports allocator.
-
-  // In theory, any positive integer can be broken up into a few
-  // numbers that are power of 2 aligned. We choose to not allow this
-  // for now so that each container has a fixed (one) number of
-  // filters for ephemeral ports. This makes it easy to debug and
-  // infer performance.
-  if (roundDownToPowerOfTwo(flags.ephemeral_ports_per_container) !=
-      flags.ephemeral_ports_per_container) {
-    return Error(
-        "The number of ephemeral ports for each container (" +
-        stringify(flags.ephemeral_ports_per_container) +
-        ") is not a power of 2");
-  }
-
-  if (ephemeralPorts.size() < flags.ephemeral_ports_per_container) {
-    return Error(
-        "Network Isolator is given ephemeral ports of size: " +
-        stringify(ephemeralPorts.size()) + ", but asked to allocate " +
-        stringify(flags.ephemeral_ports_per_container) +
-        " ephemeral ports for a container");
-  }
-
-  if (flags.ephemeral_ports_per_container < MIN_EPHEMERAL_PORTS_SIZE) {
-    return Error(
-        "Each container has only " +
-        stringify(flags.ephemeral_ports_per_container) +
-        " ephemeral ports. The minimum required is: " +
-        stringify(MIN_EPHEMERAL_PORTS_SIZE));
-  }
-
-  Owned<EphemeralPortsAllocator> ephemeralPortsAllocator(
-      new EphemeralPortsAllocator(
-        ephemeralPorts,
-        flags.ephemeral_ports_per_container));
-
-  // Get the name of the public interface (e.g., eth0). If it is not
-  // specified, try to derive its name from the routing library.
-  Result<string> eth0 = link::eth0();
-  if (flags.eth0_name.isSome()) {
-    eth0 = flags.eth0_name.get();
-
-    // Check if the given public interface exists.
-    Try<bool> hostEth0Exists = link::exists(eth0.get());
-    if (hostEth0Exists.isError()) {
-      return Error(
-          "Failed to check if " + eth0.get() + " exists: " +
-          hostEth0Exists.error());
-    } else if (!hostEth0Exists.get()) {
-      return Error("The public interface " + eth0.get() + " does not exist");
-    }
-  } else if (!eth0.isSome()){
-    // eth0 is not specified in the flag and we did not get a valid
-    // eth0 from the library.
-    return Error(
-        "Network Isolator failed to find a public interface: " + eth0.error());
-  }
-
-  LOG(INFO) << "Using " << eth0.get() << " as the public interface";
-
-  // Get the name of the loopback interface. If it is not specified,
-  // try to derive its name based on the loopback IP address.
-  Result<string> lo = link::lo();
-  // Option<string> lo = flags.lo_name;
-  if (flags.lo_name.isSome()) {
-    lo = flags.lo_name;
-
-    // Check if the given loopback interface exists.
-    Try<bool> hostLoExists = link::exists(lo.get());
-    if (hostLoExists.isError()) {
-      return Error(
-          "Failed to check if " + lo.get() + " exists: " +
-          hostLoExists.error());
-    } else if (!hostLoExists.get()) {
-      return Error("The loopback interface " + lo.get() + " does not exist");
-    }
-  } else if (!lo.isSome()) {
-    // lo is not specified in the flag and we did not get a valid
-    // lo from the library.
-    return Error(
-        "Network Isolator failed to find a loopback interface: " + lo.error());
-  }
-
-  LOG(INFO) << "Using " << lo.get() << " as the loopback interface";
-
-  // If egress rate limit is provided, do a sanity check that it is
-  // not greater than the host physical link speed.
-  Option<Bytes> egressRateLimitPerContainer;
-  if (flags.egress_rate_limit_per_container.isSome()) {
-    // Read host physical link speed from /sys/class/net/eth0/speed.
-    // This value is in MBits/s.
-    Try<string> value =
-      os::read(path::join("/sys/class/net", eth0.get(), "speed"));
-
-    if (value.isError()) {
-      return Error(
-          "Failed to read " +
-          path::join("/sys/class/net", eth0.get(), "speed") +
-          ": " + value.error());
-    }
-
-    Try<uint64_t> hostLinkSpeed = numify<uint64_t>(strings::trim(value.get()));
-    CHECK_SOME(hostLinkSpeed);
-
-    // It could be possible that the nic driver doesn't support
-    // reporting physical link speed. In that case, report error.
-    if (hostLinkSpeed.get() == 0xFFFFFFFF) {
-      return Error(
-          "Network Isolator failed to determine link speed for " + eth0.get());
-    }
-
-    // Convert host link speed to Bytes/s for comparason.
-    if (hostLinkSpeed.get() * 1000000 / 8 <
-        flags.egress_rate_limit_per_container.get().bytes()) {
-      return Error(
-          "The given egress traffic limit for containers " +
-          stringify(flags.egress_rate_limit_per_container.get().bytes()) +
-          " Bytes/s is greater than the host link speed " +
-          stringify(hostLinkSpeed.get() * 1000000 / 8) + " Bytes/s");
-    }
-
-    if (flags.egress_rate_limit_per_container.get() != Bytes(0)) {
-      egressRateLimitPerContainer = flags.egress_rate_limit_per_container.get();
-    } else {
-      LOG(WARNING) << "Ignoring the given zero egress rate limit";
-    }
-  }
-
-  // Get the host IP network, MAC and default gateway.
-  Result<net::IPNetwork> hostIPNetwork =
-    net::IPNetwork::fromLinkDevice(eth0.get(), AF_INET);
-
-  if (!hostIPNetwork.isSome()) {
-    return Error(
-        "Failed to get the public IP network of " + eth0.get() + ": " +
-        (hostIPNetwork.isError() ?
-            hostIPNetwork.error() :
-            "does not have an IPv4 network"));
-  }
-
-  Result<net::MAC> hostMAC = net::mac(eth0.get());
-  if (!hostMAC.isSome()) {
-    return Error(
-        "Failed to get the MAC address of " + eth0.get() + ": " +
-        (hostMAC.isError() ? hostMAC.error() : "does not have a MAC address"));
-  }
-
-  Result<net::IP> hostDefaultGateway = route::defaultGateway();
-  if (!hostDefaultGateway.isSome()) {
-    return Error(
-        "Failed to get the default gateway of the host: " +
-        (hostDefaultGateway.isError() ? hostDefaultGateway.error()
-        : "The default gateway of the host does not exist"));
-  }
-
-  // Set the MAC address of the host loopback interface (lo) so that
-  // it matches that of the host public interface (eth0).  A fairly
-  // recent kernel patch is needed for this operation to succeed:
-  // https://git.kernel.org/cgit/linux/kernel/git/davem/net.git/:
-  // 25f929fbff0d1bcebf2e92656d33025cd330cbf8
-  Try<bool> setHostLoMAC = link::setMAC(lo.get(), hostMAC.get());
-  if (setHostLoMAC.isError()) {
-    return Error(
-        "Failed to set the MAC address of " + lo.get() +
-        ": " + setHostLoMAC.error());
-  }
-
-  // Set the MTU of the host loopback interface (lo) so that it
-  // matches that of the host public interface (eth0).
-  Result<unsigned int> hostEth0MTU = link::mtu(eth0.get());
-  if (hostEth0MTU.isError()) {
-    return Error(
-        "Failed to get the MTU of " + eth0.get() +
-        ": " + hostEth0MTU.error());
-  }
-
-  // The host public interface should exist since we just checked it.
-  CHECK_SOME(hostEth0MTU);
-
-  Try<bool> setHostLoMTU = link::setMTU(lo.get(), hostEth0MTU.get());
-  if (setHostLoMTU.isError()) {
-    return Error(
-        "Failed to set the MTU of " + lo.get() +
-        ": " + setHostLoMTU.error());
-  }
-
-  // Prepare the ingress queueing disciplines on host public interface
-  // (eth0) and host loopback interface (lo).
-  Try<bool> createHostEth0IngressQdisc = ingress::create(eth0.get());
-  if (createHostEth0IngressQdisc.isError()) {
-    return Error(
-        "Failed to create the ingress qdisc on " + eth0.get() +
-        ": " + createHostEth0IngressQdisc.error());
-  }
-
-  set<uint16_t> freeFlowIds;
-  if (flags.egress_unique_flow_per_container) {
-    // Prepare a fq_codel queueing discipline on host public interface
-    // (eth0) for egress flow classification.
-    //
-    // TODO(cwang): Maybe we can continue when some other egress qdisc
-    // exists because this is not a necessary qdisc for network
-    // isolation, but we don't want inconsistency, so we just fail in
-    // this case. See details in MESOS-2370.
-    Try<bool> createHostEth0EgressQdisc = fq_codel::create(
-        eth0.get(),
-        EGRESS_ROOT,
-        HOST_TX_FQ_CODEL_HANDLE);
-    if (createHostEth0EgressQdisc.isError()) {
-      return Error(
-          "Failed to create the egress qdisc on " + eth0.get() +
-          ": " + createHostEth0EgressQdisc.error());
-    }
-
-    // TODO(cwang): Make sure DEFAULT_FLOWS is large enough so that
-    // it's unlikely to run out of free flow IDs.
-    for (uint16_t i = CONTAINER_MIN_FLOWID; i < fq_codel::DEFAULT_FLOWS; i++) {
-      freeFlowIds.insert(i);
-    }
-  }
-
-  Try<bool> createHostLoQdisc = ingress::create(lo.get());
-  if (createHostLoQdisc.isError()) {
-    return Error(
-        "Failed to create the ingress qdisc on " + lo.get() +
-        ": " + createHostLoQdisc.error());
-  }
-
-  // Enable 'route_localnet' on host loopback interface (lo). This
-  // enables the use of 127.0.0.1/8 for local routing purpose. This
-  // feature only exists on kernel 3.6 or newer.
-  const string loRouteLocalnet =
-    path::join("/proc/sys/net/ipv4/conf", lo.get(), "route_localnet");
-
-  if (!os::exists(loRouteLocalnet)) {
-    // TODO(jieyu): Consider supporting running the isolator if this
-    // feature is not available. We need to conditionally disable
-    // routing for 127.0.0.1/8, and ask the tasks to use the public IP
-    // for container to container and container to host communication.
-    return Error("The kernel does not support 'route_localnet'");
-  }
-
-  Try<Nothing> write = os::write(loRouteLocalnet, "1");
-  if (write.isError()) {
-    return Error(
-        "Failed to enable route_localnet for " + lo.get() +
-        ": " + write.error());
-  }
-
-  // We disable 'rp_filter' and 'send_redirects' for host loopback
-  // interface (lo) to work around a kernel bug, which was only
-  // recently addressed in upstream in the following 3 commits.
-  // https://git.kernel.org/cgit/linux/kernel/git/davem/net.git/:
-  //   6a662719c9868b3d6c7d26b3a085f0cd3cc15e64
-  //   0d5edc68739f1c1e0519acbea1d3f0c1882a15d7
-  //   e374c618b1465f0292047a9f4c244bd71ab5f1f0
-  // The workaround ensures packets don't get dropped at lo.
-  write = os::write("/proc/sys/net/ipv4/conf/all/rp_filter", "0");
-  if (write.isError()) {
-    return Error(
-        "Failed to disable rp_filter for all: " + write.error());
-  }
-
-  write = os::write(path::join(
-      "/proc/sys/net/ipv4/conf", lo.get(), "rp_filter"), "0");
-  if (write.isError()) {
-    return Error(
-        "Failed to disable rp_filter for " + lo.get() +
-        ": " + write.error());
-  }
-
-  write = os::write("/proc/sys/net/ipv4/conf/all/send_redirects", "0");
-  if (write.isError()) {
-    return Error(
-        "Failed to disable send_redirects for all: " + write.error());
-  }
-
-  write = os::write(path::join(
-      "/proc/sys/net/ipv4/conf", lo.get(), "send_redirects"), "0");
-  if (write.isError()) {
-    return Error(
-        "Failed to disable send_redirects for " + lo.get() +
-        ": " + write.error());
-  }
-
-  // We need to enable accept_local on host loopback interface (lo)
-  // for kernels older than 3.6. Refer to the following:
-  // https://git.kernel.org/cgit/linux/kernel/git/davem/net.git/:
-  //   7a9bc9b81a5bc6e44ebc80ef781332e4385083f2
-  // https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt
-  write = os::write(path::join(
-      "/proc/sys/net/ipv4/conf", lo.get(), "accept_local"), "1");
-  if (write.isError()) {
-    return Error(
-        "Failed to enable accept_local for " + lo.get() +
-        ": " + write.error());
-  }
-
-  // Reading host network configurations. Each container will match
-  // these configurations.
-  hashset<string> procs;
-
-  // TODO(jieyu): The following is a partial list of all the
-  // configurations. In the future, we may want to expose these
-  // configurations using ContainerInfo.
-
-  // The kernel will use a default value for the following
-  // configurations inside a container. Therefore, we need to set them
-  // in the container to match that on the host.
-  procs.insert("/proc/sys/net/core/somaxconn");
-
-  // As of kernel 3.10, the following configurations are shared
-  // between host and containers, and therefore are not required to be
-  // set in containers. We keep them here just in case the kernel
-  // changes in the future.
-  procs.insert("/proc/sys/net/core/netdev_max_backlog");
-  procs.insert("/proc/sys/net/core/rmem_max");
-  procs.insert("/proc/sys/net/core/wmem_max");
-  procs.insert("/proc/sys/net/ipv4/tcp_keepalive_time");
-  procs.insert("/proc/sys/net/ipv4/tcp_keepalive_intvl");
-  procs.insert("/proc/sys/net/ipv4/tcp_keepalive_probes");
-  procs.insert("/proc/sys/net/ipv4/tcp_max_syn_backlog");
-  procs.insert("/proc/sys/net/ipv4/tcp_rmem");
-  procs.insert("/proc/sys/net/ipv4/tcp_retries2");
-  procs.insert("/proc/sys/net/ipv4/tcp_synack_retries");
-  procs.insert("/proc/sys/net/ipv4/tcp_wmem");
-  procs.insert("/proc/sys/net/ipv4/neigh/default/gc_thresh1");
-  procs.insert("/proc/sys/net/ipv4/neigh/default/gc_thresh2");
-  procs.insert("/proc/sys/net/ipv4/neigh/default/gc_thresh3");
-
-  hashmap<string, string> hostNetworkConfigurations;
-  foreach (const string& proc, procs) {
-    Try<string> value = os::read(proc);
-    if (value.isSome()) {
-      LOG(INFO) << proc << " = '" << strings::trim(value.get()) << "'";
-      hostNetworkConfigurations[proc] = strings::trim(value.get());
-    }
-  }
-
-  // Self bind mount PORT_MAPPING_BIND_MOUNT_ROOT(). Since we use a
-  // new mount namespace for each container, for this mount point, we
-  // set '--make-rshared' on the host and set '--make-rslave' inside
-  // each container. This is important because when we unmount the
-  // network namespace handles on the host, those handles will be
-  // unmounted in the containers as well, but NOT vice versa.
-
-  // We first create the bind mount directory if it does not exist.
-  Try<Nothing> mkdir = os::mkdir(PORT_MAPPING_BIND_MOUNT_ROOT());
-  if (mkdir.isError()) {
-    return Error(
-        "Failed to create the bind mount root directory at " +
-        PORT_MAPPING_BIND_MOUNT_ROOT() + ": " + mkdir.error());
-  }
-
-  // Now, check '/proc/mounts' to see if
-  // PORT_MAPPING_BIND_MOUNT_ROOT() has already been self mounted.
-  Try<fs::MountTable> mountTable = fs::MountTable::read("/proc/mounts");
-  if (mountTable.isError()) {
-    return Error(
-        "Failed to the read the mount table at '/proc/mounts': " +
-        mountTable.error());
-  }
-
-  Option<fs::MountTable::Entry> bindMountRoot;
-  foreach (const fs::MountTable::Entry& entry, mountTable.get().entries) {
-    if (entry.dir == PORT_MAPPING_BIND_MOUNT_ROOT()) {
-      bindMountRoot = entry;
-    }
-  }
-
-  // Self bind mount PORT_MAPPING_BIND_MOUNT_ROOT().
-  if (bindMountRoot.isNone()) {
-    // NOTE: Instead of using fs::mount to perform the bind mount, we
-    // use the shell command here because the syscall 'mount' does not
-    // update the mount table (i.e., /etc/mtab), which could cause
-    // issues for the shell command 'mount --make-rslave' inside the
-    // container. It's OK to use the blocking os::shell here because
-    // 'create' will only be invoked during initialization.
-    Try<string> mount = os::shell(
-        "mount --bind %s %s",
-        PORT_MAPPING_BIND_MOUNT_ROOT().c_str(),
-        PORT_MAPPING_BIND_MOUNT_ROOT().c_str());
-
-    if (mount.isError()) {
-      return Error(
-          "Failed to self bind mount '" + PORT_MAPPING_BIND_MOUNT_ROOT() +
-          "': " + mount.error());
-    }
-  }
-
-  // Mark the mount point PORT_MAPPING_BIND_MOUNT_ROOT() as
-  // recursively shared.
-  Try<string> mountShared = os::shell(
-      "mount --make-rshared %s",
-      PORT_MAPPING_BIND_MOUNT_ROOT().c_str());
-
-  if (mountShared.isError()) {
-    return Error(
-        "Failed to mark '" + PORT_MAPPING_BIND_MOUNT_ROOT() +
-        "' as recursively shared: " + mountShared.error());
-  }
-
-  // Create the network namespace handle symlink directory if it does
-  // not exist. It is used to host from network namespace handle
-  // symlinks whose basename is a container ID. This allows us to
-  // recover container IDs for orphan containers (i.e., not known by
-  // the slave). This is introduced in 0.23.0.
-  mkdir = os::mkdir(PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT());
-  if (mkdir.isError()) {
-    return Error(
-        "Failed to create the bind mount root directory at " +
-        PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT() + ": " + mkdir.error());
-  }
-
-  return new MesosIsolator(Owned<MesosIsolatorProcess>(
-      new PortMappingIsolatorProcess(
-          flags,
-          eth0.get(),
-          lo.get(),
-          hostMAC.get(),
-          hostIPNetwork.get(),
-          hostEth0MTU.get(),
-          hostDefaultGateway.get(),
-          hostNetworkConfigurations,
-          egressRateLimitPerContainer,
-          nonEphemeralPorts,
-          ephemeralPortsAllocator,
-          freeFlowIds)));
-}
-
-
-Future<Nothing> PortMappingIsolatorProcess::recover(
-    const list<ContainerState>& states,
-    const hashset<ContainerID>& orphans)
-{
-  // Extract pids from virtual device names (veth). This tells us
-  // about all the potential live containers on this slave.
-  Try<set<string>> links = net::links();
-  if (links.isError()) {
-    return Failure("Failed to get all the links: " + links.error());
-  }
-
-  hashset<pid_t> pids;
-  foreach (const string& name, links.get()) {
-    Option<pid_t> pid = getPidFromVeth(name);
-    // Not all links follow the naming: mesos{pid}, so we simply
-    // continue, e.g., eth0.
-    if (pid.isNone()) {
-      continue;
-    } else if (pids.contains(pid.get())) {
-      return Failure("Two virtual devices have the same name '" + name + "'");
-    }
-
-    pids.insert(pid.get());
-  }
-
-  // Scan the bind mount root to cleanup all stale network namespace
-  // handles that do not have an active veth associated with.
-  Try<list<string>> entries = os::ls(PORT_MAPPING_BIND_MOUNT_ROOT());
-  if (entries.isError()) {
-    return Failure(
-        "Failed to list bind mount root '" +
-        PORT_MAPPING_BIND_MOUNT_ROOT() +
-        "': " + entries.error());
-  }
-
-  foreach (const string& entry, entries.get()) {
-    const string path = path::join(PORT_MAPPING_BIND_MOUNT_ROOT(), entry);
-
-    // NOTE: We expect all regular files whose names are numbers under
-    // the bind mount root are network namespace handles.
-    Result<pid_t> pid = getPidFromNamespaceHandle(path);
-    if (pid.isError()) {
-      return Failure(
-          "Failed to get pid from network namespace handle '" +
-          path + "': " + pid.error());
-    } else if (pid.isNone()) {
-      // We ignore files that are clearly not network namespace
-      // handles created by us. It's likely that those are created by
-      // users or other tools.
-      LOG(WARNING) << "Unrecognized network namespace handle '" << path << "'";
-      continue;
-    }
-
-    // We cleanup the network namespace handle if the associated
-    // containers have clearly exited (i.e., the veth has gone). The
-    // cleanup here is best effort.
-    if (!pids.contains(pid.get())) {
-      LOG(INFO) << "Removing stale network namespace handle '" << path << "'";
-
-      Try<Nothing> unmount = fs::unmount(path, MNT_DETACH);
-      if (unmount.isError()) {
-        LOG(WARNING) << "Failed to unmount stale network namespace handle '"
-                     << path << "': " << unmount.error();
-      }
-
-      Try<Nothing> rm = os::rm(path);
-      if (rm.isError()) {
-        LOG(WARNING) << "Failed to remove stale network namespace handle '"
-                     << path << "': " << rm.error();
-      }
-    }
-  }
-
-  // Scan the bind mount symlink root for container IDs. This allows us
-  // to recover container IDs for orphan containers (i.e., not known
-  // by the slave). This is introduced in 0.23.0.
-  entries = os::ls(PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT());
-  if (entries.isError()) {
-    return Failure(
-        "Failed to list bind mount symlink root '" +
-        PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT() +
-        "': " + entries.error());
-  }
-
-  // This map stores the mapping between pids and container IDs
-  // recovered from the bind mount root that have valid veth links. We
-  // use a multihashmap here because multiple container IDs can map to
-  // the same pid if the removal of a symlink fails in '_cleanup()'
-  // and the pid is reused by a new container.
-  multihashmap<pid_t, ContainerID> linkers;
-
-  foreach (const string& entry, entries.get()) {
-    const string path =
-      path::join(PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT(), entry);
-
-    // We only create symlinks in this directory and assume
-    // non-symlink files are created by other users or tools,
-    // therefore will be ignored.
-    if (!os::stat::islink(path)) {
-      LOG(WARNING) << "Ignored non-symlink file '" << path
-                   << "' under bind mount symlink root '"
-                   << PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT() << "'";
-      continue;
-    }
-
-    // NOTE: We expect all symlinks under the bind mount symlink root
-    // to be container ID symlinks.
-
-    Try<ContainerID> containerId = getContainerIdFromSymlink(path);
-    if (containerId.isError()) {
-      return Failure(
-          "Failed to get container ID from network namespace handle symlink '" +
-          path + "': " + containerId.error());
-    }
-
-    Result<pid_t> pid = getPidFromSymlink(path);
-    if (pid.isError()) {
-      return Failure(
-          "Failed to get pid from network namespace handle symlink '" + path +
-          "': " + pid.error());
-    }
-
-    // We remove the symlink if it's dangling or the associated
-    // containers have clearly exited (i.e., the veth has gone). The
-    // cleanup here is best effort.
-    if (pid.isNone() || !pids.contains(pid.get())) {
-      LOG(INFO) << "Removing stale network namespace handle symlink '"
-                << path << "'";
-
-      Try<Nothing> rm = os::rm(path);
-      if (rm.isError()) {
-        LOG(WARNING) << "Failed to remove stale network namespace handle "
-                     << " symlink '" << path << "': " << rm.error();
-      }
-    } else {
-      LOG(INFO) << "Discovered network namespace handle symlink "
-                << containerId.get() << " -> " << pid.get();
-
-      linkers.put(pid.get(), containerId.get());
-    }
-  }
-
-  // If multiple container IDs point to the same pid, we remove both
-  // symlinks for safety (as if we cannot derive the container ID for
-  // orphans, which is OK because it'll be treated the same as those
-  // containers that are created by older (pre 0.23.0) versions). Note
-  // that it's possible that multiple container IDs map to the same
-  // pid if the removal of a symlink fails in '_cleanup()' and the pid
-  // is reused by a new container.
-  foreach (pid_t pid, linkers.keys()) {
-    list<ContainerID> containerIds = linkers.get(pid);
-    if (containerIds.size() > 1) {
-      foreach (const ContainerID& containerId, containerIds) {
-        const string linker = getSymlinkPath(containerId);
-
-        LOG(WARNING) << "Removing duplicated network namespace handle symlink '"
-                     << linker << "'";
-
-        Try<Nothing> rm = os::rm(linker);
-        if (rm.isError()) {
-          LOG(WARNING) << "Failed to remove duplicated network namespace "
-                       << "handle symlink '" << linker << "': " << rm.error();
-        }
-      }
-
-      linkers.remove(pid);
-    }
-  }
-
-  // Now, actually recover the isolator from slave's state.
-  foreach (const ContainerState& state, states) {
-    const ContainerID& containerId = state.container_id();
-    pid_t pid = state.pid();
-
-    VLOG(1) << "Recovering network isolator for container "
-            << containerId << " with pid " << pid;
-
-    if (!pids.contains(pid)) {
-      // There are two possible cases here:
-      //
-      // 1) The container was launched by the slave with network
-      //    isolation disabled, so the pid could not be found in the
-      //    device names in the system.
-      //
-      // 2) The container was launched by the slave with network
-      //    isolation enabled, but veth is removed (because the
-      //    corresponding container is destroyed), but the slave
-      //    restarts before it is able to write the sentinel file.
-      //
-      // In both cases, we treat the container as unmanaged. For case
-      // (2), it's safe to do so because the container has already
-      // been destroyed.
-      VLOG(1) << "Skipped recovery for container " << containerId
-              << "with pid " << pid << " as either it was not managed by "
-              << "the network isolator or it has already been destroyed";
-
-      unmanaged.insert(containerId);
-      continue;
-    }
-
-    Try<Info*> recover = _recover(pid);
-    if (recover.isError()) {
-      foreachvalue (Info* info, infos) {
-        delete info;
-      }
-
-      return Failure(
-          "Failed to recover container " + stringify(containerId) +
-          " with pid " + stringify(pid) + ": " + recover.error());
-    }
-
-    infos[containerId] = recover.get();
-
-    // Remove the successfully recovered pid.
-    pids.erase(pid);
-  }
-
-  // Recover orphans. Known orphans will be destroyed by containerizer
-  // using the normal cleanup path (refer to MESOS-2367 for details).
-  // Unknown orphans will be cleaned up immediately. The recovery will
-  // fail if there is some unknown orphan that cannot be cleaned up.
-  vector<Info*> unknownOrphans;
-
-  foreach (pid_t pid, pids) {
-    Try<Info*> recover = _recover(pid);
-    if (recover.isError()) {
-      foreachvalue (Info* info, infos) {
-        delete info;
-      }
-      foreach (Info* info, unknownOrphans) {
-        delete info;
-      }
-
-      return Failure(
-          "Failed to recover orphaned container with pid " +
-          stringify(pid) + ": " + recover.error());
-    }
-
-    if (linkers.get(pid).size() == 1) {
-      const ContainerID containerId = linkers.get(pid).front();
-      CHECK(!infos.contains(containerId));
-
-      if (orphans.contains(containerId)) {
-        infos[containerId] = recover.get();
-        continue;
-      }
-    }
-
-    unknownOrphans.push_back(recover.get());
-  }
-
-  foreach (Info* info, unknownOrphans) {
-    CHECK_SOME(info->pid);
-    pid_t pid = info->pid.get();
-
-    Option<ContainerID> containerId;
-    if (linkers.get(pid).size() == 1) {
-      containerId = linkers.get(pid).front();
-    }
-
-    // NOTE: If 'infos' is empty (means there is no regular container
-    // or known orphan), the '_cleanup' below will remove the ICMP and
-    // ARP packet filters on host eth0. This will cause subsequent
-    // calls to '_cleanup' for unknown orphans to fail. However, this
-    // is OK because when slave restarts and tries to recover again,
-    // it'll try to remove the remaining unknown orphans.
-    // TODO(jieyu): Consider call '_cleanup' for all the unknown
-    // orphans before returning even if error occurs.
-    Try<Nothing> cleanup = _cleanup(info, containerId);
-    if (cleanup.isError()) {
-      foreachvalue (Info* info, infos) {
-        delete info;
-      }
-
-      // TODO(jieyu): Also delete 'info' in unknownOrphans. Notice
-      // that some 'info' in unknownOrphans might have already been
-      // deleted in '_cleanup' above.
-
-      return Failure(
-          "Failed to cleanup orphaned container with pid " +
-          stringify(pid) + ": " + cleanup.error());
-    }
-  }
-
-  // TODO(cwang): Consider removing unrecognized flow classifiers from
-  // host eth0 egress.
-
-  LOG(INFO) << "Network isolator recovery complete";
-
-  return Nothing();
-}
-
-
-Try<PortMappingIsolatorProcess::Info*>
-PortMappingIsolatorProcess::_recover(pid_t pid)
-{
-  // Get all the IP filters on veth.
-  // NOTE: We only look at veth devices to recover port ranges
-  // assigned to each container. That's the reason why we need to make
-  // sure that we add filters to veth before adding filters to host
-  // eth0 and host lo. Also, we need to make sure we remove filters
-  // from host eth0 and host lo before removing filters from veth.
-  Result<vector<ip::Classifier>> vethIngressClassifiers =
-    ip::classifiers(veth(pid), ingress::HANDLE);
-
-  if (vethIngressClassifiers.isError()) {
-    return Error(
-        "Failed to get all the IP filters on " + veth(pid) +
-        ": " + vethIngressClassifiers.error());
-  } else if (vethIngressClassifiers.isNone()) {
-    return Error(
-        "Failed to get all the IP filters on " + veth(pid) +
-        ": link does not exist");
-  }
-
-  hashmap<PortRange, uint16_t> flowIds;
-
-  if (flags.egress_unique_flow_per_container) {
-    // Get all egress IP flow classifiers on eth0.
-    Result<vector<filter::Filter<ip::Classifier>>> eth0EgressFilters =
-      ip::filters(eth0, HOST_TX_FQ_CODEL_HANDLE);
-
-    if (eth0EgressFilters.isError()) {
-      return Error(
-          "Failed to get all the IP flow classifiers on " + eth0 +
-          ": " + eth0EgressFilters.error());
-    } else if (eth0EgressFilters.isNone()) {
-      return Error(
-          "Failed to get all the IP flow classifiers on " + eth0 +
-          ": link does not exist");
-    }
-
-    // Construct a port range to flow ID mapping from host eth0
-    // egress. This map will be used later.
-    foreach (const filter::Filter<ip::Classifier>& filter,
-             eth0EgressFilters.get()) {
-      const Option<PortRange> sourcePorts = filter.classifier.sourcePorts;
-      const Option<Handle> classid = filter.classid;
-
-      if (sourcePorts.isNone()) {
-        return Error("Missing source ports for filters on egress of " + eth0);
-      }
-
-      if (classid.isNone()) {
-        return Error("Missing classid for filters on egress of " + eth0);
-      }
-
-      if (flowIds.contains(sourcePorts.get())) {
-        return Error(
-          "Duplicated port range " + stringify(sourcePorts.get()) +
-          " detected on egress of " + eth0);
-      }
-
-      flowIds[sourcePorts.get()] = classid.get().secondary();
-    }
-  }
-
-  IntervalSet<uint16_t> nonEphemeralPorts;
-  IntervalSet<uint16_t> ephemeralPorts;
-  Option<uint16_t> flowId;
-
-  foreach (const ip::Classifier& classifier, vethIngressClassifiers.get()) {
-    const Option<PortRange> sourcePorts = classifier.sourcePorts;
-    const Option<PortRange> destinationPorts = classifier.destinationPorts;
-
-    // All the IP filters on veth used by us only have source ports.
-    if (sourcePorts.isNone() || destinationPorts.isSome()) {
-      return Error("Unexpected IP filter detected on " + veth(pid));
-    }
-
-    if (flowIds.contains(sourcePorts.get())) {
-      if (flowId.isNone()) {
-        flowId = flowIds.get(sourcePorts.get());
-      } else if (flowId != flowIds.get(sourcePorts.get())) {
-        return Error(
-            "A container is associated with multiple flows "
-            "on egress of " + eth0);
-      }
-    } else if (flowId.isSome()) {
-      // This is the case where some port range of a container is
-      // assigned to a flow while some isn't. This could happen if
-      // slave crashes while those filters are created. However, this
-      // is OK for us because packets by default go to the host flow.
-      LOG(WARNING) << "Container port range " << sourcePorts.get()
-                   << " does not have flow id " << flowId.get()
-                   << " assigned";
-    }
-
-    Interval<uint16_t> ports =
-      (Bound<uint16_t>::closed(sourcePorts.get().begin()),
-       Bound<uint16_t>::closed(sourcePorts.get().end()));
-
-    if (managedNonEphemeralPorts.contains(ports)) {
-      nonEphemeralPorts += ports;
-    } else if (ephemeralPortsAllocator->isManaged(ports)) {
-      // We have duplicate here because we have two IP filters with
-      // the same ephemeral port range (one for eth0 and one for lo).
-      // But we should never have two intersecting port ranges.
-      if (!ephemeralPorts.contains(ports) && ephemeralPorts.intersects(ports)) {
-        return Error("Unexpected intersected ephemeral port ranges");
-      }
-
-      ephemeralPorts += ports;
-    } else {
-      return Error("Unexpected IP filter detected on " + veth(pid));
-    }
-  }
-
-  Info* info = NULL;
-
-  if (ephemeralPorts.empty()) {
-    // NOTE: This is possible because the slave may crash while
-    // calling 'isolate()', leaving a partially isolated container. To
-    // clean up this partially isolated container, we still create an
-    // Info struct here and let the 'cleanup' function clean it up
-    // later.
-    LOG(WARNING) << "No ephemeral ports found for container with pid "
-                 << stringify(pid) << ". This could happen if slave crashes "
-                 << "while isolating a container";
-
-    info = new Info(nonEphemeralPorts, Interval<uint16_t>(), pid);
-  } else {
-    if (ephemeralPorts.intervalCount() != 1) {
-      return Error("Each container should have only one ephemeral port range");
-    }
-
-    // Tell the allocator that this ephemeral port range is used.
-    ephemeralPortsAllocator->allocate(*ephemeralPorts.begin());
-
-    info = new Info(nonEphemeralPorts, *ephemeralPorts.begin(), pid);
-
-    VLOG(1) << "Recovered network isolator for container with pid " << pid
-            << " non-ephemeral port ranges " << nonEphemeralPorts
-            << " and ephemeral port range " << *ephemeralPorts.begin();
-  }
-
-  if (flowId.isSome()) {
-    freeFlowIds.erase(flowId.get());
-    info->flowId = flowId.get();
-  }
-
-  return CHECK_NOTNULL(info);
-}
-
-
-Future<Option<ContainerPrepareInfo>> PortMappingIsolatorProcess::prepare(
-    const ContainerID& containerId,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const Option<string>& user)
-{
-  if (unmanaged.contains(containerId)) {
-    return Failure("Asked to prepare an unmanaged container");
-  }
-
-  if (infos.contains(containerId)) {
-    return Failure("Container has already been prepared");
-  }
-
-  Resources resources(executorInfo.resources());
-
-  IntervalSet<uint16_t> nonEphemeralPorts;
-
-  if (resources.ports().isSome()) {
-    nonEphemeralPorts = getIntervalSet(resources.ports().get());
-
-    // Sanity check to make sure that the assigned non-ephemeral ports
-    // for the container are part of the non-ephemeral ports specified
-    // by the slave.
-    if (!managedNonEphemeralPorts.contains(nonEphemeralPorts)) {
-        return Failure(
-            "Some non-ephemeral ports specified in " +
-            stringify(nonEphemeralPorts) +
-            " are not managed by the slave");
-    }
-  }
-
-  // TODO(jieyu): For now, we simply ignore the 'ephemeral_ports'
-  // specified in the executor info. However, this behavior needs to
-  // be changed once the master can make default allocations for
-  // ephemeral ports.
-  if (resources.ephemeral_ports().isSome()) {
-    LOG(WARNING) << "Ignoring the specified ephemeral_ports '"
-                 << resources.ephemeral_ports().get()
-                 << "' for container" << containerId
-                 << " of executor " << executorInfo.executor_id();
-  }
-
-  // Allocate the ephemeral ports used by this container.
-  Try<Interval<uint16_t>> ephemeralPorts = ephemeralPortsAllocator->allocate();
-  if (ephemeralPorts.isError()) {
-    return Failure(
-        "Failed to allocate ephemeral ports: " + ephemeralPorts.error());
-  }
-
-  infos[containerId] = new Info(nonEphemeralPorts, ephemeralPorts.get());
-
-  LOG(INFO) << "Using non-ephemeral ports " << nonEphemeralPorts
-            << " and ephemeral ports " << ephemeralPorts.get()
-            << " for container " << containerId << " of executor "
-            << executorInfo.executor_id();
-
-  ContainerPrepareInfo prepareInfo;
-  prepareInfo.add_commands()->set_value(scripts(infos[containerId]));
-
-  // NOTE: the port mapping isolator itself doesn't require mount
-  // namespace. However, if mount namespace is enabled because of
-  // other isolators, we need to set mount sharing accordingly for
-  // PORT_MAPPING_BIND_MOUNT_ROOT to avoid races described in
-  // MESOS-1558. So we turn on mount namespace here for consistency.
-  prepareInfo.set_namespaces(CLONE_NEWNET | CLONE_NEWNS);
-
-  return prepareInfo;
-}
-
-
-Future<Nothing> PortMappingIsolatorProcess::isolate(
-    const ContainerID& containerId,
-    pid_t pid)
-{
-  if (unmanaged.contains(containerId)) {
-    return Failure("Asked to isolate an unmanaged container");
-  }
-
-  if (!infos.contains(containerId)) {
-    return Failure("Unknown container");
-  }
-
-  Info* info = CHECK_NOTNULL(infos[containerId]);
-
-  if (info->pid.isSome()) {
-    return Failure("The container has already been isolated");
-  }
-
-  info->pid = pid;
-
-  if (flags.egress_unique_flow_per_container) {
-    info->flowId = getNextFlowId();
-  }
-
-  // Bind mount the network namespace handle of the process 'pid' to a
-  // directory to hold an extra reference to the network namespace
-  // which will be released in 'cleanup'. By holding the extra
-  // reference, the network namespace will not be destroyed even if
-  // the process 'pid' is gone, which allows us to explicitly control
-  // the network namespace life cycle.
-  const string source = path::join("/proc", stringify(pid), "ns", "net");
-  const string target = getNamespaceHandlePath(pid);
-
-  Try<Nothing> touch = os::touch(target);
-  if (touch.isError()) {
-    return Failure("Failed to create the bind mount point: " + touch.error());
-  }
-
-  Try<Nothing> mount = fs::mount(source, target, None(), MS_BIND, NULL);
-  if (mount.isError()) {
-    return Failure(
-        "Failed to mount the network namespace handle from '" +
-        source + "' to '" + target + "': " + mount.error());
-  }
-
-  LOG(INFO) << "Bind mounted '" << source << "' to '" << target
-            << "' for container " << containerId;
-
-  // Since 0.23.0, we create a symlink to the network namespace handle
-  // using the container ID. This serves two purposes. First, it
-  // allows us to recover the container ID later when slave restarts
-  // even if slave's checkpointed meta data is deleted. Second, it
-  // makes the debugging easier. See MESOS-2528 for details.
-  const string linker = getSymlinkPath(containerId);
-  Try<Nothing> symlink = ::fs::symlink(target, linker);
-  if (symlink.isError()) {
-    return Failure(
-        "Failed to symlink the network namespace handle '" +
-        linker + "' -> '" + target + "': " + symlink.error());
-  }
-
-  LOG(INFO) << "Created network namespace handle symlink '"
-            << linker << "' -> '" << target << "'";
-
-  // Create a virtual ethernet pair for this container.
-  Try<bool> createVethPair = link::create(veth(pid), eth0, pid);
-  if (createVethPair.isError()) {
-    return Failure(
-        "Failed to create virtual ethernet pair: " +
-        createVethPair.error());
-  }
-
-  // Disable IPv6 for veth as IPv6 packets won't be forwarded anyway.
-  const string disableIPv6 =
-    path::join("/proc/sys/net/ipv6/conf", veth(pid), "disable_ipv6");
-
-  if (os::exists(disableIPv6)) {
-    Try<Nothing> write = os::write(disableIPv6, "1");
-    if (write.isError()) {
-      return Failure(
-          "Failed to disable IPv6 for " + veth(pid) +
-          ": " + write.error());
-    }
-  }
-
-  // Sets the MAC address of veth to match the MAC address of the host
-  // public interface (eth0).
-  Try<bool> setVethMAC = link::setMAC(veth(pid), hostMAC);
-  if (setVethMAC.isError()) {
-    return Failure(
-        "Failed to set the MAC address of " + veth(pid) +
-        ": " + setVethMAC.error());
-  }
-
-  // Prepare the ingress queueing disciplines on veth.
-  Try<bool> createQdisc = ingress::create(veth(pid));
-  if (createQdisc.isError()) {
-    return Failure(
-        "Failed to create the ingress qdisc on " + veth(pid) +
-        ": " + createQdisc.error());
-  }
-
-  // Veth device should exist since we just created it.
-  CHECK(createQdisc.get());
-
-  // For each port range, add a set of IP packet filters to properly
-  // redirect IP traffic to/from containers.
-  foreach (const PortRange& range,
-           getPortRanges(info->nonEphemeralPorts + info->ephemeralPorts)) {
-    if (info->flowId.isSome()) {
-      LOG(INFO) << "Adding IP packet filters with ports " << range
-                << " with flow ID " << info->flowId.get()
-                << " for container " << containerId;
-    } else {
-      LOG(INFO) << "Adding IP packet filters with ports " << range
-                << " for container " << containerId;
-    }
-
-    Try<Nothing> add = addHostIPFilters(range, info->flowId, veth(pid));
-    if (add.isError()) {
-      return Failure(
-          "Failed to add IP packet filter with ports " +
-          stringify(range) + " for container with pid " +
-          stringify(pid) + ": " + add.error());
-    }
-  }
-
-  // Relay ICMP packets from veth of the container to host eth0.
-  Try<bool> icmpVethToEth0 = filter::icmp::create(
-      veth(pid),
-      ingress::HANDLE,
-      icmp::Classifier(None()),
-      Priority(ICMP_FILTER_PRIORITY, NORMAL),
-      action::Redirect(eth0));
-
-  if (icmpVethToEth0.isError()) {
-    ++metrics.adding_veth_icmp_filters_errors;
-
-    return Failure(
-        "Failed to create an ICMP packet filter from " + veth(pid) +
-        " to host " + eth0 + ": " + icmpVethToEth0.error());
-  } else if (!icmpVethToEth0.get()) {
-    ++metrics.adding_veth_icmp_filters_already_exist;
-
-    return Failure(
-        "The ICMP packet filter from " + veth(pid) +
-        " to host " + eth0 + " already exists");
-  }
-
-  // Relay ARP packets from veth of the container to host eth0.
-  Try<bool> arpVethToEth0 = filter::basic::create(
-      veth(pid),
-      ingress::HANDLE,
-      ETH_P_ARP,
-      Priority(ARP_FILTER_PRIORITY, NORMAL),
-      action::Redirect(eth0));
-
-  if (arpVethToEth0.isError()) {
-    ++metrics.adding_veth_arp_filters_errors;
-
-    return Failure(
-        "Failed to create an ARP packet filter from " + veth(pid) +
-        " to host " + eth0 + ": " + arpVethToEth0.error());
-  } else if (!arpVethToEth0.get()) {
-    ++metrics.adding_veth_arp_filters_already_exist;
-
-    return Failure(
-        "The ARP packet filter from " + veth(pid) +
-        " to host " + eth0 + " already exists");
-  }
-
-  // Setup filters for ICMP and ARP packets. We mirror ICMP and ARP
-  // packets from host eth0 to veths of all the containers. We also
-  // setup flow classifiers for host eth0 egress.
-  set<string> targets;
-  foreachvalue (Info* info, infos) {
-    if (info->pid.isSome()) {
-      targets.insert(veth(info->pid.get()));
-    }
-  }
-
-  if (targets.size() == 1) {
-    // We just create the first container in which case we should
-    // create filters for ICMP and ARP packets.
-
-    // Create a new ICMP filter on host eth0 ingress for mirroring
-    // packets from host eth0 to veth.
-    Try<bool> icmpEth0ToVeth = filter::icmp::create(
-        eth0,
-        ingress::HANDLE,
-        icmp::Classifier(hostIPNetwork.address()),
-        Priority(ICMP_FILTER_PRIORITY, NORMAL),
-        action::Mirror(targets));
-
-    if (icmpEth0ToVeth.isError()) {
-      ++metrics.adding_eth0_icmp_filters_errors;
-
-      return Failure(
-          "Failed to create an ICMP packet filter from host " + eth0 +
-          " to " + veth(pid) + ": " + icmpEth0ToVeth.error());
-    } else if (!icmpEth0ToVeth.get()) {
-      ++metrics.adding_eth0_icmp_filters_already_exist;
-
-      return Failure(
-          "The ICMP packet filter on host " + eth0 + " already exists");
-    }
-
-    // Create a new ARP filter on host eth0 ingress for mirroring
-    // packets from host eth0 to veth.
-    Try<bool> arpEth0ToVeth = filter::basic::create(
-        eth0,
-        ingress::HANDLE,
-        ETH_P_ARP,
-        Priority(ARP_FILTER_PRIORITY, NORMAL),
-        action::Mirror(targets));
-
-    if (arpEth0ToVeth.isError()) {
-      ++metrics.adding_eth0_arp_filters_errors;
-
-      return Failure(
-          "Failed to create an ARP packet filter from host " + eth0 +
-          " to " + veth(pid) + ": " + arpEth0ToVeth.error());
-    } else if (!arpEth0ToVeth.get()) {
-      ++metrics.adding_eth0_arp_filters_already_exist;
-
-      return Failure(
-          "The ARP packet filter on host " + eth0 + " already exists");
-    }
-
-    if (flags.egress_unique_flow_per_container) {
-      // Create a new ICMP filter on host eth0 egress for classifying
-      // packets into a reserved flow.
-      Try<bool> icmpEth0Egress = filter::icmp::create(
-          eth0,
-          HOST_TX_FQ_CODEL_HANDLE,
-          icmp::Classifier(None()),
-          Priority(ICMP_FILTER_PRIORITY, NORMAL),
-          Handle(HOST_TX_FQ_CODEL_HANDLE, ICMP_FLOWID));
-
-      if (icmpEth0Egress.isError()) {
-        ++metrics.adding_eth0_egress_filters_errors;
-
-        return Failure(
-            "Failed to create the ICMP flow classifier on host " +
-            eth0 + ": " + icmpEth0Egress.error());
-      } else if (!icmpEth0Egress.get()) {
-        ++metrics.adding_eth0_egress_filters_already_exist;
-
-        return Failure(
-            "The ICMP flow classifier on host " + eth0 + " already exists");
-      }
-
-      // Create a new ARP filter on host eth0 egress for classifying
-      // packets into a reserved flow.
-      Try<bool> arpEth0Egress = filter::basic::create(
-          eth0,
-          HOST_TX_FQ_CODEL_HANDLE,
-          ETH_P_ARP,
-          Priority(ARP_FILTER_PRIORITY, NORMAL),
-          Handle(HOST_TX_FQ_CODEL_HANDLE, ARP_FLOWID));
-
-      if (arpEth0Egress.isError()) {
-        ++metrics.adding_eth0_egress_filters_errors;
-
-        return Failure(
-            "Failed to create the ARP flow classifier on host " +
-            eth0 + ": " + arpEth0Egress.error());
-      } else if (!arpEth0Egress.get()) {
-        ++metrics.adding_eth0_egress_filters_already_exist;
-
-        return Failure(
-            "The ARP flow classifier on host " + eth0 + " already exists");
-      }
-
-      // Rest of the host packets go to a reserved flow.
-      Try<bool> defaultEth0Egress = filter::basic::create(
-          eth0,
-          HOST_TX_FQ_CODEL_HANDLE,
-          ETH_P_ALL,
-          Priority(DEFAULT_FILTER_PRIORITY, NORMAL),
-          Handle(HOST_TX_FQ_CODEL_HANDLE, HOST_FLOWID));
-
-      if (defaultEth0Egress.isError()) {
-        ++metrics.adding_eth0_egress_fi

<TRUNCATED>