You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2016/10/24 23:19:10 UTC
[6/8] mesos git commit: Libprocess Reinit: Moved ReaperProcess
instantiation into process.cpp.
Libprocess Reinit: Moved ReaperProcess instantiation into process.cpp.
The reaper singleton must be unified with `process::initialize` so
that it also falls under the scope of reinitialization. The singleton
must also not be guarded by `Once`.
Review: https://reviews.apache.org/r/40413/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3e3aeb24
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3e3aeb24
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3e3aeb24
Branch: refs/heads/master
Commit: 3e3aeb240fcda3001e62f628fe78ba66f9a8f948
Parents: 757727c
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Mon Oct 24 15:06:39 2016 -0700
Committer: Joseph Wu <jo...@apache.org>
Committed: Mon Oct 24 16:18:51 2016 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/reap.hpp | 33 +++++
3rdparty/libprocess/src/process.cpp | 11 ++
3rdparty/libprocess/src/reap.cpp | 156 +++++++++++-----------
3 files changed, 120 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3e3aeb24/3rdparty/libprocess/include/process/reap.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/reap.hpp b/3rdparty/libprocess/include/process/reap.hpp
index d7e0fa3..211fc70 100644
--- a/3rdparty/libprocess/include/process/reap.hpp
+++ b/3rdparty/libprocess/include/process/reap.hpp
@@ -16,14 +16,47 @@
#include <sys/types.h>
#include <process/future.hpp>
+#include <process/id.hpp>
+#include <process/process.hpp>
+#include <stout/multihashmap.hpp>
#include <stout/option.hpp>
+#include <stout/result.hpp>
namespace process {
// The upper bound for the poll interval in the reaper.
Duration MAX_REAP_INTERVAL();
+namespace internal {
+
+class ReaperProcess : public Process<ReaperProcess>
+{
+public:
+ ReaperProcess();
+
+ Future<Option<int>> reap(pid_t pid);
+
+protected:
+ virtual void initialize();
+
+ void wait();
+
+ void notify(pid_t pid, Result<int> status);
+
+private:
+ const Duration interval();
+
+ multihashmap<pid_t, Owned<Promise<Option<int>>>> promises;
+};
+
+
+// Global reaper process. Defined in process.cpp.
+extern PID<ReaperProcess> reaper;
+
+} // namespace internal {
+
+
// Returns the exit status of the specified process if and only if
// the process is a direct child and it has not already been reaped.
// Otherwise, returns None once the process has been reaped elsewhere
http://git-wip-us.apache.org/repos/asf/mesos/blob/3e3aeb24/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 8af056e..b6ef0f2 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -78,6 +78,7 @@
#include <process/owned.hpp>
#include <process/process.hpp>
#include <process/profiler.hpp>
+#include <process/reap.hpp>
#include <process/sequence.hpp>
#include <process/socket.hpp>
#include <process/statistics.hpp>
@@ -553,6 +554,12 @@ PID<metrics::internal::MetricsProcess> metrics;
} // namespace internal {
} // namespace metrics {
+namespace internal {
+
+PID<process::internal::ReaperProcess> reaper;
+
+} // namespace internal {
+
namespace http {
@@ -1104,6 +1111,10 @@ bool initialize(
// Create the global HTTP authentication router.
authenticator_manager = new AuthenticatorManager();
+ // Create the global reaper process.
+ process::internal::reaper =
+ spawn(new process::internal::ReaperProcess(), true);
+
// Initialize the mime types.
mime::initialize();
http://git-wip-us.apache.org/repos/asf/mesos/blob/3e3aeb24/3rdparty/libprocess/src/reap.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/reap.cpp b/3rdparty/libprocess/src/reap.cpp
index 5fc2a4d..380edbb 100644
--- a/3rdparty/libprocess/src/reap.cpp
+++ b/3rdparty/libprocess/src/reap.cpp
@@ -28,6 +28,7 @@
#include <stout/foreach.hpp>
#include <stout/multihashmap.hpp>
#include <stout/none.hpp>
+#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/result.hpp>
#include <stout/try.hpp>
@@ -60,108 +61,103 @@ Duration MIN_REAP_INTERVAL() { return Milliseconds(100); }
const size_t HIGH_PID_COUNT = 500;
Duration MAX_REAP_INTERVAL() { return Seconds(1); }
+namespace internal {
-class ReaperProcess : public Process<ReaperProcess>
+ReaperProcess::ReaperProcess() : ProcessBase(ID::generate("__reaper__")) {}
+
+
+Future<Option<int>> ReaperProcess::reap(pid_t pid)
{
-public:
- ReaperProcess() : ProcessBase(ID::generate("__reaper__")) {}
-
- Future<Option<int>> reap(pid_t pid)
- {
- // Check to see if this pid exists.
- if (os::exists(pid)) {
- Owned<Promise<Option<int>>> promise(new Promise<Option<int>>());
- promises.put(pid, promise);
- return promise->future();
- } else {
- return None();
- }
+ // Check to see if this pid exists.
+ if (os::exists(pid)) {
+ Owned<Promise<Option<int>>> promise(new Promise<Option<int>>());
+ promises.put(pid, promise);
+ return promise->future();
+ } else {
+ return None();
}
+}
-protected:
- virtual void initialize() { wait(); }
-
- void wait()
- {
- // There are two cases to consider for each pid when it terminates:
- // 1) The process is our child. In this case, we will reap the process and
- // notify with the exit status.
- // 2) The process was not our child. In this case, it will be reaped by
- // someone else (its parent or init, if reparented) so we cannot know
- // the exit status and we must notify with None().
- //
- // NOTE: A child can only be reaped by us, the parent. If a child exits
- // between waitpid and the (!exists) conditional it will still exist as a
- // zombie; it will be reaped by us on the next loop.
- foreach (pid_t pid, promises.keys()) {
- int status;
- Result<pid_t> child_pid = os::waitpid(pid, &status, WNOHANG);
- if (child_pid.isSome()) {
- // We have reaped a child.
- notify(pid, status);
- } else if (!os::exists(pid)) {
- // The process no longer exists and has been reaped by someone else.
- notify(pid, None());
- }
- }
- delay(interval(), self(), &ReaperProcess::wait); // Reap forever!
- }
+void ReaperProcess::initialize()
+{
+ wait();
+}
- void notify(pid_t pid, Result<int> status)
- {
- foreach (const Owned<Promise<Option<int>>>& promise, promises.get(pid)) {
- if (status.isError()) {
- promise->fail(status.error());
- } else if (status.isNone()) {
- promise->set(Option<int>::none());
- } else {
- promise->set(Option<int>(status.get()));
- }
+
+void ReaperProcess::wait()
+{
+ // There are two cases to consider for each pid when it terminates:
+ // 1) The process is our child. In this case, we will reap the process and
+ // notify with the exit status.
+ // 2) The process was not our child. In this case, it will be reaped by
+ // someone else (its parent or init, if reparented) so we cannot know
+ // the exit status and we must notify with None().
+ //
+ // NOTE: A child can only be reaped by us, the parent. If a child exits
+ // between waitpid and the (!exists) conditional it will still exist as a
+ // zombie; it will be reaped by us on the next loop.
+ foreach (pid_t pid, promises.keys()) {
+ int status;
+ Result<pid_t> child_pid = os::waitpid(pid, &status, WNOHANG);
+ if (child_pid.isSome()) {
+ // We have reaped a child.
+ notify(pid, status);
+ } else if (!os::exists(pid)) {
+ // The process no longer exists and has been reaped by someone else.
+ notify(pid, None());
}
- promises.remove(pid);
}
-private:
- const Duration interval()
- {
- size_t count = promises.size();
+ delay(interval(), self(), &ReaperProcess::wait); // Reap forever!
+}
- if (count <= LOW_PID_COUNT) {
- return MIN_REAP_INTERVAL();
- } else if (count >= HIGH_PID_COUNT) {
- return MAX_REAP_INTERVAL();
+
+void ReaperProcess::notify(pid_t pid, Result<int> status)
+{
+ foreach (const Owned<Promise<Option<int>>>& promise, promises.get(pid)) {
+ if (status.isError()) {
+ promise->fail(status.error());
+ } else if (status.isNone()) {
+ promise->set(Option<int>::none());
+ } else {
+ promise->set(Option<int>(status.get()));
}
+ }
+ promises.remove(pid);
+}
- // Linear interpolation between min and max reap intervals.
- double fraction =
- ((double) (count - LOW_PID_COUNT) / (HIGH_PID_COUNT - LOW_PID_COUNT));
- return (MIN_REAP_INTERVAL() +
- (MAX_REAP_INTERVAL() - MIN_REAP_INTERVAL()) * fraction);
+const Duration ReaperProcess::interval()
+{
+ size_t count = promises.size();
+
+ if (count <= LOW_PID_COUNT) {
+ return MIN_REAP_INTERVAL();
+ } else if (count >= HIGH_PID_COUNT) {
+ return MAX_REAP_INTERVAL();
}
- multihashmap<pid_t, Owned<Promise<Option<int>>>> promises;
-};
+ // Linear interpolation between min and max reap intervals.
+ double fraction =
+ ((double) (count - LOW_PID_COUNT) / (HIGH_PID_COUNT - LOW_PID_COUNT));
+ return (MIN_REAP_INTERVAL() +
+ (MAX_REAP_INTERVAL() - MIN_REAP_INTERVAL()) * fraction);
+}
-// Global reaper object.
-static ReaperProcess* reaper = nullptr;
+} // namespace internal {
Future<Option<int>> reap(pid_t pid)
{
- static Once* initialized = new Once();
-
- if (!initialized->once()) {
- reaper = new ReaperProcess();
- spawn(reaper);
- initialized->done();
- }
-
- CHECK_NOTNULL(reaper);
+ // The reaper process is instantiated in `process::initialize`.
+ process::initialize();
- return dispatch(reaper, &ReaperProcess::reap, pid);
+ return dispatch(
+ internal::reaper,
+ &internal::ReaperProcess::reap,
+ pid);
}
} // namespace process {