You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2014/06/25 23:37:43 UTC

[1/7] git commit: Refactored Subprocess to support execve style launch and customized clone function.

Repository: mesos
Updated Branches:
  refs/heads/master b2d13bc3b -> d0046dca7


Refactored Subprocess to support execve style launch and customized
clone function.

Review: https://reviews.apache.org/r/22831


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1b0fdf01
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1b0fdf01
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1b0fdf01

Branch: refs/heads/master
Commit: 1b0fdf01b47591ebd966eba3e0df360caa40888e
Parents: b2d13bc
Author: Jie Yu <yu...@gmail.com>
Authored: Fri Jun 20 12:28:02 2014 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jun 25 14:31:29 2014 -0700

----------------------------------------------------------------------
 .../libprocess/include/process/subprocess.hpp   | 101 ++++++++--
 3rdparty/libprocess/src/subprocess.cpp          | 201 ++++++++++++-------
 .../libprocess/src/tests/subprocess_tests.cpp   |  19 +-
 3 files changed, 223 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/1b0fdf01/3rdparty/libprocess/include/process/subprocess.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/subprocess.hpp b/3rdparty/libprocess/include/process/subprocess.hpp
index d6e2c1f..7ff5a10 100644
--- a/3rdparty/libprocess/include/process/subprocess.hpp
+++ b/3rdparty/libprocess/include/process/subprocess.hpp
@@ -7,6 +7,7 @@
 
 #include <map>
 #include <string>
+#include <vector>
 
 #include <process/future.hpp>
 
@@ -38,17 +39,25 @@ public:
   //   3. FD: Redirect to an open file descriptor.
   class IO
   {
+  public:
+    bool isPipe() const { return mode == PIPE; }
+    bool isPath() const { return mode == PATH; }
+    bool isFd() const { return mode == FD; }
+
   private:
     friend class Subprocess;
 
     friend Try<Subprocess> subprocess(
-        const std::string& command,
-        const IO& in,
-        const IO& out,
-        const IO& err,
+        const std::string& path,
+        std::vector<std::string> argv,
+        const Subprocess::IO& in,
+        const Subprocess::IO& out,
+        const Subprocess::IO& err,
         const Option<flags::FlagsBase>& flags,
         const Option<std::map<std::string, std::string> >& environment,
-        const Option<lambda::function<int()> >& setup);
+        const Option<lambda::function<int()> >& setup,
+        const Option<lambda::function<
+            pid_t(const lambda::function<int()>&)> >& clone);
 
     enum Mode
     {
@@ -95,13 +104,16 @@ public:
 
 private:
   friend Try<Subprocess> subprocess(
-      const std::string& command,
-      const IO& in,
-      const IO& out,
-      const IO& err,
+      const std::string& path,
+      std::vector<std::string> argv,
+      const Subprocess::IO& in,
+      const Subprocess::IO& out,
+      const Subprocess::IO& err,
       const Option<flags::FlagsBase>& flags,
       const Option<std::map<std::string, std::string> >& environment,
-      const Option<lambda::function<int()> >& setup);
+      const Option<lambda::function<int()> >& setup,
+      const Option<lambda::function<
+          pid_t(const lambda::function<int()>&)> >& clone);
 
   struct Data
   {
@@ -141,29 +153,86 @@ private:
 // must not contain any async unsafe code.
 // TODO(dhamon): Add an option to not combine the two environments.
 Try<Subprocess> subprocess(
-    const std::string& command,
+    const std::string& path,
+    std::vector<std::string> argv,
     const Subprocess::IO& in,
     const Subprocess::IO& out,
     const Subprocess::IO& err,
     const Option<flags::FlagsBase>& flags = None(),
     const Option<std::map<std::string, std::string> >& environment = None(),
-    const Option<lambda::function<int()> >& setup = None());
+    const Option<lambda::function<int()> >& setup = None(),
+    const Option<lambda::function<
+        pid_t(const lambda::function<int()>&)> >& clone = None());
 
 
 inline Try<Subprocess> subprocess(
-    const std::string& command,
+    const std::string& path,
+    std::vector<std::string> argv,
     const Option<flags::FlagsBase>& flags = None(),
     const Option<std::map<std::string, std::string> >& environment = None(),
-    const Option<lambda::function<int()> >& setup = None())
+    const Option<lambda::function<int()> >& setup = None(),
+    const Option<lambda::function<
+        pid_t(const lambda::function<int()>&)> >& clone = None())
 {
   return subprocess(
-      command,
+      path,
+      argv,
       Subprocess::FD(STDIN_FILENO),
       Subprocess::FD(STDOUT_FILENO),
       Subprocess::FD(STDERR_FILENO),
       flags,
       environment,
-      setup);
+      setup,
+      clone);
+}
+
+
+// Overloads for launching a shell command. Currently, we do not
+// support flags for shell command variants due to the complexity
+// involved in escaping quotes in flags.
+inline Try<Subprocess> subprocess(
+    const std::string& command,
+    const Subprocess::IO& in,
+    const Subprocess::IO& out,
+    const Subprocess::IO& err,
+    const Option<std::map<std::string, std::string> >& environment = None(),
+    const Option<lambda::function<int()> >& setup = None(),
+    const Option<lambda::function<
+        pid_t(const lambda::function<int()>&)> >& clone = None())
+{
+  std::vector<std::string> argv(3);
+  argv[0] = "sh";
+  argv[1] = "-c";
+  argv[2] = command;
+
+  return subprocess(
+      "/bin/sh",
+      argv,
+      in,
+      out,
+      err,
+      None(),
+      environment,
+      setup,
+      clone);
+}
+
+
+inline Try<Subprocess> subprocess(
+    const std::string& command,
+    const Option<std::map<std::string, std::string> >& environment = None(),
+    const Option<lambda::function<int()> >& setup = None(),
+    const Option<lambda::function<
+        pid_t(const lambda::function<int()>&)> >& clone = None())
+{
+  return subprocess(
+      command,
+      Subprocess::FD(STDIN_FILENO),
+      Subprocess::FD(STDOUT_FILENO),
+      Subprocess::FD(STDERR_FILENO),
+      environment,
+      setup,
+      clone);
 }
 
 } // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/1b0fdf01/3rdparty/libprocess/src/subprocess.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/subprocess.cpp b/3rdparty/libprocess/src/subprocess.cpp
index 6ac7b5b..68bfd5d 100644
--- a/3rdparty/libprocess/src/subprocess.cpp
+++ b/3rdparty/libprocess/src/subprocess.cpp
@@ -24,6 +24,7 @@
 
 using std::map;
 using std::string;
+using std::vector;
 
 namespace process {
 namespace internal {
@@ -85,15 +86,99 @@ static Try<Nothing> cloexec(int stdinFd[2], int stdoutFd[2], int stderrFd[2])
 }  // namespace internal {
 
 
-// Runs the provided command in a subprocess.
+static pid_t defaultClone(const lambda::function<int()>& func)
+{
+  pid_t pid = ::fork();
+  if (pid == -1) {
+    return -1;
+  } else if (pid == 0) {
+    // Child.
+    ::exit(func());
+    return UNREACHABLE();
+  } else {
+    // Parent.
+    return pid;
+  }
+}
+
+
+// The main entry of the child process. Note that this function has to
+// be async singal safe.
+static int childMain(
+    const string& path,
+    char** argv,
+    const Subprocess::IO& in,
+    const Subprocess::IO& out,
+    const Subprocess::IO& err,
+    os::ExecEnv* envp,
+    const Option<lambda::function<int()> >& setup,
+    int stdinFd[2],
+    int stdoutFd[2],
+    int stderrFd[2])
+{
+  // Close parent's end of the pipes.
+  if (in.isPipe()) {
+    while (::close(stdinFd[1]) == -1 && errno == EINTR);
+  }
+  if (out.isPipe()) {
+    while (::close(stdoutFd[0]) == -1 && errno == EINTR);
+  }
+  if (err.isPipe()) {
+    while (::close(stderrFd[0]) == -1 && errno == EINTR);
+  }
+
+  // Redirect I/O for stdin/stdout/stderr.
+  while (::dup2(stdinFd[0], STDIN_FILENO) == -1 && errno == EINTR);
+  while (::dup2(stdoutFd[1], STDOUT_FILENO) == -1 && errno == EINTR);
+  while (::dup2(stderrFd[1], STDERR_FILENO) == -1 && errno == EINTR);
+
+  // Close the copies. We need to make sure that we do not close the
+  // file descriptor assigned to stdin/stdout/stderr in case the
+  // parent has closed stdin/stdout/stderr when calling this
+  // function (in that case, a dup'ed file descriptor may have the
+  // same file descriptor number as stdin/stdout/stderr).
+  if (stdinFd[0] != STDIN_FILENO &&
+      stdinFd[0] != STDOUT_FILENO &&
+      stdinFd[0] != STDERR_FILENO) {
+    while (::close(stdinFd[0]) == -1 && errno == EINTR);
+  }
+  if (stdoutFd[1] != STDIN_FILENO &&
+      stdoutFd[1] != STDOUT_FILENO &&
+      stdoutFd[1] != STDERR_FILENO) {
+    while (::close(stdoutFd[1]) == -1 && errno == EINTR);
+  }
+  if (stderrFd[1] != STDIN_FILENO &&
+      stderrFd[1] != STDOUT_FILENO &&
+      stderrFd[1] != STDERR_FILENO) {
+    while (::close(stderrFd[1]) == -1 && errno == EINTR);
+  }
+
+  if (setup.isSome()) {
+    int status = setup.get()();
+    if (status != 0) {
+      _exit(status);
+    }
+  }
+
+  execve(path.c_str(), argv, (*envp)());
+
+  ABORT("Failed to execve in childMain\n");
+
+  return UNREACHABLE();
+}
+
+
 Try<Subprocess> subprocess(
-    const string& _command,
+    const string& path,
+    vector<string> argv,
     const Subprocess::IO& in,
     const Subprocess::IO& out,
     const Subprocess::IO& err,
     const Option<flags::FlagsBase>& flags,
     const Option<map<string, string> >& environment,
-    const Option<lambda::function<int()> >& setup)
+    const Option<lambda::function<int()> >& setup,
+    const Option<lambda::function<
+        pid_t(const lambda::function<int()>&)> >& _clone)
 {
   // File descriptors for redirecting stdin/stdout/stderr. These file
   // descriptors are used for different purposes depending on the
@@ -116,7 +201,7 @@ Try<Subprocess> subprocess(
       break;
     }
     case Subprocess::IO::PIPE: {
-      if (pipe(stdinFd) == -1) {
+      if (::pipe(stdinFd) == -1) {
         return ErrnoError("Failed to create pipe");
       }
       break;
@@ -147,7 +232,7 @@ Try<Subprocess> subprocess(
       break;
     }
     case Subprocess::IO::PIPE: {
-      if (pipe(stdoutFd) == -1) {
+      if (::pipe(stdoutFd) == -1) {
         // Save the errno as 'close' below might overwrite it.
         ErrnoError error("Failed to create pipe");
         internal::close(stdinFd, stdoutFd, stderrFd);
@@ -186,7 +271,7 @@ Try<Subprocess> subprocess(
       break;
     }
     case Subprocess::IO::PIPE: {
-      if (pipe(stderrFd) == -1) {
+      if (::pipe(stderrFd) == -1) {
         // Save the errno as 'close' below might overwrite it.
         ErrnoError error("Failed to create pipe");
         internal::close(stdinFd, stdoutFd, stderrFd);
@@ -219,95 +304,65 @@ Try<Subprocess> subprocess(
     return Error("Failed to cloexec: " + cloexec.error());
   }
 
-  // Prepare the command to execute. If the user specifies the
-  // 'flags', we will stringify it and append it to the command.
-  string command = _command;
-
+  // Prepare the arguments. If the user specifies the 'flags', we will
+  // stringify them and append them to the existing arguments.
   if (flags.isSome()) {
     foreachpair (const string& name, const flags::Flag& flag, flags.get()) {
       Option<string> value = flag.stringify(flags.get());
       if (value.isSome()) {
-        // TODO(jieyu): Need a better way to escape quotes. For
-        // example, what if 'value.get()' contains a single quote?
-        string argument = "--" + name + "='" + value.get() + "'";
-        command = strings::join(" ", command, argument);
+        argv.push_back("--" + name + "=" + value.get());
       }
     }
   }
 
-  // We need to do this construction before doing the fork as it
+  // The real arguments that will be passed to 'execve'. We need to
+  // construct them here before doing the clone as it might not be
+  // async signal safe.
+  char** _argv = new char*[argv.size() + 1];
+  for (int i = 0; i < argv.size(); i++) {
+    _argv[i] = (char*) argv[i].c_str();
+  }
+  _argv[argv.size()] = NULL;
+
+  // We need to do this construction before doing the clone as it
   // might not be async-safe.
   // TODO(tillt): Consider optimizing this to not pass an empty map
   // into the constructor or even further to use execl instead of
   // execle once we have no user supplied environment.
   os::ExecEnv envp(environment.get(map<string, string>()));
 
-  pid_t pid;
-  if ((pid = fork()) == -1) {
+  // Determine the function to clone the child process. If the user
+  // does not specify the clone function, we will use the default.
+  lambda::function<pid_t(const lambda::function<int()>&)> clone =
+    (_clone.isSome() ? _clone.get() : defaultClone);
+
+  // Now, clone the child process.
+  pid_t pid = clone(lambda::bind(
+      &childMain,
+      path,
+      _argv,
+      in,
+      out,
+      err,
+      &envp,
+      setup,
+      stdinFd,
+      stdoutFd,
+      stderrFd));
+
+  delete[] _argv;
+
+  if (pid == -1) {
     // Save the errno as 'close' below might overwrite it.
-    ErrnoError error("Failed to fork");
+    ErrnoError error("Failed to clone");
     internal::close(stdinFd, stdoutFd, stderrFd);
     return error;
   }
 
+  // Parent.
   Subprocess process;
   process.data->pid = pid;
 
-  if (process.data->pid == 0) {
-    // Child.
-    // Close parent's end of the pipes.
-    if (in.mode == Subprocess::IO::PIPE) {
-      while (::close(stdinFd[1]) == -1 && errno == EINTR);
-    }
-    if (out.mode == Subprocess::IO::PIPE) {
-      while (::close(stdoutFd[0]) == -1 && errno == EINTR);
-    }
-    if (err.mode == Subprocess::IO::PIPE) {
-      while (::close(stderrFd[0]) == -1 && errno == EINTR);
-    }
-
-    // Redirect I/O for stdin/stdout/stderr.
-    while (::dup2(stdinFd[0], STDIN_FILENO) == -1 && errno == EINTR);
-    while (::dup2(stdoutFd[1], STDOUT_FILENO) == -1 && errno == EINTR);
-    while (::dup2(stderrFd[1], STDERR_FILENO) == -1 && errno == EINTR);
-
-    // Close the copies. We need to make sure that we do not close the
-    // file descriptor assigned to stdin/stdout/stderr in case the
-    // parent has closed stdin/stdout/stderr when calling this
-    // function (in that case, a dup'ed file descriptor may have the
-    // same file descriptor number as stdin/stdout/stderr).
-    if (stdinFd[0] != STDIN_FILENO &&
-        stdinFd[0] != STDOUT_FILENO &&
-        stdinFd[0] != STDERR_FILENO) {
-      while (::close(stdinFd[0]) == -1 && errno == EINTR);
-    }
-    if (stdoutFd[1] != STDIN_FILENO &&
-        stdoutFd[1] != STDOUT_FILENO &&
-        stdoutFd[1] != STDERR_FILENO) {
-      while (::close(stdoutFd[1]) == -1 && errno == EINTR);
-    }
-    if (stderrFd[1] != STDIN_FILENO &&
-        stderrFd[1] != STDOUT_FILENO &&
-        stderrFd[1] != STDERR_FILENO) {
-      while (::close(stderrFd[1]) == -1 && errno == EINTR);
-    }
-
-    if (setup.isSome()) {
-      int status = setup.get()();
-      if (status != 0) {
-        _exit(status);
-      }
-    }
-
-    // TODO(jieyu): Consider providing an optional way to launch the
-    // subprocess without using the shell (similar to 'shell=False'
-    // used in python subprocess.Popen).
-    execle("/bin/sh", "sh", "-c", command.c_str(), (char*) NULL, envp());
-
-    ABORT("Failed to execle '/bin/sh -c ", command.c_str(), "'\n");
-  }
-
-  // Parent.
   // Close the file descriptors that are created by this function. For
   // pipes, we close the child ends and store the parent ends (see the
   // code below).

http://git-wip-us.apache.org/repos/asf/mesos/blob/1b0fdf01/3rdparty/libprocess/src/tests/subprocess_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/subprocess_tests.cpp b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
index 7dfa384..98a4e44 100644
--- a/3rdparty/libprocess/src/tests/subprocess_tests.cpp
+++ b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
@@ -509,6 +509,8 @@ struct Flags : public flags::FlagsBase
     add(&b, "b", "bool");
     add(&i, "i", "int");
     add(&s, "s", "string");
+    add(&s2, "s2", "string with single quote");
+    add(&s3, "s3", "string with double quote");
     add(&d, "d", "Duration");
     add(&y, "y", "Bytes");
     add(&j, "j", "JSON::Object");
@@ -517,6 +519,8 @@ struct Flags : public flags::FlagsBase
   Option<bool> b;
   Option<int> i;
   Option<string> s;
+  Option<string> s2;
+  Option<string> s3;
   Option<Duration> d;
   Option<Bytes> y;
   Option<JSON::Object> j;
@@ -531,6 +535,8 @@ TEST_F(SubprocessTest, Flags)
   flags.b = true;
   flags.i = 42;
   flags.s = "hello";
+  flags.s2 = "we're";
+  flags.s3 = "\"geek\"";
   flags.d = Seconds(10);
   flags.y = Bytes(100);
 
@@ -555,7 +561,8 @@ TEST_F(SubprocessTest, Flags)
   string out = path::join(os::getcwd(), "stdout");
 
   Try<Subprocess> s = subprocess(
-      "echo",
+      "/bin/echo",
+      vector<string>(1, "echo"),
       Subprocess::PIPE(),
       Subprocess::PATH(out),
       Subprocess::PIPE(),
@@ -597,6 +604,8 @@ TEST_F(SubprocessTest, Flags)
   EXPECT_EQ(flags.b, flags2.b);
   EXPECT_EQ(flags.i, flags2.i);
   EXPECT_EQ(flags.s, flags2.s);
+  EXPECT_EQ(flags.s2, flags2.s2);
+  EXPECT_EQ(flags.s3, flags2.s3);
   EXPECT_EQ(flags.d, flags2.d);
   EXPECT_EQ(flags.y, flags2.y);
   EXPECT_EQ(flags.j, flags2.j);
@@ -623,7 +632,6 @@ TEST_F(SubprocessTest, Environment)
       Subprocess::PIPE(),
       Subprocess::PIPE(),
       Subprocess::PIPE(),
-      None(),
       environment);
 
   ASSERT_SOME(s);
@@ -654,7 +662,6 @@ TEST_F(SubprocessTest, Environment)
       Subprocess::PIPE(),
       Subprocess::PIPE(),
       Subprocess::PIPE(),
-      None(),
       environment);
 
   ASSERT_SOME(s);
@@ -692,7 +699,6 @@ TEST_F(SubprocessTest, EnvironmentWithSpaces)
       Subprocess::PIPE(),
       Subprocess::PIPE(),
       Subprocess::PIPE(),
-      None(),
       environment);
 
   ASSERT_SOME(s);
@@ -730,7 +736,6 @@ TEST_F(SubprocessTest, EnvironmentWithSpacesAndQuotes)
       Subprocess::PIPE(),
       Subprocess::PIPE(),
       Subprocess::PIPE(),
-      None(),
       environment);
 
   ASSERT_SOME(s);
@@ -770,7 +775,6 @@ TEST_F(SubprocessTest, EnvironmentOverride)
       Subprocess::PIPE(),
       Subprocess::PIPE(),
       Subprocess::PIPE(),
-      None(),
       environment);
 
   ASSERT_SOME(s);
@@ -820,7 +824,6 @@ TEST_F(SubprocessTest, Setup)
       Subprocess::PIPE(),
       Subprocess::PIPE(),
       None(),
-      None(),
       lambda::bind(&setupChdir, directory.get()));
 
   ASSERT_SOME(s);
@@ -862,7 +865,6 @@ TEST_F(SubprocessTest, SetupStatus)
       Subprocess::PIPE(),
       Subprocess::PIPE(),
       None(),
-      None(),
       lambda::bind(&setupStatus, 1));
 
   ASSERT_SOME(s);
@@ -889,7 +891,6 @@ TEST_F(SubprocessTest, SetupStatus)
       Subprocess::PIPE(),
       Subprocess::PIPE(),
       None(),
-      None(),
       lambda::bind(&setupStatus, 0));
 
   ASSERT_SOME(s);


[6/7] Refactored the mesos containerizer launcher to fix MESOS-1404.

Posted by ji...@apache.org.
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos_containerizer.cpp b/src/slave/containerizer/mesos_containerizer.cpp
deleted file mode 100644
index 8a109f4..0000000
--- a/src/slave/containerizer/mesos_containerizer.cpp
+++ /dev/null
@@ -1,1174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <sstream>
-
-#include <process/collect.hpp>
-#include <process/defer.hpp>
-#include <process/io.hpp>
-#include <process/reap.hpp>
-#include <process/subprocess.hpp>
-
-#include <stout/fatal.hpp>
-#include <stout/os.hpp>
-#include <stout/unreachable.hpp>
-
-#include <stout/os/execenv.hpp>
-
-#include "slave/paths.hpp"
-#include "slave/slave.hpp"
-
-#ifdef __linux__
-#include "slave/containerizer/linux_launcher.hpp"
-#endif // __linux__
-#include "slave/containerizer/containerizer.hpp"
-#include "slave/containerizer/isolator.hpp"
-#include "slave/containerizer/launcher.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
-
-#include "slave/containerizer/isolators/posix.hpp"
-#ifdef __linux__
-#include "slave/containerizer/isolators/cgroups/cpushare.hpp"
-#include "slave/containerizer/isolators/cgroups/mem.hpp"
-#include "slave/containerizer/isolators/cgroups/perf_event.hpp"
-#endif // __linux__
-
-using std::list;
-using std::map;
-using std::string;
-using std::vector;
-
-using namespace process;
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-using state::SlaveState;
-using state::FrameworkState;
-using state::ExecutorState;
-using state::RunState;
-
-// Local function declaration/definitions.
-Future<Nothing> _nothing() { return Nothing(); }
-
-
-// Helper method to build the environment map used to launch fetcher.
-map<string, string> fetcherEnvironment(
-    const CommandInfo& commandInfo,
-    const std::string& directory,
-    const Option<std::string>& user,
-    const Flags& flags)
-{
-  // Prepare the environment variables to pass to mesos-fetcher.
-  string uris = "";
-  foreach (const CommandInfo::URI& uri, commandInfo.uris()) {
-    uris += uri.value() + "+" +
-            (uri.has_executable() && uri.executable() ? "1" : "0") +
-            (uri.extract() ? "X" : "N");
-    uris += " ";
-  }
-  // Remove extra space at the end.
-  uris = strings::trim(uris);
-
-  map<string, string> environment;
-  environment["MESOS_EXECUTOR_URIS"] = uris;
-  environment["MESOS_WORK_DIRECTORY"] = directory;
-  if (user.isSome()) {
-    environment["MESOS_USER"] = user.get();
-  }
-  if (!flags.frameworks_home.empty()) {
-    environment["MESOS_FRAMEWORKS_HOME"] = flags.frameworks_home;
-  }
-  if (!flags.hadoop_home.empty()) {
-    environment["HADOOP_HOME"] = flags.hadoop_home;
-  }
-
-  return environment;
-}
-
-
-Try<MesosContainerizer*> MesosContainerizer::create(
-    const Flags& flags,
-    bool local)
-{
-  string isolation;
-  if (flags.isolation == "process") {
-    LOG(WARNING) << "The 'process' isolation flag is deprecated, "
-                 << "please update your flags to"
-                 << " '--isolation=posix/cpu,posix/mem'.";
-    isolation = "posix/cpu,posix/mem";
-  } else if (flags.isolation == "cgroups") {
-    LOG(WARNING) << "The 'cgroups' isolation flag is deprecated, "
-                 << "please update your flags to"
-                 << " '--isolation=cgroups/cpu,cgroups/mem'.";
-    isolation = "cgroups/cpu,cgroups/mem";
-  } else {
-    isolation = flags.isolation;
-  }
-
-  LOG(INFO) << "Using isolation: " << isolation;
-
-  // Create a MesosContainerizerProcess using isolators and a launcher.
-  hashmap<std::string, Try<Isolator*> (*)(const Flags&)> creators;
-
-  creators["posix/cpu"]   = &PosixCpuIsolatorProcess::create;
-  creators["posix/mem"]   = &PosixMemIsolatorProcess::create;
-#ifdef __linux__
-  creators["cgroups/cpu"] = &CgroupsCpushareIsolatorProcess::create;
-  creators["cgroups/mem"] = &CgroupsMemIsolatorProcess::create;
-  creators["cgroups/perf_event"] = &CgroupsPerfEventIsolatorProcess::create;
-#endif // __linux__
-
-  vector<Owned<Isolator> > isolators;
-
-  foreach (const string& type, strings::split(isolation, ",")) {
-    if (creators.contains(type)) {
-      Try<Isolator*> isolator = creators[type](flags);
-      if (isolator.isError()) {
-        return Error(
-            "Could not create isolator " + type + ": " + isolator.error());
-      } else {
-        isolators.push_back(Owned<Isolator>(isolator.get()));
-      }
-    } else {
-      return Error("Unknown or unsupported isolator: " + type);
-    }
-  }
-
-#ifdef __linux__
-  // Use cgroups on Linux if any cgroups isolators are used.
-  Try<Launcher*> launcher = strings::contains(isolation, "cgroups")
-    ? LinuxLauncher::create(flags)
-    : PosixLauncher::create(flags);
-#else
-  Try<Launcher*> launcher = PosixLauncher::create(flags);
-#endif // __linux__
-
-  if (launcher.isError()) {
-    return Error("Failed to create launcher: " + launcher.error());
-  }
-
-  return new MesosContainerizer(
-      flags, local, Owned<Launcher>(launcher.get()), isolators);
-}
-
-
-MesosContainerizer::MesosContainerizer(
-    const Flags& flags,
-    bool local,
-    const Owned<Launcher>& launcher,
-    const vector<Owned<Isolator> >& isolators)
-{
-  process = new MesosContainerizerProcess(
-      flags, local, launcher, isolators);
-  spawn(process);
-}
-
-
-MesosContainerizer::~MesosContainerizer()
-{
-  terminate(process);
-  process::wait(process);
-  delete process;
-}
-
-
-Future<Nothing> MesosContainerizer::recover(
-    const Option<state::SlaveState>& state)
-{
-  return dispatch(process, &MesosContainerizerProcess::recover, state);
-}
-
-
-Future<Nothing> MesosContainerizer::launch(
-    const ContainerID& containerId,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const Option<string>& user,
-    const SlaveID& slaveId,
-    const PID<Slave>& slavePid,
-    bool checkpoint)
-{
-  return dispatch(process,
-                  &MesosContainerizerProcess::launch,
-                  containerId,
-                  executorInfo,
-                  directory,
-                  user,
-                  slaveId,
-                  slavePid,
-                  checkpoint);
-}
-
-
-Future<Nothing> MesosContainerizer::launch(
-    const ContainerID& containerId,
-    const TaskInfo& taskInfo,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const Option<string>& user,
-    const SlaveID& slaveId,
-    const PID<Slave>& slavePid,
-    bool checkpoint)
-{
-  return dispatch(process,
-                  &MesosContainerizerProcess::launch,
-                  containerId,
-                  taskInfo,
-                  executorInfo,
-                  directory,
-                  user,
-                  slaveId,
-                  slavePid,
-                  checkpoint);
-}
-
-
-Future<Nothing> MesosContainerizer::update(
-    const ContainerID& containerId,
-    const Resources& resources)
-{
-  return dispatch(process,
-                  &MesosContainerizerProcess::update,
-                  containerId,
-                  resources);
-}
-
-
-Future<ResourceStatistics> MesosContainerizer::usage(
-    const ContainerID& containerId)
-{
-  return dispatch(process, &MesosContainerizerProcess::usage, containerId);
-}
-
-
-Future<containerizer::Termination> MesosContainerizer::wait(
-    const ContainerID& containerId)
-{
-  return dispatch(process, &MesosContainerizerProcess::wait, containerId);
-}
-
-
-void MesosContainerizer::destroy(const ContainerID& containerId)
-{
-  dispatch(process, &MesosContainerizerProcess::destroy, containerId);
-}
-
-
-Future<hashset<ContainerID> > MesosContainerizer::containers()
-{
-  return dispatch(process, &MesosContainerizerProcess::containers);
-}
-
-
-Future<Nothing> MesosContainerizerProcess::recover(
-    const Option<state::SlaveState>& state)
-{
-  LOG(INFO) << "Recovering containerizer";
-
-  // Gather the executor run states that we will attempt to recover.
-  list<RunState> recoverable;
-  if (state.isSome()) {
-    foreachvalue (const FrameworkState& framework, state.get().frameworks) {
-      foreachvalue (const ExecutorState& executor, framework.executors) {
-        if (executor.info.isNone()) {
-          LOG(WARNING) << "Skipping recovery of executor '" << executor.id
-                       << "' of framework " << framework.id
-                       << " because its info could not be recovered";
-          continue;
-        }
-
-        if (executor.latest.isNone()) {
-          LOG(WARNING) << "Skipping recovery of executor '" << executor.id
-                       << "' of framework " << framework.id
-                       << " because its latest run could not be recovered";
-          continue;
-        }
-
-        // We are only interested in the latest run of the executor!
-        const ContainerID& containerId = executor.latest.get();
-        Option<RunState> run = executor.runs.get(containerId);
-        CHECK_SOME(run);
-
-        // We need the pid so the reaper can monitor the executor so skip this
-        // executor if it's not present. This is not an error because the slave
-        // will try to wait on the container which will return a failed
-        // Termination and everything will get cleaned up.
-        if (!run.get().forkedPid.isSome()) {
-          continue;
-        }
-
-        if (run.get().completed) {
-          VLOG(1) << "Skipping recovery of executor '" << executor.id
-                  << "' of framework " << framework.id
-                  << " because its latest run "
-                  << containerId << " is completed";
-          continue;
-        }
-
-        LOG(INFO) << "Recovering container '" << containerId
-                  << "' for executor '" << executor.id
-                  << "' of framework " << framework.id;
-
-        recoverable.push_back(run.get());
-      }
-    }
-  }
-
-  // Try to recover the launcher first.
-  return launcher->recover(recoverable)
-    .then(defer(self(), &Self::_recover, recoverable));
-}
-
-
-Future<Nothing> MesosContainerizerProcess::_recover(
-    const list<RunState>& recoverable)
-{
-  // Then recover the isolators.
-  list<Future<Nothing> > futures;
-  foreach (const Owned<Isolator>& isolator, isolators) {
-    futures.push_back(isolator->recover(recoverable));
-  }
-
-  // If all isolators recover then continue.
-  return collect(futures)
-    .then(defer(self(), &Self::__recover, recoverable));
-}
-
-
-Future<Nothing> MesosContainerizerProcess::__recover(
-    const list<RunState>& recovered)
-{
-  foreach (const RunState& run, recovered) {
-    CHECK_SOME(run.id);
-    const ContainerID& containerId = run.id.get();
-
-    Owned<Promise<containerizer::Termination> > promise(
-        new Promise<containerizer::Termination>());
-    promises.put(containerId, promise);
-
-    CHECK_SOME(run.forkedPid);
-    Future<Option<int > > status = process::reap(run.forkedPid.get());
-    statuses[containerId] = status;
-    status.onAny(defer(self(), &Self::reaped, containerId));
-
-    foreach (const Owned<Isolator>& isolator, isolators) {
-      isolator->watch(containerId)
-        .onAny(defer(self(), &Self::limited, containerId, lambda::_1));
-    }
-  }
-
-  return Nothing();
-}
-
-
-// This function is executed by the forked child and must remain
-// async-signal-safe.
-int execute(
-    const CommandInfo& command,
-    const string& directory,
-    const os::ExecEnv& envp,
-    uid_t uid,
-    gid_t gid,
-    bool redirectIO,
-    int pipeRead,
-    int pipeWrite,
-    const list<Option<CommandInfo> >& commands)
-{
-  if (close(pipeWrite) != 0) {
-    ABORT("Failed to close pipe[1]");
-  }
-
-  // Do a blocking read on the pipe until the parent signals us to continue.
-  char dummy;
-  ssize_t length;
-  while ((length = read(pipeRead, &dummy, sizeof(dummy))) == -1 &&
-         errno == EINTR);
-
-  if (length != sizeof(dummy)) {
-    // This will occur if the slave terminates during executor launch.
-    // There's a reasonable probability this will occur during slave restarts
-    // across a large/busy cluster so we log and exit non-zero rather than
-    // ABORT.
-    const char* message =
-      "Failed to synchronize with slave (it has probably exited)";
-
-    while (write(STDERR_FILENO, message, strlen(message)) == -1 &&
-           errno == EINTR);
-
-    _exit(1);
-  }
-
-  if (close(pipeRead) != 0) {
-    ABORT("Failed to close pipe[0]");
-  }
-
-  // Change gid and uid.
-  if (setgid(gid) != 0) {
-    ABORT("Failed to set gid");
-  }
-
-  if (setuid(uid) != 0) {
-    ABORT("Failed to set uid");
-  }
-
-  // Enter working directory.
-  if (chdir(directory.c_str()) != 0) {
-    ABORT("Failed to chdir into work directory");
-  }
-
-  // Redirect output to files in working dir if required. We append because
-  // others (e.g., mesos-fetcher) may have already logged to the files.
-  // TODO(bmahler): It would be best if instead of closing stderr /
-  // stdout and redirecting, we instead always output to stderr /
-  // stdout. Also tee'ing their output into the work directory files
-  // when redirection is desired.
-  if (redirectIO) {
-    int fd;
-    while ((fd = open(
-        "stdout",
-        O_CREAT | O_WRONLY | O_APPEND,
-        S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH)) == -1 &&
-            errno == EINTR);
-    if (fd == -1) {
-      ABORT("Failed to open stdout");
-    }
-
-    int status;
-    while ((status = dup2(fd, STDOUT_FILENO)) == -1 && errno == EINTR);
-    if (status == -1) {
-      ABORT("Failed to dup2 for stdout");
-    }
-
-    if (close(fd) == -1) {
-      ABORT("Failed to close stdout fd");
-    }
-
-    while ((fd = open(
-        "stderr",
-        O_CREAT | O_WRONLY | O_APPEND,
-        S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH)) == -1 &&
-            errno == EINTR);
-    if (fd == -1) {
-      ABORT("Failed to open stderr");
-    }
-
-    while ((status = dup2(fd, STDERR_FILENO)) == -1 && errno == EINTR);
-    if (status == -1) {
-      ABORT("Failed to dup2 for stderr");
-    }
-
-    if (close(fd) == -1) {
-      ABORT("Failed to close stderr fd");
-    }
-  }
-
-  // Run additional preparation commands. These are run as the same user and
-  // with the environment as the slave.
-  // NOTE: os::system() is async-signal-safe.
-  foreach (const Option<CommandInfo>& command, commands) {
-    if (command.isSome()) {
-      // Block until the command completes.
-      int status = os::system(command.get().value());
-
-      if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
-        ABORT("Command '",
-              command.get().value().c_str(),
-              "' failed to execute successfully");
-      }
-    }
-  }
-
-  // Execute the command (via '/bin/sh -c command') with its environment.
-  execle("/bin/sh", "sh", "-c", command.value().c_str(), (char*) NULL, envp());
-
-  // If we get here, the execv call failed.
-  ABORT("Failed to execute command");
-
-  // This should not be reached.
-  return -1;
-}
-
-
-// Launching an executor involves the following steps:
-// 1. Call prepare on each isolator.
-// 2. Fork the executor. The forked child is blocked from exec'ing until it has
-//    been isolated.
-// 3. Isolate the executor. Call isolate with the pid for each isolator.
-// 4. Fetch the executor.
-// 4. Exec the executor. The forked child is signalled to continue. It will
-//    first execute any preparation commands from isolators and then exec the
-//    executor.
-Future<Nothing> MesosContainerizerProcess::launch(
-    const ContainerID& containerId,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const Option<string>& user,
-    const SlaveID& slaveId,
-    const PID<Slave>& slavePid,
-    bool checkpoint)
-{
-  if (promises.contains(containerId)) {
-    LOG(ERROR) << "Cannot start already running container '"
-               << containerId << "'";
-    return Failure("Container already started");
-  }
-
-  // TODO(tillt): The slave should expose which containerization
-  // mechanisms it supports to avoid scheduling tasks that it cannot
-  // run.
-  const CommandInfo& command = executorInfo.command();
-  if (command.has_container()) {
-    // We return a Failure as this containerizer does not support
-    // handling ContainerInfo. Users have to be made aware of this
-    // lack of support to prevent confusion in the task configuration.
-    return Failure("ContainerInfo is not supported");
-  }
-
-  Owned<Promise<containerizer::Termination> > promise(
-      new Promise<containerizer::Termination>());
-  promises.put(containerId, promise);
-
-  // Store the resources for usage().
-  resources.put(containerId, executorInfo.resources());
-
-  LOG(INFO) << "Starting container '" << containerId
-            << "' for executor '" << executorInfo.executor_id()
-            << "' of framework '" << executorInfo.framework_id() << "'";
-
-  return prepare(containerId, executorInfo, directory, user)
-    .then(defer(self(),
-                &Self::_launch,
-                containerId,
-                executorInfo,
-                directory,
-                user,
-                slaveId,
-                slavePid,
-                checkpoint,
-                lambda::_1))
-    .onFailed(defer(self(),
-                    &Self::destroy,
-                    containerId));
-}
-
-
-Future<Nothing> MesosContainerizerProcess::launch(
-    const ContainerID& containerId,
-    const TaskInfo&,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const Option<string>& user,
-    const SlaveID& slaveId,
-    const PID<Slave>& slavePid,
-    bool checkpoint)
-{
-  return launch(
-      containerId,
-      executorInfo,
-      directory,
-      user,
-      slaveId,
-      slavePid,
-      checkpoint);
-}
-
-Future<list<Option<CommandInfo> > > MesosContainerizerProcess::prepare(
-    const ContainerID& containerId,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const Option<string>& user)
-{
-  // Start preparing all isolators (in parallel) and gather any additional
-  // preparation comands that must be run in the forked child before exec'ing
-  // the executor.
-  list<Future<Option<CommandInfo> > > futures;
-  foreach (const Owned<Isolator>& isolator, isolators) {
-    futures.push_back(isolator->prepare(containerId, executorInfo));
-  }
-
-  // Wait for all isolators to complete preparations.
-  return collect(futures);
-}
-
-
-Future<Nothing> _fetch(
-    const ContainerID& containerId,
-    const string& directory,
-    const Option<string>& user,
-    const Option<int>& status)
-{
-  if (status.isNone() || (status.get() != 0)) {
-    return Failure("Failed to fetch URIs for container '" +
-                   stringify(containerId) + "': exit status " +
-                   (status.isNone() ? "none" : stringify(status.get())));
-  }
-
-  // Chown the work directory if a user is provided.
-  if (user.isSome()) {
-    Try<Nothing> chown = os::chown(user.get(), directory);
-    if (chown.isError()) {
-      return Failure("Failed to chown work directory");
-    }
-  }
-
-  return Nothing();
-}
-
-
-Future<Nothing> MesosContainerizerProcess::fetch(
-    const ContainerID& containerId,
-    const CommandInfo& commandInfo,
-    const string& directory,
-    const Option<string>& user)
-{
-  // Determine path for mesos-fetcher.
-  Result<string> realpath = os::realpath(
-      path::join(flags.launcher_dir, "mesos-fetcher"));
-
-  if (!realpath.isSome()) {
-    LOG(ERROR) << "Failed to determine the canonical path "
-                << "for the mesos-fetcher '"
-                << path::join(flags.launcher_dir, "mesos-fetcher")
-                << "': "
-                << (realpath.isError() ? realpath.error()
-                                       : "No such file or directory");
-    return Failure("Could not fetch URIs: failed to find mesos-fetcher");
-  }
-
-  map<string, string> environment =
-    fetcherEnvironment(commandInfo, directory, user, flags);
-
-  // Now the actual mesos-fetcher command.
-  string command = realpath.get();
-
-  LOG(INFO) << "Fetching URIs for container '" << containerId
-            << "' using command '" << command << "'";
-
-  Try<Subprocess> fetcher = subprocess(
-      command,
-      Subprocess::PIPE(),
-      Subprocess::PIPE(),
-      Subprocess::PIPE(),
-      environment);
-
-  if (fetcher.isError()) {
-    return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
-  }
-
-  // Redirect output (stdout and stderr) from the fetcher to log files
-  // in the executor work directory, chown'ing them if a user is
-  // specified.
-  // TODO(tillt): Consider adding O_CLOEXEC for atomic close-on-exec.
-  // TODO(tillt): Consider adding an overload to io::redirect
-  // that accepts a file path as 'to' for further reducing code. We
-  // would however also need an owner user parameter for such overload
-  // to perfectly replace the below.
-  Try<int> out = os::open(
-      path::join(directory, "stdout"),
-      O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK,
-      S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
-
-  if (out.isError()) {
-    return Failure("Failed to redirect stdout: " + out.error());
-  }
-
-  if (user.isSome()) {
-    Try<Nothing> chown = os::chown(
-        user.get(), path::join(directory, "stdout"));
-    if (chown.isError()) {
-      os::close(out.get());
-      return Failure(
-          "Failed to redirect stdout: Failed to chown: " +
-          chown.error());
-    }
-  }
-
-  // Redirect takes care of nonblocking and close-on-exec for the
-  // supplied file descriptors.
-  io::redirect(fetcher.get().out().get(), out.get());
-
-  // Redirect does 'dup' the file descriptor, hence we can close the
-  // original now.
-  os::close(out.get());
-
-  // Repeat for stderr.
-  Try<int> err = os::open(
-      path::join(directory, "stderr"),
-      O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK,
-      S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
-
-  if (err.isError()) {
-    return Failure(
-        "Failed to redirect stderr: Failed to open: " +
-        err.error());
-  }
-
-  if (user.isSome()) {
-    Try<Nothing> chown = os::chown(
-        user.get(), path::join(directory, "stderr"));
-    if (chown.isError()) {
-      os::close(err.get());
-      return Failure(
-          "Failed to redirect stderr: Failed to chown: " +
-          chown.error());
-    }
-  }
-
-  io::redirect(fetcher.get().err().get(), err.get());
-
-  os::close(err.get());
-
-  return fetcher.get().status()
-    .then(lambda::bind(&_fetch, containerId, directory, user, lambda::_1));
-}
-
-
-Future<Nothing> MesosContainerizerProcess::_launch(
-    const ContainerID& containerId,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const Option<string>& user,
-    const SlaveID& slaveId,
-    const PID<Slave>& slavePid,
-    bool checkpoint,
-    const list<Option<CommandInfo> >& commands)
-{
-  // Prepare environment variables for the executor.
-  map<string, string> env = executorEnvironment(
-      executorInfo,
-      directory,
-      slaveId,
-      slavePid,
-      checkpoint,
-      flags.recovery_timeout);
-
-  // Include any enviroment variables from CommandInfo.
-  foreach (const Environment::Variable& variable,
-           executorInfo.command().environment().variables()) {
-    env[variable.name()] = variable.value();
-  }
-
-  // Construct a representation of the environment suitable for
-  // passing to execle in the child. We construct it here because it
-  // is not async-signal-safe.
-  os::ExecEnv envp(env);
-
-  // Use a pipe to block the child until it's been isolated.
-  int pipes[2];
-  // We assume this should not fail under reasonable conditions so we
-  // use CHECK.
-  CHECK(pipe(pipes) == 0);
-
-  // Determine the uid and gid for the child now because getpwnam is
-  // not async signal safe.
-  Result<uid_t> uid = os::getuid(user);
-  if (uid.isError() || uid.isNone()) {
-    return Failure("Invalid user: " + (uid.isError() ? uid.error()
-                                                     : "nonexistent"));
-  }
-
-  Result<gid_t> gid = os::getgid(user);
-  if (gid.isError() || gid.isNone()) {
-    return Failure("Invalid user: " + (gid.isError() ? gid.error()
-                                                     : "nonexistent"));
-  }
-
-
-  // Prepare a function for the forked child to exec() the executor.
-  lambda::function<int()> childFunction = lambda::bind(
-      &execute,
-      executorInfo.command(),
-      directory,
-      envp,
-      uid.get(),
-      gid.get(),
-      !local,
-      pipes[0],
-      pipes[1],
-      commands);
-
-  Try<pid_t> forked = launcher->fork(containerId, childFunction);
-
-  if (forked.isError()) {
-    return Failure("Failed to fork executor: " + forked.error());
-  }
-  pid_t pid = forked.get();
-
-  // Checkpoint the executor's pid if requested.
-  if (checkpoint) {
-    const string& path = slave::paths::getForkedPidPath(
-        slave::paths::getMetaRootDir(flags.work_dir),
-        slaveId,
-        executorInfo.framework_id(),
-        executorInfo.executor_id(),
-        containerId);
-
-    LOG(INFO) << "Checkpointing executor's forked pid " << pid
-              << " to '" << path <<  "'";
-
-    Try<Nothing> checkpointed =
-      slave::state::checkpoint(path, stringify(pid));
-
-    if (checkpointed.isError()) {
-      LOG(ERROR) << "Failed to checkpoint executor's forked pid to '"
-                 << path << "': " << checkpointed.error();
-
-      return Failure("Could not checkpoint executor's pid");
-    }
-  }
-
-  // Monitor the executor's pid. We keep the future because we'll refer to it
-  // again during container destroy.
-  Future<Option<int> > status = process::reap(pid);
-  statuses.put(containerId, status);
-  status.onAny(defer(self(), &Self::reaped, containerId));
-
-  return isolate(containerId, pid)
-    .then(defer(self(),
-                &Self::fetch,
-                containerId,
-                executorInfo.command(),
-                directory,
-                user))
-    .then(defer(self(), &Self::exec, containerId, pipes[1]))
-    .onAny(lambda::bind(&os::close, pipes[0]))
-    .onAny(lambda::bind(&os::close, pipes[1]));
-}
-
-
-Future<Nothing> MesosContainerizerProcess::isolate(
-    const ContainerID& containerId,
-    pid_t _pid)
-{
-  // Set up callbacks for isolator limitations.
-  foreach (const Owned<Isolator>& isolator, isolators) {
-    isolator->watch(containerId)
-      .onAny(defer(self(), &Self::limited, containerId, lambda::_1));
-  }
-
-  // Isolate the executor with each isolator.
-  list<Future<Nothing> > futures;
-  foreach (const Owned<Isolator>& isolator, isolators) {
-    futures.push_back(isolator->isolate(containerId, _pid));
-  }
-
-  // Wait for all isolators to complete.
-  return collect(futures)
-    .then(lambda::bind(&_nothing));
-}
-
-
-Future<Nothing> MesosContainerizerProcess::exec(
-    const ContainerID& containerId,
-    int pipeWrite)
-{
-  CHECK(promises.contains(containerId));
-
-  // Now that we've contained the child we can signal it to continue by
-  // writing to the pipe.
-  char dummy;
-  ssize_t length;
-  while ((length = write(pipeWrite, &dummy, sizeof(dummy))) == -1 &&
-         errno == EINTR);
-
-  if (length != sizeof(dummy)) {
-    return Failure("Failed to synchronize child process: " +
-                   string(strerror(errno)));
-  }
-
-  return Nothing();
-}
-
-
-Future<containerizer::Termination> MesosContainerizerProcess::wait(
-    const ContainerID& containerId)
-{
-  if (!promises.contains(containerId)) {
-    return Failure("Unknown container: " + stringify(containerId));
-  }
-
-  return promises[containerId]->future();
-}
-
-
-Future<Nothing> MesosContainerizerProcess::update(
-    const ContainerID& containerId,
-    const Resources& _resources)
-{
-  // The resources hashmap won't initially contain the container's resources
-  // after recovery so we must check the promises hashmap (which is set during
-  // recovery).
-  if (!promises.contains(containerId)) {
-    // It is not considered a failure if the container is not known
-    // because the slave will attempt to update the container's
-    // resources on a task's terminal state change but the executor
-    // may have already exited and the container cleaned up.
-    LOG(WARNING) << "Ignoring update for unknown container: " << containerId;
-    return Nothing();
-  }
-
-  // Store the resources for usage().
-  resources.put(containerId, _resources);
-
-  // Update each isolator.
-  list<Future<Nothing> > futures;
-  foreach (const Owned<Isolator>& isolator, isolators) {
-    futures.push_back(isolator->update(containerId, _resources));
-  }
-
-  // Wait for all isolators to complete.
-  return collect(futures)
-    .then(lambda::bind(&_nothing));
-}
-
-
-// Resources are used to set the limit fields in the statistics but are
-// optional because they aren't known after recovery until/unless update() is
-// called.
-Future<ResourceStatistics> _usage(
-    const ContainerID& containerId,
-    const Option<Resources>& resources,
-    const list<Future<ResourceStatistics> >& statistics)
-{
-  ResourceStatistics result;
-
-  // Set the timestamp now we have all statistics.
-  result.set_timestamp(Clock::now().secs());
-
-  foreach (const Future<ResourceStatistics>& statistic, statistics) {
-    if (statistic.isReady()) {
-      result.MergeFrom(statistic.get());
-    } else {
-      LOG(WARNING) << "Skipping resource statistic for container "
-                   << containerId << " because: "
-                   << (statistic.isFailed() ? statistic.failure()
-                                            : "discarded");
-    }
-  }
-
-  if (resources.isSome()) {
-    // Set the resource allocations.
-    Option<Bytes> mem = resources.get().mem();
-    if (mem.isSome()) {
-      result.set_mem_limit_bytes(mem.get().bytes());
-    }
-
-    Option<double> cpus = resources.get().cpus();
-    if (cpus.isSome()) {
-      result.set_cpus_limit(cpus.get());
-    }
-  }
-
-  return result;
-}
-
-
-Future<ResourceStatistics> MesosContainerizerProcess::usage(
-    const ContainerID& containerId)
-{
-  if (!promises.contains(containerId)) {
-    return Failure("Unknown container: " + stringify(containerId));
-  }
-
-  list<Future<ResourceStatistics> > futures;
-  foreach (const Owned<Isolator>& isolator, isolators) {
-    futures.push_back(isolator->usage(containerId));
-  }
-
-  // Use await() here so we can return partial usage statistics.
-  // TODO(idownes): After recovery resources won't be known until after an
-  // update() because they aren't part of the SlaveState.
-  return await(futures)
-    .then(lambda::bind(
-          _usage, containerId, resources.get(containerId), lambda::_1));
-}
-
-
-void MesosContainerizerProcess::destroy(const ContainerID& containerId)
-{
-  if (!promises.contains(containerId)) {
-    LOG(WARNING) << "Ignoring destroy of unknown container: " << containerId;
-    return;
-  }
-
-  if (destroying.contains(containerId)) {
-    // Destroy has already been initiated.
-    return;
-  }
-  destroying.insert(containerId);
-
-  LOG(INFO) << "Destroying container '" << containerId << "'";
-
-  if (statuses.contains(containerId)) {
-    // Kill all processes then continue destruction.
-    launcher->destroy(containerId)
-      .onAny(defer(self(), &Self::_destroy, containerId, lambda::_1));
-  } else {
-    // The executor never forked so no processes to kill, go straight to
-    // __destroy() with status = None().
-    __destroy(containerId, None());
-  }
-}
-
-
-void MesosContainerizerProcess::_destroy(
-    const ContainerID& containerId,
-    const Future<Nothing>& future)
-{
-  // Something has gone wrong and the launcher wasn't able to kill all the
-  // processes in the container. We cannot clean up the isolators because they
-  // may require that all processes have exited so just return the failure to
-  // the slave.
-  // TODO(idownes): This is a pretty bad state to be in but we should consider
-  // cleaning up here.
-  if (!future.isReady()) {
-    promises[containerId]->fail(
-        "Failed to destroy container: " +
-        (future.isFailed() ? future.failure() : "discarded future"));
-
-    destroying.erase(containerId);
-
-    return;
-  }
-
-  // We've successfully killed all processes in the container so get the exit
-  // status of the executor when it's ready (it may already be) and continue
-  // the destroy.
-  statuses.get(containerId).get()
-    .onAny(defer(self(), &Self::__destroy, containerId, lambda::_1));
-}
-
-
-void MesosContainerizerProcess::__destroy(
-    const ContainerID& containerId,
-    const Future<Option<int > >& status)
-{
-  // Now that all processes have exited we can now clean up all isolators.
-  list<Future<Nothing> > futures;
-  foreach (const Owned<Isolator>& isolator, isolators) {
-    futures.push_back(isolator->cleanup(containerId));
-  }
-
-  // Wait for all isolators to complete cleanup before continuing.
-  collect(futures)
-    .onAny(defer(self(), &Self::___destroy, containerId, status, lambda::_1));
-}
-
-
-void MesosContainerizerProcess::___destroy(
-    const ContainerID& containerId,
-    const Future<Option<int > >& status,
-    const Future<list<Nothing> >& futures)
-{
-  // Something has gone wrong with one of the Isolators and cleanup failed.
-  // We'll fail the container termination and remove the 'destroying' flag but
-  // leave all other state. The containerizer is now in a bad state because
-  // at least one isolator has failed to clean up.
-  if (!futures.isReady()) {
-    promises[containerId]->fail(
-        "Failed to clean up isolators when destroying container: " +
-        (futures.isFailed() ? futures.failure() : "discarded future"));
-
-    destroying.erase(containerId);
-
-    return;
-  }
-
-  // A container is 'killed' if any isolator limited it.
-  // Note: We may not see a limitation in time for it to be registered. This
-  // could occur if the limitation (e.g., an OOM) killed the executor and we
-  // triggered destroy() off the executor exit.
-  bool killed = false;
-  string message;
-  if (limitations.contains(containerId)) {
-    killed = true;
-    foreach (const Limitation& limitation, limitations.get(containerId)) {
-      message += limitation.message;
-    }
-    message = strings::trim(message);
-  } else {
-    message = "Executor terminated";
-  }
-
-  containerizer::Termination termination;
-  termination.set_killed(killed);
-  termination.set_message(message);
-  if (status.isReady() && status.get().isSome()) {
-    termination.set_status(status.get().get());
-  }
-
-  promises[containerId]->set(termination);
-
-  promises.erase(containerId);
-  statuses.erase(containerId);
-  limitations.erase(containerId);
-  resources.erase(containerId);
-  destroying.erase(containerId);
-}
-
-
-void MesosContainerizerProcess::reaped(const ContainerID& containerId)
-{
-  if (!promises.contains(containerId)) {
-    return;
-  }
-
-  LOG(INFO) << "Executor for container '" << containerId << "' has exited";
-
-  // The executor has exited so destroy the container.
-  destroy(containerId);
-}
-
-
-void MesosContainerizerProcess::limited(
-    const ContainerID& containerId,
-    const Future<Limitation>& future)
-{
-  if (!promises.contains(containerId)) {
-    return;
-  }
-
-  if (future.isReady()) {
-    LOG(INFO) << "Container " << containerId << " has reached its limit for"
-              << " resource " << future.get().resource
-              << " and will be terminated";
-    limitations.put(containerId, future.get());
-  } else {
-    // TODO(idownes): A discarded future will not be an error when isolators
-    // discard their promises after cleanup.
-    LOG(ERROR) << "Error in a resource limitation for container "
-               << containerId << ": " << (future.isFailed() ? future.failure()
-                                                            : "discarded");
-  }
-
-  // The container has been affected by the limitation so destroy it.
-  destroy(containerId);
-}
-
-
-Future<hashset<ContainerID> > MesosContainerizerProcess::containers()
-{
-  return promises.keys();
-}
-
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos_containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos_containerizer.hpp b/src/slave/containerizer/mesos_containerizer.hpp
deleted file mode 100644
index 21affae..0000000
--- a/src/slave/containerizer/mesos_containerizer.hpp
+++ /dev/null
@@ -1,242 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __MESOS_CONTAINERIZER_HPP__
-#define __MESOS_CONTAINERIZER_HPP__
-
-#include <list>
-#include <vector>
-
-#include <stout/hashmap.hpp>
-#include <stout/lambda.hpp>
-#include <stout/multihashmap.hpp>
-
-#include "slave/containerizer/containerizer.hpp"
-#include "slave/containerizer/isolator.hpp"
-#include "slave/containerizer/launcher.hpp"
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-// Forward declaration.
-class MesosContainerizerProcess;
-
-class MesosContainerizer : public Containerizer
-{
-public:
-  static Try<MesosContainerizer*> create(const Flags& flags, bool local);
-
-  MesosContainerizer(
-      const Flags& flags,
-      bool local,
-      const process::Owned<Launcher>& launcher,
-      const std::vector<process::Owned<Isolator> >& isolators);
-
-  virtual ~MesosContainerizer();
-
-  virtual process::Future<Nothing> recover(
-      const Option<state::SlaveState>& state);
-
-  virtual process::Future<Nothing> launch(
-      const ContainerID& containerId,
-      const ExecutorInfo& executorInfo,
-      const std::string& directory,
-      const Option<std::string>& user,
-      const SlaveID& slaveId,
-      const process::PID<Slave>& slavePid,
-      bool checkpoint);
-
-  virtual process::Future<Nothing> launch(
-      const ContainerID& containerId,
-      const TaskInfo& taskInfo,
-      const ExecutorInfo& executorInfo,
-      const std::string& directory,
-      const Option<std::string>& user,
-      const SlaveID& slaveId,
-      const process::PID<Slave>& slavePid,
-      bool checkpoint);
-
-  virtual process::Future<Nothing> update(
-      const ContainerID& containerId,
-      const Resources& resources);
-
-  virtual process::Future<ResourceStatistics> usage(
-      const ContainerID& containerId);
-
-  virtual process::Future<containerizer::Termination> wait(
-      const ContainerID& containerId);
-
-  virtual void destroy(const ContainerID& containerId);
-
-  virtual process::Future<hashset<ContainerID> > containers();
-
-private:
-  MesosContainerizerProcess* process;
-};
-
-
-class MesosContainerizerProcess
-  : public process::Process<MesosContainerizerProcess>
-{
-public:
-  MesosContainerizerProcess(
-      const Flags& _flags,
-      bool _local,
-      const process::Owned<Launcher>& _launcher,
-      const std::vector<process::Owned<Isolator> >& _isolators)
-    : flags(_flags),
-      local(_local),
-      launcher(_launcher),
-      isolators(_isolators) {}
-
-  virtual ~MesosContainerizerProcess() {}
-
-  process::Future<Nothing> recover(
-      const Option<state::SlaveState>& state);
-
-  process::Future<Nothing> launch(
-      const ContainerID& containerId,
-      const ExecutorInfo& executorInfo,
-      const std::string& directory,
-      const Option<std::string>& user,
-      const SlaveID& slaveId,
-      const process::PID<Slave>& slavePid,
-      bool checkpoint);
-
-  process::Future<Nothing> launch(
-      const ContainerID& containerId,
-      const TaskInfo& taskInfo,
-      const ExecutorInfo& executorInfo,
-      const std::string& directory,
-      const Option<std::string>& user,
-      const SlaveID& slaveId,
-      const process::PID<Slave>& slavePid,
-      bool checkpoint);
-
-  process::Future<Nothing> update(
-      const ContainerID& containerId,
-      const Resources& resources);
-
-  process::Future<ResourceStatistics> usage(
-      const ContainerID& containerId);
-
-  process::Future<containerizer::Termination> wait(
-      const ContainerID& containerId);
-
-  void destroy(const ContainerID& containerId);
-
-  process::Future<hashset<ContainerID> > containers();
-
-private:
-  process::Future<Nothing> _recover(
-      const std::list<state::RunState>& recoverable);
-
-  process::Future<Nothing> __recover(
-      const std::list<state::RunState>& recovered);
-
-  process::Future<std::list<Option<CommandInfo> > > prepare(
-      const ContainerID& containerId,
-      const ExecutorInfo& executorInfo,
-      const std::string& directory,
-      const Option<std::string>& user);
-
-  process::Future<Nothing> fetch(
-      const ContainerID& containerId,
-      const CommandInfo& commandInfo,
-      const std::string& directory,
-      const Option<std::string>& user);
-
-  process::Future<Nothing> _launch(
-      const ContainerID& containerId,
-      const ExecutorInfo& executorInfo,
-      const std::string& directory,
-      const Option<std::string>& user,
-      const SlaveID& slaveId,
-      const process::PID<Slave>& slavePid,
-      bool checkpoint,
-      const std::list<Option<CommandInfo> >& scripts);
-
-  process::Future<Nothing> isolate(
-      const ContainerID& containerId,
-      pid_t _pid);
-
-  process::Future<Nothing> exec(
-      const ContainerID& containerId,
-      int pipeWrite);
-
-  // Continues 'destroy()' once all processes have been killed by the launcher.
-  void _destroy(
-      const ContainerID& containerId,
-      const process::Future<Nothing>& future);
-
-  // Continues '_destroy()' once we get the exit status of the executor.
-  void __destroy(
-      const ContainerID& containerId,
-      const process::Future<Option<int > >& status);
-
-  // Continues (and completes) '__destroy()' once all isolators have completed
-  // cleanup.
-  void ___destroy(
-      const ContainerID& containerId,
-      const process::Future<Option<int > >& status,
-      const process::Future<std::list<Nothing> >& futures);
-
-  // Call back for when an isolator limits a container and impacts the
-  // processes. This will trigger container destruction.
-  void limited(
-      const ContainerID& containerId,
-      const process::Future<Limitation>& future);
-
-  // Call back for when the executor exits. This will trigger container
-  // destroy.
-  void reaped(const ContainerID& containerId);
-
-  const Flags flags;
-  const bool local;
-  const process::Owned<Launcher> launcher;
-  const std::vector<process::Owned<Isolator> > isolators;
-
-  // TODO(idownes): Consider putting these per-container variables into a
-  // struct.
-  // Promises for futures returned from wait().
-  hashmap<ContainerID,
-    process::Owned<process::Promise<containerizer::Termination> > > promises;
-
-  // We need to keep track of the future exit status for each executor because
-  // we'll only get a single notification when the executor exits.
-  hashmap<ContainerID, process::Future<Option<int> > > statuses;
-
-  // We keep track of any limitations received from each isolator so we can
-  // determine the cause of an executor termination.
-  multihashmap<ContainerID, Limitation> limitations;
-
-  // We keep track of the resources for each container so we can set the
-  // ResourceStatistics limits in usage().
-  hashmap<ContainerID, Resources> resources;
-
-  // Set of containers that are in process of being destroyed.
-  hashset<ContainerID> destroying;
-};
-
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __MESOS_CONTAINERIZER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/cgroups_isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/cgroups_isolator_tests.cpp b/src/tests/cgroups_isolator_tests.cpp
index 5a9704d..2d9738f 100644
--- a/src/tests/cgroups_isolator_tests.cpp
+++ b/src/tests/cgroups_isolator_tests.cpp
@@ -25,7 +25,7 @@
 #include <stout/proc.hpp>
 #include <stout/stringify.hpp>
 
-#include "slave/containerizer/mesos_containerizer.hpp"
+#include "slave/containerizer/mesos/containerizer.hpp"
 
 #include "tests/script.hpp"
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer_tests.cpp b/src/tests/containerizer_tests.cpp
index 8ea7974..70e1245 100644
--- a/src/tests/containerizer_tests.cpp
+++ b/src/tests/containerizer_tests.cpp
@@ -31,7 +31,8 @@
 
 #include "slave/containerizer/isolator.hpp"
 #include "slave/containerizer/launcher.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
+
+#include "slave/containerizer/mesos/containerizer.hpp"
 
 #include "tests/flags.hpp"
 #include "tests/isolator.hpp"

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/isolator_tests.cpp b/src/tests/isolator_tests.cpp
index 0bbec09..5a141e3 100644
--- a/src/tests/isolator_tests.cpp
+++ b/src/tests/isolator_tests.cpp
@@ -29,6 +29,7 @@
 #include <process/owned.hpp>
 #include <process/reap.hpp>
 
+#include <stout/abort.hpp>
 #include <stout/os.hpp>
 #include <stout/path.hpp>
 
@@ -84,23 +85,24 @@ using testing::Return;
 using testing::SaveArg;
 
 
-int execute(const std::string& command, int pipes[2])
+static int childSetup(int pipes[2])
 {
-  // In child process
-  ::close(pipes[1]);
+  // In child process.
+  while (::close(pipes[1]) == -1 && errno == EINTR);
 
   // Wait until the parent signals us to continue.
-  int buf;
-  while (::read(pipes[0], &buf, sizeof(buf)) == -1 && errno == EINTR);
-  ::close(pipes[0]);
+  char dummy;
+  ssize_t length;
+  while ((length = ::read(pipes[0], &dummy, sizeof(dummy))) == -1 &&
+         errno == EINTR);
 
-  execl("/bin/sh", "sh", "-c", command.c_str(), (char*) NULL);
+  if (length != sizeof(dummy)) {
+    ABORT("Failed to synchronize with parent");
+  }
 
-  const char* message = "Child failed to exec";
-  while (write(STDERR_FILENO, message, strlen(message)) == -1 &&
-         errno == EINTR);
+  while (::close(pipes[0]) == -1 && errno == EINTR);
 
-  _exit(1);
+  return 0;
 }
 
 
@@ -147,24 +149,38 @@ TYPED_TEST(CpuIsolatorTest, UserCpuUsage)
   int pipes[2];
   ASSERT_NE(-1, ::pipe(pipes));
 
-  lambda::function<int()> inChild = lambda::bind(&execute, command, pipes);
+  vector<string> argv(3);
+  argv[0] = "sh";
+  argv[1] = "-c";
+  argv[2] = command;
+
+  Try<pid_t> pid = launcher.get()->fork(
+      containerId,
+      "/bin/sh",
+      argv,
+      Subprocess::FD(STDIN_FILENO),
+      Subprocess::FD(STDOUT_FILENO),
+      Subprocess::FD(STDERR_FILENO),
+      None(),
+      None(),
+      lambda::bind(&childSetup, pipes));
 
-  Try<pid_t> pid = launcher.get()->fork(containerId, inChild);
   ASSERT_SOME(pid);
 
   // Reap the forked child.
   Future<Option<int> > status = process::reap(pid.get());
 
   // Continue in the parent.
-  ::close(pipes[0]);
+  ASSERT_SOME(os::close(pipes[0]));
 
   // Isolate the forked child.
   AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
 
   // Now signal the child to continue.
-  int buf;
-  ASSERT_LT(0, ::write(pipes[1],  &buf, sizeof(buf)));
-  ::close(pipes[1]);
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+
+  ASSERT_SOME(os::close(pipes[1]));
 
   // Wait for the command to start.
   while (!os::exists(file));
@@ -238,24 +254,38 @@ TYPED_TEST(CpuIsolatorTest, SystemCpuUsage)
   int pipes[2];
   ASSERT_NE(-1, ::pipe(pipes));
 
-  lambda::function<int()> inChild = lambda::bind(&execute, command, pipes);
+  vector<string> argv(3);
+  argv[0] = "sh";
+  argv[1] = "-c";
+  argv[2] = command;
+
+  Try<pid_t> pid = launcher.get()->fork(
+      containerId,
+      "/bin/sh",
+      argv,
+      Subprocess::FD(STDIN_FILENO),
+      Subprocess::FD(STDOUT_FILENO),
+      Subprocess::FD(STDERR_FILENO),
+      None(),
+      None(),
+      lambda::bind(&childSetup, pipes));
 
-  Try<pid_t> pid = launcher.get()->fork(containerId, inChild);
   ASSERT_SOME(pid);
 
   // Reap the forked child.
   Future<Option<int> > status = process::reap(pid.get());
 
   // Continue in the parent.
-  ::close(pipes[0]);
+  ASSERT_SOME(os::close(pipes[0]));
 
   // Isolate the forked child.
   AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
 
   // Now signal the child to continue.
-  int buf;
-  ASSERT_LT(0, ::write(pipes[1],  &buf, sizeof(buf)));
-  ::close(pipes[1]);
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1],  &dummy, sizeof(dummy)));
+
+  ASSERT_SOME(os::close(pipes[1]));
 
   // Wait for the command to start.
   while (!os::exists(file));
@@ -335,24 +365,38 @@ TEST_F(LimitedCpuIsolatorTest, ROOT_CGROUPS_Cfs)
   int pipes[2];
   ASSERT_NE(-1, ::pipe(pipes));
 
-  lambda::function<int()> inChild = lambda::bind(&execute, command, pipes);
+  vector<string> argv(3);
+  argv[0] = "sh";
+  argv[1] = "-c";
+  argv[2] = command;
+
+  Try<pid_t> pid = launcher.get()->fork(
+      containerId,
+      "/bin/sh",
+      argv,
+      Subprocess::FD(STDIN_FILENO),
+      Subprocess::FD(STDOUT_FILENO),
+      Subprocess::FD(STDERR_FILENO),
+      None(),
+      None(),
+      lambda::bind(&childSetup, pipes));
 
-  Try<pid_t> pid = launcher.get()->fork(containerId, inChild);
   ASSERT_SOME(pid);
 
   // Reap the forked child.
   Future<Option<int> > status = process::reap(pid.get());
 
   // Continue in the parent.
-  ::close(pipes[0]);
+  ASSERT_SOME(os::close(pipes[0]));
 
   // Isolate the forked child.
   AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
 
   // Now signal the child to continue.
-  int buf;
-  ASSERT_LT(0, ::write(pipes[1],  &buf, sizeof(buf)));
-  ::close(pipes[1]);
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1],  &dummy, sizeof(dummy)));
+
+  ASSERT_SOME(os::close(pipes[1]));
 
   // Wait for the command to complete.
   AWAIT_READY(status);
@@ -413,24 +457,38 @@ TEST_F(LimitedCpuIsolatorTest, ROOT_CGROUPS_Cfs_Big_Quota)
   int pipes[2];
   ASSERT_NE(-1, ::pipe(pipes));
 
-  lambda::function<int()> inChild = lambda::bind(&execute, "exit 0", pipes);
+  vector<string> argv(3);
+  argv[0] = "sh";
+  argv[1] = "-c";
+  argv[2] = "exit 0";
+
+  Try<pid_t> pid = launcher.get()->fork(
+      containerId,
+      "/bin/sh",
+      argv,
+      Subprocess::FD(STDIN_FILENO),
+      Subprocess::FD(STDOUT_FILENO),
+      Subprocess::FD(STDERR_FILENO),
+      None(),
+      None(),
+      lambda::bind(&childSetup, pipes));
 
-  Try<pid_t> pid = launcher.get()->fork(containerId, inChild);
   ASSERT_SOME(pid);
 
   // Reap the forked child.
   Future<Option<int> > status = process::reap(pid.get());
 
   // Continue in the parent.
-  ::close(pipes[0]);
+  ASSERT_SOME(os::close(pipes[0]));
 
   // Isolate the forked child.
   AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
 
   // Now signal the child to continue.
-  int buf;
-  ASSERT_LT(0, ::write(pipes[1],  &buf, sizeof(buf)));
-  ::close(pipes[1]);
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1],  &dummy, sizeof(dummy)));
+
+  ASSERT_SOME(os::close(pipes[1]));
 
   // Wait for the command to complete successfully.
   AWAIT_READY(status);
@@ -466,13 +524,20 @@ TYPED_TEST_CASE(MemIsolatorTest, MemIsolatorTypes);
 // posix_memalign, mlock, memset and perror are not safe.
 int consumeMemory(const Bytes& _size, const Duration& duration, int pipes[2])
 {
-  // In child process
-  ::close(pipes[1]);
+  // In child process.
+  while (::close(pipes[1]) == -1 && errno == EINTR);
 
-  int buf;
   // Wait until the parent signals us to continue.
-  while (::read(pipes[0], &buf, sizeof(buf)) == -1 && errno == EINTR);
-  ::close(pipes[0]);
+  char dummy;
+  ssize_t length;
+  while ((length = ::read(pipes[0], &dummy, sizeof(dummy))) == -1 &&
+         errno == EINTR);
+
+  if (length != sizeof(dummy)) {
+    ABORT("Failed to synchronize with parent");
+  }
+
+  while (::close(pipes[0]) == -1 && errno == EINTR);
 
   size_t size = static_cast<size_t>(_size.bytes());
   void* buffer = NULL;
@@ -522,28 +587,33 @@ TYPED_TEST(MemIsolatorTest, MemUsage)
   int pipes[2];
   ASSERT_NE(-1, ::pipe(pipes));
 
-  lambda::function<int()> inChild = lambda::bind(
-      &consumeMemory,
-      Megabytes(256),
-      Seconds(10),
-      pipes);
+  Try<pid_t> pid = launcher.get()->fork(
+      containerId,
+      "/bin/sh",
+      vector<string>(),
+      Subprocess::FD(STDIN_FILENO),
+      Subprocess::FD(STDOUT_FILENO),
+      Subprocess::FD(STDERR_FILENO),
+      None(),
+      None(),
+      lambda::bind(&consumeMemory, Megabytes(256), Seconds(10), pipes));
 
-  Try<pid_t> pid = launcher.get()->fork(containerId, inChild);
   ASSERT_SOME(pid);
 
   // Set up the reaper to wait on the forked child.
   Future<Option<int> > status = process::reap(pid.get());
 
   // Continue in the parent.
-  ::close(pipes[0]);
+  ASSERT_SOME(os::close(pipes[0]));
 
   // Isolate the forked child.
   AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
 
   // Now signal the child to continue.
-  int buf;
-  ASSERT_LT(0, ::write(pipes[1], &buf, sizeof(buf)));
-  ::close(pipes[1]);
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+
+  ASSERT_SOME(os::close(pipes[1]));
 
   // Wait up to 5 seconds for the child process to consume 256 MB of memory;
   ResourceStatistics statistics;

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 885e0fb..c498d33 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -50,7 +50,7 @@
 #include "slave/flags.hpp"
 #include "slave/slave.hpp"
 
-#include "slave/containerizer/mesos_containerizer.hpp"
+#include "slave/containerizer/mesos/containerizer.hpp"
 
 #include "tests/containerizer.hpp"
 #include "tests/mesos.hpp"

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 24af32b..216bd6f 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -34,7 +34,8 @@
 #endif
 
 #include "slave/containerizer/containerizer.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
+
+#include "slave/containerizer/mesos/containerizer.hpp"
 
 #include "tests/containerizer.hpp"
 #include "tests/environment.hpp"

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index eeb8631..ae38a13 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -50,10 +50,12 @@
 #include "master/hierarchical_allocator_process.hpp"
 #include "master/master.hpp"
 
-#include "slave/containerizer/containerizer.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
 #include "slave/slave.hpp"
 
+#include "slave/containerizer/containerizer.hpp"
+
+#include "slave/containerizer/mesos/containerizer.hpp"
+
 #include "tests/cluster.hpp"
 #include "tests/utils.hpp"
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 873f22d..371a5b8 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -43,11 +43,12 @@
 #include "master/master.hpp"
 
 #include "slave/constants.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
 #include "slave/gc.hpp"
 #include "slave/flags.hpp"
 #include "slave/slave.hpp"
 
+#include "slave/containerizer/mesos/containerizer.hpp"
+
 #include "tests/containerizer.hpp"
 #include "tests/flags.hpp"
 #include "tests/mesos.hpp"


[5/7] git commit: Reverted Operation in favor of using Subcommand.

Posted by ji...@apache.org.
Reverted Operation in favor of using Subcommand.

Review: https://reviews.apache.org/r/22764


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ae3c8e22
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ae3c8e22
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ae3c8e22

Branch: refs/heads/master
Commit: ae3c8e221c39ac9ef4b7f59fb833bf78904cc111
Parents: 6b7e657
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Jun 25 14:35:22 2014 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jun 25 14:35:27 2014 -0700

----------------------------------------------------------------------
 src/Makefile.am              |   8 --
 src/launcher/launcher.cpp    | 228 --------------------------------------
 src/launcher/launcher.hpp    | 135 ----------------------
 src/launcher/main.cpp        |  29 -----
 src/tests/environment.cpp    |   5 -
 src/tests/launcher_tests.cpp |  79 -------------
 6 files changed, 484 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ae3c8e22/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 861aad2..a49a3fe 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -230,7 +230,6 @@ libmesos_no_3rdparty_la_SOURCES =					\
 	sched/sched.cpp							\
 	scheduler/scheduler.cpp						\
 	local/local.cpp							\
-	launcher/launcher.cpp						\
 	master/contender.cpp						\
 	master/constants.cpp						\
 	master/detector.cpp						\
@@ -352,7 +351,6 @@ libmesos_no_3rdparty_la_SOURCES +=					\
 	credentials/credentials.hpp					\
 	examples/utils.hpp files/files.hpp				\
 	hdfs/hdfs.hpp							\
-	launcher/launcher.hpp						\
 	linux/cgroups.hpp						\
 	linux/perf.hpp							\
 	linux/fs.hpp local/flags.hpp local/local.hpp			\
@@ -574,11 +572,6 @@ mesos_usage_SOURCES = usage/main.cpp
 mesos_usage_CPPFLAGS = $(MESOS_CPPFLAGS)
 mesos_usage_LDADD = libmesos.la
 
-pkglibexec_PROGRAMS += mesos-launcher
-mesos_launcher_SOURCES = launcher/main.cpp
-mesos_launcher_CPPFLAGS = $(MESOS_CPPFLAGS)
-mesos_launcher_LDADD = libmesos.la
-
 bin_PROGRAMS += mesos-log
 mesos_log_SOURCES = log/main.cpp
 mesos_log_CPPFLAGS = $(MESOS_CPPFLAGS)
@@ -995,7 +988,6 @@ mesos_tests_SOURCES =				\
   tests/gc_tests.cpp				\
   tests/isolator_tests.cpp			\
   tests/external_containerizer_test.cpp		\
-  tests/launcher_tests.cpp			\
   tests/log_tests.cpp				\
   tests/logging_tests.cpp			\
   tests/main.cpp				\

http://git-wip-us.apache.org/repos/asf/mesos/blob/ae3c8e22/src/launcher/launcher.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/launcher.cpp b/src/launcher/launcher.cpp
deleted file mode 100644
index 5585aad..0000000
--- a/src/launcher/launcher.cpp
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <process/internal.hpp>
-#include <process/io.hpp>
-#include <process/subprocess.hpp>
-
-#include <stout/foreach.hpp>
-#include <stout/hashmap.hpp>
-#include <stout/os.hpp>
-#include <stout/path.hpp>
-#include <stout/strings.hpp>
-
-#include "launcher/launcher.hpp"
-
-using namespace process;
-
-using std::cerr;
-using std::endl;
-using std::map;
-using std::string;
-
-namespace mesos {
-namespace internal {
-namespace launcher {
-
-// The default executable.
-const string DEFAULT_EXECUTABLE = "mesos-launcher";
-
-
-// The prefix of the environment variables that launcher uses.
-static const string LAUNCHER_PREFIX = "MESOS_LAUNCHER_";
-
-
-// The default directory to search for the executable.
-static Option<string> defaultPath;
-static int defaultPathLock = 0;
-
-
-// Stores all the registered operations.
-static hashmap<string, Owned<Operation> > operations;
-
-
-static void usage(const char* argv0)
-{
-  cerr << "Usage: " << argv0 << " <operation> [OPTIONS]" << endl
-       << endl
-       << "Available operations:" << endl
-       << "    help" << endl;
-
-  // Get a list of available operations.
-  foreachkey (const string& name, operations) {
-    cerr << "    " << name << endl;
-  }
-}
-
-
-void setDefaultPath(const string& path)
-{
-  process::internal::acquire(&defaultPathLock);
-  {
-    defaultPath = path;
-  }
-  process::internal::release(&defaultPathLock);
-}
-
-
-static Option<string> getDefaultPath()
-{
-  Option<string> path;
-
-  process::internal::acquire(&defaultPathLock);
-  {
-    path = defaultPath;
-  }
-  process::internal::release(&defaultPathLock);
-
-  return path;
-}
-
-
-void add(const Owned<Operation>& operation)
-{
-  operations[operation->name()] = operation;
-}
-
-
-int main(int argc, char** argv)
-{
-  if (argc < 2) {
-    usage(argv[0]);
-    return 1;
-  }
-
-  if (!strcmp(argv[1], "help")) {
-    if (argc == 2) {
-      usage(argv[0]);
-      return 1;
-    }
-
-    // 'argv[0] help operation' => 'argv[0] operation --help'
-    argv[1] = argv[2];
-    argv[2] = (char*) "--help";
-  }
-
-  const string operation = argv[1];
-
-  if (!operations.contains(operation)) {
-    cerr << "Operation '" << operation << "' is not available" << endl;
-    usage(argv[0]);
-    return 1;
-  }
-
-  // Create the operation specific flags.
-  flags::FlagsBase* flags = operations[operation]->getFlags();
-
-  // Parse the flags from the environment and the command line.
-  Try<Nothing> load = flags->load(LAUNCHER_PREFIX, argc, argv);
-  if (load.isError()) {
-    cerr << "Failed to parse the flags: " << load.error() << endl;
-    return 1;
-  }
-
-  // Execute the operation.
-  return operations[operation]->execute();
-}
-
-
-ShellOperation::Flags::Flags()
-{
-  add(&command,
-      "command",
-      "The shell command to be executed");
-}
-
-
-int ShellOperation::execute()
-{
-  if (flags.command.isNone()) {
-    cerr << "The command is not specified" << endl;
-    return 1;
-  }
-
-  int status = os::system(flags.command.get());
-  if (!WIFEXITED(status)) {
-    return 1;
-  }
-
-  return WEXITSTATUS(status);
-}
-
-
-process::Future<Option<int> > Operation::launch(
-    const Option<int>& stdout,
-    const Option<int>& stderr,
-    const string& executable,
-    const Option<string>& _path)
-{
-  // Determine the path to search for the executable. If the path is
-  // specified by the user, use it. Otherwise, use the default path.
-  // If both are not specified, return failure.
-  string path;
-  if (_path.isSome()) {
-    path = _path.get();
-  } else {
-    Option<string> _defaultPath = getDefaultPath();
-    if (_defaultPath.isNone()) {
-      return Failure("Path is not specified and no default path is found");
-    }
-    path = _defaultPath.get();
-  }
-
-  Result<string> realpath = os::realpath(path::join(path, executable));
-  if (!realpath.isSome()) {
-    return Failure(
-        "Failed to determine the canonical path for '" + executable + "': " +
-        (realpath.isError() ? realpath.error() : "No such file or directory"));
-  }
-
-  // Prepare the environment variables.
-  map<string, string> environment;
-  foreachpair (const string& name, const flags::Flag& flag, *getFlags()) {
-    Option<string> value = flag.stringify(*getFlags());
-    if (value.isSome()) {
-      string key = LAUNCHER_PREFIX + name;
-      environment[key] = value.get();
-      VLOG(1) << "Setting launcher environment " << key << "=" << value.get();
-    }
-  }
-
-  // Prepare the command: 'mesos-launcher <operation_name> ...'.
-  string command = strings::join(" ", realpath.get(), name());
-
-  Try<Subprocess> s = subprocess(
-      command,
-      Subprocess::PIPE(),
-      Subprocess::PIPE(),
-      Subprocess::PIPE(),
-      environment);
-
-  if (s.isError()) {
-    return Failure("Launch subprocess failed: " + s.error());
-  }
-
-  io::redirect(s.get().out().get(), stdout);
-  io::redirect(s.get().err().get(), stderr);
-
-  return s.get().status();
-}
-
-} // namespace launcher {
-} // namespace internal {
-} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/ae3c8e22/src/launcher/launcher.hpp
----------------------------------------------------------------------
diff --git a/src/launcher/launcher.hpp b/src/launcher/launcher.hpp
deleted file mode 100644
index 35cdc69..0000000
--- a/src/launcher/launcher.hpp
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __LAUNCHER_LAUNCHER_HPP__
-#define __LAUNCHER_LAUNCHER_HPP__
-
-#include <string>
-
-#include <process/future.hpp>
-#include <process/owned.hpp>
-
-#include <stout/flags.hpp>
-#include <stout/none.hpp>
-#include <stout/nothing.hpp>
-#include <stout/option.hpp>
-#include <stout/try.hpp>
-
-namespace mesos {
-namespace internal {
-namespace launcher {
-
-// The default executable used by the launcher.
-extern const std::string DEFAULT_EXECUTABLE;
-
-
-// Represents an operation to be executed by a subprocess.
-class Operation
-{
-public:
-  virtual ~Operation() {}
-
-  // Launches this operation in a subprocess. The user may choose to
-  // specify the executable and the path in which to search for the
-  // executable. If not specified, the default executable and the
-  // default path will be used.
-  process::Future<Option<int> > launch(
-      const Option<int>& stdout = None(),
-      const Option<int>& stderr = None(),
-      const std::string& executable = DEFAULT_EXECUTABLE,
-      const Option<std::string>& path = None());
-
-protected:
-  // Returns the name of this operation.
-  virtual std::string name() const = 0;
-
-  // Defines the operation that will be executed by a subprocess. The
-  // return value will be the exit code of the subprocess.
-  virtual int execute() = 0;
-
-  // Returns the pointer to the flags that will be used for this
-  // operation. By default, the flags is empty.
-  virtual flags::FlagsBase* getFlags() { return &flags; }
-
-private:
-  friend void add(const process::Owned<Operation>& operation);
-  friend int main(int argc, char** argv);
-
-  // The default flags which is empty.
-  flags::FlagsBase flags;
-};
-
-
-// Tell the launcher which directory to search for the executable by
-// default if it is not specified by the user. When launching an
-// operation, if the user does not specify the 'path' and no default
-// 'path' is set, the 'launch' will fail.
-void setDefaultPath(const std::string& path);
-
-
-// Register an operation. This is supposed to be called in the main
-// function of the subprocess.
-void add(const process::Owned<Operation>& operation);
-
-
-// Syntactic sugar for registering an operation. For example, the
-// following code shows a typical main function of the subprocess.
-//
-// int main(int argc, char** argv)
-// {
-//   launcher::add<Operation1>();
-//   launcher::add<OPeration2>();
-//
-//   return launcher::main(argc, argv);
-// }
-template <typename T>
-void add()
-{
-  add(process::Owned<Operation>(new T()));
-}
-
-
-// The main entry of the subprocess.
-int main(int argc, char** argv);
-
-
-// An operation which takes a shell command and executes it. This is
-// mainly used for testing.
-class ShellOperation : public Operation
-{
-public:
-  struct Flags : public flags::FlagsBase
-  {
-    Flags();
-
-    Option<std::string> command;
-  };
-
-  Flags flags;
-
-protected:
-  virtual std::string name() const { return "shell"; }
-  virtual int execute();
-  virtual flags::FlagsBase* getFlags() { return &flags; }
-};
-
-} // namespace launcher {
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __LAUNCHER_LAUNCHER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/ae3c8e22/src/launcher/main.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/main.cpp b/src/launcher/main.cpp
deleted file mode 100644
index b497e98..0000000
--- a/src/launcher/main.cpp
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "launcher/launcher.hpp"
-
-using namespace mesos::internal;
-
-
-int main(int argc, char** argv)
-{
-  launcher::add<launcher::ShellOperation>();
-
-  return launcher::main(argc, argv);
-}

http://git-wip-us.apache.org/repos/asf/mesos/blob/ae3c8e22/src/tests/environment.cpp
----------------------------------------------------------------------
diff --git a/src/tests/environment.cpp b/src/tests/environment.cpp
index 21b9d1d..e991d57 100644
--- a/src/tests/environment.cpp
+++ b/src/tests/environment.cpp
@@ -38,8 +38,6 @@
 #include "linux/cgroups.hpp"
 #endif
 
-#include "launcher/launcher.hpp"
-
 #include "logging/logging.hpp"
 
 #include "tests/environment.hpp"
@@ -246,9 +244,6 @@ void Environment::SetUp()
     os::setenv("MESOS_NATIVE_JAVA_LIBRARY", path);
   }
 
-  // Set the default path for the launcher.
-  launcher::setDefaultPath(path::join(tests::flags.build_dir, "src"));
-
   if (!GTEST_IS_THREADSAFE) {
     EXIT(1) << "Testing environment is not thread safe, bailing!";
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/ae3c8e22/src/tests/launcher_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/launcher_tests.cpp b/src/tests/launcher_tests.cpp
deleted file mode 100644
index e293cc5..0000000
--- a/src/tests/launcher_tests.cpp
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <stdio.h>
-
-#include <gtest/gtest.h>
-
-#include <process/gtest.hpp>
-
-#include <stout/gtest.hpp>
-#include <stout/os.hpp>
-#include <stout/path.hpp>
-
-#include "launcher/launcher.hpp"
-
-#include "tests/flags.hpp"
-#include "tests/utils.hpp"
-
-using namespace process;
-
-using namespace mesos::internal;
-using namespace mesos::internal::launcher;
-
-using std::string;
-
-
-class LauncherTest: public tests::TemporaryDirectoryTest {};
-
-
-TEST_F(LauncherTest, Launch)
-{
-  Option<int> stdout = None();
-  Option<int> stderr = None();
-
-  // Redirect output if running the tests verbosely.
-  if (tests::flags.verbose) {
-    stdout = STDOUT_FILENO;
-    stderr = STDERR_FILENO;
-  }
-
-  string temp1 = path::join(os::getcwd(), "temp1");
-  string temp2 = path::join(os::getcwd(), "temp2");
-
-  ASSERT_SOME(os::write(temp1, "hello world"));
-
-  ShellOperation operation;
-  operation.flags.command = "cp " + temp1 + " " + temp2;
-
-  Future<Option<int> > launch = operation.launch(stdout, stderr);
-  AWAIT_READY(launch);
-  EXPECT_SOME_EQ(0, launch.get());
-  ASSERT_SOME_EQ("hello world", os::read(temp2));
-
-  AWAIT_FAILED(operation.launch(
-      stdout,
-      stderr,
-      "non-exist"));
-
-  AWAIT_FAILED(operation.launch(
-      stdout,
-      stderr,
-      launcher::DEFAULT_EXECUTABLE,
-      "non-exist"));
-}


[3/7] git commit: Introduced a Subcommand abstraction in stout.

Posted by ji...@apache.org.
Introduced a Subcommand abstraction in stout.

Review: https://reviews.apache.org/r/22764/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3279c406
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3279c406
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3279c406

Branch: refs/heads/master
Commit: 3279c406c7ff12f869189f7b20dbef17310bce49
Parents: 7f1774b
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Jun 25 14:33:16 2014 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jun 25 14:33:21 2014 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/3rdparty/stout/Makefile.am  |   2 +
 .../3rdparty/stout/include/stout/subcommand.hpp | 191 +++++++++++++++++++
 .../3rdparty/stout/tests/subcommand_tests.cpp   | 181 ++++++++++++++++++
 3 files changed, 374 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3279c406/3rdparty/libprocess/3rdparty/stout/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/Makefile.am b/3rdparty/libprocess/3rdparty/stout/Makefile.am
index eac7ab5..b562e87 100644
--- a/3rdparty/libprocess/3rdparty/stout/Makefile.am
+++ b/3rdparty/libprocess/3rdparty/stout/Makefile.am
@@ -68,6 +68,7 @@ EXTRA_DIST =					\
   include/stout/stopwatch.hpp			\
   include/stout/stringify.hpp			\
   include/stout/strings.hpp			\
+  include/stout/subcommand.hpp			\
   include/stout/tests/utils.hpp			\
   include/stout/thread.hpp			\
   include/stout/try.hpp				\
@@ -105,5 +106,6 @@ EXTRA_DIST =					\
   tests/set_tests.cpp				\
   tests/some_tests.cpp				\
   tests/strings_tests.cpp			\
+  tests/subcommand_tests.cpp			\
   tests/thread_tests.cpp			\
   tests/uuid_tests.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/3279c406/3rdparty/libprocess/3rdparty/stout/include/stout/subcommand.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/subcommand.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/subcommand.hpp
new file mode 100644
index 0000000..b121836
--- /dev/null
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/subcommand.hpp
@@ -0,0 +1,191 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __STOUT_SUBCOMMAND_HPP__
+#define __STOUT_SUBCOMMAND_HPP__
+
+#include <iostream>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include <stout/flags.hpp>
+#include <stout/foreach.hpp>
+#include <stout/hashset.hpp>
+#include <stout/option.hpp>
+#include <stout/preprocessor.hpp>
+
+// Subcommand is an abstraction for creating command binaries that
+// encompass many subcommands. For example:
+//
+//  $ ./runner start --arg1=1 --arg2=2
+//  $ ./runner stop --arg3=3 --arg4=4
+//
+// Here, the 'runner' command contains two subcommand implementations:
+// StartCommand and StopCommand. Each subcommand needs to define a
+// name, implement an 'execute' function, and provide the address of a
+// flags where the command line arguments will be parsed to. To
+// simplify creating command binaries that encompass many subcommands,
+// we provide a 'dispatch' function which will look at argv[1] to
+// decide which subcommand to execute (based on its name) and then
+// parse the command line flags for you.
+class Subcommand
+{
+public:
+  // This function is supposed to be called by the main function of
+  // the command binary. A user needs to register at least one
+  // subcommand. Here is a typical example of the main function of the
+  // command binary:
+  //
+  // int main(int argc, char** argv)
+  // {
+  //   return Subcommand::dispatch(
+  //     None(),
+  //     argc,
+  //     argv,
+  //     new Subcommand1(),
+  //     new Subcommand2(),
+  //     new Subcommand3());
+  // }
+#define INSERT(z, N, _) subcommands.push_back( c ## N );
+#define TEMPLATE(Z, N, DATA)                            \
+  static int dispatch(                                  \
+      const Option<std::string>& prefix,                \
+      int argc,                                         \
+      char** argv,                                      \
+      ENUM_PARAMS(N, Subcommand* c))                    \
+  {                                                     \
+    std::vector<Subcommand*> subcommands;               \
+    REPEAT_FROM_TO(0, N, INSERT, _)                     \
+    return dispatch(prefix, argc, argv, subcommands);   \
+  }
+
+  REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args C1 -> C11.
+#undef TEMPLATE
+#undef INSERT
+
+  explicit Subcommand(const std::string& _name) : name_(_name) {}
+  virtual ~Subcommand() {}
+
+  std::string name() const { return name_; }
+
+protected:
+  // Defines the main function of this subcommand. The return value
+  // will be used as the exit code.
+  // TODO(jieyu): Consider passing in argc and argv as some users
+  // might want to access the remaining command line arguments.
+  virtual int execute() = 0;
+
+  // Returns the pointer to the flags that will be used for this
+  // subcommand. If the user does not provide an override, the default
+  // empty flags will be used.
+  virtual flags::FlagsBase* getFlags() { return &flags_; }
+
+private:
+  // Returns the usage by listing all the registered subcommands.
+  static std::string usage(
+      const std::string& argv0,
+      const std::vector<Subcommand*>& subcommands);
+
+  static int dispatch(
+    const Option<std::string>& prefix,
+    int argc,
+    char** argv,
+    const std::vector<Subcommand*>& subcommands);
+
+  // The name of this subcommand.
+  std::string name_;
+
+  // The default flags which is empty.
+  flags::FlagsBase flags_;
+};
+
+
+inline std::string Subcommand::usage(
+    const std::string& argv0,
+    const std::vector<Subcommand*>& subcommands)
+{
+  std::ostringstream stream;
+
+  stream << "Usage: " << argv0 << " <subcommand> [OPTIONS]\n\n"
+         << "Available subcommands:\n"
+         << "    help\n";
+
+  // Get a list of available subcommands.
+  foreach (Subcommand* subcommand, subcommands) {
+    stream << "    " << subcommand->name() << "\n";
+  }
+
+  return stream.str();
+}
+
+
+inline int Subcommand::dispatch(
+    const Option<std::string>& prefix,
+    int argc,
+    char** argv,
+    const std::vector<Subcommand*>& subcommands)
+{
+  if (subcommands.empty()) {
+    std::cerr << "No subcommand is found" << std::endl;
+    return 1;
+  }
+
+  // Check for duplicated subcommand names.
+  hashset<std::string> names;
+  foreach (Subcommand* subcommand, subcommands) {
+    if (names.contains(subcommand->name())) {
+      std::cerr << "Multiple subcommands have name '"
+                << subcommand->name() << "'" << std::endl;
+      return 1;
+    }
+    names.insert(subcommand->name());
+  }
+
+  if (argc < 2) {
+    std::cerr << usage(argv[0], subcommands) << std::endl;
+    return 1;
+  }
+
+  if (std::string(argv[1]) == "help") {
+    if (argc == 2) {
+      std::cout << usage(argv[0], subcommands) << std::endl;
+      return 0;
+    }
+
+    // 'argv[0] help subcommand' => 'argv[0] subcommand --help'
+    argv[1] = argv[2];
+    argv[2] = (char*) "--help";
+  }
+
+  foreach (Subcommand* subcommand, subcommands) {
+    if (subcommand->name() == argv[1]) {
+      flags::FlagsBase* flags = subcommand->getFlags();
+
+      Try<Nothing> load = flags->load(prefix, argc - 1, argv + 1);
+      if (load.isError()) {
+        std::cerr << "Failed to parse the flags: " << load.error() << std::endl;
+        return 1;
+      }
+
+      return subcommand->execute();
+    }
+  }
+
+  std::cerr << "Subcommand '" << argv[1] << "' is not available\n"
+            << usage(argv[0], subcommands) << std::endl;
+  return 1;
+}
+
+#endif // __STOUT_SUBCOMMAND_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/3279c406/3rdparty/libprocess/3rdparty/stout/tests/subcommand_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/tests/subcommand_tests.cpp b/3rdparty/libprocess/3rdparty/stout/tests/subcommand_tests.cpp
new file mode 100644
index 0000000..c40bba4
--- /dev/null
+++ b/3rdparty/libprocess/3rdparty/stout/tests/subcommand_tests.cpp
@@ -0,0 +1,181 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string.h>
+#include <stdlib.h>
+
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include <stout/flags.hpp>
+#include <stout/foreach.hpp>
+#include <stout/subcommand.hpp>
+
+using std::string;
+using std::vector;
+
+
+class TestSubcommand : public Subcommand
+{
+public:
+  struct Flags : public flags::FlagsBase
+  {
+    Flags()
+    {
+      add(&b, "b", "bool");
+      add(&i, "i", "int");
+      add(&s, "s", "string");
+      add(&s2, "s2", "string with single quote");
+      add(&s3, "s3", "string with double quote");
+      add(&d, "d", "Duration");
+      add(&y, "y", "Bytes");
+      add(&j, "j", "JSON::Object");
+    }
+
+    void populate()
+    {
+      b = true;
+      i = 42;
+      s = "hello";
+      s2 = "we're";
+      s3 = "\"geek\"";
+      d = Seconds(10);
+      y = Bytes(100);
+
+      JSON::Object object;
+      object.values["strings"] = "string";
+      object.values["integer1"] = 1;
+      object.values["integer2"] = -1;
+      object.values["double1"] = 1;
+      object.values["double2"] = -1;
+      object.values["double3"] = -1.42;
+
+      JSON::Object nested;
+      nested.values["string"] = "string";
+      object.values["nested"] = nested;
+
+      JSON::Array array;
+      array.values.push_back(nested);
+      object.values["array"] = array;
+
+      j = object;
+    }
+
+    Option<bool> b;
+    Option<int> i;
+    Option<string> s;
+    Option<string> s2;
+    Option<string> s3;
+    Option<Duration> d;
+    Option<Bytes> y;
+    Option<JSON::Object> j;
+  };
+
+  explicit TestSubcommand(const string& name) : Subcommand(name) {}
+
+  Flags flags;
+
+protected:
+  virtual int execute() { return 0; }
+  virtual flags::FlagsBase* getFlags() { return &flags; }
+};
+
+
+// Generates a vector of arguments from flags.
+static vector<string> getArgv(const flags::FlagsBase& flags)
+{
+  vector<string> argv;
+  foreachpair (const string& name, const flags::Flag& flag, flags) {
+    Option<string> value = flag.stringify(flags);
+    if (value.isSome()) {
+      argv.push_back("--" + name + "=" + value.get());
+    }
+  }
+  return argv;
+}
+
+
+TEST(SubcommandTest, Flags)
+{
+  TestSubcommand::Flags flags;
+  flags.populate();
+
+  // Construct the command line arguments.
+  vector<string> _argv = getArgv(flags);
+  int argc = _argv.size() + 2;
+  char** argv = new char*[argc];
+  argv[0] = (char*) "command";
+  argv[1] = (char*) "subcommand";
+  for (int i = 2; i < argc; i++) {
+    argv[i] = ::strdup(_argv[i - 2].c_str());
+  }
+
+  TestSubcommand subcommand("subcommand");
+
+  ASSERT_EQ(0, Subcommand::dispatch(
+      None(),
+      argc,
+      argv,
+      &subcommand));
+
+  EXPECT_EQ(flags.b, subcommand.flags.b);
+  EXPECT_EQ(flags.i, subcommand.flags.i);
+  EXPECT_EQ(flags.s, subcommand.flags.s);
+  EXPECT_EQ(flags.s2, subcommand.flags.s2);
+  EXPECT_EQ(flags.s3, subcommand.flags.s3);
+  EXPECT_EQ(flags.d, subcommand.flags.d);
+  EXPECT_EQ(flags.y, subcommand.flags.y);
+  EXPECT_EQ(flags.j, subcommand.flags.j);
+
+  for (int i = 2; i < argc; i++) {
+    ::free(argv[i]);
+  }
+  delete argv;
+}
+
+
+TEST(SubcommandTest, Dispatch)
+{
+  TestSubcommand subcommand("subcommand");
+  TestSubcommand subcommand2("subcommand2");
+
+  int argc = 2;
+  char* argv[] = {
+    (char*) "command",
+    (char*) "subcommand"
+  };
+
+  EXPECT_EQ(1, Subcommand::dispatch(
+      None(),
+      argc,
+      argv,
+      &subcommand2));
+
+  // Duplicated subcommand names.
+  EXPECT_EQ(1, Subcommand::dispatch(
+      None(),
+      argc,
+      argv,
+      &subcommand,
+      &subcommand));
+
+  EXPECT_EQ(0, Subcommand::dispatch(
+      None(),
+      argc,
+      argv,
+      &subcommand,
+      &subcommand2));
+}


[2/7] git commit: Updated mesos to adapt to the new Subprocess interfaces.

Posted by ji...@apache.org.
Updated mesos to adapt to the new Subprocess interfaces.

Review: https://reviews.apache.org/r/22851


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7f1774b8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7f1774b8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7f1774b8

Branch: refs/heads/master
Commit: 7f1774b8aee8ad85f80c0d26fac1321cde3ee1cc
Parents: 1b0fdf0
Author: Jie Yu <yu...@gmail.com>
Authored: Fri Jun 20 12:46:08 2014 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jun 25 14:32:16 2014 -0700

----------------------------------------------------------------------
 src/health-check/main.cpp                       |  1 -
 src/launcher/executor.cpp                       | 41 +++++++++++---------
 src/launcher/launcher.cpp                       |  1 -
 .../containerizer/external_containerizer.cpp    |  1 -
 src/slave/containerizer/mesos_containerizer.cpp |  1 -
 src/tests/slave_tests.cpp                       |  1 -
 6 files changed, 23 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7f1774b8/src/health-check/main.cpp
----------------------------------------------------------------------
diff --git a/src/health-check/main.cpp b/src/health-check/main.cpp
index 05e2924..707810a 100644
--- a/src/health-check/main.cpp
+++ b/src/health-check/main.cpp
@@ -154,7 +154,6 @@ private:
           Subprocess::PIPE(),
           Subprocess::FD(STDERR_FILENO),
           Subprocess::FD(STDERR_FILENO),
-          None(),
           environment);
 
       if (external.isError()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/7f1774b8/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index 9f48c88..9c80848 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -62,6 +62,7 @@ using std::cout;
 using std::cerr;
 using std::endl;
 using std::string;
+using std::vector;
 
 namespace mesos {
 namespace internal {
@@ -402,31 +403,35 @@ private:
   void launchHealthCheck(const TaskInfo& task)
   {
     if (task.has_health_check()) {
-      const HealthCheck& healthCheck = task.health_check();
-      JSON::Object json = JSON::Protobuf(healthCheck);
-      // TODO(tnachen): Use flags when subprocess handle arguments
-      // with quotes.
-      const string& healthCommand =
-        path::join(healthCheckDir, "mesos-health-check") + " --executor=\"" +
-         stringify(self()) + "\" --health_check_json='" + stringify(json) +
-         "' --task_id=" + task.task_id().value();
-      cout << "Launching health check process: " << healthCommand << endl;
+      JSON::Object json = JSON::Protobuf(task.health_check());
+
+      // Launch the subprocess using 'execve' style so that quotes can
+      // be properly handled.
+      vector<string> argv(4);
+      argv[0] = "mesos-health-check";
+      argv[1] = "--executor=" + stringify(self());
+      argv[2] = "--health_check_json=" + stringify(json);
+      argv[3] = "--task_id=" + task.task_id().value();
+
+      cout << "Launching health check process: "
+           << path::join(healthCheckDir, "mesos-health-check")
+           << " " << argv[1] << " " << argv[2] << " " << argv[3] << endl;
+
       Try<Subprocess> healthProcess =
         process::subprocess(
-          healthCommand,
+          path::join(healthCheckDir, "mesos-health-check"),
+          argv,
           Subprocess::PIPE(),
           Subprocess::FD(STDOUT_FILENO),
-          Subprocess::FD(STDERR_FILENO),
-          None(),
-          None(),
-          None());
+          Subprocess::FD(STDERR_FILENO));
+
       if (healthProcess.isError()) {
         cerr << "Unable to launch health process: " << healthProcess.error();
       } else {
-        const Subprocess& health = healthProcess.get();
-        healthPid = health.pid();
-        cout << "Health check process launched at pid: " << stringify(healthPid)
-             << endl;
+        healthPid = healthProcess.get().pid();
+
+        cout << "Health check process launched at pid: "
+             << stringify(healthPid) << endl;
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/7f1774b8/src/launcher/launcher.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/launcher.cpp b/src/launcher/launcher.cpp
index 872e2e8..5585aad 100644
--- a/src/launcher/launcher.cpp
+++ b/src/launcher/launcher.cpp
@@ -211,7 +211,6 @@ process::Future<Option<int> > Operation::launch(
       Subprocess::PIPE(),
       Subprocess::PIPE(),
       Subprocess::PIPE(),
-      None(),
       environment);
 
   if (s.isError()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/7f1774b8/src/slave/containerizer/external_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/external_containerizer.cpp b/src/slave/containerizer/external_containerizer.cpp
index bb3e5cc..3f28d85 100644
--- a/src/slave/containerizer/external_containerizer.cpp
+++ b/src/slave/containerizer/external_containerizer.cpp
@@ -1117,7 +1117,6 @@ Try<Subprocess> ExternalContainerizerProcess::invoke(
       Subprocess::PIPE(),
       Subprocess::PIPE(),
       Subprocess::PIPE(),
-      None(),
       environment,
       lambda::bind(&setup, sandbox.isSome() ? sandbox.get().directory
                                             : string()));

http://git-wip-us.apache.org/repos/asf/mesos/blob/7f1774b8/src/slave/containerizer/mesos_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos_containerizer.cpp b/src/slave/containerizer/mesos_containerizer.cpp
index 76c101c..8a109f4 100644
--- a/src/slave/containerizer/mesos_containerizer.cpp
+++ b/src/slave/containerizer/mesos_containerizer.cpp
@@ -666,7 +666,6 @@ Future<Nothing> MesosContainerizerProcess::fetch(
       Subprocess::PIPE(),
       Subprocess::PIPE(),
       Subprocess::PIPE(),
-      None(),
       environment);
 
   if (fetcher.isError()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/7f1774b8/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 21fe685..873f22d 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -351,7 +351,6 @@ TEST_F(SlaveTest, MesosExecutorWithOverride)
         process::Subprocess::PIPE(),
         process::Subprocess::PIPE(),
         process::Subprocess::PIPE(),
-        None(),
         environment);
 
   ASSERT_SOME(executor);


[7/7] git commit: Refactored the mesos containerizer launcher to fix MESOS-1404.

Posted by ji...@apache.org.
Refactored the mesos containerizer launcher to fix MESOS-1404.

Review: https://reviews.apache.org/r/22852


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d0046dca
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d0046dca
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d0046dca

Branch: refs/heads/master
Commit: d0046dca732ac4c1636ef85384bda3092ff3fa4f
Parents: ae3c8e2
Author: Jie Yu <yu...@gmail.com>
Authored: Fri Jun 20 21:40:37 2014 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jun 25 14:36:12 2014 -0700

----------------------------------------------------------------------
 src/Makefile.am                                 |   21 +-
 src/slave/containerizer/containerizer.cpp       |    3 +-
 src/slave/containerizer/launcher.cpp            |   89 +-
 src/slave/containerizer/launcher.hpp            |   46 +-
 src/slave/containerizer/linux_launcher.cpp      |  199 +--
 src/slave/containerizer/linux_launcher.hpp      |   17 +-
 src/slave/containerizer/mesos/containerizer.cpp | 1048 ++++++++++++++++
 src/slave/containerizer/mesos/containerizer.hpp |  240 ++++
 src/slave/containerizer/mesos/launch.cpp        |  211 ++++
 src/slave/containerizer/mesos/launch.hpp        |   60 +
 src/slave/containerizer/mesos/main.cpp          |   34 +
 src/slave/containerizer/mesos_containerizer.cpp | 1174 ------------------
 src/slave/containerizer/mesos_containerizer.hpp |  242 ----
 src/tests/cgroups_isolator_tests.cpp            |    2 +-
 src/tests/containerizer_tests.cpp               |    3 +-
 src/tests/isolator_tests.cpp                    |  170 ++-
 src/tests/master_tests.cpp                      |    2 +-
 src/tests/mesos.cpp                             |    3 +-
 src/tests/mesos.hpp                             |    6 +-
 src/tests/slave_tests.cpp                       |    3 +-
 20 files changed, 1958 insertions(+), 1615 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index a49a3fe..fb3af9d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -247,10 +247,11 @@ libmesos_no_3rdparty_la_SOURCES =					\
 	slave/slave.cpp							\
 	slave/http.cpp							\
 	slave/containerizer/containerizer.cpp				\
+	slave/containerizer/external_containerizer.cpp			\
 	slave/containerizer/isolator.cpp				\
 	slave/containerizer/launcher.cpp				\
-	slave/containerizer/mesos_containerizer.cpp			\
-	slave/containerizer/external_containerizer.cpp			\
+	slave/containerizer/mesos/containerizer.cpp			\
+	slave/containerizer/mesos/launch.cpp				\
 	slave/status_update_manager.cpp					\
 	exec/exec.cpp							\
 	common/lock.cpp							\
@@ -365,16 +366,17 @@ libmesos_no_3rdparty_la_SOURCES +=					\
 	master/registrar.hpp						\
 	master/master.hpp master/sorter.hpp				\
 	messages/messages.hpp slave/constants.hpp			\
-	slave/containerizer/linux_launcher.hpp				\
 	slave/containerizer/containerizer.hpp				\
+	slave/containerizer/external_containerizer.hpp			\
 	slave/containerizer/isolator.hpp				\
+	slave/containerizer/launcher.hpp				\
+	slave/containerizer/linux_launcher.hpp				\
+	slave/containerizer/mesos/containerizer.hpp			\
+	slave/containerizer/mesos/launch.hpp				\
+	slave/containerizer/isolators/posix.hpp				\
 	slave/containerizer/isolators/cgroups/cpushare.hpp		\
 	slave/containerizer/isolators/cgroups/mem.hpp			\
 	slave/containerizer/isolators/cgroups/perf_event.hpp		\
-	slave/containerizer/isolators/posix.hpp				\
-	slave/containerizer/launcher.hpp				\
-	slave/containerizer/mesos_containerizer.hpp			\
-	slave/containerizer/external_containerizer.hpp			\
 	slave/flags.hpp slave/gc.hpp slave/monitor.hpp			\
 	slave/paths.hpp slave/state.hpp					\
 	slave/status_update_manager.hpp					\
@@ -562,6 +564,11 @@ mesos_executor_SOURCES = launcher/executor.cpp
 mesos_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
 mesos_executor_LDADD = libmesos.la
 
+pkglibexec_PROGRAMS += mesos-containerizer
+mesos_containerizer_SOURCES = slave/containerizer/mesos/main.cpp
+mesos_containerizer_CPPFLAGS = $(MESOS_CPPFLAGS)
+mesos_containerizer_LDADD = libmesos.la
+
 pkglibexec_PROGRAMS += mesos-health-check
 mesos_health_check_SOURCES = health-check/main.cpp
 mesos_health_check_CPPFLAGS = $(MESOS_CPPFLAGS)

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/containerizer.cpp b/src/slave/containerizer/containerizer.cpp
index 922ae17..1b71f33 100644
--- a/src/slave/containerizer/containerizer.cpp
+++ b/src/slave/containerizer/containerizer.cpp
@@ -37,9 +37,10 @@
 #include "slave/containerizer/containerizer.hpp"
 #include "slave/containerizer/isolator.hpp"
 #include "slave/containerizer/launcher.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
 #include "slave/containerizer/external_containerizer.hpp"
 
+#include "slave/containerizer/mesos/containerizer.hpp"
+
 using std::map;
 using std::string;
 using std::vector;

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/launcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/launcher.cpp b/src/slave/containerizer/launcher.cpp
index 2b13d1f..92c0657 100644
--- a/src/slave/containerizer/launcher.cpp
+++ b/src/slave/containerizer/launcher.cpp
@@ -16,8 +16,6 @@
  * limitations under the License.
  */
 
-#include <unistd.h>
-
 #include <process/collect.hpp>
 #include <process/delay.hpp>
 #include <process/process.hpp>
@@ -32,7 +30,9 @@
 using namespace process;
 
 using std::list;
+using std::map;
 using std::string;
+using std::vector;
 
 namespace mesos {
 namespace internal {
@@ -63,11 +63,12 @@ Future<Nothing> PosixLauncher::recover(const list<RunState>& states)
     pid_t pid = state.forkedPid.get();
 
     if (pids.containsValue(pid)) {
-      // This should (almost) never occur. There is the possibility that a new
-      // executor is launched with the same pid as one that just exited (highly
-      // unlikely) and the slave dies after the new executor is launched but
-      // before it hears about the termination of the earlier executor (also
-      // unlikely). Regardless, the launcher can't do anything sensible so this
+      // This should (almost) never occur. There is the possibility
+      // that a new executor is launched with the same pid as one that
+      // just exited (highly unlikely) and the slave dies after the
+      // new executor is launched but before it hears about the
+      // termination of the earlier executor (also unlikely).
+      // Regardless, the launcher can't do anything sensible so this
       // is considered an error.
       return Failure("Detected duplicate pid " + stringify(pid) +
                      " for container " + stringify(containerId));
@@ -80,46 +81,66 @@ Future<Nothing> PosixLauncher::recover(const list<RunState>& states)
 }
 
 
+// The setup function in child before the exec.
+static int childSetup(const Option<lambda::function<int()> >& setup)
+{
+  // POSIX guarantees a forked child's pid does not match any existing
+  // process group id so only a single setsid() is required and the
+  // session id will be the pid.
+  // TODO(idownes): perror is not listed as async-signal-safe and
+  // should be reimplemented safely.
+  // TODO(jieyu): Move this logic to the subprocess (i.e.,
+  // mesos-containerizer launch).
+  if (::setsid() == -1) {
+    perror("Failed to put child in a new session");
+    _exit(1);
+  }
+
+  if (setup.isSome()) {
+    return setup.get()();
+  }
+
+  return 0;
+}
+
+
 Try<pid_t> PosixLauncher::fork(
     const ContainerID& containerId,
-    const lambda::function<int()>& childFunction)
+    const string& path,
+    const vector<string>& argv,
+    const Subprocess::IO& in,
+    const Subprocess::IO& out,
+    const Subprocess::IO& err,
+    const Option<flags::FlagsBase>& flags,
+    const Option<map<string, string> >& environment,
+    const Option<lambda::function<int()> >& setup)
 {
   if (pids.contains(containerId)) {
     return Error("Process has already been forked for container " +
                  stringify(containerId));
   }
 
-  pid_t pid;
-
-  if ((pid = ::fork()) == -1) {
-    return ErrnoError("Failed to fork");
-  }
-
-  if (pid == 0) {
-    // In child.
-    // POSIX guarantees a forked child's pid does not match any existing
-    // process group id so only a single setsid() is required and the session
-    // id will be the pid.
-    // TODO(idownes): perror is not listed as async-signal-safe and should be
-    // reimplemented safely.
-    if (setsid() == -1) {
-      perror("Failed to put child in a new session");
-      _exit(1);
-    }
-
-    // This function should exec() and therefore not return.
-    childFunction();
-
-    ABORT("Child failed to exec");
+  Try<Subprocess> child = subprocess(
+      path,
+      argv,
+      in,
+      out,
+      err,
+      flags,
+      environment,
+      lambda::bind(&childSetup, setup));
+
+  if (child.isError()) {
+    return Error("Failed to fork a child process: " + child.error());
   }
 
-  // parent.
-  LOG(INFO) << "Forked child with pid '" << pid
+  LOG(INFO) << "Forked child with pid '" << child.get().pid()
             << "' for container '" << containerId << "'";
+
   // Store the pid (session id and process group id).
-  pids.put(containerId, pid);
+  pids.put(containerId, child.get().pid());
 
-  return pid;
+  return child.get().pid();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/launcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/launcher.hpp b/src/slave/containerizer/launcher.hpp
index 835d7a9..18b3546 100644
--- a/src/slave/containerizer/launcher.hpp
+++ b/src/slave/containerizer/launcher.hpp
@@ -20,10 +20,15 @@
 #define __LAUNCHER_HPP__
 
 #include <list>
+#include <map>
+#include <string>
 
 #include <process/future.hpp>
+#include <process/subprocess.hpp>
 
+#include <stout/flags.hpp>
 #include <stout/lambda.hpp>
+#include <stout/option.hpp>
 #include <stout/try.hpp>
 
 #include "slave/flags.hpp"
@@ -42,23 +47,33 @@ public:
   virtual process::Future<Nothing> recover(
       const std::list<state::RunState>& states) = 0;
 
-  // Fork a new process in the containerized context. The child will call the
-  // specified function and the parent will return the child's pid.
-  // NOTE: The function must be async-signal safe and should exec as soon as
-  // possible.
+  // Fork a new process in the containerized context. The child will
+  // exec the binary at the given path with the given argv, flags and
+  // environment. The I/O of the child will be redirected according to
+  // the specified I/O descriptors. The user can provide a 'setup'
+  // function which will be invoked in the child process right before
+  // the exec. The 'setup' function has to be async signal safe. The
+  // parent will return the child's pid if the fork is successful.
   virtual Try<pid_t> fork(
       const ContainerID& containerId,
-      const lambda::function<int()>& childFunction) = 0;
+      const std::string& path,
+      const std::vector<std::string>& argv,
+      const process::Subprocess::IO& in,
+      const process::Subprocess::IO& out,
+      const process::Subprocess::IO& err,
+      const Option<flags::FlagsBase>& flags,
+      const Option<std::map<std::string, std::string> >& environment,
+      const Option<lambda::function<int()> >& setup) = 0;
 
   // Kill all processes in the containerized context.
   virtual process::Future<Nothing> destroy(const ContainerID& containerId) = 0;
 };
 
 
-// Launcher suitable for any POSIX compliant system. Uses process groups and
-// sessions to track processes in a container. POSIX states that process groups
-// cannot migrate between sessions so all processes for a container will be
-// contained in a session.
+// Launcher suitable for any POSIX compliant system. Uses process
+// groups and sessions to track processes in a container. POSIX states
+// that process groups cannot migrate between sessions so all
+// processes for a container will be contained in a session.
 class PosixLauncher : public Launcher
 {
 public:
@@ -71,15 +86,22 @@ public:
 
   virtual Try<pid_t> fork(
       const ContainerID& containerId,
-      const lambda::function<int()>& childFunction);
+      const std::string& path,
+      const std::vector<std::string>& argv,
+      const process::Subprocess::IO& in,
+      const process::Subprocess::IO& out,
+      const process::Subprocess::IO& err,
+      const Option<flags::FlagsBase>& flags,
+      const Option<std::map<std::string, std::string> >& environment,
+      const Option<lambda::function<int()> >& setup);
 
   virtual process::Future<Nothing> destroy(const ContainerID& containerId);
 
 private:
   PosixLauncher() {}
 
-  // The 'pid' is the process id of the first process and also the process
-  // group id and session id.
+  // The 'pid' is the process id of the first process and also the
+  // process group id and session id.
   hashmap<ContainerID, pid_t> pids;
 };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/linux_launcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/linux_launcher.cpp b/src/slave/containerizer/linux_launcher.cpp
index 7ebccb4..1ce03e3 100644
--- a/src/slave/containerizer/linux_launcher.cpp
+++ b/src/slave/containerizer/linux_launcher.cpp
@@ -38,6 +38,7 @@
 using namespace process;
 
 using std::list;
+using std::map;
 using std::string;
 using std::vector;
 
@@ -59,7 +60,9 @@ LinuxLauncher::LinuxLauncher(
 Try<Launcher*> LinuxLauncher::create(const Flags& flags)
 {
   Try<string> hierarchy = cgroups::prepare(
-      flags.cgroups_hierarchy, "freezer", flags.cgroups_root);
+      flags.cgroups_hierarchy,
+      "freezer",
+      flags.cgroups_root);
 
   if (hierarchy.isError()) {
     return Error("Failed to create Linux launcher: " + hierarchy.error());
@@ -68,7 +71,8 @@ Try<Launcher*> LinuxLauncher::create(const Flags& flags)
   LOG(INFO) << "Using " << hierarchy.get()
             << " as the freezer hierarchy for the Linux launcher";
 
-  // TODO(idownes): Inspect the isolation flag to determine namespaces to use.
+  // TODO(idownes): Inspect the isolation flag to determine namespaces
+  // to use.
   int namespaces = 0;
 
   return new LinuxLauncher(flags, namespaces, hierarchy.get());
@@ -94,10 +98,10 @@ Future<Nothing> LinuxLauncher::recover(const std::list<state::RunState>& states)
     Try<bool> exists = cgroups::exists(hierarchy, cgroup(containerId));
 
     if (!exists.get()) {
-      // This may occur if the freezer cgroup was destroyed but the slave dies
-      // before noticing this.
-      // The containerizer will monitor the container's pid and notice that it
-      // has exited, triggering destruction of the container.
+      // This may occur if the freezer cgroup was destroyed but the
+      // slave dies before noticing this. The containerizer will
+      // monitor the container's pid and notice that it has exited,
+      // triggering destruction of the container.
       LOG(INFO) << "Couldn't find freezer cgroup for container " << containerId;
       continue;
     }
@@ -109,11 +113,12 @@ Future<Nothing> LinuxLauncher::recover(const std::list<state::RunState>& states)
     pid_t pid = state.forkedPid.get();
 
     if (pids.containsValue(pid)) {
-      // This should (almost) never occur. There is the possibility that a new
-      // executor is launched with the same pid as one that just exited (highly
-      // unlikely) and the slave dies after the new executor is launched but
-      // before it hears about the termination of the earlier executor (also
-      // unlikely). Regardless, the launcher can't do anything sensible so this
+      // This should (almost) never occur. There is the possibility
+      // that a new executor is launched with the same pid as one that
+      // just exited (highly unlikely) and the slave dies after the
+      // new executor is launched but before it hears about the
+      // termination of the earlier executor (also unlikely).
+      // Regardless, the launcher can't do anything sensible so this
       // is considered an error.
       return Failure("Detected duplicate pid " + stringify(pid) +
                      " for container " + stringify(containerId));
@@ -149,58 +154,84 @@ Future<Nothing> LinuxLauncher::recover(const std::list<state::RunState>& states)
 
 
 // Helper for clone() which expects an int(void*).
-static int childMain(void* child)
+static int childMain(void* _func)
 {
   const lambda::function<int()>* func =
-    static_cast<const lambda::function<int()>*> (child);
+    static_cast<const lambda::function<int()>*> (_func);
 
   return (*func)();
 }
 
 
-// Helper that creates a new session then blocks on reading the pipe before
-// calling the supplied function.
-static int _childMain(
-    const lambda::function<int()>& childFunction,
-    int pipes[2])
+// The customized clone function which will be used by 'subprocess()'.
+static pid_t clone(const lambda::function<int()>& func, int namespaces)
 {
-  // In child.
-  os::close(pipes[1]);
+  // Stack for the child.
+  // - unsigned long long used for best alignment.
+  // - static is ok because each child gets their own copy after the clone.
+  // - 8 MiB appears to be the default for "ulimit -s" on OSX and Linux.
+  static unsigned long long stack[(8*1024*1024)/sizeof(unsigned long long)];
+
+  LOG(INFO) << "Cloning child process with flags = " << namespaces;
+
+  return ::clone(
+      childMain,
+      &stack[sizeof(stack)/sizeof(stack[0]) - 1],  // stack grows down
+      namespaces | SIGCHLD,   // Specify SIGCHLD as child termination signal
+      (void*) &func);
+}
 
-  // Move to a different session (and new process group) so we're independent
-  // from the slave's session (otherwise children will receive SIGHUP if the
-  // slave exits).
-  // TODO(idownes): perror is not listed as async-signal-safe and should be
-  // reimplemented safely.
-  if (setsid() == -1) {
-    perror("Failed to put child in a new session");
-    os::close(pipes[0]);
-    _exit(1);
-  }
 
-  // Do a blocking read on the pipe until the parent signals us to continue.
-  int buf;
-  int len;
-  while ((len = read(pipes[0], &buf, sizeof(buf))) == -1 && errno == EINTR);
+static int childSetup(
+    int pipes[2],
+    const Option<lambda::function<int()> >& setup)
+{
+  // In child.
+  while (::close(pipes[1]) == -1 && errno == EINTR);
 
-  if (len != sizeof(buf)) {
+  // Do a blocking read on the pipe until the parent signals us to
+  // continue.
+  char dummy;
+  ssize_t length;
+  while ((length = ::read(pipes[0], &dummy, sizeof(dummy))) == -1 &&
+         errno == EINTR);
+
+  if (length != sizeof(dummy)) {
     ABORT("Failed to synchronize with parent");
   }
 
-  os::close(pipes[0]);
+  while (::close(pipes[0]) == -1 && errno == EINTR);
 
-  // This function should exec() and therefore not return.
-  childFunction();
+  // Move to a different session (and new process group) so we're
+  // independent from the slave's session (otherwise children will
+  // receive SIGHUP if the slave exits).
+  // TODO(idownes): perror is not listed as async-signal-safe and
+  // should be reimplemented safely.
+  // TODO(jieyu): Move this logic to the subprocess (i.e.,
+  // mesos-containerizer launch).
+  if (::setsid() == -1) {
+    perror("Failed to put child in a new session");
+    return 1;
+  }
 
-  ABORT("Child failed to exec");
+  if (setup.isSome()) {
+    return setup.get()();
+  }
 
-  return -1;
+  return 0;
 }
 
 
 Try<pid_t> LinuxLauncher::fork(
     const ContainerID& containerId,
-    const lambda::function<int()>& childFunction)
+    const string& path,
+    const vector<string>& argv,
+    const process::Subprocess::IO& in,
+    const process::Subprocess::IO& out,
+    const process::Subprocess::IO& err,
+    const Option<flags::FlagsBase>& flags,
+    const Option<map<string, string> >& environment,
+    const Option<lambda::function<int()> >& setup)
 {
   // Create a freezer cgroup for this container if necessary.
   Try<bool> exists = cgroups::exists(hierarchy, cgroup(containerId));
@@ -217,70 +248,72 @@ Try<pid_t> LinuxLauncher::fork(
     }
   }
 
-  // Use a pipe to block the child until it's been moved into the freezer
-  // cgroup.
+  // Use a pipe to block the child until it's been moved into the
+  // freezer cgroup.
   int pipes[2];
-  // We assume this should not fail under reasonable conditions so we use CHECK.
-  CHECK(pipe(pipes) == 0);
 
-  // Use the _childMain helper which moves the child into a new session and
-  // blocks on the pipe until we're ready for it to run.
-  lambda::function<int()> func =
-    lambda::bind(&_childMain, childFunction, pipes);
-
-  // Stack for the child.
-  // - unsigned long long used for best alignment.
-  // - static is ok because each child gets their own copy after the clone.
-  // - 8 MiB appears to be the default for "ulimit -s" on OSX and Linux.
-  static unsigned long long stack[(8*1024*1024)/sizeof(unsigned long long)];
-
-  LOG(INFO) << "Cloning child process with flags = " << namespaces;
-
-  pid_t pid;
-  if ((pid = ::clone(
-          childMain,
-          &stack[sizeof(stack)/sizeof(stack[0]) - 1],  // stack grows down
-          namespaces | SIGCHLD,   // Specify SIGCHLD as child termination signal
-          static_cast<void*>(&func))) == -1) {
-      return ErrnoError("Failed to clone child process");
+  // We assume this should not fail under reasonable conditions so we
+  // use CHECK.
+  CHECK_EQ(0, ::pipe(pipes));
+
+  Try<Subprocess> child = subprocess(
+      path,
+      argv,
+      in,
+      out,
+      err,
+      flags,
+      environment,
+      lambda::bind(&childSetup, pipes, setup),
+      lambda::bind(&clone, lambda::_1, namespaces));
+
+  if (child.isError()) {
+    return Error("Failed to clone child process: " + child.error());
   }
 
   // Parent.
   os::close(pipes[0]);
 
-  // Move the child into the freezer cgroup. Any grandchildren will also be
-  // contained in the cgroup.
-  Try<Nothing> assign = cgroups::assign(hierarchy, cgroup(containerId), pid);
+  // Move the child into the freezer cgroup. Any grandchildren will
+  // also be contained in the cgroup.
+  // TODO(jieyu): Move this logic to the subprocess (i.e.,
+  // mesos-containerizer launch).
+  Try<Nothing> assign = cgroups::assign(
+      hierarchy,
+      cgroup(containerId),
+      child.get().pid());
 
   if (assign.isError()) {
-    LOG(ERROR) << "Failed to assign process " << pid
+    LOG(ERROR) << "Failed to assign process " << child.get().pid()
                 << " of container '" << containerId << "'"
                 << " to its freezer cgroup: " << assign.error();
-    kill(pid, SIGKILL);
+
+    ::kill(child.get().pid(), SIGKILL);
     return Error("Failed to contain process");
   }
 
-  // Now that we've contained the child we can signal it to continue by
-  // writing to the pipe.
-  int buf;
-  ssize_t len;
-  while ((len = write(pipes[1], &buf, sizeof(buf))) == -1 && errno == EINTR);
+  // Now that we've contained the child we can signal it to continue
+  // by writing to the pipe.
+  char dummy;
+  ssize_t length;
+  while ((length = ::write(pipes[1], &dummy, sizeof(dummy))) == -1 &&
+         errno == EINTR);
 
-  if (len != sizeof(buf)) {
+  os::close(pipes[1]);
+
+  if (length != sizeof(dummy)) {
     // Ensure the child is killed.
-    kill(pid, SIGKILL);
-    os::close(pipes[1]);
+    ::kill(child.get().pid(), SIGKILL);
     return Error("Failed to synchronize child process");
   }
-  os::close(pipes[1]);
 
-  // Store the pid (session id and process group id) if this is the first
-  // process forked for this container.
+  // Store the pid (session id and process group id) if this is the
+  // first process forked for this container.
   if (!pids.contains(containerId)) {
-    pids.put(containerId, pid);
+    pids.put(containerId, child.get().pid());
   }
 
-  return pid;
+  return child.get().pid();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/linux_launcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/linux_launcher.hpp b/src/slave/containerizer/linux_launcher.hpp
index 622810c..3d9794d 100644
--- a/src/slave/containerizer/linux_launcher.hpp
+++ b/src/slave/containerizer/linux_launcher.hpp
@@ -25,8 +25,8 @@ namespace mesos {
 namespace internal {
 namespace slave {
 
-// Launcher for Linux systems with cgroups. Uses a freezer cgroup to track
-// pids.
+// Launcher for Linux systems with cgroups. Uses a freezer cgroup to
+// track pids.
 class LinuxLauncher : public Launcher
 {
 public:
@@ -39,7 +39,14 @@ public:
 
   virtual Try<pid_t> fork(
       const ContainerID& containerId,
-      const lambda::function<int()>& childFunction);
+      const std::string& path,
+      const std::vector<std::string>& argv,
+      const process::Subprocess::IO& in,
+      const process::Subprocess::IO& out,
+      const process::Subprocess::IO& err,
+      const Option<flags::FlagsBase>& flags,
+      const Option<std::map<std::string, std::string> >& environment,
+      const Option<lambda::function<int()> >& setup);
 
   virtual process::Future<Nothing> destroy(const ContainerID& containerId);
 
@@ -56,8 +63,8 @@ private:
 
   std::string cgroup(const ContainerID& containerId);
 
-  // The 'pid' is the process id of the child process and also the process
-  // group id and session id.
+  // The 'pid' is the process id of the child process and also the
+  // process group id and session id.
   hashmap<ContainerID, pid_t> pids;
 };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
new file mode 100644
index 0000000..27f8e09
--- /dev/null
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -0,0 +1,1048 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/io.hpp>
+#include <process/reap.hpp>
+#include <process/subprocess.hpp>
+
+#include <stout/os.hpp>
+
+#include "slave/paths.hpp"
+#include "slave/slave.hpp"
+
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/isolator.hpp"
+#include "slave/containerizer/launcher.hpp"
+#ifdef __linux__
+#include "slave/containerizer/linux_launcher.hpp"
+#endif // __linux__
+
+#include "slave/containerizer/isolators/posix.hpp"
+#ifdef __linux__
+#include "slave/containerizer/isolators/cgroups/cpushare.hpp"
+#include "slave/containerizer/isolators/cgroups/mem.hpp"
+#include "slave/containerizer/isolators/cgroups/perf_event.hpp"
+#endif // __linux__
+
+#include "slave/containerizer/mesos/containerizer.hpp"
+#include "slave/containerizer/mesos/launch.hpp"
+
+using std::list;
+using std::map;
+using std::string;
+using std::vector;
+
+using namespace process;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+using state::SlaveState;
+using state::FrameworkState;
+using state::ExecutorState;
+using state::RunState;
+
+// Local function declaration/definitions.
+Future<Nothing> _nothing() { return Nothing(); }
+
+
+// Helper method to build the environment map used to launch fetcher.
+map<string, string> fetcherEnvironment(
+    const CommandInfo& commandInfo,
+    const std::string& directory,
+    const Option<std::string>& user,
+    const Flags& flags)
+{
+  // Prepare the environment variables to pass to mesos-fetcher.
+  string uris = "";
+  foreach (const CommandInfo::URI& uri, commandInfo.uris()) {
+    uris += uri.value() + "+" +
+            (uri.has_executable() && uri.executable() ? "1" : "0") +
+            (uri.extract() ? "X" : "N");
+    uris += " ";
+  }
+  // Remove extra space at the end.
+  uris = strings::trim(uris);
+
+  map<string, string> environment;
+  environment["MESOS_EXECUTOR_URIS"] = uris;
+  environment["MESOS_WORK_DIRECTORY"] = directory;
+  if (user.isSome()) {
+    environment["MESOS_USER"] = user.get();
+  }
+  if (!flags.frameworks_home.empty()) {
+    environment["MESOS_FRAMEWORKS_HOME"] = flags.frameworks_home;
+  }
+  if (!flags.hadoop_home.empty()) {
+    environment["HADOOP_HOME"] = flags.hadoop_home;
+  }
+
+  return environment;
+}
+
+
+Try<MesosContainerizer*> MesosContainerizer::create(
+    const Flags& flags,
+    bool local)
+{
+  string isolation;
+  if (flags.isolation == "process") {
+    LOG(WARNING) << "The 'process' isolation flag is deprecated, "
+                 << "please update your flags to"
+                 << " '--isolation=posix/cpu,posix/mem'.";
+    isolation = "posix/cpu,posix/mem";
+  } else if (flags.isolation == "cgroups") {
+    LOG(WARNING) << "The 'cgroups' isolation flag is deprecated, "
+                 << "please update your flags to"
+                 << " '--isolation=cgroups/cpu,cgroups/mem'.";
+    isolation = "cgroups/cpu,cgroups/mem";
+  } else {
+    isolation = flags.isolation;
+  }
+
+  LOG(INFO) << "Using isolation: " << isolation;
+
+  // Create a MesosContainerizerProcess using isolators and a launcher.
+  hashmap<std::string, Try<Isolator*> (*)(const Flags&)> creators;
+
+  creators["posix/cpu"]   = &PosixCpuIsolatorProcess::create;
+  creators["posix/mem"]   = &PosixMemIsolatorProcess::create;
+#ifdef __linux__
+  creators["cgroups/cpu"] = &CgroupsCpushareIsolatorProcess::create;
+  creators["cgroups/mem"] = &CgroupsMemIsolatorProcess::create;
+  creators["cgroups/perf_event"] = &CgroupsPerfEventIsolatorProcess::create;
+#endif // __linux__
+
+  vector<Owned<Isolator> > isolators;
+
+  foreach (const string& type, strings::split(isolation, ",")) {
+    if (creators.contains(type)) {
+      Try<Isolator*> isolator = creators[type](flags);
+      if (isolator.isError()) {
+        return Error(
+            "Could not create isolator " + type + ": " + isolator.error());
+      } else {
+        isolators.push_back(Owned<Isolator>(isolator.get()));
+      }
+    } else {
+      return Error("Unknown or unsupported isolator: " + type);
+    }
+  }
+
+#ifdef __linux__
+  // Use cgroups on Linux if any cgroups isolators are used.
+  Try<Launcher*> launcher = strings::contains(isolation, "cgroups")
+    ? LinuxLauncher::create(flags)
+    : PosixLauncher::create(flags);
+#else
+  Try<Launcher*> launcher = PosixLauncher::create(flags);
+#endif // __linux__
+
+  if (launcher.isError()) {
+    return Error("Failed to create launcher: " + launcher.error());
+  }
+
+  return new MesosContainerizer(
+      flags, local, Owned<Launcher>(launcher.get()), isolators);
+}
+
+
+MesosContainerizer::MesosContainerizer(
+    const Flags& flags,
+    bool local,
+    const Owned<Launcher>& launcher,
+    const vector<Owned<Isolator> >& isolators)
+{
+  process = new MesosContainerizerProcess(
+      flags, local, launcher, isolators);
+  spawn(process);
+}
+
+
+MesosContainerizer::~MesosContainerizer()
+{
+  terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+Future<Nothing> MesosContainerizer::recover(
+    const Option<state::SlaveState>& state)
+{
+  return dispatch(process, &MesosContainerizerProcess::recover, state);
+}
+
+
+Future<Nothing> MesosContainerizer::launch(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  return dispatch(process,
+                  &MesosContainerizerProcess::launch,
+                  containerId,
+                  executorInfo,
+                  directory,
+                  user,
+                  slaveId,
+                  slavePid,
+                  checkpoint);
+}
+
+
+Future<Nothing> MesosContainerizer::launch(
+    const ContainerID& containerId,
+    const TaskInfo& taskInfo,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  return dispatch(process,
+                  &MesosContainerizerProcess::launch,
+                  containerId,
+                  taskInfo,
+                  executorInfo,
+                  directory,
+                  user,
+                  slaveId,
+                  slavePid,
+                  checkpoint);
+}
+
+
+Future<Nothing> MesosContainerizer::update(
+    const ContainerID& containerId,
+    const Resources& resources)
+{
+  return dispatch(process,
+                  &MesosContainerizerProcess::update,
+                  containerId,
+                  resources);
+}
+
+
+Future<ResourceStatistics> MesosContainerizer::usage(
+    const ContainerID& containerId)
+{
+  return dispatch(process, &MesosContainerizerProcess::usage, containerId);
+}
+
+
+Future<containerizer::Termination> MesosContainerizer::wait(
+    const ContainerID& containerId)
+{
+  return dispatch(process, &MesosContainerizerProcess::wait, containerId);
+}
+
+
+void MesosContainerizer::destroy(const ContainerID& containerId)
+{
+  dispatch(process, &MesosContainerizerProcess::destroy, containerId);
+}
+
+
+Future<hashset<ContainerID> > MesosContainerizer::containers()
+{
+  return dispatch(process, &MesosContainerizerProcess::containers);
+}
+
+
+Future<Nothing> MesosContainerizerProcess::recover(
+    const Option<state::SlaveState>& state)
+{
+  LOG(INFO) << "Recovering containerizer";
+
+  // Gather the executor run states that we will attempt to recover.
+  list<RunState> recoverable;
+  if (state.isSome()) {
+    foreachvalue (const FrameworkState& framework, state.get().frameworks) {
+      foreachvalue (const ExecutorState& executor, framework.executors) {
+        if (executor.info.isNone()) {
+          LOG(WARNING) << "Skipping recovery of executor '" << executor.id
+                       << "' of framework " << framework.id
+                       << " because its info could not be recovered";
+          continue;
+        }
+
+        if (executor.latest.isNone()) {
+          LOG(WARNING) << "Skipping recovery of executor '" << executor.id
+                       << "' of framework " << framework.id
+                       << " because its latest run could not be recovered";
+          continue;
+        }
+
+        // We are only interested in the latest run of the executor!
+        const ContainerID& containerId = executor.latest.get();
+        Option<RunState> run = executor.runs.get(containerId);
+        CHECK_SOME(run);
+
+        // We need the pid so the reaper can monitor the executor so skip this
+        // executor if it's not present. This is not an error because the slave
+        // will try to wait on the container which will return a failed
+        // Termination and everything will get cleaned up.
+        if (!run.get().forkedPid.isSome()) {
+          continue;
+        }
+
+        if (run.get().completed) {
+          VLOG(1) << "Skipping recovery of executor '" << executor.id
+                  << "' of framework " << framework.id
+                  << " because its latest run "
+                  << containerId << " is completed";
+          continue;
+        }
+
+        LOG(INFO) << "Recovering container '" << containerId
+                  << "' for executor '" << executor.id
+                  << "' of framework " << framework.id;
+
+        recoverable.push_back(run.get());
+      }
+    }
+  }
+
+  // Try to recover the launcher first.
+  return launcher->recover(recoverable)
+    .then(defer(self(), &Self::_recover, recoverable));
+}
+
+
+Future<Nothing> MesosContainerizerProcess::_recover(
+    const list<RunState>& recoverable)
+{
+  // Then recover the isolators.
+  list<Future<Nothing> > futures;
+  foreach (const Owned<Isolator>& isolator, isolators) {
+    futures.push_back(isolator->recover(recoverable));
+  }
+
+  // If all isolators recover then continue.
+  return collect(futures)
+    .then(defer(self(), &Self::__recover, recoverable));
+}
+
+
+Future<Nothing> MesosContainerizerProcess::__recover(
+    const list<RunState>& recovered)
+{
+  foreach (const RunState& run, recovered) {
+    CHECK_SOME(run.id);
+    const ContainerID& containerId = run.id.get();
+
+    Owned<Promise<containerizer::Termination> > promise(
+        new Promise<containerizer::Termination>());
+    promises.put(containerId, promise);
+
+    CHECK_SOME(run.forkedPid);
+    Future<Option<int > > status = process::reap(run.forkedPid.get());
+    statuses[containerId] = status;
+    status.onAny(defer(self(), &Self::reaped, containerId));
+
+    foreach (const Owned<Isolator>& isolator, isolators) {
+      isolator->watch(containerId)
+        .onAny(defer(self(), &Self::limited, containerId, lambda::_1));
+    }
+  }
+
+  return Nothing();
+}
+
+
+// Launching an executor involves the following steps:
+// 1. Call prepare on each isolator.
+// 2. Fork the executor. The forked child is blocked from exec'ing until it has
+//    been isolated.
+// 3. Isolate the executor. Call isolate with the pid for each isolator.
+// 4. Fetch the executor.
+// 4. Exec the executor. The forked child is signalled to continue. It will
+//    first execute any preparation commands from isolators and then exec the
+//    executor.
+Future<Nothing> MesosContainerizerProcess::launch(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  if (promises.contains(containerId)) {
+    LOG(ERROR) << "Cannot start already running container '"
+               << containerId << "'";
+    return Failure("Container already started");
+  }
+
+  // TODO(tillt): The slave should expose which containerization
+  // mechanisms it supports to avoid scheduling tasks that it cannot
+  // run.
+  const CommandInfo& command = executorInfo.command();
+  if (command.has_container()) {
+    // We return a Failure as this containerizer does not support
+    // handling ContainerInfo. Users have to be made aware of this
+    // lack of support to prevent confusion in the task configuration.
+    return Failure("ContainerInfo is not supported");
+  }
+
+  Owned<Promise<containerizer::Termination> > promise(
+      new Promise<containerizer::Termination>());
+  promises.put(containerId, promise);
+
+  // Store the resources for usage().
+  resources.put(containerId, executorInfo.resources());
+
+  LOG(INFO) << "Starting container '" << containerId
+            << "' for executor '" << executorInfo.executor_id()
+            << "' of framework '" << executorInfo.framework_id() << "'";
+
+  return prepare(containerId, executorInfo, directory, user)
+    .then(defer(self(),
+                &Self::_launch,
+                containerId,
+                executorInfo,
+                directory,
+                user,
+                slaveId,
+                slavePid,
+                checkpoint,
+                lambda::_1))
+    .onFailed(defer(self(),
+                    &Self::destroy,
+                    containerId));
+}
+
+
+Future<Nothing> MesosContainerizerProcess::launch(
+    const ContainerID& containerId,
+    const TaskInfo&,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  return launch(
+      containerId,
+      executorInfo,
+      directory,
+      user,
+      slaveId,
+      slavePid,
+      checkpoint);
+}
+
+Future<list<Option<CommandInfo> > > MesosContainerizerProcess::prepare(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user)
+{
+  // Start preparing all isolators (in parallel) and gather any additional
+  // preparation comands that must be run in the forked child before exec'ing
+  // the executor.
+  list<Future<Option<CommandInfo> > > futures;
+  foreach (const Owned<Isolator>& isolator, isolators) {
+    futures.push_back(isolator->prepare(containerId, executorInfo));
+  }
+
+  // Wait for all isolators to complete preparations.
+  return collect(futures);
+}
+
+
+Future<Nothing> _fetch(
+    const ContainerID& containerId,
+    const string& directory,
+    const Option<string>& user,
+    const Option<int>& status)
+{
+  if (status.isNone() || (status.get() != 0)) {
+    return Failure("Failed to fetch URIs for container '" +
+                   stringify(containerId) + "': exit status " +
+                   (status.isNone() ? "none" : stringify(status.get())));
+  }
+
+  // Chown the work directory if a user is provided.
+  if (user.isSome()) {
+    Try<Nothing> chown = os::chown(user.get(), directory);
+    if (chown.isError()) {
+      return Failure("Failed to chown work directory");
+    }
+  }
+
+  return Nothing();
+}
+
+
+Future<Nothing> MesosContainerizerProcess::fetch(
+    const ContainerID& containerId,
+    const CommandInfo& commandInfo,
+    const string& directory,
+    const Option<string>& user)
+{
+  // Determine path for mesos-fetcher.
+  Result<string> realpath = os::realpath(
+      path::join(flags.launcher_dir, "mesos-fetcher"));
+
+  if (!realpath.isSome()) {
+    LOG(ERROR) << "Failed to determine the canonical path "
+                << "for the mesos-fetcher '"
+                << path::join(flags.launcher_dir, "mesos-fetcher")
+                << "': "
+                << (realpath.isError() ? realpath.error()
+                                       : "No such file or directory");
+    return Failure("Could not fetch URIs: failed to find mesos-fetcher");
+  }
+
+  map<string, string> environment =
+    fetcherEnvironment(commandInfo, directory, user, flags);
+
+  // Now the actual mesos-fetcher command.
+  string command = realpath.get();
+
+  LOG(INFO) << "Fetching URIs for container '" << containerId
+            << "' using command '" << command << "'";
+
+  Try<Subprocess> fetcher = subprocess(
+      command,
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      environment);
+
+  if (fetcher.isError()) {
+    return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
+  }
+
+  // Redirect output (stdout and stderr) from the fetcher to log files
+  // in the executor work directory, chown'ing them if a user is
+  // specified.
+  // TODO(tillt): Consider adding O_CLOEXEC for atomic close-on-exec.
+  // TODO(tillt): Consider adding an overload to io::redirect
+  // that accepts a file path as 'to' for further reducing code. We
+  // would however also need an owner user parameter for such overload
+  // to perfectly replace the below.
+  Try<int> out = os::open(
+      path::join(directory, "stdout"),
+      O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK,
+      S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+
+  if (out.isError()) {
+    return Failure("Failed to redirect stdout: " + out.error());
+  }
+
+  if (user.isSome()) {
+    Try<Nothing> chown = os::chown(
+        user.get(), path::join(directory, "stdout"));
+    if (chown.isError()) {
+      os::close(out.get());
+      return Failure(
+          "Failed to redirect stdout: Failed to chown: " +
+          chown.error());
+    }
+  }
+
+  // Redirect takes care of nonblocking and close-on-exec for the
+  // supplied file descriptors.
+  io::redirect(fetcher.get().out().get(), out.get());
+
+  // Redirect does 'dup' the file descriptor, hence we can close the
+  // original now.
+  os::close(out.get());
+
+  // Repeat for stderr.
+  Try<int> err = os::open(
+      path::join(directory, "stderr"),
+      O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK,
+      S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+
+  if (err.isError()) {
+    return Failure(
+        "Failed to redirect stderr: Failed to open: " +
+        err.error());
+  }
+
+  if (user.isSome()) {
+    Try<Nothing> chown = os::chown(
+        user.get(), path::join(directory, "stderr"));
+    if (chown.isError()) {
+      os::close(err.get());
+      return Failure(
+          "Failed to redirect stderr: Failed to chown: " +
+          chown.error());
+    }
+  }
+
+  io::redirect(fetcher.get().err().get(), err.get());
+
+  os::close(err.get());
+
+  return fetcher.get().status()
+    .then(lambda::bind(&_fetch, containerId, directory, user, lambda::_1));
+}
+
+
+Future<Nothing> MesosContainerizerProcess::_launch(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint,
+    const list<Option<CommandInfo> >& commands)
+{
+  // Prepare environment variables for the executor.
+  map<string, string> env = executorEnvironment(
+      executorInfo,
+      directory,
+      slaveId,
+      slavePid,
+      checkpoint,
+      flags.recovery_timeout);
+
+  // Include any enviroment variables from CommandInfo.
+  foreach (const Environment::Variable& variable,
+           executorInfo.command().environment().variables()) {
+    env[variable.name()] = variable.value();
+  }
+
+  // Use a pipe to block the child until it's been isolated.
+  int pipes[2];
+
+  // We assume this should not fail under reasonable conditions so we
+  // use CHECK.
+  CHECK(pipe(pipes) == 0);
+
+  // Prepare the flags to pass to the launch process.
+  MesosContainerizerLaunch::Flags launchFlags;
+
+  launchFlags.command = JSON::Protobuf(executorInfo.command());
+  launchFlags.directory = directory;
+  launchFlags.user = user;
+  launchFlags.pipe_read = pipes[0];
+  launchFlags.pipe_write = pipes[1];
+
+  // Prepare the additional preparation commands.
+  // TODO(jieyu): Use JSON::Array once we have generic parse support.
+  JSON::Object object;
+  JSON::Array array;
+  foreach (const Option<CommandInfo>& command, commands) {
+    if (command.isSome()) {
+      array.values.push_back(JSON::Protobuf(command.get()));
+    }
+  }
+  object.values["commands"] = array;
+
+  launchFlags.commands = object;
+
+  // Fork the child using launcher.
+  vector<string> argv(2);
+  argv[0] = "mesos-containerizer";
+  argv[1] = MesosContainerizerLaunch::NAME;
+
+  Try<pid_t> forked = launcher->fork(
+      containerId,
+      path::join(flags.launcher_dir, "mesos-containerizer"),
+      argv,
+      Subprocess::FD(STDIN_FILENO),
+      (local ? Subprocess::FD(STDOUT_FILENO)
+             : Subprocess::PATH(path::join(directory, "stdout"))),
+      (local ? Subprocess::FD(STDERR_FILENO)
+             : Subprocess::PATH(path::join(directory, "stderr"))),
+      launchFlags,
+      env,
+      None());
+
+  if (forked.isError()) {
+    return Failure("Failed to fork executor: " + forked.error());
+  }
+  pid_t pid = forked.get();
+
+  // Checkpoint the executor's pid if requested.
+  if (checkpoint) {
+    const string& path = slave::paths::getForkedPidPath(
+        slave::paths::getMetaRootDir(flags.work_dir),
+        slaveId,
+        executorInfo.framework_id(),
+        executorInfo.executor_id(),
+        containerId);
+
+    LOG(INFO) << "Checkpointing executor's forked pid " << pid
+              << " to '" << path <<  "'";
+
+    Try<Nothing> checkpointed =
+      slave::state::checkpoint(path, stringify(pid));
+
+    if (checkpointed.isError()) {
+      LOG(ERROR) << "Failed to checkpoint executor's forked pid to '"
+                 << path << "': " << checkpointed.error();
+
+      return Failure("Could not checkpoint executor's pid");
+    }
+  }
+
+  // Monitor the executor's pid. We keep the future because we'll
+  // refer to it again during container destroy.
+  Future<Option<int> > status = process::reap(pid);
+  statuses.put(containerId, status);
+  status.onAny(defer(self(), &Self::reaped, containerId));
+
+  return isolate(containerId, pid)
+    .then(defer(self(),
+                &Self::fetch,
+                containerId,
+                executorInfo.command(),
+                directory,
+                user))
+    .then(defer(self(), &Self::exec, containerId, pipes[1]))
+    .onAny(lambda::bind(&os::close, pipes[0]))
+    .onAny(lambda::bind(&os::close, pipes[1]));
+}
+
+
+Future<Nothing> MesosContainerizerProcess::isolate(
+    const ContainerID& containerId,
+    pid_t _pid)
+{
+  // Set up callbacks for isolator limitations.
+  foreach (const Owned<Isolator>& isolator, isolators) {
+    isolator->watch(containerId)
+      .onAny(defer(self(), &Self::limited, containerId, lambda::_1));
+  }
+
+  // Isolate the executor with each isolator.
+  list<Future<Nothing> > futures;
+  foreach (const Owned<Isolator>& isolator, isolators) {
+    futures.push_back(isolator->isolate(containerId, _pid));
+  }
+
+  // Wait for all isolators to complete.
+  return collect(futures)
+    .then(lambda::bind(&_nothing));
+}
+
+
+Future<Nothing> MesosContainerizerProcess::exec(
+    const ContainerID& containerId,
+    int pipeWrite)
+{
+  CHECK(promises.contains(containerId));
+
+  // Now that we've contained the child we can signal it to continue by
+  // writing to the pipe.
+  char dummy;
+  ssize_t length;
+  while ((length = write(pipeWrite, &dummy, sizeof(dummy))) == -1 &&
+         errno == EINTR);
+
+  if (length != sizeof(dummy)) {
+    return Failure("Failed to synchronize child process: " +
+                   string(strerror(errno)));
+  }
+
+  return Nothing();
+}
+
+
+Future<containerizer::Termination> MesosContainerizerProcess::wait(
+    const ContainerID& containerId)
+{
+  if (!promises.contains(containerId)) {
+    return Failure("Unknown container: " + stringify(containerId));
+  }
+
+  return promises[containerId]->future();
+}
+
+
+Future<Nothing> MesosContainerizerProcess::update(
+    const ContainerID& containerId,
+    const Resources& _resources)
+{
+  // The resources hashmap won't initially contain the container's resources
+  // after recovery so we must check the promises hashmap (which is set during
+  // recovery).
+  if (!promises.contains(containerId)) {
+    // It is not considered a failure if the container is not known
+    // because the slave will attempt to update the container's
+    // resources on a task's terminal state change but the executor
+    // may have already exited and the container cleaned up.
+    LOG(WARNING) << "Ignoring update for unknown container: " << containerId;
+    return Nothing();
+  }
+
+  // Store the resources for usage().
+  resources.put(containerId, _resources);
+
+  // Update each isolator.
+  list<Future<Nothing> > futures;
+  foreach (const Owned<Isolator>& isolator, isolators) {
+    futures.push_back(isolator->update(containerId, _resources));
+  }
+
+  // Wait for all isolators to complete.
+  return collect(futures)
+    .then(lambda::bind(&_nothing));
+}
+
+
+// Resources are used to set the limit fields in the statistics but are
+// optional because they aren't known after recovery until/unless update() is
+// called.
+Future<ResourceStatistics> _usage(
+    const ContainerID& containerId,
+    const Option<Resources>& resources,
+    const list<Future<ResourceStatistics> >& statistics)
+{
+  ResourceStatistics result;
+
+  // Set the timestamp now we have all statistics.
+  result.set_timestamp(Clock::now().secs());
+
+  foreach (const Future<ResourceStatistics>& statistic, statistics) {
+    if (statistic.isReady()) {
+      result.MergeFrom(statistic.get());
+    } else {
+      LOG(WARNING) << "Skipping resource statistic for container "
+                   << containerId << " because: "
+                   << (statistic.isFailed() ? statistic.failure()
+                                            : "discarded");
+    }
+  }
+
+  if (resources.isSome()) {
+    // Set the resource allocations.
+    Option<Bytes> mem = resources.get().mem();
+    if (mem.isSome()) {
+      result.set_mem_limit_bytes(mem.get().bytes());
+    }
+
+    Option<double> cpus = resources.get().cpus();
+    if (cpus.isSome()) {
+      result.set_cpus_limit(cpus.get());
+    }
+  }
+
+  return result;
+}
+
+
+Future<ResourceStatistics> MesosContainerizerProcess::usage(
+    const ContainerID& containerId)
+{
+  if (!promises.contains(containerId)) {
+    return Failure("Unknown container: " + stringify(containerId));
+  }
+
+  list<Future<ResourceStatistics> > futures;
+  foreach (const Owned<Isolator>& isolator, isolators) {
+    futures.push_back(isolator->usage(containerId));
+  }
+
+  // Use await() here so we can return partial usage statistics.
+  // TODO(idownes): After recovery resources won't be known until after an
+  // update() because they aren't part of the SlaveState.
+  return await(futures)
+    .then(lambda::bind(
+          _usage, containerId, resources.get(containerId), lambda::_1));
+}
+
+
+void MesosContainerizerProcess::destroy(const ContainerID& containerId)
+{
+  if (!promises.contains(containerId)) {
+    LOG(WARNING) << "Ignoring destroy of unknown container: " << containerId;
+    return;
+  }
+
+  if (destroying.contains(containerId)) {
+    // Destroy has already been initiated.
+    return;
+  }
+  destroying.insert(containerId);
+
+  LOG(INFO) << "Destroying container '" << containerId << "'";
+
+  if (statuses.contains(containerId)) {
+    // Kill all processes then continue destruction.
+    launcher->destroy(containerId)
+      .onAny(defer(self(), &Self::_destroy, containerId, lambda::_1));
+  } else {
+    // The executor never forked so no processes to kill, go straight to
+    // __destroy() with status = None().
+    __destroy(containerId, None());
+  }
+}
+
+
+void MesosContainerizerProcess::_destroy(
+    const ContainerID& containerId,
+    const Future<Nothing>& future)
+{
+  // Something has gone wrong and the launcher wasn't able to kill all the
+  // processes in the container. We cannot clean up the isolators because they
+  // may require that all processes have exited so just return the failure to
+  // the slave.
+  // TODO(idownes): This is a pretty bad state to be in but we should consider
+  // cleaning up here.
+  if (!future.isReady()) {
+    promises[containerId]->fail(
+        "Failed to destroy container: " +
+        (future.isFailed() ? future.failure() : "discarded future"));
+
+    destroying.erase(containerId);
+
+    return;
+  }
+
+  // We've successfully killed all processes in the container so get the exit
+  // status of the executor when it's ready (it may already be) and continue
+  // the destroy.
+  statuses.get(containerId).get()
+    .onAny(defer(self(), &Self::__destroy, containerId, lambda::_1));
+}
+
+
+void MesosContainerizerProcess::__destroy(
+    const ContainerID& containerId,
+    const Future<Option<int > >& status)
+{
+  // Now that all processes have exited we can now clean up all isolators.
+  list<Future<Nothing> > futures;
+  foreach (const Owned<Isolator>& isolator, isolators) {
+    futures.push_back(isolator->cleanup(containerId));
+  }
+
+  // Wait for all isolators to complete cleanup before continuing.
+  collect(futures)
+    .onAny(defer(self(), &Self::___destroy, containerId, status, lambda::_1));
+}
+
+
+void MesosContainerizerProcess::___destroy(
+    const ContainerID& containerId,
+    const Future<Option<int > >& status,
+    const Future<list<Nothing> >& futures)
+{
+  // Something has gone wrong with one of the Isolators and cleanup failed.
+  // We'll fail the container termination and remove the 'destroying' flag but
+  // leave all other state. The containerizer is now in a bad state because
+  // at least one isolator has failed to clean up.
+  if (!futures.isReady()) {
+    promises[containerId]->fail(
+        "Failed to clean up isolators when destroying container: " +
+        (futures.isFailed() ? futures.failure() : "discarded future"));
+
+    destroying.erase(containerId);
+
+    return;
+  }
+
+  // A container is 'killed' if any isolator limited it.
+  // Note: We may not see a limitation in time for it to be registered. This
+  // could occur if the limitation (e.g., an OOM) killed the executor and we
+  // triggered destroy() off the executor exit.
+  bool killed = false;
+  string message;
+  if (limitations.contains(containerId)) {
+    killed = true;
+    foreach (const Limitation& limitation, limitations.get(containerId)) {
+      message += limitation.message;
+    }
+    message = strings::trim(message);
+  } else {
+    message = "Executor terminated";
+  }
+
+  containerizer::Termination termination;
+  termination.set_killed(killed);
+  termination.set_message(message);
+  if (status.isReady() && status.get().isSome()) {
+    termination.set_status(status.get().get());
+  }
+
+  promises[containerId]->set(termination);
+
+  promises.erase(containerId);
+  statuses.erase(containerId);
+  limitations.erase(containerId);
+  resources.erase(containerId);
+  destroying.erase(containerId);
+}
+
+
+void MesosContainerizerProcess::reaped(const ContainerID& containerId)
+{
+  if (!promises.contains(containerId)) {
+    return;
+  }
+
+  LOG(INFO) << "Executor for container '" << containerId << "' has exited";
+
+  // The executor has exited so destroy the container.
+  destroy(containerId);
+}
+
+
+void MesosContainerizerProcess::limited(
+    const ContainerID& containerId,
+    const Future<Limitation>& future)
+{
+  if (!promises.contains(containerId)) {
+    return;
+  }
+
+  if (future.isReady()) {
+    LOG(INFO) << "Container " << containerId << " has reached its limit for"
+              << " resource " << future.get().resource
+              << " and will be terminated";
+    limitations.put(containerId, future.get());
+  } else {
+    // TODO(idownes): A discarded future will not be an error when isolators
+    // discard their promises after cleanup.
+    LOG(ERROR) << "Error in a resource limitation for container "
+               << containerId << ": " << (future.isFailed() ? future.failure()
+                                                            : "discarded");
+  }
+
+  // The container has been affected by the limitation so destroy it.
+  destroy(containerId);
+}
+
+
+Future<hashset<ContainerID> > MesosContainerizerProcess::containers()
+{
+  return promises.keys();
+}
+
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.hpp b/src/slave/containerizer/mesos/containerizer.hpp
new file mode 100644
index 0000000..8746968
--- /dev/null
+++ b/src/slave/containerizer/mesos/containerizer.hpp
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MESOS_CONTAINERIZER_HPP__
+#define __MESOS_CONTAINERIZER_HPP__
+
+#include <list>
+#include <vector>
+
+#include <stout/hashmap.hpp>
+#include <stout/multihashmap.hpp>
+
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/isolator.hpp"
+#include "slave/containerizer/launcher.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// Forward declaration.
+class MesosContainerizerProcess;
+
+class MesosContainerizer : public Containerizer
+{
+public:
+  static Try<MesosContainerizer*> create(const Flags& flags, bool local);
+
+  MesosContainerizer(
+      const Flags& flags,
+      bool local,
+      const process::Owned<Launcher>& launcher,
+      const std::vector<process::Owned<Isolator> >& isolators);
+
+  virtual ~MesosContainerizer();
+
+  virtual process::Future<Nothing> recover(
+      const Option<state::SlaveState>& state);
+
+  virtual process::Future<Nothing> launch(
+      const ContainerID& containerId,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const SlaveID& slaveId,
+      const process::PID<Slave>& slavePid,
+      bool checkpoint);
+
+  virtual process::Future<Nothing> launch(
+      const ContainerID& containerId,
+      const TaskInfo& taskInfo,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const SlaveID& slaveId,
+      const process::PID<Slave>& slavePid,
+      bool checkpoint);
+
+  virtual process::Future<Nothing> update(
+      const ContainerID& containerId,
+      const Resources& resources);
+
+  virtual process::Future<ResourceStatistics> usage(
+      const ContainerID& containerId);
+
+  virtual process::Future<containerizer::Termination> wait(
+      const ContainerID& containerId);
+
+  virtual void destroy(const ContainerID& containerId);
+
+  virtual process::Future<hashset<ContainerID> > containers();
+
+private:
+  MesosContainerizerProcess* process;
+};
+
+
+class MesosContainerizerProcess
+  : public process::Process<MesosContainerizerProcess>
+{
+public:
+  MesosContainerizerProcess(
+      const Flags& _flags,
+      bool _local,
+      const process::Owned<Launcher>& _launcher,
+      const std::vector<process::Owned<Isolator> >& _isolators)
+    : flags(_flags),
+      local(_local),
+      launcher(_launcher),
+      isolators(_isolators) {}
+
+  virtual ~MesosContainerizerProcess() {}
+
+  process::Future<Nothing> recover(
+      const Option<state::SlaveState>& state);
+
+  process::Future<Nothing> launch(
+      const ContainerID& containerId,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const SlaveID& slaveId,
+      const process::PID<Slave>& slavePid,
+      bool checkpoint);
+
+  process::Future<Nothing> launch(
+      const ContainerID& containerId,
+      const TaskInfo& taskInfo,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const SlaveID& slaveId,
+      const process::PID<Slave>& slavePid,
+      bool checkpoint);
+
+  process::Future<Nothing> update(
+      const ContainerID& containerId,
+      const Resources& resources);
+
+  process::Future<ResourceStatistics> usage(
+      const ContainerID& containerId);
+
+  process::Future<containerizer::Termination> wait(
+      const ContainerID& containerId);
+
+  void destroy(const ContainerID& containerId);
+
+  process::Future<hashset<ContainerID> > containers();
+
+private:
+  process::Future<Nothing> _recover(
+      const std::list<state::RunState>& recoverable);
+
+  process::Future<Nothing> __recover(
+      const std::list<state::RunState>& recovered);
+
+  process::Future<std::list<Option<CommandInfo> > > prepare(
+      const ContainerID& containerId,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory,
+      const Option<std::string>& user);
+
+  process::Future<Nothing> fetch(
+      const ContainerID& containerId,
+      const CommandInfo& commandInfo,
+      const std::string& directory,
+      const Option<std::string>& user);
+
+  process::Future<Nothing> _launch(
+      const ContainerID& containerId,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const SlaveID& slaveId,
+      const process::PID<Slave>& slavePid,
+      bool checkpoint,
+      const std::list<Option<CommandInfo> >& scripts);
+
+  process::Future<Nothing> isolate(
+      const ContainerID& containerId,
+      pid_t _pid);
+
+  process::Future<Nothing> exec(
+      const ContainerID& containerId,
+      int pipeWrite);
+
+  // Continues 'destroy()' once all processes have been killed by the launcher.
+  void _destroy(
+      const ContainerID& containerId,
+      const process::Future<Nothing>& future);
+
+  // Continues '_destroy()' once we get the exit status of the executor.
+  void __destroy(
+      const ContainerID& containerId,
+      const process::Future<Option<int > >& status);
+
+  // Continues (and completes) '__destroy()' once all isolators have completed
+  // cleanup.
+  void ___destroy(
+      const ContainerID& containerId,
+      const process::Future<Option<int > >& status,
+      const process::Future<std::list<Nothing> >& futures);
+
+  // Call back for when an isolator limits a container and impacts the
+  // processes. This will trigger container destruction.
+  void limited(
+      const ContainerID& containerId,
+      const process::Future<Limitation>& future);
+
+  // Call back for when the executor exits. This will trigger container
+  // destroy.
+  void reaped(const ContainerID& containerId);
+
+  const Flags flags;
+  const bool local;
+  const process::Owned<Launcher> launcher;
+  const std::vector<process::Owned<Isolator> > isolators;
+
+  // TODO(idownes): Consider putting these per-container variables into a
+  // struct.
+  // Promises for futures returned from wait().
+  hashmap<ContainerID,
+    process::Owned<process::Promise<containerizer::Termination> > > promises;
+
+  // We need to keep track of the future exit status for each executor because
+  // we'll only get a single notification when the executor exits.
+  hashmap<ContainerID, process::Future<Option<int> > > statuses;
+
+  // We keep track of any limitations received from each isolator so we can
+  // determine the cause of an executor termination.
+  multihashmap<ContainerID, Limitation> limitations;
+
+  // We keep track of the resources for each container so we can set the
+  // ResourceStatistics limits in usage().
+  hashmap<ContainerID, Resources> resources;
+
+  // Set of containers that are in process of being destroyed.
+  hashset<ContainerID> destroying;
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MESOS_CONTAINERIZER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos/launch.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/launch.cpp b/src/slave/containerizer/mesos/launch.cpp
new file mode 100644
index 0000000..2db1c7a
--- /dev/null
+++ b/src/slave/containerizer/mesos/launch.cpp
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <unistd.h>
+
+#include <iostream>
+#include <map>
+
+#include <stout/foreach.hpp>
+#include <stout/os.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/unreachable.hpp>
+
+#include <stout/os/execenv.hpp>
+
+#include "mesos/mesos.hpp"
+
+#include "slave/containerizer/mesos/launch.hpp"
+
+using std::cerr;
+using std::endl;
+using std::map;
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+const string MesosContainerizerLaunch::NAME = "launch";
+
+
+MesosContainerizerLaunch::Flags::Flags()
+{
+  add(&command,
+      "command",
+      "The command to execute.");
+
+  add(&directory,
+      "directory",
+      "The directory to chdir to.");
+
+  add(&user,
+      "user",
+      "The user to change to.");
+
+  add(&pipe_read,
+      "pipe_read",
+      "The read end of the control pipe.");
+
+  add(&pipe_write,
+      "pipe_write",
+      "The write end of the control pipe.");
+
+  add(&commands,
+      "commands",
+      "The additional preparation commands to execute before\n"
+      "executing the command.");
+}
+
+
+int MesosContainerizerLaunch::execute()
+{
+  // Check command line flags.
+  if (flags.command.isNone()) {
+    cerr << "Flag --command is not specified" << endl;
+    return 1;
+  }
+
+  if (flags.directory.isNone()) {
+    cerr << "Flag --directory is not specified" << endl;
+    return 1;
+  }
+
+  if (flags.pipe_read.isNone()) {
+    cerr << "Flag --pipe_read is not specified" << endl;
+    return 1;
+  }
+
+  if (flags.pipe_write.isNone()) {
+    cerr << "Flag --pipe_write is not specified" << endl;
+    return 1;
+  }
+
+  // Parse the command.
+  Try<CommandInfo> command =
+    ::protobuf::parse<CommandInfo>(flags.command.get());
+
+  if (command.isError()) {
+    cerr << "Failed to parse the command: " << command.error() << endl;
+    return 1;
+  }
+
+  Try<Nothing> close = os::close(flags.pipe_write.get());
+  if (close.isError()) {
+    cerr << "Failed to close pipe[1]: " << close.error() << endl;
+    return 1;
+  }
+
+  // Do a blocking read on the pipe until the parent signals us to continue.
+  char dummy;
+  ssize_t length;
+  while ((length = ::read(
+              flags.pipe_read.get(),
+              &dummy,
+              sizeof(dummy))) == -1 &&
+          errno == EINTR);
+
+  if (length != sizeof(dummy)) {
+     // There's a reasonable probability this will occur during slave
+     // restarts across a large/busy cluster.
+     cerr << "Failed to synchronize with slave (it's probably exited)" << endl;
+     return 1;
+  }
+
+  close = os::close(flags.pipe_read.get());
+  if (close.isError()) {
+    cerr << "Failed to close pipe[0]: " << close.error() << endl;
+    return 1;
+  }
+
+  // Run additional preparation commands. These are run as the same
+  // user and with the environment as the slave.
+  if (flags.commands.isSome()) {
+    // TODO(jieyu): Use JSON::Array if we have generic parse support.
+    JSON::Object object = flags.commands.get();
+    if (object.values.count("commands") == 0) {
+      cerr << "Invalid JSON format for flag --commands" << endl;
+      return 1;
+    }
+
+    if (!object.values["commands"].is<JSON::Array>()) {
+      cerr << "Invalid JSON format for flag --commands" << endl;
+      return 1;
+    }
+
+    JSON::Array array = object.values["commands"].as<JSON::Array>();
+    foreach (const JSON::Value& value, array.values) {
+      if (!value.is<JSON::Object>()) {
+        cerr << "Invalid JSON format for flag --commands" << endl;
+        return 1;
+      }
+
+      Try<CommandInfo> parse = ::protobuf::parse<CommandInfo>(value);
+      if (parse.isError()) {
+        cerr << "Failed to parse a preparation command: "
+             << parse.error() << endl;
+        return 1;
+      }
+
+      // Block until the command completes.
+      int status = os::system(parse.get().value());
+      if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
+        cerr << "Failed to execute a preparation command" << endl;
+        return 1;
+      }
+    }
+  }
+
+  // Enter working directory.
+  if (!os::chdir(flags.directory.get())) {
+    cerr << "Failed to chdir into work directory "
+         << flags.directory.get() << endl;
+    return 1;
+  }
+
+  // Change user if provided. Note that we do that after executing the
+  // preparation commands so that those commands will be run with the
+  // same privilege as the mesos-slave.
+  if (flags.user.isSome() && !os::su(flags.user.get())) {
+    cerr << "Failed to change user to " << flags.user.get() << endl;
+    return 1;
+  }
+
+  // Relay the environment variables.
+  // TODO(jieyu): Consider using a clean environment.
+  map<string, string> env;
+  os::ExecEnv envp(env);
+
+  // Execute the command (via '/bin/sh -c command') with its environment.
+  execle(
+      "/bin/sh",
+      "sh",
+      "-c",
+      command.get().value().c_str(),
+      (char*) NULL,
+      envp());
+
+  // If we get here, the execle call failed.
+  cerr << "Failed to execute command" << endl;
+
+  return UNREACHABLE();
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos/launch.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/launch.hpp b/src/slave/containerizer/mesos/launch.hpp
new file mode 100644
index 0000000..7c8b535
--- /dev/null
+++ b/src/slave/containerizer/mesos/launch.hpp
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MESOS_CONTAINERIZER_LAUNCH_HPP__
+#define __MESOS_CONTAINERIZER_LAUNCH_HPP__
+
+#include <stout/json.hpp>
+#include <stout/option.hpp>
+#include <stout/subcommand.hpp>
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+class MesosContainerizerLaunch : public Subcommand
+{
+public:
+  static const std::string NAME;
+
+  struct Flags : public flags::FlagsBase
+  {
+    Flags();
+
+    Option<JSON::Object> command;
+    Option<std::string> directory;
+    Option<std::string> user;
+    Option<int> pipe_read;
+    Option<int> pipe_write;
+    Option<JSON::Object> commands; // Additional preparation commands.
+  };
+
+  MesosContainerizerLaunch() : Subcommand(NAME) {}
+
+  Flags flags;
+
+protected:
+  virtual int execute();
+  virtual flags::FlagsBase* getFlags() { return &flags; }
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MESOS_CONTAINERIZER_LAUNCH_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos/main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/main.cpp b/src/slave/containerizer/mesos/main.cpp
new file mode 100644
index 0000000..0e17931
--- /dev/null
+++ b/src/slave/containerizer/mesos/main.cpp
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stout/none.hpp>
+#include <stout/subcommand.hpp>
+
+#include "slave/containerizer/mesos/launch.hpp"
+
+using namespace mesos::internal::slave;
+
+
+int main(int argc, char** argv)
+{
+  return Subcommand::dispatch(
+      None(),
+      argc,
+      argv,
+      new MesosContainerizerLaunch());
+}


[4/7] git commit: Added subcommand tests to stout tests.

Posted by ji...@apache.org.
Added subcommand tests to stout tests.

Review: https://reviews.apache.org/r/22764


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6b7e6572
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6b7e6572
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6b7e6572

Branch: refs/heads/master
Commit: 6b7e6572d27ec56847b55157fb000547707b87d5
Parents: 3279c40
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Jun 25 14:34:12 2014 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jun 25 14:34:15 2014 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/3rdparty/Makefile.am | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6b7e6572/3rdparty/libprocess/3rdparty/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/Makefile.am b/3rdparty/libprocess/3rdparty/Makefile.am
index 3359907..429c956 100644
--- a/3rdparty/libprocess/3rdparty/Makefile.am
+++ b/3rdparty/libprocess/3rdparty/Makefile.am
@@ -156,6 +156,7 @@ stout_tests_SOURCES =				\
   $(STOUT)/tests/set_tests.cpp			\
   $(STOUT)/tests/some_tests.cpp			\
   $(STOUT)/tests/strings_tests.cpp		\
+  $(STOUT)/tests/subcommand_tests.cpp		\
   $(STOUT)/tests/thread_tests.cpp		\
   $(STOUT)/tests/uuid_tests.cpp