You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2014/06/25 23:37:49 UTC

[7/7] git commit: Refactored the mesos containerizer launcher to fix MESOS-1404.

Refactored the mesos containerizer launcher to fix MESOS-1404.

Review: https://reviews.apache.org/r/22852


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d0046dca
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d0046dca
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d0046dca

Branch: refs/heads/master
Commit: d0046dca732ac4c1636ef85384bda3092ff3fa4f
Parents: ae3c8e2
Author: Jie Yu <yu...@gmail.com>
Authored: Fri Jun 20 21:40:37 2014 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jun 25 14:36:12 2014 -0700

----------------------------------------------------------------------
 src/Makefile.am                                 |   21 +-
 src/slave/containerizer/containerizer.cpp       |    3 +-
 src/slave/containerizer/launcher.cpp            |   89 +-
 src/slave/containerizer/launcher.hpp            |   46 +-
 src/slave/containerizer/linux_launcher.cpp      |  199 +--
 src/slave/containerizer/linux_launcher.hpp      |   17 +-
 src/slave/containerizer/mesos/containerizer.cpp | 1048 ++++++++++++++++
 src/slave/containerizer/mesos/containerizer.hpp |  240 ++++
 src/slave/containerizer/mesos/launch.cpp        |  211 ++++
 src/slave/containerizer/mesos/launch.hpp        |   60 +
 src/slave/containerizer/mesos/main.cpp          |   34 +
 src/slave/containerizer/mesos_containerizer.cpp | 1174 ------------------
 src/slave/containerizer/mesos_containerizer.hpp |  242 ----
 src/tests/cgroups_isolator_tests.cpp            |    2 +-
 src/tests/containerizer_tests.cpp               |    3 +-
 src/tests/isolator_tests.cpp                    |  170 ++-
 src/tests/master_tests.cpp                      |    2 +-
 src/tests/mesos.cpp                             |    3 +-
 src/tests/mesos.hpp                             |    6 +-
 src/tests/slave_tests.cpp                       |    3 +-
 20 files changed, 1958 insertions(+), 1615 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index a49a3fe..fb3af9d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -247,10 +247,11 @@ libmesos_no_3rdparty_la_SOURCES =					\
 	slave/slave.cpp							\
 	slave/http.cpp							\
 	slave/containerizer/containerizer.cpp				\
+	slave/containerizer/external_containerizer.cpp			\
 	slave/containerizer/isolator.cpp				\
 	slave/containerizer/launcher.cpp				\
-	slave/containerizer/mesos_containerizer.cpp			\
-	slave/containerizer/external_containerizer.cpp			\
+	slave/containerizer/mesos/containerizer.cpp			\
+	slave/containerizer/mesos/launch.cpp				\
 	slave/status_update_manager.cpp					\
 	exec/exec.cpp							\
 	common/lock.cpp							\
@@ -365,16 +366,17 @@ libmesos_no_3rdparty_la_SOURCES +=					\
 	master/registrar.hpp						\
 	master/master.hpp master/sorter.hpp				\
 	messages/messages.hpp slave/constants.hpp			\
-	slave/containerizer/linux_launcher.hpp				\
 	slave/containerizer/containerizer.hpp				\
+	slave/containerizer/external_containerizer.hpp			\
 	slave/containerizer/isolator.hpp				\
+	slave/containerizer/launcher.hpp				\
+	slave/containerizer/linux_launcher.hpp				\
+	slave/containerizer/mesos/containerizer.hpp			\
+	slave/containerizer/mesos/launch.hpp				\
+	slave/containerizer/isolators/posix.hpp				\
 	slave/containerizer/isolators/cgroups/cpushare.hpp		\
 	slave/containerizer/isolators/cgroups/mem.hpp			\
 	slave/containerizer/isolators/cgroups/perf_event.hpp		\
-	slave/containerizer/isolators/posix.hpp				\
-	slave/containerizer/launcher.hpp				\
-	slave/containerizer/mesos_containerizer.hpp			\
-	slave/containerizer/external_containerizer.hpp			\
 	slave/flags.hpp slave/gc.hpp slave/monitor.hpp			\
 	slave/paths.hpp slave/state.hpp					\
 	slave/status_update_manager.hpp					\
@@ -562,6 +564,11 @@ mesos_executor_SOURCES = launcher/executor.cpp
 mesos_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
 mesos_executor_LDADD = libmesos.la
 
+pkglibexec_PROGRAMS += mesos-containerizer
+mesos_containerizer_SOURCES = slave/containerizer/mesos/main.cpp
+mesos_containerizer_CPPFLAGS = $(MESOS_CPPFLAGS)
+mesos_containerizer_LDADD = libmesos.la
+
 pkglibexec_PROGRAMS += mesos-health-check
 mesos_health_check_SOURCES = health-check/main.cpp
 mesos_health_check_CPPFLAGS = $(MESOS_CPPFLAGS)

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/containerizer.cpp b/src/slave/containerizer/containerizer.cpp
index 922ae17..1b71f33 100644
--- a/src/slave/containerizer/containerizer.cpp
+++ b/src/slave/containerizer/containerizer.cpp
@@ -37,9 +37,10 @@
 #include "slave/containerizer/containerizer.hpp"
 #include "slave/containerizer/isolator.hpp"
 #include "slave/containerizer/launcher.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
 #include "slave/containerizer/external_containerizer.hpp"
 
+#include "slave/containerizer/mesos/containerizer.hpp"
+
 using std::map;
 using std::string;
 using std::vector;

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/launcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/launcher.cpp b/src/slave/containerizer/launcher.cpp
index 2b13d1f..92c0657 100644
--- a/src/slave/containerizer/launcher.cpp
+++ b/src/slave/containerizer/launcher.cpp
@@ -16,8 +16,6 @@
  * limitations under the License.
  */
 
-#include <unistd.h>
-
 #include <process/collect.hpp>
 #include <process/delay.hpp>
 #include <process/process.hpp>
@@ -32,7 +30,9 @@
 using namespace process;
 
 using std::list;
+using std::map;
 using std::string;
+using std::vector;
 
 namespace mesos {
 namespace internal {
@@ -63,11 +63,12 @@ Future<Nothing> PosixLauncher::recover(const list<RunState>& states)
     pid_t pid = state.forkedPid.get();
 
     if (pids.containsValue(pid)) {
-      // This should (almost) never occur. There is the possibility that a new
-      // executor is launched with the same pid as one that just exited (highly
-      // unlikely) and the slave dies after the new executor is launched but
-      // before it hears about the termination of the earlier executor (also
-      // unlikely). Regardless, the launcher can't do anything sensible so this
+      // This should (almost) never occur. There is the possibility
+      // that a new executor is launched with the same pid as one that
+      // just exited (highly unlikely) and the slave dies after the
+      // new executor is launched but before it hears about the
+      // termination of the earlier executor (also unlikely).
+      // Regardless, the launcher can't do anything sensible so this
       // is considered an error.
       return Failure("Detected duplicate pid " + stringify(pid) +
                      " for container " + stringify(containerId));
@@ -80,46 +81,66 @@ Future<Nothing> PosixLauncher::recover(const list<RunState>& states)
 }
 
 
+// The setup function in child before the exec.
+static int childSetup(const Option<lambda::function<int()> >& setup)
+{
+  // POSIX guarantees a forked child's pid does not match any existing
+  // process group id so only a single setsid() is required and the
+  // session id will be the pid.
+  // TODO(idownes): perror is not listed as async-signal-safe and
+  // should be reimplemented safely.
+  // TODO(jieyu): Move this logic to the subprocess (i.e.,
+  // mesos-containerizer launch).
+  if (::setsid() == -1) {
+    perror("Failed to put child in a new session");
+    _exit(1);
+  }
+
+  if (setup.isSome()) {
+    return setup.get()();
+  }
+
+  return 0;
+}
+
+
 Try<pid_t> PosixLauncher::fork(
     const ContainerID& containerId,
-    const lambda::function<int()>& childFunction)
+    const string& path,
+    const vector<string>& argv,
+    const Subprocess::IO& in,
+    const Subprocess::IO& out,
+    const Subprocess::IO& err,
+    const Option<flags::FlagsBase>& flags,
+    const Option<map<string, string> >& environment,
+    const Option<lambda::function<int()> >& setup)
 {
   if (pids.contains(containerId)) {
     return Error("Process has already been forked for container " +
                  stringify(containerId));
   }
 
-  pid_t pid;
-
-  if ((pid = ::fork()) == -1) {
-    return ErrnoError("Failed to fork");
-  }
-
-  if (pid == 0) {
-    // In child.
-    // POSIX guarantees a forked child's pid does not match any existing
-    // process group id so only a single setsid() is required and the session
-    // id will be the pid.
-    // TODO(idownes): perror is not listed as async-signal-safe and should be
-    // reimplemented safely.
-    if (setsid() == -1) {
-      perror("Failed to put child in a new session");
-      _exit(1);
-    }
-
-    // This function should exec() and therefore not return.
-    childFunction();
-
-    ABORT("Child failed to exec");
+  Try<Subprocess> child = subprocess(
+      path,
+      argv,
+      in,
+      out,
+      err,
+      flags,
+      environment,
+      lambda::bind(&childSetup, setup));
+
+  if (child.isError()) {
+    return Error("Failed to fork a child process: " + child.error());
   }
 
-  // parent.
-  LOG(INFO) << "Forked child with pid '" << pid
+  LOG(INFO) << "Forked child with pid '" << child.get().pid()
             << "' for container '" << containerId << "'";
+
   // Store the pid (session id and process group id).
-  pids.put(containerId, pid);
+  pids.put(containerId, child.get().pid());
 
-  return pid;
+  return child.get().pid();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/launcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/launcher.hpp b/src/slave/containerizer/launcher.hpp
index 835d7a9..18b3546 100644
--- a/src/slave/containerizer/launcher.hpp
+++ b/src/slave/containerizer/launcher.hpp
@@ -20,10 +20,15 @@
 #define __LAUNCHER_HPP__
 
 #include <list>
+#include <map>
+#include <string>
 
 #include <process/future.hpp>
+#include <process/subprocess.hpp>
 
+#include <stout/flags.hpp>
 #include <stout/lambda.hpp>
+#include <stout/option.hpp>
 #include <stout/try.hpp>
 
 #include "slave/flags.hpp"
@@ -42,23 +47,33 @@ public:
   virtual process::Future<Nothing> recover(
       const std::list<state::RunState>& states) = 0;
 
-  // Fork a new process in the containerized context. The child will call the
-  // specified function and the parent will return the child's pid.
-  // NOTE: The function must be async-signal safe and should exec as soon as
-  // possible.
+  // Fork a new process in the containerized context. The child will
+  // exec the binary at the given path with the given argv, flags and
+  // environment. The I/O of the child will be redirected according to
+  // the specified I/O descriptors. The user can provide a 'setup'
+  // function which will be invoked in the child process right before
+  // the exec. The 'setup' function has to be async signal safe. The
+  // parent will return the child's pid if the fork is successful.
   virtual Try<pid_t> fork(
       const ContainerID& containerId,
-      const lambda::function<int()>& childFunction) = 0;
+      const std::string& path,
+      const std::vector<std::string>& argv,
+      const process::Subprocess::IO& in,
+      const process::Subprocess::IO& out,
+      const process::Subprocess::IO& err,
+      const Option<flags::FlagsBase>& flags,
+      const Option<std::map<std::string, std::string> >& environment,
+      const Option<lambda::function<int()> >& setup) = 0;
 
   // Kill all processes in the containerized context.
   virtual process::Future<Nothing> destroy(const ContainerID& containerId) = 0;
 };
 
 
-// Launcher suitable for any POSIX compliant system. Uses process groups and
-// sessions to track processes in a container. POSIX states that process groups
-// cannot migrate between sessions so all processes for a container will be
-// contained in a session.
+// Launcher suitable for any POSIX compliant system. Uses process
+// groups and sessions to track processes in a container. POSIX states
+// that process groups cannot migrate between sessions so all
+// processes for a container will be contained in a session.
 class PosixLauncher : public Launcher
 {
 public:
@@ -71,15 +86,22 @@ public:
 
   virtual Try<pid_t> fork(
       const ContainerID& containerId,
-      const lambda::function<int()>& childFunction);
+      const std::string& path,
+      const std::vector<std::string>& argv,
+      const process::Subprocess::IO& in,
+      const process::Subprocess::IO& out,
+      const process::Subprocess::IO& err,
+      const Option<flags::FlagsBase>& flags,
+      const Option<std::map<std::string, std::string> >& environment,
+      const Option<lambda::function<int()> >& setup);
 
   virtual process::Future<Nothing> destroy(const ContainerID& containerId);
 
 private:
   PosixLauncher() {}
 
-  // The 'pid' is the process id of the first process and also the process
-  // group id and session id.
+  // The 'pid' is the process id of the first process and also the
+  // process group id and session id.
   hashmap<ContainerID, pid_t> pids;
 };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/linux_launcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/linux_launcher.cpp b/src/slave/containerizer/linux_launcher.cpp
index 7ebccb4..1ce03e3 100644
--- a/src/slave/containerizer/linux_launcher.cpp
+++ b/src/slave/containerizer/linux_launcher.cpp
@@ -38,6 +38,7 @@
 using namespace process;
 
 using std::list;
+using std::map;
 using std::string;
 using std::vector;
 
@@ -59,7 +60,9 @@ LinuxLauncher::LinuxLauncher(
 Try<Launcher*> LinuxLauncher::create(const Flags& flags)
 {
   Try<string> hierarchy = cgroups::prepare(
-      flags.cgroups_hierarchy, "freezer", flags.cgroups_root);
+      flags.cgroups_hierarchy,
+      "freezer",
+      flags.cgroups_root);
 
   if (hierarchy.isError()) {
     return Error("Failed to create Linux launcher: " + hierarchy.error());
@@ -68,7 +71,8 @@ Try<Launcher*> LinuxLauncher::create(const Flags& flags)
   LOG(INFO) << "Using " << hierarchy.get()
             << " as the freezer hierarchy for the Linux launcher";
 
-  // TODO(idownes): Inspect the isolation flag to determine namespaces to use.
+  // TODO(idownes): Inspect the isolation flag to determine namespaces
+  // to use.
   int namespaces = 0;
 
   return new LinuxLauncher(flags, namespaces, hierarchy.get());
@@ -94,10 +98,10 @@ Future<Nothing> LinuxLauncher::recover(const std::list<state::RunState>& states)
     Try<bool> exists = cgroups::exists(hierarchy, cgroup(containerId));
 
     if (!exists.get()) {
-      // This may occur if the freezer cgroup was destroyed but the slave dies
-      // before noticing this.
-      // The containerizer will monitor the container's pid and notice that it
-      // has exited, triggering destruction of the container.
+      // This may occur if the freezer cgroup was destroyed but the
+      // slave dies before noticing this. The containerizer will
+      // monitor the container's pid and notice that it has exited,
+      // triggering destruction of the container.
       LOG(INFO) << "Couldn't find freezer cgroup for container " << containerId;
       continue;
     }
@@ -109,11 +113,12 @@ Future<Nothing> LinuxLauncher::recover(const std::list<state::RunState>& states)
     pid_t pid = state.forkedPid.get();
 
     if (pids.containsValue(pid)) {
-      // This should (almost) never occur. There is the possibility that a new
-      // executor is launched with the same pid as one that just exited (highly
-      // unlikely) and the slave dies after the new executor is launched but
-      // before it hears about the termination of the earlier executor (also
-      // unlikely). Regardless, the launcher can't do anything sensible so this
+      // This should (almost) never occur. There is the possibility
+      // that a new executor is launched with the same pid as one that
+      // just exited (highly unlikely) and the slave dies after the
+      // new executor is launched but before it hears about the
+      // termination of the earlier executor (also unlikely).
+      // Regardless, the launcher can't do anything sensible so this
       // is considered an error.
       return Failure("Detected duplicate pid " + stringify(pid) +
                      " for container " + stringify(containerId));
@@ -149,58 +154,84 @@ Future<Nothing> LinuxLauncher::recover(const std::list<state::RunState>& states)
 
 
 // Helper for clone() which expects an int(void*).
-static int childMain(void* child)
+static int childMain(void* _func)
 {
   const lambda::function<int()>* func =
-    static_cast<const lambda::function<int()>*> (child);
+    static_cast<const lambda::function<int()>*> (_func);
 
   return (*func)();
 }
 
 
-// Helper that creates a new session then blocks on reading the pipe before
-// calling the supplied function.
-static int _childMain(
-    const lambda::function<int()>& childFunction,
-    int pipes[2])
+// The customized clone function which will be used by 'subprocess()'.
+static pid_t clone(const lambda::function<int()>& func, int namespaces)
 {
-  // In child.
-  os::close(pipes[1]);
+  // Stack for the child.
+  // - unsigned long long used for best alignment.
+  // - static is ok because each child gets their own copy after the clone.
+  // - 8 MiB appears to be the default for "ulimit -s" on OSX and Linux.
+  static unsigned long long stack[(8*1024*1024)/sizeof(unsigned long long)];
+
+  LOG(INFO) << "Cloning child process with flags = " << namespaces;
+
+  return ::clone(
+      childMain,
+      &stack[sizeof(stack)/sizeof(stack[0]) - 1],  // stack grows down
+      namespaces | SIGCHLD,   // Specify SIGCHLD as child termination signal
+      (void*) &func);
+}
 
-  // Move to a different session (and new process group) so we're independent
-  // from the slave's session (otherwise children will receive SIGHUP if the
-  // slave exits).
-  // TODO(idownes): perror is not listed as async-signal-safe and should be
-  // reimplemented safely.
-  if (setsid() == -1) {
-    perror("Failed to put child in a new session");
-    os::close(pipes[0]);
-    _exit(1);
-  }
 
-  // Do a blocking read on the pipe until the parent signals us to continue.
-  int buf;
-  int len;
-  while ((len = read(pipes[0], &buf, sizeof(buf))) == -1 && errno == EINTR);
+static int childSetup(
+    int pipes[2],
+    const Option<lambda::function<int()> >& setup)
+{
+  // In child.
+  while (::close(pipes[1]) == -1 && errno == EINTR);
 
-  if (len != sizeof(buf)) {
+  // Do a blocking read on the pipe until the parent signals us to
+  // continue.
+  char dummy;
+  ssize_t length;
+  while ((length = ::read(pipes[0], &dummy, sizeof(dummy))) == -1 &&
+         errno == EINTR);
+
+  if (length != sizeof(dummy)) {
     ABORT("Failed to synchronize with parent");
   }
 
-  os::close(pipes[0]);
+  while (::close(pipes[0]) == -1 && errno == EINTR);
 
-  // This function should exec() and therefore not return.
-  childFunction();
+  // Move to a different session (and new process group) so we're
+  // independent from the slave's session (otherwise children will
+  // receive SIGHUP if the slave exits).
+  // TODO(idownes): perror is not listed as async-signal-safe and
+  // should be reimplemented safely.
+  // TODO(jieyu): Move this logic to the subprocess (i.e.,
+  // mesos-containerizer launch).
+  if (::setsid() == -1) {
+    perror("Failed to put child in a new session");
+    return 1;
+  }
 
-  ABORT("Child failed to exec");
+  if (setup.isSome()) {
+    return setup.get()();
+  }
 
-  return -1;
+  return 0;
 }
 
 
 Try<pid_t> LinuxLauncher::fork(
     const ContainerID& containerId,
-    const lambda::function<int()>& childFunction)
+    const string& path,
+    const vector<string>& argv,
+    const process::Subprocess::IO& in,
+    const process::Subprocess::IO& out,
+    const process::Subprocess::IO& err,
+    const Option<flags::FlagsBase>& flags,
+    const Option<map<string, string> >& environment,
+    const Option<lambda::function<int()> >& setup)
 {
   // Create a freezer cgroup for this container if necessary.
   Try<bool> exists = cgroups::exists(hierarchy, cgroup(containerId));
@@ -217,70 +248,76 @@ Try<pid_t> LinuxLauncher::fork(
     }
   }
 
-  // Use a pipe to block the child until it's been moved into the freezer
-  // cgroup.
+  // Use a pipe to block the child until it's been moved into the
+  // freezer cgroup.
   int pipes[2];
-  // We assume this should not fail under reasonable conditions so we use CHECK.
-  CHECK(pipe(pipes) == 0);
 
-  // Use the _childMain helper which moves the child into a new session and
-  // blocks on the pipe until we're ready for it to run.
-  lambda::function<int()> func =
-    lambda::bind(&_childMain, childFunction, pipes);
-
-  // Stack for the child.
-  // - unsigned long long used for best alignment.
-  // - static is ok because each child gets their own copy after the clone.
-  // - 8 MiB appears to be the default for "ulimit -s" on OSX and Linux.
-  static unsigned long long stack[(8*1024*1024)/sizeof(unsigned long long)];
-
-  LOG(INFO) << "Cloning child process with flags = " << namespaces;
-
-  pid_t pid;
-  if ((pid = ::clone(
-          childMain,
-          &stack[sizeof(stack)/sizeof(stack[0]) - 1],  // stack grows down
-          namespaces | SIGCHLD,   // Specify SIGCHLD as child termination signal
-          static_cast<void*>(&func))) == -1) {
-      return ErrnoError("Failed to clone child process");
+  // We assume this should not fail under reasonable conditions so we
+  // use CHECK.
+  CHECK_EQ(0, ::pipe(pipes));
+
+  Try<Subprocess> child = subprocess(
+      path,
+      argv,
+      in,
+      out,
+      err,
+      flags,
+      environment,
+      lambda::bind(&childSetup, pipes, setup),
+      lambda::bind(&clone, lambda::_1, namespaces));
+
+  if (child.isError()) {
+    // Don't leak the synchronization pipe on failure.
+    os::close(pipes[0]);
+    os::close(pipes[1]);
+    return Error("Failed to clone child process: " + child.error());
   }
 
   // Parent.
   os::close(pipes[0]);
 
-  // Move the child into the freezer cgroup. Any grandchildren will also be
-  // contained in the cgroup.
-  Try<Nothing> assign = cgroups::assign(hierarchy, cgroup(containerId), pid);
+  // Move the child into the freezer cgroup. Any grandchildren will
+  // also be contained in the cgroup.
+  // TODO(jieyu): Move this logic to the subprocess (i.e.,
+  // mesos-containerizer launch).
+  Try<Nothing> assign = cgroups::assign(
+      hierarchy,
+      cgroup(containerId),
+      child.get().pid());
 
   if (assign.isError()) {
-    LOG(ERROR) << "Failed to assign process " << pid
+    LOG(ERROR) << "Failed to assign process " << child.get().pid()
                 << " of container '" << containerId << "'"
                 << " to its freezer cgroup: " << assign.error();
-    kill(pid, SIGKILL);
+
+    ::kill(child.get().pid(), SIGKILL);
+    os::close(pipes[1]);
     return Error("Failed to contain process");
   }
 
-  // Now that we've contained the child we can signal it to continue by
-  // writing to the pipe.
-  int buf;
-  ssize_t len;
-  while ((len = write(pipes[1], &buf, sizeof(buf))) == -1 && errno == EINTR);
+  // Now that we've contained the child we can signal it to continue
+  // by writing to the pipe.
+  char dummy;
+  ssize_t length;
+  while ((length = ::write(pipes[1], &dummy, sizeof(dummy))) == -1 &&
+         errno == EINTR);
 
-  if (len != sizeof(buf)) {
+  os::close(pipes[1]);
+
+  if (length != sizeof(dummy)) {
     // Ensure the child is killed.
-    kill(pid, SIGKILL);
-    os::close(pipes[1]);
+    ::kill(child.get().pid(), SIGKILL);
     return Error("Failed to synchronize child process");
   }
-  os::close(pipes[1]);
 
-  // Store the pid (session id and process group id) if this is the first
-  // process forked for this container.
+  // Store the pid (session id and process group id) if this is the
+  // first process forked for this container.
   if (!pids.contains(containerId)) {
-    pids.put(containerId, pid);
+    pids.put(containerId, child.get().pid());
   }
 
-  return pid;
+  return child.get().pid();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/linux_launcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/linux_launcher.hpp b/src/slave/containerizer/linux_launcher.hpp
index 622810c..3d9794d 100644
--- a/src/slave/containerizer/linux_launcher.hpp
+++ b/src/slave/containerizer/linux_launcher.hpp
@@ -25,8 +25,8 @@ namespace mesos {
 namespace internal {
 namespace slave {
 
-// Launcher for Linux systems with cgroups. Uses a freezer cgroup to track
-// pids.
+// Launcher for Linux systems with cgroups. Uses a freezer cgroup to
+// track pids.
 class LinuxLauncher : public Launcher
 {
 public:
@@ -39,7 +39,14 @@ public:
 
   virtual Try<pid_t> fork(
       const ContainerID& containerId,
-      const lambda::function<int()>& childFunction);
+      const std::string& path,
+      const std::vector<std::string>& argv,
+      const process::Subprocess::IO& in,
+      const process::Subprocess::IO& out,
+      const process::Subprocess::IO& err,
+      const Option<flags::FlagsBase>& flags,
+      const Option<std::map<std::string, std::string> >& environment,
+      const Option<lambda::function<int()> >& setup);
 
   virtual process::Future<Nothing> destroy(const ContainerID& containerId);
 
@@ -56,8 +63,8 @@ private:
 
   std::string cgroup(const ContainerID& containerId);
 
-  // The 'pid' is the process id of the child process and also the process
-  // group id and session id.
+  // The 'pid' is the process id of the child process and also the
+  // process group id and session id.
   hashmap<ContainerID, pid_t> pids;
 };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
new file mode 100644
index 0000000..27f8e09
--- /dev/null
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -0,0 +1,1048 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/io.hpp>
+#include <process/reap.hpp>
+#include <process/subprocess.hpp>
+
+#include <stout/os.hpp>
+
+#include "slave/paths.hpp"
+#include "slave/slave.hpp"
+
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/isolator.hpp"
+#include "slave/containerizer/launcher.hpp"
+#ifdef __linux__
+#include "slave/containerizer/linux_launcher.hpp"
+#endif // __linux__
+
+#include "slave/containerizer/isolators/posix.hpp"
+#ifdef __linux__
+#include "slave/containerizer/isolators/cgroups/cpushare.hpp"
+#include "slave/containerizer/isolators/cgroups/mem.hpp"
+#include "slave/containerizer/isolators/cgroups/perf_event.hpp"
+#endif // __linux__
+
+#include "slave/containerizer/mesos/containerizer.hpp"
+#include "slave/containerizer/mesos/launch.hpp"
+
+using std::list;
+using std::map;
+using std::string;
+using std::vector;
+
+using namespace process;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+using state::SlaveState;
+using state::FrameworkState;
+using state::ExecutorState;
+using state::RunState;
+
+// Local function declaration/definitions.
+
+// Continuation used with lambda::bind() to collapse the result of
+// collect() over a list of futures into a plain Future<Nothing>.
+Future<Nothing> _nothing()
+{
+  return Nothing();
+}
+
+
+// Helper method to build the environment map used to launch the
+// mesos-fetcher. MESOS_EXECUTOR_URIS holds one space-separated token
+// per URI of the form "<value>+<0|1><X|N>", where the digit mirrors
+// uri.executable() and the letter mirrors uri.extract(). The user,
+// frameworks home and Hadoop home are only exported when set.
+map<string, string> fetcherEnvironment(
+    const CommandInfo& commandInfo,
+    const std::string& directory,
+    const Option<std::string>& user,
+    const Flags& flags)
+{
+  string uris;
+  foreach (const CommandInfo::URI& uri, commandInfo.uris()) {
+    const bool executable = uri.has_executable() && uri.executable();
+
+    uris += uri.value();
+    uris += "+";
+    uris += executable ? "1" : "0";
+    uris += uri.extract() ? "X" : "N";
+    uris += " ";
+  }
+
+  // Drop the separator that trails the final token.
+  uris = strings::trim(uris);
+
+  map<string, string> environment;
+  environment["MESOS_EXECUTOR_URIS"] = uris;
+  environment["MESOS_WORK_DIRECTORY"] = directory;
+
+  if (user.isSome()) {
+    environment["MESOS_USER"] = user.get();
+  }
+
+  if (!flags.frameworks_home.empty()) {
+    environment["MESOS_FRAMEWORKS_HOME"] = flags.frameworks_home;
+  }
+
+  if (!flags.hadoop_home.empty()) {
+    environment["HADOOP_HOME"] = flags.hadoop_home;
+  }
+
+  return environment;
+}
+
+
+// Factory: translates the (possibly deprecated) --isolation flag into
+// a set of isolators, chooses a launcher, and wraps both in a new
+// MesosContainerizer. Returns an Error if an isolator name is unknown,
+// an isolator fails to initialize, or the launcher cannot be created.
+Try<MesosContainerizer*> MesosContainerizer::create(
+    const Flags& flags,
+    bool local)
+{
+  // Map the legacy values 'process' and 'cgroups' onto their modern
+  // equivalents; any other value is used verbatim.
+  string isolation = flags.isolation;
+  if (isolation == "process") {
+    LOG(WARNING) << "The 'process' isolation flag is deprecated, "
+                 << "please update your flags to"
+                 << " '--isolation=posix/cpu,posix/mem'.";
+    isolation = "posix/cpu,posix/mem";
+  } else if (isolation == "cgroups") {
+    LOG(WARNING) << "The 'cgroups' isolation flag is deprecated, "
+                 << "please update your flags to"
+                 << " '--isolation=cgroups/cpu,cgroups/mem'.";
+    isolation = "cgroups/cpu,cgroups/mem";
+  }
+
+  LOG(INFO) << "Using isolation: " << isolation;
+
+  // Registry of isolator factories, keyed by isolation name.
+  typedef Try<Isolator*> (*IsolatorCreator)(const Flags&);
+  hashmap<string, IsolatorCreator> creators;
+
+  creators["posix/cpu"] = &PosixCpuIsolatorProcess::create;
+  creators["posix/mem"] = &PosixMemIsolatorProcess::create;
+#ifdef __linux__
+  creators["cgroups/cpu"] = &CgroupsCpushareIsolatorProcess::create;
+  creators["cgroups/mem"] = &CgroupsMemIsolatorProcess::create;
+  creators["cgroups/perf_event"] = &CgroupsPerfEventIsolatorProcess::create;
+#endif // __linux__
+
+  vector<Owned<Isolator> > isolators;
+
+  foreach (const string& name, strings::split(isolation, ",")) {
+    if (!creators.contains(name)) {
+      return Error("Unknown or unsupported isolator: " + name);
+    }
+
+    Try<Isolator*> created = creators[name](flags);
+    if (created.isError()) {
+      return Error(
+          "Could not create isolator " + name + ": " + created.error());
+    }
+
+    isolators.push_back(Owned<Isolator>(created.get()));
+  }
+
+#ifdef __linux__
+  // Use cgroups on Linux if any cgroups isolators are used.
+  const bool useCgroups = strings::contains(isolation, "cgroups");
+  Try<Launcher*> launcher = useCgroups
+    ? LinuxLauncher::create(flags)
+    : PosixLauncher::create(flags);
+#else
+  Try<Launcher*> launcher = PosixLauncher::create(flags);
+#endif // __linux__
+
+  if (launcher.isError()) {
+    return Error("Failed to create launcher: " + launcher.error());
+  }
+
+  Owned<Launcher> owned(launcher.get());
+  return new MesosContainerizer(flags, local, owned, isolators);
+}
+
+
+// Creates and spawns the MesosContainerizerProcess actor that does the
+// real work; the public methods below only dispatch to it. The actor
+// is terminated, waited for and deleted in the destructor.
+MesosContainerizer::MesosContainerizer(
+    const Flags& flags,
+    bool local,
+    const Owned<Launcher>& launcher,
+    const vector<Owned<Isolator> >& isolators)
+{
+  process =
+    new MesosContainerizerProcess(flags, local, launcher, isolators);
+
+  spawn(process);
+}
+
+
+// Stops the actor, blocks until it has fully exited, then frees it.
+MesosContainerizer::~MesosContainerizer()
+{
+  process::terminate(process);
+  process::wait(process);
+
+  delete process;
+}
+
+
+// Forwards recovery to the actor.
+Future<Nothing> MesosContainerizer::recover(
+    const Option<state::SlaveState>& state)
+{
+  return dispatch(
+      process,
+      &MesosContainerizerProcess::recover,
+      state);
+}
+
+
+// Forwards an executor launch to the actor.
+Future<Nothing> MesosContainerizer::launch(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  return dispatch(
+      process,
+      &MesosContainerizerProcess::launch,
+      containerId,
+      executorInfo,
+      directory,
+      user,
+      slaveId,
+      slavePid,
+      checkpoint);
+}
+
+
+// Forwards a task-carrying launch to the actor.
+Future<Nothing> MesosContainerizer::launch(
+    const ContainerID& containerId,
+    const TaskInfo& taskInfo,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  return dispatch(
+      process,
+      &MesosContainerizerProcess::launch,
+      containerId,
+      taskInfo,
+      executorInfo,
+      directory,
+      user,
+      slaveId,
+      slavePid,
+      checkpoint);
+}
+
+
+// Forwards a resource update to the actor.
+Future<Nothing> MesosContainerizer::update(
+    const ContainerID& containerId,
+    const Resources& resources)
+{
+  return dispatch(
+      process,
+      &MesosContainerizerProcess::update,
+      containerId,
+      resources);
+}
+
+
+// Forwards a usage query to the actor.
+Future<ResourceStatistics> MesosContainerizer::usage(
+    const ContainerID& containerId)
+{
+  return dispatch(
+      process,
+      &MesosContainerizerProcess::usage,
+      containerId);
+}
+
+
+// Forwards a termination wait to the actor.
+Future<containerizer::Termination> MesosContainerizer::wait(
+    const ContainerID& containerId)
+{
+  return dispatch(
+      process,
+      &MesosContainerizerProcess::wait,
+      containerId);
+}
+
+
+// Fire-and-forget: asks the actor to destroy the container.
+void MesosContainerizer::destroy(const ContainerID& containerId)
+{
+  dispatch(
+      process,
+      &MesosContainerizerProcess::destroy,
+      containerId);
+}
+
+
+// Forwards the query for known container ids to the actor.
+Future<hashset<ContainerID> > MesosContainerizer::containers()
+{
+  return dispatch(
+      process,
+      &MesosContainerizerProcess::containers);
+}
+
+
+// Recovery step 1: from the checkpointed slave state, select the
+// latest, not-yet-completed run of every executor whose info and
+// latest run are known and whose forked pid was checkpointed. The
+// launcher recovers these first; _recover() continues with isolators.
+Future<Nothing> MesosContainerizerProcess::recover(
+    const Option<state::SlaveState>& state)
+{
+  LOG(INFO) << "Recovering containerizer";
+
+  list<RunState> recoverable;
+
+  if (state.isSome()) {
+    foreachvalue (const FrameworkState& framework, state.get().frameworks) {
+      foreachvalue (const ExecutorState& executor, framework.executors) {
+        if (executor.info.isNone()) {
+          LOG(WARNING) << "Skipping recovery of executor '" << executor.id
+                       << "' of framework " << framework.id
+                       << " because its info could not be recovered";
+          continue;
+        }
+
+        if (executor.latest.isNone()) {
+          LOG(WARNING) << "Skipping recovery of executor '" << executor.id
+                       << "' of framework " << framework.id
+                       << " because its latest run could not be recovered";
+          continue;
+        }
+
+        // Only the most recent run of an executor is considered.
+        const ContainerID& containerId = executor.latest.get();
+        Option<RunState> latestRun = executor.runs.get(containerId);
+        CHECK_SOME(latestRun);
+
+        // The reaper needs the pid to monitor the executor, so skip runs
+        // without one. This is not an error: the slave will wait on the
+        // container, get a failed Termination, and clean everything up.
+        if (latestRun.get().forkedPid.isNone()) {
+          continue;
+        }
+
+        if (latestRun.get().completed) {
+          VLOG(1) << "Skipping recovery of executor '" << executor.id
+                  << "' of framework " << framework.id
+                  << " because its latest run "
+                  << containerId << " is completed";
+          continue;
+        }
+
+        LOG(INFO) << "Recovering container '" << containerId
+                  << "' for executor '" << executor.id
+                  << "' of framework " << framework.id;
+
+        recoverable.push_back(latestRun.get());
+      }
+    }
+  }
+
+  return launcher->recover(recoverable)
+    .then(defer(self(), &Self::_recover, recoverable));
+}
+
+
+// Recovery step 2: once the launcher has recovered, ask every isolator
+// to recover the same runs in parallel; __recover() runs only if all
+// of them succeed.
+Future<Nothing> MesosContainerizerProcess::_recover(
+    const list<RunState>& recoverable)
+{
+  list<Future<Nothing> > recovering;
+  foreach (const Owned<Isolator>& isolator, isolators) {
+    recovering.push_back(isolator->recover(recoverable));
+  }
+
+  return collect(recovering)
+    .then(defer(self(), &Self::__recover, recoverable));
+}
+
+
+// Recovery step 3: rebuild in-memory bookkeeping for every recovered
+// run. That means a termination promise, a reaper on the checkpointed
+// pid whose exit triggers reaped(), and a limitation watch on every
+// isolator that triggers limited().
+Future<Nothing> MesosContainerizerProcess::__recover(
+    const list<RunState>& recovered)
+{
+  foreach (const RunState& run, recovered) {
+    CHECK_SOME(run.id);
+    const ContainerID& containerId = run.id.get();
+
+    promises.put(
+        containerId,
+        Owned<Promise<containerizer::Termination> >(
+            new Promise<containerizer::Termination>()));
+
+    CHECK_SOME(run.forkedPid);
+    Future<Option<int> > exitStatus = process::reap(run.forkedPid.get());
+    statuses[containerId] = exitStatus;
+    exitStatus.onAny(defer(self(), &Self::reaped, containerId));
+
+    foreach (const Owned<Isolator>& isolator, isolators) {
+      isolator->watch(containerId)
+        .onAny(defer(self(), &Self::limited, containerId, lambda::_1));
+    }
+  }
+
+  return Nothing();
+}
+
+
+// Launching an executor involves the following steps:
+// 1. Call prepare on each isolator.
+// 2. Fork the executor. The forked child is blocked from exec'ing
+//    until it has been isolated.
+// 3. Isolate the executor. Call isolate with the pid for each
+//    isolator.
+// 4. Fetch the executor.
+// 5. Exec the executor. The forked child is signalled to continue. It
+//    will first execute any preparation commands from isolators and
+//    then exec the executor.
+// If any step fails, the container is destroyed.
+Future<Nothing> MesosContainerizerProcess::launch(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  if (promises.contains(containerId)) {
+    LOG(ERROR) << "Cannot start already running container '"
+               << containerId << "'";
+    return Failure("Container already started");
+  }
+
+  // TODO(tillt): The slave should expose which containerization
+  // mechanisms it supports to avoid scheduling tasks that it cannot
+  // run.
+  if (executorInfo.command().has_container()) {
+    // This containerizer cannot honor a ContainerInfo; fail loudly so
+    // users notice the unsupported task configuration.
+    return Failure("ContainerInfo is not supported");
+  }
+
+  promises.put(
+      containerId,
+      Owned<Promise<containerizer::Termination> >(
+          new Promise<containerizer::Termination>()));
+
+  // Remember the allocation so usage() can report limits.
+  resources.put(containerId, executorInfo.resources());
+
+  LOG(INFO) << "Starting container '" << containerId
+            << "' for executor '" << executorInfo.executor_id()
+            << "' of framework '" << executorInfo.framework_id() << "'";
+
+  Future<list<Option<CommandInfo> > > prepared =
+    prepare(containerId, executorInfo, directory, user);
+
+  return prepared
+    .then(defer(self(),
+                &Self::_launch,
+                containerId,
+                executorInfo,
+                directory,
+                user,
+                slaveId,
+                slavePid,
+                checkpoint,
+                lambda::_1))
+    .onFailed(defer(self(), &Self::destroy, containerId));
+}
+
+
+// TaskInfo-carrying overload. The task is not used by this
+// containerizer, so this delegates to the executor-only launch().
+Future<Nothing> MesosContainerizerProcess::launch(
+    const ContainerID& containerId,
+    const TaskInfo&,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  return launch(containerId,
+                executorInfo,
+                directory,
+                user,
+                slaveId,
+                slavePid,
+                checkpoint);
+}
+
+// Asks every isolator, in parallel, to prepare the container. Gathers
+// the optional preparation commands each one wants run in the forked
+// child before the executor is exec'ed.
+Future<list<Option<CommandInfo> > > MesosContainerizerProcess::prepare(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user)
+{
+  list<Future<Option<CommandInfo> > > preparations;
+  foreach (const Owned<Isolator>& isolator, isolators) {
+    preparations.push_back(isolator->prepare(containerId, executorInfo));
+  }
+
+  // Completes once every isolator has finished preparing.
+  return collect(preparations);
+}
+
+
+// Continuation of fetch(). Converts the mesos-fetcher exit status
+// into a result. On success, if a user was given, the work directory
+// is chowned to that user.
+//
+// Fails when the status is unknown or non-zero, or when the chown
+// fails. The failure includes the underlying error.
+Future<Nothing> _fetch(
+    const ContainerID& containerId,
+    const string& directory,
+    const Option<string>& user,
+    const Option<int>& status)
+{
+  if (status.isNone() || (status.get() != 0)) {
+    return Failure("Failed to fetch URIs for container '" +
+                   stringify(containerId) + "': exit status " +
+                   (status.isNone() ? "none" : stringify(status.get())));
+  }
+
+  // Chown the work directory if a user is provided.
+  if (user.isSome()) {
+    Try<Nothing> chown = os::chown(user.get(), directory);
+    if (chown.isError()) {
+      // Surface the cause so the failure is diagnosable.
+      return Failure("Failed to chown work directory '" + directory +
+                     "' to user '" + user.get() + "': " + chown.error());
+    }
+  }
+
+  return Nothing();
+}
+
+
+// Opens 'file' for writing (creating or truncating it). Chowns it to
+// 'user' if one is given. Then redirects everything readable from
+// 'from' into it. Returns an Error naming the step that failed.
+//
+// The mode gives owner read/write and group/others read-only. Log
+// files should not be world-writable or executable, so this uses
+// S_IROTH rather than S_IRWXO. The process umask still applies.
+static Try<Nothing> redirectFetcherOutput(
+    int from,
+    const string& file,
+    const Option<string>& user)
+{
+  // TODO(tillt): Consider adding O_CLOEXEC for atomic close-on-exec.
+  Try<int> fd = os::open(
+      file,
+      O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK,
+      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+
+  if (fd.isError()) {
+    return Error("Failed to open: " + fd.error());
+  }
+
+  if (user.isSome()) {
+    Try<Nothing> chown = os::chown(user.get(), file);
+    if (chown.isError()) {
+      os::close(fd.get());
+      return Error("Failed to chown: " + chown.error());
+    }
+  }
+
+  // Redirect takes care of nonblocking and close-on-exec for the
+  // supplied file descriptors. It also 'dup's the descriptor, so the
+  // original can be closed right away.
+  io::redirect(from, fd.get());
+  os::close(fd.get());
+
+  return Nothing();
+}
+
+
+// Runs the mesos-fetcher found in --launcher_dir to fetch the command's
+// URIs into 'directory'. The fetcher's stdout/stderr are written to the
+// "stdout"/"stderr" files in that directory, which are chowned to
+// 'user' if one is given. The returned future completes via _fetch()
+// once the fetcher exits.
+Future<Nothing> MesosContainerizerProcess::fetch(
+    const ContainerID& containerId,
+    const CommandInfo& commandInfo,
+    const string& directory,
+    const Option<string>& user)
+{
+  // Determine path for mesos-fetcher.
+  const string fetcherPath = path::join(flags.launcher_dir, "mesos-fetcher");
+  Result<string> realpath = os::realpath(fetcherPath);
+
+  if (!realpath.isSome()) {
+    LOG(ERROR) << "Failed to determine the canonical path "
+               << "for the mesos-fetcher '" << fetcherPath << "': "
+               << (realpath.isError() ? realpath.error()
+                                      : "No such file or directory");
+    return Failure("Could not fetch URIs: failed to find mesos-fetcher");
+  }
+
+  map<string, string> environment =
+    fetcherEnvironment(commandInfo, directory, user, flags);
+
+  // Now the actual mesos-fetcher command.
+  string command = realpath.get();
+
+  LOG(INFO) << "Fetching URIs for container '" << containerId
+            << "' using command '" << command << "'";
+
+  Try<Subprocess> fetcher = subprocess(
+      command,
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      environment);
+
+  if (fetcher.isError()) {
+    return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
+  }
+
+  // Redirect output (stdout and stderr) from the fetcher to log files
+  // in the executor work directory, chown'ing them if a user is
+  // specified.
+  // TODO(tillt): Consider adding an overload to io::redirect
+  // that accepts a file path as 'to' for further reducing code. We
+  // would however also need an owner user parameter for such overload
+  // to perfectly replace the helper above.
+  Try<Nothing> out = redirectFetcherOutput(
+      fetcher.get().out().get(), path::join(directory, "stdout"), user);
+
+  if (out.isError()) {
+    return Failure("Failed to redirect stdout: " + out.error());
+  }
+
+  Try<Nothing> err = redirectFetcherOutput(
+      fetcher.get().err().get(), path::join(directory, "stderr"), user);
+
+  if (err.isError()) {
+    return Failure("Failed to redirect stderr: " + err.error());
+  }
+
+  return fetcher.get().status()
+    .then(lambda::bind(&_fetch, containerId, directory, user, lambda::_1));
+}
+
+
+// Continuation of launch() after all isolators have prepared.
+//
+// Forks the mesos-containerizer launch helper through the launcher. The
+// child is held on a pipe until exec() signals it. The function then
+// starts reaping the child, optionally checkpoints its pid, and
+// isolates, fetches and finally releases it.
+//
+// The pipe fds are closed on every exit path. Reaping starts before
+// checkpointing because destroy() only kills a container's processes
+// when a status is recorded; otherwise a checkpoint failure would
+// orphan the child. If the container was destroyed while the isolators
+// were preparing, this fails without forking.
+Future<Nothing> MesosContainerizerProcess::_launch(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint,
+    const list<Option<CommandInfo> >& commands)
+{
+  // destroy() may have run (or still be running) while the isolators
+  // were preparing; forking now would create a process nothing tracks.
+  if (!promises.contains(containerId) || destroying.contains(containerId)) {
+    return Failure("Container destroyed during preparing");
+  }
+
+  // Prepare environment variables for the executor.
+  map<string, string> env = executorEnvironment(
+      executorInfo,
+      directory,
+      slaveId,
+      slavePid,
+      checkpoint,
+      flags.recovery_timeout);
+
+  // Include any environment variables from CommandInfo.
+  foreach (const Environment::Variable& variable,
+           executorInfo.command().environment().variables()) {
+    env[variable.name()] = variable.value();
+  }
+
+  // Use a pipe to block the child until it's been isolated. Creating
+  // it can fail, e.g., when the process runs out of file descriptors.
+  int pipes[2];
+  if (pipe(pipes) != 0) {
+    return Failure("Failed to create pipe: " + string(strerror(errno)));
+  }
+
+  // Prepare the flags to pass to the launch process.
+  MesosContainerizerLaunch::Flags launchFlags;
+
+  launchFlags.command = JSON::Protobuf(executorInfo.command());
+  launchFlags.directory = directory;
+  launchFlags.user = user;
+  launchFlags.pipe_read = pipes[0];
+  launchFlags.pipe_write = pipes[1];
+
+  // Prepare the additional preparation commands.
+  // TODO(jieyu): Use JSON::Array once we have generic parse support.
+  JSON::Object object;
+  JSON::Array array;
+  foreach (const Option<CommandInfo>& command, commands) {
+    if (command.isSome()) {
+      array.values.push_back(JSON::Protobuf(command.get()));
+    }
+  }
+  object.values["commands"] = array;
+
+  launchFlags.commands = object;
+
+  // Fork the child using launcher.
+  vector<string> argv(2);
+  argv[0] = "mesos-containerizer";
+  argv[1] = MesosContainerizerLaunch::NAME;
+
+  Try<pid_t> forked = launcher->fork(
+      containerId,
+      path::join(flags.launcher_dir, "mesos-containerizer"),
+      argv,
+      Subprocess::FD(STDIN_FILENO),
+      (local ? Subprocess::FD(STDOUT_FILENO)
+             : Subprocess::PATH(path::join(directory, "stdout"))),
+      (local ? Subprocess::FD(STDERR_FILENO)
+             : Subprocess::PATH(path::join(directory, "stderr"))),
+      launchFlags,
+      env,
+      None());
+
+  if (forked.isError()) {
+    os::close(pipes[0]);
+    os::close(pipes[1]);
+    return Failure("Failed to fork executor: " + forked.error());
+  }
+  pid_t pid = forked.get();
+
+  // Monitor the executor's pid before anything else can fail, so that
+  // a later failure (e.g., checkpointing) makes destroy() kill the
+  // forked child instead of skipping straight to isolator cleanup. We
+  // keep the future because we'll refer to it again during destroy.
+  Future<Option<int> > status = process::reap(pid);
+  statuses.put(containerId, status);
+  status.onAny(defer(self(), &Self::reaped, containerId));
+
+  // Checkpoint the executor's pid if requested.
+  if (checkpoint) {
+    const string& path = slave::paths::getForkedPidPath(
+        slave::paths::getMetaRootDir(flags.work_dir),
+        slaveId,
+        executorInfo.framework_id(),
+        executorInfo.executor_id(),
+        containerId);
+
+    LOG(INFO) << "Checkpointing executor's forked pid " << pid
+              << " to '" << path <<  "'";
+
+    Try<Nothing> checkpointed =
+      slave::state::checkpoint(path, stringify(pid));
+
+    if (checkpointed.isError()) {
+      LOG(ERROR) << "Failed to checkpoint executor's forked pid to '"
+                 << path << "': " << checkpointed.error();
+
+      os::close(pipes[0]);
+      os::close(pipes[1]);
+      return Failure("Could not checkpoint executor's pid");
+    }
+  }
+
+  return isolate(containerId, pid)
+    .then(defer(self(),
+                &Self::fetch,
+                containerId,
+                executorInfo.command(),
+                directory,
+                user))
+    .then(defer(self(), &Self::exec, containerId, pipes[1]))
+    .onAny(lambda::bind(&os::close, pipes[0]))
+    .onAny(lambda::bind(&os::close, pipes[1]));
+}
+
+
+// First registers a limitation watch on every isolator (each firing
+// limited()). Then isolates '_pid' with all isolators in parallel.
+// The result is ready once every isolator has succeeded.
+Future<Nothing> MesosContainerizerProcess::isolate(
+    const ContainerID& containerId,
+    pid_t _pid)
+{
+  foreach (const Owned<Isolator>& isolator, isolators) {
+    isolator->watch(containerId)
+      .onAny(defer(self(), &Self::limited, containerId, lambda::_1));
+  }
+
+  list<Future<Nothing> > isolating;
+  foreach (const Owned<Isolator>& isolator, isolators) {
+    isolating.push_back(isolator->isolate(containerId, _pid));
+  }
+
+  return collect(isolating).then(lambda::bind(&_nothing));
+}
+
+
+// Releases the forked child, which is blocked reading the pipe, by
+// writing a single byte to 'pipeWrite'. Writes interrupted by a
+// signal are retried.
+//
+// Fails if the byte cannot be written. errno is only reported when
+// write() actually returned -1, so a short write cannot show a stale
+// error.
+Future<Nothing> MesosContainerizerProcess::exec(
+    const ContainerID& containerId,
+    int pipeWrite)
+{
+  CHECK(promises.contains(containerId));
+
+  // Now that we've contained the child we can signal it to continue
+  // by writing to the pipe. The byte's value is irrelevant, but it is
+  // initialized so no indeterminate memory is written.
+  char dummy = 0;
+  ssize_t length;
+  do {
+    length = write(pipeWrite, &dummy, sizeof(dummy));
+  } while (length == -1 && errno == EINTR);
+
+  if (length != sizeof(dummy)) {
+    return Failure("Failed to synchronize child process: " +
+                   (length == -1 ? string(strerror(errno))
+                                 : string("short write")));
+  }
+
+  return Nothing();
+}
+
+
+// Returns the future of the container's termination promise, or a
+// Failure for a container this containerizer does not know.
+Future<containerizer::Termination> MesosContainerizerProcess::wait(
+    const ContainerID& containerId)
+{
+  if (promises.contains(containerId)) {
+    return promises[containerId]->future();
+  }
+
+  return Failure("Unknown container: " + stringify(containerId));
+}
+
+
+// Records the container's new resources for usage() and forwards the
+// update to every isolator in parallel. Unknown containers are
+// ignored, not failed.
+Future<Nothing> MesosContainerizerProcess::update(
+    const ContainerID& containerId,
+    const Resources& _resources)
+{
+  // Membership is checked against 'promises' because it is populated
+  // on both launch and recovery. 'resources' has no entry after
+  // recovery until the first update.
+  if (!promises.contains(containerId)) {
+    // The slave updates resources when a task reaches a terminal state,
+    // by which time the executor may have exited and the container been
+    // cleaned up, so this is not treated as a failure.
+    LOG(WARNING) << "Ignoring update for unknown container: " << containerId;
+    return Nothing();
+  }
+
+  resources.put(containerId, _resources);
+
+  list<Future<Nothing> > updating;
+  foreach (const Owned<Isolator>& isolator, isolators) {
+    updating.push_back(isolator->update(containerId, _resources));
+  }
+
+  return collect(updating).then(lambda::bind(&_nothing));
+}
+
+
+// Merges the per-isolator statistics that are ready; the rest are
+// logged and skipped. Stamps the result with the current time. When
+// the container's resources are known, it also fills in the memory
+// and CPU limits. They are optional because they aren't known after
+// recovery until/unless update() is called.
+Future<ResourceStatistics> _usage(
+    const ContainerID& containerId,
+    const Option<Resources>& resources,
+    const list<Future<ResourceStatistics> >& statistics)
+{
+  ResourceStatistics result;
+
+  // Set the timestamp now we have all statistics.
+  result.set_timestamp(Clock::now().secs());
+
+  foreach (const Future<ResourceStatistics>& statistic, statistics) {
+    if (!statistic.isReady()) {
+      LOG(WARNING) << "Skipping resource statistic for container "
+                   << containerId << " because: "
+                   << (statistic.isFailed() ? statistic.failure()
+                                            : "discarded");
+      continue;
+    }
+
+    result.MergeFrom(statistic.get());
+  }
+
+  if (resources.isNone()) {
+    return result;
+  }
+
+  Option<Bytes> mem = resources.get().mem();
+  if (mem.isSome()) {
+    result.set_mem_limit_bytes(mem.get().bytes());
+  }
+
+  Option<double> cpus = resources.get().cpus();
+  if (cpus.isSome()) {
+    result.set_cpus_limit(cpus.get());
+  }
+
+  return result;
+}
+
+
+// Collects resource statistics from every isolator and merges them via
+// _usage(). Fails for unknown containers.
+Future<ResourceStatistics> MesosContainerizerProcess::usage(
+    const ContainerID& containerId)
+{
+  if (!promises.contains(containerId)) {
+    return Failure("Unknown container: " + stringify(containerId));
+  }
+
+  list<Future<ResourceStatistics> > statistics;
+  foreach (const Owned<Isolator>& isolator, isolators) {
+    statistics.push_back(isolator->usage(containerId));
+  }
+
+  // await() rather than collect() so partial statistics can still be
+  // returned when some isolators fail.
+  // TODO(idownes): After recovery resources won't be known until after an
+  // update() because they aren't part of the SlaveState.
+  Option<Resources> allocated = resources.get(containerId);
+
+  return await(statistics)
+    .then(lambda::bind(_usage, containerId, allocated, lambda::_1));
+}
+
+
+// Starts tearing down a container. Repeated calls while a destroy is
+// already in progress are no-ops. If the executor was forked, the
+// launcher first kills the container's processes (continuing in
+// _destroy()). Otherwise teardown goes straight to isolator cleanup
+// in __destroy().
+void MesosContainerizerProcess::destroy(const ContainerID& containerId)
+{
+  if (!promises.contains(containerId)) {
+    LOG(WARNING) << "Ignoring destroy of unknown container: " << containerId;
+    return;
+  }
+
+  if (destroying.contains(containerId)) {
+    // Destroy has already been initiated.
+    return;
+  }
+
+  destroying.insert(containerId);
+
+  LOG(INFO) << "Destroying container '" << containerId << "'";
+
+  if (!statuses.contains(containerId)) {
+    // The executor never forked, so there are no processes to kill.
+    __destroy(containerId, None());
+    return;
+  }
+
+  // Kill all processes then continue destruction.
+  launcher->destroy(containerId)
+    .onAny(defer(self(), &Self::_destroy, containerId, lambda::_1));
+}
+
+
+// Runs after the launcher has tried to kill the container's processes.
+// On success it waits for the executor's exit status (which may
+// already be available) and continues in __destroy(). On failure it
+// fails the termination promise instead.
+void MesosContainerizerProcess::_destroy(
+    const ContainerID& containerId,
+    const Future<Nothing>& future)
+{
+  if (future.isReady()) {
+    statuses.get(containerId).get()
+      .onAny(defer(self(), &Self::__destroy, containerId, lambda::_1));
+    return;
+  }
+
+  // The launcher could not kill every process in the container. The
+  // isolators cannot be cleaned up because they may require all
+  // processes to have exited, so report the failure to the slave.
+  // TODO(idownes): This is a pretty bad state to be in but we should consider
+  // cleaning up here.
+  promises[containerId]->fail(
+      "Failed to destroy container: " +
+      (future.isFailed() ? future.failure() : "discarded future"));
+
+  destroying.erase(containerId);
+}
+
+
+// Runs once all processes have exited (or were never forked). Asks
+// every isolator to clean up in parallel, then finishes in
+// ___destroy() whatever the outcome.
+void MesosContainerizerProcess::__destroy(
+    const ContainerID& containerId,
+    const Future<Option<int > >& status)
+{
+  list<Future<Nothing> > cleanups;
+  foreach (const Owned<Isolator>& isolator, isolators) {
+    cleanups.push_back(isolator->cleanup(containerId));
+  }
+
+  collect(cleanups)
+    .onAny(defer(self(), &Self::___destroy, containerId, status, lambda::_1));
+}
+
+
+// Final destroy step.
+//
+// If isolator cleanup failed, it fails the termination promise and
+// clears only the 'destroying' flag, leaving all other state in place.
+//
+// Otherwise it completes the promise with a Termination and erases all
+// per-container state. The Termination carries the 'killed' flag (set
+// when any isolator reported a limitation), a message, and the exit
+// status if one is known.
+void MesosContainerizerProcess::___destroy(
+    const ContainerID& containerId,
+    const Future<Option<int > >& status,
+    const Future<list<Nothing> >& futures)
+{
+  if (!futures.isReady()) {
+    // At least one isolator failed to clean up, so the containerizer is
+    // now in a bad state.
+    promises[containerId]->fail(
+        "Failed to clean up isolators when destroying container: " +
+        (futures.isFailed() ? futures.failure() : "discarded future"));
+
+    destroying.erase(containerId);
+
+    return;
+  }
+
+  containerizer::Termination termination;
+
+  // Note: a limitation may not be registered in time, e.g. if an OOM
+  // killed the executor and destroy() was triggered by its exit.
+  if (limitations.contains(containerId)) {
+    string message;
+    foreach (const Limitation& limitation, limitations.get(containerId)) {
+      message += limitation.message;
+    }
+
+    termination.set_killed(true);
+    termination.set_message(strings::trim(message));
+  } else {
+    termination.set_killed(false);
+    termination.set_message("Executor terminated");
+  }
+
+  if (status.isReady() && status.get().isSome()) {
+    termination.set_status(status.get().get());
+  }
+
+  promises[containerId]->set(termination);
+
+  promises.erase(containerId);
+  statuses.erase(containerId);
+  limitations.erase(containerId);
+  resources.erase(containerId);
+  destroying.erase(containerId);
+}
+
+
+// Called when the executor's pid has been reaped. It destroys the
+// container if it is still known.
+void MesosContainerizerProcess::reaped(const ContainerID& containerId)
+{
+  if (promises.contains(containerId)) {
+    LOG(INFO) << "Executor for container '" << containerId << "' has exited";
+
+    destroy(containerId);
+  }
+}
+
+
+// Called when an isolator's limitation watch completes. A ready
+// limitation is recorded so ___destroy() marks the termination as
+// killed. A failed or discarded watch is only logged. In every case a
+// known container is destroyed.
+void MesosContainerizerProcess::limited(
+    const ContainerID& containerId,
+    const Future<Limitation>& future)
+{
+  if (!promises.contains(containerId)) {
+    return;
+  }
+
+  if (!future.isReady()) {
+    // TODO(idownes): A discarded future will not be an error when isolators
+    // discard their promises after cleanup.
+    LOG(ERROR) << "Error in a resource limitation for container "
+               << containerId << ": "
+               << (future.isFailed() ? future.failure() : "discarded");
+  } else {
+    LOG(INFO) << "Container " << containerId << " has reached its limit for"
+              << " resource " << future.get().resource
+              << " and will be terminated";
+    limitations.put(containerId, future.get());
+  }
+
+  destroy(containerId);
+}
+
+
+// Every container that still has a termination promise is reported,
+// including ones recovered after a slave restart.
+Future<hashset<ContainerID> > MesosContainerizerProcess::containers()
+{
+  return promises.keys();
+}
+
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.hpp b/src/slave/containerizer/mesos/containerizer.hpp
new file mode 100644
index 0000000..8746968
--- /dev/null
+++ b/src/slave/containerizer/mesos/containerizer.hpp
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MESOS_CONTAINERIZER_HPP__
+#define __MESOS_CONTAINERIZER_HPP__
+
+#include <list>
+#include <vector>
+
+#include <stout/hashmap.hpp>
+#include <stout/multihashmap.hpp>
+
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/isolator.hpp"
+#include "slave/containerizer/launcher.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// Forward declaration; the class is defined further down in this header.
+class MesosContainerizerProcess;
+// Containerizer that runs executors using a Launcher and a set of Isolators.
+class MesosContainerizer : public Containerizer
+{
+public:
+  static Try<MesosContainerizer*> create(const Flags& flags, bool local);
+  // Direct constructor taking an already-built launcher and isolators.
+  MesosContainerizer(
+      const Flags& flags,
+      bool local,
+      const process::Owned<Launcher>& launcher,
+      const std::vector<process::Owned<Isolator> >& isolators);
+
+  virtual ~MesosContainerizer();
+  // Containerizer interface (presumably forwarded to 'process').
+  virtual process::Future<Nothing> recover(
+      const Option<state::SlaveState>& state);
+  // Launches the executor described by 'executorInfo' in the container.
+  virtual process::Future<Nothing> launch(
+      const ContainerID& containerId,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const SlaveID& slaveId,
+      const process::PID<Slave>& slavePid,
+      bool checkpoint);
+  // As above, but additionally given a TaskInfo.
+  virtual process::Future<Nothing> launch(
+      const ContainerID& containerId,
+      const TaskInfo& taskInfo,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const SlaveID& slaveId,
+      const process::PID<Slave>& slavePid,
+      bool checkpoint);
+  // Updates the resources assigned to the container.
+  virtual process::Future<Nothing> update(
+      const ContainerID& containerId,
+      const Resources& resources);
+  // Returns resource usage statistics for the container.
+  virtual process::Future<ResourceStatistics> usage(
+      const ContainerID& containerId);
+  // Returns a future for the container's termination.
+  virtual process::Future<containerizer::Termination> wait(
+      const ContainerID& containerId);
+  // Destroys the container; the outcome is observed through wait().
+  virtual void destroy(const ContainerID& containerId);
+  // Returns the IDs of the containers currently known.
+  virtual process::Future<hashset<ContainerID> > containers();
+
+private:
+  MesosContainerizerProcess* process; // Presumably does the actual work.
+};
+
+// libprocess Process behind MesosContainerizer (see its 'process' member).
+class MesosContainerizerProcess
+  : public process::Process<MesosContainerizerProcess>
+{
+public:
+  MesosContainerizerProcess(
+      const Flags& _flags,
+      bool _local,
+      const process::Owned<Launcher>& _launcher,
+      const std::vector<process::Owned<Isolator> >& _isolators)
+    : flags(_flags),
+      local(_local),
+      launcher(_launcher),
+      isolators(_isolators) {}
+
+  virtual ~MesosContainerizerProcess() {}
+  // The public methods below mirror those of MesosContainerizer.
+  process::Future<Nothing> recover(
+      const Option<state::SlaveState>& state);
+
+  process::Future<Nothing> launch(
+      const ContainerID& containerId,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const SlaveID& slaveId,
+      const process::PID<Slave>& slavePid,
+      bool checkpoint);
+
+  process::Future<Nothing> launch(
+      const ContainerID& containerId,
+      const TaskInfo& taskInfo,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const SlaveID& slaveId,
+      const process::PID<Slave>& slavePid,
+      bool checkpoint);
+
+  process::Future<Nothing> update(
+      const ContainerID& containerId,
+      const Resources& resources);
+
+  process::Future<ResourceStatistics> usage(
+      const ContainerID& containerId);
+
+  process::Future<containerizer::Termination> wait(
+      const ContainerID& containerId);
+  // Asynchronous: completion is signalled through the wait() promise.
+  void destroy(const ContainerID& containerId);
+
+  process::Future<hashset<ContainerID> > containers();
+
+private:
+  process::Future<Nothing> _recover(
+      const std::list<state::RunState>& recoverable);
+
+  process::Future<Nothing> __recover(
+      const std::list<state::RunState>& recovered);
+  // NOTE(review): presumably gathers the isolators' preparation commands.
+  process::Future<std::list<Option<CommandInfo> > > prepare(
+      const ContainerID& containerId,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory,
+      const Option<std::string>& user);
+  // NOTE(review): presumably fetches commandInfo's URIs into 'directory'.
+  process::Future<Nothing> fetch(
+      const ContainerID& containerId,
+      const CommandInfo& commandInfo,
+      const std::string& directory,
+      const Option<std::string>& user);
+  // 'scripts' is presumably the result of prepare() - confirm in the .cpp.
+  process::Future<Nothing> _launch(
+      const ContainerID& containerId,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const SlaveID& slaveId,
+      const process::PID<Slave>& slavePid,
+      bool checkpoint,
+      const std::list<Option<CommandInfo> >& scripts);
+  // NOTE(review): presumably isolates the forked executor process '_pid'.
+  process::Future<Nothing> isolate(
+      const ContainerID& containerId,
+      pid_t _pid);
+  // NOTE(review): presumably unblocks the child through the control pipe.
+  process::Future<Nothing> exec(
+      const ContainerID& containerId,
+      int pipeWrite);
+
+  // Continues 'destroy()' once all processes have been killed by the launcher.
+  void _destroy(
+      const ContainerID& containerId,
+      const process::Future<Nothing>& future);
+
+  // Continues '_destroy()' once we get the exit status of the executor.
+  void __destroy(
+      const ContainerID& containerId,
+      const process::Future<Option<int > >& status);
+
+  // Continues (and completes) '__destroy()' once all isolators have completed
+  // cleanup.
+  void ___destroy(
+      const ContainerID& containerId,
+      const process::Future<Option<int > >& status,
+      const process::Future<std::list<Nothing> >& futures);
+
+  // Callback for when an isolator limits a container and impacts the
+  // processes. This will trigger container destruction.
+  void limited(
+      const ContainerID& containerId,
+      const process::Future<Limitation>& future);
+
+  // Callback for when the executor exits. This will trigger container
+  // destruction.
+  void reaped(const ContainerID& containerId);
+  // Configuration and components, fixed at construction time.
+  const Flags flags;
+  const bool local;
+  const process::Owned<Launcher> launcher;
+  const std::vector<process::Owned<Isolator> > isolators;
+
+  // TODO(idownes): Consider putting these per-container variables into a
+  // struct.
+  // Promises for futures returned from wait().
+  hashmap<ContainerID,
+    process::Owned<process::Promise<containerizer::Termination> > > promises;
+
+  // We need to keep track of the future exit status for each executor because
+  // we'll only get a single notification when the executor exits.
+  hashmap<ContainerID, process::Future<Option<int> > > statuses;
+
+  // We keep track of any limitations received from each isolator so we can
+  // determine the cause of an executor termination.
+  multihashmap<ContainerID, Limitation> limitations;
+
+  // We keep track of the resources for each container so we can set the
+  // ResourceStatistics limits in usage().
+  hashmap<ContainerID, Resources> resources;
+
+  // Set of containers that are in the process of being destroyed.
+  hashset<ContainerID> destroying;
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MESOS_CONTAINERIZER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos/launch.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/launch.cpp b/src/slave/containerizer/mesos/launch.cpp
new file mode 100644
index 0000000..2db1c7a
--- /dev/null
+++ b/src/slave/containerizer/mesos/launch.cpp
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <unistd.h>
+
+#include <iostream>
+#include <map>
+
+#include <stout/foreach.hpp>
+#include <stout/os.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/unreachable.hpp>
+
+#include <stout/os/execenv.hpp>
+
+#include "mesos/mesos.hpp"
+
+#include "slave/containerizer/mesos/launch.hpp"
+
+using std::cerr;
+using std::endl;
+using std::map;
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+const string MesosContainerizerLaunch::NAME = "launch";
+
+
+MesosContainerizerLaunch::Flags::Flags()
+{
+  // Every flag is an Option<> (see launch.hpp); which ones are mandatory is
+  // enforced by execute() rather than here.
+
+  // The command to run, given as a JSON-encoded CommandInfo. Required
+  // (execute() fails without it).
+  add(&command, "command", "The command to execute.");
+
+  // The directory execute() chdir()s into before switching user. Required.
+  add(&directory, "directory", "The directory to chdir to.");
+
+  // Optional user to switch to once the preparation commands have run.
+  add(&user, "user", "The user to change to.");
+
+  // Both ends of the control pipe (required). execute() closes the write end
+  // and blocks reading the read end until the parent signals it to go on.
+  add(&pipe_read, "pipe_read", "The read end of the control pipe.");
+  add(&pipe_write, "pipe_write", "The write end of the control pipe.");
+
+  // Optional JSON object of the form {"commands": [<CommandInfo>, ...]};
+  // every entry is run, and must exit with status 0, before the command.
+  add(&commands,
+      "commands",
+      "The additional preparation commands to execute before\n"
+      "executing the command.");
+}
+
+
+// Entry point of the 'launch' subcommand: validates the flags, waits on the
+// control pipe for the parent's go-ahead, runs the optional preparation
+// commands, enters --directory, switches to --user (if given) and finally
+// replaces this process with '/bin/sh -c <command>'. Returns 1 on any
+// failure; on success execle() does not return.
+int MesosContainerizerLaunch::execute()
+{
+  // Check command line flags.
+  if (flags.command.isNone()) {
+    cerr << "Flag --command is not specified" << endl;
+    return 1;
+  }
+
+  if (flags.directory.isNone()) {
+    cerr << "Flag --directory is not specified" << endl;
+    return 1;
+  }
+
+  if (flags.pipe_read.isNone()) {
+    cerr << "Flag --pipe_read is not specified" << endl;
+    return 1;
+  }
+
+  if (flags.pipe_write.isNone()) {
+    cerr << "Flag --pipe_write is not specified" << endl;
+    return 1;
+  }
+
+  // Parse the command.
+  Try<CommandInfo> command =
+    ::protobuf::parse<CommandInfo>(flags.command.get());
+
+  if (command.isError()) {
+    cerr << "Failed to parse the command: " << command.error() << endl;
+    return 1;
+  }
+
+  // Only the read end of the control pipe is used here; close the write end.
+  Try<Nothing> close = os::close(flags.pipe_write.get());
+  if (close.isError()) {
+    cerr << "Failed to close pipe[1]: " << close.error() << endl;
+    return 1;
+  }
+
+  // Do a blocking read on the pipe until the parent signals us to continue,
+  // retrying the read if it is interrupted by a signal (EINTR).
+  char dummy;
+  ssize_t length;
+  do {
+    length = ::read(flags.pipe_read.get(), &dummy, sizeof(dummy));
+  } while (length == -1 && errno == EINTR);
+
+  if (length != sizeof(dummy)) {
+    // There's a reasonable probability this will occur during slave
+    // restarts across a large/busy cluster.
+    cerr << "Failed to synchronize with slave (it's probably exited)" << endl;
+    return 1;
+  }
+
+  close = os::close(flags.pipe_read.get());
+  if (close.isError()) {
+    cerr << "Failed to close pipe[0]: " << close.error() << endl;
+    return 1;
+  }
+
+  // Run additional preparation commands. These are run as the same
+  // user and with the same environment as the slave.
+  if (flags.commands.isSome()) {
+    // The flag is expected to be a JSON object of the form
+    // {"commands": [<CommandInfo as JSON>, ...]}.
+    // TODO(jieyu): Use JSON::Array if we have generic parse support.
+    JSON::Object object = flags.commands.get();
+    if (object.values.count("commands") == 0 ||
+        !object.values["commands"].is<JSON::Array>()) {
+      cerr << "Invalid JSON format for flag --commands" << endl;
+      return 1;
+    }
+
+    const JSON::Array& array = object.values["commands"].as<JSON::Array>();
+    foreach (const JSON::Value& value, array.values) {
+      if (!value.is<JSON::Object>()) {
+        cerr << "Invalid JSON format for flag --commands" << endl;
+        return 1;
+      }
+
+      Try<CommandInfo> parse = ::protobuf::parse<CommandInfo>(value);
+      if (parse.isError()) {
+        cerr << "Failed to parse a preparation command: "
+             << parse.error() << endl;
+        return 1;
+      }
+
+      // Block until the command completes; it must exit normally with 0.
+      int status = os::system(parse.get().value());
+      if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
+        cerr << "Failed to execute a preparation command" << endl;
+        return 1;
+      }
+    }
+  }
+
+  // Enter working directory.
+  if (!os::chdir(flags.directory.get())) {
+    cerr << "Failed to chdir into work directory "
+         << flags.directory.get() << endl;
+    return 1;
+  }
+
+  // Change user if provided. Note that we do that after executing the
+  // preparation commands so that those commands will be run with the
+  // same privilege as the mesos-slave.
+  if (flags.user.isSome() && !os::su(flags.user.get())) {
+    cerr << "Failed to change user to " << flags.user.get() << endl;
+    return 1;
+  }
+
+  // Relay the environment variables.
+  // TODO(jieyu): Consider using a clean environment.
+  map<string, string> env;
+  os::ExecEnv envp(env);
+
+  // Execute the command (via '/bin/sh -c command') with its environment.
+  execle("/bin/sh", "sh", "-c", command.get().value().c_str(), (char*) NULL,
+         envp());
+
+  // If we get here, execle() failed. That can happen at runtime (e.g. if
+  // '/bin/sh' cannot be executed), so report it through the exit status like
+  // every other failure here instead of asserting that it is unreachable.
+  cerr << "Failed to execute command" << endl;
+
+  return 1;
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos/launch.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/launch.hpp b/src/slave/containerizer/mesos/launch.hpp
new file mode 100644
index 0000000..7c8b535
--- /dev/null
+++ b/src/slave/containerizer/mesos/launch.hpp
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MESOS_CONTAINERIZER_LAUNCH_HPP__
+#define __MESOS_CONTAINERIZER_LAUNCH_HPP__
+
+#include <stout/json.hpp>
+#include <stout/option.hpp>
+#include <stout/subcommand.hpp>
+
+namespace mesos {
+namespace internal {
+namespace slave {
+// The 'launch' subcommand run by the containerizer helper binary (main.cpp).
+class MesosContainerizerLaunch : public Subcommand
+{
+public:
+  static const std::string NAME; // The subcommand name, "launch".
+  // Command line flags; which of them are required is checked by execute().
+  struct Flags : public flags::FlagsBase
+  {
+    Flags();
+
+    Option<JSON::Object> command;   // CommandInfo (as JSON) to run.
+    Option<std::string> directory;  // Directory to chdir into.
+    Option<std::string> user;       // If set, the user to switch to.
+    Option<int> pipe_read;          // Read end of the control pipe.
+    Option<int> pipe_write;         // Write end of the control pipe.
+    Option<JSON::Object> commands;  // Extra commands: {"commands": [...]}.
+  };
+
+  MesosContainerizerLaunch() : Subcommand(NAME) {}
+  // Parsed flags; exposed to the Subcommand machinery through getFlags().
+  Flags flags;
+  // Subcommand hooks, presumably invoked by Subcommand::dispatch().
+protected:
+  virtual int execute();
+  virtual flags::FlagsBase* getFlags() { return &flags; }
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MESOS_CONTAINERIZER_LAUNCH_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos/main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/main.cpp b/src/slave/containerizer/mesos/main.cpp
new file mode 100644
index 0000000..0e17931
--- /dev/null
+++ b/src/slave/containerizer/mesos/main.cpp
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stout/none.hpp>
+#include <stout/subcommand.hpp>
+
+#include "slave/containerizer/mesos/launch.hpp"
+
+using namespace mesos::internal::slave;
+
+
+int main(int argc, char** argv)
+{
+  // Register the single 'launch' subcommand and exit with whatever
+  // Subcommand::dispatch() returns.
+  MesosContainerizerLaunch* launch = new MesosContainerizerLaunch();
+
+  return Subcommand::dispatch(None(), argc, argv, launch);
+}