You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2015/07/25 01:38:00 UTC

[07/12] mesos git commit: Moved containerizer related tests under src/tests/containerizer.
diff --git a/src/tests/containerizer/port_mapping_tests.cpp b/src/tests/containerizer/port_mapping_tests.cpp
new file mode 100644
index 0000000..45ef97a
--- /dev/null
+++ b/src/tests/containerizer/port_mapping_tests.cpp
@@ -0,0 +1,2296 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <gmock/gmock.h>
+#include <iostream>
+#include <string>
+#include <vector>
+#include <process/future.hpp>
+#include <process/io.hpp>
+#include <process/reap.hpp>
+#include <process/subprocess.hpp>
+#include <stout/bytes.hpp>
+#include <stout/gtest.hpp>
+#include <stout/ip.hpp>
+#include <stout/json.hpp>
+#include <stout/mac.hpp>
+#include <stout/net.hpp>
+#include <stout/stopwatch.hpp>
+#include <stout/os/stat.hpp>
+#include <stout/os/exists.hpp>
+#include "linux/fs.hpp"
+#include "linux/routing/utils.hpp"
+#include "linux/routing/filter/ip.hpp"
+#include "linux/routing/link/link.hpp"
+#include "linux/routing/queueing/ingress.hpp"
+#include "master/master.hpp"
+#include "mesos/mesos.hpp"
+#include "slave/flags.hpp"
+#include "slave/slave.hpp"
+#include "slave/containerizer/isolators/network/port_mapping.hpp"
+#include "slave/containerizer/fetcher.hpp"
+#include "slave/containerizer/launcher.hpp"
+#include "slave/containerizer/linux_launcher.hpp"
+#include "slave/containerizer/mesos/containerizer.hpp"
+#include "slave/containerizer/mesos/launch.hpp"
+#include "tests/flags.hpp"
+#include "tests/mesos.hpp"
+#include "tests/utils.hpp"
+using namespace mesos::internal::slave;
+using namespace process;
+using namespace routing;
+using namespace routing::filter;
+using namespace routing::queueing;
+using mesos::internal::master::Master;
+using mesos::slave::Isolator;
+using std::list;
+using std::ostringstream;
+using std::set;
+using std::string;
+using std::vector;
+using testing::_;
+using testing::Eq;
+using testing::Return;
+namespace mesos {
+namespace internal {
+namespace tests {
+// An old glibc might not have this symbol.
+#ifndef MNT_DETACH
+#define MNT_DETACH 2
+// Each test container works with a common specification of 2 CPUs,
+// 1GB of memory and 1GB of disk space, which experience has shown
+// to be sufficient to not encounter resource starvation issues when
+// running the test suite.
+const char* const containerCPU = "cpus:2";
+const char* const containerMemory = "mem:1024";
+const char* const containerDisk = "disk:1024";
+// We configure ephemeral and persistent port ranges outside the
+// default linux ip_local_port_range [32768-61000] in order to reduce
+// the probability of a conflict which could result in spurious
+// results (positive or negative) from these tests.
+const char* const ephemeralPorts = "ephemeral_ports:[30001-30999]";
+const char* const persistentPorts = "ports:[31000-32000]";
+// To keep things simple, we used fixed port ranges for our containers
+// in these tests rather than try to dynamically track port usage.
+// Note that container ports must be contained in the persistent port
+// range.
+const char* const container1Ports = "ports:[31000-31499]";
+const char* const container2Ports = "ports:[31500-32000]";
+// We define a validPort in the container1 assigned range which can
+// therefore accept incoming traffic.
+const int validPort = 31001;
+// We also define a port outside the persistent port range; containers
+// connecting to this port will never receive incoming traffic.
+const int invalidPort = 32502;
+static void cleanup(const string& eth0, const string& lo)
+  // Clean up the ingress qdisc on eth0 and lo if exists.
+  Try<bool> hostEth0ExistsQdisc = ingress::exists(eth0);
+  ASSERT_SOME(hostEth0ExistsQdisc);
+  if (hostEth0ExistsQdisc.get()) {
+    ASSERT_SOME_TRUE(ingress::remove(eth0));
+  }
+  Try<bool> hostLoExistsQdisc = ingress::exists(lo);
+  ASSERT_SOME(hostLoExistsQdisc);
+  if (hostLoExistsQdisc.get()) {
+    ASSERT_SOME_TRUE(ingress::remove(lo));
+  }
+  // Clean up all 'veth' devices if exist.
+  Try<set<string> > links = net::links();
+  ASSERT_SOME(links);
+  foreach (const string& name, links.get()) {
+    if (strings::startsWith(name, slave::PORT_MAPPING_VETH_PREFIX())) {
+      ASSERT_SOME_TRUE(link::remove(name));
+    }
+  }
+  Try<list<string> > entries = os::ls(slave::PORT_MAPPING_BIND_MOUNT_ROOT());
+  ASSERT_SOME(entries);
+  foreach (const string& file, entries.get()) {
+    string target = path::join(slave::PORT_MAPPING_BIND_MOUNT_ROOT(), file);
+    // NOTE: Here, we ignore the unmount errors because previous tests
+    // may have created the file and died before mounting.
+    if (!os::stat::islink(target)) {
+      mesos::internal::fs::unmount(target, MNT_DETACH);
+    }
+    // Remove the network namespace handle and the corresponding
+    // symlinks. The removal here is best effort.
+    os::rm(target);
+  }
+class PortMappingIsolatorTest : public TemporaryDirectoryTest
+  static void SetUpTestCase()
+  {
+    ASSERT_SOME(routing::check())
+      << "-------------------------------------------------------------\n"
+      << "We cannot run any PortMappingIsolatorTests because your\n"
+      << "libnl library is not new enough. You can either install a\n"
+      << "new libnl library, or disable this test case\n"
+      << "-------------------------------------------------------------";
+    ASSERT_SOME_EQ(0, os::shell(NULL, "which nc"))
+      << "-------------------------------------------------------------\n"
+      << "We cannot run any PortMappingIsolatorTests because 'nc'\n"
+      << "could not be found. You can either install 'nc', or disable\n"
+      << "this test case\n"
+      << "-------------------------------------------------------------";
+    ASSERT_SOME_EQ(0, os::shell(NULL, "which arping"))
+      << "-------------------------------------------------------------\n"
+      << "We cannot run some PortMappingIsolatorTests because 'arping'\n"
+      << "could not be found. You can either isntall 'arping', or\n"
+      << "disable this test case\n"
+      << "-------------------------------------------------------------";
+  }
+  PortMappingIsolatorTest() : hostIP(net::IP(INADDR_ANY)) {}
+  virtual void SetUp()
+  {
+    TemporaryDirectoryTest::SetUp();
+    flags = CreateSlaveFlags();
+    // Guess the name of the public interface.
+    Result<string> _eth0 = link::eth0();
+    ASSERT_SOME(_eth0) << "Failed to guess the name of the public interface";
+    eth0 = _eth0.get();
+    LOG(INFO) << "Using " << eth0 << " as the public interface";
+    // Guess the name of the loopback interface.
+    Result<string> _lo = link::lo();
+    ASSERT_SOME(_lo) << "Failed to guess the name of the loopback interface";
+    lo = _lo.get();
+    LOG(INFO) << "Using " << lo << " as the loopback interface";
+    // Clean up qdiscs and veth devices.
+    cleanup(eth0, lo);
+    // Get host IP address.
+    Result<net::IPNetwork> hostIPNetwork =
+        net::IPNetwork::fromLinkDevice(eth0, AF_INET);
+    CHECK_SOME(hostIPNetwork)
+      << "Failed to retrieve the host public IP network from " << eth0 << ": "
+      << hostIPNetwork.error();
+    hostIP = hostIPNetwork.get().address();
+    // Get all the external name servers for tests that need to talk
+    // to an external host, e.g., ping, DNS.
+    Try<string> read = os::read("/etc/resolv.conf");
+    CHECK_SOME(read);
+    foreach (const string& line, strings::split(read.get(), "\n")) {
+      if (!strings::startsWith(line, "nameserver")) {
+        continue;
+      }
+      vector<string> tokens = strings::split(line, " ");
+      ASSERT_EQ(2u, tokens.size()) << "Unexpected format in '/etc/resolv.conf'";
+      if (tokens[1] != "") {
+        nameServers.push_back(tokens[1]);
+      }
+    }
+    container1Ready = path::join(os::getcwd(), "container1_ready");
+    container2Ready = path::join(os::getcwd(), "container2_ready");
+    trafficViaLoopback = path::join(os::getcwd(), "traffic_via_loopback");
+    trafficViaPublic = path::join(os::getcwd(), "traffic_via_public");
+    exitStatus = path::join(os::getcwd(), "exit_status");
+  }
+  virtual void TearDown()
+  {
+    cleanup(eth0, lo);
+    TemporaryDirectoryTest::TearDown();
+  }
+  slave::Flags CreateSlaveFlags()
+  {
+    slave::Flags flags;
+    flags.launcher_dir = path::join(tests::flags.build_dir, "src");
+    flags.resources = strings::join(";", vector<string>({
+        containerCPU,
+        containerMemory,
+        containerDisk,
+        ephemeralPorts,
+        persistentPorts }));
+    // NOTE: '16' should be enough for all our tests.
+    flags.ephemeral_ports_per_container = 16;
+    flags.isolation = "network/port_mapping";
+    return flags;
+  }
+  Try<pid_t> launchHelper(
+      Launcher* launcher,
+      int pipes[2],
+      const ContainerID& containerId,
+      const string& command,
+      const Option<CommandInfo>& preparation)
+  {
+    CommandInfo commandInfo;
+    commandInfo.set_value(command);
+    // The flags to pass to the helper process.
+    MesosContainerizerLaunch::Flags launchFlags;
+    launchFlags.command = JSON::Protobuf(commandInfo);
+ = os::getcwd();
+    CHECK_SOME(os::user());
+    launchFlags.user = os::user().get();
+    launchFlags.pipe_read = pipes[0];
+    launchFlags.pipe_write = pipes[1];
+    JSON::Object commands;
+    JSON::Array array;
+    array.values.push_back(JSON::Protobuf(preparation.get()));
+    commands.values["commands"] = array;
+    launchFlags.commands = commands;
+    vector<string> argv(2);
+    argv[1] = MesosContainerizerLaunch::NAME;
+    Try<pid_t> pid = launcher->fork(
+        containerId,
+        path::join(flags.launcher_dir, MESOS_CONTAINERIZER),
+        argv,
+        Subprocess::FD(STDIN_FILENO),
+        Subprocess::FD(STDOUT_FILENO),
+        Subprocess::FD(STDERR_FILENO),
+        launchFlags,
+        None(),
+        None());
+    return pid;
+  }
+  Result<ResourceStatistics> statisticsHelper(
+      pid_t pid,
+      bool enable_summary,
+      bool enable_details)
+  {
+    // Retrieve the socket information from inside the container.
+    PortMappingStatistics statistics;
+ = pid;
+    statistics.flags.eth0_name = eth0;
+    statistics.flags.enable_socket_statistics_summary = enable_summary;
+    statistics.flags.enable_socket_statistics_details = enable_details;
+    vector<string> argv(2);
+    argv[0] = "mesos-network-helper";
+    argv[1] = PortMappingStatistics::NAME;
+    // We don't need STDIN; we need STDOUT for the result; we leave
+    // STDERR as is to log to slave process.
+    Try<Subprocess> s = subprocess(
+        path::join(flags.launcher_dir, "mesos-network-helper"),
+        argv,
+        Subprocess::PATH("/dev/null"),
+        Subprocess::PIPE(),
+        Subprocess::FD(STDERR_FILENO),
+        statistics.flags);
+    CHECK_SOME(s);
+    Future<Option<int> > status = s.get().status();
+    AWAIT_EXPECT_READY(status);
+    EXPECT_SOME_EQ(0, status.get());
+    Future<string> out = io::read(s.get().out().get());
+    Try<JSON::Object> object = JSON::parse<JSON::Object>(out.get());
+    CHECK_SOME(object);
+    return ::protobuf::parse<ResourceStatistics>(object.get());
+  }
+  slave::Flags flags;
+  // Name of the host eth0 and lo.
+  string eth0;
+  string lo;
+  // Host public IP network.
+  net::IP hostIP;
+  // All the external name servers as read from /etc/resolv.conf.
+  vector<string> nameServers;
+  // Some auxiliary files for the tests.
+  string container1Ready;
+  string container2Ready;
+  string trafficViaLoopback;
+  string trafficViaPublic;
+  string exitStatus;
+// Wait up to timeout seconds for a file to be created. If timeout is
+// zero, then wait indefinitely. Return true if file exists.
+// TODO(pbrett): Consider generalizing this function and moving it to
+// a common header.
+static bool waitForFileCreation(
+    const string& path,
+    const Duration& duration = Seconds(60))
+  Stopwatch timer;
+  timer.start();
+  while (!os::exists(path)) {
+    if ((duration > Duration::zero()) && (timer.elapsed() > duration))
+      break;
+    os::sleep(Milliseconds(50));
+  }
+  return os::exists(path);
+// This test uses two containers: one listens to 'validPort' and
+// 'invalidPort' and writes data received to files; the other
+// container attempts to connect to the previous container using
+// 'validPort' and 'invalidPort'. Verify that only the connection
+// through 'validPort' is successful by confirming that the expected
+// data has been written to its output file.
+TEST_F(PortMappingIsolatorTest, ROOT_NC_ContainerToContainerTCP)
+  Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+  CHECK_SOME(isolator);
+  Try<Launcher*> launcher =
+    LinuxLauncher::create(flags, isolator.get()->namespaces().get());
+  CHECK_SOME(launcher);
+  // Set the executor's resources.
+  ExecutorInfo executorInfo;
+  executorInfo.mutable_resources()->CopyFrom(
+      Resources::parse(container1Ports).get());
+  ContainerID containerId1;
+  containerId1.set_value("container1");
+  // Use a relative temporary directory so it gets cleaned up
+  // automatically with the test.
+  Try<string> dir1 = os::mkdtemp(path::join(os::getcwd(), "XXXXXX"));
+  ASSERT_SOME(dir1);
+  Future<Option<CommandInfo> > preparation1 =
+    isolator.get()->prepare(
+        containerId1,
+        executorInfo,
+        dir1.get(),
+        None(),
+        None());
+  AWAIT_READY(preparation1);
+  ASSERT_SOME(preparation1.get());
+  ostringstream command1;
+  // Listen to 'localhost' and 'port'.
+  command1 << "nc -l localhost " << validPort << " > " << trafficViaLoopback
+           << "& ";
+  // Listen to 'public ip' and 'port'.
+  command1 << "nc -l " << hostIP << " " << validPort << " > "
+           << trafficViaPublic << "& ";
+  // Listen to 'invalidPort'.  This should not receive any data.
+  command1 << "nc -l " << invalidPort << " | tee " << trafficViaLoopback << " "
+           << trafficViaPublic << "& ";
+  // Touch the guard file.
+  command1 << "touch " << container1Ready;
+  int pipes[2];
+  ASSERT_NE(-1, ::pipe(pipes));
+  Try<pid_t> pid = launchHelper(
+      launcher.get(),
+      pipes,
+      containerId1,
+      command1.str(),
+      preparation1.get());
+  ASSERT_SOME(pid);
+  // Reap the forked child.
+  Future<Option<int> > status1 = process::reap(pid.get());
+  // Continue in the parent.
+  ::close(pipes[0]);
+  // Isolate the forked child.
+  AWAIT_READY(isolator.get()->isolate(containerId1, pid.get()));
+  // Now signal the child to continue.
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+  ::close(pipes[1]);
+  // Wait for the command to start.
+  ASSERT_TRUE(waitForFileCreation(container1Ready));
+  ContainerID containerId2;
+  containerId2.set_value("container2");
+  executorInfo.mutable_resources()->CopyFrom(
+      Resources::parse(container2Ports).get());
+  // Use a relative temporary directory so it gets cleaned up
+  // automatically with the test.
+  Try<string> dir2 = os::mkdtemp(path::join(os::getcwd(), "XXXXXX"));
+  ASSERT_SOME(dir2);
+  Future<Option<CommandInfo> > preparation2 =
+    isolator.get()->prepare(
+        containerId2,
+        executorInfo,
+        dir2.get(),
+        None(),
+        None());
+  AWAIT_READY(preparation2);
+  ASSERT_SOME(preparation2.get());
+  ostringstream command2;
+  // Send to 'localhost' and 'port'.
+  command2 << "printf hello1 | nc localhost " << validPort << ";";
+  // Send to 'localhost' and 'invalidPort'. This should fail.
+  command2 << "printf hello2 | nc localhost " << invalidPort << ";";
+  // Send to 'public IP' and 'port'.
+  command2 << "printf hello3 | nc " << hostIP << " " << validPort << ";";
+  // Send to 'public IP' and 'invalidPort'. This should fail.
+  command2 << "printf hello4 | nc " << hostIP << " " << invalidPort << ";";
+  // Touch the guard file.
+  command2 << "touch " << container2Ready;
+  ASSERT_NE(-1, ::pipe(pipes));
+  pid = launchHelper(
+      launcher.get(),
+      pipes,
+      containerId2,
+      command2.str(),
+      preparation2.get());
+  ASSERT_SOME(pid);
+  // Reap the forked child.
+  Future<Option<int> > status2 = process::reap(pid.get());
+  // Continue in the parent.
+  ::close(pipes[0]);
+  // Isolate the forked child.
+  AWAIT_READY(isolator.get()->isolate(containerId2, pid.get()));
+  // Now signal the child to continue.
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+  ::close(pipes[1]);
+  // Wait for the command to start.
+  ASSERT_TRUE(waitForFileCreation(container2Ready));
+  // Wait for the command to complete.
+  AWAIT_READY(status1);
+  AWAIT_READY(status2);
+  EXPECT_SOME_EQ("hello1", os::read(trafficViaLoopback));
+  EXPECT_SOME_EQ("hello3", os::read(trafficViaPublic));
+  // Ensure all processes are killed.
+  AWAIT_READY(launcher.get()->destroy(containerId1));
+  AWAIT_READY(launcher.get()->destroy(containerId2));
+  // Let the isolator clean up.
+  AWAIT_READY(isolator.get()->cleanup(containerId1));
+  AWAIT_READY(isolator.get()->cleanup(containerId2));
+  delete isolator.get();
+  delete launcher.get();
+// The same container-to-container test but with UDP.
+TEST_F(PortMappingIsolatorTest, ROOT_NC_ContainerToContainerUDP)
+  Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+  CHECK_SOME(isolator);
+  Try<Launcher*> launcher =
+    LinuxLauncher::create(flags, isolator.get()->namespaces().get());
+  CHECK_SOME(launcher);
+  // Set the executor's resources.
+  ExecutorInfo executorInfo;
+  executorInfo.mutable_resources()->CopyFrom(
+      Resources::parse(container1Ports).get());
+  ContainerID containerId1;
+  containerId1.set_value("container1");
+  // Use a relative temporary directory so it gets cleaned up
+  // automatically with the test.
+  Try<string> dir1 = os::mkdtemp(path::join(os::getcwd(), "XXXXXX"));
+  ASSERT_SOME(dir1);
+  Future<Option<CommandInfo> > preparation1 =
+    isolator.get()->prepare(
+        containerId1,
+        executorInfo,
+        dir1.get(),
+        None(),
+        None());
+  AWAIT_READY(preparation1);
+  ASSERT_SOME(preparation1.get());
+  ostringstream command1;
+  // Listen to 'localhost' and 'port'.
+  command1 << "nc -u -l localhost " << validPort << " > " << trafficViaLoopback
+           << "& ";
+  // Listen to 'public ip' and 'port'.
+  command1 << "nc -u -l " << hostIP << " " << validPort << " > "
+           << trafficViaPublic << "& ";
+  // Listen to 'invalidPort'. This should not receive anything.
+  command1 << "nc -u -l " << invalidPort << " | tee " << trafficViaLoopback
+           << " " << trafficViaPublic << "& ";
+  // Touch the guard file.
+  command1 << "touch " << container1Ready;
+  int pipes[2];
+  ASSERT_NE(-1, ::pipe(pipes));
+  Try<pid_t> pid = launchHelper(
+      launcher.get(),
+      pipes,
+      containerId1,
+      command1.str(),
+      preparation1.get());
+  ASSERT_SOME(pid);
+  // Reap the forked child.
+  Future<Option<int> > status1 = process::reap(pid.get());
+  // Continue in the parent.
+  ::close(pipes[0]);
+  // Isolate the forked child.
+  AWAIT_READY(isolator.get()->isolate(containerId1, pid.get()));
+  // Now signal the child to continue.
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+  ::close(pipes[1]);
+  // Wait for the command to start.
+  ASSERT_TRUE(waitForFileCreation(container1Ready));
+  ContainerID containerId2;
+  containerId2.set_value("container2");
+  executorInfo.mutable_resources()->CopyFrom(
+      Resources::parse(container2Ports).get());
+  // Use a relative temporary directory so it gets cleaned up
+  // automatically with the test.
+  Try<string> dir2 = os::mkdtemp(path::join(os::getcwd(), "XXXXXX"));
+  ASSERT_SOME(dir2);
+  Future<Option<CommandInfo> > preparation2 =
+    isolator.get()->prepare(
+        containerId2,
+        executorInfo,
+        dir2.get(),
+        None(),
+        None());
+  AWAIT_READY(preparation2);
+  ASSERT_SOME(preparation2.get());
+  ostringstream command2;
+  // Send to 'localhost' and 'port'.
+  command2 << "printf hello1 | nc -w1 -u localhost " << validPort << ";";
+  // Send to 'localhost' and 'invalidPort'. No data should be sent.
+  command2 << "printf hello2 | nc -w1 -u localhost " << invalidPort << ";";
+  // Send to 'public IP' and 'port'.
+  command2 << "printf hello3 | nc -w1 -u " << hostIP << " " << validPort << ";";
+  // Send to 'public IP' and 'invalidPort'. No data should be sent.
+  command2 << "printf hello4 | nc -w1 -u " << hostIP << " " << invalidPort
+           << ";";
+  // Touch the guard file.
+  command2 << "touch " << container2Ready;
+  ASSERT_NE(-1, ::pipe(pipes));
+  pid = launchHelper(
+      launcher.get(),
+      pipes,
+      containerId2,
+      command2.str(),
+      preparation2.get());
+  ASSERT_SOME(pid);
+  // Reap the forked child.
+  Future<Option<int> > status2 = process::reap(pid.get());
+  // Continue in the parent.
+  ::close(pipes[0]);
+  // Isolate the forked child.
+  AWAIT_READY(isolator.get()->isolate(containerId2, pid.get()));
+  // Now signal the child to continue.
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+  ::close(pipes[1]);
+  // Wait for the command to start.
+  ASSERT_TRUE(waitForFileCreation(container2Ready));
+  // Wait for the command to complete.
+  AWAIT_READY(status1);
+  AWAIT_READY(status2);
+  EXPECT_SOME_EQ("hello1", os::read(trafficViaLoopback));
+  EXPECT_SOME_EQ("hello3", os::read(trafficViaPublic));
+  AWAIT_READY(launcher.get()->destroy(containerId1));
+  AWAIT_READY(launcher.get()->destroy(containerId2));
+  // Let the isolator clean up.
+  AWAIT_READY(isolator.get()->cleanup(containerId1));
+  AWAIT_READY(isolator.get()->cleanup(containerId2));
+  delete isolator.get();
+  delete launcher.get();
+// Test the scenario where a UDP server is in a container while host
+// tries to establish a UDP connection.
+TEST_F(PortMappingIsolatorTest, ROOT_NC_HostToContainerUDP)
+  Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+  CHECK_SOME(isolator);
+  Try<Launcher*> launcher =
+    LinuxLauncher::create(flags, isolator.get()->namespaces().get());
+  CHECK_SOME(launcher);
+  // Set the executor's resources.
+  ExecutorInfo executorInfo;
+  executorInfo.mutable_resources()->CopyFrom(
+      Resources::parse(container1Ports).get());
+  ContainerID containerId;
+  containerId.set_value("container1");
+  // Use a relative temporary directory so it gets cleaned up
+  // automatically with the test.
+  Try<string> dir = os::mkdtemp(path::join(os::getcwd(), "XXXXXX"));
+  ASSERT_SOME(dir);
+  Future<Option<CommandInfo> > preparation1 =
+    isolator.get()->prepare(
+        containerId,
+        executorInfo,
+        dir.get(),
+        None(),
+        None());
+  AWAIT_READY(preparation1);
+  ASSERT_SOME(preparation1.get());
+  ostringstream command1;
+  // Listen to 'localhost' and 'Port'.
+  command1 << "nc -u -l localhost " << validPort << " > " << trafficViaLoopback
+           << "&";
+  // Listen to 'public IP' and 'Port'.
+  command1 << "nc -u -l " << hostIP << " " << validPort << " > "
+           << trafficViaPublic << "&";
+  // Listen to 'public IP' and 'invalidPort'. This should not receive anything.
+  command1 << "nc -u -l " << invalidPort << " | tee " << trafficViaLoopback
+           << " " << trafficViaPublic << "&";
+  // Touch the guard file.
+  command1 << "touch " << container1Ready;
+  int pipes[2];
+  ASSERT_NE(-1, ::pipe(pipes));
+  Try<pid_t> pid = launchHelper(
+      launcher.get(),
+      pipes,
+      containerId,
+      command1.str(),
+      preparation1.get());
+  ASSERT_SOME(pid);
+  // Reap the forked child.
+  Future<Option<int> > status = process::reap(pid.get());
+  // Continue in the parent.
+  ::close(pipes[0]);
+  // Isolate the forked child.
+  AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
+  // Now signal the child to continue.
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+  ::close(pipes[1]);
+  // Wait for the command to start.
+  ASSERT_TRUE(waitForFileCreation(container1Ready));
+  // Send to 'localhost' and 'port'.
+  ostringstream command2;
+  command2 << "printf hello1 | nc -w1 -u localhost " << validPort;
+  ASSERT_SOME_EQ(0, os::shell(NULL, command2.str().c_str()));
+  // Send to 'localhost' and 'invalidPort'. The command should return
+  // successfully because UDP is stateless but no data could be sent.
+  ostringstream command3;
+  command3 << "printf hello2 | nc -w1 -u localhost " << invalidPort;
+  ASSERT_SOME_EQ(0, os::shell(NULL, command3.str().c_str()));
+  // Send to 'public IP' and 'port'.
+  ostringstream command4;
+  command4 << "printf hello3 | nc -w1 -u " << hostIP << " " << validPort;
+  ASSERT_SOME_EQ(0, os::shell(NULL, command4.str().c_str()));
+  // Send to 'public IP' and 'invalidPort'. The command should return
+  // successfully because UDP is stateless but no data could be sent.
+  ostringstream command5;
+  command5 << "printf hello4 | nc -w1 -u " << hostIP << " " << invalidPort;
+  ASSERT_SOME_EQ(0, os::shell(NULL, command5.str().c_str()));
+  EXPECT_SOME_EQ("hello1", os::read(trafficViaLoopback));
+  EXPECT_SOME_EQ("hello3", os::read(trafficViaPublic));
+  // Ensure all processes are killed.
+  AWAIT_READY(launcher.get()->destroy(containerId));
+  // Let the isolator clean up.
+  AWAIT_READY(isolator.get()->cleanup(containerId));
+  delete isolator.get();
+  delete launcher.get();
+// Test the scenario where a TCP server is in a container while host
+// tries to establish a TCP connection.
+TEST_F(PortMappingIsolatorTest, ROOT_NC_HostToContainerTCP)
+  Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+  CHECK_SOME(isolator);
+  Try<Launcher*> launcher =
+    LinuxLauncher::create(flags, isolator.get()->namespaces().get());
+  CHECK_SOME(launcher);
+  // Set the executor's resources.
+  ExecutorInfo executorInfo;
+  executorInfo.mutable_resources()->CopyFrom(
+      Resources::parse(container1Ports).get());
+  ContainerID containerId;
+  containerId.set_value("container1");
+  // Use a relative temporary directory so it gets cleaned up
+  // automatically with the test.
+  Try<string> dir = os::mkdtemp(path::join(os::getcwd(), "XXXXXX"));
+  ASSERT_SOME(dir);
+  Future<Option<CommandInfo> > preparation1 =
+    isolator.get()->prepare(
+        containerId,
+        executorInfo,
+        dir.get(),
+        None(),
+        None());
+  AWAIT_READY(preparation1);
+  ASSERT_SOME(preparation1.get());
+  ostringstream command1;
+  // Listen to 'localhost' and 'Port'.
+  command1 << "nc -l localhost " << validPort << " > " << trafficViaLoopback
+           << "&";
+  // Listen to 'public IP' and 'Port'.
+  command1 << "nc -l " << hostIP << " " << validPort << " > "
+           << trafficViaPublic << "&";
+  // Listen to 'public IP' and 'invalidPort'. This should fail.
+  command1 << "nc -l " << invalidPort << " | tee " << trafficViaLoopback << " "
+           << trafficViaPublic << "&";
+  // Touch the guard file.
+  command1 << "touch " << container1Ready;
+  int pipes[2];
+  ASSERT_NE(-1, ::pipe(pipes));
+  Try<pid_t> pid = launchHelper(
+      launcher.get(),
+      pipes,
+      containerId,
+      command1.str(),
+      preparation1.get());
+  ASSERT_SOME(pid);
+  // Reap the forked child.
+  Future<Option<int> > status = process::reap(pid.get());
+  // Continue in the parent.
+  ::close(pipes[0]);
+  // Isolate the forked child.
+  AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
+  // Now signal the child to continue.
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+  ::close(pipes[1]);
+  // Wait for the command to start.
+  ASSERT_TRUE(waitForFileCreation(container1Ready));
+  // Send to 'localhost' and 'port'.
+  ostringstream command2;
+  command2 << "printf hello1 | nc localhost " << validPort;
+  ASSERT_SOME_EQ(0, os::shell(NULL, command2.str().c_str()));
+  // Send to 'localhost' and 'invalidPort'. This should fail because TCP
+  // connection couldn't be established..
+  ostringstream command3;
+  command3 << "printf hello2 | nc localhost " << invalidPort;
+  ASSERT_SOME_EQ(256, os::shell(NULL, command3.str().c_str()));
+  // Send to 'public IP' and 'port'.
+  ostringstream command4;
+  command4 << "printf hello3 | nc " << hostIP << " " << validPort;
+  ASSERT_SOME_EQ(0, os::shell(NULL, command4.str().c_str()));
+  // Send to 'public IP' and 'invalidPort'. This should fail because TCP
+  // connection couldn't be established.
+  ostringstream command5;
+  command5 << "printf hello4 | nc " << hostIP << " " << invalidPort;
+  ASSERT_SOME_EQ(256, os::shell(NULL, command5.str().c_str()));
+  EXPECT_SOME_EQ("hello1", os::read(trafficViaLoopback));
+  EXPECT_SOME_EQ("hello3", os::read(trafficViaPublic));
+  // Ensure all processes are killed.
+  AWAIT_READY(launcher.get()->destroy(containerId));
+  // Let the isolator clean up.
+  AWAIT_READY(isolator.get()->cleanup(containerId));
+  delete isolator.get();
+  delete launcher.get();
+// Test the scenario where a container issues ICMP requests to
+// external hosts.
+TEST_F(PortMappingIsolatorTest, ROOT_ContainerICMPExternal)
+  // TODO(chzhcn): Even though this is unlikely, consider a better
+  // way to get external servers.
+  ASSERT_FALSE(nameServers.empty())
+    << "-------------------------------------------------------------\n"
+    << "We cannot run some PortMappingIsolatorTests because we could\n"
+    << "not find any external name servers in /etc/resolv.conf.\n"
+    << "-------------------------------------------------------------";
+  Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+  CHECK_SOME(isolator);
+  Try<Launcher*> launcher =
+    LinuxLauncher::create(flags, isolator.get()->namespaces().get());
+  CHECK_SOME(launcher);
+  // Set the executor's resources.
+  ExecutorInfo executorInfo;
+  executorInfo.mutable_resources()->CopyFrom(
+      Resources::parse(container1Ports).get());
+  ContainerID containerId;
+  containerId.set_value("container1");
+  // Use a relative temporary directory so it gets cleaned up
+  // automatically with the test.
+  Try<string> dir = os::mkdtemp(path::join(os::getcwd(), "XXXXXX"));
+  ASSERT_SOME(dir);
+  Future<Option<CommandInfo> > preparation1 =
+    isolator.get()->prepare(
+        containerId,
+        executorInfo,
+        dir.get(),
+        None(),
+        None());
+  AWAIT_READY(preparation1);
+  ASSERT_SOME(preparation1.get());
+  ostringstream command1;
+  for (unsigned int i = 0; i < nameServers.size(); i++) {
+    const string& IP = nameServers[i];
+    command1 << "ping -c1 " << IP;
+    if (i + 1 < nameServers.size()) {
+      command1 << " && ";
+    }
+  }
+  command1 << "; printf $? > " << exitStatus << "; sync";
+  int pipes[2];
+  ASSERT_NE(-1, ::pipe(pipes));
+  Try<pid_t> pid = launchHelper(
+      launcher.get(),
+      pipes,
+      containerId,
+      command1.str(),
+      preparation1.get());
+  ASSERT_SOME(pid);
+  // Reap the forked child.
+  Future<Option<int> > status = process::reap(pid.get());
+  // Continue in the parent.
+  ::close(pipes[0]);
+  // Isolate the forked child.
+  AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
+  // Now signal the child to continue.
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+  ::close(pipes[1]);
+  // Wait for the command to complete.
+  AWAIT_READY(status);
+  EXPECT_SOME_EQ("0", os::read(exitStatus));
+  // Ensure all processes are killed.
+  AWAIT_READY(launcher.get()->destroy(containerId));
+  // Let the isolator clean up.
+  AWAIT_READY(isolator.get()->cleanup(containerId));
+  delete isolator.get();
+  delete launcher.get();
+// Test the scenario where a container issues ICMP requests to itself.
+TEST_F(PortMappingIsolatorTest, ROOT_ContainerICMPInternal)
+  Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+  CHECK_SOME(isolator);
+  Try<Launcher*> launcher =
+    LinuxLauncher::create(flags, isolator.get()->namespaces().get());
+  CHECK_SOME(launcher);
+  // Set the executor's resources.
+  ExecutorInfo executorInfo;
+  executorInfo.mutable_resources()->CopyFrom(
+      Resources::parse(container1Ports).get());
+  ContainerID containerId;
+  containerId.set_value("container1");
+  // Use a relative temporary directory so it gets cleaned up
+  // automatically with the test.
+  Try<string> dir = os::mkdtemp(path::join(os::getcwd(), "XXXXXX"));
+  ASSERT_SOME(dir);
+  Future<Option<CommandInfo> > preparation1 =
+    isolator.get()->prepare(
+        containerId,
+        executorInfo,
+        dir.get(),
+        None(),
+        None());
+  AWAIT_READY(preparation1);
+  ASSERT_SOME(preparation1.get());
+  ostringstream command1;
+  command1 << "ping -c1 && ping -c1 " << hostIP
+           << "; printf $? > " << exitStatus << "; sync";
+  int pipes[2];
+  ASSERT_NE(-1, ::pipe(pipes));
+  Try<pid_t> pid = launchHelper(
+      launcher.get(),
+      pipes,
+      containerId,
+      command1.str(),
+      preparation1.get());
+  ASSERT_SOME(pid);
+  // Reap the forked child.
+  Future<Option<int> > status = process::reap(pid.get());
+  // Continue in the parent.
+  ::close(pipes[0]);
+  // Isolate the forked child.
+  AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
+  // Now signal the child to continue.
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+  ::close(pipes[1]);
+  // Wait for the command to complete.
+  AWAIT_READY(status);
+  EXPECT_SOME_EQ("0", os::read(exitStatus));
+  // Ensure all processes are killed.
+  AWAIT_READY(launcher.get()->destroy(containerId));
+  // Let the isolator clean up.
+  AWAIT_READY(isolator.get()->cleanup(containerId));
+  delete isolator.get();
+  delete launcher.get();
+// Test the scenario where a container issues ARP requests to
+// external hosts.
+TEST_F(PortMappingIsolatorTest, ROOT_ContainerARPExternal)
+  // TODO(chzhcn): Even though this is unlikely, consider a better
+  // way to get external servers.
+  ASSERT_FALSE(nameServers.empty())
+    << "-------------------------------------------------------------\n"
+    << "We cannot run some PortMappingIsolatorTests because we could\n"
+    << "not find any external name servers in /etc/resolv.conf.\n"
+    << "-------------------------------------------------------------";
+  Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+  CHECK_SOME(isolator);
+  Try<Launcher*> launcher =
+    LinuxLauncher::create(flags, isolator.get()->namespaces().get());
+  CHECK_SOME(launcher);
+  // Set the executor's resources.
+  ExecutorInfo executorInfo;
+  executorInfo.mutable_resources()->CopyFrom(
+      Resources::parse(container1Ports).get());
+  ContainerID containerId;
+  containerId.set_value("container1");
+  // Use a relative temporary directory so it gets cleaned up
+  // automatically with the test.
+  Try<string> dir = os::mkdtemp(path::join(os::getcwd(), "XXXXXX"));
+  ASSERT_SOME(dir);
+  Future<Option<CommandInfo> > preparation1 =
+    isolator.get()->prepare(
+        containerId,
+        executorInfo,
+        dir.get(),
+        None(),
+        None());
+  AWAIT_READY(preparation1);
+  ASSERT_SOME(preparation1.get());
+  ostringstream command1;
+  for (unsigned int i = 0; i < nameServers.size(); i++) {
+    const string& IP = nameServers[i];
+    // Time out after 1s and terminate upon receiving the first reply.
+    command1 << "arping -f -w1 " << IP << " -I " << eth0;
+    if (i + 1 < nameServers.size()) {
+      command1 << " && ";
+    }
+  }
+  command1 << "; printf $? > " << exitStatus << "; sync";
+  int pipes[2];
+  ASSERT_NE(-1, ::pipe(pipes));
+  Try<pid_t> pid = launchHelper(
+      launcher.get(),
+      pipes,
+      containerId,
+      command1.str(),
+      preparation1.get());
+  ASSERT_SOME(pid);
+  // Reap the forked child.
+  Future<Option<int> > status = process::reap(pid.get());
+  // Continue in the parent.
+  ::close(pipes[0]);
+  // Isolate the forked child.
+  AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
+  // Now signal the child to continue.
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+  ::close(pipes[1]);
+  // Wait for the command to complete.
+  AWAIT_READY(status);
+  EXPECT_SOME_EQ("0", os::read(exitStatus));
+  // Ensure all processes are killed.
+  AWAIT_READY(launcher.get()->destroy(containerId));
+  // Let the isolator clean up.
+  AWAIT_READY(isolator.get()->cleanup(containerId));
+  delete isolator.get();
+  delete launcher.get();
+// Test DNS connectivity.
+TEST_F(PortMappingIsolatorTest, ROOT_DNS)
+  // TODO(chzhcn): Even though this is unlikely, consider a better
+  // way to get external servers.
+  ASSERT_FALSE(nameServers.empty())
+    << "-------------------------------------------------------------\n"
+    << "We cannot run some PortMappingIsolatorTests because we could\n"
+    << "not find any external name servers in /etc/resolv.conf.\n"
+    << "-------------------------------------------------------------";
+  Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+  CHECK_SOME(isolator);
+  Try<Launcher*> launcher =
+    LinuxLauncher::create(flags, isolator.get()->namespaces().get());
+  CHECK_SOME(launcher);
+  // Set the executor's resources.
+  ExecutorInfo executorInfo;
+  executorInfo.mutable_resources()->CopyFrom(
+      Resources::parse(container1Ports).get());
+  ContainerID containerId;
+  containerId.set_value("container1");
+  // Use a relative temporary directory so it gets cleaned up
+  // automatically with the test.
+  Try<string> dir = os::mkdtemp(path::join(os::getcwd(), "XXXXXX"));
+  ASSERT_SOME(dir);
+  Future<Option<CommandInfo> > preparation1 =
+    isolator.get()->prepare(
+        containerId,
+        executorInfo,
+        dir.get(),
+        None(),
+        None());
+  AWAIT_READY(preparation1);
+  ASSERT_SOME(preparation1.get());
+  ostringstream command1;
+  for (unsigned int i = 0; i < nameServers.size(); i++) {
+    const string& IP = nameServers[i];
+    command1 << "host " << IP;
+    if (i + 1 < nameServers.size()) {
+      command1 << " && ";
+    }
+  }
+  command1 << "; printf $? > " << exitStatus << "; sync";
+  int pipes[2];
+  ASSERT_NE(-1, ::pipe(pipes));
+  Try<pid_t> pid = launchHelper(
+      launcher.get(),
+      pipes,
+      containerId,
+      command1.str(),
+      preparation1.get());
+  ASSERT_SOME(pid);
+  // Reap the forked child.
+  Future<Option<int> > status = process::reap(pid.get());
+  // Continue in the parent.
+  ::close(pipes[0]);
+  // Isolate the forked child.
+  AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
+  // Now signal the child to continue.
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+  ::close(pipes[1]);
+  // Wait for the command to complete.
+  AWAIT_READY(status);
+  EXPECT_SOME_EQ("0", os::read(exitStatus));
+  // Ensure all processes are killed.
+  AWAIT_READY(launcher.get()->destroy(containerId));
+  // Let the isolator clean up.
+  AWAIT_READY(isolator.get()->cleanup(containerId));
+  delete isolator.get();
+  delete launcher.get();
+// Test the scenario where a container has run out of ephemeral ports
+// to use.
+TEST_F(PortMappingIsolatorTest, ROOT_TooManyContainers)
+  // Increase the ephemeral ports per container so that we dont have
+  // enough ephemeral ports to launch a second container.
+  flags.ephemeral_ports_per_container = 512;
+  Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+  CHECK_SOME(isolator);
+  Try<Launcher*> launcher =
+    LinuxLauncher::create(flags, isolator.get()->namespaces().get());
+  CHECK_SOME(launcher);
+  // Set the executor's resources.
+  ExecutorInfo executorInfo;
+  executorInfo.mutable_resources()->CopyFrom(
+      Resources::parse(container1Ports).get());
+  ContainerID containerId1;
+  containerId1.set_value("container1");
+  // Use a relative temporary directory so it gets cleaned up
+  // automatically with the test.
+  Try<string> dir1 = os::mkdtemp(path::join(os::getcwd(), "XXXXXX"));
+  ASSERT_SOME(dir1);
+  Future<Option<CommandInfo> > preparation1 =
+    isolator.get()->prepare(
+        containerId1,
+        executorInfo,
+        dir1.get(),
+        None(),
+        None());
+  AWAIT_READY(preparation1);
+  ASSERT_SOME(preparation1.get());
+  ostringstream command1;
+  command1 << "sleep 1000";
+  int pipes[2];
+  ASSERT_NE(-1, ::pipe(pipes));
+  Try<pid_t> pid = launchHelper(
+      launcher.get(),
+      pipes,
+      containerId1,
+      command1.str(),
+      preparation1.get());
+  ASSERT_SOME(pid);
+  // Reap the forked child.
+  Future<Option<int> > status1 = process::reap(pid.get());
+  // Continue in the parent.
+  ::close(pipes[0]);
+  // Isolate the forked child.
+  AWAIT_READY(isolator.get()->isolate(containerId1, pid.get()));
+  // Now signal the child to continue.
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+  ::close(pipes[1]);
+  ContainerID containerId2;
+  containerId2.set_value("container2");
+  executorInfo.mutable_resources()->CopyFrom(
+      Resources::parse(container2Ports).get());
+  // Use a relative temporary directory so it gets cleaned up
+  // automatically with the test.
+  Try<string> dir2 = os::mkdtemp(path::join(os::getcwd(), "XXXXXX"));
+  ASSERT_SOME(dir2);
+  Future<Option<CommandInfo> > preparation2 =
+    isolator.get()->prepare(
+        containerId2,
+        executorInfo,
+        dir2.get(),
+        None(),
+        None());
+  AWAIT_FAILED(preparation2);
+  // Ensure all processes are killed.
+  AWAIT_READY(launcher.get()->destroy(containerId1));
+  // Let the isolator clean up.
+  AWAIT_READY(isolator.get()->cleanup(containerId1));
+  delete isolator.get();
+  delete launcher.get();
+// Test the scenario where PortMappingIsolator uses a very small
+// egress rate limit.
+TEST_F(PortMappingIsolatorTest, ROOT_NC_SmallEgressLimit)
+  // Note that the underlying rate limiting mechanism usually has a
+  // small allowance for burst. Empirically, as least 10x of the rate
+  // limit amount of data is required to make sure the burst is an
+  // insignificant factor of the transmission time.
+  // To-be-tested egress rate limit, in Bytes/s.
+  const Bytes rate = 2000;
+  // Size of the data to send, in Bytes.
+  const Bytes size = 20480;
+  // Use a very small egress limit.
+  flags.egress_rate_limit_per_container = rate;
+  Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+  CHECK_SOME(isolator);
+  Try<Launcher*> launcher =
+    LinuxLauncher::create(flags, isolator.get()->namespaces().get());
+  CHECK_SOME(launcher);
+  // Open an nc server on the host side. Note that 'invalidPort' is in
+  // neither 'ports' nor 'ephemeral_ports', which makes it a good port
+  // to use on the host.
+  ostringstream command1;
+  command1 << "nc -l localhost " << invalidPort << " > /devnull";
+  Try<Subprocess> s = subprocess(command1.str().c_str());
+  // Set the executor's resources.
+  ExecutorInfo executorInfo;
+  executorInfo.mutable_resources()->CopyFrom(
+      Resources::parse(container1Ports).get());
+  ContainerID containerId;
+  containerId.set_value("container1");
+  // Use a relative temporary directory so it gets cleaned up
+  // automatically with the test.
+  Try<string> dir = os::mkdtemp(path::join(os::getcwd(), "XXXXXX"));
+  ASSERT_SOME(dir);
+  Future<Option<CommandInfo> > preparation1 =
+    isolator.get()->prepare(
+        containerId,
+        executorInfo,
+        dir.get(),
+        None(),
+        None());
+  AWAIT_READY(preparation1);
+  ASSERT_SOME(preparation1.get());
+  // Fill 'size' bytes of data. The actual content does not matter.
+  string data(size.bytes(), 'a');
+  ostringstream command2;
+  const string transmissionTime = path::join(os::getcwd(), "transmission_time");
+  command2 << "echo 'Sending " << size.bytes()
+           << " bytes of data under egress rate limit " << rate.bytes()
+           << "Bytes/s...';";
+  command2 << "{ time -p echo " << data  << " | nc localhost "
+           << invalidPort << " ; } 2> " << transmissionTime << " && ";
+  // Touch the guard file.
+  command2 << "touch " << container1Ready;
+  int pipes[2];
+  ASSERT_NE(-1, ::pipe(pipes));
+  Try<pid_t> pid = launchHelper(
+      launcher.get(),
+      pipes,
+      containerId,
+      command2.str(),
+      preparation1.get());
+  ASSERT_SOME(pid);
+  // Reap the forked child.
+  Future<Option<int> > reap = process::reap(pid.get());
+  // Continue in the parent.
+  ::close(pipes[0]);
+  // Isolate the forked child.
+  AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
+  // Now signal the child to continue.
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+  ::close(pipes[1]);
+  // Wait for the command to finish.
+  ASSERT_TRUE(waitForFileCreation(container1Ready));
+  Try<string> read = os::read(transmissionTime);
+  CHECK_SOME(read);
+  // Get the real elapsed time from `time` output. Sample output:
+  // real 12.37
+  // user 0.00
+  // sys 0.00
+  vector<string> lines = strings::split(strings::trim(read.get()), "\n");
+  ASSERT_EQ(3u, lines.size());
+  vector<string> split = strings::split(lines[0], " ");
+  ASSERT_EQ(2u, split.size());
+  Try<float> time = numify<float>(split[1]);
+  ASSERT_SOME(time);
+  ASSERT_GT(time.get(), (size.bytes() / rate.bytes()));
+  // Make sure the nc server exits normally.
+  Future<Option<int> > status = s.get().status();
+  AWAIT_READY(status);
+  EXPECT_SOME_EQ(0, status.get());
+  // Ensure all processes are killed.
+  AWAIT_READY(launcher.get()->destroy(containerId));
+  // Let the isolator clean up.
+  AWAIT_READY(isolator.get()->cleanup(containerId));
+  delete isolator.get();
+  delete launcher.get();
+bool HasTCPSocketsCount(const ResourceStatistics& statistics)
+  return statistics.has_net_tcp_active_connections() &&
+    statistics.has_net_tcp_time_wait_connections();
+bool HasTCPSocketsRTT(const ResourceStatistics& statistics)
+  // We either have all of the following metrics or we have nothing.
+  if (statistics.has_net_tcp_rtt_microsecs_p50() &&
+      statistics.has_net_tcp_rtt_microsecs_p90() &&
+      statistics.has_net_tcp_rtt_microsecs_p95() &&
+      statistics.has_net_tcp_rtt_microsecs_p99()) {
+    return true;
+  } else {
+    return false;
+  }
+// Test that RTT can be returned properly from usage(). This test is
+// very similar to SmallEgressLimitTest in its setup.
+TEST_F(PortMappingIsolatorTest, ROOT_NC_PortMappingStatistics)
+  // To-be-tested egress rate limit, in Bytes/s.
+  const Bytes rate = 2000;
+  // Size of the data to send, in Bytes.
+  const Bytes size = 20480;
+  // Use a very small egress limit.
+  flags.egress_rate_limit_per_container = rate;
+  flags.network_enable_socket_statistics_summary = true;
+  flags.network_enable_socket_statistics_details = true;
+  Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+  CHECK_SOME(isolator);
+  Try<Launcher*> launcher =
+    LinuxLauncher::create(flags, isolator.get()->namespaces().get());
+  CHECK_SOME(launcher);
+  // Open an nc server on the host side. Note that 'invalidPort' is
+  // in neither 'ports' nor 'ephemeral_ports', which makes it a good
+  // port to use on the host. We use this host's public IP because
+  // connections to the localhost IP are filtered out when retrieving
+  // the RTT information inside containers.
+  ostringstream command1;
+  command1 << "nc -l " << hostIP << " " << invalidPort << " > /devnull";
+  Try<Subprocess> s = subprocess(command1.str().c_str());
+  // Set the executor's resources.
+  ExecutorInfo executorInfo;
+  executorInfo.mutable_resources()->CopyFrom(
+      Resources::parse(container1Ports).get());
+  ContainerID containerId;
+  containerId.set_value("container1");
+  // Use a relative temporary directory so it gets cleaned up
+  // automatically with the test.
+  Try<string> dir1 = os::mkdtemp(path::join(os::getcwd(), "XXXXXX"));
+  ASSERT_SOME(dir1);
+  Future<Option<CommandInfo> > preparation1 =
+    isolator.get()->prepare(
+        containerId,
+        executorInfo,
+        dir1.get(),
+        None(),
+        None());
+  AWAIT_READY(preparation1);
+  ASSERT_SOME(preparation1.get());
+  // Fill 'size' bytes of data. The actual content does not matter.
+  string data(size.bytes(), 'a');
+  ostringstream command2;
+  const string transmissionTime = path::join(os::getcwd(), "transmission_time");
+  command2 << "echo 'Sending " << size.bytes()
+           << " bytes of data under egress rate limit " << rate.bytes()
+           << "Bytes/s...';";
+  command2 << "{ time -p echo " << data  << " | nc " << hostIP << " "
+           << invalidPort << " ; } 2> " << transmissionTime << " && ";
+  // Touch the guard file.
+  command2 << "touch " << container1Ready;
+  int pipes[2];
+  ASSERT_NE(-1, ::pipe(pipes));
+  Try<pid_t> pid = launchHelper(
+      launcher.get(),
+      pipes,
+      containerId,
+      command2.str(),
+      preparation1.get());
+  ASSERT_SOME(pid);
+  // Reap the forked child.
+  Future<Option<int> > reap = process::reap(pid.get());
+  // Continue in the parent.
+  ::close(pipes[0]);
+  // Isolate the forked child.
+  AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
+  // Now signal the child to continue.
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+  ::close(pipes[1]);
+  // Test that RTT can be returned while transmission is going. It is
+  // possible that the first few statistics returned don't have a RTT
+  // value because it takes a few round-trips to actually establish a
+  // tcp connection and start sending data. Nevertheless, we should
+  // see a meaningful result well within seconds.
+  Duration waited = Duration::zero();
+  do {
+    os::sleep(Milliseconds(200));
+    waited += Milliseconds(200);
+    // Do an end-to-end test by calling `usage`.
+    Future<ResourceStatistics> usage = isolator.get()->usage(containerId);
+    AWAIT_READY(usage);
+    if (usage.get().has_net_tcp_rtt_microsecs_p50() &&
+        usage.get().has_net_tcp_active_connections()) {
+      EXPECT_GT(usage.get().net_tcp_active_connections(), 0);
+      break;
+    }
+  } while (waited < Seconds(5));
+  ASSERT_LT(waited, Seconds(5));
+  // While the connection is still active, try out different flag
+  // combinations.
+  Result<ResourceStatistics> statistics =
+      statisticsHelper(pid.get(), true, true);
+  ASSERT_SOME(statistics);
+  EXPECT_TRUE(HasTCPSocketsCount(statistics.get()));
+  EXPECT_TRUE(HasTCPSocketsRTT(statistics.get()));
+  statistics = statisticsHelper(pid.get(), true, false);
+  ASSERT_SOME(statistics);
+  EXPECT_TRUE(HasTCPSocketsCount(statistics.get()));
+  EXPECT_FALSE(HasTCPSocketsRTT(statistics.get()));
+  statistics = statisticsHelper(pid.get(), false, true);
+  ASSERT_SOME(statistics);
+  EXPECT_FALSE(HasTCPSocketsCount(statistics.get()));
+  EXPECT_TRUE(HasTCPSocketsRTT(statistics.get()));
+  statistics = statisticsHelper(pid.get(), false, false);
+  ASSERT_SOME(statistics);
+  EXPECT_FALSE(HasTCPSocketsCount(statistics.get()));
+  EXPECT_FALSE(HasTCPSocketsRTT(statistics.get()));
+  // Wait for the command to finish.
+  ASSERT_TRUE(waitForFileCreation(container1Ready));
+  // Make sure the nc server exits normally.
+  Future<Option<int> > status = s.get().status();
+  AWAIT_READY(status);
+  EXPECT_SOME_EQ(0, status.get());
+  // Ensure all processes are killed.
+  AWAIT_READY(launcher.get()->destroy(containerId));
+  // Let the isolator clean up.
+  AWAIT_READY(isolator.get()->cleanup(containerId));
+  delete isolator.get();
+  delete launcher.get();
+class PortMappingMesosTest : public ContainerizerTest<MesosContainerizer>
+  virtual void SetUp()
+  {
+    ContainerizerTest<MesosContainerizer>::SetUp();
+    // Guess the name of the public interface.
+    Result<string> _eth0 = link::eth0();
+    ASSERT_SOME(_eth0) << "Failed to guess the name of the public interface";
+    eth0 = _eth0.get();
+    LOG(INFO) << "Using " << eth0 << " as the public interface";
+    // Guess the name of the loopback interface.
+    Result<string> _lo = link::lo();
+    ASSERT_SOME(_lo) << "Failed to guess the name of the loopback interface";
+    lo = _lo.get();
+    LOG(INFO) << "Using " << lo << " as the loopback interface";
+    cleanup(eth0, lo);
+  }
+  virtual void TearDown()
+  {
+    cleanup(eth0, lo);
+    ContainerizerTest<MesosContainerizer>::TearDown();
+  }
+  Fetcher fetcher;
+  // Name of the host eth0 and lo.
+  string eth0;
+  string lo;
+// Test the scenario where the network isolator is asked to recover
+// both types of containers: containers that were previously managed
+// by network isolator, and containers that weren't.
+TEST_F(PortMappingMesosTest, CGROUPS_ROOT_RecoverMixedContainers)
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<PID<Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+  // Start the first slave without the network isolator.
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  // NOTE: This is to make sure that we use the linux launcher which
+  // is consistent with the launchers we use for other containerizers
+  // we create in this test. Also, this will bypass MESOS-2554.
+  slaveFlags.isolation = "cgroups/cpu,cgroups/mem";
+  Try<MesosContainerizer*> containerizer1 =
+    MesosContainerizer::create(slaveFlags, true, &fetcher);
+  ASSERT_SOME(containerizer1);
+  Try<PID<Slave> > slave = StartSlave(containerizer1.get(), slaveFlags);
+  ASSERT_SOME(slave);
+  MockScheduler sched;
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+  EXPECT_CALL(sched, registered(_, _, _));
+  Filters filters;
+  filters.set_refuse_seconds(0);
+  // NOTE: We set filter explicitly here so that the resources will
+  // not be filtered for 5 seconds (by default).
+  Future<vector<Offer> > offers1;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers1))
+    .WillRepeatedly(DeclineOffers(filters));      // Ignore subsequent offers.
+  driver.start();
+  AWAIT_READY(offers1);
+  EXPECT_NE(0u, offers1.get().size());
+  Offer offer1 = offers1.get()[0];
+  // Start a long running task without using the network isolator.
+  TaskInfo task1 = createTask(
+      offer1.slave_id(),
+      Resources::parse("cpus:1;mem:512").get(),
+      "sleep 1000");
+  EXPECT_CALL(sched, statusUpdate(_, _));
+  Future<Nothing> _statusUpdateAcknowledgement1 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+  driver.launchTasks(offers1.get()[0].id(), {task1}, filters);
+  // Wait for the ACK to be checkpointed.
+  AWAIT_READY(_statusUpdateAcknowledgement1);
+  Stop(slave.get());
+  delete containerizer1.get();
+  Future<Nothing> _recover1 = FUTURE_DISPATCH(_, &Slave::_recover);
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage1 =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+  Future<vector<Offer> > offers2;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers2))
+    .WillRepeatedly(DeclineOffers(filters));      // Ignore subsequent offers.
+  // Restart the slave with the network isolator.
+  slaveFlags.isolation += ",network/port_mapping";
+  Try<MesosContainerizer*> containerizer2 =
+    MesosContainerizer::create(slaveFlags, true, &fetcher);
+  ASSERT_SOME(containerizer2);
+  slave = StartSlave(containerizer2.get(), slaveFlags);
+  ASSERT_SOME(slave);
+  Clock::pause();
+  AWAIT_READY(_recover1);
+  Clock::settle(); // Wait for slave to schedule reregister timeout.
+  AWAIT_READY(slaveReregisteredMessage1);
+  Clock::settle(); // Make sure an allocation is scheduled.
+  Clock::advance(masterFlags.allocation_interval);
+  Clock::resume();
+  AWAIT_READY(offers2);
+  EXPECT_NE(0u, offers2.get().size());
+  Offer offer2 = offers2.get()[0];
+  // Start a long running task using the network isolator.
+  TaskInfo task2 = createTask(offer2, "sleep 1000");
+  EXPECT_CALL(sched, statusUpdate(_, _));
+  Future<Nothing> _statusUpdateAcknowledgement2 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+  driver.launchTasks(offers2.get()[0].id(), {task2});
+  // Wait for the ACK to be checkpointed.
+  AWAIT_READY(_statusUpdateAcknowledgement2);
+  Stop(slave.get());
+  delete containerizer2.get();
+  Future<Nothing> _recover2 = FUTURE_DISPATCH(_, &Slave::_recover);
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage2 =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+  // Restart the slave with the network isolator. This is to verify
+  // the slave recovery case where one task is running with the
+  // network isolator and another task is running without it.
+  Try<MesosContainerizer*> containerizer3 =
+    MesosContainerizer::create(slaveFlags, true, &fetcher);
+  ASSERT_SOME(containerizer3);
+  slave = StartSlave(containerizer3.get(), slaveFlags);
+  ASSERT_SOME(slave);
+  Clock::pause();
+  AWAIT_READY(_recover2);
+  Clock::settle(); // Wait for slave to schedule reregister timeout.
+  AWAIT_READY(slaveReregisteredMessage2);
+  Clock::resume();
+  // Ensure that both containers (with and without network isolation)
+  // were recovered.
+  Future<hashset<ContainerID>> containers = containerizer3.get()->containers();
+  AWAIT_READY(containers);
+  EXPECT_EQ(2u, containers.get().size());
+  foreach (const ContainerID& containerId, containers.get()) {
+    // Do some basic checks to make sure the network isolator can
+    // handle mixed types of containers correctly.
+    Future<ResourceStatistics> usage = containerizer3.get()->usage(containerId);
+    AWAIT_READY(usage);
+    // TODO(chzhcn): Write a more thorough test for update.
+  }
+  driver.stop();
+  driver.join();
+  Shutdown();
+  delete containerizer3.get();
+// Test that all configurations (tc filters etc) is cleaned up for an
+// orphaned container using the network isolator.
+TEST_F(PortMappingMesosTest, CGROUPS_ROOT_CleanUpOrphan)
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+  slave::Flags flags = CreateSlaveFlags();
+  // NOTE: We add 'cgroups/cpu,cgroups/mem' to bypass MESOS-2554.
+  flags.isolation = "cgroups/cpu,cgroups/mem,network/port_mapping";
+  Try<PID<Slave> > slave = StartSlave(flags);
+  ASSERT_SOME(slave);
+  MockScheduler sched;
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(DeclineOffers());      // Ignore subsequent offers.
+  driver.start();
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+  // Start a long running task using network islator.
+  TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+  EXPECT_CALL(sched, statusUpdate(_, _));
+  Future<Nothing> _statusUpdateAcknowledgement =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+  driver.launchTasks(offers.get()[0].id(), {task});
+  // Wait for the ACK to be checkpointed.
+  AWAIT_READY(_statusUpdateAcknowledgement);
+  Stop(slave.get());
+  // Wipe the slave meta directory so that the slave will treat the
+  // above running task as an orphan.
+  ASSERT_SOME(os::rmdir(paths::getMetaRootDir(flags.work_dir)));
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+  Future<Nothing> orphansDestroyed =
+    FUTURE_DISPATCH(_, &MesosContainerizerProcess::___recover);
+  // Restart the slave.
+  slave = StartSlave(flags);
+  ASSERT_SOME(slave);
+  AWAIT_READY(slaveRegisteredMessage);
+  AWAIT_READY(orphansDestroyed);
+  // Expect that qdiscs still exist on eth0 and lo but with no filters.
+  Try<bool> hostEth0ExistsQdisc = ingress::exists(eth0);
+  EXPECT_SOME_TRUE(hostEth0ExistsQdisc);
+  Try<bool> hostLoExistsQdisc = ingress::exists(lo);
+  EXPECT_SOME_TRUE(hostLoExistsQdisc);
+  Result<vector<ip::Classifier> > classifiers =
+    ip::classifiers(eth0, ingress::HANDLE);
+  EXPECT_SOME(classifiers);
+  EXPECT_EQ(0u, classifiers.get().size());
+  classifiers = ip::classifiers(lo, ingress::HANDLE);
+  EXPECT_SOME(classifiers);
+  EXPECT_EQ(0u, classifiers.get().size());
+  // Expect no 'veth' devices.
+  Try<set<string> > links = net::links();
+  ASSERT_SOME(links);
+  foreach (const string& name, links.get()) {
+    EXPECT_FALSE(strings::startsWith(name, slave::PORT_MAPPING_VETH_PREFIX()));
+  }
+  // Expect no files in bind mount directory.
+  Try<list<string> > files = os::ls(slave::PORT_MAPPING_BIND_MOUNT_ROOT());
+  ASSERT_SOME(files);
+  EXPECT_EQ(0u, files.get().size());
+  driver.stop();
+  driver.join();
+  Shutdown();
+// This test verfies the creation and destroy of the network namespace
+// handle symlink. The symlink was introduced in 0.23.0.
+TEST_F(PortMappingMesosTest, ROOT_NetworkNamespaceHandleSymlink)
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+  slave::Flags flags = CreateSlaveFlags();
+  flags.isolation = "network/port_mapping";
+  Try<MesosContainerizer*> containerizer =
+    MesosContainerizer::create(flags, true, &fetcher);
+  ASSERT_SOME(containerizer);
+  Try<PID<Slave>> slave = StartSlave(containerizer.get(), flags);
+  ASSERT_SOME(slave);
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+  EXPECT_CALL(sched, registered(_, _, _));
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(DeclineOffers());      // Ignore subsequent offers.
+  driver.start();
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+  // Start a long running task using network islator.
+  TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+  Future<TaskStatus> status1;
+  Future<TaskStatus> status2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status1))
+    .WillOnce(FutureArg<1>(&status2))
+    .WillRepeatedly(Return());       // Ignore subsequent updates.
+  driver.launchTasks(offers.get()[0].id(), {task});
+  AWAIT_READY(status1);
+  EXPECT_EQ(task.task_id(), status1.get().task_id());
+  EXPECT_EQ(TASK_RUNNING, status1.get().state());
+  Future<hashset<ContainerID>> containers = containerizer.get()->containers();
+  AWAIT_READY(containers);
+  ASSERT_EQ(1u, containers.get().size());
+  ContainerID containerId = *(containers.get().begin());
+  const string symlink = path::join(
+      stringify(containerId));
+  EXPECT_TRUE(os::exists(symlink));
+  EXPECT_TRUE(os::stat::islink(symlink));
+  Future<containerizer::Termination> termination =
+    containerizer.get()->wait(containerId);
+  driver.killTask(task.task_id());
+  AWAIT_READY(status2);
+  EXPECT_EQ(task.task_id(), status2.get().task_id());
+  EXPECT_EQ(TASK_KILLED, status2.get().state());
+  AWAIT_READY(termination);
+  EXPECT_FALSE(os::exists(symlink));
+  driver.stop();
+  driver.join();
+  Shutdown();
+  delete containerizer.get();
+// This test verfies that the isolator is able to recover a mix of
+// known and unkonwn orphans. This is used to capture the regression
+// described in MESOS-2914.
+TEST_F(PortMappingMesosTest, CGROUPS_ROOT_RecoverMixedKnownAndUnKnownOrphans)
+  Try<PID<Master>> master = StartMaster(CreateMasterFlags());
+  ASSERT_SOME(master);
+  slave::Flags flags = CreateSlaveFlags();
+  flags.isolation = "network/port_mapping";
+  Try<MesosContainerizer*> containerizer =
+    MesosContainerizer::create(flags, true, &fetcher);
+  ASSERT_SOME(containerizer);
+  Try<PID<Slave> > slave = StartSlave(containerizer.get(), flags);
+  ASSERT_SOME(slave);
+  MockScheduler sched;
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+  EXPECT_CALL(sched, registered(_, _, _));
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(DeclineOffers());      // Ignore subsequent offers.
+  driver.start();
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+  Offer offer = offers.get()[0];
+  TaskInfo task1 = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:1;mem:64").get(),
+      "sleep 1000");
+  TaskInfo task2 = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:1;mem:64").get(),
+      "sleep 1000");
+  Future<TaskStatus> status1;
+  Future<TaskStatus> status2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status1))
+    .WillOnce(FutureArg<1>(&status2))
+    .WillRepeatedly(Return());       // Ignore subsequent updates.
+  driver.launchTasks(offers.get()[0].id(), {task1, task2});
+  AWAIT_READY(status1);
+  ASSERT_EQ(TASK_RUNNING, status1.get().state());
+  AWAIT_READY(status2);
+  ASSERT_EQ(TASK_RUNNING, status2.get().state());
+  // Obtain the container IDs.
+  Future<hashset<ContainerID>> containers = containerizer.get()->containers();
+  AWAIT_READY(containers);
+  ASSERT_EQ(2u, containers.get().size());
+  Stop(slave.get());
+  delete containerizer.get();
+  // Wipe the slave meta directory so that the slave will treat the
+  // above running tasks as orphans.
+  ASSERT_SOME(os::rmdir(paths::getMetaRootDir(flags.work_dir)));
+  // Remove the network namespace symlink for one container so that it
+  // becomes an unknown orphan.
+  const ContainerID containerId = *(containers.get().begin());
+  const string symlink = path::join(
+      stringify(containerId));
+  ASSERT_TRUE(os::exists(symlink));
+  ASSERT_TRUE(os::stat::islink(symlink));
+  ASSERT_SOME(os::rm(symlink));
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+  Future<Nothing> knownOrphansDestroyed =
+    FUTURE_DISPATCH(_, &MesosContainerizerProcess::___recover);
+  // Restart the slave.
+  slave = StartSlave(flags);
+  ASSERT_SOME(slave);
+  AWAIT_READY(slaveRegisteredMessage);
+  AWAIT_READY(knownOrphansDestroyed);
+  // We settle the clock here to ensure that the processing of
+  // 'MesosContainerizerProcess::___destroy()' is complete and the
+  // metric is updated.
+  Clock::pause();
+  Clock::settle();
+  Clock::resume();
+  JSON::Object metrics = Metrics();
+      0u,
+      metrics.values["containerizer/mesos/container_destroy_errors"]);
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {