You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/01/30 03:05:27 UTC

[6/6] git commit: Added an asynchronous subprocess utility.

Added an asynchronous subprocess utility.

Review: https://reviews.apache.org/r/17306


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0478c7f9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0478c7f9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0478c7f9

Branch: refs/heads/master
Commit: 0478c7f9292e75a30ef02327ff5698359d0c7235
Parents: c0af398
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Jan 23 22:36:53 2014 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Jan 29 17:12:29 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am                 |   2 +
 .../libprocess/include/process/subprocess.hpp   | 195 +++++++++++++++++++
 .../libprocess/src/tests/subprocess_tests.cpp   | 189 ++++++++++++++++++
 3 files changed, 386 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0478c7f9/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 4a35f91..bbd17cc 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -93,6 +93,7 @@ libprocess_la_SOURCES +=					\
   $(top_srcdir)/include/process/shared.hpp			\
   $(top_srcdir)/include/process/socket.hpp			\
   $(top_srcdir)/include/process/statistics.hpp			\
+  $(top_srcdir)/include/process/subprocess.hpp			\
   $(top_srcdir)/include/process/time.hpp			\
   $(top_srcdir)/include/process/timeout.hpp			\
   $(top_srcdir)/include/process/timer.hpp
@@ -111,6 +112,7 @@ tests_SOURCES =							\
   src/tests/reap_tests.cpp					\
   src/tests/shared_tests.cpp					\
   src/tests/statistics_tests.cpp				\
+  src/tests/subprocess_tests.cpp				\
   src/tests/timeseries_tests.cpp				\
   src/tests/time_tests.cpp
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/0478c7f9/3rdparty/libprocess/include/process/subprocess.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/subprocess.hpp b/3rdparty/libprocess/include/process/subprocess.hpp
new file mode 100644
index 0000000..db9c96b
--- /dev/null
+++ b/3rdparty/libprocess/include/process/subprocess.hpp
@@ -0,0 +1,195 @@
+#ifndef __PROCESS_SUBPROCESS_HPP__
+#define __PROCESS_SUBPROCESS_HPP__
+
+#include <unistd.h>
+
+#include <glog/logging.h>
+
+#include <sys/types.h>
+
+#include <string>
+
+#include <process/future.hpp>
+#include <process/reap.hpp>
+
+#include <stout/error.hpp>
+#include <stout/lambda.hpp>
+#include <stout/memory.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+
+namespace process {
+
+// Represents a fork() exec()ed subprocess. Access is provided to
+// the input / output of the process, as well as the exit status.
+// The input / output file descriptors are only closed after both:
+//   1. The subprocess has terminated, and
+//   2. There are no longer any references to the associated
+//      Subprocess object.
+//
+// Copies of a Subprocess share the same underlying state, so the
+// descriptors stay open for as long as any copy is alive.
+struct Subprocess
+{
+  // Returns the pid for the subprocess.
+  pid_t pid() const { return data->pid; }
+
+  // File descriptor accessors for input / output. These are the
+  // parent's ends of the pipes: in() is written to in order to feed
+  // the subprocess's stdin, while out() and err() are read from to
+  // consume its stdout and stderr respectively.
+  int in()  const { return data->in;  }
+  int out() const { return data->out; }
+  int err() const { return data->err; }
+
+  // Returns a future from process::reap of this subprocess.
+  // Discarding this future has no effect on the subprocess.
+  Future<Option<int> > status() const { return data->status; }
+
+private:
+  // Instances can only be created by the subprocess() factory.
+  Subprocess() : data(new Data()) {}
+  friend Try<Subprocess> subprocess(const std::string&);
+
+  struct Data
+  {
+    // Start from invalid sentinels so that a Data which is destroyed
+    // before being fully populated never closes an arbitrary
+    // descriptor.
+    Data() : pid(-1), in(-1), out(-1), err(-1) {}
+
+    // Closes whichever descriptors were assigned.
+    ~Data()
+    {
+      if (in >= 0) {
+        os::close(in);
+      }
+
+      if (out >= 0) {
+        os::close(out);
+      }
+
+      if (err >= 0) {
+        os::close(err);
+      }
+    }
+
+    pid_t pid;
+
+    // NOTE: stdin, stdout, stderr are macros on some systems, hence
+    // these names instead.
+    int in;
+    int out;
+    int err;
+
+    Future<Option<int> > status;
+  };
+
+  memory::shared_ptr<Data> data;
+};
+
+
+namespace internal {
+
+// Forwards the outcome of the reap future to 'promise' and then
+// deletes the promise (this function takes ownership of it).
+//
+// The 'subprocess' argument is intentionally unused: binding a copy
+// of the Subprocess into this callback keeps its file descriptors
+// open until the subprocess has been reaped. See the comment in
+// subprocess() below.
+//
+// NOTE: This must be 'inline' because it is defined in a header;
+// otherwise every translation unit including this header would emit
+// its own definition, resulting in duplicate symbols at link time.
+inline void cleanup(
+    const Future<Option<int> >& result,
+    Promise<Option<int> >* promise,
+    const Subprocess& subprocess)
+{
+  // This is only invoked through onAny, i.e. once the future has
+  // transitioned, and the reap future is not expected to be
+  // discarded.
+  CHECK(!result.isPending());
+  CHECK(!result.isDiscarded());
+
+  if (result.isFailed()) {
+    promise->fail(result.failure());
+  } else {
+    promise->set(result.get());
+  }
+
+  delete promise;
+}
+
+}
+
+
+// Runs the provided command in a subprocess, via '/bin/sh -c'.
+//
+// Returns an error if any of the pipes could not be created or if
+// fork() failed. Otherwise returns a Subprocess exposing the child's
+// pid, the parent's ends of the stdin / stdout / stderr pipes, and a
+// future for the termination status.
+inline Try<Subprocess> subprocess(const std::string& command)
+{
+  // Create pipes for stdin, stdout, stderr.
+  // Index 0 is for reading, and index 1 is for writing.
+  int stdinPipe[2];
+  int stdoutPipe[2];
+  int stderrPipe[2];
+
+  // NOTE: In each failure path below the error is constructed
+  // *before* cleaning up, so that errno still reflects the failed
+  // call rather than whatever the subsequent close() calls left
+  // behind.
+  if (pipe(stdinPipe) == -1) {
+    return ErrnoError("Failed to create pipe");
+  } else if (pipe(stdoutPipe) == -1) {
+    const ErrnoError error("Failed to create pipe");
+    os::close(stdinPipe[0]);
+    os::close(stdinPipe[1]);
+    return error;
+  } else if (pipe(stderrPipe) == -1) {
+    const ErrnoError error("Failed to create pipe");
+    os::close(stdinPipe[0]);
+    os::close(stdinPipe[1]);
+    os::close(stdoutPipe[0]);
+    os::close(stdoutPipe[1]);
+    return error;
+  }
+
+  pid_t pid;
+  if ((pid = fork()) == -1) {
+    const ErrnoError error("Failed to fork");
+    os::close(stdinPipe[0]);
+    os::close(stdinPipe[1]);
+    os::close(stdoutPipe[0]);
+    os::close(stdoutPipe[1]);
+    os::close(stderrPipe[0]);
+    os::close(stderrPipe[1]);
+    return error;
+  }
+
+  if (pid == 0) {
+    // Child.
+    // NOTE: The parent may be multi-threaded, in which case only
+    // async-signal safe functions may be called between fork() and
+    // exec(). Hence nothing is heap allocated here (the Subprocess
+    // object is only created in the parent) and the raw close(2) is
+    // used for the descriptors.
+
+    // Close parent's end of the pipes.
+    ::close(stdinPipe[1]);
+    ::close(stdoutPipe[0]);
+    ::close(stderrPipe[0]);
+
+    // Make our pipes look like stdin, stderr, stdout before we exec.
+    while (dup2(stdinPipe[0], STDIN_FILENO)   == -1 && errno == EINTR);
+    while (dup2(stdoutPipe[1], STDOUT_FILENO) == -1 && errno == EINTR);
+    while (dup2(stderrPipe[1], STDERR_FILENO) == -1 && errno == EINTR);
+
+    // Close the copies.
+    ::close(stdinPipe[0]);
+    ::close(stdoutPipe[1]);
+    ::close(stderrPipe[1]);
+
+    execl("/bin/sh", "sh", "-c", command.c_str(), static_cast<char*>(NULL));
+
+    // execl only returns on failure.
+    // Write the failure message in an async-signal safe manner,
+    // assuming strlen is async-signal safe or optimized out.
+    // In fact, it is highly unlikely that strlen would be
+    // implemented in an unsafe manner:
+    // http://austingroupbugs.net/view.php?id=692
+    const char* message = "Failed to execl '/bin/sh -c ";
+    write(STDERR_FILENO, message, strlen(message));
+    write(STDERR_FILENO, command.c_str(), command.size());
+    write(STDERR_FILENO, "'\n", strlen("'\n"));
+
+    // Use _exit rather than exit so that none of the parent's
+    // atexit handlers run in the child.
+    _exit(1);
+  }
+
+  // Parent.
+  Subprocess process;
+  process.data->pid = pid;
+
+  // Close the child's end of the pipes.
+  os::close(stdinPipe[0]);
+  os::close(stdoutPipe[1]);
+  os::close(stderrPipe[1]);
+
+  process.data->in = stdinPipe[1];
+  process.data->out = stdoutPipe[0];
+  process.data->err = stderrPipe[0];
+
+  // Rather than directly exposing the future from process::reap, we
+  // must use an explicit promise so that we can ensure we can receive
+  // the termination signal. Otherwise, the caller can discard the
+  // reap future, and we will not know when it is safe to close the
+  // file descriptors.
+  // NOTE: Ownership of the promise passes to internal::cleanup,
+  // which deletes it.
+  Promise<Option<int> >* promise = new Promise<Option<int> >();
+  process.data->status = promise->future();
+
+  // We need to bind a copy of this Subprocess into the onAny callback
+  // below to ensure that we don't close the file descriptors before
+  // the subprocess has terminated (i.e., because the caller doesn't
+  // keep a copy of this Subprocess around themselves).
+  process::reap(process.data->pid)
+    .onAny(lambda::bind(internal::cleanup, lambda::_1, promise, process));
+
+  return process;
+}
+
+} // namespace process {
+
+#endif // __PROCESS_SUBPROCESS_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/0478c7f9/3rdparty/libprocess/src/tests/subprocess_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/subprocess_tests.cpp b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
new file mode 100644
index 0000000..d2cf7f5
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
@@ -0,0 +1,189 @@
+#include <signal.h>
+
+#include <gmock/gmock.h>
+
+#include <sys/types.h>
+
+#include <string>
+
+#include <process/gmock.hpp>
+#include <process/gtest.hpp>
+#include <process/io.hpp>
+#include <process/subprocess.hpp>
+
+#include <stout/foreach.hpp>
+#include <stout/gtest.hpp>
+#include <stout/list.hpp>
+#include <stout/os/read.hpp>
+
+using namespace process;
+
+using std::string;
+
+
+// Verifies that the status future of a Subprocess reports both
+// normal exits (with their exit code) and terminations by signal.
+TEST(Subprocess, status)
+{
+  // The clock is paused and advanced manually below in order to
+  // drive the internal reaper.
+  Clock::pause();
+
+  // Exit 0.
+  Try<Subprocess> s = subprocess("exit 0");
+
+  ASSERT_SOME(s);
+
+  Future<Option<int> > termination = s.get().status();
+
+  // Advance time until the internal reaper reaps the subprocess.
+  while (termination.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_ASSERT_READY(termination);
+  ASSERT_SOME(termination.get());
+
+  int status = termination.get().get();
+  ASSERT_TRUE(WIFEXITED(status));
+  ASSERT_EQ(0, WEXITSTATUS(status));
+
+  // Exit 1.
+  s = subprocess("exit 1");
+
+  ASSERT_SOME(s);
+
+  termination = s.get().status();
+
+  // Advance time until the internal reaper reaps the subprocess.
+  while (termination.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_ASSERT_READY(termination);
+  ASSERT_SOME(termination.get());
+
+  status = termination.get().get();
+  ASSERT_TRUE(WIFEXITED(status));
+  ASSERT_EQ(1, WEXITSTATUS(status));
+
+  // SIGTERM.
+  s = subprocess("sleep 60");
+
+  ASSERT_SOME(s);
+
+  // Fail fast if the signal could not be sent, rather than spinning
+  // in the loop below until the sleep finishes on its own.
+  ASSERT_EQ(0, kill(s.get().pid(), SIGTERM));
+
+  termination = s.get().status();
+
+  // Advance time until the internal reaper reaps the subprocess.
+  while (termination.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_ASSERT_READY(termination);
+  ASSERT_SOME(termination.get());
+
+  status = termination.get().get();
+  ASSERT_TRUE(WIFSIGNALED(status));
+  ASSERT_EQ(SIGTERM, WTERMSIG(status));
+
+  // SIGKILL.
+  s = subprocess("sleep 60");
+
+  ASSERT_SOME(s);
+
+  ASSERT_EQ(0, kill(s.get().pid(), SIGKILL));
+
+  termination = s.get().status();
+
+  // Advance time until the internal reaper reaps the subprocess.
+  while (termination.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_ASSERT_READY(termination);
+  ASSERT_SOME(termination.get());
+
+  status = termination.get().get();
+  ASSERT_TRUE(WIFSIGNALED(status));
+  ASSERT_EQ(SIGKILL, WTERMSIG(status));
+
+  Clock::resume();
+}
+
+
+
+// Verifies that data written by the subprocess to its stdout and
+// stderr can be read back through out() and err() respectively.
+TEST(Subprocess, output)
+{
+  Clock::pause();
+
+  // Standard out.
+  Try<Subprocess> child = subprocess("echo hello");
+  ASSERT_SOME(child);
+
+  // io::read is given a non-blocking descriptor.
+  ASSERT_SOME(os::nonblock(child.get().out()));
+  AWAIT_EXPECT_EQ("hello\n", io::read(child.get().out()));
+
+  Future<Option<int> > termination = child.get().status();
+
+  // Advance time until the internal reaper reaps the subprocess.
+  while (termination.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_ASSERT_READY(termination);
+  ASSERT_SOME(termination.get());
+
+  int code = termination.get().get();
+  ASSERT_TRUE(WIFEXITED(code));
+  ASSERT_EQ(0, WEXITSTATUS(code));
+
+  // Standard error.
+  child = subprocess("echo hello 1>&2");
+  ASSERT_SOME(child);
+
+  ASSERT_SOME(os::nonblock(child.get().err()));
+  AWAIT_EXPECT_EQ("hello\n", io::read(child.get().err()));
+
+  termination = child.get().status();
+
+  // Advance time until the internal reaper reaps the subprocess.
+  while (termination.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_ASSERT_READY(termination);
+  ASSERT_SOME(termination.get());
+
+  code = termination.get().get();
+  ASSERT_TRUE(WIFEXITED(code));
+  ASSERT_EQ(0, WEXITSTATUS(code));
+
+  Clock::resume();
+}
+
+
+// Verifies that data written to in() reaches the subprocess's
+// stdin, by having the subprocess echo back the word it reads.
+TEST(Subprocess, input)
+{
+  Clock::pause();
+
+  Try<Subprocess> child = subprocess("read word ; echo $word");
+  ASSERT_SOME(child);
+
+  // Feed the word to the subprocess.
+  ASSERT_SOME(os::write(child.get().in(), "hello\n"));
+
+  // The subprocess is expected to echo the same word back.
+  ASSERT_SOME(os::nonblock(child.get().out()));
+  AWAIT_EXPECT_EQ("hello\n", io::read(child.get().out()));
+
+  Future<Option<int> > termination = child.get().status();
+
+  // Advance time until the internal reaper reaps the subprocess.
+  while (termination.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_ASSERT_READY(termination);
+  ASSERT_SOME(termination.get());
+
+  int code = termination.get().get();
+  ASSERT_TRUE(WIFEXITED(code));
+  ASSERT_EQ(0, WEXITSTATUS(code));
+
+  Clock::resume();
+}