You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2014/06/18 19:19:55 UTC
[1/2] git commit: Refactored subprocess to support more IO redirect
modes.
Repository: mesos
Updated Branches:
refs/heads/master b5708315d -> 83fcc8ca1
Refactored subprocess to support more IO redirect modes.
Review: https://reviews.apache.org/r/22688
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9657fe0f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9657fe0f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9657fe0f
Branch: refs/heads/master
Commit: 9657fe0f94f6c0a7ad00a6c02d21da3b5fbc1e0d
Parents: b570831
Author: Jie Yu <yu...@gmail.com>
Authored: Tue Jun 17 11:30:20 2014 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jun 18 10:19:13 2014 -0700
----------------------------------------------------------------------
.../libprocess/include/process/subprocess.hpp | 103 +++-
3rdparty/libprocess/src/subprocess.cpp | 262 ++++++++--
.../libprocess/src/tests/subprocess_tests.cpp | 491 +++++++++++++++----
3 files changed, 707 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9657fe0f/3rdparty/libprocess/include/process/subprocess.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/subprocess.hpp b/3rdparty/libprocess/include/process/subprocess.hpp
index e9d7b31..e93608a 100644
--- a/3rdparty/libprocess/include/process/subprocess.hpp
+++ b/3rdparty/libprocess/include/process/subprocess.hpp
@@ -1,6 +1,8 @@
#ifndef __PROCESS_SUBPROCESS_HPP__
#define __PROCESS_SUBPROCESS_HPP__
+#include <unistd.h>
+
#include <sys/types.h>
#include <map>
@@ -22,24 +24,79 @@ namespace process {
// 1. The subprocess has terminated, and
// 2. There are no longer any references to the associated
// Subprocess object.
-struct Subprocess
+class Subprocess
{
+public:
+ // Describes how the I/O is redirected for stdin/stdout/stderr.
+ // One of the following three modes is supported:
+ // 1. PIPE: Redirect to a pipe. The pipe will be created
+ // automatically and the user can read/write the parent side of
+ // the pipe from in()/out()/err().
+ // 2. PATH: Redirect to a file. For stdout/stderr, the file is created if
+ // missing and appended to otherwise; for stdin, it is opened read-only.
+ // 3. FD: Redirect to an open file descriptor.
+ class IO
+ {
+ private:
+ friend class Subprocess;
+
+ friend Try<Subprocess> subprocess(
+ const std::string& command,
+ const IO& in,
+ const IO& out,
+ const IO& err,
+ const Option<std::map<std::string, std::string> >& environment,
+ const Option<lambda::function<int()> >& setup);
+
+ enum Mode
+ {
+ PIPE, // Redirect I/O to a pipe.
+ PATH, // Redirect I/O to a file.
+ FD, // Redirect I/O to an open file descriptor.
+ };
+
+ IO(Mode _mode, const Option<int>& _fd, const Option<std::string>& _path)
+ : mode(_mode), fd(_fd), path(_path) {}
+
+ Mode mode;
+ Option<int> fd;
+ Option<std::string> path;
+ };
+
+ // Syntactic sugar to create IO descriptors.
+ static IO PIPE()
+ {
+ return IO(IO::PIPE, None(), None());
+ }
+
+ static IO PATH(const std::string& path)
+ {
+ return IO(IO::PATH, None(), path);
+ }
+
+ static IO FD(int fd)
+ {
+ return IO(IO::FD, fd, None());
+ }
+
// Returns the pid for the subprocess.
pid_t pid() const { return data->pid; }
- // File descriptor accessors for input / output.
- int in() const { return data->in; }
- int out() const { return data->out; }
- int err() const { return data->err; }
+ // The parent side of the pipe for stdin/stdout/stderr.
+ Option<int> in() const { return data->in; }
+ Option<int> out() const { return data->out; }
+ Option<int> err() const { return data->err; }
// Returns a future from process::reap of this subprocess.
// Discarding this future has no effect on the subprocess.
Future<Option<int> > status() const { return data->status; }
private:
- Subprocess() : data(new Data()) {}
friend Try<Subprocess> subprocess(
const std::string& command,
+ const Subprocess::IO& in,
+ const Subprocess::IO& out,
+ const Subprocess::IO& err,
const Option<std::map<std::string, std::string> >& environment,
const Option<lambda::function<int()> >& setup);
@@ -47,22 +104,26 @@ private:
{
~Data()
{
- os::close(in);
- os::close(out);
- os::close(err);
+ if (in.isSome()) { os::close(in.get()); }
+ if (out.isSome()) { os::close(out.get()); }
+ if (err.isSome()) { os::close(err.get()); }
}
pid_t pid;
+ // The parent side of the pipe for stdin/stdout/stderr. If the
+ // mode is not PIPE, None will be stored.
// NOTE: stdin, stdout, stderr are macros on some systems, hence
// these names instead.
- int in;
- int out;
- int err;
+ Option<int> in;
+ Option<int> out;
+ Option<int> err;
Future<Option<int> > status;
};
+ Subprocess() : data(new Data()) {}
+
memory::shared_ptr<Data> data;
};
@@ -78,9 +139,27 @@ private:
// TODO(dhamon): Add an option to not combine the two environments.
Try<Subprocess> subprocess(
const std::string& command,
+ const Subprocess::IO& in,
+ const Subprocess::IO& out,
+ const Subprocess::IO& err,
const Option<std::map<std::string, std::string> >& environment = None(),
const Option<lambda::function<int()> >& setup = None());
+
+inline Try<Subprocess> subprocess(
+ const std::string& command,
+ const Option<std::map<std::string, std::string> >& environment = None(),
+ const Option<lambda::function<int()> >& setup = None())
+{
+ return subprocess(
+ command,
+ Subprocess::FD(STDIN_FILENO),
+ Subprocess::FD(STDOUT_FILENO),
+ Subprocess::FD(STDERR_FILENO),
+ environment,
+ setup);
+}
+
} // namespace process {
#endif // __PROCESS_SUBPROCESS_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/9657fe0f/3rdparty/libprocess/src/subprocess.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/subprocess.cpp b/3rdparty/libprocess/src/subprocess.cpp
index 9f8f37f..78fa1ec 100644
--- a/3rdparty/libprocess/src/subprocess.cpp
+++ b/3rdparty/libprocess/src/subprocess.cpp
@@ -17,6 +17,7 @@
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/try.hpp>
+#include <stout/unreachable.hpp>
#include <stout/os/execenv.hpp>
@@ -27,7 +28,7 @@ namespace process {
namespace internal {
// See the comment below as to why subprocess is passed to cleanup.
-void cleanup(
+static void cleanup(
const Future<Option<int> >& result,
Promise<Option<int> >* promise,
const Subprocess& subprocess)
@@ -44,33 +45,176 @@ void cleanup(
delete promise;
}
+
+static void close(int stdinFd[2], int stdoutFd[2], int stderrFd[2])
+{
+ os::close(stdinFd[0]);
+ os::close(stdinFd[1]);
+ os::close(stdoutFd[0]);
+ os::close(stdoutFd[1]);
+ os::close(stderrFd[0]);
+ os::close(stderrFd[1]);
+}
+
+// This function will invoke os::cloexec on all file descriptors in
+// these pairs that are valid (i.e., >= 0).
+static Try<Nothing> cloexec(int stdinFd[2], int stdoutFd[2], int stderrFd[2])
+{
+ int fd[6] = {
+ stdinFd[0],
+ stdinFd[1],
+ stdoutFd[0],
+ stdoutFd[1],
+ stderrFd[0],
+ stderrFd[1]
+ };
+
+ for (int i = 0; i < 6; i++) {
+ if (fd[i] >= 0) {
+ Try<Nothing> cloexec = os::cloexec(fd[i]);
+ if (cloexec.isError()) {
+ return Error(cloexec.error());
+ }
+ }
+ }
+
+ return Nothing();
+}
+
} // namespace internal {
// Runs the provided command in a subprocess.
Try<Subprocess> subprocess(
const string& command,
+ const Subprocess::IO& in,
+ const Subprocess::IO& out,
+ const Subprocess::IO& err,
const Option<map<string, string> >& environment,
const Option<lambda::function<int()> >& setup)
{
- // Create pipes for stdin, stdout, stderr.
- // Index 0 is for reading, and index 1 is for writing.
- int stdinPipe[2];
- int stdoutPipe[2];
- int stderrPipe[2];
-
- if (pipe(stdinPipe) == -1) {
- return ErrnoError("Failed to create pipe");
- } else if (pipe(stdoutPipe) == -1) {
- os::close(stdinPipe[0]);
- os::close(stdinPipe[1]);
- return ErrnoError("Failed to create pipe");
- } else if (pipe(stderrPipe) == -1) {
- os::close(stdinPipe[0]);
- os::close(stdinPipe[1]);
- os::close(stdoutPipe[0]);
- os::close(stdoutPipe[1]);
- return ErrnoError("Failed to create pipe");
+ // File descriptors for redirecting stdin/stdout/stderr. These file
+ // descriptors are used for different purposes depending on the
+ // specified I/O modes. If the mode is PIPE, the two file
+ // descriptors represent two ends of a pipe. If the mode is PATH or
+ // FD, only one of the two file descriptors is used. Our protocol
+ // here is that index 0 is always for reading, and index 1 is always
+ // for writing (similar to the pipe semantics).
+ int stdinFd[2] = { -1, -1 };
+ int stdoutFd[2] = { -1, -1 };
+ int stderrFd[2] = { -1, -1 };
+
+ // Prepare the file descriptor(s) for stdin.
+ switch (in.mode) {
+ case Subprocess::IO::FD: {
+ stdinFd[0] = ::dup(in.fd.get());
+ if (stdinFd[0] == -1) {
+ return ErrnoError("Failed to dup");
+ }
+ break;
+ }
+ case Subprocess::IO::PIPE: {
+ if (pipe(stdinFd) == -1) {
+ return ErrnoError("Failed to create pipe");
+ }
+ break;
+ }
+ case Subprocess::IO::PATH: {
+ Try<int> open = os::open(in.path.get(), O_RDONLY);
+ if (open.isError()) {
+ return Error(
+ "Failed to open '" + in.path.get() + "': " + open.error());
+ }
+ stdinFd[0] = open.get();
+ break;
+ }
+ default:
+ return UNREACHABLE();
+ }
+
+ // Prepare the file descriptor(s) for stdout.
+ switch (out.mode) {
+ case Subprocess::IO::FD: {
+ stdoutFd[1] = ::dup(out.fd.get());
+ if (stdoutFd[1] == -1) {
+ // Save the errno as 'close' below might overwrite it.
+ ErrnoError error("Failed to dup");
+ internal::close(stdinFd, stdoutFd, stderrFd);
+ return error;
+ }
+ break;
+ }
+ case Subprocess::IO::PIPE: {
+ if (pipe(stdoutFd) == -1) {
+ // Save the errno as 'close' below might overwrite it.
+ ErrnoError error("Failed to create pipe");
+ internal::close(stdinFd, stdoutFd, stderrFd);
+ return error;
+ }
+ break;
+ }
+ case Subprocess::IO::PATH: {
+ Try<int> open = os::open(
+ out.path.get(),
+ O_WRONLY | O_CREAT | O_APPEND,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+
+ if (open.isError()) {
+ internal::close(stdinFd, stdoutFd, stderrFd);
+ return Error(
+ "Failed to open '" + out.path.get() + "': " + open.error());
+ }
+ stdoutFd[1] = open.get();
+ break;
+ }
+ default:
+ return UNREACHABLE();
+ }
+
+ // Prepare the file descriptor(s) for stderr.
+ switch (err.mode) {
+ case Subprocess::IO::FD: {
+ stderrFd[1] = ::dup(err.fd.get());
+ if (stderrFd[1] == -1) {
+ // Save the errno as 'close' below might overwrite it.
+ ErrnoError error("Failed to dup");
+ internal::close(stdinFd, stdoutFd, stderrFd);
+ return error;
+ }
+ break;
+ }
+ case Subprocess::IO::PIPE: {
+ if (pipe(stderrFd) == -1) {
+ // Save the errno as 'close' below might overwrite it.
+ ErrnoError error("Failed to create pipe");
+ internal::close(stdinFd, stdoutFd, stderrFd);
+ return error;
+ }
+ break;
+ }
+ case Subprocess::IO::PATH: {
+ Try<int> open = os::open(
+ err.path.get(),
+ O_WRONLY | O_CREAT | O_APPEND,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+
+ if (open.isError()) {
+ internal::close(stdinFd, stdoutFd, stderrFd);
+ return Error(
+ "Failed to open '" + err.path.get() + "': " + open.error());
+ }
+ stderrFd[1] = open.get();
+ break;
+ }
+ default:
+ return UNREACHABLE();
+ }
+
+ // TODO(jieyu): Consider using O_CLOEXEC for atomic close-on-exec.
+ Try<Nothing> cloexec = internal::cloexec(stdinFd, stdoutFd, stderrFd);
+ if (cloexec.isError()) {
+ internal::close(stdinFd, stdoutFd, stderrFd);
+ return Error("Failed to cloexec: " + cloexec.error());
}
// We need to do this construction before doing the fork as it
@@ -82,13 +226,10 @@ Try<Subprocess> subprocess(
pid_t pid;
if ((pid = fork()) == -1) {
- os::close(stdinPipe[0]);
- os::close(stdinPipe[1]);
- os::close(stdoutPipe[0]);
- os::close(stdoutPipe[1]);
- os::close(stderrPipe[0]);
- os::close(stderrPipe[1]);
- return ErrnoError("Failed to fork");
+ // Save the errno as 'close' below might overwrite it.
+ ErrnoError error("Failed to fork");
+ internal::close(stdinFd, stdoutFd, stderrFd);
+ return error;
}
Subprocess process;
@@ -97,19 +238,41 @@ Try<Subprocess> subprocess(
if (process.data->pid == 0) {
// Child.
// Close parent's end of the pipes.
- os::close(stdinPipe[1]);
- os::close(stdoutPipe[0]);
- os::close(stderrPipe[0]);
+ if (in.mode == Subprocess::IO::PIPE) {
+ while (::close(stdinFd[1]) == -1 && errno == EINTR);
+ }
+ if (out.mode == Subprocess::IO::PIPE) {
+ while (::close(stdoutFd[0]) == -1 && errno == EINTR);
+ }
+ if (err.mode == Subprocess::IO::PIPE) {
+ while (::close(stderrFd[0]) == -1 && errno == EINTR);
+ }
- // Make our pipes look like stdin, stderr, stdout before we exec.
- while (dup2(stdinPipe[0], STDIN_FILENO) == -1 && errno == EINTR);
- while (dup2(stdoutPipe[1], STDOUT_FILENO) == -1 && errno == EINTR);
- while (dup2(stderrPipe[1], STDERR_FILENO) == -1 && errno == EINTR);
+ // Redirect I/O for stdin/stdout/stderr.
+ while (::dup2(stdinFd[0], STDIN_FILENO) == -1 && errno == EINTR);
+ while (::dup2(stdoutFd[1], STDOUT_FILENO) == -1 && errno == EINTR);
+ while (::dup2(stderrFd[1], STDERR_FILENO) == -1 && errno == EINTR);
- // Close the copies.
- os::close(stdinPipe[0]);
- os::close(stdoutPipe[1]);
- os::close(stderrPipe[1]);
+ // Close the copies. We need to make sure that we do not close the
+ // file descriptor assigned to stdin/stdout/stderr in case the
+ // parent has closed stdin/stdout/stderr when calling this
+ // function (in that case, a dup'ed file descriptor may have the
+ // same file descriptor number as stdin/stdout/stderr).
+ if (stdinFd[0] != STDIN_FILENO &&
+ stdinFd[0] != STDOUT_FILENO &&
+ stdinFd[0] != STDERR_FILENO) {
+ while (::close(stdinFd[0]) == -1 && errno == EINTR);
+ }
+ if (stdoutFd[1] != STDIN_FILENO &&
+ stdoutFd[1] != STDOUT_FILENO &&
+ stdoutFd[1] != STDERR_FILENO) {
+ while (::close(stdoutFd[1]) == -1 && errno == EINTR);
+ }
+ if (stderrFd[1] != STDIN_FILENO &&
+ stderrFd[1] != STDOUT_FILENO &&
+ stderrFd[1] != STDERR_FILENO) {
+ while (::close(stderrFd[1]) == -1 && errno == EINTR);
+ }
if (setup.isSome()) {
int status = setup.get()();
@@ -124,15 +287,24 @@ Try<Subprocess> subprocess(
}
// Parent.
+ // Close the file descriptors that are created by this function. For
+ // pipes, we close the child ends and store the parent ends (see the
+ // code below).
+ os::close(stdinFd[0]);
+ os::close(stdoutFd[1]);
+ os::close(stderrFd[1]);
- // Close the child's end of the pipes.
- os::close(stdinPipe[0]);
- os::close(stdoutPipe[1]);
- os::close(stderrPipe[1]);
-
- process.data->in = stdinPipe[1];
- process.data->out = stdoutPipe[0];
- process.data->err = stderrPipe[0];
+ // If the mode is PIPE, store the parent side of the pipe so that
+ // the user can communicate with the subprocess.
+ if (in.mode == Subprocess::IO::PIPE) {
+ process.data->in = stdinFd[1];
+ }
+ if (out.mode == Subprocess::IO::PIPE) {
+ process.data->out = stdoutFd[0];
+ }
+ if (err.mode == Subprocess::IO::PIPE) {
+ process.data->err = stderrFd[0];
+ }
// Rather than directly exposing the future from process::reap, we
// must use an explicit promise so that we can ensure we can receive
http://git-wip-us.apache.org/repos/asf/mesos/blob/9657fe0f/3rdparty/libprocess/src/tests/subprocess_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/subprocess_tests.cpp b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
index 7d890bf..1cb1ce3 100644
--- a/3rdparty/libprocess/src/tests/subprocess_tests.cpp
+++ b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
@@ -15,16 +15,22 @@
#include <stout/foreach.hpp>
#include <stout/gtest.hpp>
#include <stout/list.hpp>
-#include <stout/os/read.hpp>
#include <stout/path.hpp>
+#include <stout/os/read.hpp>
+
+#include <stout/tests/utils.hpp>
+
using namespace process;
using std::map;
using std::string;
-TEST(Subprocess, status)
+class SubprocessTest: public TemporaryDirectoryTest {};
+
+
+TEST_F(SubprocessTest, Status)
{
Clock::pause();
@@ -43,8 +49,8 @@ TEST(Subprocess, status)
ASSERT_SOME(s.get().status().get());
int status = s.get().status().get().get();
- ASSERT_TRUE(WIFEXITED(status));
- ASSERT_EQ(0, WEXITSTATUS(status));
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
// Exit 1.
s = subprocess("exit 1");
@@ -61,8 +67,8 @@ TEST(Subprocess, status)
ASSERT_SOME(s.get().status().get());
status = s.get().status().get().get();
- ASSERT_TRUE(WIFEXITED(status));
- ASSERT_EQ(1, WEXITSTATUS(status));
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(1, WEXITSTATUS(status));
// SIGTERM.
s = subprocess("sleep 60");
@@ -81,8 +87,8 @@ TEST(Subprocess, status)
ASSERT_SOME(s.get().status().get());
status = s.get().status().get().get();
- ASSERT_TRUE(WIFSIGNALED(status));
- ASSERT_EQ(SIGTERM, WTERMSIG(status));
+ EXPECT_TRUE(WIFSIGNALED(status));
+ EXPECT_EQ(SIGTERM, WTERMSIG(status));
// SIGKILL.
s = subprocess("sleep 60");
@@ -101,25 +107,88 @@ TEST(Subprocess, status)
ASSERT_SOME(s.get().status().get());
status = s.get().status().get().get();
- ASSERT_TRUE(WIFSIGNALED(status));
- ASSERT_EQ(SIGKILL, WTERMSIG(status));
+ EXPECT_TRUE(WIFSIGNALED(status));
+ EXPECT_EQ(SIGKILL, WTERMSIG(status));
Clock::resume();
}
-TEST(Subprocess, output)
+TEST_F(SubprocessTest, PipeOutput)
{
Clock::pause();
// Standard out.
- Try<Subprocess> s = subprocess("echo hello");
+ Try<Subprocess> s = subprocess(
+ "echo hello",
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ Subprocess::PIPE());
ASSERT_SOME(s);
+ ASSERT_SOME(s.get().out());
+ ASSERT_SOME(os::nonblock(s.get().out().get()));
+ AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out().get()));
+
+ // Advance time until the internal reaper reaps the subprocess.
+ while (s.get().status().isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ AWAIT_ASSERT_READY(s.get().status());
+ ASSERT_SOME(s.get().status().get());
+
+ int status = s.get().status().get().get();
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
+
+ // Standard error.
+ s = subprocess(
+ "echo hello 1>&2",
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ Subprocess::PIPE());
+
+ ASSERT_SOME(s);
+ ASSERT_SOME(s.get().err());
+ ASSERT_SOME(os::nonblock(s.get().err().get()));
+ AWAIT_EXPECT_EQ("hello\n", io::read(s.get().err().get()));
+
+ // Advance time until the internal reaper reaps the subprocess.
+ while (s.get().status().isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ AWAIT_ASSERT_READY(s.get().status());
+ ASSERT_SOME(s.get().status().get());
+
+ status = s.get().status().get().get();
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
- ASSERT_SOME(os::nonblock(s.get().out()));
+ Clock::resume();
+}
- AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out()));
+
+TEST_F(SubprocessTest, PipeInput)
+{
+ Clock::pause();
+
+ Try<Subprocess> s = subprocess(
+ "read word ; echo $word",
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ Subprocess::PIPE());
+
+ ASSERT_SOME(s);
+ ASSERT_SOME(s.get().in());
+ ASSERT_SOME(os::write(s.get().in().get(), "hello\n"));
+
+ ASSERT_SOME(s.get().out());
+ ASSERT_SOME(os::nonblock(s.get().out().get()));
+ AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out().get()));
// Advance time until the internal reaper reaps the subprocess.
while (s.get().status().isPending()) {
@@ -129,19 +198,41 @@ TEST(Subprocess, output)
AWAIT_ASSERT_READY(s.get().status());
ASSERT_SOME(s.get().status().get());
+
int status = s.get().status().get().get();
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
- ASSERT_TRUE(WIFEXITED(status));
- ASSERT_EQ(0, WEXITSTATUS(status));
+ Clock::resume();
+}
- // Standard error.
- s = subprocess("echo hello 1>&2");
+
+TEST_F(SubprocessTest, PipeSplice)
+{
+ Clock::pause();
+
+ Try<Subprocess> s = subprocess(
+ "echo 'hello world'",
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ Subprocess::PIPE());
ASSERT_SOME(s);
- ASSERT_SOME(os::nonblock(s.get().err()));
+ // Create a temporary file for splicing into.
+ string path = path::join(os::getcwd(), "stdout");
- AWAIT_EXPECT_EQ("hello\n", io::read(s.get().err()));
+ Try<int> fd = os::open(
+ path,
+ O_WRONLY | O_CREAT | O_TRUNC,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+
+ ASSERT_SOME(fd);
+ ASSERT_SOME(os::nonblock(fd.get()));
+
+ ASSERT_SOME(s.get().out());
+ ASSERT_SOME(os::nonblock(s.get().out().get()));
+ AWAIT_READY(io::splice(s.get().out().get(), fd.get()));
// Advance time until the internal reaper reaps the subprocess.
while (s.get().status().isPending()) {
@@ -151,28 +242,61 @@ TEST(Subprocess, output)
AWAIT_ASSERT_READY(s.get().status());
ASSERT_SOME(s.get().status().get());
- status = s.get().status().get().get();
- ASSERT_TRUE(WIFEXITED(status));
- ASSERT_EQ(0, WEXITSTATUS(status));
+ int status = s.get().status().get().get();
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
+
+ // Now make sure all the data is there!
+ Try<string> read = os::read(path);
+ ASSERT_SOME(read);
+ EXPECT_EQ("hello world\n", read.get());
Clock::resume();
}
-TEST(Subprocess, input)
+TEST_F(SubprocessTest, PathOutput)
{
Clock::pause();
- Try<Subprocess> s = subprocess("read word ; echo $word");
+ string out = path::join(os::getcwd(), "stdout");
+ string err = path::join(os::getcwd(), "stderr");
+
+ // Standard out.
+ Try<Subprocess> s = subprocess(
+ "echo hello",
+ Subprocess::PIPE(),
+ Subprocess::PATH(out),
+ Subprocess::PIPE());
ASSERT_SOME(s);
- ASSERT_SOME(os::write(s.get().in(), "hello\n"));
+ // Advance time until the internal reaper reaps the subprocess.
+ while (s.get().status().isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ AWAIT_ASSERT_READY(s.get().status());
+ ASSERT_SOME(s.get().status().get());
+
+ int status = s.get().status().get().get();
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
+
+ Try<string> read = os::read(out);
+ ASSERT_SOME(read);
+ EXPECT_EQ("hello\n", read.get());
- ASSERT_SOME(os::nonblock(s.get().out()));
+ // Standard error.
+ s = subprocess(
+ "echo hello 1>&2",
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ Subprocess::PATH(err));
- AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out()));
+ ASSERT_SOME(s);
// Advance time until the internal reaper reaps the subprocess.
while (s.get().status().isPending()) {
@@ -182,36 +306,78 @@ TEST(Subprocess, input)
AWAIT_ASSERT_READY(s.get().status());
ASSERT_SOME(s.get().status().get());
- int status = s.get().status().get().get();
- ASSERT_TRUE(WIFEXITED(status));
- ASSERT_EQ(0, WEXITSTATUS(status));
+ status = s.get().status().get().get();
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
+
+ read = os::read(err);
+ ASSERT_SOME(read);
+ EXPECT_EQ("hello\n", read.get());
Clock::resume();
}
-TEST(Subprocess, splice)
+TEST_F(SubprocessTest, PathInput)
{
Clock::pause();
- Try<Subprocess> s = subprocess("echo 'hello world'");
+ string in = path::join(os::getcwd(), "stdin");
+
+ ASSERT_SOME(os::write(in, "hello\n"));
+
+ Try<Subprocess> s = subprocess(
+ "read word ; echo $word",
+ Subprocess::PATH(in),
+ Subprocess::PIPE(),
+ Subprocess::PIPE());
ASSERT_SOME(s);
+ ASSERT_SOME(s.get().out());
+ ASSERT_SOME(os::nonblock(s.get().out().get()));
+ AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out().get()));
- // Create a temporary file for splicing into.
- Try<string> path = os::mktemp();
- ASSERT_SOME(path);
+ // Advance time until the internal reaper reaps the subprocess.
+ while (s.get().status().isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ AWAIT_ASSERT_READY(s.get().status());
+ ASSERT_SOME(s.get().status().get());
+
+ int status = s.get().status().get().get();
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
+
+ Clock::resume();
+}
- Try<int> fd = os::open(
- path.get(),
- O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
- ASSERT_SOME(fd);
- ASSERT_SOME(os::nonblock(fd.get()));
- ASSERT_SOME(os::nonblock(s.get().out()));
+TEST_F(SubprocessTest, FdOutput)
+{
+ Clock::pause();
+
+ string out = path::join(os::getcwd(), "stdout");
+ string err = path::join(os::getcwd(), "stderr");
+
+ // Standard out.
+ Try<int> outFd = os::open(
+ out,
+ O_WRONLY | O_CREAT | O_APPEND,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+
+ ASSERT_SOME(outFd);
+
+ Try<Subprocess> s = subprocess(
+ "echo hello",
+ Subprocess::PIPE(),
+ Subprocess::FD(outFd.get()),
+ Subprocess::PIPE());
- AWAIT_READY(io::splice(s.get().out(), fd.get()));
+ ASSERT_SOME(os::close(outFd.get()));
+ ASSERT_SOME(s);
// Advance time until the internal reaper reaps the subprocess.
while (s.get().status().isPending()) {
@@ -221,34 +387,101 @@ TEST(Subprocess, splice)
AWAIT_ASSERT_READY(s.get().status());
ASSERT_SOME(s.get().status().get());
+
int status = s.get().status().get().get();
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
- ASSERT_TRUE(WIFEXITED(status));
- ASSERT_EQ(0, WEXITSTATUS(status));
+ Try<string> read = os::read(out);
+ ASSERT_SOME(read);
+ EXPECT_EQ("hello\n", read.get());
- // Now make sure all the data is there!
- Try<string> read = os::read(path.get());
+ // Standard error.
+ Try<int> errFd = os::open(
+ err,
+ O_WRONLY | O_CREAT | O_APPEND,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+
+ ASSERT_SOME(errFd);
+
+ s = subprocess(
+ "echo hello 1>&2",
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ Subprocess::FD(errFd.get()));
+
+ ASSERT_SOME(os::close(errFd.get()));
+ ASSERT_SOME(s);
+
+ // Advance time until the internal reaper reaps the subprocess.
+ while (s.get().status().isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ AWAIT_ASSERT_READY(s.get().status());
+ ASSERT_SOME(s.get().status().get());
+
+ status = s.get().status().get().get();
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
+
+ read = os::read(err);
ASSERT_SOME(read);
- EXPECT_EQ("hello world\n", read.get());
+ EXPECT_EQ("hello\n", read.get());
Clock::resume();
}
-TEST(Subprocess, environment)
+TEST_F(SubprocessTest, FdInput)
{
Clock::pause();
- // Simple value.
- map<string, string> environment;
- environment["MESSAGE"] = "hello";
- Try<Subprocess> s = subprocess("echo $MESSAGE", environment);
+ string in = path::join(os::getcwd(), "stdin");
+
+ ASSERT_SOME(os::write(in, "hello\n"));
+
+ Try<int> inFd = os::open(in, O_RDONLY, 0);
+ ASSERT_SOME(inFd);
+
+ Try<Subprocess> s = subprocess(
+ "read word ; echo $word",
+ Subprocess::FD(inFd.get()),
+ Subprocess::PIPE(),
+ Subprocess::PIPE());
+
+ ASSERT_SOME(os::close(inFd.get()));
ASSERT_SOME(s);
+ ASSERT_SOME(s.get().out());
+ ASSERT_SOME(os::nonblock(s.get().out().get()));
+ AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out().get()));
+
+ // Advance time until the internal reaper reaps the subprocess.
+ while (s.get().status().isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ AWAIT_ASSERT_READY(s.get().status());
+ ASSERT_SOME(s.get().status().get());
+
+ int status = s.get().status().get().get();
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
+
+ Clock::resume();
+}
+
- ASSERT_SOME(os::nonblock(s.get().out()));
+TEST_F(SubprocessTest, Default)
+{
+ Clock::pause();
- AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out()));
+ Try<Subprocess> s = subprocess("echo hello world");
+
+ ASSERT_SOME(s);
// Advance time until the internal reaper reaps the subprocess.
while (s.get().status().isPending()) {
@@ -258,22 +491,64 @@ TEST(Subprocess, environment)
AWAIT_ASSERT_READY(s.get().status());
ASSERT_SOME(s.get().status().get());
+
int status = s.get().status().get().get();
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
+
+ Clock::resume();
+}
+
+
+TEST_F(SubprocessTest, Environment)
+{
+ Clock::pause();
+
+ // Simple value.
+ map<string, string> environment;
+ environment["MESSAGE"] = "hello";
+
+ Try<Subprocess> s = subprocess(
+ "echo $MESSAGE",
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ environment);
+
+ ASSERT_SOME(s);
+ ASSERT_SOME(s.get().out());
+ ASSERT_SOME(os::nonblock(s.get().out().get()));
+ AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out().get()));
+
+ // Advance time until the internal reaper reaps the subprocess.
+ while (s.get().status().isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ AWAIT_ASSERT_READY(s.get().status());
+ ASSERT_SOME(s.get().status().get());
- ASSERT_TRUE(WIFEXITED(status));
- ASSERT_EQ(0, WEXITSTATUS(status));
+ int status = s.get().status().get().get();
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
// Multiple key-value pairs.
environment.clear();
environment["MESSAGE0"] = "hello";
environment["MESSAGE1"] = "world";
- s = subprocess("echo $MESSAGE0 $MESSAGE1", environment);
- ASSERT_SOME(s);
-
- ASSERT_SOME(os::nonblock(s.get().out()));
+ s = subprocess(
+ "echo $MESSAGE0 $MESSAGE1",
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ environment);
- AWAIT_EXPECT_EQ("hello world\n", io::read(s.get().out()));
+ ASSERT_SOME(s);
+ ASSERT_SOME(s.get().out());
+ ASSERT_SOME(os::nonblock(s.get().out().get()));
+ AWAIT_EXPECT_EQ("hello world\n", io::read(s.get().out().get()));
// Advance time until the internal reaper reaps the subprocess.
while (s.get().status().isPending()) {
@@ -283,27 +558,34 @@ TEST(Subprocess, environment)
AWAIT_ASSERT_READY(s.get().status());
ASSERT_SOME(s.get().status().get());
+
status = s.get().status().get().get();
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
- ASSERT_TRUE(WIFEXITED(status));
- ASSERT_EQ(0, WEXITSTATUS(status));
+ Clock::resume();
}
-TEST(Subprocess, environmentWithSpaces)
+TEST_F(SubprocessTest, EnvironmentWithSpaces)
{
Clock::pause();
// Spaces in value.
map<string, string> environment;
environment["MESSAGE"] = "hello world";
- Try<Subprocess> s = subprocess("echo $MESSAGE", environment);
-
- ASSERT_SOME(s);
- ASSERT_SOME(os::nonblock(s.get().out()));
+ Try<Subprocess> s = subprocess(
+ "echo $MESSAGE",
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ environment);
- AWAIT_EXPECT_EQ("hello world\n", io::read(s.get().out()));
+ ASSERT_SOME(s);
+ ASSERT_SOME(s.get().out());
+ ASSERT_SOME(os::nonblock(s.get().out().get()));
+ AWAIT_EXPECT_EQ("hello world\n", io::read(s.get().out().get()));
// Advance time until the internal reaper reaps the subprocess.
while (s.get().status().isPending()) {
@@ -313,27 +595,34 @@ TEST(Subprocess, environmentWithSpaces)
AWAIT_ASSERT_READY(s.get().status());
ASSERT_SOME(s.get().status().get());
+
int status = s.get().status().get().get();
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
- ASSERT_TRUE(WIFEXITED(status));
- ASSERT_EQ(0, WEXITSTATUS(status));
+ Clock::resume();
}
-TEST(Subprocess, environmentWithSpacesAndQuotes)
+TEST_F(SubprocessTest, EnvironmentWithSpacesAndQuotes)
{
Clock::pause();
// Spaces and quotes in value.
map<string, string> environment;
environment["MESSAGE"] = "\"hello world\"";
- Try<Subprocess> s = subprocess("echo $MESSAGE", environment);
-
- ASSERT_SOME(s);
- ASSERT_SOME(os::nonblock(s.get().out()));
+ Try<Subprocess> s = subprocess(
+ "echo $MESSAGE",
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ environment);
- AWAIT_EXPECT_EQ("\"hello world\"\n", io::read(s.get().out()));
+ ASSERT_SOME(s);
+ ASSERT_SOME(s.get().out());
+ ASSERT_SOME(os::nonblock(s.get().out().get()));
+ AWAIT_EXPECT_EQ("\"hello world\"\n", io::read(s.get().out().get()));
// Advance time until the internal reaper reaps the subprocess.
while (s.get().status().isPending()) {
@@ -343,14 +632,16 @@ TEST(Subprocess, environmentWithSpacesAndQuotes)
AWAIT_ASSERT_READY(s.get().status());
ASSERT_SOME(s.get().status().get());
+
int status = s.get().status().get().get();
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
- ASSERT_TRUE(WIFEXITED(status));
- ASSERT_EQ(0, WEXITSTATUS(status));
+ Clock::resume();
}
-TEST(Subprocess, environmentOverride)
+TEST_F(SubprocessTest, EnvironmentOverride)
{
Clock::pause();
@@ -359,13 +650,18 @@ TEST(Subprocess, environmentOverride)
map<string, string> environment;
environment["MESSAGE"] = "goodbye";
- Try<Subprocess> s = subprocess("echo $MESSAGE", environment);
- ASSERT_SOME(s);
-
- ASSERT_SOME(os::nonblock(s.get().out()));
+ Try<Subprocess> s = subprocess(
+ "echo $MESSAGE",
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ environment);
- AWAIT_EXPECT_EQ("goodbye\n", io::read(s.get().out()));
+ ASSERT_SOME(s);
+ ASSERT_SOME(s.get().out());
+ ASSERT_SOME(os::nonblock(s.get().out().get()));
+ AWAIT_EXPECT_EQ("goodbye\n", io::read(s.get().out().get()));
// Advance time until the internal reaper reaps the subprocess.
while (s.get().status().isPending()) {
@@ -375,14 +671,16 @@ TEST(Subprocess, environmentOverride)
AWAIT_ASSERT_READY(s.get().status());
ASSERT_SOME(s.get().status().get());
+
int status = s.get().status().get().get();
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
- ASSERT_TRUE(WIFEXITED(status));
- ASSERT_EQ(0, WEXITSTATUS(status));
+ Clock::resume();
}
-int setupChdir(const string& directory)
+static int setupChdir(const string& directory)
{
// Keep everything async-signal safe.
if (::chdir(directory.c_str()) == -1) {
@@ -393,7 +691,7 @@ int setupChdir(const string& directory)
}
-TEST(Subprocess, setup)
+TEST_F(SubprocessTest, Setup)
{
Clock::pause();
@@ -403,6 +701,9 @@ TEST(Subprocess, setup)
// chdir().
Try<Subprocess> s = subprocess(
"echo hello world > file",
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
None(),
lambda::bind(&setupChdir, directory.get()));
@@ -428,19 +729,22 @@ TEST(Subprocess, setup)
}
-int setupStatus(int ret)
+static int setupStatus(int ret)
{
return ret;
}
-TEST(Subprocess, setupStatus)
+TEST_F(SubprocessTest, SetupStatus)
{
Clock::pause();
// Exit 0 && setup 1.
Try<Subprocess> s = subprocess(
"exit 0",
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
None(),
lambda::bind(&setupStatus, 1));
@@ -464,6 +768,9 @@ TEST(Subprocess, setupStatus)
// Exit 1 && setup 0.
s = subprocess(
"exit 1",
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
None(),
lambda::bind(&setupStatus, 0));
[2/2] git commit: Updated mesos to use the new Subprocess API.
Posted by ji...@apache.org.
Updated mesos to use the new Subprocess API.
Review: https://reviews.apache.org/r/22702
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/83fcc8ca
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/83fcc8ca
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/83fcc8ca
Branch: refs/heads/master
Commit: 83fcc8ca15e8b9d19a511d64c99ec908acd7082a
Parents: 9657fe0
Author: Jie Yu <yu...@gmail.com>
Authored: Tue Jun 17 16:31:23 2014 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jun 18 10:19:23 2014 -0700
----------------------------------------------------------------------
src/launcher/launcher.cpp | 12 +++++++++---
src/linux/perf.cpp | 15 ++++++++++-----
.../containerizer/external_containerizer.cpp | 19 +++++++++++--------
src/slave/containerizer/mesos_containerizer.cpp | 12 +++++++++---
src/tests/slave_tests.cpp | 7 ++++++-
5 files changed, 45 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/83fcc8ca/src/launcher/launcher.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/launcher.cpp b/src/launcher/launcher.cpp
index 1d352b6..5585aad 100644
--- a/src/launcher/launcher.cpp
+++ b/src/launcher/launcher.cpp
@@ -206,13 +206,19 @@ process::Future<Option<int> > Operation::launch(
// Prepare the command: 'mesos-launcher <operation_name> ...'.
string command = strings::join(" ", realpath.get(), name());
- Try<Subprocess> s = subprocess(command, environment);
+ Try<Subprocess> s = subprocess(
+ command,
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ environment);
+
if (s.isError()) {
return Failure("Launch subprocess failed: " + s.error());
}
- io::redirect(s.get().out(), stdout);
- io::redirect(s.get().err(), stderr);
+ io::redirect(s.get().out().get(), stdout);
+ io::redirect(s.get().err().get(), stderr);
return s.get().status();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/83fcc8ca/src/linux/perf.cpp
----------------------------------------------------------------------
diff --git a/src/linux/perf.cpp b/src/linux/perf.cpp
index bc31db5..fc5acf4 100644
--- a/src/linux/perf.cpp
+++ b/src/linux/perf.cpp
@@ -157,7 +157,12 @@ protected:
private:
void sample()
{
- Try<Subprocess> _perf = subprocess(command);
+ Try<Subprocess> _perf = subprocess(
+ command,
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ Subprocess::PIPE());
+
if (_perf.isError()) {
promise.fail("Failed to launch perf process: " + _perf.error());
terminate(self());
@@ -165,7 +170,7 @@ private:
}
perf = _perf.get();
- Try<Nothing> nonblock = os::nonblock(perf.get().out());
+ Try<Nothing> nonblock = os::nonblock(perf.get().out().get());
if (nonblock.isError()) {
promise.fail("Failed to set nonblock on stdout for perf process: " +
nonblock.error());
@@ -173,7 +178,7 @@ private:
return;
}
- nonblock = os::nonblock(perf.get().err());
+ nonblock = os::nonblock(perf.get().err().get());
if (nonblock.isError()) {
promise.fail("Failed to set nonblock on stderr for perf process: " +
nonblock.error());
@@ -184,8 +189,8 @@ private:
// Start reading from stdout and stderr now. We don't use stderr
// but must read from it to avoid the subprocess blocking on the
// pipe.
- output.push_back(process::io::read(perf.get().out()));
- output.push_back(process::io::read(perf.get().err()));
+ output.push_back(process::io::read(perf.get().out().get()));
+ output.push_back(process::io::read(perf.get().err().get()));
// Wait for the process to exit.
perf.get().status()
http://git-wip-us.apache.org/repos/asf/mesos/blob/83fcc8ca/src/slave/containerizer/external_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/external_containerizer.cpp b/src/slave/containerizer/external_containerizer.cpp
index b5d0c4c..923046c 100644
--- a/src/slave/containerizer/external_containerizer.cpp
+++ b/src/slave/containerizer/external_containerizer.cpp
@@ -626,7 +626,7 @@ Future<containerizer::Termination> ExternalContainerizerProcess::_wait(
&::protobuf::read<containerizer::Termination>;
Future<Result<containerizer::Termination> > future = async(
- read, invoked.get().out(), false, false);
+ read, invoked.get().out().get(), false, false);
// Await both, a protobuf Message from the subprocess as well as
// its exit.
@@ -817,7 +817,7 @@ Future<ResourceStatistics> ExternalContainerizerProcess::_usage(
&::protobuf::read<ResourceStatistics>;
Future<Result<ResourceStatistics> > future = async(
- read, invoked.get().out(), false, false);
+ read, invoked.get().out().get(), false, false);
// Await both, a protobuf Message from the subprocess as well as
// its exit.
@@ -955,7 +955,7 @@ Future<hashset<ContainerID> > ExternalContainerizerProcess::containers()
&::protobuf::read<containerizer::Containers>;
Future<Result<containerizer::Containers> > future = async(
- read, invoked.get().out(), false, false);
+ read, invoked.get().out().get(), false, false);
// Await both, a protobuf Message from the subprocess as well as
// its exit.
@@ -1116,6 +1116,9 @@ Try<Subprocess> ExternalContainerizerProcess::invoke(
// the child-context.
Try<Subprocess> external = process::subprocess(
execute,
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
environment,
lambda::bind(&setup, sandbox.isSome() ? sandbox.get().directory
: string()));
@@ -1128,11 +1131,11 @@ Try<Subprocess> ExternalContainerizerProcess::invoke(
// Sync parent and child process to make sure we have done the
// setsid within the child context before continuing.
int sync;
- while (::read(external.get().out(), &sync, sizeof(sync)) == -1 &&
+ while (::read(external.get().out().get(), &sync, sizeof(sync)) == -1 &&
errno == EINTR);
// Set stderr into non-blocking mode.
- Try<Nothing> nonblock = os::nonblock(external.get().err());
+ Try<Nothing> nonblock = os::nonblock(external.get().err().get());
if (nonblock.isError()) {
return Error("Failed to accept nonblock: " + nonblock.error());
}
@@ -1170,14 +1173,14 @@ Try<Subprocess> ExternalContainerizerProcess::invoke(
// TODO(tillt): Consider adding an overload to io::redirect
// that accepts a file path as 'to' for further reducing code.
- io::redirect(external.get().err(), err.get());
+ io::redirect(external.get().err().get(), err.get());
// Redirect does 'dup' the file descriptor, hence we can close the
// original now.
os::close(err.get());
VLOG(2) << "Subprocess pid: " << external.get().pid() << ", "
- << "output pipe: " << external.get().out();
+ << "output pipe: " << external.get().out().get();
return external;
}
@@ -1196,7 +1199,7 @@ Try<Subprocess> ExternalContainerizerProcess::invoke(
// Transmit protobuf data via the subprocess's stdin pipe towards the
// external containerizer. Each message is prefixed by its total size.
- Try<Nothing> write = ::protobuf::write(external.get().in(), message);
+ Try<Nothing> write = ::protobuf::write(external.get().in().get(), message);
if (write.isError()) {
return Error("Failed to write protobuf to pipe: " + write.error());
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/83fcc8ca/src/slave/containerizer/mesos_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos_containerizer.cpp b/src/slave/containerizer/mesos_containerizer.cpp
index 4d97d49..e5c159d 100644
--- a/src/slave/containerizer/mesos_containerizer.cpp
+++ b/src/slave/containerizer/mesos_containerizer.cpp
@@ -658,7 +658,13 @@ Future<Nothing> MesosContainerizerProcess::fetch(
LOG(INFO) << "Fetching URIs for container '" << containerId
<< "' using command '" << command << "'";
- Try<Subprocess> fetcher = subprocess(command, environment);
+ Try<Subprocess> fetcher = subprocess(
+ command,
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ environment);
+
if (fetcher.isError()) {
return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
}
@@ -693,7 +699,7 @@ Future<Nothing> MesosContainerizerProcess::fetch(
// Redirect takes care of nonblocking and close-on-exec for the
// supplied file descriptors.
- io::redirect(fetcher.get().out(), out.get());
+ io::redirect(fetcher.get().out().get(), out.get());
// Redirect does 'dup' the file descriptor, hence we can close the
// original now.
@@ -722,7 +728,7 @@ Future<Nothing> MesosContainerizerProcess::fetch(
}
}
- io::redirect(fetcher.get().err(), err.get());
+ io::redirect(fetcher.get().err().get(), err.get());
os::close(err.get());
http://git-wip-us.apache.org/repos/asf/mesos/blob/83fcc8ca/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 9178e01..9dc4046 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -346,7 +346,12 @@ TEST_F(SlaveTest, MesosExecutorWithOverride)
.WillOnce(FutureArg<1>(&status2));
Try<process::Subprocess> executor =
- process::subprocess(executorCommand, environment);
+ process::subprocess(
+ executorCommand,
+ process::Subprocess::PIPE(),
+ process::Subprocess::PIPE(),
+ process::Subprocess::PIPE(),
+ environment);
ASSERT_SOME(executor);