You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/01/30 03:05:27 UTC
[6/6] git commit: Added an asynchronous subprocess utility.
Added an asynchronous subprocess utility.
Review: https://reviews.apache.org/r/17306
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0478c7f9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0478c7f9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0478c7f9
Branch: refs/heads/master
Commit: 0478c7f9292e75a30ef02327ff5698359d0c7235
Parents: c0af398
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Jan 23 22:36:53 2014 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Jan 29 17:12:29 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 2 +
.../libprocess/include/process/subprocess.hpp | 195 +++++++++++++++++++
.../libprocess/src/tests/subprocess_tests.cpp | 189 ++++++++++++++++++
3 files changed, 386 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/0478c7f9/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 4a35f91..bbd17cc 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -93,6 +93,7 @@ libprocess_la_SOURCES += \
$(top_srcdir)/include/process/shared.hpp \
$(top_srcdir)/include/process/socket.hpp \
$(top_srcdir)/include/process/statistics.hpp \
+ $(top_srcdir)/include/process/subprocess.hpp \
$(top_srcdir)/include/process/time.hpp \
$(top_srcdir)/include/process/timeout.hpp \
$(top_srcdir)/include/process/timer.hpp
@@ -111,6 +112,7 @@ tests_SOURCES = \
src/tests/reap_tests.cpp \
src/tests/shared_tests.cpp \
src/tests/statistics_tests.cpp \
+ src/tests/subprocess_tests.cpp \
src/tests/timeseries_tests.cpp \
src/tests/time_tests.cpp
http://git-wip-us.apache.org/repos/asf/mesos/blob/0478c7f9/3rdparty/libprocess/include/process/subprocess.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/subprocess.hpp b/3rdparty/libprocess/include/process/subprocess.hpp
new file mode 100644
index 0000000..db9c96b
--- /dev/null
+++ b/3rdparty/libprocess/include/process/subprocess.hpp
@@ -0,0 +1,195 @@
+#ifndef __PROCESS_SUBPROCESS_HPP__
+#define __PROCESS_SUBPROCESS_HPP__
+
+#include <unistd.h>
+
+#include <glog/logging.h>
+
+#include <sys/types.h>
+
+#include <string>
+
+#include <process/future.hpp>
+#include <process/reap.hpp>
+
+#include <stout/error.hpp>
+#include <stout/lambda.hpp>
+#include <stout/memory.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+
+namespace process {
+
+// Represents a fork() exec()ed subprocess. Access is provided to
+// the input / output of the process, as well as the exit status.
+// The input / output file descriptors are only closed after both:
+// 1. The subprocess has terminated, and
+// 2. There are no longer any references to the associated
+// Subprocess object.
+struct Subprocess
+{
+ // Returns the pid for the subprocess.
+ pid_t pid() const { return data->pid; }
+
+ // File descriptor accessors for input / output.
+ int in() const { return data->in; }
+ int out() const { return data->out; }
+ int err() const { return data->err; }
+
+ // Returns a future from process::reap of this subprocess.
+ // Discarding this future has no effect on the subprocess.
+ Future<Option<int> > status() const { return data->status; }
+
+private:
+ Subprocess() : data(new Data()) {}
+ friend Try<Subprocess> subprocess(const std::string&);
+
+ struct Data
+ {
+ ~Data()
+ {
+ os::close(in);
+ os::close(out);
+ os::close(err);
+ }
+
+ pid_t pid;
+
+ // NOTE: stdin, stdout, stderr are macros on some systems, hence
+ // these names instead.
+ int in;
+ int out;
+ int err;
+
+ Future<Option<int> > status;
+ };
+
+ memory::shared_ptr<Data> data;
+};
+
+
+namespace internal {
+
+// See the comment below as to why subprocess is passed to cleanup.
+void cleanup(
+ const Future<Option<int> >& result,
+ Promise<Option<int> >* promise,
+ const Subprocess& subprocess)
+{
+ CHECK(!result.isPending());
+ CHECK(!result.isDiscarded());
+
+ if (result.isFailed()) {
+ promise->fail(result.failure());
+ } else {
+ promise->set(result.get());
+ }
+
+ delete promise;
+}
+
+}
+
+
+// Runs the provided command in a subprocess.
+inline Try<Subprocess> subprocess(const std::string& command)
+{
+ // Create pipes for stdin, stdout, stderr.
+ // Index 0 is for reading, and index 1 is for writing.
+ int stdinPipe[2];
+ int stdoutPipe[2];
+ int stderrPipe[2];
+
+ if (pipe(stdinPipe) == -1) {
+ return ErrnoError("Failed to create pipe");
+ } else if (pipe(stdoutPipe) == -1) {
+ os::close(stdinPipe[0]);
+ os::close(stdinPipe[1]);
+ return ErrnoError("Failed to create pipe");
+ } else if (pipe(stderrPipe) == -1) {
+ os::close(stdinPipe[0]);
+ os::close(stdinPipe[1]);
+ os::close(stdoutPipe[0]);
+ os::close(stdoutPipe[1]);
+ return ErrnoError("Failed to create pipe");
+ }
+
+ pid_t pid;
+ if ((pid = fork()) == -1) {
+ os::close(stdinPipe[0]);
+ os::close(stdinPipe[1]);
+ os::close(stdoutPipe[0]);
+ os::close(stdoutPipe[1]);
+ os::close(stderrPipe[0]);
+ os::close(stderrPipe[1]);
+ return ErrnoError("Failed to fork");
+ }
+
+ Subprocess process;
+ process.data->pid = pid;
+
+ if (process.data->pid == 0) {
+ // Child.
+ // Close parent's end of the pipes.
+ os::close(stdinPipe[1]);
+ os::close(stdoutPipe[0]);
+ os::close(stderrPipe[0]);
+
+ // Make our pipes look like stdin, stderr, stdout before we exec.
+ while (dup2(stdinPipe[0], STDIN_FILENO) == -1 && errno == EINTR);
+ while (dup2(stdoutPipe[1], STDOUT_FILENO) == -1 && errno == EINTR);
+ while (dup2(stderrPipe[1], STDERR_FILENO) == -1 && errno == EINTR);
+
+ // Close the copies.
+ os::close(stdinPipe[0]);
+ os::close(stdoutPipe[1]);
+ os::close(stderrPipe[1]);
+
+ execl("/bin/sh", "sh", "-c", command.c_str(), (char *) NULL);
+
+ // Write the failure message in an async-signal safe manner,
+ // assuming strlen is async-signal safe or optimized out.
+ // In fact, it is highly unlikely that strlen would be
+ // implemented in an unsafe manner:
+ // http://austingroupbugs.net/view.php?id=692
+ const char* message = "Failed to execl '/bin sh -c ";
+ write(STDERR_FILENO, message, strlen(message));
+ write(STDERR_FILENO, command.c_str(), command.size());
+ write(STDERR_FILENO, "'\n", strlen("'\n"));
+
+ _exit(1);
+ }
+
+ // Parent.
+
+ // Close the child's end of the pipes.
+ os::close(stdinPipe[0]);
+ os::close(stdoutPipe[1]);
+ os::close(stderrPipe[1]);
+
+ process.data->in = stdinPipe[1];
+ process.data->out = stdoutPipe[0];
+ process.data->err = stderrPipe[0];
+
+ // Rather than directly exposing the future from process::reap, we
+ // must use an explicit promise so that we can ensure we can receive
+ // the termination signal. Otherwise, the caller can discard the
+ // reap future, and we will not know when it is safe to close the
+ // file descriptors.
+ Promise<Option<int> >* promise = new Promise<Option<int> >();
+ process.data->status = promise->future();
+
+ // We need to bind a copy of this Subprocess into the onAny callback
+ // below to ensure that we don't close the file descriptors before
+ // the subprocess has terminated (i.e., because the caller doesn't
+ // keep a copy of this Subprocess around themselves).
+ process::reap(process.data->pid)
+ .onAny(lambda::bind(internal::cleanup, lambda::_1, promise, process));
+
+ return process;
+}
+
+} // namespace process {
+
+#endif // __PROCESS_SUBPROCESS_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/0478c7f9/3rdparty/libprocess/src/tests/subprocess_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/subprocess_tests.cpp b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
new file mode 100644
index 0000000..d2cf7f5
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
@@ -0,0 +1,189 @@
+#include <signal.h>
+
+#include <gmock/gmock.h>
+
+#include <sys/types.h>
+
+#include <string>
+
+#include <process/gmock.hpp>
+#include <process/gtest.hpp>
+#include <process/io.hpp>
+#include <process/subprocess.hpp>
+
+#include <stout/foreach.hpp>
+#include <stout/gtest.hpp>
+#include <stout/list.hpp>
+#include <stout/os/read.hpp>
+
+using namespace process;
+
+using std::string;
+
+
+TEST(Subprocess, status)
+{
+ Clock::pause();
+
+ // Exit 0.
+ Try<Subprocess> s = subprocess("exit 0");
+
+ ASSERT_SOME(s);
+
+ // Advance time until the internal reaper reaps the subprocess.
+ while (s.get().status().isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ AWAIT_ASSERT_READY(s.get().status());
+ ASSERT_SOME(s.get().status().get());
+
+ int status = s.get().status().get().get();
+ ASSERT_TRUE(WIFEXITED(status));
+ ASSERT_EQ(0, WEXITSTATUS(status));
+
+ // Exit 1.
+ s = subprocess("exit 1");
+
+ ASSERT_SOME(s);
+
+ // Advance time until the internal reaper reaps the subprocess.
+ while (s.get().status().isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ AWAIT_ASSERT_READY(s.get().status());
+ ASSERT_SOME(s.get().status().get());
+
+ status = s.get().status().get().get();
+ ASSERT_TRUE(WIFEXITED(status));
+ ASSERT_EQ(1, WEXITSTATUS(status));
+
+ // SIGTERM.
+ s = subprocess("sleep 60");
+
+ ASSERT_SOME(s);
+
+ kill(s.get().pid(), SIGTERM);
+
+ // Advance time until the internal reaper reaps the subprocess.
+ while (s.get().status().isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ AWAIT_ASSERT_READY(s.get().status());
+ ASSERT_SOME(s.get().status().get());
+
+ status = s.get().status().get().get();
+ ASSERT_TRUE(WIFSIGNALED(status));
+ ASSERT_EQ(SIGTERM, WTERMSIG(status));
+
+ // SIGKILL.
+ s = subprocess("sleep 60");
+
+ ASSERT_SOME(s);
+
+ kill(s.get().pid(), SIGKILL);
+
+ // Advance time until the internal reaper reaps the subprocess.
+ while (s.get().status().isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ AWAIT_ASSERT_READY(s.get().status());
+ ASSERT_SOME(s.get().status().get());
+
+ status = s.get().status().get().get();
+ ASSERT_TRUE(WIFSIGNALED(status));
+ ASSERT_EQ(SIGKILL, WTERMSIG(status));
+
+ Clock::resume();
+}
+
+
+
+TEST(Subprocess, output)
+{
+ Clock::pause();
+
+ // Standard out.
+ Try<Subprocess> s = subprocess("echo hello");
+
+ ASSERT_SOME(s);
+
+ ASSERT_SOME(os::nonblock(s.get().out()));
+
+ AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out()));
+
+ // Advance time until the internal reaper reaps the subprocess.
+ while (s.get().status().isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ AWAIT_ASSERT_READY(s.get().status());
+ ASSERT_SOME(s.get().status().get());
+ int status = s.get().status().get().get();
+
+ ASSERT_TRUE(WIFEXITED(status));
+ ASSERT_EQ(0, WEXITSTATUS(status));
+
+ // Standard error.
+ s = subprocess("echo hello 1>&2");
+
+ ASSERT_SOME(s);
+
+ ASSERT_SOME(os::nonblock(s.get().err()));
+
+ AWAIT_EXPECT_EQ("hello\n", io::read(s.get().err()));
+
+ // Advance time until the internal reaper reaps the subprocess.
+ while (s.get().status().isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ AWAIT_ASSERT_READY(s.get().status());
+ ASSERT_SOME(s.get().status().get());
+ status = s.get().status().get().get();
+
+ ASSERT_TRUE(WIFEXITED(status));
+ ASSERT_EQ(0, WEXITSTATUS(status));
+
+ Clock::resume();
+}
+
+
+TEST(Subprocess, input)
+{
+ Clock::pause();
+
+ Try<Subprocess> s = subprocess("read word ; echo $word");
+
+ ASSERT_SOME(s);
+
+ ASSERT_SOME(os::write(s.get().in(), "hello\n"));
+
+ ASSERT_SOME(os::nonblock(s.get().out()));
+
+ AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out()));
+
+ // Advance time until the internal reaper reaps the subprocess.
+ while (s.get().status().isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ AWAIT_ASSERT_READY(s.get().status());
+ ASSERT_SOME(s.get().status().get());
+ int status = s.get().status().get().get();
+
+ ASSERT_TRUE(WIFEXITED(status));
+ ASSERT_EQ(0, WEXITSTATUS(status));
+
+ Clock::resume();
+}