You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/01/30 03:05:22 UTC
[1/6] git commit: Added a missing lock acquisition in /__processes__.
Updated Branches:
refs/heads/master 2aad6c9ae -> 0478c7f92
Added a missing lock acquisition in /__processes__.
Review: https://reviews.apache.org/r/17519
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/035d9131
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/035d9131
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/035d9131
Branch: refs/heads/master
Commit: 035d91314ca5bd20c48af13374b5288d5151b9bd
Parents: 2aad6c9
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Wed Jan 29 11:42:08 2014 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Jan 29 17:09:33 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/process.cpp | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/035d9131/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index c28fc35..7d8f00f 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -2922,7 +2922,7 @@ Future<Response> ProcessManager::__processes__(const Request&)
JSON::Array array;
synchronized (processes) {
- foreachvalue (const ProcessBase* process, process_manager->processes) {
+ foreachvalue (ProcessBase* process, process_manager->processes) {
JSON::Object object;
object.values["id"] = process->pid.id;
@@ -2984,9 +2984,13 @@ Future<Response> ProcessManager::__processes__(const Request&)
JSON::Array* events;
} visitor(&events);
- foreach (Event* event, process->events) {
- event->visit(&visitor);
+ process->lock();
+ {
+ foreach (Event* event, process->events) {
+ event->visit(&visitor);
+ }
}
+ process->unlock();
object.values["events"] = events;
array.values.push_back(object);
[3/6] git commit: Fixed the use of a raw unix time in the Slave.
Posted by bm...@apache.org.
Fixed the use of a raw unix time in the Slave.
This updates the slave to use Time::create to ensure the modification
time matches the current time frame in libprocess.
Review: https://reviews.apache.org/r/17481
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0697d97a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0697d97a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0697d97a
Branch: refs/heads/master
Commit: 0697d97a687e9832e5a47a84d11bce236f9b8799
Parents: 84b86ff
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Jan 28 18:51:34 2014 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Jan 29 17:12:28 2014 -0800
----------------------------------------------------------------------
src/slave/slave.cpp | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/0697d97a/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 2d21e16..a97b1d5 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -31,6 +31,7 @@
#include <process/delay.hpp>
#include <process/dispatch.hpp>
#include <process/id.hpp>
+#include <process/time.hpp>
#include <stout/bytes.hpp>
#include <stout/check.hpp>
@@ -2858,9 +2859,14 @@ Future<Nothing> Slave::garbageCollect(const string& path)
return Failure(mtime.error());
}
+ // It is unsafe for testing to use unix time directly; we must use
+ // Time::create to convert into a Time object that reflects the
+ // possibly advanced state of the libprocess Clock.
+ Try<Time> time = Time::create(mtime.get());
+ CHECK_SOME(time);
+
// GC based on the modification time.
- Duration delay =
- flags.gc_delay - (Clock::now().duration() - Seconds(mtime.get()));
+ Duration delay = flags.gc_delay - (Clock::now() - time.get());
return gc.schedule(delay, path);
}
[6/6] git commit: Added an asynchronous subprocess utility.
Posted by bm...@apache.org.
Added an asynchronous subprocess utility.
Review: https://reviews.apache.org/r/17306
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0478c7f9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0478c7f9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0478c7f9
Branch: refs/heads/master
Commit: 0478c7f9292e75a30ef02327ff5698359d0c7235
Parents: c0af398
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Jan 23 22:36:53 2014 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Jan 29 17:12:29 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 2 +
.../libprocess/include/process/subprocess.hpp | 195 +++++++++++++++++++
.../libprocess/src/tests/subprocess_tests.cpp | 189 ++++++++++++++++++
3 files changed, 386 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/0478c7f9/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 4a35f91..bbd17cc 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -93,6 +93,7 @@ libprocess_la_SOURCES += \
$(top_srcdir)/include/process/shared.hpp \
$(top_srcdir)/include/process/socket.hpp \
$(top_srcdir)/include/process/statistics.hpp \
+ $(top_srcdir)/include/process/subprocess.hpp \
$(top_srcdir)/include/process/time.hpp \
$(top_srcdir)/include/process/timeout.hpp \
$(top_srcdir)/include/process/timer.hpp
@@ -111,6 +112,7 @@ tests_SOURCES = \
src/tests/reap_tests.cpp \
src/tests/shared_tests.cpp \
src/tests/statistics_tests.cpp \
+ src/tests/subprocess_tests.cpp \
src/tests/timeseries_tests.cpp \
src/tests/time_tests.cpp
http://git-wip-us.apache.org/repos/asf/mesos/blob/0478c7f9/3rdparty/libprocess/include/process/subprocess.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/subprocess.hpp b/3rdparty/libprocess/include/process/subprocess.hpp
new file mode 100644
index 0000000..db9c96b
--- /dev/null
+++ b/3rdparty/libprocess/include/process/subprocess.hpp
@@ -0,0 +1,195 @@
+#ifndef __PROCESS_SUBPROCESS_HPP__
+#define __PROCESS_SUBPROCESS_HPP__
+
+#include <unistd.h>
+
+#include <glog/logging.h>
+
+#include <sys/types.h>
+
+#include <string>
+
+#include <process/future.hpp>
+#include <process/reap.hpp>
+
+#include <stout/error.hpp>
+#include <stout/lambda.hpp>
+#include <stout/memory.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+
+namespace process {
+
+// Represents a fork() exec()ed subprocess. Access is provided to
+// the input / output of the process, as well as the exit status.
+// The input / output file descriptors are only closed after both:
+// 1. The subprocess has terminated, and
+// 2. There are no longer any references to the associated
+// Subprocess object.
+struct Subprocess
+{
+ // Returns the pid for the subprocess.
+ pid_t pid() const { return data->pid; }
+
+ // File descriptor accessors for input / output.
+ int in() const { return data->in; }
+ int out() const { return data->out; }
+ int err() const { return data->err; }
+
+ // Returns a future from process::reap of this subprocess.
+ // Discarding this future has no effect on the subprocess.
+ Future<Option<int> > status() const { return data->status; }
+
+private:
+ Subprocess() : data(new Data()) {}
+ friend Try<Subprocess> subprocess(const std::string&);
+
+ struct Data
+ {
+ ~Data()
+ {
+ os::close(in);
+ os::close(out);
+ os::close(err);
+ }
+
+ pid_t pid;
+
+ // NOTE: stdin, stdout, stderr are macros on some systems, hence
+ // these names instead.
+ int in;
+ int out;
+ int err;
+
+ Future<Option<int> > status;
+ };
+
+ memory::shared_ptr<Data> data;
+};
+
+
+namespace internal {
+
+// See the comment below as to why subprocess is passed to cleanup.
+void cleanup(
+ const Future<Option<int> >& result,
+ Promise<Option<int> >* promise,
+ const Subprocess& subprocess)
+{
+ CHECK(!result.isPending());
+ CHECK(!result.isDiscarded());
+
+ if (result.isFailed()) {
+ promise->fail(result.failure());
+ } else {
+ promise->set(result.get());
+ }
+
+ delete promise;
+}
+
+}
+
+
+// Runs the provided command in a subprocess.
+inline Try<Subprocess> subprocess(const std::string& command)
+{
+ // Create pipes for stdin, stdout, stderr.
+ // Index 0 is for reading, and index 1 is for writing.
+ int stdinPipe[2];
+ int stdoutPipe[2];
+ int stderrPipe[2];
+
+ if (pipe(stdinPipe) == -1) {
+ return ErrnoError("Failed to create pipe");
+ } else if (pipe(stdoutPipe) == -1) {
+ os::close(stdinPipe[0]);
+ os::close(stdinPipe[1]);
+ return ErrnoError("Failed to create pipe");
+ } else if (pipe(stderrPipe) == -1) {
+ os::close(stdinPipe[0]);
+ os::close(stdinPipe[1]);
+ os::close(stdoutPipe[0]);
+ os::close(stdoutPipe[1]);
+ return ErrnoError("Failed to create pipe");
+ }
+
+ pid_t pid;
+ if ((pid = fork()) == -1) {
+ os::close(stdinPipe[0]);
+ os::close(stdinPipe[1]);
+ os::close(stdoutPipe[0]);
+ os::close(stdoutPipe[1]);
+ os::close(stderrPipe[0]);
+ os::close(stderrPipe[1]);
+ return ErrnoError("Failed to fork");
+ }
+
+ Subprocess process;
+ process.data->pid = pid;
+
+ if (process.data->pid == 0) {
+ // Child.
+ // Close parent's end of the pipes.
+ os::close(stdinPipe[1]);
+ os::close(stdoutPipe[0]);
+ os::close(stderrPipe[0]);
+
+ // Make our pipes look like stdin, stderr, stdout before we exec.
+ while (dup2(stdinPipe[0], STDIN_FILENO) == -1 && errno == EINTR);
+ while (dup2(stdoutPipe[1], STDOUT_FILENO) == -1 && errno == EINTR);
+ while (dup2(stderrPipe[1], STDERR_FILENO) == -1 && errno == EINTR);
+
+ // Close the copies.
+ os::close(stdinPipe[0]);
+ os::close(stdoutPipe[1]);
+ os::close(stderrPipe[1]);
+
+ execl("/bin/sh", "sh", "-c", command.c_str(), (char *) NULL);
+
+ // Write the failure message in an async-signal safe manner,
+ // assuming strlen is async-signal safe or optimized out.
+ // In fact, it is highly unlikely that strlen would be
+ // implemented in an unsafe manner:
+ // http://austingroupbugs.net/view.php?id=692
+ const char* message = "Failed to execl '/bin sh -c ";
+ write(STDERR_FILENO, message, strlen(message));
+ write(STDERR_FILENO, command.c_str(), command.size());
+ write(STDERR_FILENO, "'\n", strlen("'\n"));
+
+ _exit(1);
+ }
+
+ // Parent.
+
+ // Close the child's end of the pipes.
+ os::close(stdinPipe[0]);
+ os::close(stdoutPipe[1]);
+ os::close(stderrPipe[1]);
+
+ process.data->in = stdinPipe[1];
+ process.data->out = stdoutPipe[0];
+ process.data->err = stderrPipe[0];
+
+ // Rather than directly exposing the future from process::reap, we
+ // must use an explicit promise so that we are guaranteed to observe
+ // the subprocess's termination. Otherwise, the caller can discard
+ // the reap future, and we will not know when it is safe to close
+ // the file descriptors.
+ Promise<Option<int> >* promise = new Promise<Option<int> >();
+ process.data->status = promise->future();
+
+ // We need to bind a copy of this Subprocess into the onAny callback
+ // below to ensure that we don't close the file descriptors before
+ // the subprocess has terminated (i.e., because the caller doesn't
+ // keep a copy of this Subprocess around themselves).
+ process::reap(process.data->pid)
+ .onAny(lambda::bind(internal::cleanup, lambda::_1, promise, process));
+
+ return process;
+}
+
+} // namespace process {
+
+#endif // __PROCESS_SUBPROCESS_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/0478c7f9/3rdparty/libprocess/src/tests/subprocess_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/subprocess_tests.cpp b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
new file mode 100644
index 0000000..d2cf7f5
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
@@ -0,0 +1,189 @@
+#include <signal.h>
+
+#include <gmock/gmock.h>
+
+#include <sys/types.h>
+
+#include <string>
+
+#include <process/gmock.hpp>
+#include <process/gtest.hpp>
+#include <process/io.hpp>
+#include <process/subprocess.hpp>
+
+#include <stout/foreach.hpp>
+#include <stout/gtest.hpp>
+#include <stout/list.hpp>
+#include <stout/os/read.hpp>
+
+using namespace process;
+
+using std::string;
+
+
+TEST(Subprocess, status)
+{
+ Clock::pause();
+
+ // Exit 0.
+ Try<Subprocess> s = subprocess("exit 0");
+
+ ASSERT_SOME(s);
+
+ // Advance time until the internal reaper reaps the subprocess.
+ while (s.get().status().isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ AWAIT_ASSERT_READY(s.get().status());
+ ASSERT_SOME(s.get().status().get());
+
+ int status = s.get().status().get().get();
+ ASSERT_TRUE(WIFEXITED(status));
+ ASSERT_EQ(0, WEXITSTATUS(status));
+
+ // Exit 1.
+ s = subprocess("exit 1");
+
+ ASSERT_SOME(s);
+
+ // Advance time until the internal reaper reaps the subprocess.
+ while (s.get().status().isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ AWAIT_ASSERT_READY(s.get().status());
+ ASSERT_SOME(s.get().status().get());
+
+ status = s.get().status().get().get();
+ ASSERT_TRUE(WIFEXITED(status));
+ ASSERT_EQ(1, WEXITSTATUS(status));
+
+ // SIGTERM.
+ s = subprocess("sleep 60");
+
+ ASSERT_SOME(s);
+
+ kill(s.get().pid(), SIGTERM);
+
+ // Advance time until the internal reaper reaps the subprocess.
+ while (s.get().status().isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ AWAIT_ASSERT_READY(s.get().status());
+ ASSERT_SOME(s.get().status().get());
+
+ status = s.get().status().get().get();
+ ASSERT_TRUE(WIFSIGNALED(status));
+ ASSERT_EQ(SIGTERM, WTERMSIG(status));
+
+ // SIGKILL.
+ s = subprocess("sleep 60");
+
+ ASSERT_SOME(s);
+
+ kill(s.get().pid(), SIGKILL);
+
+ // Advance time until the internal reaper reaps the subprocess.
+ while (s.get().status().isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ AWAIT_ASSERT_READY(s.get().status());
+ ASSERT_SOME(s.get().status().get());
+
+ status = s.get().status().get().get();
+ ASSERT_TRUE(WIFSIGNALED(status));
+ ASSERT_EQ(SIGKILL, WTERMSIG(status));
+
+ Clock::resume();
+}
+
+
+
+TEST(Subprocess, output)
+{
+ Clock::pause();
+
+ // Standard out.
+ Try<Subprocess> s = subprocess("echo hello");
+
+ ASSERT_SOME(s);
+
+ ASSERT_SOME(os::nonblock(s.get().out()));
+
+ AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out()));
+
+ // Advance time until the internal reaper reaps the subprocess.
+ while (s.get().status().isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ AWAIT_ASSERT_READY(s.get().status());
+ ASSERT_SOME(s.get().status().get());
+ int status = s.get().status().get().get();
+
+ ASSERT_TRUE(WIFEXITED(status));
+ ASSERT_EQ(0, WEXITSTATUS(status));
+
+ // Standard error.
+ s = subprocess("echo hello 1>&2");
+
+ ASSERT_SOME(s);
+
+ ASSERT_SOME(os::nonblock(s.get().err()));
+
+ AWAIT_EXPECT_EQ("hello\n", io::read(s.get().err()));
+
+ // Advance time until the internal reaper reaps the subprocess.
+ while (s.get().status().isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ AWAIT_ASSERT_READY(s.get().status());
+ ASSERT_SOME(s.get().status().get());
+ status = s.get().status().get().get();
+
+ ASSERT_TRUE(WIFEXITED(status));
+ ASSERT_EQ(0, WEXITSTATUS(status));
+
+ Clock::resume();
+}
+
+
+TEST(Subprocess, input)
+{
+ Clock::pause();
+
+ Try<Subprocess> s = subprocess("read word ; echo $word");
+
+ ASSERT_SOME(s);
+
+ ASSERT_SOME(os::write(s.get().in(), "hello\n"));
+
+ ASSERT_SOME(os::nonblock(s.get().out()));
+
+ AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out()));
+
+ // Advance time until the internal reaper reaps the subprocess.
+ while (s.get().status().isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ AWAIT_ASSERT_READY(s.get().status());
+ ASSERT_SOME(s.get().status().get());
+ int status = s.get().status().get().get();
+
+ ASSERT_TRUE(WIFEXITED(status));
+ ASSERT_EQ(0, WEXITSTATUS(status));
+
+ Clock::resume();
+}
[5/6] git commit: Update the slave to use the libprocess Reaper.
Posted by bm...@apache.org.
Update the slave to use the libprocess Reaper.
Review: https://reviews.apache.org/r/17305
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c0af3984
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c0af3984
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c0af3984
Branch: refs/heads/master
Commit: c0af39845bd010a01bf2e77b2be7d05122f1126a
Parents: 81d0181
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Jan 23 14:58:36 2014 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Jan 29 17:12:29 2014 -0800
----------------------------------------------------------------------
src/Makefile.am | 3 -
src/launcher/executor.cpp | 6 +-
src/slave/cgroups_isolator.cpp | 5 +-
src/slave/cgroups_isolator.hpp | 2 -
src/slave/process_isolator.cpp | 5 +-
src/slave/process_isolator.hpp | 2 -
src/slave/reaper.cpp | 158 -------------------------
src/slave/reaper.hpp | 92 ---------------
src/tests/environment.cpp | 12 ++
src/tests/reaper_tests.cpp | 201 --------------------------------
src/tests/slave_recovery_tests.cpp | 1 -
11 files changed, 20 insertions(+), 467 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index d58b46e..c307068 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -178,7 +178,6 @@ libmesos_no_3rdparty_la_SOURCES = \
slave/http.cpp \
slave/isolator.cpp \
slave/process_isolator.cpp \
- slave/reaper.cpp \
slave/status_update_manager.cpp \
launcher/launcher.cpp \
exec/exec.cpp \
@@ -239,7 +238,6 @@ libmesos_no_3rdparty_la_SOURCES += common/attributes.hpp \
slave/paths.hpp slave/state.hpp \
slave/status_update_manager.hpp \
slave/process_isolator.hpp \
- slave/reaper.hpp \
slave/slave.hpp \
tests/environment.hpp tests/script.hpp \
tests/zookeeper.hpp tests/flags.hpp tests/utils.hpp \
@@ -857,7 +855,6 @@ mesos_tests_SOURCES = \
tests/monitor_tests.cpp \
tests/paths_tests.cpp \
tests/protobuf_io_tests.cpp \
- tests/reaper_tests.cpp \
tests/registrar_tests.cpp \
tests/resource_offers_tests.cpp \
tests/resources_tests.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index b73ab47..e30d77a 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -30,6 +30,7 @@
#include <process/defer.hpp>
#include <process/future.hpp>
#include <process/process.hpp>
+#include <process/reap.hpp>
#include <stout/duration.hpp>
#include <stout/lambda.hpp>
@@ -40,8 +41,6 @@
#include "logging/logging.hpp"
-#include "slave/reaper.hpp"
-
using process::wait; // Necessary on some OS's to disambiguate.
using std::cout;
@@ -186,7 +185,7 @@ public:
std::cout << "Forked command at " << pid << std::endl;
// Monitor this process.
- reaper.monitor(pid)
+ process::reap(pid)
.onAny(defer(self(),
&Self::reaped,
driver,
@@ -294,7 +293,6 @@ private:
bool launched;
bool killed;
pid_t pid;
- slave::Reaper reaper;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/slave/cgroups_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp
index 5298f20..690ae81 100644
--- a/src/slave/cgroups_isolator.cpp
+++ b/src/slave/cgroups_isolator.cpp
@@ -32,6 +32,7 @@
#include <process/clock.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
+#include <process/reap.hpp>
#include <stout/bytes.hpp>
#include <stout/check.hpp>
@@ -569,7 +570,7 @@ void CgroupsIsolator::launchExecutor(
// Store the pid of the leading process of the executor.
info->pid = pid;
- reaper.monitor(pid)
+ process::reap(pid)
.onAny(defer(PID<CgroupsIsolator>(this),
&CgroupsIsolator::reaped,
pid,
@@ -901,7 +902,7 @@ Future<Nothing> CgroupsIsolator::recover(
// Add the pid to the reaper to monitor exit status.
if (run.forkedPid.isSome()) {
- reaper.monitor(run.forkedPid.get())
+ process::reap(run.forkedPid.get())
.onAny(defer(PID<CgroupsIsolator>(this),
&CgroupsIsolator::reaped,
run.forkedPid.get(),
http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/slave/cgroups_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.hpp b/src/slave/cgroups_isolator.hpp
index e86062e..1a66dc6 100644
--- a/src/slave/cgroups_isolator.hpp
+++ b/src/slave/cgroups_isolator.hpp
@@ -41,7 +41,6 @@
#include "slave/flags.hpp"
#include "slave/isolator.hpp"
-#include "slave/reaper.hpp"
#include "slave/slave.hpp"
namespace mesos {
@@ -290,7 +289,6 @@ private:
bool local;
process::PID<Slave> slave;
bool initialized;
- Reaper reaper;
// File descriptor to 'mesos/tasks' file in the cgroup on which we place
// an advisory lock.
http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/slave/process_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.cpp b/src/slave/process_isolator.cpp
index 0bc698f..748d9c2 100644
--- a/src/slave/process_isolator.cpp
+++ b/src/slave/process_isolator.cpp
@@ -29,6 +29,7 @@
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/id.hpp>
+#include <process/reap.hpp>
#include <stout/check.hpp>
#include <stout/exit.hpp>
@@ -165,7 +166,7 @@ void ProcessIsolator::launchExecutor(
// Record the pid (should also be the pgid since we setsid below).
infos[frameworkId][executorId]->pid = pid;
- reaper.monitor(pid)
+ process::reap(pid)
.onAny(defer(PID<ProcessIsolator>(this),
&ProcessIsolator::reaped,
pid,
@@ -358,7 +359,7 @@ Future<Nothing> ProcessIsolator::recover(
// Add the pid to the reaper to monitor exit status.
if (run.forkedPid.isSome()) {
- reaper.monitor(run.forkedPid.get())
+ process::reap(run.forkedPid.get())
.onAny(defer(PID<ProcessIsolator>(this),
&ProcessIsolator::reaped,
run.forkedPid.get(),
http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/slave/process_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.hpp b/src/slave/process_isolator.hpp
index 4ae093f..bc52f33 100644
--- a/src/slave/process_isolator.hpp
+++ b/src/slave/process_isolator.hpp
@@ -34,7 +34,6 @@
#include "slave/flags.hpp"
#include "slave/isolator.hpp"
-#include "slave/reaper.hpp"
#include "slave/slave.hpp"
namespace mesos {
@@ -106,7 +105,6 @@ private:
bool local;
process::PID<Slave> slave;
bool initialized;
- Reaper reaper;
hashmap<FrameworkID, hashmap<ExecutorID, ProcessInfo*> > infos;
void reaped(pid_t pid, const Future<Option<int> >& status);
http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/slave/reaper.cpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.cpp b/src/slave/reaper.cpp
deleted file mode 100644
index 5eabbc3..0000000
--- a/src/slave/reaper.cpp
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <glog/logging.h>
-
-#include <sys/types.h>
-#include <sys/wait.h>
-
-#include <process/delay.hpp>
-#include <process/dispatch.hpp>
-#include <process/id.hpp>
-
-#include <stout/check.hpp>
-#include <stout/foreach.hpp>
-#include <stout/nothing.hpp>
-#include <stout/os.hpp>
-#include <stout/try.hpp>
-
-#include <stout/utils.hpp>
-
-#include "slave/reaper.hpp"
-
-using namespace process;
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-ReaperProcess::ReaperProcess()
- : ProcessBase(ID::generate("reaper")) {}
-
-
-Future<Option<int> > ReaperProcess::monitor(pid_t pid)
-{
- // Check to see if this pid exists.
- const Result<os::Process>& process = os::process(pid);
-
- if (process.isSome()) {
- // The process exists, we add it to the promises map.
- Owned<Promise<Option<int> > > promise(new Promise<Option<int> >());
- promises.put(pid, promise);
- return promise->future();
- } else if (process.isNone()) {
- LOG(WARNING) << "Cannot monitor process " << pid
- << " because it doesn't exist";
- return None();
- } else {
- return Failure(
- "Failed to monitor process " + stringify(pid) + ": " + process.error());
- }
-}
-
-
-void ReaperProcess::initialize()
-{
- reap();
-}
-
-
-void ReaperProcess::notify(pid_t pid, Option<int> status)
-{
- foreach (const Owned<Promise<Option<int> > >& promise, promises.get(pid)) {
- promise->set(status);
- }
- promises.remove(pid);
-}
-
-
-void ReaperProcess::reap()
-{
- // This method assumes that the registered PIDs are
- // 1) children: we can reap their exit status when they are
- // terminated.
- // 2) non-children: we cannot reap their exit status.
- // 3) nonexistent: already reaped elsewhere.
-
- // Reap all child processes first.
- pid_t pid;
- int status;
- while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
- // Ignore this if the process has only stopped.
- // Notify the "listeners" only if they have requested to monitor
- // this pid. Otherwise the status is discarded.
- // This means if a child pid is registered via the monitor() call
- // after it's reaped, status 'None' will be returned.
- if (!WIFSTOPPED(status) && promises.contains(pid)) {
- notify(pid, status);
- }
- }
-
- // Check whether any monitored process has exited and been reaped.
- // 1) If a child terminates before the foreach loop but after the
- // while loop, it won't be reaped until the next reap() cycle.
- // 2) If a non-child process terminates and is reaped elsewhere,
- // e.g. by init, we notify the listeners.
- // 3) If a non-child process terminates and is not yet reaped,
- // no notification is sent.
- // 4) If a child terminates before the while loop above, then we've
- // already reaped it and have the listeners notified!
- foreach (pid_t pid, utils::copy(promises.keys())) {
- const Result<os::Process>& process = os::process(pid);
-
- if (process.isError()) {
- LOG(ERROR) << "Failed to get process information for " << pid
- <<": " << process.error();
- notify(pid, None());
- } else if (process.isNone()) {
- // The process has been reaped.
- LOG(WARNING) << "Cannot get the exit status of process " << pid
- << " because it no longer exists";
- notify(pid, None());
- }
- }
-
- delay(Seconds(1), self(), &ReaperProcess::reap); // Reap forever!
-}
-
-
-Reaper::Reaper()
-{
- process = new ReaperProcess();
- spawn(process);
-}
-
-
-Reaper::~Reaper()
-{
- terminate(process);
- wait(process);
- delete process;
-}
-
-
-Future<Option<int> > Reaper::monitor(pid_t pid)
-{
- return dispatch(process, &ReaperProcess::monitor, pid);
-}
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {
-
-
http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/slave/reaper.hpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.hpp b/src/slave/reaper.hpp
deleted file mode 100644
index 9a31c75..0000000
--- a/src/slave/reaper.hpp
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __REAPER_HPP__
-#define __REAPER_HPP__
-
-#include <list>
-#include <set>
-
-#include <process/future.hpp>
-#include <process/owned.hpp>
-#include <process/process.hpp>
-
-#include <stout/multihashmap.hpp>
-#include <stout/nothing.hpp>
-#include <stout/try.hpp>
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-// Forward declaration.
-class ReaperProcess;
-
-
-// TODO(vinod): Pull reaper into common or libprocess.
-class Reaper
-{
-public:
- Reaper();
- virtual ~Reaper();
-
- // Monitor the given process and notify the caller if it terminates
- // via a Future of the exit status.
- //
- // The exit status of 'pid' can only be correctly captured if the
- // calling process is the parent of 'pid' and the process hasn't
- // been reaped yet, otherwise 'None' is returned.
- // Note that an invalid pid does not cause a failed Future, but an
- // empty result ('None').
- process::Future<Option<int> > monitor(pid_t pid);
-
-private:
- ReaperProcess* process;
-};
-
-
-// Reaper implementation.
-class ReaperProcess : public process::Process<ReaperProcess>
-{
-public:
- ReaperProcess();
-
- process::Future<Option<int> > monitor(pid_t pid);
-
-protected:
- virtual void initialize();
-
- void reap();
-
- // The notification is sent only if the pid is explicitly registered
- // via the monitor() call.
- void notify(pid_t pid, Option<int> status);
-
-private:
- // Mapping from the monitored pid to all promises the pid exit
- // status should be sent to.
- multihashmap<
- pid_t, process::Owned<process::Promise<Option<int> > > > promises;
-};
-
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __REAPER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/tests/environment.cpp
----------------------------------------------------------------------
diff --git a/src/tests/environment.cpp b/src/tests/environment.cpp
index 6edce45..41b8a71 100644
--- a/src/tests/environment.cpp
+++ b/src/tests/environment.cpp
@@ -18,6 +18,10 @@
#include <gtest/gtest.h>
+#include <sys/wait.h>
+
+#include <string.h>
+
#include <list>
#include <string>
@@ -235,6 +239,14 @@ void Environment::TearDown()
}
}
directories.clear();
+
+ // Make sure we haven't left any child processes lying around.
+ Try<os::ProcessTree> pstree = os::pstree(0);
+
+ if (pstree.isSome() && !pstree.get().children.empty()) {
+ FAIL() << "Tests completed with child processes remaining:\n"
+ << pstree.get();
+ }
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/tests/reaper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reaper_tests.cpp b/src/tests/reaper_tests.cpp
deleted file mode 100644
index 608ec0e..0000000
--- a/src/tests/reaper_tests.cpp
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <signal.h>
-#include <unistd.h>
-
-#include <sys/wait.h>
-
-#include <gtest/gtest.h>
-
-#include <process/clock.hpp>
-#include <process/dispatch.hpp>
-#include <process/gmock.hpp>
-#include <process/gtest.hpp>
-
-#include <stout/exit.hpp>
-#include <stout/gtest.hpp>
-#include <stout/os.hpp>
-
-#include "slave/reaper.hpp"
-
-using namespace os;
-using namespace mesos;
-using namespace mesos::internal;
-using namespace mesos::internal::slave;
-
-using process::Clock;
-using process::Future;
-
-using testing::_;
-using testing::DoDefault;
-
-
-// This test checks that the Reaper can monitor a non-child process.
-TEST(ReaperTest, NonChildProcess)
-{
- // The child process creates a grandchild and then exits. The
- // grandchild sleeps for 10 seconds. The process tree looks like:
- // -+- child exit 0
- // \-+- grandchild sleep 10
-
- // After the child exits, the grandchild is going to be re-parented
- // by 'init', like this:
- // -+- child (exit 0)
- // -+- grandchild sleep 10
- Try<ProcessTree> tree = Fork(None(),
- Fork(Exec("sleep 10")),
- Exec("exit 0"))();
- ASSERT_SOME(tree);
- ASSERT_EQ(1u, tree.get().children.size());
- pid_t grandchild = tree.get().children.front();
-
- Reaper reaper;
-
- Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
-
- // Ask the reaper to monitor the grand child process.
- Future<Option<int> > status = reaper.monitor(grandchild);
-
- AWAIT_READY(monitor);
-
- // This makes sure the status only becomes ready after the
- // grandchild is killed.
- EXPECT_TRUE(status.isPending());
-
- // Now kill the grand child.
- // NOTE: We send a SIGKILL here because sometimes the grand child
- // process seems to be in a hung state and not responding to
- // SIGTERM/SIGINT.
- EXPECT_EQ(0, kill(grandchild, SIGKILL));
-
- Clock::pause();
-
- // Now advance time until the reaper reaps the executor.
- while (status.isPending()) {
- Clock::advance(Seconds(1));
- Clock::settle();
- }
-
- // Ensure the reaper notifies of the terminated process.
- AWAIT_READY(status);
-
- // Status is None because pid is not an immediate child.
- ASSERT_NONE(status.get());
-
- Clock::resume();
-}
-
-
-// This test checks that the Reaper can monitor a child process with
-// accurate exit status returned.
-TEST(ReaperTest, ChildProcess)
-{
- ASSERT_TRUE(GTEST_IS_THREADSAFE);
-
- // The child process sleeps and will be killed by the parent.
- Try<ProcessTree> tree = Fork(None(),
- Exec("sleep 10"))();
-
- ASSERT_SOME(tree);
- pid_t child = tree.get();
-
- Reaper reaper;
-
- Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
-
- // Ask the reaper to monitor the grand child process.
- Future<Option<int> > status = reaper.monitor(child);
-
- AWAIT_READY(monitor);
-
- // Now kill the child.
- EXPECT_EQ(0, kill(child, SIGKILL));
-
- Clock::pause();
-
- // Now advance time until the reaper reaps the executor.
- while (status.isPending()) {
- Clock::advance(Seconds(1));
- Clock::settle();
- }
-
- // Ensure the reaper notifies of the terminated process.
- AWAIT_READY(status);
-
- // Check if the status is correct.
- ASSERT_SOME(status.get());
- int status_ = status.get().get();
- ASSERT_TRUE(WIFSIGNALED(status_));
- ASSERT_EQ(SIGKILL, WTERMSIG(status_));
-
- Clock::resume();
-}
-
-
-// Check that the Reaper can monitor a child process that exits
-// before monitor() is called on it.
-TEST(ReaperTest, TerminatedChildProcess)
-{
- ASSERT_TRUE(GTEST_IS_THREADSAFE);
-
- // The child process immediately exits.
- Try<ProcessTree> tree = Fork(None(),
- Exec("exit 0"))();
-
- ASSERT_SOME(tree);
- pid_t child = tree.get();
-
- ASSERT_SOME(os::process(child));
-
- Clock::pause();
-
- Reaper reaper;
-
- // Because reaper reaps all child processes even if they aren't
- // registered, we advance time until that happens.
- while (os::process(child).isSome()) {
- Clock::advance(Seconds(1));
- Clock::settle();
- }
-
- // Now we request to monitor the child process which is already
- // reaped.
-
- Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
-
- // Ask the reaper to monitor the child process.
- Future<Option<int> > status = reaper.monitor(child);
-
- AWAIT_READY(monitor);
-
- // Now advance time until the reaper sends the notification.
- while (status.isPending()) {
- Clock::advance(Seconds(1));
- Clock::settle();
- }
-
- // Ensure the reaper notifies of the terminated process.
- AWAIT_READY(status);
-
- // Invalid status is returned because it is reaped before being
- // monitored.
- ASSERT_NONE(status.get());
-
- Clock::resume();
-}
http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 999e598..b8c5123 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -52,7 +52,6 @@
#endif
#include "slave/paths.hpp"
#include "slave/process_isolator.hpp"
-#include "slave/reaper.hpp"
#include "slave/slave.hpp"
#include "slave/state.hpp"
[2/6] git commit: Added a child Reaper utility in libprocess.
Posted by bm...@apache.org.
Added a child Reaper utility in libprocess.
Review: https://reviews.apache.org/r/17304
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/81d01813
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/81d01813
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/81d01813
Branch: refs/heads/master
Commit: 81d018136890406fdb442be8ac51d70225b9f65e
Parents: 0697d97
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Jan 23 14:51:06 2014 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Jan 29 17:12:28 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 3 +
3rdparty/libprocess/include/process/reap.hpp | 19 +++
3rdparty/libprocess/src/reap.cpp | 127 +++++++++++++++
3rdparty/libprocess/src/tests/reap_tests.cpp | 179 ++++++++++++++++++++++
4 files changed, 328 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/81d01813/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 40f01a7..4a35f91 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -34,6 +34,7 @@ libprocess_la_SOURCES = \
src/latch.cpp \
src/pid.cpp \
src/process.cpp \
+ src/reap.cpp \
src/statistics.cpp \
src/synchronized.hpp
@@ -87,6 +88,7 @@ libprocess_la_SOURCES += \
$(top_srcdir)/include/process/process.hpp \
$(top_srcdir)/include/process/profiler.hpp \
$(top_srcdir)/include/process/protobuf.hpp \
+ $(top_srcdir)/include/process/reap.hpp \
$(top_srcdir)/include/process/run.hpp \
$(top_srcdir)/include/process/shared.hpp \
$(top_srcdir)/include/process/socket.hpp \
@@ -106,6 +108,7 @@ tests_SOURCES = \
src/tests/main.cpp \
src/tests/owned_tests.cpp \
src/tests/process_tests.cpp \
+ src/tests/reap_tests.cpp \
src/tests/shared_tests.cpp \
src/tests/statistics_tests.cpp \
src/tests/timeseries_tests.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/81d01813/3rdparty/libprocess/include/process/reap.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/reap.hpp b/3rdparty/libprocess/include/process/reap.hpp
new file mode 100644
index 0000000..dcc6c72
--- /dev/null
+++ b/3rdparty/libprocess/include/process/reap.hpp
@@ -0,0 +1,19 @@
+#ifndef __PROCESS_REAP_HPP__
+#define __PROCESS_REAP_HPP__
+
+#include <sys/types.h>
+
+// Needed for the Future<> used in the declaration below, so that this
+// header can be included on its own.
+#include <process/future.hpp>
+
+#include <stout/option.hpp>
+
+namespace process {
+
+// Asynchronously waits for the process identified by 'pid' to go away.
+//
+// Returns the exit status of the specified process if and only if
+// the process is a direct child and it has not already been reaped.
+// Otherwise, returns None once the process has been reaped elsewhere
+// (or does not exist, which is indistinguishable from being reaped
+// elsewhere). The future is failed if the state of the process cannot
+// be determined. This will never discard the returned future.
+Future<Option<int> > reap(pid_t pid);
+
+} // namespace process {
+
+#endif // __PROCESS_REAP_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/81d01813/3rdparty/libprocess/src/reap.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/reap.cpp b/3rdparty/libprocess/src/reap.cpp
new file mode 100644
index 0000000..274d129
--- /dev/null
+++ b/3rdparty/libprocess/src/reap.cpp
@@ -0,0 +1,127 @@
+#include <glog/logging.h>
+
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include <process/delay.hpp>
+#include <process/future.hpp>
+#include <process/id.hpp>
+#include <process/once.hpp>
+#include <process/owned.hpp>
+#include <process/reap.hpp>
+
+#include <stout/check.hpp>
+#include <stout/foreach.hpp>
+#include <stout/multihashmap.hpp>
+#include <stout/none.hpp>
+#include <stout/os.hpp>
+#include <stout/result.hpp>
+#include <stout/try.hpp>
+
+namespace process {
+
+
+// TODO(bmahler): This can be optimized to use a thread per pid, where
+// each thread makes a blocking call to waitpid. This eliminates the
+// unfortunate 1 second reap delay.
+
+// Libprocess process that polls the registered pids once per second
+// and completes the promises associated with a pid once that pid has
+// terminated (direct child) or no longer exists (any other process).
+class ReaperProcess : public Process<ReaperProcess>
+{
+public:
+  ReaperProcess() : ProcessBase(ID::generate("reaper")) {}
+
+  // Registers interest in 'pid' and returns a future for its outcome:
+  //   - pending (completed later by wait()) if the process exists,
+  //   - None immediately if the process does not exist,
+  //   - a failure if the process could not be looked up.
+  Future<Option<int> > reap(pid_t pid)
+  {
+    // Check to see if this pid exists.
+    const Result<os::Process>& process = os::process(pid);
+
+    if (process.isSome()) {
+      // The process exists, we add it to the promises map.
+      Owned<Promise<Option<int> > > promise(new Promise<Option<int> >());
+      promises.put(pid, promise);
+      return promise->future();
+    } else if (process.isNone()) {
+      return None();
+    } else {
+      return Failure(
+          "Failed to monitor process " + stringify(pid) + ": " +
+          process.error());
+    }
+  }
+
+protected:
+  // Kicks off the polling loop as soon as the process is spawned.
+  virtual void initialize() { wait(); }
+
+  // Performs one polling pass over every registered pid and then
+  // re-schedules itself to run again in one second.
+  void wait()
+  {
+    // There are a few cases to consider here for each pid:
+    //   1) The process is our child. In this case, we will notify
+    //      with the exit status once it terminates.
+    //   2) The process exists but is not our child. In this case,
+    //      we'll notify with None() once it no longer exists, since
+    //      we cannot reap it.
+    //   3) The process does not exist, notify with None() since it
+    //      has likely been reaped elsewhere.
+
+    foreach (pid_t pid, promises.keys()) {
+      int status;
+
+      // Keep the return value: errno is only meaningful when waitpid
+      // reports an error (-1). With WNOHANG a return of 0 means the
+      // child is still running and errno is left untouched, so it
+      // could still hold a stale ECHILD from an earlier call.
+      const pid_t result = waitpid(pid, &status, WNOHANG);
+
+      if (result > 0) {
+        // Terminated child process.
+        notify(pid, status);
+      } else if (result == -1 && errno == ECHILD) {
+        // The process is not our child, or does not exist. We want to
+        // notify with None() once it no longer exists (reaped by
+        // someone else).
+        const Result<os::Process>& process = os::process(pid);
+
+        if (process.isError()) {
+          notify(pid, Error(process.error()));
+        } else if (process.isNone()) {
+          // The process has been reaped.
+          notify(pid, None());
+        }
+      }
+    }
+
+    delay(Seconds(1), self(), &ReaperProcess::wait); // Reap forever!
+  }
+
+  // Completes every promise registered for 'pid' and forgets the pid:
+  //   - error  -> the promise is failed with the error message,
+  //   - none   -> the promise is set to None,
+  //   - some   -> the promise is set to the exit status.
+  void notify(pid_t pid, Result<int> status)
+  {
+    foreach (const Owned<Promise<Option<int> > >& promise, promises.get(pid)) {
+      if (status.isError()) {
+        promise->fail(status.error());
+      } else if (status.isNone()) {
+        promise->set(Option<int>::none());
+      } else {
+        promise->set(Option<int>(status.get()));
+      }
+    }
+    promises.remove(pid);
+  }
+
+private:
+  // Mapping from each monitored pid to all of the promises that are
+  // waiting on the outcome of that pid.
+  multihashmap<pid_t, Owned<Promise<Option<int> > > > promises;
+};
+
+
+// Global reaper object; NULL until the first reap() call creates and spawns it.
+static ReaperProcess* reaper = NULL;
+
+
+Future<Option<int> > reap(pid_t pid)
+{
+  // Guards the one-time creation of the global reaper. The Once is
+  // heap allocated and intentionally never deleted.
+  static Once* spawned = new Once();
+
+  if (!spawned->once()) {
+    // We are responsible for setting up the reaper: create it, hand
+    // it to libprocess, and then mark the initialization as done.
+    reaper = new ReaperProcess();
+    spawn(reaper);
+    spawned->done();
+  }
+
+  CHECK_NOTNULL(reaper);
+
+  // The actual bookkeeping happens in the reaper's own context.
+  return dispatch(reaper, &ReaperProcess::reap, pid);
+}
+
+} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/81d01813/3rdparty/libprocess/src/tests/reap_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/reap_tests.cpp b/3rdparty/libprocess/src/tests/reap_tests.cpp
new file mode 100644
index 0000000..a18d54c
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/reap_tests.cpp
@@ -0,0 +1,179 @@
+#include <signal.h>
+#include <unistd.h>
+
+#include <sys/wait.h>
+
+#include <gtest/gtest.h>
+
+#include <process/clock.hpp>
+#include <process/dispatch.hpp>
+#include <process/gmock.hpp>
+#include <process/gtest.hpp>
+#include <process/reap.hpp>
+
+#include <stout/exit.hpp>
+#include <stout/gtest.hpp>
+#include <stout/os/fork.hpp>
+#include <stout/os/pstree.hpp>
+#include <stout/try.hpp>
+
+using namespace process;
+
+using os::Exec;
+using os::Fork;
+using os::ProcessTree;
+
+using testing::_;
+using testing::DoDefault;
+
+
+// This test checks that we can reap a non-child process, in terms
+// of receiving the termination notification.
+TEST(Reap, NonChildProcess)
+{
+  // The child process creates a grandchild and then exits. The
+  // grandchild sleeps for 10 seconds. The process tree looks like:
+  //  -+- child exit 0
+  //   \-+- grandchild sleep 10
+
+  // After the child exits, the grandchild is going to be re-parented
+  // by 'init', like this:
+  //  -+- child (exit 0)
+  //  -+- grandchild sleep 10
+  Try<ProcessTree> tree = Fork(None(),
+                               Fork(Exec("sleep 10")),
+                               Exec("exit 0"))();
+  ASSERT_SOME(tree);
+  ASSERT_EQ(1u, tree.get().children.size());
+  pid_t grandchild = tree.get().children.front();
+
+  // Reap the grandchild process.
+  Future<Option<int> > status = process::reap(grandchild);
+
+  // The grandchild is still sleeping, so nothing can be reported yet.
+  EXPECT_TRUE(status.isPending());
+
+  // Now kill the grandchild.
+  // NOTE: We send a SIGKILL here because sometimes the grandchild
+  // process seems to be in a hung state and not responding to
+  // SIGTERM/SIGINT.
+  EXPECT_EQ(0, kill(grandchild, SIGKILL));
+
+  Clock::pause();
+
+  // Now advance time until the Reaper reaps the grandchild.
+  while (status.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_READY(status);
+
+  // The status is None because pid is not an immediate child.
+  ASSERT_NONE(status.get()) << status.get().get();
+
+  // Reap the child as well to clean up after ourselves.
+  status = process::reap(tree.get().process.pid);
+
+  // Now advance time until the child is reaped.
+  while (status.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  // The loop above only guarantees the future is no longer pending;
+  // make sure it is actually ready (not failed) before reading it,
+  // as is done for the grandchild above.
+  AWAIT_READY(status);
+
+  // Check if the status is correct.
+  ASSERT_SOME(status.get());
+  int status_ = status.get().get();
+  ASSERT_TRUE(WIFEXITED(status_));
+  ASSERT_EQ(0, WEXITSTATUS(status_));
+
+  Clock::resume();
+}
+
+
+// This test checks that we can reap a child process and obtain
+// the correct exit status.
+TEST(Reap, ChildProcess)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  // Fork a single child that just sleeps; the parent kills it below.
+  Try<ProcessTree> forked = Fork(None(),
+                                 Exec("sleep 10"))();
+
+  ASSERT_SOME(forked);
+  pid_t pid = forked.get();
+
+  // Register the child with the reaper before terminating it.
+  Future<Option<int> > exited = process::reap(pid);
+
+  // Terminate the child with SIGKILL.
+  EXPECT_EQ(0, kill(pid, SIGKILL));
+
+  Clock::pause();
+
+  // Step the paused clock one second at a time until the reaper has
+  // noticed the termination.
+  while (exited.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_READY(exited);
+
+  // As a direct child, the wait status must be available and must
+  // reflect the SIGKILL we sent.
+  ASSERT_SOME(exited.get());
+  int waitStatus = exited.get().get();
+  ASSERT_TRUE(WIFSIGNALED(waitStatus));
+  ASSERT_EQ(SIGKILL, WTERMSIG(waitStatus));
+
+  Clock::resume();
+}
+
+
+// Check that we can reap a child process that has already exited.
+TEST(Reap, TerminatedChildProcess)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  // Fork a child that exits right away.
+  Try<ProcessTree> forked = Fork(None(),
+                                 Exec("exit 0"))();
+
+  ASSERT_SOME(forked);
+  pid_t pid = forked.get();
+
+  ASSERT_SOME(os::process(pid));
+
+  // Poll until the child shows up as a zombie, so that it has
+  // definitely terminated before we ask for it to be reaped.
+  bool zombie = false;
+  while (!zombie) {
+    const Result<os::Process>& info = os::process(pid);
+    ASSERT_SOME(info) << "Process " << pid << " reaped unexpectedly";
+
+    zombie = info.get().zombie;
+
+    if (!zombie) {
+      os::sleep(Milliseconds(1));
+    }
+  }
+
+  // The child has terminated but is not yet reaped; reap it now.
+  Future<Option<int> > exited = process::reap(pid);
+
+  // Step the paused clock until the reaper delivers the result.
+  Clock::pause();
+  while (exited.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_READY(exited);
+
+  // The exit status must be available and indicate a clean exit.
+  ASSERT_SOME(exited.get());
+
+  int waitStatus = exited.get().get();
+  ASSERT_TRUE(WIFEXITED(waitStatus));
+  ASSERT_EQ(0, WEXITSTATUS(waitStatus));
+
+  Clock::resume();
+}
[4/6] git commit: Updated the semantics of Clock::resume.
Posted by bm...@apache.org.
Updated the semantics of Clock::resume.
Now Clock::resume results in the resumption of time from the state
of Time when resume is called, rather than re-winding time back to
real time.
This requires an update to Time::create to take into account the
advanced state of time.
Review: https://reviews.apache.org/r/17480
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/84b86ff3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/84b86ff3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/84b86ff3
Branch: refs/heads/master
Commit: 84b86ff3381c79b76dc620eca60b9250366e519a
Parents: 035d913
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Jan 28 18:49:36 2014 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Jan 29 17:12:28 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/time.hpp | 10 +---------
3rdparty/libprocess/src/process.cpp | 18 +++++++++++++++++-
2 files changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/84b86ff3/3rdparty/libprocess/include/process/time.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/time.hpp b/3rdparty/libprocess/include/process/time.hpp
index 307fd2c..26cec3d 100644
--- a/3rdparty/libprocess/include/process/time.hpp
+++ b/3rdparty/libprocess/include/process/time.hpp
@@ -21,15 +21,7 @@ public:
static Time EPOCH;
static Time MAX;
- static Try<Time> create(double secs)
- {
- Try<Duration> duration = Duration::create(secs);
- if (duration.isSome()) {
- return Time(duration.get());
- } else {
- return Error("Argument too large for Time: " + duration.error());
- }
- }
+ static Try<Time> create(double seconds);
Duration duration() const { return sinceEpoch; }
http://git-wip-us.apache.org/repos/asf/mesos/blob/84b86ff3/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 7d8f00f..1083a35 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -605,6 +605,8 @@ map<ProcessBase*, Time>* currents = new map<ProcessBase*, Time>();
Time initial = Time::EPOCH;
Time current = Time::EPOCH;
+Duration advanced = Duration::zero();
+
bool paused = false;
} // namespace clock {
@@ -640,7 +642,7 @@ Time Clock::now(ProcessBase* process)
// TODO(benh): Versus ev_now()?
double d = ev_time();
- Try<Time> time = Time::create(d);
+ Try<Time> time = Time::create(d); // Compensates for clock::advanced.
// TODO(xujyan): Move CHECK_SOME to libprocess and add CHECK_SOME
// here.
@@ -696,6 +698,7 @@ void Clock::advance(const Duration& duration)
{
synchronized (timeouts) {
if (clock::paused) {
+ clock::advanced += duration;
clock::current += duration;
VLOG(2) << "Clock advanced (" << duration << ") to " << clock::current;
if (!update_timer) {
@@ -726,6 +729,7 @@ void Clock::update(const Time& time)
synchronized (timeouts) {
if (clock::paused) {
if (clock::current < time) {
+ clock::advanced += (time - clock::current);
clock::current = Time(time);
VLOG(2) << "Clock updated to " << clock::current;
if (!update_timer) {
@@ -764,6 +768,18 @@ void Clock::settle()
}
+// Builds a Time from a number of seconds, shifted forward by however
+// much the clock has been advanced (clock::advanced).
+Try<Time> Time::create(double seconds)
+{
+  Try<Duration> elapsed = Duration::create(seconds);
+
+  // Bail out early if the value cannot be represented as a Duration.
+  if (elapsed.isError()) {
+    return Error("Argument too large for Time: " + elapsed.error());
+  }
+
+  // In production code, clock::advanced will always be zero!
+  return Time(elapsed.get() + clock::advanced);
+}
+
+
static Message* encode(const UPID& from,
const UPID& to,
const string& name,