You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/10/26 19:40:30 UTC

[06/12] mesos git commit: Relocated MesosContainerizer specific files to the correct location.

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
new file mode 100644
index 0000000..565f9cc
--- /dev/null
+++ b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
@@ -0,0 +1,3792 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <limits.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <iostream>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include <mesos/mesos.hpp>
+
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/io.hpp>
+#include <process/pid.hpp>
+#include <process/subprocess.hpp>
+
+#include <stout/error.hpp>
+#include <stout/foreach.hpp>
+#include <stout/fs.hpp>
+#include <stout/hashset.hpp>
+#include <stout/json.hpp>
+#include <stout/lambda.hpp>
+#include <stout/mac.hpp>
+#include <stout/multihashmap.hpp>
+#include <stout/numify.hpp>
+#include <stout/os.hpp>
+#include <stout/option.hpp>
+#include <stout/path.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/result.hpp>
+#include <stout/stringify.hpp>
+#include <stout/strings.hpp>
+#include <stout/utils.hpp>
+
+#include <stout/os/exists.hpp>
+#include <stout/os/stat.hpp>
+
+#include "common/status_utils.hpp"
+
+#include "linux/fs.hpp"
+#include "linux/ns.hpp"
+
+#include "linux/routing/route.hpp"
+#include "linux/routing/utils.hpp"
+
+#include "linux/routing/diagnosis/diagnosis.hpp"
+
+#include "linux/routing/filter/basic.hpp"
+#include "linux/routing/filter/icmp.hpp"
+#include "linux/routing/filter/ip.hpp"
+
+#include "linux/routing/handle.hpp"
+
+#include "linux/routing/link/link.hpp"
+
+#include "linux/routing/queueing/fq_codel.hpp"
+#include "linux/routing/queueing/htb.hpp"
+#include "linux/routing/queueing/ingress.hpp"
+#include "linux/routing/queueing/statistics.hpp"
+
+#include "mesos/resources.hpp"
+
+#include "slave/constants.hpp"
+
+#include "slave/containerizer/mesos/isolators/network/port_mapping.hpp"
+
+using namespace mesos::internal;
+
+using namespace process;
+
+using namespace routing;
+using namespace routing::filter;
+using namespace routing::queueing;
+using namespace routing::queueing::statistics;
+
+using std::cerr;
+using std::cout;
+using std::dec;
+using std::endl;
+using std::hex;
+using std::list;
+using std::ostringstream;
+using std::set;
+using std::sort;
+using std::string;
+using std::vector;
+
+using filter::ip::PortRange;
+
+using mesos::slave::ContainerLimitation;
+using mesos::slave::ContainerPrepareInfo;
+using mesos::slave::ContainerState;
+using mesos::slave::Isolator;
+
+// An old glibc might not have this symbol.
+#ifndef MNT_DETACH
+#define MNT_DETACH 2
+#endif
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// The minimum number of ephemeral ports a container should have.
+static const uint16_t MIN_EPHEMERAL_PORTS_SIZE = 16;
+
+// Linux traffic control is a combination of queueing disciplines,
+// filters and classes organized as a tree for the ingress (rx) and
+// egress (tx) flows for each interface. Each container provides two
+// networking interfaces, a virtual eth0 and a loopback interface. The
+// flow of packets from the external network to a container is shown
+// below:
+//
+//   +----------------------+----------------------+
+//   |                   Container                 |
+//   |----------------------|----------------------|
+//   |       eth0           |          lo          |
+//   +----------------------+----------------------+
+//          ^   |         ^           |
+//      [3] |   | [4]     |           |
+//          |   |     [7] +-----------+ [10]
+//          |   |
+//          |   |     [8] +-----------+ [9]
+//      [2] |   | [5]     |           |
+//          |   v         v           v
+//   +----------------------+----------------------+
+//   |      veth0           |          lo          |
+//   +----------------------|----------------------+
+//   |                     Host                    |
+//   |----------------------|----------------------|
+//   |                    eth0                     |
+//   +----------------------+----------------------|
+//                    ^           |
+//                [1] |           | [6]
+//                    |           v
+//
+// Traffic flowing from outside the network into a container enters
+// the system via the host ingress interface [1] and is routed based
+// on destination port to the outbound interface for the matching
+// container [2], which forwards the packet to the container's inbound
+// virtual interface. Outbound traffic destined for the external
+// network flows along the reverse path [4,5,6]. Loopback traffic is
+// directed to the corresponding Ethernet interface, either [7,10] or
+// [8,9] where the same destination port routing can be applied as to
+// external traffic. We use traffic control filters at several of the
+// interfaces to create these packet paths.
+//
+// Linux provides only a very simple topology for ingress interfaces.
+// A root is provided on a fixed handle (handle::INGRESS_ROOT) under
+// which a single qdisc can be installed, with handle ingress::HANDLE.
+// Traffic control filters can then be attached to the ingress qdisc.
+// We install one or more ingress filters on the host eth0 [1] to
+// direct traffic to the correct container, and on the container
+// virtual eth0 [5] to direct traffic to other containers or out of
+// the box. Since we know the IP port assignments for each container,
+// we can direct traffic directly to the appropriate container.
+// However, for ICMP and ARP traffic where no equivalent to a port
+// exists, we send a copy of the packet to every container and rely on
+// the network stack to drop unexpected packets.
+//
+// We install a Hierarchical Token Bucket (HTB) qdisc and class to
+// limit the outbound traffic bandwidth as the egress qdisc inside the
+// container [4] and then add a fq_codel qdisc to limit head of line
+// blocking on the egress filter. The egress traffic control chain is
+// thus:
+//
+// root device: handle::EGRESS_ROOT ->
+//    htb egress qdisc: CONTAINER_TX_HTB_HANDLE ->
+//        htb rate limiting class: CONTAINER_TX_HTB_CLASS_ID ->
+//            buffer-bloat reduction: FQ_CODEL
+constexpr Handle CONTAINER_TX_HTB_HANDLE = Handle(1, 0);
+constexpr Handle CONTAINER_TX_HTB_CLASS_ID =
+    Handle(CONTAINER_TX_HTB_HANDLE, 1);
+
+
+// Finally we create a second fq_codel qdisc on the public interface
+// of the host [6] to reduce performance interference between
+// containers. We create independent flows for each container, and
+// one for the host, which ensures packets from each container are
+// guaranteed fair access to the host interface. This egress traffic
+// control chain for the host interface is thus:
+//
+// root device: handle::EGRESS_ROOT ->
+//    buffer-bloat reduction: FQ_CODEL
+constexpr Handle HOST_TX_FQ_CODEL_HANDLE = Handle(1, 0);
+
+
+// The primary priority used by each type of filter.
+static const uint8_t ARP_FILTER_PRIORITY = 1;
+static const uint8_t ICMP_FILTER_PRIORITY = 2;
+static const uint8_t IP_FILTER_PRIORITY = 3;
+static const uint8_t DEFAULT_FILTER_PRIORITY = 4;
+
+
+// The secondary priorities used by filters.
+static const uint8_t HIGH = 1;
+static const uint8_t NORMAL = 2;
+static const uint8_t LOW = 3;
+
+
+// We assign a separate flow on host eth0 egress for each container
+// (See MESOS-2422 for details). Host egress traffic is assigned to a
+// reserved flow (HOST_FLOWID). ARP and ICMP traffic from containers
+// is light, so both intentionally share the same flow ID.
+static const uint16_t HOST_FLOWID = 1;
+static const uint16_t ARP_FLOWID = 2;
+static const uint16_t ICMP_FLOWID = 2;
+static const uint16_t CONTAINER_MIN_FLOWID = 3;
+
+
+// The well-known ports, i.e., the half-open interval [0, 1024). Used
+// for sanity checks.
+static Interval<uint16_t> WELL_KNOWN_PORTS()
+{
+  return (Bound<uint16_t>::closed(0), Bound<uint16_t>::open(1024));
+}
+
+
+/////////////////////////////////////////////////
+// Helper functions for the isolator.
+/////////////////////////////////////////////////
+
+// Returns the largest power of two that is less than or equal to x,
+// i.e., x with every bit below its most significant set bit cleared.
+// For example, 0b00101101 (45) yields 0b00100000 (32). Returns 0 when
+// x is 0, since no power of two is <= 0.
+static uint32_t roundDownToPowerOfTwo(uint32_t x)
+{
+  // Smear the most significant set bit into every lower position, so
+  // that e.g. 0b00101101 becomes 0b00111111. Shifting by 1, 2, 4, 8
+  // and 16 doubles the run of ones below the MSB each round, which
+  // covers all 32 bits; bits above the MSB are never set.
+  for (unsigned int shift = 1; shift < 32; shift <<= 1) {
+    x |= x >> shift;
+  }
+
+  // With every bit below the MSB set, clearing them leaves only the
+  // MSB: 0b00111111 ^ 0b00011111 == 0b00100000.
+  return x ^ (x >> 1);
+}
+
+
+// Builds the name of the host end of the virtual ethernet pair that
+// belongs to the container whose executor has the given pid. The
+// kernel limits link names to 16 characters or less, which rules out
+// embedding the container ID; the pid of the executor process forked
+// by the slave is used instead. This is unique because no two active
+// containers can share the same executor pid.
+static string veth(pid_t pid)
+{
+  string name = PORT_MAPPING_VETH_PREFIX();
+  name += stringify(pid);
+  return name;
+}
+
+
+// Inverse of veth(): recovers the executor pid from a veth name.
+// Returns None if the name does not carry the veth prefix or if the
+// remainder is not a number.
+static Option<pid_t> getPidFromVeth(const string& veth)
+{
+  const string prefix = PORT_MAPPING_VETH_PREFIX();
+
+  if (!strings::startsWith(veth, prefix)) {
+    return None();
+  }
+
+  Try<pid_t> parsed =
+    numify<pid_t>(strings::remove(veth, prefix, strings::PREFIX));
+
+  if (parsed.isError()) {
+    return None();
+  }
+
+  return parsed.get();
+}
+
+
+// Recovers the container ID from a symlink that points to a network
+// namespace handle; the symlink itself is named after the container
+// ID. Returns an Error if the path is not a symlink. The bind mount
+// root and bind mount symlink root are laid out as follows:
+//  <PORT_MAPPING_BIND_MOUNT_ROOT()>
+//    |--- 3945 (pid)                           <-|
+//                                                |
+//  <PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT()>      |
+//    |--- ecf293e7-e6e8-4cbc-aaee-4d6c958aa276 --|
+//         (symlink: container ID -> pid)
+static Try<ContainerID> getContainerIdFromSymlink(const string& symlink)
+{
+  if (!os::stat::islink(symlink)) {
+    return Error("Not a symlink");
+  }
+
+  ContainerID containerId;
+  containerId.set_value(Path(symlink).basename());
+  return containerId;
+}
+
+
+// Recovers the pid from a network namespace handle, whose basename is
+// the pid (see getNamespaceHandlePath()). Returns an Error if the
+// handle is a symlink, and None if its basename is not a number, i.e.,
+// the handle was clearly not created by us.
+static Result<pid_t> getPidFromNamespaceHandle(const string& handle)
+{
+  if (os::stat::islink(handle)) {
+    return Error("Not expecting a symlink");
+  }
+
+  Try<pid_t> pid = numify<pid_t>(Path(handle).basename());
+  if (pid.isSome()) {
+    return pid.get();
+  }
+
+  return None();
+}
+
+
+// Follows a symlink that points to a network namespace handle and
+// recovers the pid from the handle it resolves to. Returns None if the
+// symlink is dangling (or its target is not a handle we created), and
+// an Error if the path is not a symlink or cannot be resolved.
+static Result<pid_t> getPidFromSymlink(const string& symlink)
+{
+  if (!os::stat::islink(symlink)) {
+    return Error("Not a symlink");
+  }
+
+  Result<string> target = os::realpath(symlink);
+
+  if (target.isSome()) {
+    return getPidFromNamespaceHandle(target.get());
+  }
+
+  if (target.isNone()) {
+    // This is a dangling symlink.
+    return None();
+  }
+
+  return Error("Failed to follow the symlink: " + target.error());
+}
+
+
+// Returns the path of the symlink for the given container: a file
+// named after the container ID under
+// PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT().
+static string getSymlinkPath(const ContainerID& containerId)
+{
+  const string name = stringify(containerId);
+  return path::join(PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT(), name);
+}
+
+
+// Returns the path of the network namespace handle for the given pid:
+// a file named after the pid under PORT_MAPPING_BIND_MOUNT_ROOT().
+static string getNamespaceHandlePath(pid_t pid)
+{
+  const string name = stringify(pid);
+  return path::join(PORT_MAPPING_BIND_MOUNT_ROOT(), name);
+}
+
+
+// Converts value ranges into an interval set of ports, treating each
+// range as closed on both ends ([begin, end]).
+// NOTE(review): begin/end are narrowed to uint16_t without a bounds
+// check, so values above 65535 would silently wrap -- confirm callers
+// only pass valid port ranges.
+static IntervalSet<uint16_t> getIntervalSet(const Value::Ranges& ranges)
+{
+  IntervalSet<uint16_t> intervals;
+
+  foreach (const Value::Range& range, ranges.range()) {
+    intervals += (Bound<uint16_t>::closed(range.begin()),
+                  Bound<uint16_t>::closed(range.end()));
+  }
+
+  return intervals;
+}
+
+/////////////////////////////////////////////////
+// Implementation for PortMappingUpdate.
+/////////////////////////////////////////////////
+
+// The name of the 'update' helper, which adds and removes IP filters
+// inside a container's network namespace (see execute() below).
+const char* PortMappingUpdate::NAME = "update";
+
+
+// Registers the command-line flags of the 'update' helper. None of
+// them has a default here; execute() rejects a missing eth0_name,
+// lo_name or pid, and requires at least one of ports_to_add and
+// ports_to_remove.
+PortMappingUpdate::Flags::Flags()
+{
+  add(&eth0_name,
+      "eth0_name",
+      "The name of the public network interface (e.g., eth0)");
+
+  add(&lo_name,
+      "lo_name",
+      "The name of the loopback network interface (e.g., lo)");
+
+  add(&pid,
+      "pid",
+      "The pid of the process whose namespaces we will enter");
+
+  // Port ranges are passed as JSON; see json() and parse() below.
+  add(&ports_to_add,
+      "ports_to_add",
+      "A collection of port ranges (formatted as a JSON object)\n"
+      "for which to add IP filters. E.g.,\n"
+      "--ports_to_add={\"range\":[{\"begin\":4,\"end\":8}]}");
+
+  add(&ports_to_remove,
+      "ports_to_remove",
+      "A collection of port ranges (formatted as a JSON object)\n"
+      "for which to remove IP filters. E.g.,\n"
+      "--ports_to_remove={\"range\":[{\"begin\":4,\"end\":8}]}");
+}
+
+
+// The following two helpers convert between a collection of port
+// ranges and a JSON object, for use by the port mapping update
+// operation. json() serializes the ranges as a Value::Ranges message,
+// i.e., {"range":[{"begin":B,"end":E},...]}; parse() is its inverse.
+template <typename Iterable>
+JSON::Object json(const Iterable& ranges)
+{
+  Value::Ranges values;
+
+  foreach (const PortRange& range, ranges) {
+    Value::Range* added = values.add_range();
+    added->set_begin(range.begin());
+    added->set_end(range.end());
+  }
+
+  return JSON::Protobuf(values);
+}
+
+
+// Parses a JSON object describing a Value::Ranges message (as produced
+// by json() above) into port ranges. Returns an Error if the object is
+// not a valid Value::Ranges or if any range is rejected by
+// PortRange::fromBeginEnd().
+static Try<vector<PortRange>> parse(const JSON::Object& object)
+{
+  Try<Value::Ranges> parsing = protobuf::parse<Value::Ranges>(object);
+  if (parsing.isError()) {
+    return Error("Failed to parse JSON: " + parsing.error());
+  }
+
+  vector<PortRange> result;
+
+  foreach (const Value::Range& value, parsing.get().range()) {
+    Try<PortRange> range = PortRange::fromBeginEnd(value.begin(), value.end());
+    if (range.isError()) {
+      return Error("Invalid port range: " + range.error());
+    }
+
+    result.push_back(range.get());
+  }
+
+  return result;
+}
+
+
+// Helper function to set up IP filters inside the container for a
+// given port range. Two ingress filters are created, in this order:
+//   1. a terminal IP filter on `lo` for the port range;
+//   2. an IP filter on `eth0` that redirects loopback-IP traffic for
+//      the port range to `lo`.
+// Returns an Error if creating either filter fails, or if either one
+// already exists (filter::ip::create returned false).
+// NOTE(review): if step 2 fails, the filter installed in step 1 is not
+// removed here -- confirm callers tolerate this partial state.
+static Try<Nothing> addContainerIPFilters(
+    const PortRange& range,
+    const string& eth0,
+    const string& lo)
+{
+  // Add an IP packet filter on lo such that local traffic inside a
+  // container will not be redirected to eth0.
+  Try<bool> loTerminal = filter::ip::create(
+      lo,
+      ingress::HANDLE,
+      ip::Classifier(None(), None(), None(), range),
+      Priority(IP_FILTER_PRIORITY, HIGH),
+      action::Terminal());
+
+  if (loTerminal.isError()) {
+    return Error(
+        "Failed to create an IP packet filter on " + lo +
+        " which stops packets from being sent to " + eth0 +
+        ": " + loTerminal.error());
+  } else if (!loTerminal.get()) {
+    return Error(
+        "The IP packet filter on " + lo +
+        " which stops packets from being sent to " +
+        eth0 + " already exists");
+  }
+
+  // Add an IP packet filter (for loopback IP) from eth0 to lo to
+  // redirect all loopback IP traffic to lo. It uses a lower secondary
+  // priority (NORMAL) than the terminal filter on lo (HIGH).
+  Try<bool> eth0ToLoLoopback = filter::ip::create(
+      eth0,
+      ingress::HANDLE,
+      ip::Classifier(
+          None(),
+          net::IPNetwork::LOOPBACK_V4().address(),
+          None(),
+          range),
+      Priority(IP_FILTER_PRIORITY, NORMAL),
+      action::Redirect(lo));
+
+  if (eth0ToLoLoopback.isError()) {
+    return Error(
+        "Failed to create an IP packet filter (for loopback IP) from " +
+        eth0 + " to " + lo + ": " + eth0ToLoLoopback.error());
+  } else if (!eth0ToLoLoopback.get()) {
+    return Error(
+        "The IP packet filter (for loopback IP) from " +
+        eth0 + " to " + lo + " already exists");
+  }
+
+  return Nothing();
+}
+
+
+// Helper function to remove IP filters inside the container for a
+// given port range. This is the counterpart of addContainerIPFilters():
+// it first removes the terminal IP filter on `lo`, then the
+// loopback-IP filter on `eth0`, matching each by the same classifier
+// used at creation. Returns an Error if a removal fails, or if either
+// filter does not exist (filter::ip::remove returned false).
+// NOTE(review): if the second removal fails, the first filter stays
+// removed.
+static Try<Nothing> removeContainerIPFilters(
+    const PortRange& range,
+    const string& eth0,
+    const string& lo)
+{
+  // Remove the 'terminal' IP packet filter on lo.
+  Try<bool> loTerminal = filter::ip::remove(
+      lo,
+      ingress::HANDLE,
+      ip::Classifier(None(), None(), None(), range));
+
+  if (loTerminal.isError()) {
+    return Error(
+        "Failed to remove the IP packet filter on " + lo +
+        " which stops packets from being sent to " + eth0 +
+        ": " + loTerminal.error());
+  } else if (!loTerminal.get()) {
+    return Error(
+        "The IP packet filter on " + lo +
+        " which stops packets from being sent to " + eth0 +
+        " does not exist");
+  }
+
+  // Remove the IP packet filter (for loopback IP) from eth0 to lo.
+  Try<bool> eth0ToLoLoopback = filter::ip::remove(
+      eth0,
+      ingress::HANDLE,
+      ip::Classifier(
+          None(),
+          net::IPNetwork::LOOPBACK_V4().address(),
+          None(),
+          range));
+
+  if (eth0ToLoLoopback.isError()) {
+    return Error(
+        "Failed to remove the IP packet filter (for loopback IP) from " +
+        eth0 + " to " + lo + ": " + eth0ToLoLoopback.error());
+  } else if (!eth0ToLoLoopback.get()) {
+    return Error(
+        "The IP packet filter (for loopback IP) from " +
+        eth0 + " to " + lo + " does not exist");
+  }
+
+  return Nothing();
+}
+
+
+// Entry point of the 'update' helper. It:
+//   1. validates the flags and parses both port range lists;
+//   2. enters the network namespace of --pid;
+//   3. adds the IP filters for every range in --ports_to_add;
+//   4. removes the IP filters for every range in --ports_to_remove.
+// Returns 0 on success (or when --help is given) and 1 on the first
+// failure, which is reported on stderr.
+// NOTE(review): a failure part-way through steps 3-4 leaves the
+// filters already added/removed in place.
+int PortMappingUpdate::execute()
+{
+  if (flags.help) {
+    cerr << "Usage: " << name() << " [OPTIONS]" << endl << endl
+         << "Supported options:" << endl
+         << flags.usage();
+    return 0;
+  }
+
+  if (flags.eth0_name.isNone()) {
+    cerr << "The public interface name (e.g., eth0) is not specified" << endl;
+    return 1;
+  }
+
+  if (flags.lo_name.isNone()) {
+    cerr << "The loopback interface name (e.g., lo) is not specified" << endl;
+    return 1;
+  }
+
+  if (flags.pid.isNone()) {
+    cerr << "The pid is not specified" << endl;
+    return 1;
+  }
+
+  if (flags.ports_to_add.isNone() && flags.ports_to_remove.isNone()) {
+    cerr << "Nothing to update" << endl;
+    return 1;
+  }
+
+  // Parse both lists up front so that malformed input is rejected
+  // before any namespace or filter changes are made.
+  Option<vector<PortRange>> portsToAdd;
+  Option<vector<PortRange>> portsToRemove;
+
+  if (flags.ports_to_add.isSome()) {
+    Try<vector<PortRange>> parsing = parse(flags.ports_to_add.get());
+    if (parsing.isError()) {
+      cerr << "Parsing 'ports_to_add' failed: " << parsing.error() << endl;
+      return 1;
+    }
+    portsToAdd = parsing.get();
+  }
+
+  if (flags.ports_to_remove.isSome()) {
+    Try<vector<PortRange>> parsing = parse(flags.ports_to_remove.get());
+    if (parsing.isError()) {
+      cerr << "Parsing 'ports_to_remove' failed: " << parsing.error() << endl;
+      return 1;
+    }
+    portsToRemove = parsing.get();
+  }
+
+  // Enter the network namespace.
+  Try<Nothing> setns = ns::setns(flags.pid.get(), "net");
+  if (setns.isError()) {
+    cerr << "Failed to enter the network namespace of pid " << flags.pid.get()
+         << ": " << setns.error() << endl;
+    return 1;
+  }
+
+  // Update IP packet filters.
+  const string eth0 = flags.eth0_name.get();
+  const string lo = flags.lo_name.get();
+
+  // Additions are applied before removals.
+  if (portsToAdd.isSome()) {
+    foreach (const PortRange& range, portsToAdd.get()) {
+      Try<Nothing> add = addContainerIPFilters(range, eth0, lo);
+      if (add.isError()) {
+        cerr << "Failed to add IP filters: " << add.error() << endl;
+        return 1;
+      }
+    }
+  }
+
+  if (portsToRemove.isSome()) {
+    foreach (const PortRange& range, portsToRemove.get()) {
+      Try<Nothing> remove = removeContainerIPFilters(range, eth0, lo);
+      if (remove.isError()) {
+        cerr << "Failed to remove IP filters: " << remove.error() << endl;
+        return 1;
+      }
+    }
+  }
+
+  return 0;
+}
+
+/////////////////////////////////////////////////
+// Implementation for PortMappingStatistics.
+/////////////////////////////////////////////////
+
+// The name of the 'statistics' helper, which collects network
+// statistics inside a container's network namespace (see execute()
+// below).
+const char* PortMappingStatistics::NAME = "statistics";
+
+
+// Registers the command-line flags of the 'statistics' helper. The two
+// socket statistics flags default to false; eth0_name and pid are
+// checked for presence in execute().
+PortMappingStatistics::Flags::Flags()
+{
+  add(&eth0_name,
+      "eth0_name",
+      "The name of the public network interface (e.g., eth0)");
+
+  add(&pid,
+      "pid",
+      "The pid of the process whose namespaces we will enter");
+
+  add(&enable_socket_statistics_summary,
+      "enable_socket_statistics_summary",
+      "Whether to collect socket statistics summary for this container.",
+      false);
+
+  add(&enable_socket_statistics_details,
+      "enable_socket_statistics_details",
+      "Whether to collect socket statistics details (e.g., TCP RTT)\n"
+      "for this container.",
+      false);
+}
+
+
+// Appends a new TrafficControlStatistics entry tagged with `id` to
+// `result` and fills it from the `statistics` hashmap. Counters that
+// are absent from `statistics` leave the matching protobuf field unset.
+static void addTrafficControlStatistics(
+    const string& id,
+    const hashmap<string, uint64_t>& statistics,
+    ResourceStatistics* result)
+{
+  typedef hashmap<string, uint64_t>::const_iterator Iterator;
+
+  TrafficControlStatistics* entry =
+    result->add_net_traffic_control_statistics();
+
+  entry->set_id(id);
+
+  // TODO(pbrett) Use protobuf reflection here.
+  Iterator it = statistics.find(BACKLOG);
+  if (it != statistics.end()) {
+    entry->set_backlog(it->second);
+  }
+
+  it = statistics.find(BYTES);
+  if (it != statistics.end()) {
+    entry->set_bytes(it->second);
+  }
+
+  it = statistics.find(DROPS);
+  if (it != statistics.end()) {
+    entry->set_drops(it->second);
+  }
+
+  it = statistics.find(OVERLIMITS);
+  if (it != statistics.end()) {
+    entry->set_overlimits(it->second);
+  }
+
+  it = statistics.find(PACKETS);
+  if (it != statistics.end()) {
+    entry->set_packets(it->second);
+  }
+
+  it = statistics.find(QLEN);
+  if (it != statistics.end()) {
+    entry->set_qlen(it->second);
+  }
+
+  it = statistics.find(RATE_BPS);
+  if (it != statistics.end()) {
+    entry->set_ratebps(it->second);
+  }
+
+  it = statistics.find(RATE_PPS);
+  if (it != statistics.end()) {
+    entry->set_ratepps(it->second);
+  }
+
+  it = statistics.find(REQUEUES);
+  if (it != statistics.end()) {
+    entry->set_requeues(it->second);
+  }
+}
+
+
+// Entry point of the 'statistics' helper. After validating the flags
+// it enters the network namespace of --pid, gathers the socket
+// statistics enabled by the flags plus the traffic control statistics
+// of --eth0_name, and writes a JSON-serialized ResourceStatistics to
+// stdout. Returns 0 for --help or after printing; missing or
+// unreadable traffic control statistics are not fatal. Returns 1 if a
+// flag is missing, the namespace cannot be entered, or socket
+// statistics cannot be read.
+int PortMappingStatistics::execute()
+{
+  if (flags.help) {
+    cerr << "Usage: " << name() << " [OPTIONS]" << endl << endl
+         << "Supported options:" << endl
+         << flags.usage();
+    return 0;
+  }
+
+  if (flags.pid.isNone()) {
+    cerr << "The pid is not specified" << endl;
+    return 1;
+  }
+
+  if (flags.eth0_name.isNone()) {
+    cerr << "The public interface name (e.g., eth0) is not specified" << endl;
+    return 1;
+  }
+
+  // Enter the network namespace.
+  Try<Nothing> setns = ns::setns(flags.pid.get(), "net");
+  if (setns.isError()) {
+    // This could happen if the executor exits before this function is
+    // invoked. We do not log here to avoid spurious logging.
+    return 1;
+  }
+
+  ResourceStatistics result;
+
+  // NOTE: We use a dummy value here since this field will be cleared
+  // before the result is sent to the containerizer.
+  result.set_timestamp(0);
+
+  if (flags.enable_socket_statistics_summary) {
+    // Collections for socket statistics summary are below.
+
+    // For TCP, get the number of ACTIVE and TIME_WAIT connections by
+    // reading /proc/net/sockstat. This is not as expensive in the
+    // kernel because only counter values are accessed instead of a
+    // dump of all the sockets.
+    // NOTE(review): only the IPv4 counters are read here; the IPv6
+    // counters live in /proc/net/sockstat6, which is not consulted.
+    // Example output:
+
+    // $ cat /proc/net/sockstat
+    // sockets: used 1391
+    // TCP: inuse 33 orphan 0 tw 0 alloc 37 mem 6
+    // UDP: inuse 15 mem 7
+    // UDPLITE: inuse 0
+    // RAW: inuse 0
+    // FRAG: inuse 0 memory 0
+
+    Try<string> value = os::read("/proc/net/sockstat");
+    if (value.isError()) {
+      cerr << "Failed to read /proc/net/sockstat: " << value.error() << endl;
+      return 1;
+    }
+
+    foreach (const string& line, strings::tokenize(value.get(), "\n")) {
+      if (!strings::startsWith(line, "TCP")) {
+        continue;
+      }
+
+      vector<string> tokens = strings::tokenize(line, " ");
+      for (size_t i = 0; i < tokens.size(); i++) {
+        if (tokens[i] == "inuse") {
+          if (i + 1 >= tokens.size()) {
+            cerr << "Unexpected output from /proc/net/sockstat" << endl;
+            // Be a bit forgiving here since the /proc file output
+            // format can change, though not very likely.
+            continue;
+          }
+
+          // Set number of active TCP connections.
+          Try<size_t> inuse = numify<size_t>(tokens[i+1]);
+          if (inuse.isError()) {
+            cerr << "Failed to parse the number of tcp connections in use: "
+                 << inuse.error() << endl;
+            continue;
+          }
+
+          result.set_net_tcp_active_connections(inuse.get());
+        } else if (tokens[i] == "tw") {
+          if (i + 1 >= tokens.size()) {
+            cerr << "Unexpected output from /proc/net/sockstat" << endl;
+            // Be a bit forgiving here since the /proc file output
+            // format can change, though not very likely.
+            continue;
+          }
+
+          // Set number of TIME_WAIT TCP connections.
+          Try<size_t> tw = numify<size_t>(tokens[i+1]);
+          if (tw.isError()) {
+            cerr << "Failed to parse the number of tcp connections in"
+                 << " TIME_WAIT: " << tw.error() << endl;
+            continue;
+          }
+
+          result.set_net_tcp_time_wait_connections(tw.get());
+        }
+      }
+    }
+  }
+
+  if (flags.enable_socket_statistics_details) {
+    // Collections for socket statistics details are below.
+
+    // NOTE: If the underlying library uses the older version of
+    // kernel API, the family argument passed in may not be honored.
+    Try<vector<diagnosis::socket::Info>> infos =
+      diagnosis::socket::infos(AF_INET, diagnosis::socket::state::ALL);
+
+    if (infos.isError()) {
+      cerr << "Failed to retrieve the socket information: "
+           << infos.error() << endl;
+      return 1;
+    }
+
+    vector<uint32_t> RTTs;
+    foreach (const diagnosis::socket::Info& info, infos.get()) {
+      // We double check on family regardless.
+      if (info.family != AF_INET) {
+        continue;
+      }
+
+      // We consider all sockets that have non-zero rtt value.
+      if (info.tcpInfo.isSome() && info.tcpInfo.get().tcpi_rtt != 0) {
+        RTTs.push_back(info.tcpInfo.get().tcpi_rtt);
+      }
+    }
+
+    // Only report RTT percentiles when at least one sample was
+    // collected; this also keeps the indexing below off an empty
+    // vector.
+    if (RTTs.size() > 0) {
+      std::sort(RTTs.begin(), RTTs.end());
+
+      // NOTE: The size of RTTs is usually within 1 million so we
+      // don't need to worry about overflow here.
+      // TODO(jieyu): Right now, we choose to use "Nearest rank" for
+      // simplicity. Consider directly using the Statistics abstraction
+      // which computes "Linear interpolation between closest ranks".
+      // http://en.wikipedia.org/wiki/Percentile
+      // NOTE(review): the 0-based index size * P / 100 can be one
+      // position above the textbook nearest-rank index
+      // (ceil(size * P / 100) - 1). For P <= 99 it is always < size,
+      // so the lookups below stay in bounds.
+      size_t p50 = RTTs.size() * 50 / 100;
+      size_t p90 = RTTs.size() * 90 / 100;
+      size_t p95 = RTTs.size() * 95 / 100;
+      size_t p99 = RTTs.size() * 99 / 100;
+
+      result.set_net_tcp_rtt_microsecs_p50(RTTs[p50]);
+      result.set_net_tcp_rtt_microsecs_p90(RTTs[p90]);
+      result.set_net_tcp_rtt_microsecs_p95(RTTs[p95]);
+      result.set_net_tcp_rtt_microsecs_p99(RTTs[p99]);
+    }
+  }
+
+  // Collect traffic statistics for the container from the container
+  // virtual interface and export them in JSON.
+  const string& eth0 = flags.eth0_name.get();
+
+  // Overlimits are reported on the HTB qdisc at the egress root.
+  Result<hashmap<string, uint64_t>> statistics =
+    htb::statistics(eth0, EGRESS_ROOT);
+
+  if (statistics.isSome()) {
+    addTrafficControlStatistics(
+        NET_ISOLATOR_BW_LIMIT,
+        statistics.get(),
+        &result);
+  } else if (statistics.isNone()) {
+    // Traffic control statistics are only available when the
+    // container is created on a slave when the egress rate limit is
+    // on (i.e., egress_rate_limit_per_container flag is set). We
+    // can't just test for that flag here however, since the slave may
+    // have been restarted with different flags since the container
+    // was created. It is also possible that isolator statistics are
+    // unavailable because the container is in the process of being
+    // created or destroyed. Hence we do not report a lack of network
+    // statistics as an error.
+  } else if (statistics.isError()) {
+    cerr << "Failed to get htb qdisc statistics on " << eth0
+         << " in namespace " << flags.pid.get() << ": "
+         << statistics.error() << endl;
+  }
+
+  // Drops due to the bandwidth limit should be reported at the leaf.
+  statistics = fq_codel::statistics(eth0, CONTAINER_TX_HTB_CLASS_ID);
+  if (statistics.isSome()) {
+    addTrafficControlStatistics(
+        NET_ISOLATOR_BLOAT_REDUCTION,
+        statistics.get(),
+        &result);
+  } else if (statistics.isNone()) {
+    // See discussion on network isolator statistics above.
+  } else if (statistics.isError()) {
+    cerr << "Failed to get fq_codel qdisc statistics on " << eth0
+         << " in namespace " << flags.pid.get() << ": "
+         << statistics.error() << endl;
+  }
+
+  cout << stringify(JSON::Protobuf(result));
+  return 0;
+}
+
+
+/////////////////////////////////////////////////
+// Implementation for the isolator.
+/////////////////////////////////////////////////
+
+// Constructs every port mapping isolator counter and registers it via
+// process::metrics::add (presumably making it visible through the
+// libprocess metrics snapshot). NOTE: every counter initialized below
+// must also be added in this constructor and removed in the
+// destructor; a counter that is constructed but never added is
+// silently never exported.
+PortMappingIsolatorProcess::Metrics::Metrics()
+  : adding_eth0_ip_filters_errors(
+        "port_mapping/adding_eth0_ip_filters_errors"),
+    adding_eth0_ip_filters_already_exist(
+        "port_mapping/adding_eth0_ip_filters_already_exist"),
+    adding_eth0_egress_filters_errors(
+        "port_mapping/adding_eth0_egress_filters_errors"),
+    adding_eth0_egress_filters_already_exist(
+        "port_mapping/adding_eth0_egress_filters_already_exist"),
+    adding_lo_ip_filters_errors(
+        "port_mapping/adding_lo_ip_filters_errors"),
+    adding_lo_ip_filters_already_exist(
+        "port_mapping/adding_lo_ip_filters_already_exist"),
+    adding_veth_ip_filters_errors(
+        "port_mapping/adding_veth_ip_filters_errors"),
+    adding_veth_ip_filters_already_exist(
+        "port_mapping/adding_veth_ip_filters_already_exist"),
+    adding_veth_icmp_filters_errors(
+        "port_mapping/adding_veth_icmp_filters_errors"),
+    adding_veth_icmp_filters_already_exist(
+        "port_mapping/adding_veth_icmp_filters_already_exist"),
+    adding_veth_arp_filters_errors(
+        "port_mapping/adding_veth_arp_filters_errors"),
+    adding_veth_arp_filters_already_exist(
+        "port_mapping/adding_veth_arp_filters_already_exist"),
+    adding_eth0_icmp_filters_errors(
+        "port_mapping/adding_eth0_icmp_filters_errors"),
+    adding_eth0_icmp_filters_already_exist(
+        "port_mapping/adding_eth0_icmp_filters_already_exist"),
+    adding_eth0_arp_filters_errors(
+        "port_mapping/adding_eth0_arp_filters_errors"),
+    adding_eth0_arp_filters_already_exist(
+        "port_mapping/adding_eth0_arp_filters_already_exist"),
+    removing_eth0_ip_filters_errors(
+        "port_mapping/removing_eth0_ip_filters_errors"),
+    removing_eth0_ip_filters_do_not_exist(
+        "port_mapping/removing_eth0_ip_filters_do_not_exist"),
+    removing_eth0_egress_filters_errors(
+        "port_mapping/removing_eth0_egress_filters_errors"),
+    removing_eth0_egress_filters_do_not_exist(
+        "port_mapping/removing_eth0_egress_filters_do_not_exist"),
+    removing_lo_ip_filters_errors(
+        "port_mapping/removing_lo_ip_filters_errors"),
+    removing_lo_ip_filters_do_not_exist(
+        "port_mapping/removing_lo_ip_filters_do_not_exist"),
+    removing_veth_ip_filters_errors(
+        "port_mapping/removing_veth_ip_filters_errors"),
+    removing_veth_ip_filters_do_not_exist(
+        "port_mapping/removing_veth_ip_filters_do_not_exist"),
+    removing_eth0_icmp_filters_errors(
+        "port_mapping/removing_eth0_icmp_filters_errors"),
+    removing_eth0_icmp_filters_do_not_exist(
+        "port_mapping/removing_eth0_icmp_filters_do_not_exist"),
+    removing_eth0_arp_filters_errors(
+        "port_mapping/removing_eth0_arp_filters_errors"),
+    removing_eth0_arp_filters_do_not_exist(
+        "port_mapping/removing_eth0_arp_filters_do_not_exist"),
+    updating_eth0_icmp_filters_errors(
+        "port_mapping/updating_eth0_icmp_filters_errors"),
+    updating_eth0_icmp_filters_already_exist(
+        "port_mapping/updating_eth0_icmp_filters_already_exist"),
+    updating_eth0_icmp_filters_do_not_exist(
+        "port_mapping/updating_eth0_icmp_filters_do_not_exist"),
+    updating_eth0_arp_filters_errors(
+        "port_mapping/updating_eth0_arp_filters_errors"),
+    updating_eth0_arp_filters_already_exist(
+        "port_mapping/updating_eth0_arp_filters_already_exist"),
+    updating_eth0_arp_filters_do_not_exist(
+        "port_mapping/updating_eth0_arp_filters_do_not_exist"),
+    updating_container_ip_filters_errors(
+        "port_mapping/updating_container_ip_filters_errors")
+{
+  process::metrics::add(adding_eth0_ip_filters_errors);
+  process::metrics::add(adding_eth0_ip_filters_already_exist);
+  process::metrics::add(adding_eth0_egress_filters_errors);
+  process::metrics::add(adding_eth0_egress_filters_already_exist);
+  process::metrics::add(adding_lo_ip_filters_errors);
+  process::metrics::add(adding_lo_ip_filters_already_exist);
+  process::metrics::add(adding_veth_ip_filters_errors);
+  process::metrics::add(adding_veth_ip_filters_already_exist);
+  process::metrics::add(adding_veth_icmp_filters_errors);
+  process::metrics::add(adding_veth_icmp_filters_already_exist);
+  process::metrics::add(adding_veth_arp_filters_errors);
+  process::metrics::add(adding_veth_arp_filters_already_exist);
+  process::metrics::add(adding_eth0_icmp_filters_errors);
+  process::metrics::add(adding_eth0_icmp_filters_already_exist);
+  process::metrics::add(adding_eth0_arp_filters_errors);
+  process::metrics::add(adding_eth0_arp_filters_already_exist);
+  process::metrics::add(removing_eth0_ip_filters_errors);
+  process::metrics::add(removing_eth0_ip_filters_do_not_exist);
+  process::metrics::add(removing_eth0_egress_filters_errors);
+  process::metrics::add(removing_eth0_egress_filters_do_not_exist);
+  process::metrics::add(removing_lo_ip_filters_errors);
+  process::metrics::add(removing_lo_ip_filters_do_not_exist);
+  process::metrics::add(removing_veth_ip_filters_errors);
+  process::metrics::add(removing_veth_ip_filters_do_not_exist);
+  process::metrics::add(removing_eth0_icmp_filters_errors);
+  process::metrics::add(removing_eth0_icmp_filters_do_not_exist);
+  process::metrics::add(removing_eth0_arp_filters_errors);
+  process::metrics::add(removing_eth0_arp_filters_do_not_exist);
+  process::metrics::add(updating_eth0_icmp_filters_errors);
+  process::metrics::add(updating_eth0_icmp_filters_already_exist);
+  process::metrics::add(updating_eth0_icmp_filters_do_not_exist);
+  process::metrics::add(updating_eth0_arp_filters_errors);
+  process::metrics::add(updating_eth0_arp_filters_already_exist);
+  process::metrics::add(updating_eth0_arp_filters_do_not_exist);
+  process::metrics::add(updating_container_ip_filters_errors);
+}
+
+
+// Unregisters every counter registered in the constructor. This list
+// must mirror the constructor's process::metrics::add calls exactly.
+PortMappingIsolatorProcess::Metrics::~Metrics()
+{
+  process::metrics::remove(adding_eth0_ip_filters_errors);
+  process::metrics::remove(adding_eth0_ip_filters_already_exist);
+  process::metrics::remove(adding_eth0_egress_filters_errors);
+  process::metrics::remove(adding_eth0_egress_filters_already_exist);
+  process::metrics::remove(adding_lo_ip_filters_errors);
+  process::metrics::remove(adding_lo_ip_filters_already_exist);
+  process::metrics::remove(adding_veth_ip_filters_errors);
+  process::metrics::remove(adding_veth_ip_filters_already_exist);
+  process::metrics::remove(adding_veth_icmp_filters_errors);
+  process::metrics::remove(adding_veth_icmp_filters_already_exist);
+  process::metrics::remove(adding_veth_arp_filters_errors);
+  process::metrics::remove(adding_veth_arp_filters_already_exist);
+  process::metrics::remove(adding_eth0_icmp_filters_errors);
+  process::metrics::remove(adding_eth0_icmp_filters_already_exist);
+  process::metrics::remove(adding_eth0_arp_filters_errors);
+  process::metrics::remove(adding_eth0_arp_filters_already_exist);
+  process::metrics::remove(removing_eth0_ip_filters_errors);
+  process::metrics::remove(removing_eth0_ip_filters_do_not_exist);
+  process::metrics::remove(removing_eth0_egress_filters_errors);
+  process::metrics::remove(removing_eth0_egress_filters_do_not_exist);
+  process::metrics::remove(removing_lo_ip_filters_errors);
+  process::metrics::remove(removing_lo_ip_filters_do_not_exist);
+  process::metrics::remove(removing_veth_ip_filters_errors);
+  process::metrics::remove(removing_veth_ip_filters_do_not_exist);
+  process::metrics::remove(removing_eth0_icmp_filters_errors);
+  process::metrics::remove(removing_eth0_icmp_filters_do_not_exist);
+  process::metrics::remove(removing_eth0_arp_filters_errors);
+  process::metrics::remove(removing_eth0_arp_filters_do_not_exist);
+  process::metrics::remove(updating_eth0_icmp_filters_errors);
+  process::metrics::remove(updating_eth0_icmp_filters_already_exist);
+  process::metrics::remove(updating_eth0_icmp_filters_do_not_exist);
+  process::metrics::remove(updating_eth0_arp_filters_errors);
+  process::metrics::remove(updating_eth0_arp_filters_already_exist);
+  process::metrics::remove(updating_eth0_arp_filters_do_not_exist);
+  process::metrics::remove(updating_container_ip_filters_errors);
+}
+
+
+// Creates the port mapping network isolator.
+//
+// This validates the host environment: root permissions, network
+// namespace support, the routing library, and the 'tc'/'ip' commands.
+// It then validates the port and egress rate limit flags.
+//
+// It prepares the host public (eth0) and loopback (lo) interfaces:
+// MAC, MTU, qdiscs and sysctls. It records a subset of host network
+// sysctls for the containers to match, and sets up the bind mount
+// roots used for network namespace handles.
+//
+// Validation and setup failures are reported by returning an Error.
+Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& flags)
+{
+  // Check for root permission.
+  if (geteuid() != 0) {
+    return Error("Using network isolator requires root permissions");
+  }
+
+  // Verify that the network namespace is available by checking the
+  // existence of the network namespace handle of the current process.
+  if (ns::namespaces().count("net") == 0) {
+    return Error(
+        "Using network isolator requires network namespace. "
+        "Make sure your kernel is newer than 3.4");
+  }
+
+  // Check the routing library.
+  Try<Nothing> check = routing::check();
+  if (check.isError()) {
+    return Error(
+        "Routing library check failed: " +
+        check.error());
+  }
+
+  // Check the availability of a few Linux commands that we will use.
+  // We use the blocking os::shell here because 'create' will only be
+  // invoked during initialization.
+  Try<string> checkCommandTc = os::shell("tc filter show");
+  if (checkCommandTc.isError()) {
+    return Error("Check command 'tc' failed: " + checkCommandTc.error());
+  }
+
+  Try<string> checkCommandIp = os::shell("ip link show");
+  if (checkCommandIp.isError()) {
+    return Error("Check command 'ip' failed: " + checkCommandIp.error());
+  }
+
+  Try<Resources> resources = Resources::parse(
+      flags.resources.getOrElse(""),
+      flags.default_role);
+
+  if (resources.isError()) {
+    return Error("Failed to parse --resources: " + resources.error());
+  }
+
+  // Get 'ports' resource from 'resources' flag. These ports will be
+  // treated as non-ephemeral ports.
+  IntervalSet<uint16_t> nonEphemeralPorts;
+  if (resources.get().ports().isSome()) {
+    nonEphemeralPorts = getIntervalSet(resources.get().ports().get());
+  }
+
+  // Get 'ephemeral_ports' resource from 'resources' flag. These ports
+  // will be allocated to each container as ephemeral ports.
+  IntervalSet<uint16_t> ephemeralPorts;
+  if (resources.get().ephemeral_ports().isSome()) {
+    ephemeralPorts = getIntervalSet(resources.get().ephemeral_ports().get());
+  }
+
+  // Each container requires at least one ephemeral port for slave
+  // executor communication. If no 'ephemeral_ports' resource is
+  // found, we will return error.
+  if (ephemeralPorts.empty()) {
+    return Error("Ephemeral ports are not specified");
+  }
+
+  // Sanity check to make sure that the ephemeral ports specified do
+  // not intersect with the specified non-ephemeral ports.
+  if (ephemeralPorts.intersects(nonEphemeralPorts)) {
+    return Error(
+        "The specified ephemeral ports " + stringify(ephemeralPorts) +
+        " intersect with the specified non-ephemeral ports " +
+        stringify(nonEphemeralPorts));
+  }
+
+  // This is a sanity check to make sure that the ephemeral ports
+  // specified do not intersect with the well known ports.
+  if (ephemeralPorts.intersects(WELL_KNOWN_PORTS())) {
+    return Error(
+        "The specified ephemeral ports " + stringify(ephemeralPorts) +
+        " intersect with well known ports " + stringify(WELL_KNOWN_PORTS()));
+  }
+
+  // Obtain the host ephemeral port range by reading the proc file
+  // system ('ip_local_port_range').
+  Try<string> value = os::read("/proc/sys/net/ipv4/ip_local_port_range");
+  if (value.isError()) {
+    return Error("Failed to read host ip_local_port_range: " + value.error());
+  }
+
+  vector<string> split = strings::split(strings::trim(value.get()), "\t");
+  if (split.size() != 2) {
+    return Error(
+        "Unexpected format from host ip_local_port_range: " + value.get());
+  }
+
+  Try<uint16_t> begin = numify<uint16_t>(split[0]);
+  if (begin.isError()) {
+    return Error(
+        "Failed to parse the begin of host ip_local_port_range: " + split[0]);
+  }
+
+  Try<uint16_t> end = numify<uint16_t>(split[1]);
+  if (end.isError()) {
+    return Error(
+        "Failed to parse the end of host ip_local_port_range: " + split[1]);
+  }
+
+  Interval<uint16_t> hostEphemeralPorts =
+    (Bound<uint16_t>::closed(begin.get()),
+     Bound<uint16_t>::closed(end.get()));
+
+  // Sanity check to make sure the specified ephemeral ports do not
+  // intersect with the ephemeral ports used by the host.
+  if (ephemeralPorts.intersects(hostEphemeralPorts)) {
+    return Error(
+        "The specified ephemeral ports " + stringify(ephemeralPorts) +
+        " intersect with the ephemeral ports used by the host " +
+        stringify(hostEphemeralPorts));
+  }
+
+  // TODO(chzhcn): Cross check ephemeral ports with used ports on the
+  // host (e.g., using port scan).
+
+  // Initialize the ephemeral ports allocator.
+
+  // In theory, any positive integer can be broken up into a few
+  // numbers that are power of 2 aligned. We choose to not allow this
+  // for now so that each container has a fixed (one) number of
+  // filters for ephemeral ports. This makes it easy to debug and
+  // infer performance.
+  if (roundDownToPowerOfTwo(flags.ephemeral_ports_per_container) !=
+      flags.ephemeral_ports_per_container) {
+    return Error(
+        "The number of ephemeral ports for each container (" +
+        stringify(flags.ephemeral_ports_per_container) +
+        ") is not a power of 2");
+  }
+
+  if (ephemeralPorts.size() < flags.ephemeral_ports_per_container) {
+    return Error(
+        "Network Isolator is given ephemeral ports of size: " +
+        stringify(ephemeralPorts.size()) + ", but asked to allocate " +
+        stringify(flags.ephemeral_ports_per_container) +
+        " ephemeral ports for a container");
+  }
+
+  if (flags.ephemeral_ports_per_container < MIN_EPHEMERAL_PORTS_SIZE) {
+    return Error(
+        "Each container has only " +
+        stringify(flags.ephemeral_ports_per_container) +
+        " ephemeral ports. The minimum required is: " +
+        stringify(MIN_EPHEMERAL_PORTS_SIZE));
+  }
+
+  Owned<EphemeralPortsAllocator> ephemeralPortsAllocator(
+      new EphemeralPortsAllocator(
+        ephemeralPorts,
+        flags.ephemeral_ports_per_container));
+
+  // Get the name of the public interface (e.g., eth0). If it is not
+  // specified, try to derive its name from the routing library.
+  Result<string> eth0 = link::eth0();
+  if (flags.eth0_name.isSome()) {
+    eth0 = flags.eth0_name.get();
+
+    // Check if the given public interface exists.
+    Try<bool> hostEth0Exists = link::exists(eth0.get());
+    if (hostEth0Exists.isError()) {
+      return Error(
+          "Failed to check if " + eth0.get() + " exists: " +
+          hostEth0Exists.error());
+    } else if (!hostEth0Exists.get()) {
+      return Error("The public interface " + eth0.get() + " does not exist");
+    }
+  } else if (!eth0.isSome()) {
+    // eth0 is not specified in the flag and we did not get a valid
+    // eth0 from the library. NOTE: The result may be None (not just
+    // an Error), in which case there is no error message to report;
+    // calling 'error()' on a None result is invalid.
+    return Error(
+        "Network Isolator failed to find a public interface: " +
+        (eth0.isError() ? eth0.error() : "no such interface"));
+  }
+
+  LOG(INFO) << "Using " << eth0.get() << " as the public interface";
+
+  // Get the name of the loopback interface. If it is not specified,
+  // try to derive its name based on the loopback IP address.
+  Result<string> lo = link::lo();
+  if (flags.lo_name.isSome()) {
+    lo = flags.lo_name;
+
+    // Check if the given loopback interface exists.
+    Try<bool> hostLoExists = link::exists(lo.get());
+    if (hostLoExists.isError()) {
+      return Error(
+          "Failed to check if " + lo.get() + " exists: " +
+          hostLoExists.error());
+    } else if (!hostLoExists.get()) {
+      return Error("The loopback interface " + lo.get() + " does not exist");
+    }
+  } else if (!lo.isSome()) {
+    // lo is not specified in the flag and we did not get a valid
+    // lo from the library. As with eth0, the result may be None.
+    return Error(
+        "Network Isolator failed to find a loopback interface: " +
+        (lo.isError() ? lo.error() : "no such interface"));
+  }
+
+  LOG(INFO) << "Using " << lo.get() << " as the loopback interface";
+
+  // If egress rate limit is provided, do a sanity check that it is
+  // not greater than the host physical link speed.
+  Option<Bytes> egressRateLimitPerContainer;
+  if (flags.egress_rate_limit_per_container.isSome()) {
+    // Read host physical link speed from /sys/class/net/eth0/speed.
+    // This value is in MBits/s.
+    const string speedPath = path::join("/sys/class/net", eth0.get(), "speed");
+
+    Try<string> value = os::read(speedPath);
+    if (value.isError()) {
+      return Error("Failed to read " + speedPath + ": " + value.error());
+    }
+
+    // The speed is parsed as a signed integer because some kernels
+    // report an unknown speed as "-1". An unsigned parse would either
+    // reject that value or wrap it around to a huge speed.
+    Try<int64_t> hostLinkSpeed = numify<int64_t>(strings::trim(value.get()));
+    if (hostLinkSpeed.isError()) {
+      return Error(
+          "Failed to parse the link speed of " + eth0.get() + " from " +
+          speedPath + ": " + hostLinkSpeed.error());
+    }
+
+    // It could be possible that the nic driver doesn't support
+    // reporting physical link speed. Depending on the kernel version
+    // this shows up as 0xFFFFFFFF or as a negative value. In that
+    // case, report error.
+    if (hostLinkSpeed.get() < 0 || hostLinkSpeed.get() == 0xFFFFFFFFLL) {
+      return Error(
+          "Network Isolator failed to determine link speed for " + eth0.get());
+    }
+
+    // Convert host link speed from MBits/s to Bytes/s for comparison.
+    // The value is known to be non-negative here, so the conversion to
+    // an unsigned type is safe.
+    const uint64_t hostLinkSpeedBytes =
+      static_cast<uint64_t>(hostLinkSpeed.get()) * 1000000 / 8;
+
+    if (hostLinkSpeedBytes <
+        flags.egress_rate_limit_per_container.get().bytes()) {
+      return Error(
+          "The given egress traffic limit for containers " +
+          stringify(flags.egress_rate_limit_per_container.get().bytes()) +
+          " Bytes/s is greater than the host link speed " +
+          stringify(hostLinkSpeedBytes) + " Bytes/s");
+    }
+
+    if (flags.egress_rate_limit_per_container.get() != Bytes(0)) {
+      egressRateLimitPerContainer = flags.egress_rate_limit_per_container.get();
+    } else {
+      LOG(WARNING) << "Ignoring the given zero egress rate limit";
+    }
+  }
+
+  // Get the host IP network, MAC and default gateway.
+  Result<net::IPNetwork> hostIPNetwork =
+    net::IPNetwork::fromLinkDevice(eth0.get(), AF_INET);
+
+  if (!hostIPNetwork.isSome()) {
+    return Error(
+        "Failed to get the public IP network of " + eth0.get() + ": " +
+        (hostIPNetwork.isError() ?
+            hostIPNetwork.error() :
+            "does not have an IPv4 network"));
+  }
+
+  Result<net::MAC> hostMAC = net::mac(eth0.get());
+  if (!hostMAC.isSome()) {
+    return Error(
+        "Failed to get the MAC address of " + eth0.get() + ": " +
+        (hostMAC.isError() ? hostMAC.error() : "does not have a MAC address"));
+  }
+
+  Result<net::IP> hostDefaultGateway = route::defaultGateway();
+  if (!hostDefaultGateway.isSome()) {
+    return Error(
+        "Failed to get the default gateway of the host: " +
+        (hostDefaultGateway.isError() ? hostDefaultGateway.error()
+        : "The default gateway of the host does not exist"));
+  }
+
+  // Set the MAC address of the host loopback interface (lo) so that
+  // it matches that of the host public interface (eth0).  A fairly
+  // recent kernel patch is needed for this operation to succeed:
+  // https://git.kernel.org/cgit/linux/kernel/git/davem/net.git/:
+  // 25f929fbff0d1bcebf2e92656d33025cd330cbf8
+  Try<bool> setHostLoMAC = link::setMAC(lo.get(), hostMAC.get());
+  if (setHostLoMAC.isError()) {
+    return Error(
+        "Failed to set the MAC address of " + lo.get() +
+        ": " + setHostLoMAC.error());
+  }
+
+  // Set the MTU of the host loopback interface (lo) so that it
+  // matches that of the host public interface (eth0). The public
+  // interface was found above, but it could have vanished since, so a
+  // None result is reported as an error instead of aborting.
+  Result<unsigned int> hostEth0MTU = link::mtu(eth0.get());
+  if (!hostEth0MTU.isSome()) {
+    return Error(
+        "Failed to get the MTU of " + eth0.get() + ": " +
+        (hostEth0MTU.isError() ? hostEth0MTU.error() : "does not exist"));
+  }
+
+  Try<bool> setHostLoMTU = link::setMTU(lo.get(), hostEth0MTU.get());
+  if (setHostLoMTU.isError()) {
+    return Error(
+        "Failed to set the MTU of " + lo.get() +
+        ": " + setHostLoMTU.error());
+  }
+
+  // Prepare the ingress queueing disciplines on host public interface
+  // (eth0) and host loopback interface (lo).
+  Try<bool> createHostEth0IngressQdisc = ingress::create(eth0.get());
+  if (createHostEth0IngressQdisc.isError()) {
+    return Error(
+        "Failed to create the ingress qdisc on " + eth0.get() +
+        ": " + createHostEth0IngressQdisc.error());
+  }
+
+  set<uint16_t> freeFlowIds;
+  if (flags.egress_unique_flow_per_container) {
+    // Prepare a fq_codel queueing discipline on host public interface
+    // (eth0) for egress flow classification.
+    //
+    // TODO(cwang): Maybe we can continue when some other egress qdisc
+    // exists because this is not a necessary qdisc for network
+    // isolation, but we don't want inconsistency, so we just fail in
+    // this case. See details in MESOS-2370.
+    Try<bool> createHostEth0EgressQdisc = fq_codel::create(
+        eth0.get(),
+        EGRESS_ROOT,
+        HOST_TX_FQ_CODEL_HANDLE);
+    if (createHostEth0EgressQdisc.isError()) {
+      return Error(
+          "Failed to create the egress qdisc on " + eth0.get() +
+          ": " + createHostEth0EgressQdisc.error());
+    }
+
+    // TODO(cwang): Make sure DEFAULT_FLOWS is large enough so that
+    // it's unlikely to run out of free flow IDs.
+    for (uint16_t i = CONTAINER_MIN_FLOWID; i < fq_codel::DEFAULT_FLOWS; i++) {
+      freeFlowIds.insert(i);
+    }
+  }
+
+  Try<bool> createHostLoQdisc = ingress::create(lo.get());
+  if (createHostLoQdisc.isError()) {
+    return Error(
+        "Failed to create the ingress qdisc on " + lo.get() +
+        ": " + createHostLoQdisc.error());
+  }
+
+  // Enable 'route_localnet' on host loopback interface (lo). This
+  // enables the use of 127.0.0.1/8 for local routing purpose. This
+  // feature only exists on kernel 3.6 or newer.
+  const string loRouteLocalnet =
+    path::join("/proc/sys/net/ipv4/conf", lo.get(), "route_localnet");
+
+  if (!os::exists(loRouteLocalnet)) {
+    // TODO(jieyu): Consider supporting running the isolator if this
+    // feature is not available. We need to conditionally disable
+    // routing for 127.0.0.1/8, and ask the tasks to use the public IP
+    // for container to container and container to host communication.
+    return Error("The kernel does not support 'route_localnet'");
+  }
+
+  Try<Nothing> write = os::write(loRouteLocalnet, "1");
+  if (write.isError()) {
+    return Error(
+        "Failed to enable route_localnet for " + lo.get() +
+        ": " + write.error());
+  }
+
+  // We disable 'rp_filter' and 'send_redirects' for host loopback
+  // interface (lo) to work around a kernel bug, which was only
+  // recently addressed in upstream in the following 3 commits.
+  // https://git.kernel.org/cgit/linux/kernel/git/davem/net.git/:
+  //   6a662719c9868b3d6c7d26b3a085f0cd3cc15e64
+  //   0d5edc68739f1c1e0519acbea1d3f0c1882a15d7
+  //   e374c618b1465f0292047a9f4c244bd71ab5f1f0
+  // The workaround ensures packets don't get dropped at lo.
+  write = os::write("/proc/sys/net/ipv4/conf/all/rp_filter", "0");
+  if (write.isError()) {
+    return Error(
+        "Failed to disable rp_filter for all: " + write.error());
+  }
+
+  write = os::write(path::join(
+      "/proc/sys/net/ipv4/conf", lo.get(), "rp_filter"), "0");
+  if (write.isError()) {
+    return Error(
+        "Failed to disable rp_filter for " + lo.get() +
+        ": " + write.error());
+  }
+
+  write = os::write("/proc/sys/net/ipv4/conf/all/send_redirects", "0");
+  if (write.isError()) {
+    return Error(
+        "Failed to disable send_redirects for all: " + write.error());
+  }
+
+  write = os::write(path::join(
+      "/proc/sys/net/ipv4/conf", lo.get(), "send_redirects"), "0");
+  if (write.isError()) {
+    return Error(
+        "Failed to disable send_redirects for " + lo.get() +
+        ": " + write.error());
+  }
+
+  // We need to enable accept_local on host loopback interface (lo)
+  // for kernels older than 3.6. Refer to the following:
+  // https://git.kernel.org/cgit/linux/kernel/git/davem/net.git/:
+  //   7a9bc9b81a5bc6e44ebc80ef781332e4385083f2
+  // https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt
+  write = os::write(path::join(
+      "/proc/sys/net/ipv4/conf", lo.get(), "accept_local"), "1");
+  if (write.isError()) {
+    return Error(
+        "Failed to enable accept_local for " + lo.get() +
+        ": " + write.error());
+  }
+
+  // Reading host network configurations. Each container will match
+  // these configurations.
+  hashset<string> procs;
+
+  // TODO(jieyu): The following is a partial list of all the
+  // configurations. In the future, we may want to expose these
+  // configurations using ContainerInfo.
+
+  // The kernel will use a default value for the following
+  // configurations inside a container. Therefore, we need to set them
+  // in the container to match that on the host.
+  procs.insert("/proc/sys/net/core/somaxconn");
+
+  // As of kernel 3.10, the following configurations are shared
+  // between host and containers, and therefore are not required to be
+  // set in containers. We keep them here just in case the kernel
+  // changes in the future.
+  procs.insert("/proc/sys/net/core/netdev_max_backlog");
+  procs.insert("/proc/sys/net/core/rmem_max");
+  procs.insert("/proc/sys/net/core/wmem_max");
+  procs.insert("/proc/sys/net/ipv4/tcp_keepalive_time");
+  procs.insert("/proc/sys/net/ipv4/tcp_keepalive_intvl");
+  procs.insert("/proc/sys/net/ipv4/tcp_keepalive_probes");
+  procs.insert("/proc/sys/net/ipv4/tcp_max_syn_backlog");
+  procs.insert("/proc/sys/net/ipv4/tcp_rmem");
+  procs.insert("/proc/sys/net/ipv4/tcp_retries2");
+  procs.insert("/proc/sys/net/ipv4/tcp_synack_retries");
+  procs.insert("/proc/sys/net/ipv4/tcp_wmem");
+  procs.insert("/proc/sys/net/ipv4/neigh/default/gc_thresh1");
+  procs.insert("/proc/sys/net/ipv4/neigh/default/gc_thresh2");
+  procs.insert("/proc/sys/net/ipv4/neigh/default/gc_thresh3");
+
+  // Unreadable entries are skipped; they are simply not propagated
+  // to containers.
+  hashmap<string, string> hostNetworkConfigurations;
+  foreach (const string& proc, procs) {
+    Try<string> value = os::read(proc);
+    if (value.isSome()) {
+      LOG(INFO) << proc << " = '" << strings::trim(value.get()) << "'";
+      hostNetworkConfigurations[proc] = strings::trim(value.get());
+    }
+  }
+
+  // Self bind mount PORT_MAPPING_BIND_MOUNT_ROOT(). Since we use a
+  // new mount namespace for each container, for this mount point, we
+  // set '--make-rshared' on the host and set '--make-rslave' inside
+  // each container. This is important because when we unmount the
+  // network namespace handles on the host, those handles will be
+  // unmounted in the containers as well, but NOT vice versa.
+
+  // We first create the bind mount directory if it does not exist.
+  Try<Nothing> mkdir = os::mkdir(PORT_MAPPING_BIND_MOUNT_ROOT());
+  if (mkdir.isError()) {
+    return Error(
+        "Failed to create the bind mount root directory at " +
+        PORT_MAPPING_BIND_MOUNT_ROOT() + ": " + mkdir.error());
+  }
+
+  // Now, check '/proc/mounts' to see if
+  // PORT_MAPPING_BIND_MOUNT_ROOT() has already been self mounted.
+  Try<fs::MountTable> mountTable = fs::MountTable::read("/proc/mounts");
+  if (mountTable.isError()) {
+    return Error(
+        "Failed to read the mount table at '/proc/mounts': " +
+        mountTable.error());
+  }
+
+  Option<fs::MountTable::Entry> bindMountRoot;
+  foreach (const fs::MountTable::Entry& entry, mountTable.get().entries) {
+    if (entry.dir == PORT_MAPPING_BIND_MOUNT_ROOT()) {
+      bindMountRoot = entry;
+    }
+  }
+
+  // Self bind mount PORT_MAPPING_BIND_MOUNT_ROOT().
+  if (bindMountRoot.isNone()) {
+    // NOTE: Instead of using fs::mount to perform the bind mount, we
+    // use the shell command here because the syscall 'mount' does not
+    // update the mount table (i.e., /etc/mtab), which could cause
+    // issues for the shell command 'mount --make-rslave' inside the
+    // container. It's OK to use the blocking os::shell here because
+    // 'create' will only be invoked during initialization.
+    Try<string> mount = os::shell(
+        "mount --bind %s %s",
+        PORT_MAPPING_BIND_MOUNT_ROOT().c_str(),
+        PORT_MAPPING_BIND_MOUNT_ROOT().c_str());
+
+    if (mount.isError()) {
+      return Error(
+          "Failed to self bind mount '" + PORT_MAPPING_BIND_MOUNT_ROOT() +
+          "': " + mount.error());
+    }
+  }
+
+  // Mark the mount point PORT_MAPPING_BIND_MOUNT_ROOT() as
+  // recursively shared.
+  Try<string> mountShared = os::shell(
+      "mount --make-rshared %s",
+      PORT_MAPPING_BIND_MOUNT_ROOT().c_str());
+
+  if (mountShared.isError()) {
+    return Error(
+        "Failed to mark '" + PORT_MAPPING_BIND_MOUNT_ROOT() +
+        "' as recursively shared: " + mountShared.error());
+  }
+
+  // Create the network namespace handle symlink directory if it does
+  // not exist. It is used to hold network namespace handle symlinks
+  // whose basename is a container ID. This allows us to recover
+  // container IDs for orphan containers (i.e., not known by the
+  // slave). This is introduced in 0.23.0.
+  mkdir = os::mkdir(PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT());
+  if (mkdir.isError()) {
+    return Error(
+        "Failed to create the bind mount symlink root directory at " +
+        PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT() + ": " + mkdir.error());
+  }
+
+  return new MesosIsolator(Owned<MesosIsolatorProcess>(
+      new PortMappingIsolatorProcess(
+          flags,
+          eth0.get(),
+          lo.get(),
+          hostMAC.get(),
+          hostIPNetwork.get(),
+          hostEth0MTU.get(),
+          hostDefaultGateway.get(),
+          hostNetworkConfigurations,
+          egressRateLimitPerContainer,
+          nonEphemeralPorts,
+          ephemeralPortsAllocator,
+          freeFlowIds)));
+}
+
+
+Future<Nothing> PortMappingIsolatorProcess::recover(
+    const list<ContainerState>& states,
+    const hashset<ContainerID>& orphans)
+{
+  // Extract pids from virtual device names (veth). This tells us
+  // about all the potential live containers on this slave.
+  Try<set<string>> links = net::links();
+  if (links.isError()) {
+    return Failure("Failed to get all the links: " + links.error());
+  }
+
+  hashset<pid_t> pids;
+  foreach (const string& name, links.get()) {
+    Option<pid_t> pid = getPidFromVeth(name);
+    // Not all links follow the naming: mesos{pid}, so we simply
+    // continue, e.g., eth0.
+    if (pid.isNone()) {
+      continue;
+    } else if (pids.contains(pid.get())) {
+      return Failure("Two virtual devices have the same name '" + name + "'");
+    }
+
+    pids.insert(pid.get());
+  }
+
+  // Scan the bind mount root to cleanup all stale network namespace
+  // handles that do not have an active veth associated with.
+  Try<list<string>> entries = os::ls(PORT_MAPPING_BIND_MOUNT_ROOT());
+  if (entries.isError()) {
+    return Failure(
+        "Failed to list bind mount root '" +
+        PORT_MAPPING_BIND_MOUNT_ROOT() +
+        "': " + entries.error());
+  }
+
+  foreach (const string& entry, entries.get()) {
+    const string path = path::join(PORT_MAPPING_BIND_MOUNT_ROOT(), entry);
+
+    // NOTE: We expect all regular files whose names are numbers under
+    // the bind mount root are network namespace handles.
+    Result<pid_t> pid = getPidFromNamespaceHandle(path);
+    if (pid.isError()) {
+      return Failure(
+          "Failed to get pid from network namespace handle '" +
+          path + "': " + pid.error());
+    } else if (pid.isNone()) {
+      // We ignore files that are clearly not network namespace
+      // handles created by us. It's likely that those are created by
+      // users or other tools.
+      LOG(WARNING) << "Unrecognized network namespace handle '" << path << "'";
+      continue;
+    }
+
+    // We cleanup the network namespace handle if the associated
+    // containers have clearly exited (i.e., the veth has gone). The
+    // cleanup here is best effort.
+    if (!pids.contains(pid.get())) {
+      LOG(INFO) << "Removing stale network namespace handle '" << path << "'";
+
+      Try<Nothing> unmount = fs::unmount(path, MNT_DETACH);
+      if (unmount.isError()) {
+        LOG(WARNING) << "Failed to unmount stale network namespace handle '"
+                     << path << "': " << unmount.error();
+      }
+
+      Try<Nothing> rm = os::rm(path);
+      if (rm.isError()) {
+        LOG(WARNING) << "Failed to remove stale network namespace handle '"
+                     << path << "': " << rm.error();
+      }
+    }
+  }
+
+  // Scan the bind mount symlink root for container IDs. This allows us
+  // to recover container IDs for orphan containers (i.e., not known
+  // by the slave). This is introduced in 0.23.0.
+  entries = os::ls(PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT());
+  if (entries.isError()) {
+    return Failure(
+        "Failed to list bind mount symlink root '" +
+        PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT() +
+        "': " + entries.error());
+  }
+
+  // This map stores the mapping between pids and container IDs
+  // recovered from the bind mount root that have valid veth links. We
+  // use a multihashmap here because multiple container IDs can map to
+  // the same pid if the removal of a symlink fails in '_cleanup()'
+  // and the pid is reused by a new container.
+  multihashmap<pid_t, ContainerID> linkers;
+
+  foreach (const string& entry, entries.get()) {
+    const string path =
+      path::join(PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT(), entry);
+
+    // We only create symlinks in this directory and assume
+    // non-symlink files are created by other users or tools,
+    // therefore will be ignored.
+    if (!os::stat::islink(path)) {
+      LOG(WARNING) << "Ignored non-symlink file '" << path
+                   << "' under bind mount symlink root '"
+                   << PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT() << "'";
+      continue;
+    }
+
+    // NOTE: We expect all symlinks under the bind mount symlink root
+    // to be container ID symlinks.
+
+    Try<ContainerID> containerId = getContainerIdFromSymlink(path);
+    if (containerId.isError()) {
+      return Failure(
+          "Failed to get container ID from network namespace handle symlink '" +
+          path + "': " + containerId.error());
+    }
+
+    Result<pid_t> pid = getPidFromSymlink(path);
+    if (pid.isError()) {
+      return Failure(
+          "Failed to get pid from network namespace handle symlink '" + path +
+          "': " + pid.error());
+    }
+
+    // We remove the symlink if it's dangling or the associated
+    // containers have clearly exited (i.e., the veth has gone). The
+    // cleanup here is best effort.
+    if (pid.isNone() || !pids.contains(pid.get())) {
+      LOG(INFO) << "Removing stale network namespace handle symlink '"
+                << path << "'";
+
+      Try<Nothing> rm = os::rm(path);
+      if (rm.isError()) {
+        LOG(WARNING) << "Failed to remove stale network namespace handle "
+                     << " symlink '" << path << "': " << rm.error();
+      }
+    } else {
+      LOG(INFO) << "Discovered network namespace handle symlink "
+                << containerId.get() << " -> " << pid.get();
+
+      linkers.put(pid.get(), containerId.get());
+    }
+  }
+
+  // If multiple container IDs point to the same pid, we remove both
+  // symlinks for safety (as if we cannot derive the container ID for
+  // orphans, which is OK because it'll be treated the same as those
+  // containers that are created by older (pre 0.23.0) versions). Note
+  // that it's possible that multiple container IDs map to the same
+  // pid if the removal of a symlink fails in '_cleanup()' and the pid
+  // is reused by a new container.
+  foreach (pid_t pid, linkers.keys()) {
+    list<ContainerID> containerIds = linkers.get(pid);
+    if (containerIds.size() > 1) {
+      foreach (const ContainerID& containerId, containerIds) {
+        const string linker = getSymlinkPath(containerId);
+
+        LOG(WARNING) << "Removing duplicated network namespace handle symlink '"
+                     << linker << "'";
+
+        Try<Nothing> rm = os::rm(linker);
+        if (rm.isError()) {
+          LOG(WARNING) << "Failed to remove duplicated network namespace "
+                       << "handle symlink '" << linker << "': " << rm.error();
+        }
+      }
+
+      linkers.remove(pid);
+    }
+  }
+
+  // Now, actually recover the isolator from slave's state.
+  foreach (const ContainerState& state, states) {
+    const ContainerID& containerId = state.container_id();
+    pid_t pid = state.pid();
+
+    VLOG(1) << "Recovering network isolator for container "
+            << containerId << " with pid " << pid;
+
+    if (!pids.contains(pid)) {
+      // There are two possible cases here:
+      //
+      // 1) The container was launched by the slave with network
+      //    isolation disabled, so the pid could not be found in the
+      //    device names in the system.
+      //
+      // 2) The container was launched by the slave with network
+      //    isolation enabled, but veth is removed (because the
+      //    corresponding container is destroyed), but the slave
+      //    restarts before it is able to write the sentinel file.
+      //
+      // In both cases, we treat the container as unmanaged. For case
+      // (2), it's safe to do so because the container has already
+      // been destroyed.
+      VLOG(1) << "Skipped recovery for container " << containerId
+              << "with pid " << pid << " as either it was not managed by "
+              << "the network isolator or it has already been destroyed";
+
+      unmanaged.insert(containerId);
+      continue;
+    }
+
+    Try<Info*> recover = _recover(pid);
+    if (recover.isError()) {
+      foreachvalue (Info* info, infos) {
+        delete info;
+      }
+
+      return Failure(
+          "Failed to recover container " + stringify(containerId) +
+          " with pid " + stringify(pid) + ": " + recover.error());
+    }
+
+    infos[containerId] = recover.get();
+
+    // Remove the successfully recovered pid.
+    pids.erase(pid);
+  }
+
+  // Recover orphans. Known orphans will be destroyed by containerizer
+  // using the normal cleanup path (refer to MESOS-2367 for details).
+  // Unknown orphans will be cleaned up immediately. The recovery will
+  // fail if there is some unknown orphan that cannot be cleaned up.
+  vector<Info*> unknownOrphans;
+
+  foreach (pid_t pid, pids) {
+    Try<Info*> recover = _recover(pid);
+    if (recover.isError()) {
+      foreachvalue (Info* info, infos) {
+        delete info;
+      }
+      foreach (Info* info, unknownOrphans) {
+        delete info;
+      }
+
+      return Failure(
+          "Failed to recover orphaned container with pid " +
+          stringify(pid) + ": " + recover.error());
+    }
+
+    if (linkers.get(pid).size() == 1) {
+      const ContainerID containerId = linkers.get(pid).front();
+      CHECK(!infos.contains(containerId));
+
+      if (orphans.contains(containerId)) {
+        infos[containerId] = recover.get();
+        continue;
+      }
+    }
+
+    unknownOrphans.push_back(recover.get());
+  }
+
+  foreach (Info* info, unknownOrphans) {
+    CHECK_SOME(info->pid);
+    pid_t pid = info->pid.get();
+
+    Option<ContainerID> containerId;
+    if (linkers.get(pid).size() == 1) {
+      containerId = linkers.get(pid).front();
+    }
+
+    // NOTE: If 'infos' is empty (means there is no regular container
+    // or known orphan), the '_cleanup' below will remove the ICMP and
+    // ARP packet filters on host eth0. This will cause subsequent
+    // calls to '_cleanup' for unknown orphans to fail. However, this
+    // is OK because when slave restarts and tries to recover again,
+    // it'll try to remove the remaining unknown orphans.
+    // TODO(jieyu): Consider call '_cleanup' for all the unknown
+    // orphans before returning even if error occurs.
+    Try<Nothing> cleanup = _cleanup(info, containerId);
+    if (cleanup.isError()) {
+      foreachvalue (Info* info, infos) {
+        delete info;
+      }
+
+      // TODO(jieyu): Also delete 'info' in unknownOrphans. Notice
+      // that some 'info' in unknownOrphans might have already been
+      // deleted in '_cleanup' above.
+
+      return Failure(
+          "Failed to cleanup orphaned container with pid " +
+          stringify(pid) + ": " + cleanup.error());
+    }
+  }
+
+  // TODO(cwang): Consider removing unrecognized flow classifiers from
+  // host eth0 egress.
+
+  LOG(INFO) << "Network isolator recovery complete";
+
+  return Nothing();
+}
+
+
+// Rebuilds the Info of an already-running container (identified by
+// 'pid') from the traffic control state left on the host.
+//
+// The container's port ranges are read back only from the IP filters
+// on its veth ingress. That is why filters must be added to veth
+// before host eth0/lo during isolation, and removed from host eth0/lo
+// before veth during cleanup. When per-container egress flows are
+// enabled, the container's flow ID is looked up through the flow
+// classifiers on host eth0 egress.
+//
+// On success:
+// - a recovered ephemeral range (if any) is re-registered with the
+//   ephemeral ports allocator;
+// - a recovered flow ID (if any) is removed from 'freeFlowIds';
+// - the returned Info is allocated with 'new'.
+Try<PortMappingIsolatorProcess::Info*>
+PortMappingIsolatorProcess::_recover(pid_t pid)
+{
+  Result<vector<ip::Classifier>> vethFilters =
+    ip::classifiers(veth(pid), ingress::HANDLE);
+
+  if (!vethFilters.isSome()) {
+    const string reason = vethFilters.isError()
+      ? vethFilters.error()
+      : string("link does not exist");
+
+    return Error(
+        "Failed to get all the IP filters on " + veth(pid) + ": " + reason);
+  }
+
+  // Port range -> flow ID, as classified on host eth0 egress. It stays
+  // empty unless per-container egress flows are enabled.
+  hashmap<PortRange, uint16_t> flowOfRange;
+
+  if (flags.egress_unique_flow_per_container) {
+    Result<vector<filter::Filter<ip::Classifier>>> egressFilters =
+      ip::filters(eth0, HOST_TX_FQ_CODEL_HANDLE);
+
+    if (!egressFilters.isSome()) {
+      const string reason = egressFilters.isError()
+        ? egressFilters.error()
+        : string("link does not exist");
+
+      return Error(
+          "Failed to get all the IP flow classifiers on " + eth0 +
+          ": " + reason);
+    }
+
+    foreach (const filter::Filter<ip::Classifier>& egress,
+             egressFilters.get()) {
+      if (egress.classifier.sourcePorts.isNone()) {
+        return Error("Missing source ports for filters on egress of " + eth0);
+      }
+
+      if (egress.classid.isNone()) {
+        return Error("Missing classid for filters on egress of " + eth0);
+      }
+
+      const PortRange& range = egress.classifier.sourcePorts.get();
+
+      if (flowOfRange.contains(range)) {
+        return Error(
+            "Duplicated port range " + stringify(range) +
+            " detected on egress of " + eth0);
+      }
+
+      flowOfRange[range] = egress.classid.get().secondary();
+    }
+  }
+
+  IntervalSet<uint16_t> nonEphemeral;
+  IntervalSet<uint16_t> ephemeral;
+  Option<uint16_t> recoveredFlowId;
+
+  foreach (const ip::Classifier& classifier, vethFilters.get()) {
+    // Every IP filter installed on veth matches on source ports only.
+    if (classifier.sourcePorts.isNone() ||
+        classifier.destinationPorts.isSome()) {
+      return Error("Unexpected IP filter detected on " + veth(pid));
+    }
+
+    const PortRange& range = classifier.sourcePorts.get();
+    const Option<uint16_t> flowOfThisRange = flowOfRange.get(range);
+
+    if (flowOfThisRange.isNone()) {
+      if (recoveredFlowId.isSome()) {
+        // Some, but not all, of this container's port ranges have a
+        // flow assigned. This can happen if the slave crashed while
+        // creating the filters. It is tolerated because packets fall
+        // into the host flow by default.
+        LOG(WARNING) << "Container port range " << range
+                     << " does not have flow id " << recoveredFlowId.get()
+                     << " assigned";
+      }
+    } else if (recoveredFlowId.isNone()) {
+      recoveredFlowId = flowOfThisRange;
+    } else if (recoveredFlowId != flowOfThisRange) {
+      return Error(
+          "A container is associated with multiple flows "
+          "on egress of " + eth0);
+    }
+
+    Interval<uint16_t> ports =
+      (Bound<uint16_t>::closed(range.begin()),
+       Bound<uint16_t>::closed(range.end()));
+
+    if (managedNonEphemeralPorts.contains(ports)) {
+      nonEphemeral += ports;
+      continue;
+    }
+
+    if (!ephemeralPortsAllocator->isManaged(ports)) {
+      return Error("Unexpected IP filter detected on " + veth(pid));
+    }
+
+    // The same ephemeral range is expected twice: one filter for eth0
+    // and one for lo. An exact repeat is fine; a partial overlap with
+    // a range seen earlier is not.
+    if (ephemeral.intersects(ports) && !ephemeral.contains(ports)) {
+      return Error("Unexpected intersected ephemeral port ranges");
+    }
+
+    ephemeral += ports;
+  }
+
+  if (!ephemeral.empty() && ephemeral.intervalCount() != 1) {
+    return Error("Each container should have only one ephemeral port range");
+  }
+
+  Info* info = NULL;
+
+  if (ephemeral.empty()) {
+    // The slave may have crashed inside 'isolate()', leaving a
+    // partially isolated container. An Info without ephemeral ports is
+    // still built so that 'cleanup' can tear the container down later.
+    LOG(WARNING) << "No ephemeral ports found for container with pid "
+                 << stringify(pid) << ". This could happen if slave crashes "
+                 << "while isolating a container";
+
+    info = new Info(nonEphemeral, Interval<uint16_t>(), pid);
+  } else {
+    const Interval<uint16_t> assigned = *ephemeral.begin();
+
+    // Tell the allocator that the recovered range is in use again.
+    ephemeralPortsAllocator->allocate(assigned);
+
+    info = new Info(nonEphemeral, assigned, pid);
+
+    VLOG(1) << "Recovered network isolator for container with pid " << pid
+            << " non-ephemeral port ranges " << nonEphemeral
+            << " and ephemeral port range " << assigned;
+  }
+
+  if (recoveredFlowId.isSome()) {
+    // Make the recovered flow ID unavailable to new containers.
+    freeFlowIds.erase(recoveredFlowId.get());
+    info->flowId = recoveredFlowId.get();
+  }
+
+  return CHECK_NOTNULL(info);
+}
+
+
+// Prepares network isolation for a new container.
+//
+// Steps:
+// - Checks that the non-ephemeral ports in the executor's resources
+//   are a subset of 'managedNonEphemeralPorts'.
+// - Allocates an ephemeral port range from 'ephemeralPortsAllocator'.
+//   Any 'ephemeral_ports' in the executor's resources are ignored.
+// - Records a new Info for the container in 'infos'.
+// - Returns a ContainerPrepareInfo carrying the command produced by
+//   'scripts()' and the namespaces CLONE_NEWNET | CLONE_NEWNS.
+//
+// Fails if:
+// - the container is unmanaged or was already prepared;
+// - any requested non-ephemeral port is not managed by the slave;
+// - no ephemeral port range can be allocated.
+//
+// 'directory' and 'user' are not used by this isolator.
+Future<Option<ContainerPrepareInfo>> PortMappingIsolatorProcess::prepare(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user)
+{
+  if (unmanaged.contains(containerId)) {
+    return Failure("Asked to prepare an unmanaged container");
+  }
+
+  if (infos.contains(containerId)) {
+    return Failure("Container has already been prepared");
+  }
+
+  Resources resources(executorInfo.resources());
+
+  IntervalSet<uint16_t> nonEphemeralPorts;
+
+  if (resources.ports().isSome()) {
+    nonEphemeralPorts = getIntervalSet(resources.ports().get());
+
+    // Sanity check to make sure that the assigned non-ephemeral ports
+    // for the container are part of the non-ephemeral ports specified
+    // by the slave.
+    if (!managedNonEphemeralPorts.contains(nonEphemeralPorts)) {
+      return Failure(
+          "Some non-ephemeral ports specified in " +
+          stringify(nonEphemeralPorts) +
+          " are not managed by the slave");
+    }
+  }
+
+  // TODO(jieyu): For now, we simply ignore the 'ephemeral_ports'
+  // specified in the executor info. However, this behavior needs to
+  // be changed once the master can make default allocations for
+  // ephemeral ports.
+  if (resources.ephemeral_ports().isSome()) {
+    LOG(WARNING) << "Ignoring the specified ephemeral_ports '"
+                 << resources.ephemeral_ports().get()
+                 << "' for container " << containerId
+                 << " of executor " << executorInfo.executor_id();
+  }
+
+  // Allocate the ephemeral ports used by this container.
+  Try<Interval<uint16_t>> ephemeralPorts = ephemeralPortsAllocator->allocate();
+  if (ephemeralPorts.isError()) {
+    return Failure(
+        "Failed to allocate ephemeral ports: " + ephemeralPorts.error());
+  }
+
+  Info* info = new Info(nonEphemeralPorts, ephemeralPorts.get());
+  infos[containerId] = info;
+
+  LOG(INFO) << "Using non-ephemeral ports " << nonEphemeralPorts
+            << " and ephemeral ports " << ephemeralPorts.get()
+            << " for container " << containerId << " of executor "
+            << executorInfo.executor_id();
+
+  ContainerPrepareInfo prepareInfo;
+  prepareInfo.add_commands()->set_value(scripts(info));
+
+  // NOTE: the port mapping isolator itself doesn't require mount
+  // namespace. However, if mount namespace is enabled because of
+  // other isolators, we need to set mount sharing accordingly for
+  // PORT_MAPPING_BIND_MOUNT_ROOT to avoid races described in
+  // MESOS-1558. So we turn on mount namespace here for consistency.
+  prepareInfo.set_namespaces(CLONE_NEWNET | CLONE_NEWNS);
+
+  return prepareInfo;
+}
+
+
+Future<Nothing> PortMappingIsolatorProcess::isolate(
+    const ContainerID& containerId,
+    pid_t pid)
+{
+  if (unmanaged.contains(containerId)) {
+    return Failure("Asked to isolate an unmanaged container");
+  }
+
+  if (!infos.contains(containerId)) {
+    return Failure("Unknown container");
+  }
+
+  Info* info = CHECK_NOTNULL(infos[containerId]);
+
+  if (info->pid.isSome()) {
+    return Failure("The container has already been isolated");
+  }
+
+  info->pid = pid;
+
+  if (flags.egress_unique_flow_per_container) {
+    info->flowId = getNextFlowId();
+  }
+
+  // Bind mount the network namespace handle of the process 'pid' to a
+  // directory to hold an extra reference to the network namespace
+  // which will be released in 'cleanup'. By holding the extra
+  // reference, the network namespace will not be destroyed even if
+  // the process 'pid' is gone, which allows us to explicitly control
+  // the network namespace life cycle.
+  const string source = path::join("/proc", stringify(pid), "ns", "net");
+  const string target = getNamespaceHandlePath(pid);
+
+  Try<Nothing> touch = os::touch(target);
+  if (touch.isError()) {
+    return Failure("Failed to create the bind mount point: " + touch.error());
+  }
+
+  Try<Nothing> mount = fs::mount(source, target, None(), MS_BIND, NULL);
+  if (mount.isError()) {
+    return Failure(
+        "Failed to mount the network namespace handle from '" +
+        source + "' to '" + target + "': " + mount.error());
+  }
+
+  LOG(INFO) << "Bind mounted '" << source << "' to '" << target
+            << "' for container " << containerId;
+
+  // Since 0.23.0, we create a symlink to the network namespace handle
+  // using the container ID. This serves two purposes. First, it
+  // allows us to recover the container ID later when slave restarts
+  // even if slave's checkpointed meta data is deleted. Second, it
+  // makes the debugging easier. See MESOS-2528 for details.
+  const string linker = getSymlinkPath(containerId);
+  Try<Nothing> symlink = ::fs::symlink(target, linker);
+  if (symlink.isError()) {
+    return Failure(
+        "Failed to symlink the network namespace handle '" +
+        linker + "' -> '" + target + "': " + symlink.error());
+  }
+
+  LOG(INFO) << "Created network namespace handle symlink '"
+            << linker << "' -> '" << target << "'";
+
+  // Create a virtual ethernet pair for this container.
+  Try<bool> createVethPair = link::create(veth(pid), eth0, pid);
+  if (createVethPair.isError()) {
+    return Failure(
+        "Failed to create virtual ethernet pair: " +
+        createVethPair.error());
+  }
+
+  // Disable IPv6 for veth as IPv6 packets won't be forwarded anyway.
+  const string disableIPv6 =
+    path::join("/proc/sys/net/ipv6/conf", veth(pid), "disable_ipv6");
+
+  if (os::exists(disableIPv6)) {
+    Try<Nothing> write = os::write(disableIPv6, "1");
+    if (write.isError()) {
+      return Failure(
+          "Failed to disable IPv6 for " + veth(pid) +
+          ": " + write.error());
+    }
+  }
+
+  // Sets the MAC address of veth to match the MAC address of the host
+  // public interface (eth0).
+  Try<bool> setVethMAC = link::setMAC(veth(pid), hostMAC);
+  if (setVethMAC.isError()) {
+    return Failure(
+        "Failed to set the MAC address of " + veth(pid) +
+        ": " + setVethMAC.error());
+  }
+
+  // Prepare the ingress queueing disciplines on veth.
+  Try<bool> createQdisc = ingress::create(veth(pid));
+  if (createQdisc.isError()) {
+    return Failure(
+        "Failed to create the ingress qdisc on " + veth(pid) +
+        ": " + createQdisc.error());
+  }
+
+  // Veth device should exist since we just created it.
+  CHECK(createQdisc.get());
+
+  // For each port range, add a set of IP packet filters to properly
+  // redirect IP traffic to/from containers.
+  foreach (const PortRange& range,
+           getPortRanges(info->nonEphemeralPorts + info->ephemeralPorts)) {
+    if (info->flowId.isSome()) {
+      LOG(INFO) << "Adding IP packet filters with ports " << range
+                << " with flow ID " << info->flowId.get()
+                << " for container " << containerId;
+    } else {
+      LOG(INFO) << "Adding IP packet filters with ports " << range
+                << " for container " << containerId;
+    }
+
+    Try<Nothing> add = addHostIPFilters(range, info->flowId, veth(pid));
+    if (add.isError()) {
+      return Failure(
+          "Failed to add IP packet filter with ports " +
+          stringify(range) + " for container with pid " +
+          stringify(pid) + ": " + add.error());
+    }
+  }
+
+  // Relay ICMP packets from veth of the container to host eth0.
+  Try<bool> icmpVethToEth0 = filter::icmp::create(
+      veth(pid),
+      ingress::HANDLE,
+      icmp::Classifier(None()),
+      Priority(ICMP_FILTER_PRIORITY, NORMAL),
+      action::Redirect(eth0));
+
+  if (icmpVethToEth0.isError()) {
+    ++metrics.adding_veth_icmp_filters_errors;
+
+    return Failure(
+        "Failed to create an ICMP packet filter from " + veth(pid) +
+        " to host " + eth0 + ": " + icmpVethToEth0.error());
+  } else if (!icmpVethToEth0.get()) {
+    ++metrics.adding_veth_icmp_filters_already_exist;
+
+    return Failure(
+        "The ICMP packet filter from " + veth(pid) +
+        " to host " + eth0 + " already exists");
+  }
+
+  // Relay ARP packets from veth of the container to host eth0.
+  Try<bool> arpVethToEth0 = filter::basic::create(
+      veth(pid),
+      ingress::HANDLE,
+      ETH_P_ARP,
+      Priority(ARP_FILTER_PRIORITY, NORMAL),
+      action::Redirect(eth0));
+
+  if (arpVethToEth0.isError()) {
+    ++metrics.adding_veth_arp_filters_errors;
+
+    return Failure(
+        "Failed to create an ARP packet filter from " + veth(pid) +
+        " to host " + eth0 + ": " + arpVethToEth0.error());
+  } else if (!arpVethToEth0.get()) {
+    ++metrics.adding_veth_arp_filters_already_exist;
+
+    return Failure(
+        "The ARP packet filter from " + veth(pid) +
+        " to host " + eth0 + " already exists");
+  }
+
+  // Setup filters for ICMP and ARP packets. We mirror ICMP and ARP
+  // packets from host eth0 to veths of all the containers. We also
+  // setup flow classifiers for host eth0 egress.
+  set<string> targets;
+  foreachvalue (Info* info, infos) {
+    if (info->pid.isSome()) {
+      targets.insert(veth(info->pid.get()));
+    }
+  }
+
+  if (targets.size() == 1) {
+    // We just create the first container in which case we should
+    // create filters for ICMP and ARP packets.
+
+    // Create a new ICMP filter on host eth0 ingress for mirroring
+    // packets from host eth0 to veth.
+    Try<bool> icmpEth0ToVeth = filter::icmp::create(
+        eth0,
+        ingress::HANDLE,
+        icmp::Classifier(hostIPNetwork.address()),
+        Priority(ICMP_FILTER_PRIORITY, NORMAL),
+        action::Mirror(targets));
+
+    if (icmpEth0ToVeth.isError()) {
+      ++metrics.adding_eth0_icmp_filters_errors;
+
+      return Failure(
+          "Failed to create an ICMP packet filter from host " + eth0 +
+          " to " + veth(pid) + ": " + icmpEth0ToVeth.error());
+    } else if (!icmpEth0ToVeth.get()) {
+      ++metrics.adding_eth0_icmp_filters_already_exist;
+
+      return Failure(
+          "The ICMP packet filter on host " + eth0 + " already exists");
+    }
+
+    // Create a new ARP filter on host eth0 ingress for mirroring
+    // packets from host eth0 to veth.
+    Try<bool> arpEth0ToVeth = filter::basic::create(
+        eth0,
+        ingress::HANDLE,
+        ETH_P_ARP,
+        Priority(ARP_FILTER_PRIORITY, NORMAL),
+        action::Mirror(targets));
+
+    if (arpEth0ToVeth.isError()) {
+      ++metrics.adding_eth0_arp_filters_errors;
+
+      return Failure(
+          "Failed to create an ARP packet filter from host " + eth0 +
+          " to " + veth(pid) + ": " + arpEth0ToVeth.error());
+    } else if (!arpEth0ToVeth.get()) {
+      ++metrics.adding_eth0_arp_filters_already_exist;
+
+      return Failure(
+          "The ARP packet filter on host " + eth0 + " already exists");
+    }
+
+    if (flags.egress_unique_flow_per_container) {
+      // Create a new ICMP filter on host eth0 egress for classifying
+      // packets into a reserved flow.
+      Try<bool> icmpEth0Egress = filter::icmp::create(
+          eth0,
+          HOST_TX_FQ_CODEL_HANDLE,
+          icmp::Classifier(None()),
+          Priority(ICMP_FILTER_PRIORITY, NORMAL),
+          Handle(HOST_TX_FQ_CODEL_HANDLE, ICMP_FLOWID));
+
+      if (icmpEth0Egress.isError()) {
+        ++metrics.adding_eth0_egress_filters_errors;
+
+        return Failure(
+            "Failed to create the ICMP flow classifier on host " +
+            eth0 + ": " + icmpEth0Egress.error());
+      } else if (!icmpEth0Egress.get()) {
+        ++metrics.adding_eth0_egress_filters_already_exist;
+
+        return Failure(
+            "The ICMP flow classifier on host " + eth0 + " already exists");
+      }
+
+      // Create a new ARP filter on host eth0 egress for classifying
+      // packets into a reserved flow.
+      Try<bool> arpEth0Egress = filter::basic::create(
+          eth0,
+          HOST_TX_FQ_CODEL_HANDLE,
+          ETH_P_ARP,
+          Priority(ARP_FILTER_PRIORITY, NORMAL),
+          Handle(HOST_TX_FQ_CODEL_HANDLE, ARP_FLOWID));
+
+      if (arpEth0Egress.isError()) {
+        ++metrics.adding_eth0_egress_filters_errors;
+
+        return Failure(
+            "Failed to create the ARP flow classifier on host " +
+            eth0 + ": " + arpEth0Egress.error());
+      } else if (!arpEth0Egress.get()) {
+        ++metrics.adding_eth0_egress_filters_already_exist;
+
+        return Failure(
+            "The ARP flow classifier on host " + eth0 + " already exists");
+      }
+
+      // Rest of the host packets go to a reserved flow.
+      Try<bool> defaultEth0Egress = filter::basic::create(
+          eth0,
+          HOST_TX_FQ_CODEL_HANDLE,
+          ETH_P_ALL,
+          Priority(DEFAULT_FILTER_PRIORITY, NORMAL),
+          Handle(HOST_TX_FQ_CODEL_HANDLE, HOST_FLOWID));
+
+      if (defaultEth0Egress.isError()) {
+        ++met

<TRUNCATED>