You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2016/10/24 23:19:10 UTC

[6/8] mesos git commit: Libprocess Reinit: Moved ReaperProcess instantiation into process.cpp.

Libprocess Reinit: Moved ReaperProcess instantiation into process.cpp.

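The reaper singleton must be instantiated in `process::initialize` so
that it also falls under the scope of reinitialization. The singleton
must also not be guarded by `Once`, because a `Once` guard would keep
the reaper from being created again when libprocess is reinitialized.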

Review: https://reviews.apache.org/r/40413/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3e3aeb24
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3e3aeb24
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3e3aeb24

Branch: refs/heads/master
Commit: 3e3aeb240fcda3001e62f628fe78ba66f9a8f948
Parents: 757727c
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Mon Oct 24 15:06:39 2016 -0700
Committer: Joseph Wu <jo...@apache.org>
Committed: Mon Oct 24 16:18:51 2016 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/reap.hpp |  33 +++++
 3rdparty/libprocess/src/process.cpp          |  11 ++
 3rdparty/libprocess/src/reap.cpp             | 156 +++++++++++-----------
 3 files changed, 120 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3e3aeb24/3rdparty/libprocess/include/process/reap.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/reap.hpp b/3rdparty/libprocess/include/process/reap.hpp
index d7e0fa3..211fc70 100644
--- a/3rdparty/libprocess/include/process/reap.hpp
+++ b/3rdparty/libprocess/include/process/reap.hpp
@@ -16,14 +16,47 @@
 #include <sys/types.h>
 
 #include <process/future.hpp>
+#include <process/id.hpp>
+#include <process/process.hpp>
 
+#include <stout/multihashmap.hpp>
 #include <stout/option.hpp>
+#include <stout/result.hpp>
 
 namespace process {
 
 // The upper bound for the poll interval in the reaper.
 Duration MAX_REAP_INTERVAL();
 
+namespace internal {
+
+class ReaperProcess : public Process<ReaperProcess>
+{
+public:
+  ReaperProcess();
+
+  Future<Option<int>> reap(pid_t pid);
+
+protected:
+  virtual void initialize();
+
+  void wait();
+
+  void notify(pid_t pid, Result<int> status);
+
+private:
+  const Duration interval();
+
+  multihashmap<pid_t, Owned<Promise<Option<int>>>> promises;
+};
+
+
+// Global reaper process. Defined in process.cpp.
+extern PID<ReaperProcess> reaper;
+
+} // namespace internal {
+
+
 // Returns the exit status of the specified process if and only if
 // the process is a direct child and it has not already been reaped.
 // Otherwise, returns None once the process has been reaped elsewhere

http://git-wip-us.apache.org/repos/asf/mesos/blob/3e3aeb24/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 8af056e..b6ef0f2 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -78,6 +78,7 @@
 #include <process/owned.hpp>
 #include <process/process.hpp>
 #include <process/profiler.hpp>
+#include <process/reap.hpp>
 #include <process/sequence.hpp>
 #include <process/socket.hpp>
 #include <process/statistics.hpp>
@@ -553,6 +554,12 @@ PID<metrics::internal::MetricsProcess> metrics;
 } // namespace internal {
 } // namespace metrics {
 
+namespace internal {
+
+PID<process::internal::ReaperProcess> reaper;
+
+} // namespace internal {
+
 
 namespace http {
 
@@ -1104,6 +1111,10 @@ bool initialize(
   // Create the global HTTP authentication router.
   authenticator_manager = new AuthenticatorManager();
 
+  // Create the global reaper process.
+  process::internal::reaper =
+    spawn(new process::internal::ReaperProcess(), true);
+
   // Initialize the mime types.
   mime::initialize();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3e3aeb24/3rdparty/libprocess/src/reap.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/reap.cpp b/3rdparty/libprocess/src/reap.cpp
index 5fc2a4d..380edbb 100644
--- a/3rdparty/libprocess/src/reap.cpp
+++ b/3rdparty/libprocess/src/reap.cpp
@@ -28,6 +28,7 @@
 #include <stout/foreach.hpp>
 #include <stout/multihashmap.hpp>
 #include <stout/none.hpp>
+#include <stout/option.hpp>
 #include <stout/os.hpp>
 #include <stout/result.hpp>
 #include <stout/try.hpp>
@@ -60,108 +61,103 @@ Duration MIN_REAP_INTERVAL() { return Milliseconds(100); }
 const size_t HIGH_PID_COUNT = 500;
 Duration MAX_REAP_INTERVAL() { return Seconds(1); }
 
+namespace internal {
 
-class ReaperProcess : public Process<ReaperProcess>
+ReaperProcess::ReaperProcess() : ProcessBase(ID::generate("__reaper__")) {}
+
+
+Future<Option<int>> ReaperProcess::reap(pid_t pid)
 {
-public:
-  ReaperProcess() : ProcessBase(ID::generate("__reaper__")) {}
-
-  Future<Option<int>> reap(pid_t pid)
-  {
-    // Check to see if this pid exists.
-    if (os::exists(pid)) {
-      Owned<Promise<Option<int>>> promise(new Promise<Option<int>>());
-      promises.put(pid, promise);
-      return promise->future();
-    } else {
-      return None();
-    }
+  // Check to see if this pid exists.
+  if (os::exists(pid)) {
+    Owned<Promise<Option<int>>> promise(new Promise<Option<int>>());
+    promises.put(pid, promise);
+    return promise->future();
+  } else {
+    return None();
   }
+}
 
-protected:
-  virtual void initialize() { wait(); }
-
-  void wait()
-  {
-    // There are two cases to consider for each pid when it terminates:
-    //   1) The process is our child. In this case, we will reap the process and
-    //      notify with the exit status.
-    //   2) The process was not our child. In this case, it will be reaped by
-    //      someone else (its parent or init, if reparented) so we cannot know
-    //      the exit status and we must notify with None().
-    //
-    // NOTE: A child can only be reaped by us, the parent. If a child exits
-    // between waitpid and the (!exists) conditional it will still exist as a
-    // zombie; it will be reaped by us on the next loop.
-    foreach (pid_t pid, promises.keys()) {
-      int status;
-      Result<pid_t> child_pid = os::waitpid(pid, &status, WNOHANG);
-      if (child_pid.isSome()) {
-        // We have reaped a child.
-        notify(pid, status);
-      } else if (!os::exists(pid)) {
-        // The process no longer exists and has been reaped by someone else.
-        notify(pid, None());
-      }
-    }
 
-    delay(interval(), self(), &ReaperProcess::wait); // Reap forever!
-  }
+void ReaperProcess::initialize()
+{
+  wait();
+}
 
-  void notify(pid_t pid, Result<int> status)
-  {
-    foreach (const Owned<Promise<Option<int>>>& promise, promises.get(pid)) {
-      if (status.isError()) {
-        promise->fail(status.error());
-      } else if (status.isNone()) {
-        promise->set(Option<int>::none());
-      } else {
-        promise->set(Option<int>(status.get()));
-      }
+
+void ReaperProcess::wait()
+{
+  // There are two cases to consider for each pid when it terminates:
+  //   1) The process is our child. In this case, we will reap the process and
+  //      notify with the exit status.
+  //   2) The process was not our child. In this case, it will be reaped by
+  //      someone else (its parent or init, if reparented) so we cannot know
+  //      the exit status and we must notify with None().
+  //
+  // NOTE: A child can only be reaped by us, the parent. If a child exits
+  // between waitpid and the (!exists) conditional it will still exist as a
+  // zombie; it will be reaped by us on the next loop.
+  foreach (pid_t pid, promises.keys()) {
+    int status;
+    Result<pid_t> child_pid = os::waitpid(pid, &status, WNOHANG);
+    if (child_pid.isSome()) {
+      // We have reaped a child.
+      notify(pid, status);
+    } else if (!os::exists(pid)) {
+      // The process no longer exists and has been reaped by someone else.
+      notify(pid, None());
     }
-    promises.remove(pid);
   }
 
-private:
-  const Duration interval()
-  {
-    size_t count = promises.size();
+  delay(interval(), self(), &ReaperProcess::wait); // Reap forever!
+}
 
-    if (count <= LOW_PID_COUNT) {
-      return MIN_REAP_INTERVAL();
-    } else if (count >= HIGH_PID_COUNT) {
-      return MAX_REAP_INTERVAL();
+
+void ReaperProcess::notify(pid_t pid, Result<int> status)
+{
+  foreach (const Owned<Promise<Option<int>>>& promise, promises.get(pid)) {
+    if (status.isError()) {
+      promise->fail(status.error());
+    } else if (status.isNone()) {
+      promise->set(Option<int>::none());
+    } else {
+      promise->set(Option<int>(status.get()));
     }
+  }
+  promises.remove(pid);
+}
 
-    // Linear interpolation between min and max reap intervals.
-    double fraction =
-      ((double) (count - LOW_PID_COUNT) / (HIGH_PID_COUNT - LOW_PID_COUNT));
 
-    return (MIN_REAP_INTERVAL() +
-            (MAX_REAP_INTERVAL() - MIN_REAP_INTERVAL()) * fraction);
+const Duration ReaperProcess::interval()
+{
+  size_t count = promises.size();
+
+  if (count <= LOW_PID_COUNT) {
+    return MIN_REAP_INTERVAL();
+  } else if (count >= HIGH_PID_COUNT) {
+    return MAX_REAP_INTERVAL();
   }
 
-  multihashmap<pid_t, Owned<Promise<Option<int>>>> promises;
-};
+  // Linear interpolation between min and max reap intervals.
+  double fraction =
+    ((double) (count - LOW_PID_COUNT) / (HIGH_PID_COUNT - LOW_PID_COUNT));
 
+  return (MIN_REAP_INTERVAL() +
+          (MAX_REAP_INTERVAL() - MIN_REAP_INTERVAL()) * fraction);
+}
 
-// Global reaper object.
-static ReaperProcess* reaper = nullptr;
+} // namespace internal {
 
 
 Future<Option<int>> reap(pid_t pid)
 {
-  static Once* initialized = new Once();
-
-  if (!initialized->once()) {
-    reaper = new ReaperProcess();
-    spawn(reaper);
-    initialized->done();
-  }
-
-  CHECK_NOTNULL(reaper);
+  // The reaper process is instantiated in `process::initialize`.
+  process::initialize();
 
-  return dispatch(reaper, &ReaperProcess::reap, pid);
+  return dispatch(
+      internal::reaper,
+      &internal::ReaperProcess::reap,
+      pid);
 }
 
 } // namespace process {