You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/01/30 03:05:23 UTC

[2/6] git commit: Added a child Reaper utility in libprocess.

Added a child Reaper utility in libprocess.

Review: https://reviews.apache.org/r/17304


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/81d01813
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/81d01813
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/81d01813

Branch: refs/heads/master
Commit: 81d018136890406fdb442be8ac51d70225b9f65e
Parents: 0697d97
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Jan 23 14:51:06 2014 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Jan 29 17:12:28 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am              |   3 +
 3rdparty/libprocess/include/process/reap.hpp |  19 +++
 3rdparty/libprocess/src/reap.cpp             | 127 +++++++++++++++
 3rdparty/libprocess/src/tests/reap_tests.cpp | 179 ++++++++++++++++++++++
 4 files changed, 328 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/81d01813/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 40f01a7..4a35f91 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -34,6 +34,7 @@ libprocess_la_SOURCES =		\
   src/latch.cpp			\
   src/pid.cpp			\
   src/process.cpp		\
+  src/reap.cpp			\
   src/statistics.cpp		\
   src/synchronized.hpp
 
@@ -87,6 +88,7 @@ libprocess_la_SOURCES +=					\
   $(top_srcdir)/include/process/process.hpp			\
   $(top_srcdir)/include/process/profiler.hpp			\
   $(top_srcdir)/include/process/protobuf.hpp			\
+  $(top_srcdir)/include/process/reap.hpp			\
   $(top_srcdir)/include/process/run.hpp				\
   $(top_srcdir)/include/process/shared.hpp			\
   $(top_srcdir)/include/process/socket.hpp			\
@@ -106,6 +108,7 @@ tests_SOURCES =							\
   src/tests/main.cpp						\
   src/tests/owned_tests.cpp					\
   src/tests/process_tests.cpp					\
+  src/tests/reap_tests.cpp					\
   src/tests/shared_tests.cpp					\
   src/tests/statistics_tests.cpp				\
   src/tests/timeseries_tests.cpp				\

http://git-wip-us.apache.org/repos/asf/mesos/blob/81d01813/3rdparty/libprocess/include/process/reap.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/reap.hpp b/3rdparty/libprocess/include/process/reap.hpp
new file mode 100644
index 0000000..dcc6c72
--- /dev/null
+++ b/3rdparty/libprocess/include/process/reap.hpp
@@ -0,0 +1,19 @@
+#ifndef __PROCESS_REAP_HPP__
+#define __PROCESS_REAP_HPP__
+
+#include <sys/types.h>
+
+#include <stout/option.hpp>
+
+namespace process {
+
+// Returns the exit status of the specified process if and only if
+// the process is a direct child and it has not already been reaped.
+// Otherwise, returns None once the process has been reaped elsewhere
+// (or does not exist, which is indistinguishable from being reaped
+// elsewhere). This will never discard the returned future.
+Future<Option<int> > reap(pid_t pid);
+
+} // namespace process {
+
+#endif // __PROCESS_REAP_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/81d01813/3rdparty/libprocess/src/reap.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/reap.cpp b/3rdparty/libprocess/src/reap.cpp
new file mode 100644
index 0000000..274d129
--- /dev/null
+++ b/3rdparty/libprocess/src/reap.cpp
@@ -0,0 +1,127 @@
+#include <glog/logging.h>
+
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include <process/delay.hpp>
+#include <process/future.hpp>
+#include <process/id.hpp>
+#include <process/once.hpp>
+#include <process/owned.hpp>
+#include <process/reap.hpp>
+
+#include <stout/check.hpp>
+#include <stout/foreach.hpp>
+#include <stout/multihashmap.hpp>
+#include <stout/none.hpp>
+#include <stout/os.hpp>
+#include <stout/result.hpp>
+#include <stout/try.hpp>
+
+namespace process {
+
+
+// TODO(bmahler): This can be optimized to use a thread per pid, where
+// each thread makes a blocking call to waitpid. This eliminates the
+// unfortunate 1 second reap delay.
+
+class ReaperProcess : public Process<ReaperProcess>
+{
+public:
+  ReaperProcess() : ProcessBase(ID::generate("reaper")) {}
+
+  // Tracks 'pid' if it exists; otherwise completes with None() or Failure.
+  Future<Option<int> > reap(pid_t pid)
+  {
+    const Result<os::Process>& process = os::process(pid);
+    if (process.isSome()) {
+      // The process exists, we add it to the promises map.
+      Owned<Promise<Option<int> > > promise(new Promise<Option<int> >());
+      promises.put(pid, promise);
+      return promise->future();
+    } else if (process.isNone()) {
+      return None();
+    } else {
+      return Failure(
+          "Failed to monitor process " + stringify(pid) + ": " +
+          process.error());
+    }
+  }
+
+protected:
+  virtual void initialize() { wait(); }
+
+  void wait()
+  {
+    // There are a few cases to consider here for each pid:
+    //   1) The process is our child. In this case, we will notify
+    //      with the exit status once it terminates.
+    //   2) The process exists but is not our child. In this case,
+    //      we'll notify with None() once it no longer exists, since
+    //      we cannot reap it.
+    //   3) The process does not exist, notify with None() since it
+    //      has likely been reaped elsewhere.
+
+    foreach (pid_t pid, promises.keys()) {
+      int status;
+      const pid_t result = waitpid(pid, &status, WNOHANG);
+      if (result > 0) {
+        // Terminated child process.
+        notify(pid, status);
+      } else if (result == -1 && errno == ECHILD) {
+        // errno is only valid when waitpid fails (0 means still running).
+        // The process is not our child, or does not exist. We want to
+        // notify with None() once it no longer exists (reaped by
+        // someone else).
+        const Result<os::Process>& process = os::process(pid);
+        if (process.isError()) {
+          notify(pid, Error(process.error()));
+        } else if (process.isNone()) {
+          // The process has been reaped.
+          notify(pid, None());
+        }
+      }
+    }
+
+    delay(Seconds(1), self(), &ReaperProcess::wait); // Reap forever!
+  }
+
+  void notify(pid_t pid, Result<int> status)
+  {
+    foreach (const Owned<Promise<Option<int> > >& promise, promises.get(pid)) {
+      if (status.isError()) {
+        promise->fail(status.error());
+      } else if (status.isNone()) {
+        promise->set(Option<int>::none());
+      } else {
+        promise->set(Option<int>(status.get()));
+      }
+    }
+    promises.remove(pid);
+  }
+
+private:
+  multihashmap<pid_t, Owned<Promise<Option<int> > > > promises;
+};
+
+
+// Global reaper process, created and spawned lazily inside reap() below.
+static ReaperProcess* reaper = NULL;
+
+
+Future<Option<int> > reap(pid_t pid)
+{
+  static Once* initialized = new Once();
+
+  if (!initialized->once()) {
+    reaper = new ReaperProcess();
+    spawn(reaper);
+    initialized->done();
+  }
+
+  CHECK_NOTNULL(reaper);
+  // Run ReaperProcess::reap via dispatch and return its future to the caller.
+  return dispatch(reaper, &ReaperProcess::reap, pid);
+}
+
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/81d01813/3rdparty/libprocess/src/tests/reap_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/reap_tests.cpp b/3rdparty/libprocess/src/tests/reap_tests.cpp
new file mode 100644
index 0000000..a18d54c
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/reap_tests.cpp
@@ -0,0 +1,179 @@
+#include <signal.h>
+#include <unistd.h>
+
+#include <sys/wait.h>
+
+#include <gtest/gtest.h>
+
+#include <process/clock.hpp>
+#include <process/dispatch.hpp>
+#include <process/gmock.hpp>
+#include <process/gtest.hpp>
+#include <process/reap.hpp>
+
+#include <stout/exit.hpp>
+#include <stout/gtest.hpp>
+#include <stout/os/fork.hpp>
+#include <stout/os/pstree.hpp>
+#include <stout/try.hpp>
+
+using namespace process;
+
+using os::Exec;
+using os::Fork;
+using os::ProcessTree;
+
+using testing::_;
+using testing::DoDefault;
+
+
+// This test checks that we can "reap" a non-child process: the future
+// completes with None once the process has been killed.
+TEST(Reap, NonChildProcess)
+{
+  // The child process creates a grandchild and then exits. The
+  // grandchild sleeps for 10 seconds. The process tree looks like:
+  //  -+- child exit 0
+  //   \-+- grandchild sleep 10
+
+  // After the child exits, the grandchild is going to be re-parented
+  // by 'init', like this:
+  //  -+- child (exit 0)
+  //  -+- grandchild sleep 10
+  Try<ProcessTree> tree = Fork(None(),
+                               Fork(Exec("sleep 10")),
+                               Exec("exit 0"))();
+  ASSERT_SOME(tree);
+  ASSERT_EQ(1u, tree.get().children.size());
+  pid_t grandchild = tree.get().children.front();
+
+  // Reap the grandchild process.
+  Future<Option<int> > status = process::reap(grandchild);
+
+  EXPECT_TRUE(status.isPending());
+
+  // Now kill the grandchild.
+  // NOTE: We send a SIGKILL here because sometimes the grandchild
+  // process seems to be in a hung state and not responding to
+  // SIGTERM/SIGINT.
+  EXPECT_EQ(0, kill(grandchild, SIGKILL));
+
+  Clock::pause();
+
+  // Advance the paused clock in 1 second steps until the future completes.
+  while (status.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_READY(status);
+
+  // The status is None because the grandchild is not our immediate child.
+  ASSERT_NONE(status.get()) << status.get().get();
+
+  // Reap the child as well to clean up after ourselves.
+  status = process::reap(tree.get().process.pid);
+
+  // Now advance time until the child is reaped.
+  while (status.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  // The child ran 'exit 0': expect a normal exit with status 0.
+  ASSERT_SOME(status.get());
+  int status_ = status.get().get();
+  ASSERT_TRUE(WIFEXITED(status_));
+  ASSERT_EQ(0, WEXITSTATUS(status_));
+
+  Clock::resume();
+}
+
+
+// This test checks that we can reap a child process and obtain
+// the correct exit status.
+TEST(Reap, ChildProcess)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  // The child process sleeps and will be killed by the parent.
+  Try<ProcessTree> tree = Fork(None(),
+                               Exec("sleep 10"))();
+
+  ASSERT_SOME(tree);
+  pid_t child = tree.get();
+
+  // Register for the child's exit status before it terminates.
+  Future<Option<int> > status = process::reap(child);
+
+  // Now kill the child.
+  EXPECT_EQ(0, kill(child, SIGKILL));
+
+  Clock::pause();
+
+  // Now advance time until the reaper reaps the child.
+  while (status.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_READY(status);
+
+  // The child was killed: expect termination by SIGKILL.
+  ASSERT_SOME(status.get());
+  int status_ = status.get().get();
+  ASSERT_TRUE(WIFSIGNALED(status_));
+  ASSERT_EQ(SIGKILL, WTERMSIG(status_));
+
+  Clock::resume();
+}
+
+
+// Check that we can reap a child process that has already exited.
+TEST(Reap, TerminatedChildProcess)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  // The child process immediately exits.
+  Try<ProcessTree> tree = Fork(None(),
+                               Exec("exit 0"))();
+
+  ASSERT_SOME(tree);
+  pid_t child = tree.get();
+
+  ASSERT_SOME(os::process(child));
+
+  // Make sure the process is transitioned into the zombie
+  // state before we reap it.
+  while (true) {
+    const Result<os::Process>& process = os::process(child);
+    ASSERT_SOME(process) << "Process " << child << " reaped unexpectedly";
+
+    if (process.get().zombie) {
+      break;
+    }
+
+    os::sleep(Milliseconds(1));
+  }
+
+  // Now that it's terminated, attempt to reap it.
+  Future<Option<int> > status = process::reap(child);
+
+  // Advance time until the reaper sends the notification.
+  Clock::pause();
+  while (status.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_READY(status);
+
+  // The child ran 'exit 0': expect a normal exit with status 0.
+  ASSERT_SOME(status.get());
+
+  int status_ = status.get().get();
+  ASSERT_TRUE(WIFEXITED(status_));
+  ASSERT_EQ(0, WEXITSTATUS(status_));
+
+  Clock::resume();
+}