You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2014/06/18 19:19:55 UTC

[1/2] git commit: Refactored subprocess to support more IO redirect modes.

Repository: mesos
Updated Branches:
  refs/heads/master b5708315d -> 83fcc8ca1


Refactored subprocess to support more IO redirect modes.

Review: https://reviews.apache.org/r/22688


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9657fe0f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9657fe0f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9657fe0f

Branch: refs/heads/master
Commit: 9657fe0f94f6c0a7ad00a6c02d21da3b5fbc1e0d
Parents: b570831
Author: Jie Yu <yu...@gmail.com>
Authored: Tue Jun 17 11:30:20 2014 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jun 18 10:19:13 2014 -0700

----------------------------------------------------------------------
 .../libprocess/include/process/subprocess.hpp   | 103 +++-
 3rdparty/libprocess/src/subprocess.cpp          | 262 ++++++++--
 .../libprocess/src/tests/subprocess_tests.cpp   | 491 +++++++++++++++----
 3 files changed, 707 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9657fe0f/3rdparty/libprocess/include/process/subprocess.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/subprocess.hpp b/3rdparty/libprocess/include/process/subprocess.hpp
index e9d7b31..e93608a 100644
--- a/3rdparty/libprocess/include/process/subprocess.hpp
+++ b/3rdparty/libprocess/include/process/subprocess.hpp
@@ -1,6 +1,8 @@
 #ifndef __PROCESS_SUBPROCESS_HPP__
 #define __PROCESS_SUBPROCESS_HPP__
 
+#include <unistd.h>
+
 #include <sys/types.h>
 
 #include <map>
@@ -22,24 +24,79 @@ namespace process {
 //   1. The subprocess has terminated, and
 //   2. There are no longer any references to the associated
 //      Subprocess object.
-struct Subprocess
+class Subprocess
 {
+public:
+  // Describes how the I/O is redirected for stdin/stdout/stderr.
+  // One of the following three modes is supported:
+  //   1. PIPE: Redirect to a pipe. The pipe will be created
+  //      automatically and the user can read/write the parent side of
+  //      the pipe from in()/out()/err().
+  //   2. PATH: Redirect to a file. The file will be created if it
+  //      does not exist. If the file exists, it will be appended to.
+  //   3. FD: Redirect to an open file descriptor.
+  class IO
+  {
+  private:
+    friend class Subprocess;
+
+    friend Try<Subprocess> subprocess(
+        const std::string& command,
+        const IO& in,
+        const IO& out,
+        const IO& err,
+        const Option<std::map<std::string, std::string> >& environment,
+        const Option<lambda::function<int()> >& setup);
+
+    enum Mode
+    {
+      PIPE, // Redirect I/O to a pipe.
+      PATH, // Redirect I/O to a file.
+      FD,   // Redirect I/O to an open file descriptor.
+    };
+
+    IO(Mode _mode, const Option<int>& _fd, const Option<std::string>& _path)
+      : mode(_mode), fd(_fd), path(_path) {}
+
+    Mode mode;
+    Option<int> fd;
+    Option<std::string> path;
+  };
+
+  // Syntactic sugar to create IO descriptors.
+  static IO PIPE()
+  {
+    return IO(IO::PIPE, None(), None());
+  }
+
+  static IO PATH(const std::string& path)
+  {
+    return IO(IO::PATH, None(), path);
+  }
+
+  static IO FD(int fd)
+  {
+    return IO(IO::FD, fd, None());
+  }
+
   // Returns the pid for the subprocess.
   pid_t pid() const { return data->pid; }
 
-  // File descriptor accessors for input / output.
-  int in()  const { return data->in;  }
-  int out() const { return data->out; }
-  int err() const { return data->err; }
+  // The parent side of the pipe for stdin/stdout/stderr.
+  Option<int> in()  const { return data->in;  }
+  Option<int> out() const { return data->out; }
+  Option<int> err() const { return data->err; }
 
   // Returns a future from process::reap of this subprocess.
   // Discarding this future has no effect on the subprocess.
   Future<Option<int> > status() const { return data->status; }
 
 private:
-  Subprocess() : data(new Data()) {}
   friend Try<Subprocess> subprocess(
       const std::string& command,
+      const Subprocess::IO& in,
+      const Subprocess::IO& out,
+      const Subprocess::IO& err,
       const Option<std::map<std::string, std::string> >& environment,
       const Option<lambda::function<int()> >& setup);
 
@@ -47,22 +104,26 @@ private:
   {
     ~Data()
     {
-      os::close(in);
-      os::close(out);
-      os::close(err);
+      if (in.isSome()) { os::close(in.get()); }
+      if (out.isSome()) { os::close(out.get()); }
+      if (err.isSome()) { os::close(err.get()); }
     }
 
     pid_t pid;
 
+    // The parent side of the pipe for stdin/stdout/stderr. If the
+    // mode is not PIPE, None will be stored.
     // NOTE: stdin, stdout, stderr are macros on some systems, hence
     // these names instead.
-    int in;
-    int out;
-    int err;
+    Option<int> in;
+    Option<int> out;
+    Option<int> err;
 
     Future<Option<int> > status;
   };
 
+  Subprocess() : data(new Data()) {}
+
   memory::shared_ptr<Data> data;
 };
 
@@ -78,9 +139,27 @@ private:
 // TODO(dhamon): Add an option to not combine the two environments.
 Try<Subprocess> subprocess(
     const std::string& command,
+    const Subprocess::IO& in,
+    const Subprocess::IO& out,
+    const Subprocess::IO& err,
     const Option<std::map<std::string, std::string> >& environment = None(),
     const Option<lambda::function<int()> >& setup = None());
 
+
+inline Try<Subprocess> subprocess(
+    const std::string& command,
+    const Option<std::map<std::string, std::string> >& environment = None(),
+    const Option<lambda::function<int()> >& setup = None())
+{
+  return subprocess(
+      command,
+      Subprocess::FD(STDIN_FILENO),
+      Subprocess::FD(STDOUT_FILENO),
+      Subprocess::FD(STDERR_FILENO),
+      environment,
+      setup);
+}
+
 } // namespace process {
 
 #endif // __PROCESS_SUBPROCESS_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/9657fe0f/3rdparty/libprocess/src/subprocess.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/subprocess.cpp b/3rdparty/libprocess/src/subprocess.cpp
index 9f8f37f..78fa1ec 100644
--- a/3rdparty/libprocess/src/subprocess.cpp
+++ b/3rdparty/libprocess/src/subprocess.cpp
@@ -17,6 +17,7 @@
 #include <stout/option.hpp>
 #include <stout/os.hpp>
 #include <stout/try.hpp>
+#include <stout/unreachable.hpp>
 
 #include <stout/os/execenv.hpp>
 
@@ -27,7 +28,7 @@ namespace process {
 namespace internal {
 
 // See the comment below as to why subprocess is passed to cleanup.
-void cleanup(
+static void cleanup(
     const Future<Option<int> >& result,
     Promise<Option<int> >* promise,
     const Subprocess& subprocess)
@@ -44,33 +45,176 @@ void cleanup(
   delete promise;
 }
 
+
+static void close(int stdinFd[2], int stdoutFd[2], int stderrFd[2])
+{
+  os::close(stdinFd[0]);
+  os::close(stdinFd[1]);
+  os::close(stdoutFd[0]);
+  os::close(stdoutFd[1]);
+  os::close(stderrFd[0]);
+  os::close(stderrFd[1]);
+}
+
+// This function will invoke os::cloexec on all file descriptors in
+// these pairs that are valid (i.e., >= 0).
+static Try<Nothing> cloexec(int stdinFd[2], int stdoutFd[2], int stderrFd[2])
+{
+  int fd[6] = {
+    stdinFd[0],
+    stdinFd[1],
+    stdoutFd[0],
+    stdoutFd[1],
+    stderrFd[0],
+    stderrFd[1]
+  };
+
+  for (int i = 0; i < 6; i++) {
+    if (fd[i] >= 0) {
+      Try<Nothing> cloexec = os::cloexec(fd[i]);
+      if (cloexec.isError()) {
+        return Error(cloexec.error());
+      }
+    }
+  }
+
+  return Nothing();
+}
+
 }  // namespace internal {
 
 
 // Runs the provided command in a subprocess.
 Try<Subprocess> subprocess(
     const string& command,
+    const Subprocess::IO& in,
+    const Subprocess::IO& out,
+    const Subprocess::IO& err,
     const Option<map<string, string> >& environment,
     const Option<lambda::function<int()> >& setup)
 {
-  // Create pipes for stdin, stdout, stderr.
-  // Index 0 is for reading, and index 1 is for writing.
-  int stdinPipe[2];
-  int stdoutPipe[2];
-  int stderrPipe[2];
-
-  if (pipe(stdinPipe) == -1) {
-    return ErrnoError("Failed to create pipe");
-  } else if (pipe(stdoutPipe) == -1) {
-    os::close(stdinPipe[0]);
-    os::close(stdinPipe[1]);
-    return ErrnoError("Failed to create pipe");
-  } else if (pipe(stderrPipe) == -1) {
-    os::close(stdinPipe[0]);
-    os::close(stdinPipe[1]);
-    os::close(stdoutPipe[0]);
-    os::close(stdoutPipe[1]);
-    return ErrnoError("Failed to create pipe");
+  // File descriptors for redirecting stdin/stdout/stderr. These file
+  // descriptors are used for different purposes depending on the
+  // specified I/O modes. If the mode is PIPE, the two file
+  // descriptors represent two ends of a pipe. If the mode is PATH or
+  // FD, only one of the two file descriptors is used. Our protocol
+  // here is that index 0 is always for reading, and index 1 is always
+  // for writing (similar to the pipe semantics).
+  int stdinFd[2] = { -1, -1 };
+  int stdoutFd[2] = { -1, -1 };
+  int stderrFd[2] = { -1, -1 };
+
+  // Prepare the file descriptor(s) for stdin.
+  switch (in.mode) {
+    case Subprocess::IO::FD: {
+      stdinFd[0] = ::dup(in.fd.get());
+      if (stdinFd[0] == -1) {
+        return ErrnoError("Failed to dup");
+      }
+      break;
+    }
+    case Subprocess::IO::PIPE: {
+      if (pipe(stdinFd) == -1) {
+        return ErrnoError("Failed to create pipe");
+      }
+      break;
+    }
+    case Subprocess::IO::PATH: {
+      Try<int> open = os::open(in.path.get(), O_RDONLY);
+      if (open.isError()) {
+        return Error(
+            "Failed to open '" + in.path.get() + "': " + open.error());
+      }
+      stdinFd[0] = open.get();
+      break;
+    }
+    default:
+      return UNREACHABLE();
+  }
+
+  // Prepare the file descriptor(s) for stdout.
+  switch (out.mode) {
+    case Subprocess::IO::FD: {
+      stdoutFd[1] = ::dup(out.fd.get());
+      if (stdoutFd[1] == -1) {
+        // Save the errno as 'close' below might overwrite it.
+        ErrnoError error("Failed to dup");
+        internal::close(stdinFd, stdoutFd, stderrFd);
+        return error;
+      }
+      break;
+    }
+    case Subprocess::IO::PIPE: {
+      if (pipe(stdoutFd) == -1) {
+        // Save the errno as 'close' below might overwrite it.
+        ErrnoError error("Failed to create pipe");
+        internal::close(stdinFd, stdoutFd, stderrFd);
+        return error;
+      }
+      break;
+    }
+    case Subprocess::IO::PATH: {
+      Try<int> open = os::open(
+          out.path.get(),
+          O_WRONLY | O_CREAT | O_APPEND,
+          S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+
+      if (open.isError()) {
+        internal::close(stdinFd, stdoutFd, stderrFd);
+        return Error(
+            "Failed to open '" + out.path.get() + "': " + open.error());
+      }
+      stdoutFd[1] = open.get();
+      break;
+    }
+    default:
+      return UNREACHABLE();
+  }
+
+  // Prepare the file descriptor(s) for stderr.
+  switch (err.mode) {
+    case Subprocess::IO::FD: {
+      stderrFd[1] = ::dup(err.fd.get());
+      if (stderrFd[1] == -1) {
+        // Save the errno as 'close' below might overwrite it.
+        ErrnoError error("Failed to dup");
+        internal::close(stdinFd, stdoutFd, stderrFd);
+        return error;
+      }
+      break;
+    }
+    case Subprocess::IO::PIPE: {
+      if (pipe(stderrFd) == -1) {
+        // Save the errno as 'close' below might overwrite it.
+        ErrnoError error("Failed to create pipe");
+        internal::close(stdinFd, stdoutFd, stderrFd);
+        return error;
+      }
+      break;
+    }
+    case Subprocess::IO::PATH: {
+      Try<int> open = os::open(
+          err.path.get(),
+          O_WRONLY | O_CREAT | O_APPEND,
+          S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+
+      if (open.isError()) {
+        internal::close(stdinFd, stdoutFd, stderrFd);
+        return Error(
+            "Failed to open '" + err.path.get() + "': " + open.error());
+      }
+      stderrFd[1] = open.get();
+      break;
+    }
+    default:
+      return UNREACHABLE();
+  }
+
+  // TODO(jieyu): Consider using O_CLOEXEC for atomic close-on-exec.
+  Try<Nothing> cloexec = internal::cloexec(stdinFd, stdoutFd, stderrFd);
+  if (cloexec.isError()) {
+    internal::close(stdinFd, stdoutFd, stderrFd);
+    return Error("Failed to cloexec: " + cloexec.error());
   }
 
   // We need to do this construction before doing the fork as it
@@ -82,13 +226,10 @@ Try<Subprocess> subprocess(
 
   pid_t pid;
   if ((pid = fork()) == -1) {
-    os::close(stdinPipe[0]);
-    os::close(stdinPipe[1]);
-    os::close(stdoutPipe[0]);
-    os::close(stdoutPipe[1]);
-    os::close(stderrPipe[0]);
-    os::close(stderrPipe[1]);
-    return ErrnoError("Failed to fork");
+    // Save the errno as 'close' below might overwrite it.
+    ErrnoError error("Failed to fork");
+    internal::close(stdinFd, stdoutFd, stderrFd);
+    return error;
   }
 
   Subprocess process;
@@ -97,19 +238,41 @@ Try<Subprocess> subprocess(
   if (process.data->pid == 0) {
     // Child.
     // Close parent's end of the pipes.
-    os::close(stdinPipe[1]);
-    os::close(stdoutPipe[0]);
-    os::close(stderrPipe[0]);
+    if (in.mode == Subprocess::IO::PIPE) {
+      while (::close(stdinFd[1]) == -1 && errno == EINTR);
+    }
+    if (out.mode == Subprocess::IO::PIPE) {
+      while (::close(stdoutFd[0]) == -1 && errno == EINTR);
+    }
+    if (err.mode == Subprocess::IO::PIPE) {
+      while (::close(stderrFd[0]) == -1 && errno == EINTR);
+    }
 
-    // Make our pipes look like stdin, stderr, stdout before we exec.
-    while (dup2(stdinPipe[0], STDIN_FILENO)   == -1 && errno == EINTR);
-    while (dup2(stdoutPipe[1], STDOUT_FILENO) == -1 && errno == EINTR);
-    while (dup2(stderrPipe[1], STDERR_FILENO) == -1 && errno == EINTR);
+    // Redirect I/O for stdin/stdout/stderr.
+    while (::dup2(stdinFd[0], STDIN_FILENO) == -1 && errno == EINTR);
+    while (::dup2(stdoutFd[1], STDOUT_FILENO) == -1 && errno == EINTR);
+    while (::dup2(stderrFd[1], STDERR_FILENO) == -1 && errno == EINTR);
 
-    // Close the copies.
-    os::close(stdinPipe[0]);
-    os::close(stdoutPipe[1]);
-    os::close(stderrPipe[1]);
+    // Close the copies. We need to make sure that we do not close the
+    // file descriptor assigned to stdin/stdout/stderr in case the
+    // parent has closed stdin/stdout/stderr when calling this
+    // function (in that case, a dup'ed file descriptor may have the
+    // same file descriptor number as stdin/stdout/stderr).
+    if (stdinFd[0] != STDIN_FILENO &&
+        stdinFd[0] != STDOUT_FILENO &&
+        stdinFd[0] != STDERR_FILENO) {
+      while (::close(stdinFd[0]) == -1 && errno == EINTR);
+    }
+    if (stdoutFd[1] != STDIN_FILENO &&
+        stdoutFd[1] != STDOUT_FILENO &&
+        stdoutFd[1] != STDERR_FILENO) {
+      while (::close(stdoutFd[1]) == -1 && errno == EINTR);
+    }
+    if (stderrFd[1] != STDIN_FILENO &&
+        stderrFd[1] != STDOUT_FILENO &&
+        stderrFd[1] != STDERR_FILENO) {
+      while (::close(stderrFd[1]) == -1 && errno == EINTR);
+    }
 
     if (setup.isSome()) {
       int status = setup.get()();
@@ -124,15 +287,24 @@ Try<Subprocess> subprocess(
   }
 
   // Parent.
+  // Close the file descriptors that are created by this function. For
+  // pipes, we close the child ends and store the parent ends (see the
+  // code below).
+  os::close(stdinFd[0]);
+  os::close(stdoutFd[1]);
+  os::close(stderrFd[1]);
 
-  // Close the child's end of the pipes.
-  os::close(stdinPipe[0]);
-  os::close(stdoutPipe[1]);
-  os::close(stderrPipe[1]);
-
-  process.data->in = stdinPipe[1];
-  process.data->out = stdoutPipe[0];
-  process.data->err = stderrPipe[0];
+  // If the mode is PIPE, store the parent side of the pipe so that
+  // the user can communicate with the subprocess.
+  if (in.mode == Subprocess::IO::PIPE) {
+    process.data->in = stdinFd[1];
+  }
+  if (out.mode == Subprocess::IO::PIPE) {
+    process.data->out = stdoutFd[0];
+  }
+  if (err.mode == Subprocess::IO::PIPE) {
+    process.data->err = stderrFd[0];
+  }
 
   // Rather than directly exposing the future from process::reap, we
   // must use an explicit promise so that we can ensure we can receive

http://git-wip-us.apache.org/repos/asf/mesos/blob/9657fe0f/3rdparty/libprocess/src/tests/subprocess_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/subprocess_tests.cpp b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
index 7d890bf..1cb1ce3 100644
--- a/3rdparty/libprocess/src/tests/subprocess_tests.cpp
+++ b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
@@ -15,16 +15,22 @@
 #include <stout/foreach.hpp>
 #include <stout/gtest.hpp>
 #include <stout/list.hpp>
-#include <stout/os/read.hpp>
 #include <stout/path.hpp>
 
+#include <stout/os/read.hpp>
+
+#include <stout/tests/utils.hpp>
+
 using namespace process;
 
 using std::map;
 using std::string;
 
 
-TEST(Subprocess, status)
+class SubprocessTest: public TemporaryDirectoryTest {};
+
+
+TEST_F(SubprocessTest, Status)
 {
   Clock::pause();
 
@@ -43,8 +49,8 @@ TEST(Subprocess, status)
   ASSERT_SOME(s.get().status().get());
 
   int status = s.get().status().get().get();
-  ASSERT_TRUE(WIFEXITED(status));
-  ASSERT_EQ(0, WEXITSTATUS(status));
+  EXPECT_TRUE(WIFEXITED(status));
+  EXPECT_EQ(0, WEXITSTATUS(status));
 
   // Exit 1.
   s = subprocess("exit 1");
@@ -61,8 +67,8 @@ TEST(Subprocess, status)
   ASSERT_SOME(s.get().status().get());
 
   status = s.get().status().get().get();
-  ASSERT_TRUE(WIFEXITED(status));
-  ASSERT_EQ(1, WEXITSTATUS(status));
+  EXPECT_TRUE(WIFEXITED(status));
+  EXPECT_EQ(1, WEXITSTATUS(status));
 
   // SIGTERM.
   s = subprocess("sleep 60");
@@ -81,8 +87,8 @@ TEST(Subprocess, status)
   ASSERT_SOME(s.get().status().get());
 
   status = s.get().status().get().get();
-  ASSERT_TRUE(WIFSIGNALED(status));
-  ASSERT_EQ(SIGTERM, WTERMSIG(status));
+  EXPECT_TRUE(WIFSIGNALED(status));
+  EXPECT_EQ(SIGTERM, WTERMSIG(status));
 
   // SIGKILL.
   s = subprocess("sleep 60");
@@ -101,25 +107,88 @@ TEST(Subprocess, status)
   ASSERT_SOME(s.get().status().get());
 
   status = s.get().status().get().get();
-  ASSERT_TRUE(WIFSIGNALED(status));
-  ASSERT_EQ(SIGKILL, WTERMSIG(status));
+  EXPECT_TRUE(WIFSIGNALED(status));
+  EXPECT_EQ(SIGKILL, WTERMSIG(status));
 
   Clock::resume();
 }
 
 
-TEST(Subprocess, output)
+TEST_F(SubprocessTest, PipeOutput)
 {
   Clock::pause();
 
   // Standard out.
-  Try<Subprocess> s = subprocess("echo hello");
+  Try<Subprocess> s = subprocess(
+      "echo hello",
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      Subprocess::PIPE());
 
   ASSERT_SOME(s);
+  ASSERT_SOME(s.get().out());
+  ASSERT_SOME(os::nonblock(s.get().out().get()));
+  AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out().get()));
+
+  // Advance time until the internal reaper reaps the subprocess.
+  while (s.get().status().isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_ASSERT_READY(s.get().status());
+  ASSERT_SOME(s.get().status().get());
+
+  int status = s.get().status().get().get();
+  EXPECT_TRUE(WIFEXITED(status));
+  EXPECT_EQ(0, WEXITSTATUS(status));
+
+  // Standard error.
+  s = subprocess(
+      "echo hello 1>&2",
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      Subprocess::PIPE());
+
+  ASSERT_SOME(s);
+  ASSERT_SOME(s.get().err());
+  ASSERT_SOME(os::nonblock(s.get().err().get()));
+  AWAIT_EXPECT_EQ("hello\n", io::read(s.get().err().get()));
+
+  // Advance time until the internal reaper reaps the subprocess.
+  while (s.get().status().isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_ASSERT_READY(s.get().status());
+  ASSERT_SOME(s.get().status().get());
+
+  status = s.get().status().get().get();
+  EXPECT_TRUE(WIFEXITED(status));
+  EXPECT_EQ(0, WEXITSTATUS(status));
 
-  ASSERT_SOME(os::nonblock(s.get().out()));
+  Clock::resume();
+}
 
-  AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out()));
+
+TEST_F(SubprocessTest, PipeInput)
+{
+  Clock::pause();
+
+  Try<Subprocess> s = subprocess(
+      "read word ; echo $word",
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      Subprocess::PIPE());
+
+  ASSERT_SOME(s);
+  ASSERT_SOME(s.get().in());
+  ASSERT_SOME(os::write(s.get().in().get(), "hello\n"));
+
+  ASSERT_SOME(s.get().out());
+  ASSERT_SOME(os::nonblock(s.get().out().get()));
+  AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out().get()));
 
   // Advance time until the internal reaper reaps the subprocess.
   while (s.get().status().isPending()) {
@@ -129,19 +198,41 @@ TEST(Subprocess, output)
 
   AWAIT_ASSERT_READY(s.get().status());
   ASSERT_SOME(s.get().status().get());
+
   int status = s.get().status().get().get();
+  EXPECT_TRUE(WIFEXITED(status));
+  EXPECT_EQ(0, WEXITSTATUS(status));
 
-  ASSERT_TRUE(WIFEXITED(status));
-  ASSERT_EQ(0, WEXITSTATUS(status));
+  Clock::resume();
+}
 
-  // Standard error.
-  s = subprocess("echo hello 1>&2");
+
+TEST_F(SubprocessTest, PipeSplice)
+{
+  Clock::pause();
+
+  Try<Subprocess> s = subprocess(
+      "echo 'hello world'",
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      Subprocess::PIPE());
 
   ASSERT_SOME(s);
 
-  ASSERT_SOME(os::nonblock(s.get().err()));
+  // Create a temporary file for splicing into.
+  string path = path::join(os::getcwd(), "stdout");
 
-  AWAIT_EXPECT_EQ("hello\n", io::read(s.get().err()));
+  Try<int> fd = os::open(
+      path,
+      O_WRONLY | O_CREAT | O_TRUNC,
+      S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+
+  ASSERT_SOME(fd);
+  ASSERT_SOME(os::nonblock(fd.get()));
+
+  ASSERT_SOME(s.get().out());
+  ASSERT_SOME(os::nonblock(s.get().out().get()));
+  AWAIT_READY(io::splice(s.get().out().get(), fd.get()));
 
   // Advance time until the internal reaper reaps the subprocess.
   while (s.get().status().isPending()) {
@@ -151,28 +242,61 @@ TEST(Subprocess, output)
 
   AWAIT_ASSERT_READY(s.get().status());
   ASSERT_SOME(s.get().status().get());
-  status = s.get().status().get().get();
 
-  ASSERT_TRUE(WIFEXITED(status));
-  ASSERT_EQ(0, WEXITSTATUS(status));
+  int status = s.get().status().get().get();
+  EXPECT_TRUE(WIFEXITED(status));
+  EXPECT_EQ(0, WEXITSTATUS(status));
+
+  // Now make sure all the data is there!
+  Try<string> read = os::read(path);
+  ASSERT_SOME(read);
+  EXPECT_EQ("hello world\n", read.get());
 
   Clock::resume();
 }
 
 
-TEST(Subprocess, input)
+TEST_F(SubprocessTest, PathOutput)
 {
   Clock::pause();
 
-  Try<Subprocess> s = subprocess("read word ; echo $word");
+  string out = path::join(os::getcwd(), "stdout");
+  string err = path::join(os::getcwd(), "stderr");
+
+  // Standard out.
+  Try<Subprocess> s = subprocess(
+      "echo hello",
+      Subprocess::PIPE(),
+      Subprocess::PATH(out),
+      Subprocess::PIPE());
 
   ASSERT_SOME(s);
 
-  ASSERT_SOME(os::write(s.get().in(), "hello\n"));
+  // Advance time until the internal reaper reaps the subprocess.
+  while (s.get().status().isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_ASSERT_READY(s.get().status());
+  ASSERT_SOME(s.get().status().get());
+
+  int status = s.get().status().get().get();
+  EXPECT_TRUE(WIFEXITED(status));
+  EXPECT_EQ(0, WEXITSTATUS(status));
+
+  Try<string> read = os::read(out);
+  ASSERT_SOME(read);
+  EXPECT_EQ("hello\n", read.get());
 
-  ASSERT_SOME(os::nonblock(s.get().out()));
+  // Standard error.
+  s = subprocess(
+      "echo hello 1>&2",
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      Subprocess::PATH(err));
 
-  AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out()));
+  ASSERT_SOME(s);
 
   // Advance time until the internal reaper reaps the subprocess.
   while (s.get().status().isPending()) {
@@ -182,36 +306,78 @@ TEST(Subprocess, input)
 
   AWAIT_ASSERT_READY(s.get().status());
   ASSERT_SOME(s.get().status().get());
-  int status = s.get().status().get().get();
 
-  ASSERT_TRUE(WIFEXITED(status));
-  ASSERT_EQ(0, WEXITSTATUS(status));
+  status = s.get().status().get().get();
+  EXPECT_TRUE(WIFEXITED(status));
+  EXPECT_EQ(0, WEXITSTATUS(status));
+
+  read = os::read(err);
+  ASSERT_SOME(read);
+  EXPECT_EQ("hello\n", read.get());
 
   Clock::resume();
 }
 
 
-TEST(Subprocess, splice)
+TEST_F(SubprocessTest, PathInput)
 {
   Clock::pause();
 
-  Try<Subprocess> s = subprocess("echo 'hello world'");
+  string in = path::join(os::getcwd(), "stdin");
+
+  ASSERT_SOME(os::write(in, "hello\n"));
+
+  Try<Subprocess> s = subprocess(
+      "read word ; echo $word",
+      Subprocess::PATH(in),
+      Subprocess::PIPE(),
+      Subprocess::PIPE());
 
   ASSERT_SOME(s);
+  ASSERT_SOME(s.get().out());
+  ASSERT_SOME(os::nonblock(s.get().out().get()));
+  AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out().get()));
 
-  // Create a temporary file for splicing into.
-  Try<string> path = os::mktemp();
-  ASSERT_SOME(path);
+  // Advance time until the internal reaper reaps the subprocess.
+  while (s.get().status().isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_ASSERT_READY(s.get().status());
+  ASSERT_SOME(s.get().status().get());
+
+  int status = s.get().status().get().get();
+  EXPECT_TRUE(WIFEXITED(status));
+  EXPECT_EQ(0, WEXITSTATUS(status));
+
+  Clock::resume();
+}
 
-  Try<int> fd = os::open(
-      path.get(),
-      O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
-  ASSERT_SOME(fd);
-  ASSERT_SOME(os::nonblock(fd.get()));
 
-  ASSERT_SOME(os::nonblock(s.get().out()));
+TEST_F(SubprocessTest, FdOutput)
+{
+  Clock::pause();
+
+  string out = path::join(os::getcwd(), "stdout");
+  string err = path::join(os::getcwd(), "stderr");
+
+  // Standard out.
+  Try<int> outFd = os::open(
+      out,
+      O_WRONLY | O_CREAT | O_APPEND,
+      S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+
+  ASSERT_SOME(outFd);
+
+  Try<Subprocess> s = subprocess(
+      "echo hello",
+      Subprocess::PIPE(),
+      Subprocess::FD(outFd.get()),
+      Subprocess::PIPE());
 
-  AWAIT_READY(io::splice(s.get().out(), fd.get()));
+  ASSERT_SOME(os::close(outFd.get()));
+  ASSERT_SOME(s);
 
   // Advance time until the internal reaper reaps the subprocess.
   while (s.get().status().isPending()) {
@@ -221,34 +387,101 @@ TEST(Subprocess, splice)
 
   AWAIT_ASSERT_READY(s.get().status());
   ASSERT_SOME(s.get().status().get());
+
   int status = s.get().status().get().get();
+  EXPECT_TRUE(WIFEXITED(status));
+  EXPECT_EQ(0, WEXITSTATUS(status));
 
-  ASSERT_TRUE(WIFEXITED(status));
-  ASSERT_EQ(0, WEXITSTATUS(status));
+  Try<string> read = os::read(out);
+  ASSERT_SOME(read);
+  EXPECT_EQ("hello\n", read.get());
 
-  // Now make sure all the data is there!
-  Try<string> read = os::read(path.get());
+  // Standard error.
+  Try<int> errFd = os::open(
+      err,
+      O_WRONLY | O_CREAT | O_APPEND,
+      S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+
+  ASSERT_SOME(errFd);
+
+  s = subprocess(
+      "echo hello 1>&2",
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      Subprocess::FD(errFd.get()));
+
+  ASSERT_SOME(os::close(errFd.get()));
+  ASSERT_SOME(s);
+
+  // Advance time until the internal reaper reaps the subprocess.
+  while (s.get().status().isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_ASSERT_READY(s.get().status());
+  ASSERT_SOME(s.get().status().get());
+
+  status = s.get().status().get().get();
+  EXPECT_TRUE(WIFEXITED(status));
+  EXPECT_EQ(0, WEXITSTATUS(status));
+
+  read = os::read(err);
   ASSERT_SOME(read);
-  EXPECT_EQ("hello world\n", read.get());
+  EXPECT_EQ("hello\n", read.get());
 
   Clock::resume();
 }
 
 
-TEST(Subprocess, environment)
+TEST_F(SubprocessTest, FdInput)
 {
   Clock::pause();
 
-  // Simple value.
-  map<string, string> environment;
-  environment["MESSAGE"] = "hello";
-  Try<Subprocess> s = subprocess("echo $MESSAGE", environment);
+  string in = path::join(os::getcwd(), "stdin");
+
+  ASSERT_SOME(os::write(in, "hello\n"));
+
+  Try<int> inFd = os::open(in, O_RDONLY, 0);
+  ASSERT_SOME(inFd);
+
+  Try<Subprocess> s = subprocess(
+      "read word ; echo $word",
+      Subprocess::FD(inFd.get()),
+      Subprocess::PIPE(),
+      Subprocess::PIPE());
+
+  ASSERT_SOME(os::close(inFd.get()));
 
   ASSERT_SOME(s);
+  ASSERT_SOME(s.get().out());
+  ASSERT_SOME(os::nonblock(s.get().out().get()));
+  AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out().get()));
+
+  // Advance time until the internal reaper reaps the subprocess.
+  while (s.get().status().isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_ASSERT_READY(s.get().status());
+  ASSERT_SOME(s.get().status().get());
+
+  int status = s.get().status().get().get();
+  EXPECT_TRUE(WIFEXITED(status));
+  EXPECT_EQ(0, WEXITSTATUS(status));
+
+  Clock::resume();
+}
+
 
-  ASSERT_SOME(os::nonblock(s.get().out()));
+TEST_F(SubprocessTest, Default)
+{
+  Clock::pause();
 
-  AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out()));
+  Try<Subprocess> s = subprocess("echo hello world");
+
+  ASSERT_SOME(s);
 
   // Advance time until the internal reaper reaps the subprocess.
   while (s.get().status().isPending()) {
@@ -258,22 +491,64 @@ TEST(Subprocess, environment)
 
   AWAIT_ASSERT_READY(s.get().status());
   ASSERT_SOME(s.get().status().get());
+
   int status = s.get().status().get().get();
+  EXPECT_TRUE(WIFEXITED(status));
+  EXPECT_EQ(0, WEXITSTATUS(status));
+
+  Clock::resume();
+}
+
+
+TEST_F(SubprocessTest, Environment)
+{
+  Clock::pause();
+
+  // Simple value.
+  map<string, string> environment;
+  environment["MESSAGE"] = "hello";
+
+  Try<Subprocess> s = subprocess(
+      "echo $MESSAGE",
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      environment);
+
+  ASSERT_SOME(s);
+  ASSERT_SOME(s.get().out());
+  ASSERT_SOME(os::nonblock(s.get().out().get()));
+  AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out().get()));
+
+  // Advance time until the internal reaper reaps the subprocess.
+  while (s.get().status().isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_ASSERT_READY(s.get().status());
+  ASSERT_SOME(s.get().status().get());
 
-  ASSERT_TRUE(WIFEXITED(status));
-  ASSERT_EQ(0, WEXITSTATUS(status));
+  int status = s.get().status().get().get();
+  EXPECT_TRUE(WIFEXITED(status));
+  EXPECT_EQ(0, WEXITSTATUS(status));
 
   // Multiple key-value pairs.
   environment.clear();
   environment["MESSAGE0"] = "hello";
   environment["MESSAGE1"] = "world";
-  s = subprocess("echo $MESSAGE0 $MESSAGE1", environment);
 
-  ASSERT_SOME(s);
-
-  ASSERT_SOME(os::nonblock(s.get().out()));
+  s = subprocess(
+      "echo $MESSAGE0 $MESSAGE1",
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      environment);
 
-  AWAIT_EXPECT_EQ("hello world\n", io::read(s.get().out()));
+  ASSERT_SOME(s);
+  ASSERT_SOME(s.get().out());
+  ASSERT_SOME(os::nonblock(s.get().out().get()));
+  AWAIT_EXPECT_EQ("hello world\n", io::read(s.get().out().get()));
 
   // Advance time until the internal reaper reaps the subprocess.
   while (s.get().status().isPending()) {
@@ -283,27 +558,34 @@ TEST(Subprocess, environment)
 
   AWAIT_ASSERT_READY(s.get().status());
   ASSERT_SOME(s.get().status().get());
+
   status = s.get().status().get().get();
+  EXPECT_TRUE(WIFEXITED(status));
+  EXPECT_EQ(0, WEXITSTATUS(status));
 
-  ASSERT_TRUE(WIFEXITED(status));
-  ASSERT_EQ(0, WEXITSTATUS(status));
+  Clock::resume();
 }
 
 
-TEST(Subprocess, environmentWithSpaces)
+TEST_F(SubprocessTest, EnvironmentWithSpaces)
 {
   Clock::pause();
 
   // Spaces in value.
   map<string, string> environment;
   environment["MESSAGE"] = "hello world";
-  Try<Subprocess> s = subprocess("echo $MESSAGE", environment);
-
-  ASSERT_SOME(s);
 
-  ASSERT_SOME(os::nonblock(s.get().out()));
+  Try<Subprocess> s = subprocess(
+      "echo $MESSAGE",
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      environment);
 
-  AWAIT_EXPECT_EQ("hello world\n", io::read(s.get().out()));
+  ASSERT_SOME(s);
+  ASSERT_SOME(s.get().out());
+  ASSERT_SOME(os::nonblock(s.get().out().get()));
+  AWAIT_EXPECT_EQ("hello world\n", io::read(s.get().out().get()));
 
   // Advance time until the internal reaper reaps the subprocess.
   while (s.get().status().isPending()) {
@@ -313,27 +595,34 @@ TEST(Subprocess, environmentWithSpaces)
 
   AWAIT_ASSERT_READY(s.get().status());
   ASSERT_SOME(s.get().status().get());
+
   int status = s.get().status().get().get();
+  EXPECT_TRUE(WIFEXITED(status));
+  EXPECT_EQ(0, WEXITSTATUS(status));
 
-  ASSERT_TRUE(WIFEXITED(status));
-  ASSERT_EQ(0, WEXITSTATUS(status));
+  Clock::resume();
 }
 
 
-TEST(Subprocess, environmentWithSpacesAndQuotes)
+TEST_F(SubprocessTest, EnvironmentWithSpacesAndQuotes)
 {
   Clock::pause();
 
   // Spaces and quotes in value.
   map<string, string> environment;
   environment["MESSAGE"] = "\"hello world\"";
-  Try<Subprocess> s = subprocess("echo $MESSAGE", environment);
-
-  ASSERT_SOME(s);
 
-  ASSERT_SOME(os::nonblock(s.get().out()));
+  Try<Subprocess> s = subprocess(
+      "echo $MESSAGE",
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      environment);
 
-  AWAIT_EXPECT_EQ("\"hello world\"\n", io::read(s.get().out()));
+  ASSERT_SOME(s);
+  ASSERT_SOME(s.get().out());
+  ASSERT_SOME(os::nonblock(s.get().out().get()));
+  AWAIT_EXPECT_EQ("\"hello world\"\n", io::read(s.get().out().get()));
 
   // Advance time until the internal reaper reaps the subprocess.
   while (s.get().status().isPending()) {
@@ -343,14 +632,16 @@ TEST(Subprocess, environmentWithSpacesAndQuotes)
 
   AWAIT_ASSERT_READY(s.get().status());
   ASSERT_SOME(s.get().status().get());
+
   int status = s.get().status().get().get();
+  EXPECT_TRUE(WIFEXITED(status));
+  EXPECT_EQ(0, WEXITSTATUS(status));
 
-  ASSERT_TRUE(WIFEXITED(status));
-  ASSERT_EQ(0, WEXITSTATUS(status));
+  Clock::resume();
 }
 
 
-TEST(Subprocess, environmentOverride)
+TEST_F(SubprocessTest, EnvironmentOverride)
 {
   Clock::pause();
 
@@ -359,13 +650,18 @@ TEST(Subprocess, environmentOverride)
 
   map<string, string> environment;
   environment["MESSAGE"] = "goodbye";
-  Try<Subprocess> s = subprocess("echo $MESSAGE", environment);
 
-  ASSERT_SOME(s);
-
-  ASSERT_SOME(os::nonblock(s.get().out()));
+  Try<Subprocess> s = subprocess(
+      "echo $MESSAGE",
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      environment);
 
-  AWAIT_EXPECT_EQ("goodbye\n", io::read(s.get().out()));
+  ASSERT_SOME(s);
+  ASSERT_SOME(s.get().out());
+  ASSERT_SOME(os::nonblock(s.get().out().get()));
+  AWAIT_EXPECT_EQ("goodbye\n", io::read(s.get().out().get()));
 
   // Advance time until the internal reaper reaps the subprocess.
   while (s.get().status().isPending()) {
@@ -375,14 +671,16 @@ TEST(Subprocess, environmentOverride)
 
   AWAIT_ASSERT_READY(s.get().status());
   ASSERT_SOME(s.get().status().get());
+
   int status = s.get().status().get().get();
+  EXPECT_TRUE(WIFEXITED(status));
+  EXPECT_EQ(0, WEXITSTATUS(status));
 
-  ASSERT_TRUE(WIFEXITED(status));
-  ASSERT_EQ(0, WEXITSTATUS(status));
+  Clock::resume();
 }
 
 
-int setupChdir(const string& directory)
+static int setupChdir(const string& directory)
 {
   // Keep everything async-signal safe.
   if (::chdir(directory.c_str()) == -1) {
@@ -393,7 +691,7 @@ int setupChdir(const string& directory)
 }
 
 
-TEST(Subprocess, setup)
+TEST_F(SubprocessTest, Setup)
 {
   Clock::pause();
 
@@ -403,6 +701,9 @@ TEST(Subprocess, setup)
   // chdir().
   Try<Subprocess> s = subprocess(
       "echo hello world > file",
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
       None(),
       lambda::bind(&setupChdir, directory.get()));
 
@@ -428,19 +729,22 @@ TEST(Subprocess, setup)
 }
 
 
-int setupStatus(int ret)
+static int setupStatus(int ret)
 {
   return ret;
 }
 
 
-TEST(Subprocess, setupStatus)
+TEST_F(SubprocessTest, SetupStatus)
 {
   Clock::pause();
 
   // Exit 0 && setup 1.
   Try<Subprocess> s = subprocess(
       "exit 0",
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
       None(),
       lambda::bind(&setupStatus, 1));
 
@@ -464,6 +768,9 @@ TEST(Subprocess, setupStatus)
   // Exit 1 && setup 0.
   s = subprocess(
       "exit 1",
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
       None(),
       lambda::bind(&setupStatus, 0));
 


[2/2] git commit: Updated mesos to use the new Subprocess API.

Posted by ji...@apache.org.
Updated mesos to use the new Subprocess API.

Review: https://reviews.apache.org/r/22702


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/83fcc8ca
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/83fcc8ca
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/83fcc8ca

Branch: refs/heads/master
Commit: 83fcc8ca15e8b9d19a511d64c99ec908acd7082a
Parents: 9657fe0
Author: Jie Yu <yu...@gmail.com>
Authored: Tue Jun 17 16:31:23 2014 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jun 18 10:19:23 2014 -0700

----------------------------------------------------------------------
 src/launcher/launcher.cpp                        | 12 +++++++++---
 src/linux/perf.cpp                               | 15 ++++++++++-----
 .../containerizer/external_containerizer.cpp     | 19 +++++++++++--------
 src/slave/containerizer/mesos_containerizer.cpp  | 12 +++++++++---
 src/tests/slave_tests.cpp                        |  7 ++++++-
 5 files changed, 45 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/83fcc8ca/src/launcher/launcher.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/launcher.cpp b/src/launcher/launcher.cpp
index 1d352b6..5585aad 100644
--- a/src/launcher/launcher.cpp
+++ b/src/launcher/launcher.cpp
@@ -206,13 +206,19 @@ process::Future<Option<int> > Operation::launch(
   // Prepare the command: 'mesos-launcher <operation_name> ...'.
   string command = strings::join(" ", realpath.get(), name());
 
-  Try<Subprocess> s = subprocess(command, environment);
+  Try<Subprocess> s = subprocess(
+      command,
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      environment);
+
   if (s.isError()) {
     return Failure("Launch subprocess failed: " + s.error());
   }
 
-  io::redirect(s.get().out(), stdout);
-  io::redirect(s.get().err(), stderr);
+  io::redirect(s.get().out().get(), stdout);
+  io::redirect(s.get().err().get(), stderr);
 
   return s.get().status();
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/83fcc8ca/src/linux/perf.cpp
----------------------------------------------------------------------
diff --git a/src/linux/perf.cpp b/src/linux/perf.cpp
index bc31db5..fc5acf4 100644
--- a/src/linux/perf.cpp
+++ b/src/linux/perf.cpp
@@ -157,7 +157,12 @@ protected:
 private:
   void sample()
   {
-    Try<Subprocess> _perf = subprocess(command);
+    Try<Subprocess> _perf = subprocess(
+        command,
+        Subprocess::PIPE(),
+        Subprocess::PIPE(),
+        Subprocess::PIPE());
+
     if (_perf.isError()) {
       promise.fail("Failed to launch perf process: " + _perf.error());
       terminate(self());
@@ -165,7 +170,7 @@ private:
     }
     perf = _perf.get();
 
-    Try<Nothing> nonblock = os::nonblock(perf.get().out());
+    Try<Nothing> nonblock = os::nonblock(perf.get().out().get());
     if (nonblock.isError()) {
       promise.fail("Failed to set nonblock on stdout for perf process: " +
                     nonblock.error());
@@ -173,7 +178,7 @@ private:
       return;
     }
 
-    nonblock = os::nonblock(perf.get().err());
+    nonblock = os::nonblock(perf.get().err().get());
     if (nonblock.isError()) {
       promise.fail("Failed to set nonblock on stderr for perf process: " +
                     nonblock.error());
@@ -184,8 +189,8 @@ private:
     // Start reading from stdout and stderr now. We don't use stderr
     // but must read from it to avoid the subprocess blocking on the
     // pipe.
-    output.push_back(process::io::read(perf.get().out()));
-    output.push_back(process::io::read(perf.get().err()));
+    output.push_back(process::io::read(perf.get().out().get()));
+    output.push_back(process::io::read(perf.get().err().get()));
 
     // Wait for the process to exit.
     perf.get().status()

http://git-wip-us.apache.org/repos/asf/mesos/blob/83fcc8ca/src/slave/containerizer/external_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/external_containerizer.cpp b/src/slave/containerizer/external_containerizer.cpp
index b5d0c4c..923046c 100644
--- a/src/slave/containerizer/external_containerizer.cpp
+++ b/src/slave/containerizer/external_containerizer.cpp
@@ -626,7 +626,7 @@ Future<containerizer::Termination> ExternalContainerizerProcess::_wait(
     &::protobuf::read<containerizer::Termination>;
 
   Future<Result<containerizer::Termination> > future = async(
-      read, invoked.get().out(), false, false);
+      read, invoked.get().out().get(), false, false);
 
   // Await both, a protobuf Message from the subprocess as well as
   // its exit.
@@ -817,7 +817,7 @@ Future<ResourceStatistics> ExternalContainerizerProcess::_usage(
     &::protobuf::read<ResourceStatistics>;
 
   Future<Result<ResourceStatistics> > future = async(
-      read, invoked.get().out(), false, false);
+      read, invoked.get().out().get(), false, false);
 
   // Await both, a protobuf Message from the subprocess as well as
   // its exit.
@@ -955,7 +955,7 @@ Future<hashset<ContainerID> > ExternalContainerizerProcess::containers()
     &::protobuf::read<containerizer::Containers>;
 
   Future<Result<containerizer::Containers> > future = async(
-      read, invoked.get().out(), false, false);
+      read, invoked.get().out().get(), false, false);
 
   // Await both, a protobuf Message from the subprocess as well as
   // its exit.
@@ -1116,6 +1116,9 @@ Try<Subprocess> ExternalContainerizerProcess::invoke(
   // the child-context.
   Try<Subprocess> external = process::subprocess(
       execute,
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
       environment,
       lambda::bind(&setup, sandbox.isSome() ? sandbox.get().directory
                                             : string()));
@@ -1128,11 +1131,11 @@ Try<Subprocess> ExternalContainerizerProcess::invoke(
   // Sync parent and child process to make sure we have done the
   // setsid within the child context before continuing.
   int sync;
-  while (::read(external.get().out(), &sync, sizeof(sync)) == -1 &&
+  while (::read(external.get().out().get(), &sync, sizeof(sync)) == -1 &&
          errno == EINTR);
 
   // Set stderr into non-blocking mode.
-  Try<Nothing> nonblock = os::nonblock(external.get().err());
+  Try<Nothing> nonblock = os::nonblock(external.get().err().get());
   if (nonblock.isError()) {
     return Error("Failed to accept nonblock: " + nonblock.error());
   }
@@ -1170,14 +1173,14 @@ Try<Subprocess> ExternalContainerizerProcess::invoke(
 
   // TODO(tillt): Consider adding an overload to io::redirect
   // that accepts a file path as 'to' for further reducing code.
-  io::redirect(external.get().err(), err.get());
+  io::redirect(external.get().err().get(), err.get());
 
   // Redirect does 'dup' the file descriptor, hence we can close the
   // original now.
   os::close(err.get());
 
   VLOG(2) << "Subprocess pid: " << external.get().pid() << ", "
-          << "output pipe: " << external.get().out();
+          << "output pipe: " << external.get().out().get();
 
   return external;
 }
@@ -1196,7 +1199,7 @@ Try<Subprocess> ExternalContainerizerProcess::invoke(
 
   // Transmit protobuf data via stdout towards the external
   // containerizer. Each message is prefixed by its total size.
-  Try<Nothing> write = ::protobuf::write(external.get().in(), message);
+  Try<Nothing> write = ::protobuf::write(external.get().in().get(), message);
   if (write.isError()) {
     return Error("Failed to write protobuf to pipe: " + write.error());
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/83fcc8ca/src/slave/containerizer/mesos_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos_containerizer.cpp b/src/slave/containerizer/mesos_containerizer.cpp
index 4d97d49..e5c159d 100644
--- a/src/slave/containerizer/mesos_containerizer.cpp
+++ b/src/slave/containerizer/mesos_containerizer.cpp
@@ -658,7 +658,13 @@ Future<Nothing> MesosContainerizerProcess::fetch(
   LOG(INFO) << "Fetching URIs for container '" << containerId
             << "' using command '" << command << "'";
 
-  Try<Subprocess> fetcher = subprocess(command, environment);
+  Try<Subprocess> fetcher = subprocess(
+      command,
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      environment);
+
   if (fetcher.isError()) {
     return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
   }
@@ -693,7 +699,7 @@ Future<Nothing> MesosContainerizerProcess::fetch(
 
   // Redirect takes care of nonblocking and close-on-exec for the
   // supplied file descriptors.
-  io::redirect(fetcher.get().out(), out.get());
+  io::redirect(fetcher.get().out().get(), out.get());
 
   // Redirect does 'dup' the file descriptor, hence we can close the
   // original now.
@@ -722,7 +728,7 @@ Future<Nothing> MesosContainerizerProcess::fetch(
     }
   }
 
-  io::redirect(fetcher.get().err(), err.get());
+  io::redirect(fetcher.get().err().get(), err.get());
 
   os::close(err.get());
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/83fcc8ca/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 9178e01..9dc4046 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -346,7 +346,12 @@ TEST_F(SlaveTest, MesosExecutorWithOverride)
     .WillOnce(FutureArg<1>(&status2));
 
   Try<process::Subprocess> executor =
-    process::subprocess(executorCommand, environment);
+    process::subprocess(
+        executorCommand,
+        process::Subprocess::PIPE(),
+        process::Subprocess::PIPE(),
+        process::Subprocess::PIPE(),
+        environment);
 
   ASSERT_SOME(executor);