You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/01/30 03:05:22 UTC

[1/6] git commit: Added a missing lock acquisition in /__processes__.

Updated Branches:
  refs/heads/master 2aad6c9ae -> 0478c7f92


Added a missing lock acquisition in /__processes__.

Review: https://reviews.apache.org/r/17519


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/035d9131
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/035d9131
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/035d9131

Branch: refs/heads/master
Commit: 035d91314ca5bd20c48af13374b5288d5151b9bd
Parents: 2aad6c9
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Wed Jan 29 11:42:08 2014 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Jan 29 17:09:33 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/035d9131/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index c28fc35..7d8f00f 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -2922,7 +2922,7 @@ Future<Response> ProcessManager::__processes__(const Request&)
   JSON::Array array;
 
   synchronized (processes) {
-    foreachvalue (const ProcessBase* process, process_manager->processes) {
+    foreachvalue (ProcessBase* process, process_manager->processes) {
       JSON::Object object;
       object.values["id"] = process->pid.id;
 
@@ -2984,9 +2984,13 @@ Future<Response> ProcessManager::__processes__(const Request&)
         JSON::Array* events;
       } visitor(&events);
 
-      foreach (Event* event, process->events) {
-        event->visit(&visitor);
+      process->lock();
+      {
+        foreach (Event* event, process->events) {
+          event->visit(&visitor);
+        }
       }
+      process->unlock();
 
       object.values["events"] = events;
       array.values.push_back(object);


[3/6] git commit: Fixed the use of a raw unix time in the Slave.

Posted by bm...@apache.org.
Fixed the use of a raw unix time in the Slave.

This updates the slave to use Time::create to ensure the modification
time matches the current time frame in libprocess.

Review: https://reviews.apache.org/r/17481


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0697d97a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0697d97a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0697d97a

Branch: refs/heads/master
Commit: 0697d97a687e9832e5a47a84d11bce236f9b8799
Parents: 84b86ff
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Jan 28 18:51:34 2014 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Jan 29 17:12:28 2014 -0800

----------------------------------------------------------------------
 src/slave/slave.cpp | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0697d97a/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 2d21e16..a97b1d5 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -31,6 +31,7 @@
 #include <process/delay.hpp>
 #include <process/dispatch.hpp>
 #include <process/id.hpp>
+#include <process/time.hpp>
 
 #include <stout/bytes.hpp>
 #include <stout/check.hpp>
@@ -2858,9 +2859,14 @@ Future<Nothing> Slave::garbageCollect(const string& path)
     return Failure(mtime.error());
   }
 
+  // Using unix time directly is unsafe for testing; we must use
+  // Time::create to convert into a Time object that reflects the
+  // possibly advanced state of the libprocess Clock.
+  Try<Time> time = Time::create(mtime.get());
+  CHECK_SOME(time);
+
   // GC based on the modification time.
-  Duration delay =
-    flags.gc_delay - (Clock::now().duration() - Seconds(mtime.get()));
+  Duration delay = flags.gc_delay - (Clock::now() - time.get());
 
   return gc.schedule(delay, path);
 }


[6/6] git commit: Added an asynchronous subprocess utility.

Posted by bm...@apache.org.
Added an asynchronous subprocess utility.

Review: https://reviews.apache.org/r/17306


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0478c7f9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0478c7f9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0478c7f9

Branch: refs/heads/master
Commit: 0478c7f9292e75a30ef02327ff5698359d0c7235
Parents: c0af398
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Jan 23 22:36:53 2014 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Jan 29 17:12:29 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am                 |   2 +
 .../libprocess/include/process/subprocess.hpp   | 195 +++++++++++++++++++
 .../libprocess/src/tests/subprocess_tests.cpp   | 189 ++++++++++++++++++
 3 files changed, 386 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0478c7f9/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 4a35f91..bbd17cc 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -93,6 +93,7 @@ libprocess_la_SOURCES +=					\
   $(top_srcdir)/include/process/shared.hpp			\
   $(top_srcdir)/include/process/socket.hpp			\
   $(top_srcdir)/include/process/statistics.hpp			\
+  $(top_srcdir)/include/process/subprocess.hpp			\
   $(top_srcdir)/include/process/time.hpp			\
   $(top_srcdir)/include/process/timeout.hpp			\
   $(top_srcdir)/include/process/timer.hpp
@@ -111,6 +112,7 @@ tests_SOURCES =							\
   src/tests/reap_tests.cpp					\
   src/tests/shared_tests.cpp					\
   src/tests/statistics_tests.cpp				\
+  src/tests/subprocess_tests.cpp				\
   src/tests/timeseries_tests.cpp				\
   src/tests/time_tests.cpp
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/0478c7f9/3rdparty/libprocess/include/process/subprocess.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/subprocess.hpp b/3rdparty/libprocess/include/process/subprocess.hpp
new file mode 100644
index 0000000..db9c96b
--- /dev/null
+++ b/3rdparty/libprocess/include/process/subprocess.hpp
@@ -0,0 +1,195 @@
+#ifndef __PROCESS_SUBPROCESS_HPP__
+#define __PROCESS_SUBPROCESS_HPP__
+
+#include <errno.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <glog/logging.h>
+
+#include <sys/types.h>
+
+#include <string>
+
+#include <process/future.hpp>
+#include <process/reap.hpp>
+
+#include <stout/error.hpp>
+#include <stout/lambda.hpp>
+#include <stout/memory.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/try.hpp>
+
+namespace process {
+
+// Represents a fork() exec()ed subprocess. Access is provided to
+// the input / output of the process, as well as the exit status.
+// The input / output file descriptors are only closed after both:
+//   1. The subprocess has terminated, and
+//   2. There are no longer any references to the associated
+//      Subprocess object.
+// Instances can only be created through process::subprocess().
+struct Subprocess
+{
+  // Returns the pid for the subprocess.
+  pid_t pid() const { return data->pid; }
+
+  // File descriptor accessors for input / output. 'in' is the write
+  // end of the pipe connected to the child's stdin; 'out' and 'err'
+  // are the read ends of the pipes connected to the child's stdout
+  // and stderr.
+  int in()  const { return data->in;  }
+  int out() const { return data->out; }
+  int err() const { return data->err; }
+
+  // Returns a future from process::reap of this subprocess.
+  // Discarding this future has no effect on the subprocess.
+  Future<Option<int> > status() const { return data->status; }
+
+private:
+  Subprocess() : data(new Data()) {}
+  friend Try<Subprocess> subprocess(const std::string&);
+
+  struct Data
+  {
+    // Start from invalid values so that destroying a Data whose
+    // descriptors were never assigned does not close arbitrary
+    // (uninitialized) file descriptor numbers.
+    Data() : pid(-1), in(-1), out(-1), err(-1) {}
+
+    // Closes whichever of the parent's pipe ends were assigned.
+    ~Data()
+    {
+      if (in != -1) {
+        os::close(in);
+      }
+      if (out != -1) {
+        os::close(out);
+      }
+      if (err != -1) {
+        os::close(err);
+      }
+    }
+
+    pid_t pid;
+
+    // NOTE: stdin, stdout, stderr are macros on some systems, hence
+    // these names instead.
+    int in;
+    int out;
+    int err;
+
+    Future<Option<int> > status;
+  };
+
+  memory::shared_ptr<Data> data;
+};
+
+
+namespace internal {
+
+// Completes 'promise' with the outcome of the reap future 'result',
+// then deletes the promise.
+//
+// 'subprocess' is intentionally unused in the body: binding a copy of
+// it into the reap callback keeps the Subprocess data (and therefore
+// its file descriptors) alive until the subprocess has been reaped.
+//
+// NOTE: This function is defined in a header, so it must be 'inline'
+// to avoid multiple-definition link errors when the header is
+// included from more than one translation unit.
+inline void cleanup(
+    const Future<Option<int> >& result,
+    Promise<Option<int> >* promise,
+    const Subprocess& subprocess)
+{
+  // The CHECKs assert that the reap future has completed and was
+  // not discarded by the time this callback runs.
+  CHECK(!result.isPending());
+  CHECK(!result.isDiscarded());
+
+  if (result.isFailed()) {
+    promise->fail(result.failure());
+  } else {
+    promise->set(result.get());
+  }
+
+  delete promise;
+}
+
+} // namespace internal {
+
+
+// Runs the provided command in a subprocess, via '/bin/sh -c'.
+//
+// On success, the returned Subprocess holds the parent's ends of
+// pipes connected to the child's stdin / stdout / stderr, plus a
+// status future that completes once process::reap observes the
+// child's termination. Returns an Error if creating a pipe or
+// fork() fails.
+inline Try<Subprocess> subprocess(const std::string& command)
+{
+  // Create pipes for stdin, stdout, stderr.
+  // Index 0 is for reading, and index 1 is for writing.
+  int stdinPipe[2];
+  int stdoutPipe[2];
+  int stderrPipe[2];
+
+  // NOTE: On failure, the ErrnoError is constructed *before* closing
+  // any descriptors so that the reported errno belongs to the call
+  // that failed rather than to a subsequent close().
+  if (pipe(stdinPipe) == -1) {
+    return ErrnoError("Failed to create pipe");
+  }
+
+  if (pipe(stdoutPipe) == -1) {
+    const ErrnoError error("Failed to create pipe");
+    os::close(stdinPipe[0]);
+    os::close(stdinPipe[1]);
+    return error;
+  }
+
+  if (pipe(stderrPipe) == -1) {
+    const ErrnoError error("Failed to create pipe");
+    os::close(stdinPipe[0]);
+    os::close(stdinPipe[1]);
+    os::close(stdoutPipe[0]);
+    os::close(stdoutPipe[1]);
+    return error;
+  }
+
+  const pid_t pid = fork();
+
+  if (pid == -1) {
+    const ErrnoError error("Failed to fork");
+    os::close(stdinPipe[0]);
+    os::close(stdinPipe[1]);
+    os::close(stdoutPipe[0]);
+    os::close(stdoutPipe[1]);
+    os::close(stderrPipe[0]);
+    os::close(stderrPipe[1]);
+    return error;
+  }
+
+  if (pid == 0) {
+    // Child.
+    //
+    // NOTE: The parent may be multi-threaded, in which case only
+    // async-signal safe functions may be called between fork() and
+    // exec(). In particular, do not allocate here: no Subprocess is
+    // constructed in the child, and the raw ::close() is used instead
+    // of os::close(), whose Try<> error path may allocate.
+
+    // Close parent's end of the pipes.
+    ::close(stdinPipe[1]);
+    ::close(stdoutPipe[0]);
+    ::close(stderrPipe[0]);
+
+    // Make our pipes look like stdin, stdout, stderr before we exec.
+    while (dup2(stdinPipe[0], STDIN_FILENO)   == -1 && errno == EINTR);
+    while (dup2(stdoutPipe[1], STDOUT_FILENO) == -1 && errno == EINTR);
+    while (dup2(stderrPipe[1], STDERR_FILENO) == -1 && errno == EINTR);
+
+    // Close the copies.
+    ::close(stdinPipe[0]);
+    ::close(stdoutPipe[1]);
+    ::close(stderrPipe[1]);
+
+    execl("/bin/sh", "sh", "-c", command.c_str(), (char *) NULL);
+
+    // execl only returns on failure. Write the failure message in an
+    // async-signal safe manner, assuming strlen is async-signal safe
+    // or optimized out. In fact, it is highly unlikely that strlen
+    // would be implemented in an unsafe manner:
+    // http://austingroupbugs.net/view.php?id=692
+    const char* message = "Failed to execl '/bin/sh -c ";
+    write(STDERR_FILENO, message, strlen(message));
+    write(STDERR_FILENO, command.c_str(), command.size());
+    write(STDERR_FILENO, "'\n", strlen("'\n"));
+
+    _exit(1);
+  }
+
+  // Parent.
+  Subprocess process;
+  process.data->pid = pid;
+
+  // Close the child's end of the pipes.
+  os::close(stdinPipe[0]);
+  os::close(stdoutPipe[1]);
+  os::close(stderrPipe[1]);
+
+  process.data->in = stdinPipe[1];
+  process.data->out = stdoutPipe[0];
+  process.data->err = stderrPipe[0];
+
+  // Rather than directly exposing the future from process::reap, we
+  // must use an explicit promise so that we can ensure we can receive
+  // the termination signal. Otherwise, the caller can discard the
+  // reap future, and we will not know when it is safe to close the
+  // file descriptors.
+  Promise<Option<int> >* promise = new Promise<Option<int> >();
+  process.data->status = promise->future();
+
+  // We need to bind a copy of this Subprocess into the onAny callback
+  // below to ensure that we don't close the file descriptors before
+  // the subprocess has terminated (i.e., because the caller doesn't
+  // keep a copy of this Subprocess around themselves).
+  process::reap(process.data->pid)
+    .onAny(lambda::bind(internal::cleanup, lambda::_1, promise, process));
+
+  return process;
+}
+
+} // namespace process {
+
+#endif // __PROCESS_SUBPROCESS_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/0478c7f9/3rdparty/libprocess/src/tests/subprocess_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/subprocess_tests.cpp b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
new file mode 100644
index 0000000..d2cf7f5
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
@@ -0,0 +1,189 @@
+#include <signal.h>
+
+#include <gmock/gmock.h>
+
+#include <sys/types.h>
+
+#include <string>
+
+#include <process/gmock.hpp>
+#include <process/gtest.hpp>
+#include <process/io.hpp>
+#include <process/subprocess.hpp>
+
+#include <stout/foreach.hpp>
+#include <stout/gtest.hpp>
+#include <stout/list.hpp>
+#include <stout/os/read.hpp>
+
+using namespace process;
+
+using std::string;
+
+
+TEST(Subprocess, status)
+{
+  Clock::pause();
+
+  // Each case runs 'command' and optionally delivers 'signal' to the
+  // child (0 means "let it exit on its own"). It then expects either
+  // a normal exit with code 'expected' or termination by signal
+  // 'expected'.
+  struct Case
+  {
+    const char* command;
+    int signal;
+    int expected;
+  };
+
+  const Case cases[] = {
+    { "exit 0", 0, 0 },
+    { "exit 1", 0, 1 },
+    { "sleep 60", SIGTERM, SIGTERM },
+    { "sleep 60", SIGKILL, SIGKILL }
+  };
+
+  for (size_t i = 0; i < sizeof(cases) / sizeof(cases[0]); i++) {
+    const Case& c = cases[i];
+
+    Try<Subprocess> child = subprocess(c.command);
+    ASSERT_SOME(child);
+
+    if (c.signal != 0) {
+      kill(child.get().pid(), c.signal);
+    }
+
+    // Advance time until the internal reaper reaps the subprocess.
+    while (child.get().status().isPending()) {
+      Clock::advance(Seconds(1));
+      Clock::settle();
+    }
+
+    AWAIT_ASSERT_READY(child.get().status());
+    ASSERT_SOME(child.get().status().get());
+
+    const int status = child.get().status().get().get();
+
+    if (c.signal == 0) {
+      ASSERT_TRUE(WIFEXITED(status));
+      ASSERT_EQ(c.expected, WEXITSTATUS(status));
+    } else {
+      ASSERT_TRUE(WIFSIGNALED(status));
+      ASSERT_EQ(c.expected, WTERMSIG(status));
+    }
+  }
+
+  Clock::resume();
+}
+
+
+
+TEST(Subprocess, output)
+{
+  Clock::pause();
+
+  // The first command writes to standard out, the second to standard
+  // error; each is read back from the matching pipe.
+  const char* commands[] = { "echo hello", "echo hello 1>&2" };
+
+  for (int i = 0; i < 2; i++) {
+    const bool toStderr = (i == 1);
+
+    Try<Subprocess> child = subprocess(commands[i]);
+    ASSERT_SOME(child);
+
+    const int fd = toStderr ? child.get().err() : child.get().out();
+
+    ASSERT_SOME(os::nonblock(fd));
+    AWAIT_EXPECT_EQ("hello\n", io::read(fd));
+
+    // Advance time until the internal reaper reaps the subprocess.
+    while (child.get().status().isPending()) {
+      Clock::advance(Seconds(1));
+      Clock::settle();
+    }
+
+    AWAIT_ASSERT_READY(child.get().status());
+    ASSERT_SOME(child.get().status().get());
+
+    const int status = child.get().status().get().get();
+    ASSERT_TRUE(WIFEXITED(status));
+    ASSERT_EQ(0, WEXITSTATUS(status));
+  }
+
+  Clock::resume();
+}
+
+
+TEST(Subprocess, input)
+{
+  Clock::pause();
+
+  // The shell reads one word from its standard input and echoes it
+  // back on its standard output.
+  Try<Subprocess> child = subprocess("read word ; echo $word");
+  ASSERT_SOME(child);
+
+  const int stdinFd = child.get().in();
+  const int stdoutFd = child.get().out();
+
+  ASSERT_SOME(os::write(stdinFd, "hello\n"));
+  ASSERT_SOME(os::nonblock(stdoutFd));
+  AWAIT_EXPECT_EQ("hello\n", io::read(stdoutFd));
+
+  // Advance time until the internal reaper reaps the subprocess.
+  Future<Option<int> > reaped = child.get().status();
+  while (reaped.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_ASSERT_READY(reaped);
+  ASSERT_SOME(reaped.get());
+
+  const int status = reaped.get().get();
+  ASSERT_TRUE(WIFEXITED(status));
+  ASSERT_EQ(0, WEXITSTATUS(status));
+
+  Clock::resume();
+}


[5/6] git commit: Update the slave to use the libprocess Reaper.

Posted by bm...@apache.org.
Update the slave to use the libprocess Reaper.

Review: https://reviews.apache.org/r/17305


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c0af3984
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c0af3984
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c0af3984

Branch: refs/heads/master
Commit: c0af39845bd010a01bf2e77b2be7d05122f1126a
Parents: 81d0181
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Jan 23 14:58:36 2014 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Jan 29 17:12:29 2014 -0800

----------------------------------------------------------------------
 src/Makefile.am                    |   3 -
 src/launcher/executor.cpp          |   6 +-
 src/slave/cgroups_isolator.cpp     |   5 +-
 src/slave/cgroups_isolator.hpp     |   2 -
 src/slave/process_isolator.cpp     |   5 +-
 src/slave/process_isolator.hpp     |   2 -
 src/slave/reaper.cpp               | 158 -------------------------
 src/slave/reaper.hpp               |  92 ---------------
 src/tests/environment.cpp          |  12 ++
 src/tests/reaper_tests.cpp         | 201 --------------------------------
 src/tests/slave_recovery_tests.cpp |   1 -
 11 files changed, 20 insertions(+), 467 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index d58b46e..c307068 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -178,7 +178,6 @@ libmesos_no_3rdparty_la_SOURCES =					\
 	slave/http.cpp							\
 	slave/isolator.cpp						\
 	slave/process_isolator.cpp					\
-	slave/reaper.cpp						\
 	slave/status_update_manager.cpp					\
 	launcher/launcher.cpp						\
 	exec/exec.cpp							\
@@ -239,7 +238,6 @@ libmesos_no_3rdparty_la_SOURCES += common/attributes.hpp		\
 	slave/paths.hpp slave/state.hpp					\
 	slave/status_update_manager.hpp					\
 	slave/process_isolator.hpp					\
-        slave/reaper.hpp						\
 	slave/slave.hpp							\
 	tests/environment.hpp tests/script.hpp				\
 	tests/zookeeper.hpp tests/flags.hpp tests/utils.hpp		\
@@ -857,7 +855,6 @@ mesos_tests_SOURCES =				\
   tests/monitor_tests.cpp			\
   tests/paths_tests.cpp				\
   tests/protobuf_io_tests.cpp			\
-  tests/reaper_tests.cpp			\
   tests/registrar_tests.cpp			\
   tests/resource_offers_tests.cpp		\
   tests/resources_tests.cpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index b73ab47..e30d77a 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -30,6 +30,7 @@
 #include <process/defer.hpp>
 #include <process/future.hpp>
 #include <process/process.hpp>
+#include <process/reap.hpp>
 
 #include <stout/duration.hpp>
 #include <stout/lambda.hpp>
@@ -40,8 +41,6 @@
 
 #include "logging/logging.hpp"
 
-#include "slave/reaper.hpp"
-
 using process::wait; // Necessary on some OS's to disambiguate.
 
 using std::cout;
@@ -186,7 +185,7 @@ public:
     std::cout << "Forked command at " << pid << std::endl;
 
     // Monitor this process.
-    reaper.monitor(pid)
+    process::reap(pid)
       .onAny(defer(self(),
                    &Self::reaped,
                    driver,
@@ -294,7 +293,6 @@ private:
   bool launched;
   bool killed;
   pid_t pid;
-  slave::Reaper reaper;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/slave/cgroups_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp
index 5298f20..690ae81 100644
--- a/src/slave/cgroups_isolator.cpp
+++ b/src/slave/cgroups_isolator.cpp
@@ -32,6 +32,7 @@
 #include <process/clock.hpp>
 #include <process/defer.hpp>
 #include <process/dispatch.hpp>
+#include <process/reap.hpp>
 
 #include <stout/bytes.hpp>
 #include <stout/check.hpp>
@@ -569,7 +570,7 @@ void CgroupsIsolator::launchExecutor(
     // Store the pid of the leading process of the executor.
     info->pid = pid;
 
-    reaper.monitor(pid)
+    process::reap(pid)
       .onAny(defer(PID<CgroupsIsolator>(this),
                    &CgroupsIsolator::reaped,
                    pid,
@@ -901,7 +902,7 @@ Future<Nothing> CgroupsIsolator::recover(
 
         // Add the pid to the reaper to monitor exit status.
         if (run.forkedPid.isSome()) {
-          reaper.monitor(run.forkedPid.get())
+          process::reap(run.forkedPid.get())
             .onAny(defer(PID<CgroupsIsolator>(this),
                          &CgroupsIsolator::reaped,
                          run.forkedPid.get(),

http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/slave/cgroups_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.hpp b/src/slave/cgroups_isolator.hpp
index e86062e..1a66dc6 100644
--- a/src/slave/cgroups_isolator.hpp
+++ b/src/slave/cgroups_isolator.hpp
@@ -41,7 +41,6 @@
 
 #include "slave/flags.hpp"
 #include "slave/isolator.hpp"
-#include "slave/reaper.hpp"
 #include "slave/slave.hpp"
 
 namespace mesos {
@@ -290,7 +289,6 @@ private:
   bool local;
   process::PID<Slave> slave;
   bool initialized;
-  Reaper reaper;
 
   // File descriptor to 'mesos/tasks' file in the cgroup on which we place
   // an advisory lock.

http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/slave/process_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.cpp b/src/slave/process_isolator.cpp
index 0bc698f..748d9c2 100644
--- a/src/slave/process_isolator.cpp
+++ b/src/slave/process_isolator.cpp
@@ -29,6 +29,7 @@
 #include <process/defer.hpp>
 #include <process/dispatch.hpp>
 #include <process/id.hpp>
+#include <process/reap.hpp>
 
 #include <stout/check.hpp>
 #include <stout/exit.hpp>
@@ -165,7 +166,7 @@ void ProcessIsolator::launchExecutor(
     // Record the pid (should also be the pgid since we setsid below).
     infos[frameworkId][executorId]->pid = pid;
 
-    reaper.monitor(pid)
+    process::reap(pid)
       .onAny(defer(PID<ProcessIsolator>(this),
                    &ProcessIsolator::reaped,
                    pid,
@@ -358,7 +359,7 @@ Future<Nothing> ProcessIsolator::recover(
 
       // Add the pid to the reaper to monitor exit status.
       if (run.forkedPid.isSome()) {
-        reaper.monitor(run.forkedPid.get())
+        process::reap(run.forkedPid.get())
           .onAny(defer(PID<ProcessIsolator>(this),
                        &ProcessIsolator::reaped,
                        run.forkedPid.get(),

http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/slave/process_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.hpp b/src/slave/process_isolator.hpp
index 4ae093f..bc52f33 100644
--- a/src/slave/process_isolator.hpp
+++ b/src/slave/process_isolator.hpp
@@ -34,7 +34,6 @@
 
 #include "slave/flags.hpp"
 #include "slave/isolator.hpp"
-#include "slave/reaper.hpp"
 #include "slave/slave.hpp"
 
 namespace mesos {
@@ -106,7 +105,6 @@ private:
   bool local;
   process::PID<Slave> slave;
   bool initialized;
-  Reaper reaper;
   hashmap<FrameworkID, hashmap<ExecutorID, ProcessInfo*> > infos;
 
   void reaped(pid_t pid, const Future<Option<int> >& status);

http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/slave/reaper.cpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.cpp b/src/slave/reaper.cpp
deleted file mode 100644
index 5eabbc3..0000000
--- a/src/slave/reaper.cpp
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <glog/logging.h>
-
-#include <sys/types.h>
-#include <sys/wait.h>
-
-#include <process/delay.hpp>
-#include <process/dispatch.hpp>
-#include <process/id.hpp>
-
-#include <stout/check.hpp>
-#include <stout/foreach.hpp>
-#include <stout/nothing.hpp>
-#include <stout/os.hpp>
-#include <stout/try.hpp>
-
-#include <stout/utils.hpp>
-
-#include "slave/reaper.hpp"
-
-using namespace process;
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-ReaperProcess::ReaperProcess()
-  : ProcessBase(ID::generate("reaper")) {}
-
-
-Future<Option<int> > ReaperProcess::monitor(pid_t pid)
-{
-  // Check to see if this pid exists.
-  const Result<os::Process>& process = os::process(pid);
-
-  if (process.isSome()) {
-    // The process exists, we add it to the promises map.
-    Owned<Promise<Option<int> > > promise(new Promise<Option<int> >());
-    promises.put(pid, promise);
-    return promise->future();
-  } else if (process.isNone()) {
-    LOG(WARNING) << "Cannot monitor process " << pid
-                 << " because it doesn't exist";
-    return None();
-  } else {
-    return Failure(
-        "Failed to monitor process " + stringify(pid) + ": " + process.error());
-  }
-}
-
-
-void ReaperProcess::initialize()
-{
-  reap();
-}
-
-
-void ReaperProcess::notify(pid_t pid, Option<int> status)
-{
-  foreach (const Owned<Promise<Option<int> > >& promise, promises.get(pid)) {
-    promise->set(status);
-  }
-  promises.remove(pid);
-}
-
-
-void ReaperProcess::reap()
-{
-  // This method assumes that the registered PIDs are
-  // 1) children: we can reap their exit status when they are
-  //    terminated.
-  // 2) non-children: we cannot reap their exit status.
-  // 3) nonexistent: already reaped elsewhere.
-
-  // Reap all child processes first.
-  pid_t pid;
-  int status;
-  while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
-    // Ignore this if the process has only stopped.
-    // Notify the "listeners" only if they have requested to monitor
-    // this pid. Otherwise the status is discarded.
-    // This means if a child pid is registered via the monitor() call
-    // after it's reaped, status 'None' will be returned.
-    if (!WIFSTOPPED(status) && promises.contains(pid)) {
-      notify(pid, status);
-    }
-  }
-
-  // Check whether any monitored process has exited and been reaped.
-  // 1) If a child terminates before the foreach loop but after the
-  //    while loop, it won't be reaped until the next reap() cycle.
-  // 2) If a non-child process terminates and is reaped elsewhere,
-  //    e.g. by init, we notify the listeners.
-  // 3) If a non-child process terminates and is not yet reaped,
-  //    no notification is sent.
-  // 4) If a child terminates before the while loop above, then we've
-  //    already reaped it and have the listeners notified!
-  foreach (pid_t pid, utils::copy(promises.keys())) {
-    const Result<os::Process>& process = os::process(pid);
-
-    if (process.isError()) {
-      LOG(ERROR) << "Failed to get process information for " << pid
-                 <<": " << process.error();
-      notify(pid, None());
-    } else if (process.isNone()) {
-      // The process has been reaped.
-      LOG(WARNING) << "Cannot get the exit status of process " << pid
-                   << " because it no longer exists";
-      notify(pid, None());
-    }
-  }
-
-  delay(Seconds(1), self(), &ReaperProcess::reap); // Reap forever!
-}
-
-
-Reaper::Reaper()
-{
-  process = new ReaperProcess();
-  spawn(process);
-}
-
-
-Reaper::~Reaper()
-{
-  terminate(process);
-  wait(process);
-  delete process;
-}
-
-
-Future<Option<int> > Reaper::monitor(pid_t pid)
-{
-  return dispatch(process, &ReaperProcess::monitor, pid);
-}
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {
-
-

http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/slave/reaper.hpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.hpp b/src/slave/reaper.hpp
deleted file mode 100644
index 9a31c75..0000000
--- a/src/slave/reaper.hpp
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __REAPER_HPP__
-#define __REAPER_HPP__
-
-#include <list>
-#include <set>
-
-#include <process/future.hpp>
-#include <process/owned.hpp>
-#include <process/process.hpp>
-
-#include <stout/multihashmap.hpp>
-#include <stout/nothing.hpp>
-#include <stout/try.hpp>
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-// Forward declaration.
-class ReaperProcess;
-
-
-// TODO(vinod): Pull reaper into common or libprocess.
-class Reaper
-{
-public:
-  Reaper();
-  virtual ~Reaper();
-
-  // Monitor the given process and notify the caller if it terminates
-  // via a Future of the exit status.
-  //
-  // The exit status of 'pid' can only be correctly captured if the
-  // calling process is the parent of 'pid' and the process hasn't
-  // been reaped yet, otherwise 'None' is returned.
-  // Note that an invalid pid does not cause a failed Future, but an
-  // empty result ('None').
-  process::Future<Option<int> > monitor(pid_t pid);
-
-private:
-  ReaperProcess* process;
-};
-
-
-// Reaper implementation.
-class ReaperProcess : public process::Process<ReaperProcess>
-{
-public:
-  ReaperProcess();
-
-  process::Future<Option<int> > monitor(pid_t pid);
-
-protected:
-  virtual void initialize();
-
-  void reap();
-
-  // The notification is sent only if the pid is explicitly registered
-  // via the monitor() call.
-  void notify(pid_t pid, Option<int> status);
-
-private:
-  // Mapping from the monitored pid to all promises the pid exit
-  // status should be sent to.
-  multihashmap<
-    pid_t, process::Owned<process::Promise<Option<int> > > > promises;
-};
-
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __REAPER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/tests/environment.cpp
----------------------------------------------------------------------
diff --git a/src/tests/environment.cpp b/src/tests/environment.cpp
index 6edce45..41b8a71 100644
--- a/src/tests/environment.cpp
+++ b/src/tests/environment.cpp
@@ -18,6 +18,10 @@
 
 #include <gtest/gtest.h>
 
+#include <sys/wait.h>
+
+#include <string.h>
+
 #include <list>
 #include <string>
 
@@ -235,6 +239,14 @@ void Environment::TearDown()
     }
   }
   directories.clear();
+
+  // Make sure we haven't left any child processes lying around.
+  Try<os::ProcessTree> pstree = os::pstree(0);
+
+  if (pstree.isSome() && !pstree.get().children.empty()) {
+    FAIL() << "Tests completed with child processes remaining:\n"
+           << pstree.get();
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/tests/reaper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reaper_tests.cpp b/src/tests/reaper_tests.cpp
deleted file mode 100644
index 608ec0e..0000000
--- a/src/tests/reaper_tests.cpp
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <signal.h>
-#include <unistd.h>
-
-#include <sys/wait.h>
-
-#include <gtest/gtest.h>
-
-#include <process/clock.hpp>
-#include <process/dispatch.hpp>
-#include <process/gmock.hpp>
-#include <process/gtest.hpp>
-
-#include <stout/exit.hpp>
-#include <stout/gtest.hpp>
-#include <stout/os.hpp>
-
-#include "slave/reaper.hpp"
-
-using namespace os;
-using namespace mesos;
-using namespace mesos::internal;
-using namespace mesos::internal::slave;
-
-using process::Clock;
-using process::Future;
-
-using testing::_;
-using testing::DoDefault;
-
-
-// This test checks that the Reaper can monitor a non-child process.
-TEST(ReaperTest, NonChildProcess)
-{
-  // The child process creates a grandchild and then exits. The
-  // grandchild sleeps for 10 seconds. The process tree looks like:
-  //  -+- child exit 0
-  //   \-+- grandchild sleep 10
-
-  // After the child exits, the grandchild is going to be re-parented
-  // by 'init', like this:
-  //  -+- child (exit 0)
-  //  -+- grandchild sleep 10
-  Try<ProcessTree> tree = Fork(None(),
-                               Fork(Exec("sleep 10")),
-                               Exec("exit 0"))();
-  ASSERT_SOME(tree);
-  ASSERT_EQ(1u, tree.get().children.size());
-  pid_t grandchild = tree.get().children.front();
-
-  Reaper reaper;
-
-  Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
-
-  // Ask the reaper to monitor the grand child process.
-  Future<Option<int> > status = reaper.monitor(grandchild);
-
-  AWAIT_READY(monitor);
-
-  // This makes sure the status only becomes ready after the
-  // grandchild is killed.
-  EXPECT_TRUE(status.isPending());
-
-  // Now kill the grand child.
-  // NOTE: We send a SIGKILL here because sometimes the grand child
-  // process seems to be in a hung state and not responding to
-  // SIGTERM/SIGINT.
-  EXPECT_EQ(0, kill(grandchild, SIGKILL));
-
-  Clock::pause();
-
-  // Now advance time until the reaper reaps the executor.
-  while (status.isPending()) {
-    Clock::advance(Seconds(1));
-    Clock::settle();
-  }
-
-  // Ensure the reaper notifies of the terminated process.
-  AWAIT_READY(status);
-
-  // Status is None because pid is not an immediate child.
-  ASSERT_NONE(status.get());
-
-  Clock::resume();
-}
-
-
-// This test checks that the Reaper can monitor a child process with
-// accurate exit status returned.
-TEST(ReaperTest, ChildProcess)
-{
-  ASSERT_TRUE(GTEST_IS_THREADSAFE);
-
-  // The child process sleeps and will be killed by the parent.
-  Try<ProcessTree> tree = Fork(None(),
-                               Exec("sleep 10"))();
-
-  ASSERT_SOME(tree);
-  pid_t child = tree.get();
-
-  Reaper reaper;
-
-  Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
-
-  // Ask the reaper to monitor the grand child process.
-  Future<Option<int> > status = reaper.monitor(child);
-
-  AWAIT_READY(monitor);
-
-  // Now kill the child.
-  EXPECT_EQ(0, kill(child, SIGKILL));
-
-  Clock::pause();
-
-  // Now advance time until the reaper reaps the executor.
-  while (status.isPending()) {
-    Clock::advance(Seconds(1));
-    Clock::settle();
-  }
-
-  // Ensure the reaper notifies of the terminated process.
-  AWAIT_READY(status);
-
-  // Check if the status is correct.
-  ASSERT_SOME(status.get());
-  int status_ = status.get().get();
-  ASSERT_TRUE(WIFSIGNALED(status_));
-  ASSERT_EQ(SIGKILL, WTERMSIG(status_));
-
-  Clock::resume();
-}
-
-
-// Check that the Reaper can monitor a child process that exits
-// before monitor() is called on it.
-TEST(ReaperTest, TerminatedChildProcess)
-{
-  ASSERT_TRUE(GTEST_IS_THREADSAFE);
-
-  // The child process immediately exits.
-  Try<ProcessTree> tree = Fork(None(),
-                               Exec("exit 0"))();
-
-  ASSERT_SOME(tree);
-  pid_t child = tree.get();
-
-  ASSERT_SOME(os::process(child));
-
-  Clock::pause();
-
-  Reaper reaper;
-
-  // Because reaper reaps all child processes even if they aren't
-  // registered, we advance time until that happens.
-  while (os::process(child).isSome()) {
-    Clock::advance(Seconds(1));
-    Clock::settle();
-  }
-
-  // Now we request to monitor the child process which is already
-  // reaped.
-
-  Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
-
-  // Ask the reaper to monitor the child process.
-  Future<Option<int> > status = reaper.monitor(child);
-
-  AWAIT_READY(monitor);
-
-  // Now advance time until the reaper sends the notification.
-  while (status.isPending()) {
-    Clock::advance(Seconds(1));
-    Clock::settle();
-  }
-
-  // Ensure the reaper notifies of the terminated process.
-  AWAIT_READY(status);
-
-  // Invalid status is returned because it is reaped before being
-  // monitored.
-  ASSERT_NONE(status.get());
-
-  Clock::resume();
-}

http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 999e598..b8c5123 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -52,7 +52,6 @@
 #endif
 #include "slave/paths.hpp"
 #include "slave/process_isolator.hpp"
-#include "slave/reaper.hpp"
 #include "slave/slave.hpp"
 #include "slave/state.hpp"
 


[2/6] git commit: Added a child Reaper utility in libprocess.

Posted by bm...@apache.org.
Added a child Reaper utility in libprocess.

Review: https://reviews.apache.org/r/17304


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/81d01813
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/81d01813
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/81d01813

Branch: refs/heads/master
Commit: 81d018136890406fdb442be8ac51d70225b9f65e
Parents: 0697d97
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Jan 23 14:51:06 2014 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Jan 29 17:12:28 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am              |   3 +
 3rdparty/libprocess/include/process/reap.hpp |  19 +++
 3rdparty/libprocess/src/reap.cpp             | 127 +++++++++++++++
 3rdparty/libprocess/src/tests/reap_tests.cpp | 179 ++++++++++++++++++++++
 4 files changed, 328 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/81d01813/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 40f01a7..4a35f91 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -34,6 +34,7 @@ libprocess_la_SOURCES =		\
   src/latch.cpp			\
   src/pid.cpp			\
   src/process.cpp		\
+  src/reap.cpp			\
   src/statistics.cpp		\
   src/synchronized.hpp
 
@@ -87,6 +88,7 @@ libprocess_la_SOURCES +=					\
   $(top_srcdir)/include/process/process.hpp			\
   $(top_srcdir)/include/process/profiler.hpp			\
   $(top_srcdir)/include/process/protobuf.hpp			\
+  $(top_srcdir)/include/process/reap.hpp			\
   $(top_srcdir)/include/process/run.hpp				\
   $(top_srcdir)/include/process/shared.hpp			\
   $(top_srcdir)/include/process/socket.hpp			\
@@ -106,6 +108,7 @@ tests_SOURCES =							\
   src/tests/main.cpp						\
   src/tests/owned_tests.cpp					\
   src/tests/process_tests.cpp					\
+  src/tests/reap_tests.cpp					\
   src/tests/shared_tests.cpp					\
   src/tests/statistics_tests.cpp				\
   src/tests/timeseries_tests.cpp				\

http://git-wip-us.apache.org/repos/asf/mesos/blob/81d01813/3rdparty/libprocess/include/process/reap.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/reap.hpp b/3rdparty/libprocess/include/process/reap.hpp
new file mode 100644
index 0000000..dcc6c72
--- /dev/null
+++ b/3rdparty/libprocess/include/process/reap.hpp
@@ -0,0 +1,22 @@
+#ifndef __PROCESS_REAP_HPP__
+#define __PROCESS_REAP_HPP__
+
+#include <sys/types.h>
+
+#include <process/future.hpp>
+
+#include <stout/option.hpp>
+
+namespace process {
+
+// Returns the exit status of the specified process if and only if
+// the process is a direct child and it has not already been reaped.
+// Otherwise, returns None once the process has been reaped elsewhere
+// (or does not exist, which is indistinguishable from being reaped
+// elsewhere). The returned future fails if the process cannot be
+// inspected; it is never discarded.
+Future<Option<int> > reap(pid_t pid);
+
+} // namespace process {
+
+#endif // __PROCESS_REAP_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/81d01813/3rdparty/libprocess/src/reap.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/reap.cpp b/3rdparty/libprocess/src/reap.cpp
new file mode 100644
index 0000000..274d129
--- /dev/null
+++ b/3rdparty/libprocess/src/reap.cpp
@@ -0,0 +1,127 @@
+#include <glog/logging.h>
+
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include <process/delay.hpp>
+#include <process/future.hpp>
+#include <process/id.hpp>
+#include <process/once.hpp>
+#include <process/owned.hpp>
+#include <process/reap.hpp>
+
+#include <stout/check.hpp>
+#include <stout/foreach.hpp>
+#include <stout/multihashmap.hpp>
+#include <stout/none.hpp>
+#include <stout/os.hpp>
+#include <stout/result.hpp>
+#include <stout/try.hpp>
+
+namespace process {
+
+
+// TODO(bmahler): This can be optimized to use a thread per pid, where
+// each thread makes a blocking call to waitpid. This eliminates the
+// unfortunate 1 second reap delay.
+
+class ReaperProcess : public Process<ReaperProcess>
+{
+public:
+  ReaperProcess() : ProcessBase(ID::generate("reaper")) {}
+
+  Future<Option<int> > reap(pid_t pid)
+  {
+    // Check to see if this pid exists.
+    const Result<os::Process>& process = os::process(pid);
+
+    if (process.isSome()) {
+      // The process exists, we add it to the promises map.
+      Owned<Promise<Option<int> > > promise(new Promise<Option<int> >());
+      promises.put(pid, promise);
+      return promise->future();
+    } else if (process.isNone()) {
+      return None();
+    } else {
+      return Failure(
+          "Failed to monitor process " + stringify(pid) + ": " +
+          process.error());
+    }
+  }
+
+protected:
+  virtual void initialize() { wait(); }
+
+  void wait()
+  {
+    // There are a few cases to consider here for each pid:
+    //   1) The process is our child. In this case, we will notify
+    //      with the exit status once it terminates.
+    //   2) The process exists but is not our child. In this case,
+    //      we'll notify with None() once it no longer exists, since
+    //      we cannot reap it.
+    //   3) The process does not exist, notify with None() since it
+    //      has likely been reaped elsewhere.
+
+    foreach (pid_t pid, promises.keys()) {
+      int status;
+      const pid_t result = waitpid(pid, &status, WNOHANG);
+      if (result > 0) {
+        // Terminated child process.
+        notify(pid, status);
+      } else if (result < 0 && errno == ECHILD) {
+        // waitpid() failed (errno is only meaningful when it returns -1):
+        // the process is not our child, or does not exist. We want to
+        // notify with None() once it no longer exists (reaped elsewhere).
+        const Result<os::Process>& process = os::process(pid);
+        if (process.isError()) {
+          notify(pid, Error(process.error()));
+        } else if (process.isNone()) {
+          // The process has been reaped.
+          notify(pid, None());
+        }
+      }
+    }
+
+    delay(Seconds(1), self(), &ReaperProcess::wait); // Reap forever!
+  }
+
+  void notify(pid_t pid, Result<int> status)
+  {
+    foreach (const Owned<Promise<Option<int> > >& promise, promises.get(pid)) {
+      if (status.isError()) {
+        promise->fail(status.error());
+      } else if (status.isNone()) {
+        promise->set(Option<int>::none());
+      } else {
+        promise->set(Option<int>(status.get()));
+      }
+    }
+    promises.remove(pid);
+  }
+
+private:
+  multihashmap<pid_t, Owned<Promise<Option<int> > > > promises;
+};
+
+
+// Global reaper process, lazily created and spawned by reap() below.
+static ReaperProcess* reaper = NULL;
+
+// Registers 'pid' with the global reaper and returns its exit future.
+Future<Option<int> > reap(pid_t pid)
+{
+  static Once* initialized = new Once();
+  // Only the caller for which once() returns false creates the reaper.
+  if (!initialized->once()) {
+    reaper = new ReaperProcess();
+    spawn(reaper);
+    initialized->done();
+  }
+
+  CHECK_NOTNULL(reaper);
+
+  return dispatch(reaper, &ReaperProcess::reap, pid);
+}
+
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/81d01813/3rdparty/libprocess/src/tests/reap_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/reap_tests.cpp b/3rdparty/libprocess/src/tests/reap_tests.cpp
new file mode 100644
index 0000000..a18d54c
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/reap_tests.cpp
@@ -0,0 +1,179 @@
+#include <signal.h>
+#include <unistd.h>
+
+#include <sys/wait.h>
+
+#include <gtest/gtest.h>
+
+#include <process/clock.hpp>
+#include <process/dispatch.hpp>
+#include <process/gmock.hpp>
+#include <process/gtest.hpp>
+#include <process/reap.hpp>
+
+#include <stout/exit.hpp>
+#include <stout/gtest.hpp>
+#include <stout/os/fork.hpp>
+#include <stout/os/pstree.hpp>
+#include <stout/try.hpp>
+
+using namespace process;
+
+using os::Exec;
+using os::Fork;
+using os::ProcessTree;
+
+using testing::_;
+using testing::DoDefault;
+
+
+// This test checks that we can reap a non-child process, in terms
+// of receiving the termination notification.
+TEST(Reap, NonChildProcess)
+{
+  // The child process creates a grandchild and then exits. The
+  // grandchild sleeps for 10 seconds. The process tree looks like:
+  //  -+- child exit 0
+  //   \-+- grandchild sleep 10
+
+  // After the child exits, the grandchild is going to be re-parented
+  // by 'init', like this:
+  //  -+- child (exit 0)
+  //  -+- grandchild sleep 10
+  Try<ProcessTree> tree = Fork(None(),
+                               Fork(Exec("sleep 10")),
+                               Exec("exit 0"))();
+  ASSERT_SOME(tree);
+  ASSERT_EQ(1u, tree.get().children.size());
+  pid_t grandchild = tree.get().children.front();
+
+  // Reap the grandchild process.
+  Future<Option<int> > status = process::reap(grandchild);
+
+  EXPECT_TRUE(status.isPending());
+
+  // Now kill the grandchild.
+  // NOTE: We send a SIGKILL here because sometimes the grandchild
+  // process seems to be in a hung state and not responding to
+  // SIGTERM/SIGINT.
+  EXPECT_EQ(0, kill(grandchild, SIGKILL));
+
+  Clock::pause();
+
+  // Now advance time until the Reaper reaps the grandchild.
+  while (status.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_READY(status);
+
+  // The status is None because pid is not an immediate child.
+  ASSERT_NONE(status.get()) << status.get().get();
+
+  // Reap the child as well to clean up after ourselves, advancing
+  // time until the child is reaped.
+  status = process::reap(tree.get().process.pid);
+  while (status.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+  AWAIT_READY(status);
+
+  // The child ran 'exit 0', so expect a normal exit with status 0.
+  ASSERT_SOME(status.get());
+  int status_ = status.get().get();
+  ASSERT_TRUE(WIFEXITED(status_));
+  ASSERT_EQ(0, WEXITSTATUS(status_));
+
+  Clock::resume();
+}
+
+
+// This test checks that we can reap a child process and obtain
+// the correct exit status.
+TEST(Reap, ChildProcess)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  // The child process sleeps and will be killed by the parent.
+  Try<ProcessTree> tree = Fork(None(),
+                               Exec("sleep 10"))();
+
+  ASSERT_SOME(tree);
+  pid_t child = tree.get();
+
+  // Reap the child process.
+  Future<Option<int> > status = process::reap(child);
+
+  // Now kill the child.
+  EXPECT_EQ(0, kill(child, SIGKILL));
+
+  Clock::pause();
+
+  // Now advance time until the reaper reaps the child.
+  while (status.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_READY(status);
+
+  // The child was killed with SIGKILL, so expect a signaled status.
+  ASSERT_SOME(status.get());
+  int status_ = status.get().get();
+  ASSERT_TRUE(WIFSIGNALED(status_));
+  ASSERT_EQ(SIGKILL, WTERMSIG(status_));
+
+  Clock::resume();
+}
+
+
+// Check that we can reap a child process that has already exited.
+TEST(Reap, TerminatedChildProcess)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  // The child process immediately exits.
+  Try<ProcessTree> tree = Fork(None(),
+                               Exec("exit 0"))();
+
+  ASSERT_SOME(tree);
+  pid_t child = tree.get();
+
+  ASSERT_SOME(os::process(child));
+
+  // Make sure the process has transitioned into the zombie
+  // state before we reap it.
+  while (true) {
+    const Result<os::Process>& process = os::process(child);
+    ASSERT_SOME(process) << "Process " << child << " reaped unexpectedly";
+
+    if (process.get().zombie) {
+      break;
+    }
+
+    os::sleep(Milliseconds(1));
+  }
+
+  // Now that it's terminated, attempt to reap it.
+  Future<Option<int> > status = process::reap(child);
+
+  // Advance time until the reaper sends the notification.
+  Clock::pause();
+  while (status.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_READY(status);
+
+  // The zombie child was not reaped before, so its exit status is known.
+  ASSERT_SOME(status.get());
+
+  int status_ = status.get().get();
+  ASSERT_TRUE(WIFEXITED(status_));
+  ASSERT_EQ(0, WEXITSTATUS(status_));
+
+  Clock::resume();
+}


[4/6] git commit: Updated the semantics of Clock::resume.

Posted by bm...@apache.org.
Updated the semantics of Clock::resume.

Now, after Clock::resume is called, time continues from the (possibly
advanced) time at which resume was called, rather than rewinding back
to real time.

This requires Time::create to account for the total amount by which
the clock was advanced while it was paused.

Review: https://reviews.apache.org/r/17480


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/84b86ff3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/84b86ff3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/84b86ff3

Branch: refs/heads/master
Commit: 84b86ff3381c79b76dc620eca60b9250366e519a
Parents: 035d913
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Jan 28 18:49:36 2014 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Jan 29 17:12:28 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/time.hpp | 10 +---------
 3rdparty/libprocess/src/process.cpp          | 18 +++++++++++++++++-
 2 files changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/84b86ff3/3rdparty/libprocess/include/process/time.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/time.hpp b/3rdparty/libprocess/include/process/time.hpp
index 307fd2c..26cec3d 100644
--- a/3rdparty/libprocess/include/process/time.hpp
+++ b/3rdparty/libprocess/include/process/time.hpp
@@ -21,15 +21,7 @@ public:
   static Time EPOCH;
   static Time MAX;
 
-  static Try<Time> create(double secs)
-  {
-    Try<Duration> duration = Duration::create(secs);
-    if (duration.isSome()) {
-      return Time(duration.get());
-    } else {
-      return Error("Argument too large for Time: " + duration.error());
-    }
-  }
+  static Try<Time> create(double seconds);
 
   Duration duration() const { return sinceEpoch; }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/84b86ff3/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 7d8f00f..1083a35 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -605,6 +605,8 @@ map<ProcessBase*, Time>* currents = new map<ProcessBase*, Time>();
 Time initial = Time::EPOCH;
 Time current = Time::EPOCH;
 
+Duration advanced = Duration::zero();
+
 bool paused = false;
 
 } // namespace clock {
@@ -640,7 +642,7 @@ Time Clock::now(ProcessBase* process)
 
   // TODO(benh): Versus ev_now()?
   double d = ev_time();
-  Try<Time> time = Time::create(d);
+  Try<Time> time = Time::create(d); // Compensates for clock::advanced.
 
   // TODO(xujyan): Move CHECK_SOME to libprocess and add CHECK_SOME
   // here.
@@ -696,6 +698,7 @@ void Clock::advance(const Duration& duration)
 {
   synchronized (timeouts) {
     if (clock::paused) {
+      clock::advanced += duration;
       clock::current += duration;
       VLOG(2) << "Clock advanced ("  << duration << ") to " << clock::current;
       if (!update_timer) {
@@ -726,6 +729,7 @@ void Clock::update(const Time& time)
   synchronized (timeouts) {
     if (clock::paused) {
       if (clock::current < time) {
+        clock::advanced += (time - clock::current);
         clock::current = Time(time);
         VLOG(2) << "Clock updated to " << clock::current;
         if (!update_timer) {
@@ -764,6 +768,18 @@ void Clock::settle()
 }
 
 
+Try<Time> Time::create(double seconds)
+{
+  Try<Duration> duration = Duration::create(seconds);
+  if (duration.isError()) {
+    return Error("Argument too large for Time: " + duration.error());
+  }
+
+  // Add time advanced while the clock was paused (zero in production).
+  return Time(duration.get() + clock::advanced);
+}
+
+
 static Message* encode(const UPID& from,
                        const UPID& to,
                        const string& name,