Posted to commits@mesos.apache.org by ji...@apache.org on 2014/06/26 00:08:23 UTC

[2/2] git commit: Added a network isolator based on port mapping.

Added a network isolator based on port mapping.

Added a network isolator using port-range based traffic redirection on
Linux.

- Containers are assigned non-ephemeral ports by the scheduler and
  ephemeral ports by the network isolator.

- Virtual ethernet devices and traffic control filters are set up so
  that network traffic in and out of the containers is isolated based on
  the ports assigned to them.

- Containers run inside their own network namespaces with separate
  network stacks, from which per-container network statistics can be
  retrieved.
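
A minimal sketch of the redirection these filters implement, mirroring
addHostIPFilters() in port_mapping.cpp below ('pid', 'range', 'hostMAC' and
'hostIP' stand in for values the isolator computes at runtime):

    // Redirect incoming IP packets whose destination port falls in the
    // container's port range from the host's eth0 to the container's veth.
    Try<bool> redirect = filter::ip::create(
        "eth0",
        ingress::HANDLE,
        ip::Classifier(hostMAC, net::IP(hostIP.address()), None(), range),
        Priority(IP_FILTER_PRIORITY, NORMAL),
        action::Redirect(veth(pid)));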

Joint work with:
- Cong Wang (cwang@twopensource.com)
- Jie Yu (yujie.jay@gmail.com)
- Ian Downes (ian.downes@gmail.com)

Review: https://reviews.apache.org/r/21594


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e878c74f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e878c74f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e878c74f

Branch: refs/heads/master
Commit: e878c74fb6309366d26d9eee2fbccf16ac0617cf
Parents: 74e09a8
Author: Chi Zhang <ch...@gmail.com>
Authored: Wed Jun 25 15:06:22 2014 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jun 25 15:06:22 2014 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto                       |   15 +-
 src/Makefile.am                                 |   11 +
 src/slave/constants.cpp                         |    4 +
 src/slave/constants.hpp                         |    9 +
 .../containerizer/isolators/network/helper.cpp  |   34 +
 .../isolators/network/port_mapping.cpp          | 2162 ++++++++++++++++++
 .../isolators/network/port_mapping.hpp          |  249 ++
 src/slave/containerizer/linux_launcher.cpp      |   16 +
 src/slave/containerizer/mesos/containerizer.cpp |   12 +-
 src/slave/flags.hpp                             |   40 +-
 src/slave/slave.cpp                             |   27 +-
 src/tests/environment.cpp                       |    8 +
 src/tests/isolator_tests.cpp                    |    1 +
 src/tests/mesos.cpp                             |    8 +
 14 files changed, 2586 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e878c74f/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index c02b8ec..b41dc7f 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -292,6 +292,10 @@ message SlaveInfo {
   optional SlaveID id = 6;
   optional bool checkpoint = 7 [default = false];
 
+  // Resources that are managed locally by the slave and are not
+  // exposed to the master or to frameworks.
+  repeated Resource private_resources = 9;
+
   // Deprecated!
   optional string webui_hostname = 2;
   optional int32 webui_port = 4 [default = 8081];
@@ -404,10 +408,19 @@ message ResourceStatistics {
   optional uint64 mem_mapped_file_bytes = 12;
 
   // TODO(bmahler): Add disk usage.
-  // TODO(bmahler): Add network usage?
 
   // Perf statistics.
   optional PerfStatistics perf = 13;
+
+  // Network Usage Information:
+  optional uint64 net_rx_packets = 14;
+  optional uint64 net_rx_bytes = 15;
+  optional uint64 net_rx_errors = 16;
+  optional uint64 net_rx_dropped = 17;
+  optional uint64 net_tx_packets = 18;
+  optional uint64 net_tx_bytes = 19;
+  optional uint64 net_tx_errors = 20;
+  optional uint64 net_tx_dropped = 21;
 }
 
 

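A rough sketch of how these counters could be filled in, assuming they are
read from the container's eth0 statistics inside its network namespace (the
sysfs path and the 'statistics' variable are illustrative, not part of this
change):

    // Read the rx_bytes counter of the container's eth0 and store it in
    // the new ResourceStatistics field.
    ResourceStatistics statistics;

    Try<string> rxBytes = os::read("/sys/class/net/eth0/statistics/rx_bytes");
    if (rxBytes.isSome()) {
      Try<uint64_t> value = numify<uint64_t>(strings::trim(rxBytes.get()));
      if (value.isSome()) {
        statistics.set_net_rx_bytes(value.get());
      }
    }
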
http://git-wip-us.apache.org/repos/asf/mesos/blob/e878c74f/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index fb3af9d..3c8b91a 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -337,6 +337,10 @@ if WITH_NETWORK_ISOLATOR
 	linux/routing/queueing/handle.hpp				\
 	linux/routing/queueing/ingress.hpp				\
 	linux/routing/queueing/internal.hpp
+
+  libmesos_no_3rdparty_la_SOURCES +=					\
+	slave/containerizer/isolators/network/port_mapping.cpp		\
+	slave/containerizer/isolators/network/port_mapping.hpp
 endif
 
 libmesos_no_3rdparty_la_SOURCES +=					\
@@ -569,6 +573,13 @@ mesos_containerizer_SOURCES = slave/containerizer/mesos/main.cpp
 mesos_containerizer_CPPFLAGS = $(MESOS_CPPFLAGS)
 mesos_containerizer_LDADD = libmesos.la
 
+if WITH_NETWORK_ISOLATOR
+pkglibexec_PROGRAMS += mesos-network-helper
+mesos_network_helper_SOURCES = slave/containerizer/isolators/network/helper.cpp
+mesos_network_helper_CPPFLAGS = $(MESOS_CPPFLAGS)
+mesos_network_helper_LDADD = libmesos.la
+endif
+
 pkglibexec_PROGRAMS += mesos-health-check
 mesos_health_check_SOURCES = health-check/main.cpp
 mesos_health_check_CPPFLAGS = $(MESOS_CPPFLAGS)

http://git-wip-us.apache.org/repos/asf/mesos/blob/e878c74f/src/slave/constants.cpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.cpp b/src/slave/constants.cpp
index 51f65bb..a75b1ef 100644
--- a/src/slave/constants.cpp
+++ b/src/slave/constants.cpp
@@ -44,6 +44,10 @@ const double DEFAULT_CPUS = 1;
 const Bytes DEFAULT_MEM = Gigabytes(1);
 const Bytes DEFAULT_DISK = Gigabytes(10);
 const std::string DEFAULT_PORTS = "[31000-32000]";
+#ifdef WITH_NETWORK_ISOLATOR
+const uint16_t DEFAULT_EPHEMERAL_PORTS_PER_CONTAINER = 16;
+const std::string DEFAULT_EPHEMERAL_PORTS = "[30001-30999]";
+#endif
 
 } // namespace slave {
 } // namespace internal {

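As a quick sanity check on these defaults: the range [30001-30999] contains
999 ports, and each container receives a power-of-two aligned block of 16 of
them (see the ephemeral ports allocator in port_mapping.cpp below), so a
single slave can support roughly 60 concurrently running containers before
the ephemeral port space is exhausted.
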
http://git-wip-us.apache.org/repos/asf/mesos/blob/e878c74f/src/slave/constants.hpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp
index c65a62d..97dc1b3 100644
--- a/src/slave/constants.hpp
+++ b/src/slave/constants.hpp
@@ -86,6 +86,15 @@ const double DEFAULT_EXECUTOR_CPUS = 0.1;
 // Default memory resource given to a command executor.
 const Bytes DEFAULT_EXECUTOR_MEM = Megabytes(32);
 
+#ifdef WITH_NETWORK_ISOLATOR
+// Default number of ephemeral ports allocated to a container by the
+// network isolator.
+extern const uint16_t DEFAULT_EPHEMERAL_PORTS_PER_CONTAINER;
+
+// Default ephemeral port range reserved for the network isolator.
+extern const std::string DEFAULT_EPHEMERAL_PORTS;
+#endif
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/e878c74f/src/slave/containerizer/isolators/network/helper.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/network/helper.cpp b/src/slave/containerizer/isolators/network/helper.cpp
new file mode 100644
index 0000000..6cbcb33
--- /dev/null
+++ b/src/slave/containerizer/isolators/network/helper.cpp
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stout/none.hpp>
+#include <stout/subcommand.hpp>
+
+#include "slave/containerizer/isolators/network/port_mapping.hpp"
+
+using namespace mesos::internal::slave;
+
+
+int main(int argc, char** argv)
+{
+  return Subcommand::dispatch(
+      None(),
+      argc,
+      argv,
+      new PortMappingUpdate());
+}
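
The isolator is expected to invoke this helper as a subcommand; an invocation
would look roughly like the following, using the flags defined by
PortMappingUpdate::Flags in port_mapping.cpp below (the concrete values are
placeholders):

    mesos-network-helper update --eth0_name=eth0 --lo_name=lo --pid=12345 \
        --ports_to_add='{"range":[{"begin":31000,"end":31007}]}'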

http://git-wip-us.apache.org/repos/asf/mesos/blob/e878c74f/src/slave/containerizer/isolators/network/port_mapping.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/network/port_mapping.cpp b/src/slave/containerizer/isolators/network/port_mapping.cpp
new file mode 100644
index 0000000..6f8dd17
--- /dev/null
+++ b/src/slave/containerizer/isolators/network/port_mapping.cpp
@@ -0,0 +1,2162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <limits.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <iostream>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include <mesos/mesos.hpp>
+
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/subprocess.hpp>
+
+#include <stout/error.hpp>
+#include <stout/foreach.hpp>
+#include <stout/hashset.hpp>
+#include <stout/json.hpp>
+#include <stout/lambda.hpp>
+#include <stout/os.hpp>
+#include <stout/option.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/result.hpp>
+#include <stout/stringify.hpp>
+
+#include <stout/os/exists.hpp>
+#include <stout/os/setns.hpp>
+
+#include "linux/fs.hpp"
+
+#include "linux/routing/route.hpp"
+#include "linux/routing/utils.hpp"
+
+#include "linux/routing/filter/arp.hpp"
+#include "linux/routing/filter/icmp.hpp"
+#include "linux/routing/filter/ip.hpp"
+
+#include "linux/routing/link/link.hpp"
+
+#include "linux/routing/queueing/ingress.hpp"
+
+#include "mesos/resources.hpp"
+
+#include "slave/constants.hpp"
+
+#include "slave/containerizer/isolators/network/port_mapping.hpp"
+
+using namespace mesos::internal;
+
+using namespace process;
+
+using namespace routing;
+using namespace routing::filter;
+using namespace routing::queueing;
+
+using std::cerr;
+using std::dec;
+using std::endl;
+using std::hex;
+using std::list;
+using std::ostringstream;
+using std::set;
+using std::string;
+using std::vector;
+
+using filter::ip::PortRange;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// The root directory where we bind mount all the namespace handles.
+// We choose the directory '/var/run/netns' so that we can use
+// iproute2 suite (e.g., ip netns show/exec) to inspect or enter the
+// network namespace. This is very useful for debugging purposes.
+static const string BIND_MOUNT_ROOT = "/var/run/netns";
+
+
+// The minimum number of ephemeral ports a container should have.
+static const uint16_t MIN_EPHEMERAL_PORTS_SIZE =
+    DEFAULT_EPHEMERAL_PORTS_PER_CONTAINER;
+
+
+// The primary priority used by each type of filter.
+static const uint8_t ARP_FILTER_PRIORITY = 1;
+static const uint8_t ICMP_FILTER_PRIORITY = 2;
+static const uint8_t IP_FILTER_PRIORITY = 3;
+
+
+// The secondary priorities used by filters.
+static const uint8_t HIGH = 1;
+static const uint8_t NORMAL = 2;
+static const uint8_t LOW = 3;
+
+
+// The loopback IP reserved by IPv4 standard.
+// TODO(jieyu): Support IP filters for the entire subnet.
+static net::IP LOOPBACK_IP = net::IP::fromDotDecimal("127.0.0.1/8").get();
+
+
+// The well known ports. Used for sanity check.
+static const Interval<uint16_t> WELL_KNOWN_PORTS =
+  (Bound<uint16_t>::closed(0), Bound<uint16_t>::open(1024));
+
+
+/////////////////////////////////////////////////
+// Helper functions for the isolator.
+/////////////////////////////////////////////////
+
+static Future<Nothing> _nothing() { return Nothing(); }
+
+
+// Given an integer x, find the largest integer t such that t <= x and
+// t is a power of 2.
+static uint32_t roundDownToPowerOfTwo(uint32_t x)
+{
+  // Mutate x from 00001XXX to 00001111 (in binary).
+
+  // We know the MSB has to be a 1, so kill the LSB and make sure the
+  // first 2 most significant bits are 1s.
+  x = x | (x >> 1);
+
+  // Now that the 2 most significant bits are 1s, make sure the first
+  // 4 most significant bits are 1s, too.
+  x = x | (x >> 2);
+
+  // We keep going. Note that the 0s left to the MSB are never turned
+  // to 1s.
+  x = x | (x >> 4);
+  x = x | (x >> 8);
+
+  // Now we have covered all 32 bits.
+  x = x | (x >> 16);
+
+  // 00001111 - (00001111 >> 1) leaves only the most significant set bit.
+  return x - (x >> 1);
+}
+
+
+// Returns the name of the host end of the virtual ethernet pair for a
+// given container. The kernel restricts link name to 16 characters or
+// less, so we cannot put container ID into the device name. Instead,
+// we use the pid of the executor process forked by the slave to
+// uniquely name the device for each container. It's safe because we
+// cannot have two active containers having the same pid for the
+// executor process.
+static string veth(pid_t pid)
+{
+  return "veth" + stringify(pid);
+}
+
+
+// Extracts the pid from the given veth name.
+static Option<pid_t> getPid(string veth)
+{
+  if (strings::startsWith(veth, "veth")) {
+    Try<pid_t> pid = numify<pid_t>(veth.substr(strlen("veth")));
+    if (pid.isSome()) {
+      return pid.get();
+    }
+  }
+
+  return None();
+}
+
+
+// Converts from value ranges to interval set.
+static IntervalSet<uint16_t> getIntervalSet(const Value::Ranges& ranges)
+{
+  IntervalSet<uint16_t> set;
+
+  for (int i = 0; i < ranges.range_size(); i++) {
+    set += (Bound<uint16_t>::closed(ranges.range(i).begin()),
+            Bound<uint16_t>::closed(ranges.range(i).end()));
+  }
+
+  return set;
+}
+
+
+// Helper function to set up IP filters on the host side for a given
+// port range.
+static Try<Nothing> addHostIPFilters(
+    const PortRange& range,
+    const string& eth0,
+    const string& lo,
+    const string& veth,
+    const net::MAC& hostMAC,
+    const net::IP& hostIP)
+{
+  // Add an IP packet filter from host eth0 to veth of the container
+  // such that any incoming IP packet will be properly redirected to
+  // the corresponding container based on its destination port.
+  Try<bool> hostEth0ToVeth = filter::ip::create(
+      eth0,
+      ingress::HANDLE,
+      ip::Classifier(hostMAC, net::IP(hostIP.address()), None(), range),
+      Priority(IP_FILTER_PRIORITY, NORMAL),
+      action::Redirect(veth));
+
+  if (hostEth0ToVeth.isError()) {
+    return Error(
+        "Failed to create an IP packet filter from host " +
+        eth0 + " to " + veth + ": " + hostEth0ToVeth.error());
+  } else if (!hostEth0ToVeth.get()) {
+    return Error(
+        "The IP packet filter from host " + eth0 + " to " +
+        veth + " already exists");
+  }
+
+  // Add an IP packet filter from host lo to veth of the container
+  // such that any internally generated IP packet will be properly
+  // redirected to the corresponding container based on its
+  // destination port.
+  Try<bool> hostLoToVeth = filter::ip::create(
+      lo,
+      ingress::HANDLE,
+      ip::Classifier(None(), None(), None(), range),
+      Priority(IP_FILTER_PRIORITY, NORMAL),
+      action::Redirect(veth));
+
+  if (hostLoToVeth.isError()) {
+    return Error(
+        "Failed to create an IP packet filter from host " +
+        lo + " to " + veth + ": " + hostLoToVeth.error());
+  } else if (!hostLoToVeth.get()) {
+    return Error(
+        "The IP packet filter from host " + lo + " to " +
+        veth + " already exists");
+  }
+
+  // Add two IP packet filters (one for public IP and one for loopback
+  // IP) from veth of the container to host lo to properly redirect IP
+  // packets sent from one container to either the host or another
+  // container. Notice that here we also check the source port of a
+  // packet. If the source port is not within the port ranges
+  // allocated for the container, the packet will get dropped.
+  Try<bool> vethToHostLoPublic = filter::ip::create(
+      veth,
+      ingress::HANDLE,
+      ip::Classifier(None(), net::IP(hostIP.address()), range, None()),
+      Priority(IP_FILTER_PRIORITY, NORMAL),
+      action::Redirect(lo));
+
+  if (vethToHostLoPublic.isError()) {
+    return Error(
+        "Failed to create an IP packet filter (for public IP) from " +
+        veth + " to host " + lo + ": " + vethToHostLoPublic.error());
+  } else if (!vethToHostLoPublic.get()) {
+    return Error(
+        "The IP packet filter (for public IP) from " +
+        veth + " to host " + lo + " already exists");
+  }
+
+  Try<bool> vethToHostLoLoopback = filter::ip::create(
+      veth,
+      ingress::HANDLE,
+      ip::Classifier(None(), net::IP(LOOPBACK_IP.address()), range, None()),
+      Priority(IP_FILTER_PRIORITY, NORMAL),
+      action::Redirect(lo));
+
+  if (vethToHostLoLoopback.isError()) {
+    return Error(
+        "Failed to create an IP packet filter (for loopback IP) from " +
+        veth + " to host " + lo + ": " + vethToHostLoLoopback.error());
+  } else if (!vethToHostLoLoopback.get()) {
+    return Error(
+        "The IP packet filter (for loopback IP) from " +
+        veth + " to host " + lo + " already exists");
+  }
+
+  // Add an IP packet filter from veth of the container to host eth0
+  // to properly redirect IP packets sent from one container to
+  // external hosts. This filter has a lower priority compared to the
+  // 'vethToHostLo' filter because it does not check the destination
+  // IP. Notice that here we also check the source port of a packet.
+  // If the source port is not within the port ranges allocated for
+  // the container, the packet will get dropped.
+  Try<bool> vethToHostEth0 = filter::ip::create(
+      veth,
+      ingress::HANDLE,
+      ip::Classifier(None(), None(), range, None()),
+      Priority(IP_FILTER_PRIORITY, LOW),
+      action::Redirect(eth0));
+
+  if (vethToHostEth0.isError()) {
+    return Error(
+        "Failed to create an IP packet filter from " + veth +
+        " to host " + eth0 + ": " + vethToHostEth0.error());
+  } else if (!vethToHostEth0.get()) {
+    return Error(
+        "The IP packet filter from " + veth +
+        " to host " + eth0 + " already exists");
+  }
+
+  return Nothing();
+}
+
+
+// Helper function to remove IP filters from the host side for a given
+// port range.
+static Try<Nothing> removeHostIPFilters(
+    const PortRange& range,
+    const string& eth0,
+    const string& lo,
+    const string& veth,
+    const net::MAC& hostMAC,
+    const net::IP& hostIP)
+{
+  // Remove the IP packet filter from host eth0 to veth of the container
+  Try<bool> hostEth0ToVeth = filter::ip::remove(
+      eth0,
+      ingress::HANDLE,
+      ip::Classifier(hostMAC, net::IP(hostIP.address()), None(), range));
+
+  if (hostEth0ToVeth.isError()) {
+    return Error(
+        "Failed to remove the IP packet filter from host " +
+        eth0 + " to " + veth + ": " + hostEth0ToVeth.error());
+  } else if (!hostEth0ToVeth.get()) {
+    LOG(ERROR) << "The IP packet filter from host " << eth0
+               << " to " << veth << " does not exist";
+  }
+
+  // Remove the IP packet filter from host lo to veth of the container
+  Try<bool> hostLoToVeth = filter::ip::remove(
+      lo,
+      ingress::HANDLE,
+      ip::Classifier(None(), None(), None(), range));
+
+  if (hostLoToVeth.isError()) {
+    return Error(
+        "Failed to remove the IP packet filter from host " +
+        lo + " to " + veth + ": " + hostLoToVeth.error());
+  } else if (!hostLoToVeth.get()) {
+    LOG(ERROR) << "The IP packet filter from host " << lo
+               << " to " << veth << " does not exist";
+  }
+
+  // Remove the IP packet filter from veth of the container to
+  // host lo for the public IP.
+  Try<bool> vethToHostLoPublic = filter::ip::remove(
+      veth,
+      ingress::HANDLE,
+      ip::Classifier(None(), net::IP(hostIP.address()), range, None()));
+
+  if (vethToHostLoPublic.isError()) {
+    return Error(
+        "Failed to remove the IP packet filter (for public IP) from " +
+        veth + " to host " + lo + ": " + vethToHostLoPublic.error());
+  } else if (!vethToHostLoPublic.get()) {
+    LOG(ERROR) << "The IP packet filter (for public IP) from "
+               << veth << " to host " << lo << " does not exist";
+  }
+
+  // Remove the IP packet filter from veth of the container to
+  // host lo for the loopback IP.
+  Try<bool> vethToHostLoLoopback = filter::ip::remove(
+      veth,
+      ingress::HANDLE,
+      ip::Classifier(None(), net::IP(LOOPBACK_IP.address()), range, None()));
+
+  if (vethToHostLoLoopback.isError()) {
+    return Error(
+        "Failed to remove the IP packet filter (for loopback IP) from " +
+        veth + " to host " + lo + ": " + vethToHostLoLoopback.error());
+  } else if (!vethToHostLoLoopback.get()) {
+    LOG(ERROR) << "The IP packet filter (for loopback IP) from "
+               << veth << " to host " << lo << " does not exist";
+  }
+
+  // Remove the IP packet filter from veth of the container to
+  // host eth0.
+  Try<bool> vethToHostEth0 = filter::ip::remove(
+      veth,
+      ingress::HANDLE,
+      ip::Classifier(None(), None(), range, None()));
+
+  if (vethToHostEth0.isError()) {
+    return Error(
+        "Failed to remove the IP packet filter from " + veth +
+        " to host " + eth0 + ": " + vethToHostEth0.error());
+  } else if (!vethToHostEth0.get()) {
+    LOG(ERROR) << "The IP packet filter from " << veth
+               << " to host " << eth0 << " does not exist";
+  }
+
+  return Nothing();
+}
+
+
+// Helper function to set up IP filters inside the container for a
+// given port range.
+static Try<Nothing> addContainerIPFilters(
+    const PortRange& range,
+    const string& eth0,
+    const string& lo)
+{
+  // Add an IP packet filter on lo such that local traffic inside a
+  // container will not be redirected to eth0.
+  Try<bool> loTerminal = filter::ip::create(
+      lo,
+      ingress::HANDLE,
+      ip::Classifier(None(), None(), None(), range),
+      Priority(IP_FILTER_PRIORITY, HIGH),
+      action::Terminal());
+
+  if (loTerminal.isError()) {
+    return Error(
+        "Failed to create an IP packet filter on " + lo +
+        " which stops packets from being sent to " + eth0 +
+        ": " + loTerminal.error());
+  } else if (!loTerminal.get()) {
+    return Error(
+        "The IP packet filter on " + lo +
+        " which stops packets from being sent to " +
+        eth0 + " already exists");
+  }
+
+  // Add an IP packet filter (for loopback IP) from eth0 to lo to
+  // redirect all loopback IP traffic to lo.
+  Try<bool> eth0ToLoLoopback = filter::ip::create(
+      eth0,
+      ingress::HANDLE,
+      ip::Classifier(None(), net::IP(LOOPBACK_IP.address()), None(), range),
+      Priority(IP_FILTER_PRIORITY, NORMAL),
+      action::Redirect(lo));
+
+  if (eth0ToLoLoopback.isError()) {
+    return Error(
+        "Failed to create an IP packet filter (for loopback IP) from " +
+        eth0 + " to " + lo + ": " + eth0ToLoLoopback.error());
+  } else if (!eth0ToLoLoopback.get()) {
+    return Error(
+        "The IP packet filter (for loopback IP) from " +
+        eth0 + " to " + lo + " already exists");
+  }
+
+  return Nothing();
+}
+
+
+// Helper function to remove IP filters inside the container for a
+// given port range.
+static Try<Nothing> removeContainerIPFilters(
+    const PortRange& range,
+    const string& eth0,
+    const string& lo)
+{
+  // Remove the 'terminal' IP packet filter on lo.
+  Try<bool> loTerminal = filter::ip::remove(
+      lo,
+      ingress::HANDLE,
+      ip::Classifier(None(), None(), None(), range));
+
+  if (loTerminal.isError()) {
+    return Error(
+        "Failed to remove the IP packet filter on " + lo +
+        " which stops packets from being sent to " + eth0 +
+        ": " + loTerminal.error());
+  } else if (!loTerminal.get()) {
+    return Error(
+        "The IP packet filter on " + lo +
+        " which stops packets from being sent to " + eth0 +
+        " does not exist");
+  }
+
+  // Remove the IP packet filter (for loopback IP) from eth0 to lo.
+  Try<bool> eth0ToLoLoopback = filter::ip::remove(
+      eth0,
+      ingress::HANDLE,
+      ip::Classifier(None(), net::IP(LOOPBACK_IP.address()), None(), range));
+
+  if (eth0ToLoLoopback.isError()) {
+    return Error(
+        "Failed to remove the IP packet filter (for loopback IP) from " +
+        eth0 + " to " + lo + ": " + eth0ToLoLoopback.error());
+  } else if (!eth0ToLoLoopback.get()) {
+    return Error(
+        "The IP packet filter (for loopback IP) from " +
+        eth0 + " to " + lo + " does not exist");
+  }
+
+  return Nothing();
+}
+
+/////////////////////////////////////////////////
+// Implementation for PortMappingUpdate.
+/////////////////////////////////////////////////
+
+const std::string PortMappingUpdate::NAME = "update";
+
+
+PortMappingUpdate::Flags::Flags()
+{
+  add(&help,
+      "help",
+      "Prints this help message",
+      false);
+
+  add(&eth0_name,
+      "eth0_name",
+      "The name of the public network interface (e.g., eth0)");
+
+  add(&lo_name,
+      "lo_name",
+      "The name of the loopback network interface (e.g., lo)");
+
+  add(&pid,
+      "pid",
+      "The pid of the process whose namespaces we will enter");
+
+  add(&ports_to_add,
+      "ports_to_add",
+      "A collection of port ranges (formatted as a JSON object)\n"
+      "for which to add IP filters. E.g.,\n"
+      "--ports_to_add={\"range\":[{\"begin\":4,\"end\":8}]}");
+
+  add(&ports_to_remove,
+      "ports_to_remove",
+      "A collection of port ranges (formatted as a JSON object)\n"
+      "for which to remove IP filters. E.g.,\n"
+      "--ports_to_remove={\"range\":[{\"begin\":4,\"end\":8}]}");
+}
+
+
+// The following two helper functions allow us to convert from a
+// collection of port ranges to a JSON object and vice versa. They
+// will be used for the port mapping update operation.
+template <typename Iterable>
+JSON::Object json(const Iterable& ranges)
+{
+  Value::Ranges values;
+  foreach (const PortRange& range, ranges) {
+    Value::Range value;
+    value.set_begin(range.begin());
+    value.set_end(range.end());
+
+    values.add_range()->CopyFrom(value);
+  }
+  return JSON::Protobuf(values);
+}
+
+
+static Try<vector<PortRange> > parse(const JSON::Object& object)
+{
+  Try<Value::Ranges> parsing = protobuf::parse<Value::Ranges>(object);
+  if (parsing.isError()) {
+    return Error("Failed to parse JSON: " + parsing.error());
+  }
+
+  vector<PortRange> ranges;
+  Value::Ranges values = parsing.get();
+  for (int i = 0; i < values.range_size(); i++) {
+    const Value::Range& value = values.range(i);
+    Try<PortRange> range = PortRange::fromBeginEnd(value.begin(), value.end());
+    if (range.isError()) {
+      return Error("Invalid port range: " + range.error());
+    }
+
+    ranges.push_back(range.get());
+  }
+  return ranges;
+}
+
+
+int PortMappingUpdate::execute()
+{
+  if (flags.help) {
+    cerr << "Usage: " << name() << " [OPTIONS]" << endl << endl
+         << "Supported options:" << endl
+         << flags.usage();
+    return 0;
+  }
+
+  if (flags.eth0_name.isNone()) {
+    cerr << "The public interface name (e.g., eth0) is not specified" << endl;
+    return 1;
+  }
+
+  if (flags.lo_name.isNone()) {
+    cerr << "The loopback interface name (e.g., lo) is not specified" << endl;
+    return 1;
+  }
+
+  if (flags.pid.isNone()) {
+    cerr << "The pid is not specified" << endl;
+    return 1;
+  }
+
+  if (flags.ports_to_add.isNone() && flags.ports_to_remove.isNone()) {
+    cerr << "Nothing to update" << endl;
+    return 1;
+  }
+
+  Option<vector<PortRange> > portsToAdd;
+  Option<vector<PortRange> > portsToRemove;
+
+  if (flags.ports_to_add.isSome()) {
+    Try<vector<PortRange> > parsing = parse(flags.ports_to_add.get());
+    if (parsing.isError()) {
+      cerr << "Parsing 'ports_to_add' failed: " << parsing.error() << endl;
+      return 1;
+    }
+    portsToAdd = parsing.get();
+  }
+
+  if (flags.ports_to_remove.isSome()) {
+    Try<vector<PortRange> > parsing = parse(flags.ports_to_remove.get());
+    if (parsing.isError()) {
+      cerr << "Parsing 'ports_to_remove' failed: " << parsing.error() << endl;
+      return 1;
+    }
+    portsToRemove = parsing.get();
+  }
+
+  // Enter the network namespace.
+  Try<Nothing> setns = os::setns(flags.pid.get(), "net");
+  if (setns.isError()) {
+    cerr << "Failed to enter the network namespace of pid " << flags.pid.get()
+         << ": " << setns.error() << endl;
+    return 1;
+  }
+
+  // Update IP packet filters.
+  const string eth0 = flags.eth0_name.get();
+  const string lo = flags.lo_name.get();
+
+  if (portsToAdd.isSome()) {
+    foreach (const PortRange& range, portsToAdd.get()) {
+      Try<Nothing> add = addContainerIPFilters(range, eth0, lo);
+      if (add.isError()) {
+        cerr << "Failed to add IP filters: " << add.error() << endl;
+        return 1;
+      }
+    }
+  }
+
+  if (portsToRemove.isSome()) {
+    foreach (const PortRange& range, portsToRemove.get()) {
+      Try<Nothing> remove = removeContainerIPFilters(range, eth0, lo);
+      if (remove.isError()) {
+        cerr << "Failed to remove IP filters: " << remove.error() << endl;
+        return 1;
+      }
+    }
+  }
+
+  return 0;
+}
+
+/////////////////////////////////////////////////
+// Implementation for the isolator.
+/////////////////////////////////////////////////
+
+Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& flags)
+{
+  // Check for root permission.
+  if (geteuid() != 0) {
+    return Error("Using network isolator requires root permissions");
+  }
+
+  // Verify that the network namespace is available by checking the
+  // existence of the network namespace handle of the current process.
+  if (os::namespaces().count("net") == 0) {
+    return Error(
+        "Using network isolator requires network namespace. "
+        "Make sure your kernel is newer than 3.4");
+  }
+
+  // Check the routing library.
+  Try<Nothing> check = routing::check();
+  if (check.isError()) {
+    return Error(
+        "Routing library check failed: " +
+        check.error());
+  }
+
+  // Check the availability of a few Linux commands that we will use.
+  // We use the blocking os::shell here because 'create' will only be
+  // invoked during initialization.
+  Try<int> checkCommandTc = os::shell(NULL, "tc filter show");
+  if (checkCommandTc.isError()) {
+    return Error("Check command 'tc' failed: " + checkCommandTc.error());
+  } else if (checkCommandTc.get() != 0) {
+    return Error(
+        "Check command 'tc' failed: non-zero exit code: " +
+        stringify(checkCommandTc.get()));
+  }
+
+  Try<int> checkCommandIp = os::shell(NULL, "ip link show");
+  if (checkCommandIp.isError()) {
+    return Error("Check command 'ip' failed: " + checkCommandIp.error());
+  } else if (checkCommandIp.get() != 0) {
+    return Error(
+        "Check command 'ip' failed: non-zero exit code: " +
+        stringify(checkCommandIp.get()));
+  }
+
+  // Get 'ports' resource from 'resources' flag. These ports will be
+  // treated as non-ephemeral ports.
+  IntervalSet<uint16_t> nonEphemeralPorts;
+
+  Try<Resources> resources = Resources::parse(
+      flags.resources.get(""),
+      flags.default_role);
+
+  if (resources.isError()) {
+    return Error("Failed to parse --resources: " + resources.error());
+  }
+
+  if (resources.get().ports().isSome()) {
+    nonEphemeralPorts = getIntervalSet(resources.get().ports().get());
+  }
+
+  // Get 'ports' resource from 'private_resources' flag. These ports
+  // will be allocated to each container as ephemeral ports.
+  IntervalSet<uint16_t> ephemeralPorts;
+
+  resources = Resources::parse(
+      flags.private_resources.get(""),
+      flags.default_role);
+
+  if (resources.isError()) {
+    return Error("Failed to parse --private_resources: " + resources.error());
+  }
+
+  if (resources.get().ports().isSome()) {
+    ephemeralPorts = getIntervalSet(resources.get().ports().get());
+  }
+
+  // Each container requires at least one ephemeral port for
+  // slave-executor communication. If no 'ports' resource is found,
+  // we return an error.
+  if (ephemeralPorts.empty()) {
+    return Error("Local resources do not contain ports");
+  }
+
+  // Sanity check to make sure that the ephemeral ports specified do
+  // not intersect with the specified non-ephemeral ports.
+  if (ephemeralPorts.intersects(nonEphemeralPorts)) {
+    return Error(
+        "The specified ephemeral ports " + stringify(ephemeralPorts) +
+        " intersect with the specified non-ephemeral ports " +
+        stringify(nonEphemeralPorts));
+  }
+
+  // This is a sanity check to make sure that the ephemeral ports
+  // specified do not intersect with the well known ports.
+  if (ephemeralPorts.intersects(WELL_KNOWN_PORTS)) {
+    return Error(
+        "The specified ephemeral ports " + stringify(ephemeralPorts) +
+        " intersect with well known ports " + stringify(WELL_KNOWN_PORTS));
+  }
+
+  // Obtain the host ephemeral port range by reading the proc file
+  // system ('ip_local_port_range').
+  Try<string> value = os::read("/proc/sys/net/ipv4/ip_local_port_range");
+  if (value.isError()) {
+    return Error("Failed to read host ip_local_port_range: " + value.error());
+  }
+
+  vector<string> split = strings::split(strings::trim(value.get()), "\t");
+  if (split.size() != 2) {
+    return Error(
+        "Unexpected format from host ip_local_port_range: " + value.get());
+  }
+
+  Try<uint16_t> begin = numify<uint16_t>(split[0]);
+  if (begin.isError()) {
+    return Error(
+        "Failed to parse the begin of host ip_local_port_range: " + split[0]);
+  }
+
+  Try<uint16_t> end = numify<uint16_t>(split[1]);
+  if (end.isError()) {
+    return Error(
+        "Failed to parse the end of host ip_local_port_range: " + split[1]);
+  }
+
+  Interval<uint16_t> hostEphemeralPorts =
+    (Bound<uint16_t>::closed(begin.get()),
+     Bound<uint16_t>::closed(end.get()));
+
+  // Sanity check to make sure the specified ephemeral ports do not
+  // intersect with the ephemeral ports used by the host.
+  if (ephemeralPorts.intersects(hostEphemeralPorts)) {
+    return Error(
+        "The specified ephemeral ports " + stringify(ephemeralPorts) +
+        " intersect with the ephemeral ports used by the host " +
+        stringify(hostEphemeralPorts));
+  }
+
+  // TODO(chzhcn): Cross check ephemeral ports with used ports on the
+  // host (e.g., using port scan).
+
+  // Initialize the ephemeral ports allocator.
+
+  // In theory, any positive integer can be broken up into a sum of
+  // powers of 2. We choose not to allow this for now so that each
+  // container has exactly one filter for its ephemeral ports, which
+  // makes it easy to debug and to reason about performance.
+  if (roundDownToPowerOfTwo(flags.ephemeral_ports_per_container) !=
+      flags.ephemeral_ports_per_container) {
+    return Error(
+        "The number of ephemeral ports for each container (" +
+        stringify(flags.ephemeral_ports_per_container) +
+        ") is not a power of 2");
+  }
+
+  if (ephemeralPorts.size() < flags.ephemeral_ports_per_container) {
+    return Error(
+        "Network Isolator is given ephemeral ports of size: " +
+        stringify(ephemeralPorts.size()) + ", but asked to allocate " +
+        stringify(flags.ephemeral_ports_per_container) +
+        " ephemeral ports for a container");
+  }
+
+  if (flags.ephemeral_ports_per_container < MIN_EPHEMERAL_PORTS_SIZE) {
+    return Error(
+        "Each container has only " +
+        stringify(flags.ephemeral_ports_per_container) +
+        " ephemeral ports. The minimum required is: " +
+        stringify(MIN_EPHEMERAL_PORTS_SIZE));
+  }
+
+  Owned<EphemeralPortsAllocator> ephemeralPortsAllocator(
+      new EphemeralPortsAllocator(
+        ephemeralPorts,
+        flags.ephemeral_ports_per_container));
+
+  // Get the name of the public interface (e.g., eth0). If it is not
+  // specified, try to derive its name from the routing library.
+  Result<string> eth0 = link::eth0();
+  if (flags.eth0_name.isSome()) {
+    eth0 = flags.eth0_name.get();
+
+    // Check if the given public interface exists.
+    Try<bool> hostEth0Exists = link::exists(eth0.get());
+    if (hostEth0Exists.isError()) {
+      return Error(
+          "Failed to check if " + eth0.get() + " exists: " +
+          hostEth0Exists.error());
+    } else if (!hostEth0Exists.get()) {
+      return Error("The public interface " + eth0.get() + " does not exist");
+    }
+  } else if (!eth0.isSome()) {
+    // eth0 is not specified in the flag and we did not get a valid
+    // eth0 from the library.
+    return Error(
+        "Network Isolator failed to find a public interface: " + eth0.error());
+  }
+
+  LOG(INFO) << "Using " << eth0.get() << " as the public interface";
+
+  // Get the name of the loopback interface. If it is not specified,
+  // try to derive its name based on the loopback IP address.
+  Result<string> lo = link::lo();
+  if (flags.lo_name.isSome()) {
+    lo = flags.lo_name;
+
+    // Check if the given loopback interface exists.
+    Try<bool> hostLoExists = link::exists(lo.get());
+    if (hostLoExists.isError()) {
+      return Error(
+          "Failed to check if " + lo.get() + " exists: " +
+          hostLoExists.error());
+    } else if (!hostLoExists.get()) {
+      return Error("The loopback interface " + lo.get() + " does not exist");
+    }
+  } else if (!lo.isSome()) {
+    // lo is not specified in the flag and we did not get a valid
+    // lo from the library.
+    return Error(
+        "Network Isolator failed to find a loopback interface: " + lo.error());
+  }
+
+  LOG(INFO) << "Using " << lo.get() << " as the loopback interface";
+
+  // Get the host IP, MAC and default gateway.
+  Result<net::IP> hostIP = net::ip(eth0.get());
+  if (!hostIP.isSome()) {
+    return Error(
+        "Failed to get the public IP of " + eth0.get() + ": " +
+        (hostIP.isError() ? hostIP.error() : "does not have an IPv4 address"));
+  }
+
+  Result<net::MAC> hostMAC = net::mac(eth0.get());
+  if (!hostMAC.isSome()) {
+    return Error(
+        "Failed to get the MAC address of " + eth0.get() + ": " +
+        (hostMAC.isError() ? hostMAC.error() : "does not have a MAC address"));
+  }
+
+  Result<net::IP> hostDefaultGateway = route::defaultGateway();
+  if (!hostDefaultGateway.isSome()) {
+    return Error(
+        "Failed to get the default gateway of the host: " +
+        (hostDefaultGateway.isError() ? hostDefaultGateway.error()
+        : "The default gateway of the host does not exist"));
+  }
+
+  // Set the MAC address of the host loopback interface (lo) so that
+  // it matches that of the host public interface (eth0).  A fairly
+  // recent kernel patch is needed for this operation to succeed:
+  // https://git.kernel.org/cgit/linux/kernel/git/davem/net.git/:
+  // 25f929fbff0d1bcebf2e92656d33025cd330cbf8
+  Try<bool> setHostLoMAC = link::setMAC(lo.get(), hostMAC.get());
+  if (setHostLoMAC.isError()) {
+    return Error(
+        "Failed to set the MAC address of " + lo.get() +
+        ": " + setHostLoMAC.error());
+  }
+
+  // Set the MTU of the host loopback interface (lo) so that it
+  // matches that of the host public interface (eth0).
+  Result<unsigned int> hostEth0MTU = link::mtu(eth0.get());
+  if (hostEth0MTU.isError()) {
+    return Error(
+        "Failed to get the MTU of " + eth0.get() +
+        ": " + hostEth0MTU.error());
+  }
+
+  // The host public interface should exist since we just checked it.
+  CHECK_SOME(hostEth0MTU);
+
+  Try<bool> setHostLoMTU = link::setMTU(lo.get(), hostEth0MTU.get());
+  if (setHostLoMTU.isError()) {
+    return Error(
+        "Failed to set the MTU of " + lo.get() +
+        ": " + setHostLoMTU.error());
+  }
+
+  // Prepare the ingress queueing disciplines on host public interface
+  // (eth0) and host loopback interface (lo).
+  Try<bool> createHostEth0Qdisc = ingress::create(eth0.get());
+  if (createHostEth0Qdisc.isError()) {
+    return Error(
+        "Failed to create the ingress qdisc on " + eth0.get() +
+        ": " + createHostEth0Qdisc.error());
+  }
+
+  Try<bool> createHostLoQdisc = ingress::create(lo.get());
+  if (createHostLoQdisc.isError()) {
+    return Error(
+        "Failed to create the ingress qdisc on " + lo.get() +
+        ": " + createHostLoQdisc.error());
+  }
+
+  // Enable 'route_localnet' on host loopback interface (lo). This
+  // enables the use of 127.0.0.1/8 for local routing purposes. This
+  // feature only exists on kernel 3.6 or newer.
+  const string loRouteLocalnet =
+    path::join("/proc/sys/net/ipv4/conf", lo.get(), "route_localnet");
+  if (!os::exists(loRouteLocalnet)) {
+    // TODO(jieyu): Consider supporting running the isolator if this
+    // feature is not available. We need to conditionally disable
+    // routing for 127.0.0.1/8, and ask the tasks to use the public IP
+    // for container to container and container to host communication.
+    return Error("The kernel does not support 'route_localnet'");
+  }
+
+  Try<Nothing> write = os::write(loRouteLocalnet, "1");
+  if (write.isError()) {
+    return Error(
+        "Failed to enable route_localnet for " + lo.get() +
+        ": " + write.error());
+  }
+
+  // We disable 'rp_filter' and 'send_redirects' for host loopback
+  // interface (lo) to work around a kernel bug, which was only
+  // recently addressed in upstream in the following 3 commits.
+  // https://git.kernel.org/cgit/linux/kernel/git/davem/net.git/:
+  //   6a662719c9868b3d6c7d26b3a085f0cd3cc15e64
+  //   0d5edc68739f1c1e0519acbea1d3f0c1882a15d7
+  //   e374c618b1465f0292047a9f4c244bd71ab5f1f0
+  // The workaround ensures packets don't get dropped at lo.
+  write = os::write("/proc/sys/net/ipv4/conf/all/rp_filter", "0");
+  if (write.isError()) {
+    return Error(
+        "Failed to disable rp_filter for all: " + write.error());
+  }
+
+  write = os::write(path::join(
+      "/proc/sys/net/ipv4/conf", lo.get(), "rp_filter"), "0");
+  if (write.isError()) {
+    return Error(
+        "Failed to disable rp_filter for " + lo.get() +
+        ": " + write.error());
+  }
+
+  write = os::write("/proc/sys/net/ipv4/conf/all/send_redirects", "0");
+  if (write.isError()) {
+    return Error(
+        "Failed to disable send_redirects for all: " + write.error());
+  }
+
+  write = os::write(path::join(
+      "/proc/sys/net/ipv4/conf", lo.get(), "send_redirects"), "0");
+  if (write.isError()) {
+    return Error(
+        "Failed to disable send_redirects for " + lo.get() +
+        ": " + write.error());
+  }
+
+  // We need to enable accept_local on host loopback interface (lo)
+  // for kernels older than 3.6. Refer to the following:
+  // https://git.kernel.org/cgit/linux/kernel/git/davem/net.git/:
+  //   7a9bc9b81a5bc6e44ebc80ef781332e4385083f2
+  // https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt
+  write = os::write(path::join(
+      "/proc/sys/net/ipv4/conf", lo.get(), "accept_local"), "1");
+  if (write.isError()) {
+    return Error(
+        "Failed to enable accept_local for " + lo.get() +
+        ": " + write.error());
+  }
+
+  // Create the bind mount directory.
+  Try<Nothing> mkdir = os::mkdir(BIND_MOUNT_ROOT);
+  if (mkdir.isError()) {
+    return Error(
+        "Failed to create the bind mount root directory at " +
+        BIND_MOUNT_ROOT + ": " + mkdir.error());
+  }
+
+  return new Isolator(Owned<IsolatorProcess>(
+      new PortMappingIsolatorProcess(
+          flags,
+          eth0.get(),
+          lo.get(),
+          hostMAC.get(),
+          hostIP.get(),
+          hostEth0MTU.get(),
+          hostDefaultGateway.get(),
+          nonEphemeralPorts,
+          ephemeralPortsAllocator)));
+}
+
+
+Future<Nothing> PortMappingIsolatorProcess::recover(
+    const list<state::RunState>& states)
+{
+  // Extract pids from virtual device names.
+  Try<set<string> > links = net::links();
+  if (links.isError()) {
+    return Failure("Failed to get all the links: " + links.error());
+  }
+
+  hashset<pid_t> pids;
+  foreach (const string& name, links.get()) {
+    Option<pid_t> pid = getPid(name);
+    // Not all links follow the 'veth{pid}' naming convention (e.g.,
+    // eth0), so we simply continue in that case.
+    if (pid.isNone()) {
+      continue;
+    } else if (pids.contains(pid.get())) {
+      return Failure("Two virtual devices have the same name '" + name + "'");
+    }
+
+    pids.insert(pid.get());
+  }
+
+  foreach (const state::RunState& state, states) {
+    if (!state.id.isSome()) {
+      foreachvalue (Info* info, infos) {
+        delete info;
+      }
+      infos.clear();
+
+      return Failure("ContainerID and pid are required to recover");
+    }
+
+    // The containerizer is not supposed to ask the isolator to
+    // recover a run that has no forked pid.
+    CHECK_SOME(state.forkedPid);
+
+    const ContainerID& containerId = state.id.get();
+    pid_t pid = state.forkedPid.get();
+
+    VLOG(1) << "Recovering network isolator for container "
+            << containerId << " with pid " << pid;
+
+    Result<Info*> recover = _recover(pid);
+    if (recover.isError()) {
+      foreachvalue (Info* info, infos) {
+        delete info;
+      }
+      infos.clear();
+
+      return Failure(
+          "Failed to recover container " + stringify(containerId) +
+          " with pid " + stringify(pid) + ": " + recover.error());
+    } else if (recover.isNone()) {
+      LOG(WARNING) << "Cannot recover container " << containerId
+                   << " with pid " << pid
+                   << ". It may have already been destroyed";
+
+      // This may occur if the executor has exited and the isolator
+      // has destroyed the container but the slave dies before
+      // noticing this.
+      continue;
+    }
+
+    infos[containerId] = recover.get();
+
+    // Remove the successfully recovered pid.
+    pids.erase(pid);
+  }
+
+  // If there are orphaned containers left, clean them up.
+  foreach (pid_t pid, pids) {
+    Result<Info*> recover = _recover(pid);
+    if (recover.isError()) {
+      foreachvalue (Info* info, infos) {
+        delete info;
+      }
+      infos.clear();
+
+      return Failure(
+          "Failed to recover orphaned container with pid " +
+          stringify(pid) + ": " + recover.error());
+    } else if (recover.isNone()) {
+      // If the control reaches here, a serious problem has occurred
+      // because our link (veth) has been unexpectedly deleted.
+      LOG(FATAL) << "The veth for orphaned container with pid "
+                 << pid << " has been unexpectedly deleted";
+    }
+
+    // The recovery should fail if we cannot cleanup an orphan.
+    Try<Nothing> cleanup = _cleanup(recover.get());
+    if (cleanup.isError()) {
+      foreachvalue (Info* info, infos) {
+        delete info;
+      }
+      infos.clear();
+
+      return Failure(
+          "Failed to cleanup orphaned container with pid " +
+          stringify(pid) + ": " + cleanup.error());
+    }
+  }
+
+  LOG(INFO) << "Network isolator recovery complete";
+
+  return Nothing();
+}
+
+
+Result<PortMappingIsolatorProcess::Info*>
+PortMappingIsolatorProcess::_recover(pid_t pid)
+{
+  // Get all the IP filters on veth.
+  Result<vector<ip::Classifier> > classifiers =
+    ip::classifiers(veth(pid), ingress::HANDLE);
+
+  if (classifiers.isError()) {
+    return Error(
+        "Failed to get all the IP filters on " + veth(pid) +
+        ": " + classifiers.error());
+  } else if (classifiers.isNone()) {
+    // Since we bind mount the network namespace handle (which causes
+    // an extra reference), the veth should be present even if the
+    // executor has exited. However, we may encounter a case where the
+    // veth is removed (because the corresponding container is
+    // destroyed), but the slave restarts before it is able to write
+    // the sentinel file. In that case, when the slave restarts, it
+    // will try to recover a container that has already been
+    // destroyed. To distinguish this case, we return None here.
+    return None();
+  }
+
+  IntervalSet<uint16_t> nonEphemeralPorts;
+  IntervalSet<uint16_t> ephemeralPorts;
+
+  foreach (const ip::Classifier& classifier, classifiers.get()) {
+    Option<PortRange> sourcePorts = classifier.sourcePorts();
+    Option<PortRange> destinationPorts = classifier.destinationPorts();
+
+    // All the IP filters on veth used by us only have source ports.
+    if (sourcePorts.isNone() || destinationPorts.isSome()) {
+      return Error("Unexpected IP filter detected on " + veth(pid));
+    }
+
+    Interval<uint16_t> ports =
+      (Bound<uint16_t>::closed(sourcePorts.get().begin()),
+       Bound<uint16_t>::closed(sourcePorts.get().end()));
+
+    if (managedNonEphemeralPorts.contains(ports)) {
+      nonEphemeralPorts += ports;
+    } else if (ephemeralPortsAllocator->isManaged(ports)) {
+      // We may see duplicates here because we create two IP filters
+      // with the same ephemeral port range (one for eth0 and one for
+      // lo), but we should never see two intersecting port ranges.
+      if (!ephemeralPorts.contains(ports) && ephemeralPorts.intersects(ports)) {
+        return Error("Unexpected intersected ephemeral port ranges");
+      }
+
+      ephemeralPorts += ports;
+    } else {
+      return Error("Unexpected IP filter detected on " + veth(pid));
+    }
+  }
+
+  if (ephemeralPorts.empty()) {
+    return Error("No ephemeral ports found");
+  }
+
+  if (ephemeralPorts.intervalCount() != 1) {
+    return Error("Each container should have only one ephemeral port range");
+  }
+
+  // Tell the allocator that this ephemeral port range is used.
+  ephemeralPortsAllocator->allocate(*ephemeralPorts.begin());
+
+  Info* info = new Info(nonEphemeralPorts, *ephemeralPorts.begin(), pid);
+  CHECK_NOTNULL(info);
+
+  VLOG(1) << "Recovered network isolator for container with pid " << pid
+          << " non-ephemeral port ranges " << nonEphemeralPorts
+          << " and ephemeral port range " << *ephemeralPorts.begin();
+
+  return info;
+}
+
+
+Future<Option<CommandInfo> > PortMappingIsolatorProcess::prepare(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo)
+{
+  if (infos.contains(containerId)) {
+    return Failure("Container has already been prepared");
+  }
+
+  Resources resources(executorInfo.resources());
+
+  IntervalSet<uint16_t> nonEphemeralPorts;
+
+  if (resources.ports().isSome()) {
+    nonEphemeralPorts = getIntervalSet(resources.ports().get());
+
+    // Sanity check to make sure that the assigned non-ephemeral ports
+    // for the container are part of the non-ephemeral ports specified
+    // by the slave.
+    if (!managedNonEphemeralPorts.contains(nonEphemeralPorts)) {
+        return Failure(
+            "Some non-ephemeral ports specified in " +
+            stringify(nonEphemeralPorts) +
+            " are not managed by the slave");
+    }
+  }
+
+  // Determine the ephemeral ports used by this container.
+  Try<Interval<uint16_t> > ephemeralPorts = ephemeralPortsAllocator->allocate();
+  if (ephemeralPorts.isError()) {
+    return Failure(
+        "Failed to allocate ephemeral ports: " + ephemeralPorts.error());
+  }
+
+  infos[containerId] =
+    CHECK_NOTNULL(new Info(nonEphemeralPorts, ephemeralPorts.get()));
+
+  LOG(INFO) << "Allocated non-ephemeral ports " << nonEphemeralPorts
+            << " and ephemeral ports " << ephemeralPorts.get()
+            << " for container " << containerId << " of executor "
+            << executorInfo.executor_id();
+
+  CommandInfo command;
+  command.set_value(scripts(infos[containerId]));
+
+  return command;
+}
+
+
+Future<Nothing> PortMappingIsolatorProcess::isolate(
+    const ContainerID& containerId,
+    pid_t pid)
+{
+  if (!infos.contains(containerId)) {
+    return Failure("Unknown container");
+  }
+
+  Info* info = CHECK_NOTNULL(infos[containerId]);
+
+  if (info->pid.isSome()) {
+    return Failure("The container has already been isolated");
+  }
+
+  info->pid = pid;
+
+  // Bind mount the network namespace handle of the process 'pid' to a
+  // directory to hold an extra reference to the network namespace
+  // which will be released in 'cleanup'. By holding the extra
+  // reference, the network namespace will not be destroyed even if
+  // the process 'pid' is gone, which allows us to explicitly control
+  // the network namespace life cycle.
+  const string source = path::join("/proc", stringify(pid), "ns", "net");
+  const string target = path::join(BIND_MOUNT_ROOT, stringify(pid));
+
+  Try<Nothing> touch = os::touch(target);
+  if (touch.isError()) {
+    return Failure("Failed to create the bind mount point: " + touch.error());
+  }
+
+  Try<Nothing> mount = fs::mount(source, target, "none", MS_BIND, NULL);
+  if (mount.isError()) {
+    return Failure(
+        "Failed to mount the network namespace handle from " +
+        source + " to " + target + ": " + mount.error());
+  }
+
+  LOG(INFO) << "Bind mounted " << source << " to " << target
+            << " for container " << containerId;
+
+  // Create a virtual ethernet pair for this container.
+  Try<bool> createVethPair = link::create(veth(pid), eth0, pid);
+  if (createVethPair.isError()) {
+    return Failure(
+        "Failed to create virtual ethernet pair: " +
+        createVethPair.error());
+  }
+
+  // Sets the MAC address of veth to match the MAC address of the host
+  // public interface (eth0).
+  Try<bool> setVethMAC = link::setMAC(veth(pid), hostMAC);
+  if (setVethMAC.isError()) {
+    return Failure(
+        "Failed to set the MAC address of " + veth(pid) +
+        ": " + setVethMAC.error());
+  }
+
+  // Prepare the ingress queueing disciplines on veth.
+  Try<bool> createQdisc = ingress::create(veth(pid));
+  if (createQdisc.isError()) {
+    return Failure(
+        "Failed to create the ingress qdisc on " + veth(pid) +
+        ": " + createQdisc.error());
+  }
+
+  // Veth device should exist since we just created it.
+  CHECK(createQdisc.get());
+
+  // For each port range, add a set of IP packet filters to properly
+  // redirect IP traffic to/from containers.
+  foreach (const PortRange& range,
+           getPortRanges(info->nonEphemeralPorts + info->ephemeralPorts)) {
+    LOG(INFO) << "Adding IP packet filters with ports " << range
+              << " for container " << containerId;
+
+    Try<Nothing> add = addHostIPFilters(
+        range,
+        eth0,
+        lo,
+        veth(pid),
+        hostMAC,
+        hostIP);
+
+    if (add.isError()) {
+      return Failure(
+          "Failed to add IP packet filter with ports " +
+          stringify(range) + " for container with pid " +
+          stringify(pid) + ": " + add.error());
+    }
+  }
+
+  // Relay ICMP packets from veth of the container to host eth0.
+  Try<bool> icmpVethToEth0 = filter::icmp::create(
+      veth(pid),
+      ingress::HANDLE,
+      icmp::Classifier(None()),
+      Priority(ICMP_FILTER_PRIORITY, NORMAL),
+      action::Redirect(eth0));
+
+  if (icmpVethToEth0.isError()) {
+    return Failure(
+        "Failed to create an ICMP packet filter from " + veth(pid) +
+        " to host " + eth0 + ": " + icmpVethToEth0.error());
+  } else if (!icmpVethToEth0.get()) {
+    return Failure(
+        "The ICMP packet filter from " + veth(pid) +
+        " to host " + eth0 + " already exists");
+  }
+
+  // Relay ARP packets from veth of the container to host eth0.
+  Try<bool> arpVethToEth0 = filter::arp::create(
+      veth(pid),
+      ingress::HANDLE,
+      Priority(ARP_FILTER_PRIORITY, NORMAL),
+      action::Redirect(eth0));
+
+  if (arpVethToEth0.isError()) {
+    return Failure(
+        "Failed to create an ARP packet filter from " + veth(pid) +
+        " to host " + eth0 + ": " + arpVethToEth0.error());
+  } else if (!arpVethToEth0.get()) {
+    return Failure(
+        "The ARP packet filter from " + veth(pid) +
+        " to host " + eth0 + " already exists");
+  }
+
+  // Mirror ICMP and ARP packets from host eth0 to veths of all the
+  // containers.
+  set<string> targets;
+  foreachvalue (Info* info, infos) {
+    if (info->pid.isSome()) {
+      targets.insert(veth(info->pid.get()));
+    }
+  }
+
+  if (targets.size() == 1) {
+    // Create a new ICMP filter on host eth0.
+    Try<bool> icmpEth0ToVeth = filter::icmp::create(
+        eth0,
+        ingress::HANDLE,
+        icmp::Classifier(net::IP(hostIP.address())),
+        Priority(ICMP_FILTER_PRIORITY, NORMAL),
+        action::Mirror(targets));
+
+    if (icmpEth0ToVeth.isError()) {
+      return Failure(
+          "Failed to create an ICMP packet filter from host " + eth0 +
+          " to " + veth(pid) + ": " + icmpEth0ToVeth.error());
+    } else if (!icmpEth0ToVeth.get()) {
+      return Failure(
+          "The ICMP packet filter on host " + eth0 + " already exists");
+    }
+
+    // Create a new ARP filter on host eth0.
+    Try<bool> arpEth0ToVeth = filter::arp::create(
+        eth0,
+        ingress::HANDLE,
+        Priority(ARP_FILTER_PRIORITY, NORMAL),
+        action::Mirror(targets));
+
+    if (arpEth0ToVeth.isError()) {
+      return Failure(
+          "Failed to create an ARP packet filter from host " + eth0 +
+          " to " + veth(pid) + ": " + arpEth0ToVeth.error());
+    } else if (!arpEth0ToVeth.get()) {
+      return Failure(
+          "The ARP packet filter on host " + eth0 + " already exists");
+    }
+  } else {
+    // Update the ICMP filter on host eth0.
+    Try<bool> icmpEth0ToVeth = filter::icmp::update(
+        eth0,
+        ingress::HANDLE,
+        icmp::Classifier(net::IP(hostIP.address())),
+        action::Mirror(targets));
+
+    if (icmpEth0ToVeth.isError()) {
+      return Failure(
+          "Failed to append a ICMP mirror action from host " +
+           eth0 + " to " + veth(pid) + ": " + icmpEth0ToVeth.error());
+    } else if (!icmpEth0ToVeth.get()) {
+      return Failure(
+          "The ICMP packet filter on host " + eth0 + " already exists");
+    }
+
+    // Update the ARP filter on host eth0.
+    Try<bool> arpEth0ToVeth = filter::arp::update(
+        eth0,
+        ingress::HANDLE,
+        action::Mirror(targets));
+
+    if (arpEth0ToVeth.isError()) {
+      return Failure(
+          "Failed to append an ARP mirror action from host " +
+           eth0 + " to " + veth(pid) + ": " + arpEth0ToVeth.error());
+    } else if (!arpEth0ToVeth.get()) {
+      return Failure(
+          "The ARP packet filter on host " + eth0 + " already exists");
+    }
+  }
+
+  // Turn on the veth.
+  Try<bool> enable = link::setUp(veth(pid));
+  if (enable.isError()) {
+    return Failure("Failed to turn on " + veth(pid) + ": " + enable.error());
+  } else if (!enable.get()) {
+    return Failure("Not expecting " + veth(pid) + " to be missing");
+  }
+
+  return Nothing();
+}
+
+
+Future<Limitation> PortMappingIsolatorProcess::watch(
+    const ContainerID& containerId)
+{
+  if (!infos.contains(containerId)) {
+    return Failure("Unknown container");
+  }
+
+  // Currently, we always return a pending future because the
+  // limitation is never reached.
+  return Future<Limitation>();
+}
+
+
+static void _update(
+    const Future<Option<int> >& status,
+    const ContainerID& containerId)
+{
+  if (!status.isReady()) {
+    LOG(ERROR) << "Failed to launch the launcher for updating container "
+               << containerId << ": "
+               << (status.isFailed() ? status.failure() : "discarded");
+  } else if (status.get().isNone()) {
+    LOG(ERROR) << "The launcher for updating container " << containerId
+               << " is not expected to be reaped elsewhere";
+  } else if (status.get().get() != 0) {
+    LOG(ERROR) << "Received non-zero exit status " << status.get().get()
+               << " from the launcher for updating container " << containerId;
+  } else {
+    LOG(INFO) << "The launcher for updating container " << containerId
+              << " finished successfully";
+  }
+}
+
+
+Future<Nothing> PortMappingIsolatorProcess::update(
+    const ContainerID& containerId,
+    const Resources& resources)
+{
+  if (!infos.contains(containerId)) {
+    return Failure("Unknown container");
+  }
+
+  Info* info = CHECK_NOTNULL(infos[containerId]);
+
+  if (info->pid.isNone()) {
+    return Failure("The container has not been isolated");
+  }
+  pid_t pid = info->pid.get();
+
+  IntervalSet<uint16_t> nonEphemeralPorts;
+
+  if (resources.ports().isSome()) {
+    nonEphemeralPorts = getIntervalSet(resources.ports().get());
+
+    // Sanity check to make sure that the assigned non-ephemeral ports
+    // for the container are part of the non-ephemeral ports specified
+    // by the slave.
+    if (!managedNonEphemeralPorts.contains(nonEphemeralPorts)) {
+      return Failure(
+          "Some non-ephemeral ports specified in " +
+          stringify(nonEphemeralPorts) +
+          " are not managed by the slave");
+    }
+  }
+
+  // No need to proceed if no change to the non-ephemeral ports.
+  if (nonEphemeralPorts == info->nonEphemeralPorts) {
+    return Nothing();
+  }
+
+  LOG(INFO) << "Updating non-ephemeral ports for container "
+            << containerId << " from " << info->nonEphemeralPorts
+            << " to " << nonEphemeralPorts;
+
+  Result<vector<ip::Classifier> > classifiers =
+    ip::classifiers(veth(pid), ingress::HANDLE);
+
+  if (classifiers.isError()) {
+    return Failure(
+        "Failed to get all the IP filters on " + veth(pid) +
+        ": " + classifiers.error());
+  } else if (classifiers.isNone()) {
+    return Failure("Failed to find " + veth(pid));
+  }
+
+  // We first decide what port ranges need to be removed. Any filter
+  // whose port range is not within the new non-ephemeral ports should
+  // be removed.
+  hashset<PortRange> portsToRemove;
+  IntervalSet<uint16_t> remaining = info->nonEphemeralPorts;
+
+  foreach (const ip::Classifier& classifier, classifiers.get()) {
+    Option<PortRange> sourcePorts = classifier.sourcePorts();
+    Option<PortRange> destinationPorts = classifier.destinationPorts();
+
+    // All the IP filters we set up on veth only have source ports.
+    if (sourcePorts.isNone() || destinationPorts.isSome()) {
+      return Failure("Unexpected IP filter detected on " + veth(pid));
+    }
+
+    Interval<uint16_t> ports =
+      (Bound<uint16_t>::closed(sourcePorts.get().begin()),
+       Bound<uint16_t>::closed(sourcePorts.get().end()));
+
+    // Skip the ephemeral ports.
+    if (ports == info->ephemeralPorts) {
+      continue;
+    }
+
+    if (!nonEphemeralPorts.contains(ports)) {
+      remaining -= ports;
+      portsToRemove.insert(sourcePorts.get());
+    }
+  }
+
+  // We then decide what port ranges need to be added.
+  vector<PortRange> portsToAdd = getPortRanges(nonEphemeralPorts - remaining);
+
+  foreach (const PortRange& range, portsToAdd) {
+    LOG(INFO) << "Adding IP packet filters with ports " << range
+              << " for container " << containerId;
+
+    Try<Nothing> add = addHostIPFilters(
+        range,
+        eth0,
+        lo,
+        veth(pid),
+        hostMAC,
+        hostIP);
+
+    if (add.isError()) {
+      return Failure(
+          "Failed to add IP packet filter with ports " +
+          stringify(range) + " for container with pid " +
+          stringify(pid) + ": " + add.error());
+    }
+  }
+
+  foreach (const PortRange& range, portsToRemove) {
+    LOG(INFO) << "Removing IP packet filters with ports " << range
+              << " for container with pid " << pid;
+
+    Try<Nothing> removing =
+      removeHostIPFilters(
+          range,
+          eth0,
+          lo,
+          veth(pid),
+          hostMAC,
+          hostIP);
+
+    if (removing.isError()) {
+      return Failure(
+          "Failed to remove IP packet filter with ports " +
+          stringify(range) + " for container with pid " +
+          stringify(pid) + ": " + removing.error());
+    }
+  }
+
+  // Update the non-ephemeral ports of this container.
+  info->nonEphemeralPorts = nonEphemeralPorts;
+
+  // Update the IP filters inside the container.
+  PortMappingUpdate update;
+  update.flags.eth0_name = eth0;
+  update.flags.lo_name = lo;
+  update.flags.pid = pid;
+  update.flags.ports_to_add = json(portsToAdd);
+  update.flags.ports_to_remove = json(portsToRemove);
+
+  vector<string> argv(2);
+  argv[0] = "mesos-network-helper";
+  argv[1] = PortMappingUpdate::NAME;
+
+  Try<Subprocess> s = subprocess(
+      path::join(flags.launcher_dir, "mesos-network-helper"),
+      argv,
+      update.flags);
+
+  if (s.isError()) {
+    return Failure("Failed to launch update subcommand: " + s.error());
+  }
+
+  return s.get().status()
+    .onAny(lambda::bind(&_update, lambda::_1, containerId))
+    .then(lambda::bind(&_nothing));
+}
+
+
+Future<ResourceStatistics> PortMappingIsolatorProcess::usage(
+    const ContainerID& containerId)
+{
+  ResourceStatistics result;
+
+  if (!infos.contains(containerId)) {
+    LOG(WARNING) << "Unknown container";
+    return result;
+  }
+
+  Info* info = CHECK_NOTNULL(infos[containerId]);
+
+  if (info->pid.isNone()) {
+    return result;
+  }
+
+  Result<hashmap<string, uint64_t> > stat =
+    link::statistics(veth(info->pid.get()));
+
+  if (stat.isError()) {
+    return Failure(
+        "Failed to retrieve statistics on link " +
+        veth(info->pid.get()) + ": " + stat.error());
+  } else if (stat.isNone()) {
+    return Failure(
+        "Failed to find link: " + veth(info->pid.get()));
+  }
+
+  Option<uint64_t> rx_packets = stat.get().get("rx_packets");
+  if (rx_packets.isSome()) {
+    result.set_net_rx_packets(rx_packets.get());
+  }
+
+  Option<uint64_t> rx_bytes = stat.get().get("rx_bytes");
+  if (rx_bytes.isSome()) {
+    result.set_net_rx_bytes(rx_bytes.get());
+  }
+
+  Option<uint64_t> rx_errors = stat.get().get("rx_errors");
+  if (rx_errors.isSome()) {
+    result.set_net_rx_errors(rx_errors.get());
+  }
+
+  Option<uint64_t> rx_dropped = stat.get().get("rx_dropped");
+  if (rx_dropped.isSome()) {
+    result.set_net_rx_dropped(rx_dropped.get());
+  }
+
+  Option<uint64_t> tx_packets = stat.get().get("tx_packets");
+  if (tx_packets.isSome()) {
+    result.set_net_tx_packets(tx_packets.get());
+  }
+
+  Option<uint64_t> tx_bytes = stat.get().get("tx_bytes");
+  if (tx_bytes.isSome()) {
+    result.set_net_tx_bytes(tx_bytes.get());
+  }
+
+  Option<uint64_t> tx_errors = stat.get().get("tx_errors");
+  if (tx_errors.isSome()) {
+    result.set_net_tx_errors(tx_errors.get());
+  }
+
+  Option<uint64_t> tx_dropped = stat.get().get("tx_dropped");
+  if (tx_dropped.isSome()) {
+    result.set_net_tx_dropped(tx_dropped.get());
+  }
+
+  return result;
+}
+
+
+Future<Nothing> PortMappingIsolatorProcess::cleanup(
+      const ContainerID& containerId)
+{
+  if (!infos.contains(containerId)) {
+    LOG(WARNING) << "Ignoring cleanup for unknown container " << containerId;
+    return Nothing();
+  }
+
+  Info* info = CHECK_NOTNULL(infos[containerId]);
+
+  // For a normally exited container, we take its info pointer off
+  // the 'infos' hashmap before using the helper function to clean it up.
+  infos.erase(containerId);
+
+  Try<Nothing> cleanup = _cleanup(info);
+  if (cleanup.isError()) {
+    return Failure(cleanup.error());
+  }
+
+  return Nothing();
+}
+
+
+// An old glibc might not have this symbol.
+#ifndef MNT_DETACH
+#define MNT_DETACH 2
+#endif
+
+
+Try<Nothing> PortMappingIsolatorProcess::_cleanup(Info* _info)
+{
+  // Set '_info' to be auto-managed so that it will be deleted when
+  // this function returns.
+  Owned<Info> info(CHECK_NOTNULL(_info));
+
+  if (info->pid.isNone()) {
+    LOG(WARNING) << "The container has not been isolated";
+    return Nothing();
+  }
+
+  pid_t pid = info->pid.get();
+
+  // Remove the IP filters on eth0 and lo for non-ephemeral port
+  // ranges and the ephemeral port range.
+  foreach (const PortRange& range,
+           getPortRanges(info->nonEphemeralPorts + info->ephemeralPorts)) {
+    LOG(INFO) << "Removing IP packet filters with ports " << range
+              << " for container with pid " << pid;
+
+    Try<Nothing> removing =
+      removeHostIPFilters(
+          range,
+          eth0,
+          lo,
+          veth(pid),
+          hostMAC,
+          hostIP);
+
+    if (removing.isError()) {
+      return Error(
+          "Failed to remove IP packet filter with ports " +
+          stringify(range) + " for container with pid " +
+          stringify(pid) + ": " + removing.error());
+    }
+  }
+
+  // Free the ephemeral ports used by this container.
+  ephemeralPortsAllocator->deallocate(info->ephemeralPorts);
+
+  LOG(INFO) << "Freed ephemeral ports " << info->ephemeralPorts
+            << " for container with pid " << pid;
+
+  set<string> targets;
+  foreachvalue (Info* info, infos) {
+    if (info->pid.isSome()) {
+      targets.insert(veth(info->pid.get()));
+    }
+  }
+
+  if (targets.empty()) {
+    // This is the last container; remove the ARP and ICMP filters on
+    // host eth0.
+
+    // Remove the ICMP filter on host eth0.
+    Try<bool> icmpEth0ToVeth = filter::icmp::remove(
+        eth0,
+        ingress::HANDLE,
+        icmp::Classifier(net::IP(hostIP.address())));
+
+    if (icmpEth0ToVeth.isError()) {
+      return Error(
+          "Failed to remove the ICMP packet filter on host " + eth0 +
+          ": " + icmpEth0ToVeth.error());
+    } else if (!icmpEth0ToVeth.get()) {
+      LOG(ERROR) << "The ICMP packet filter on host " << eth0
+                 << " does not exist";
+    }
+
+    // Remove the ARP filter on host eth0.
+    Try<bool> arpEth0ToVeth = filter::arp::remove(
+        eth0,
+        ingress::HANDLE);
+
+    if (arpEth0ToVeth.isError()) {
+      return Error(
+          "Failed to remove the ARP packet filter on host " + eth0 +
+          ": " + arpEth0ToVeth.error());
+    } else if (!arpEth0ToVeth.get()) {
+      LOG(ERROR) << "The ARP packet filter on host " << eth0
+                 << " does not exist";
+    }
+
+  } else {
+    // This is not the last container. Replace the ARP and ICMP
+    // filters. The reason we do this is that we don't have an easy
+    // way to search and delete an action from the multiple actions on
+    // a single filter.
+    Try<bool> icmpEth0ToVeth = filter::icmp::update(
+        eth0,
+        ingress::HANDLE,
+        icmp::Classifier(net::IP(hostIP.address())),
+        action::Mirror(targets));
+
+    if (icmpEth0ToVeth.isError()) {
+      return Error(
+          "Failed to update the ICMP mirror action from host " + eth0 +
+          " to " + veth(pid) + ": " + icmpEth0ToVeth.error());
+    } else if (!icmpEth0ToVeth.get()) {
+      return Error(
+          "The ICMP packet filter on host " + eth0 + " does not exist");
+    }
+
+    Try<bool> arpEth0ToVeth = filter::arp::update(
+        eth0,
+        ingress::HANDLE,
+        action::Mirror(targets));
+
+    if (arpEth0ToVeth.isError()) {
+      return Error(
+          "Failed to update the ARP mirror action from host " + eth0 +
+          " to " + veth(pid) + ": " + arpEth0ToVeth.error());
+    } else if (!arpEth0ToVeth.get()) {
+      return Error(
+          "The ARP packet filter on host " + eth0 + " does not exist");
+    }
+  }
+
+  // Release the bind mount for this container.
+  const string target = path::join(BIND_MOUNT_ROOT, stringify(pid));
+
+  Try<Nothing> unmount = fs::unmount(target, MNT_DETACH);
+  if (unmount.isError()) {
+    return Error("Failed to umount: " + unmount.error());
+  }
+
+  Try<Nothing> rm = os::rm(target);
+  if (rm.isError()) {
+    return Error("Failed to remove " + target + ": " + rm.error());
+  }
+
+  // We manually remove veth to avoid having to wait for the kernel to
+  // do it.
+  Try<bool> remove = link::remove(veth(pid));
+  if (remove.isError()) {
+    return Error(
+        "Failed to remove the link " + veth(pid) + ": " + remove.error());
+  }
+
+  LOG(INFO) << "Successfully removed the link " << veth(pid);
+
+  return Nothing();
+}
+
+// This function returns the scripts that need to be run in the child
+// context before the child execs in order to complete network isolation.
+// TODO(jieyu): Use the launcher abstraction to remove most of the
+// logic here. Completely remove this function once we can assume a
+// newer kernel where 'setns' works for mount namespaces.
+string PortMappingIsolatorProcess::scripts(Info* info)
+{
+  ostringstream script;
+
+  script << "#!/bin/sh\n";
+  script << "set -x\n";
+
+  // Remount /proc and /sys to show a separate networking stack.
+  // These should be done by a FilesystemIsolator in the future.
+  script << "mount -n -o remount -t sysfs none /sys\n";
+  script << "mount -n -o remount -t proc none /proc\n";
+
+  // Unmount all the mounts under BIND_MOUNT_ROOT inside the child to
+  // clear the reference counts to the mount namespace.
+  script << "umount " << BIND_MOUNT_ROOT << "/*\n";
+
+  // Configure lo and eth0.
+  script << "ip link set " << lo << " address " << hostMAC
+         << " mtu "<< hostEth0MTU << " up\n";
+
+  script << "ip link set " << eth0 << " address " << hostMAC << " up\n";
+  script << "ip addr add " << hostIP  << " dev " << eth0 << "\n";
+
+  // Set up the default gateway to match that of eth0.
+  script << "ip route add default via "
+         << net::IP(hostDefaultGateway.address()) << "\n";
+
+  // Restrict the ephemeral ports that can be used by the container.
+  script << "echo -e " << info->ephemeralPorts.lower() << "\t"
+         << (info->ephemeralPorts.upper() - 1)
+         << " > /proc/sys/net/ipv4/ip_local_port_range\n";
+
+  // Allow eth0 and lo in the container to accept local packets. We
+  // need this because we will set up filters to redirect packets from
+  // lo to eth0 in the container.
+  script << "echo 1 > /proc/sys/net/ipv4/conf/" << eth0 << "/accept_local\n";
+  script << "echo 1 > /proc/sys/net/ipv4/conf/" << lo << "/accept_local\n";
+
+  // Enable route_localnet on lo because by default 127.0.0.1 traffic
+  // is dropped. This feature exists on kernel 3.6 or newer.
+  if (os::exists(path::join("/proc/sys/net/ipv4/conf", lo, "route_localnet"))) {
+    script << "echo 1 > /proc/sys/net/ipv4/conf/" << lo << "/route_localnet\n";
+  }
+
+  // Set up filters on lo and eth0.
+  script << "tc qdisc add dev " << lo << " ingress\n";
+  script << "tc qdisc add dev " << eth0 << " ingress\n";
+
+  // Allow talking between containers and from container to host.
+  // TODO(chzhcn): Consider merging the following two filters.
+  script << "tc filter add dev " << lo << " parent ffff: protocol ip"
+         << " prio " << Priority(IP_FILTER_PRIORITY, NORMAL).get() << " u32"
+         << " flowid ffff:0"
+         << " match ip dst " << net::IP(hostIP.address())
+         << " action mirred egress redirect dev " << eth0 << "\n";
+
+  script << "tc filter add dev " << lo << " parent ffff: protocol ip"
+         << " prio " << Priority(IP_FILTER_PRIORITY, NORMAL).get() << " u32"
+         << " flowid ffff:0"
+         << " match ip dst " << net::IP(LOOPBACK_IP.address())
+         << " action mirred egress redirect dev " << eth0 << "\n";
+
+  foreach (const PortRange& range,
+           getPortRanges(info->nonEphemeralPorts + info->ephemeralPorts)) {
+    // Local traffic inside a container will not be redirected to eth0.
+    script << "tc filter add dev " << lo << " parent ffff: protocol ip"
+           << " prio " << Priority(IP_FILTER_PRIORITY, HIGH).get() << " u32"
+           << " flowid ffff:0"
+           << " match ip dport " << range.begin() << " "
+           << hex << range.mask() << dec << "\n";
+
+    // Traffic going to host loopback IP and ports assigned to this
+    // container will be redirected to lo.
+    script << "tc filter add dev " << eth0 << " parent ffff: protocol ip"
+           << " prio " << Priority(IP_FILTER_PRIORITY, NORMAL).get() << " u32"
+           << " flowid ffff:0"
+           << " match ip dst " << net::IP(LOOPBACK_IP.address())
+           << " match ip dport " << range.begin() << " "
+           << hex << range.mask() << dec
+           << " action mirred egress redirect dev " << lo << "\n";
+  }
+
+  // Do not forward ICMP packets whose destination IP is the container itself.
+  script << "tc filter add dev " << lo << " parent ffff: protocol ip"
+         << " prio " << Priority(ICMP_FILTER_PRIORITY, NORMAL).get() << " u32"
+         << " flowid ffff:0"
+         << " match ip protocol 1 0xff"
+         << " match ip dst " << net::IP(hostIP.address()) << "\n";
+
+  script << "tc filter add dev " << lo << " parent ffff: protocol ip"
+         << " prio " << Priority(ICMP_FILTER_PRIORITY, NORMAL).get() << " u32"
+         << " flowid ffff:0"
+         << " match ip protocol 1 0xff"
+         << " match ip dst " << net::IP(LOOPBACK_IP.address()) << "\n";
+
+  // Display the filters created on eth0 and lo.
+  script << "tc filter show dev " << eth0 << " parent ffff:\n";
+  script << "tc filter show dev " << lo << " parent ffff:\n";
+
+  return script.str();
+}
+
+////////////////////////////////////////////////////
+// Implementation for the ephemeral ports allocator.
+////////////////////////////////////////////////////
+
+
+uint32_t EphemeralPortsAllocator::nextMultipleOf(uint32_t x, uint32_t m)
+{
+  uint32_t div = x / m;
+  uint32_t mod = x % m;
+
+  return (div + (mod == 0 ? 0 : 1)) * m;
+}
+
+
+Try<Interval<uint16_t> > EphemeralPortsAllocator::allocate()
+{
+  if (portsPerContainer_ == 0) {
+    return Error("Number of ephemeral ports per container is zero");
+  }
+
+  Option<Interval<uint16_t> > allocated;
+
+  foreach (const Interval<uint16_t>& interval, free) {
+    uint16_t upper = interval.upper();
+    uint16_t lower = interval.lower();
+    uint16_t size = upper - lower;
+
+    if (size < portsPerContainer_) {
+      continue;
+    }
+
+    // If 'lower' is not aligned, calculate the new aligned 'lower'.
+    if (lower % portsPerContainer_ != 0) {
+      lower = nextMultipleOf(lower, portsPerContainer_);
+      if (lower + portsPerContainer_ > upper) {
+        continue;
+      }
+    }
+
+    allocated = (Bound<uint16_t>::closed(lower),
+                 Bound<uint16_t>::open(lower + portsPerContainer_));
+    break;
+  }
+
+  if (allocated.isNone()) {
+    return Error("Failed to allocate ephemeral ports");
+  }
+
+  allocate(allocated.get());
+
+  return allocated.get();
+}
+
+
+void EphemeralPortsAllocator::allocate(const Interval<uint16_t>& ports)
+{
+  CHECK(free.contains(ports));
+  CHECK(!used.contains(ports));
+  free -= ports;
+  used += ports;
+}
+
+
+void EphemeralPortsAllocator::deallocate(const Interval<uint16_t>& ports)
+{
+  CHECK(!free.contains(ports));
+  CHECK(used.contains(ports));
+  free += ports;
+  used -= ports;
+}
+
+
+// This function is exposed for unit testing.
+vector<PortRange> getPortRanges(const IntervalSet<uint16_t>& ports)
+{
+  vector<PortRange> ranges;
+
+  foreach (const Interval<uint16_t>& interval, ports) {
+    uint16_t lower = interval.lower(); // Inclusive lower.
+    uint16_t upper = interval.upper(); // Exclusive upper.
+
+    // Construct a set of valid port ranges (i.e., that can be used by
+    // a filter) from 'interval'. We keep incrementing 'lower' as we
+    // find valid port ranges until we reach 'upper'.
+    while (lower < upper) {
+      // Determine the size of the port range starting from 'lower'.
+      // The size has to satisfy the following conditions: 1) size =
+      // 2^n (n=0,1,2,...); 2) lower % size == 0.
+      size_t size;
+      for (size = roundDownToPowerOfTwo(lower); size > 1; size = size / 2) {
+        if (lower % size == 0 && lower + size <= upper) {
+          break;
+        }
+      }
+
+      // Construct the port range given the size.
+      Try<PortRange> range = PortRange::fromBeginEnd(lower, lower + size - 1);
+
+      CHECK_SOME(range) << "Invalid port range: [" << lower << ","
+                        << (lower + size - 1) << "]";
+
+      ranges.push_back(range.get());
+
+      lower += size;
+    }
+  }
+
+  return ranges;
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
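
For reference, the port-range decomposition implemented by getPortRanges()
above can be exercised with a small standalone sketch. This sketch is not part
of the patch: decompose() and its local roundDownToPowerOfTwo() are
hypothetical stand-ins that use only the standard library, but they follow the
same algorithm, so each emitted range has a power-of-two size and a
size-aligned begin.

#include <stdint.h>

#include <iostream>
#include <utility>
#include <vector>

// Largest power of two that is <= x (assumes x > 0).
static uint32_t roundDownToPowerOfTwo(uint32_t x)
{
  uint32_t result = 1;
  while (result * 2 <= x) {
    result *= 2;
  }
  return result;
}

// Decompose the interval [lower, upper) into ranges [begin, end] whose
// size (end - begin + 1) is a power of two and whose begin is size
// aligned (begin % size == 0).
static std::vector<std::pair<uint32_t, uint32_t> > decompose(
    uint32_t lower, uint32_t upper)
{
  std::vector<std::pair<uint32_t, uint32_t> > ranges;

  while (lower < upper) {
    uint32_t size;
    for (size = roundDownToPowerOfTwo(lower); size > 1; size /= 2) {
      if (lower % size == 0 && lower + size <= upper) {
        break;
      }
    }

    ranges.push_back(std::make_pair(lower, lower + size - 1));
    lower += size;
  }

  return ranges;
}

int main()
{
  // Ports 1000-1004 decompose into [1000,1003] and [1004,1004].
  std::vector<std::pair<uint32_t, uint32_t> > ranges = decompose(1000, 1005);

  for (size_t i = 0; i < ranges.size(); i++) {
    std::cout << "[" << ranges[i].first << ","
              << ranges[i].second << "]" << std::endl;
  }

  return 0;
}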

http://git-wip-us.apache.org/repos/asf/mesos/blob/e878c74f/src/slave/containerizer/isolators/network/port_mapping.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/network/port_mapping.hpp b/src/slave/containerizer/isolators/network/port_mapping.hpp
new file mode 100644
index 0000000..9ef28bc
--- /dev/null
+++ b/src/slave/containerizer/isolators/network/port_mapping.hpp
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __PORT_MAPPING_ISOLATOR_HPP__
+#define __PORT_MAPPING_ISOLATOR_HPP__
+
+#include <stdint.h>
+
+#include <sys/types.h>
+
+#include <string>
+#include <vector>
+
+#include <process/owned.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/interval.hpp>
+#include <stout/net.hpp>
+#include <stout/none.hpp>
+#include <stout/option.hpp>
+#include <stout/subcommand.hpp>
+
+#include "linux/routing/filter/ip.hpp"
+
+#include "slave/flags.hpp"
+
+#include "slave/containerizer/isolator.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// Responsible for allocating ephemeral ports for the port mapping
+// network isolator. This class is exposed mainly for unit testing.
+class EphemeralPortsAllocator
+{
+public:
+  EphemeralPortsAllocator(
+      const IntervalSet<uint16_t>& total,
+      size_t _portsPerContainer)
+    : free(total),
+      portsPerContainer_(_portsPerContainer) {}
+
+  // Returns the number of ephemeral ports for each container.
+  size_t portsPerContainer() const { return portsPerContainer_; }
+
+  // Allocate an ephemeral port range for a container. The allocator
+  // will automatically find a port range of the configured size per
+  // container. Returns an error if the allocation cannot be fulfilled
+  // (e.g., the available ephemeral ports are exhausted).
+  Try<Interval<uint16_t> > allocate();
+
+  // Mark an ephemeral port range as allocated. This is used during
+  // 'recover'.
+  void allocate(const Interval<uint16_t>& ports);
+
+  // Deallocate an ephemeral port range.
+  void deallocate(const Interval<uint16_t>& ports);
+
+  // Return true if the port range 'ports' is managed by the
+  // allocator, regardless of whether it has been allocated or not.
+  bool isManaged(const Interval<uint16_t>& ports)
+  {
+    return (free + used).contains(ports);
+  }
+
+private:
+  // Given an integer x, return the smallest integer t such that t >=
+  // x and t % m == 0.
+  static uint32_t nextMultipleOf(uint32_t x, uint32_t m);
+
+  IntervalSet<uint16_t> free;
+  IntervalSet<uint16_t> used;
+
+  // The number of ephemeral ports for each container.
+  size_t portsPerContainer_;
+};
+
+
+// For the specified ports, generate a set of port ranges, each of
+// which can be used by a single IP filter. In other words, each port
+// range needs to satisfy the following two conditions: 1) the size of
+// the range is 2^n (n=0,1,2...); 2) the beginning of the range is
+// size aligned (i.e., begin % size == 0). This function is exposed
+// mainly for unit testing.
+std::vector<routing::filter::ip::PortRange> getPortRanges(
+    const IntervalSet<uint16_t>& ports);
+
+
+// Provides network isolation using port mapping. Each container is
+// assigned a fixed set of ports (including ephemeral ports). The
+// isolator will set up filters on the host such that network traffic
+// to the host will be properly redirected to the corresponding
+// container depending on the destination ports. The network traffic
+// from containers will also be properly relayed to the host. This
+// isolator is useful when the operator wants to reuse the host IP for
+// all containers running on the host (e.g., there are insufficient
+// IPs).
+class PortMappingIsolatorProcess : public IsolatorProcess
+{
+public:
+  static Try<Isolator*> create(const Flags& flags);
+
+  virtual ~PortMappingIsolatorProcess() {}
+
+  virtual process::Future<Nothing> recover(
+      const std::list<state::RunState>& states);
+
+  virtual process::Future<Option<CommandInfo> > prepare(
+      const ContainerID& containerId,
+      const ExecutorInfo& executorInfo);
+
+  virtual process::Future<Nothing> isolate(
+      const ContainerID& containerId,
+      pid_t pid);
+
+  virtual process::Future<Limitation> watch(
+      const ContainerID& containerId);
+
+  virtual process::Future<Nothing> update(
+      const ContainerID& containerId,
+      const Resources& resources);
+
+  virtual process::Future<ResourceStatistics> usage(
+      const ContainerID& containerId);
+
+  virtual process::Future<Nothing> cleanup(
+      const ContainerID& containerId);
+
+private:
+  struct Info
+  {
+    Info(const IntervalSet<uint16_t>& _nonEphemeralPorts,
+         const Interval<uint16_t>& _ephemeralPorts,
+         const Option<pid_t>& _pid = None())
+      : nonEphemeralPorts(_nonEphemeralPorts),
+        ephemeralPorts(_ephemeralPorts),
+        pid(_pid) {}
+
+    // Non-ephemeral ports used by the container. It's possible that a
+    // container does not use any non-ephemeral ports. In that case,
+    // 'nonEphemeralPorts' will be empty. This variable could change
+    // upon 'update'.
+    IntervalSet<uint16_t> nonEphemeralPorts;
+
+    // Each container has one and only one range of ephemeral ports.
+    // It cannot have more than one range of ephemeral ports because
+    // we need to set up the ip_local_port_range (which only accepts a
+    // single interval) inside the container to restrict the ephemeral
+    // ports used by the container.
+    const Interval<uint16_t> ephemeralPorts;
+
+    Option<pid_t> pid;
+  };
+
+  PortMappingIsolatorProcess(
+      const Flags& _flags,
+      const std::string& _eth0,
+      const std::string& _lo,
+      const net::MAC& _hostMAC,
+      const net::IP& _hostIP,
+      const size_t _hostEth0MTU,
+      const net::IP& _hostDefaultGateway,
+      const IntervalSet<uint16_t>& _managedNonEphemeralPorts,
+      const process::Owned<EphemeralPortsAllocator>& _ephemeralPortsAllocator)
+    : flags(_flags),
+      eth0(_eth0),
+      lo(_lo),
+      hostMAC(_hostMAC),
+      hostIP(_hostIP),
+      hostEth0MTU(_hostEth0MTU),
+      hostDefaultGateway(_hostDefaultGateway),
+      managedNonEphemeralPorts(_managedNonEphemeralPorts),
+      ephemeralPortsAllocator(_ephemeralPortsAllocator) {}
+
+  // Return the scripts that will be executed in the child context.
+  std::string scripts(Info* info);
+
+  // Continuations.
+  Try<Nothing> _cleanup(Info* info);
+  Result<Info*> _recover(pid_t pid);
+
+  const Flags flags;
+
+  const std::string eth0;
+  const std::string lo;
+  const net::MAC hostMAC;
+  const net::IP hostIP;
+  const size_t hostEth0MTU;
+  const net::IP hostDefaultGateway;
+
+  // All the non-ephemeral ports managed by the slave, as passed in
+  // via flags.resources.
+  const IntervalSet<uint16_t> managedNonEphemeralPorts;
+
+  process::Owned<EphemeralPortsAllocator> ephemeralPortsAllocator;
+
+  hashmap<ContainerID, Info*> infos;
+};
+
+
+// Defines the subcommand for 'update' that needs to be executed by a
+// subprocess to update the filters inside a container.
+class PortMappingUpdate : public Subcommand
+{
+public:
+  static const std::string NAME;
+
+  struct Flags : public flags::FlagsBase
+  {
+    Flags();
+
+    bool help;
+    Option<std::string> eth0_name;
+    Option<std::string> lo_name;
+    Option<pid_t> pid;
+    Option<JSON::Object> ports_to_add;
+    Option<JSON::Object> ports_to_remove;
+  };
+
+  PortMappingUpdate() : Subcommand(NAME) {}
+
+  Flags flags;
+
+protected:
+  virtual int execute();
+  virtual flags::FlagsBase* getFlags() { return &flags; }
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __PORT_MAPPING_ISOLATOR_HPP__
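
For reference, a standalone sketch (not part of the patch) of the aligned
allocation strategy used by EphemeralPortsAllocator::allocate() declared
above: each container receives a block of portsPerContainer ports whose lower
bound is a multiple of the block size. The helper names and the example port
pool below are illustrative only and use just the standard library.

#include <stdint.h>

#include <iostream>
#include <utility>
#include <vector>

// Smallest multiple of 'm' that is >= 'x' (assumes m > 0).
static uint32_t nextMultipleOf(uint32_t x, uint32_t m)
{
  return ((x + m - 1) / m) * m;
}

// Given free port intervals [lower, upper), pick the first block of
// 'portsPerContainer' ports whose lower bound is size aligned. This
// loosely mirrors the allocation logic described above.
static bool allocate(
    const std::vector<std::pair<uint32_t, uint32_t> >& free,
    uint32_t portsPerContainer,
    std::pair<uint32_t, uint32_t>* block)
{
  for (size_t i = 0; i < free.size(); i++) {
    uint32_t lower = free[i].first;
    uint32_t upper = free[i].second;

    // Align the lower bound to a multiple of the block size.
    if (lower % portsPerContainer != 0) {
      lower = nextMultipleOf(lower, portsPerContainer);
    }

    if (lower + portsPerContainer <= upper) {
      *block = std::make_pair(lower, lower + portsPerContainer);
      return true;
    }
  }

  return false;
}

int main()
{
  // Example pool only: free ephemeral ports [32770, 57344) with 1024
  // ports per container yields the aligned block [33792, 34816).
  std::vector<std::pair<uint32_t, uint32_t> > free;
  free.push_back(std::make_pair(32770u, 57344u));

  std::pair<uint32_t, uint32_t> block;
  if (allocate(free, 1024, &block)) {
    std::cout << "[" << block.first << ", " << block.second << ")" << std::endl;
  }

  return 0;
}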