You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/01/30 03:05:23 UTC
[2/6] git commit: Added a child Reaper utility in libprocess.
Added a child Reaper utility in libprocess.
Review: https://reviews.apache.org/r/17304
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/81d01813
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/81d01813
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/81d01813
Branch: refs/heads/master
Commit: 81d018136890406fdb442be8ac51d70225b9f65e
Parents: 0697d97
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Jan 23 14:51:06 2014 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Jan 29 17:12:28 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 3 +
3rdparty/libprocess/include/process/reap.hpp | 19 +++
3rdparty/libprocess/src/reap.cpp | 127 +++++++++++++++
3rdparty/libprocess/src/tests/reap_tests.cpp | 179 ++++++++++++++++++++++
4 files changed, 328 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/81d01813/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 40f01a7..4a35f91 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -34,6 +34,7 @@ libprocess_la_SOURCES = \
src/latch.cpp \
src/pid.cpp \
src/process.cpp \
+ src/reap.cpp \
src/statistics.cpp \
src/synchronized.hpp
@@ -87,6 +88,7 @@ libprocess_la_SOURCES += \
$(top_srcdir)/include/process/process.hpp \
$(top_srcdir)/include/process/profiler.hpp \
$(top_srcdir)/include/process/protobuf.hpp \
+ $(top_srcdir)/include/process/reap.hpp \
$(top_srcdir)/include/process/run.hpp \
$(top_srcdir)/include/process/shared.hpp \
$(top_srcdir)/include/process/socket.hpp \
@@ -106,6 +108,7 @@ tests_SOURCES = \
src/tests/main.cpp \
src/tests/owned_tests.cpp \
src/tests/process_tests.cpp \
+ src/tests/reap_tests.cpp \
src/tests/shared_tests.cpp \
src/tests/statistics_tests.cpp \
src/tests/timeseries_tests.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/81d01813/3rdparty/libprocess/include/process/reap.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/reap.hpp b/3rdparty/libprocess/include/process/reap.hpp
new file mode 100644
index 0000000..dcc6c72
--- /dev/null
+++ b/3rdparty/libprocess/include/process/reap.hpp
@@ -0,0 +1,19 @@
+#ifndef __PROCESS_REAP_HPP__
+#define __PROCESS_REAP_HPP__
+
+#include <sys/types.h>
+
+#include <process/future.hpp>
+
+#include <stout/option.hpp>
+
+namespace process {
+
+// Asynchronously waits for the process identified by 'pid' to
+// terminate. The returned future is:
+//   - set to the exit status (as reported by waitpid) if and only if
+//     the process is a direct child that has not already been reaped;
+//   - set to None once the process has been reaped elsewhere (or does
+//     not exist, which is indistinguishable from being reaped
+//     elsewhere);
+//   - failed if the state of the process could not be determined.
+// This will never discard the returned future.
+Future<Option<int> > reap(pid_t pid);
+
+} // namespace process {
+
+#endif // __PROCESS_REAP_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/81d01813/3rdparty/libprocess/src/reap.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/reap.cpp b/3rdparty/libprocess/src/reap.cpp
new file mode 100644
index 0000000..274d129
--- /dev/null
+++ b/3rdparty/libprocess/src/reap.cpp
@@ -0,0 +1,127 @@
+#include <glog/logging.h>
+
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include <process/delay.hpp>
+#include <process/future.hpp>
+#include <process/id.hpp>
+#include <process/once.hpp>
+#include <process/owned.hpp>
+#include <process/reap.hpp>
+
+#include <stout/check.hpp>
+#include <stout/foreach.hpp>
+#include <stout/multihashmap.hpp>
+#include <stout/none.hpp>
+#include <stout/os.hpp>
+#include <stout/result.hpp>
+#include <stout/try.hpp>
+
+namespace process {
+
+
+// TODO(bmahler): This can be optimized to use a thread per pid, where
+// each thread makes a blocking call to waitpid. This eliminates the
+// unfortunate 1 second reap delay.
+
+// Monitors a set of pids and completes a promise for each of them
+// once the corresponding process has terminated. The pids are polled
+// once per second (see wait()).
+class ReaperProcess : public Process<ReaperProcess>
+{
+public:
+  ReaperProcess() : ProcessBase(ID::generate("reaper")) {}
+
+  // Starts monitoring 'pid'. Returns:
+  //   - a pending future, completed later by wait(), if the process
+  //     currently exists;
+  //   - None if the process does not exist;
+  //   - a failure if the process could not be looked up.
+  Future<Option<int> > reap(pid_t pid)
+  {
+    // Check to see if this pid exists.
+    const Result<os::Process>& process = os::process(pid);
+
+    if (process.isSome()) {
+      // The process exists, we add it to the promises map.
+      Owned<Promise<Option<int> > > promise(new Promise<Option<int> >());
+      promises.put(pid, promise);
+      return promise->future();
+    } else if (process.isNone()) {
+      return None();
+    } else {
+      return Failure(
+          "Failed to monitor process " + stringify(pid) + ": " +
+          process.error());
+    }
+  }
+
+protected:
+  // Kicks off the polling loop as soon as the process is spawned.
+  virtual void initialize() { wait(); }
+
+  // Checks every monitored pid once and then reschedules itself to
+  // run again in one second.
+  void wait()
+  {
+    // There are a few cases to consider here for each pid:
+    //   1) The process is our child. In this case, we will notify
+    //      with the exit status once it terminates.
+    //   2) The process exists but is not our child. In this case,
+    //      we'll notify with None() once it no longer exists, since
+    //      we cannot reap it.
+    //   3) The process does not exist, notify with None() since it
+    //      has likely been reaped elsewhere.
+
+    // NOTE(review): notify() removes entries from 'promises' inside
+    // this loop; this relies on keys() returning a snapshot rather
+    // than a view of the map -- confirm against multihashmap.
+    foreach (pid_t pid, promises.keys()) {
+      int status;
+      const pid_t result = waitpid(pid, &status, WNOHANG);
+
+      if (result > 0) {
+        // Terminated child process.
+        notify(pid, status);
+      } else if (result < 0 && errno == ECHILD) {
+        // errno is only meaningful when waitpid reports an error. A
+        // return value of 0 means the child is still running, and
+        // must not be confused with a stale ECHILD left behind by an
+        // earlier call.
+        //
+        // The process is not our child, or does not exist. We want to
+        // notify with None() once it no longer exists (reaped by
+        // someone else).
+        const Result<os::Process>& process = os::process(pid);
+
+        if (process.isError()) {
+          notify(pid, Error(process.error()));
+        } else if (process.isNone()) {
+          // The process has been reaped.
+          notify(pid, None());
+        }
+      }
+    }
+
+    delay(Seconds(1), self(), &ReaperProcess::wait); // Reap forever!
+  }
+
+  // Completes every promise registered for 'pid' and stops monitoring
+  // it: an error fails the promises, None sets them to None, and a
+  // value sets them to that exit status.
+  void notify(pid_t pid, Result<int> status)
+  {
+    foreach (const Owned<Promise<Option<int> > >& promise, promises.get(pid)) {
+      if (status.isError()) {
+        promise->fail(status.error());
+      } else if (status.isNone()) {
+        promise->set(Option<int>::none());
+      } else {
+        promise->set(Option<int>(status.get()));
+      }
+    }
+    promises.remove(pid);
+  }
+
+private:
+  // Outstanding promises, keyed by the pid they are waiting on. A pid
+  // may have several promises, one per reap() call.
+  multihashmap<pid_t, Owned<Promise<Option<int> > > > promises;
+};
+
+
+// The single ReaperProcess shared by every caller of reap(). It is
+// created lazily by the first call to reap() below.
+static ReaperProcess* reaper = NULL;
+
+
+// Forwards the request to the shared reaper, creating and spawning
+// that reaper the first time this function is called.
+Future<Option<int> > reap(pid_t pid)
+{
+  static Once* initialized = new Once();
+
+  const bool ready = initialized->once();
+
+  if (!ready) {
+    // First caller: bring up the reaper, then mark initialization as
+    // done.
+    reaper = new ReaperProcess();
+    spawn(reaper);
+    initialized->done();
+  }
+
+  CHECK_NOTNULL(reaper);
+
+  return dispatch(reaper, &ReaperProcess::reap, pid);
+}
+
+} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/81d01813/3rdparty/libprocess/src/tests/reap_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/reap_tests.cpp b/3rdparty/libprocess/src/tests/reap_tests.cpp
new file mode 100644
index 0000000..a18d54c
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/reap_tests.cpp
@@ -0,0 +1,179 @@
+#include <signal.h>
+#include <unistd.h>
+
+#include <sys/wait.h>
+
+#include <gtest/gtest.h>
+
+#include <process/clock.hpp>
+#include <process/dispatch.hpp>
+#include <process/gmock.hpp>
+#include <process/gtest.hpp>
+#include <process/reap.hpp>
+
+#include <stout/exit.hpp>
+#include <stout/gtest.hpp>
+#include <stout/os/fork.hpp>
+#include <stout/os/pstree.hpp>
+#include <stout/try.hpp>
+
+using namespace process;
+
+using os::Exec;
+using os::Fork;
+using os::ProcessTree;
+
+using testing::_;
+using testing::DoDefault;
+
+
+// This test checks that we can reap a non-child process, in terms
+// of receiving the termination notification.
+TEST(Reap, NonChildProcess)
+{
+  // The child process creates a grandchild and then exits. The
+  // grandchild sleeps for 10 seconds. The process tree looks like:
+  //  -+- child exit 0
+  //   \-+- grandchild sleep 10
+
+  // After the child exits, the grandchild is going to be re-parented
+  // by 'init', like this:
+  //  -+- child (exit 0)
+  //  -+- grandchild sleep 10
+  Try<ProcessTree> tree = Fork(None(),
+                               Fork(Exec("sleep 10")),
+                               Exec("exit 0"))();
+  ASSERT_SOME(tree);
+  ASSERT_EQ(1u, tree.get().children.size());
+  pid_t grandchild = tree.get().children.front();
+
+  // Reap the grandchild process.
+  Future<Option<int> > status = process::reap(grandchild);
+
+  EXPECT_TRUE(status.isPending());
+
+  // Now kill the grandchild.
+  // NOTE: We send a SIGKILL here because sometimes the grandchild
+  // process seems to be in a hung state and not responding to
+  // SIGTERM/SIGINT.
+  EXPECT_EQ(0, kill(grandchild, SIGKILL));
+
+  Clock::pause();
+
+  // Now advance time until the Reaper reaps the grandchild.
+  while (status.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_READY(status);
+
+  // The status is None because pid is not an immediate child.
+  ASSERT_NONE(status.get()) << status.get().get();
+
+  // Reap the child as well to clean up after ourselves.
+  status = process::reap(tree.get().process.pid);
+
+  // Now advance time until the child is reaped.
+  while (status.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  // Make sure the future actually completed successfully before
+  // reading its value, as is done after the first wait above.
+  AWAIT_READY(status);
+
+  // Check if the status is correct.
+  ASSERT_SOME(status.get());
+  int status_ = status.get().get();
+  ASSERT_TRUE(WIFEXITED(status_));
+  ASSERT_EQ(0, WEXITSTATUS(status_));
+
+  Clock::resume();
+}
+
+
+// This test checks that we can reap a direct child process and that
+// the status we obtain reflects how the child terminated.
+TEST(Reap, ChildProcess)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  // Fork a child that just sleeps; the parent kills it below.
+  Try<ProcessTree> tree = Fork(None(), Exec("sleep 10"))();
+  ASSERT_SOME(tree);
+
+  pid_t child = tree.get();
+
+  // Register the child with the reaper before it terminates.
+  Future<Option<int> > status = process::reap(child);
+
+  // Terminate the child.
+  EXPECT_EQ(0, kill(child, SIGKILL));
+
+  Clock::pause();
+
+  // Step the clock one second at a time, letting things settle after
+  // each step, until the reaper has completed the future.
+  for (; status.isPending(); Clock::settle()) {
+    Clock::advance(Seconds(1));
+  }
+
+  AWAIT_READY(status);
+
+  // The child was killed by a signal, so we expect SIGKILL to be
+  // reported as the terminating signal.
+  ASSERT_SOME(status.get());
+
+  int status_ = status.get().get();
+  ASSERT_TRUE(WIFSIGNALED(status_));
+  ASSERT_EQ(SIGKILL, WTERMSIG(status_));
+
+  Clock::resume();
+}
+
+
+// Checks that a child which has already exited (i.e., is a zombie) by
+// the time reap() is called can still be reaped with the right status.
+TEST(Reap, TerminatedChildProcess)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  // Fork a child that exits right away.
+  Try<ProcessTree> tree = Fork(None(), Exec("exit 0"))();
+  ASSERT_SOME(tree);
+
+  pid_t child = tree.get();
+
+  ASSERT_SOME(os::process(child));
+
+  // Poll until the child shows up as a zombie, so that it has
+  // definitely terminated before we ask the reaper about it.
+  bool zombie = false;
+  while (!zombie) {
+    const Result<os::Process>& process = os::process(child);
+    ASSERT_SOME(process) << "Process " << child << " reaped unexpectedly";
+
+    zombie = process.get().zombie;
+
+    if (!zombie) {
+      os::sleep(Milliseconds(1));
+    }
+  }
+
+  // The child has terminated; now hand it to the reaper.
+  Future<Option<int> > status = process::reap(child);
+
+  // Step the clock one second at a time, letting things settle after
+  // each step, until the reaper has completed the future.
+  Clock::pause();
+  for (; status.isPending(); Clock::settle()) {
+    Clock::advance(Seconds(1));
+  }
+
+  AWAIT_READY(status);
+
+  // The child exited normally with code 0.
+  ASSERT_SOME(status.get());
+
+  int status_ = status.get().get();
+  ASSERT_TRUE(WIFEXITED(status_));
+  ASSERT_EQ(0, WEXITSTATUS(status_));
+
+  Clock::resume();
+}