You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink in the original archive page and is not preserved in this plain-text rendering.
Posted to commits@mesos.apache.org by bm...@apache.org on 2013/06/21 19:55:14 UTC

[1/4] git commit: Refactored Reaper with an abstraction that hides the dispatch syntax.

Updated Branches:
  refs/heads/master 464192eee -> ad198fe48


Refactored Reaper with an abstraction that hides the dispatch syntax.

From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/11552


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/93dfc5f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/93dfc5f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/93dfc5f0

Branch: refs/heads/master
Commit: 93dfc5f03d32b7b01f5c26fceeb9f869dd11bbb0
Parents: 464192e
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Fri Jun 21 10:43:02 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Fri Jun 21 10:43:02 2013 -0700

----------------------------------------------------------------------
 src/slave/cgroups_isolator.cpp | 15 ++----------
 src/slave/cgroups_isolator.hpp |  4 +---
 src/slave/process_isolator.cpp | 15 ++----------
 src/slave/process_isolator.hpp |  4 +---
 src/slave/reaper.cpp           | 48 +++++++++++++++++++++++++++----------
 src/slave/reaper.hpp           | 40 +++++++++++++++++++++----------
 src/tests/reaper_tests.cpp     |  8 ++-----
 7 files changed, 72 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/93dfc5f0/src/slave/cgroups_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp
index 553844b..9f5de60 100644
--- a/src/slave/cgroups_isolator.cpp
+++ b/src/slave/cgroups_isolator.cpp
@@ -200,18 +200,7 @@ CgroupsIsolator::CgroupsIsolator()
   // Spawn the reaper, note that it might send us a message before we
   // actually get spawned ourselves, but that's okay, the message will
   // just get dropped.
-  reaper = new Reaper();
-  spawn(reaper);
-  dispatch(reaper, &Reaper::addListener, this);
-}
-
-
-CgroupsIsolator::~CgroupsIsolator()
-{
-  CHECK(reaper != NULL);
-  terminate(reaper);
-  process::wait(reaper); // Necessary for disambiguation.
-  delete reaper;
+  reaper.addListener(this);
 }
 
 
@@ -780,7 +769,7 @@ Future<Nothing> CgroupsIsolator::recover(
 
         // Add the pid to the reaper to monitor exit status.
         if (run.forkedPid.isSome()) {
-          dispatch(reaper, &Reaper::monitor, run.forkedPid.get());
+          reaper.monitor(run.forkedPid.get());
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/93dfc5f0/src/slave/cgroups_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.hpp b/src/slave/cgroups_isolator.hpp
index 124a4b3..df6069e 100644
--- a/src/slave/cgroups_isolator.hpp
+++ b/src/slave/cgroups_isolator.hpp
@@ -81,8 +81,6 @@ class CgroupsIsolator : public Isolator, public ProcessExitedListener
 public:
   CgroupsIsolator();
 
-  virtual ~CgroupsIsolator();
-
   virtual void initialize(
       const Flags& flags,
       const Resources& resources,
@@ -292,7 +290,7 @@ private:
   bool local;
   process::PID<Slave> slave;
   bool initialized;
-  Reaper* reaper;
+  Reaper reaper;
 
   // File descriptor to 'mesos/tasks' file in the cgroup on which we place
   // an advisory lock.

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/93dfc5f0/src/slave/process_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.cpp b/src/slave/process_isolator.cpp
index 97a017e..a3eace3 100644
--- a/src/slave/process_isolator.cpp
+++ b/src/slave/process_isolator.cpp
@@ -68,18 +68,7 @@ ProcessIsolator::ProcessIsolator()
   // Spawn the reaper, note that it might send us a message before we
   // actually get spawned ourselves, but that's okay, the message will
   // just get dropped.
-  reaper = new Reaper();
-  spawn(reaper);
-  dispatch(reaper, &Reaper::addListener, this);
-}
-
-
-ProcessIsolator::~ProcessIsolator()
-{
-  CHECK(reaper != NULL);
-  terminate(reaper);
-  wait(reaper);
-  delete reaper;
+  reaper.addListener(this);
 }
 
 
@@ -345,7 +334,7 @@ Future<Nothing> ProcessIsolator::recover(
 
       // Add the pid to the reaper to monitor exit status.
       if (run.forkedPid.isSome()) {
-        dispatch(reaper, &Reaper::monitor, run.forkedPid.get());
+        reaper.monitor(run.forkedPid.get());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/93dfc5f0/src/slave/process_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.hpp b/src/slave/process_isolator.hpp
index ee693f6..c5abc6b 100644
--- a/src/slave/process_isolator.hpp
+++ b/src/slave/process_isolator.hpp
@@ -46,8 +46,6 @@ class ProcessIsolator : public Isolator, public ProcessExitedListener
 public:
   ProcessIsolator();
 
-  virtual ~ProcessIsolator();
-
   virtual void initialize(
       const Flags& flags,
       const Resources& resources,
@@ -109,7 +107,7 @@ private:
   bool local;
   process::PID<Slave> slave;
   bool initialized;
-  Reaper* reaper;
+  Reaper reaper;
   hashmap<FrameworkID, hashmap<ExecutorID, ProcessInfo*> > infos;
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/93dfc5f0/src/slave/reaper.cpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.cpp b/src/slave/reaper.cpp
index 1d826d1..c2e606e 100644
--- a/src/slave/reaper.cpp
+++ b/src/slave/reaper.cpp
@@ -41,28 +41,25 @@ namespace mesos {
 namespace internal {
 namespace slave {
 
-Reaper::Reaper()
+ReaperProcess::ReaperProcess()
   : ProcessBase(ID::generate("reaper")) {}
 
 
-Reaper::~Reaper() {}
-
-
-void Reaper::addListener(
+void ReaperProcess::addListener(
     const PID<ProcessExitedListener>& listener)
 {
   listeners.push_back(listener);
 }
 
 
-Try<Nothing> Reaper::monitor(pid_t pid)
+Future<Nothing> ReaperProcess::monitor(pid_t pid)
 {
   // Check to see if the current process has sufficient privileges to
   // monitor the liveness of this pid.
   Try<bool> alive = os::alive(pid);
   if (alive.isError()) {
-    return Error("Failed to monitor process " + stringify(pid) +
-                  ": " + alive.error());
+    return Future<Nothing>::failed("Failed to monitor process " +
+                                   stringify(pid) + ": " + alive.error());
   } else {
     pids.insert(pid);
   }
@@ -70,13 +67,13 @@ Try<Nothing> Reaper::monitor(pid_t pid)
 }
 
 
-void Reaper::initialize()
+void ReaperProcess::initialize()
 {
   reap();
 }
 
 
-void Reaper::notify(pid_t pid, int status)
+void ReaperProcess::notify(pid_t pid, int status)
 {
   foreach (const PID<ProcessExitedListener>& listener, listeners) {
     dispatch(listener, &ProcessExitedListener::processExited, pid, status);
@@ -84,7 +81,7 @@ void Reaper::notify(pid_t pid, int status)
 }
 
 
-void Reaper::reap()
+void ReaperProcess::reap()
 {
   // Check whether any monitored process has exited.
   foreach (pid_t pid, utils::copy(pids)) {
@@ -117,7 +114,34 @@ void Reaper::reap()
       pids.erase(pid);
     }
   }
-  delay(Seconds(1), self(), &Reaper::reap); // Reap forever!
+  delay(Seconds(1), self(), &ReaperProcess::reap); // Reap forever!
+}
+
+
+Reaper::Reaper()
+{
+  process = new ReaperProcess();
+  spawn(process);
+}
+
+
+Reaper::~Reaper()
+{
+  terminate(process);
+  wait(process);
+  delete process;
+}
+
+
+void Reaper::addListener(const process::PID<ProcessExitedListener>& listener)
+{
+  dispatch(process, &ReaperProcess::addListener, listener);
+}
+
+
+Future<Nothing> Reaper::monitor(pid_t pid)
+{
+  return dispatch(process, &ReaperProcess::monitor, pid);
 }
 
 } // namespace slave {

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/93dfc5f0/src/slave/reaper.hpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.hpp b/src/slave/reaper.hpp
index 09844d8..d599e9c 100644
--- a/src/slave/reaper.hpp
+++ b/src/slave/reaper.hpp
@@ -27,6 +27,7 @@
 #include <stout/nothing.hpp>
 #include <stout/try.hpp>
 
+
 namespace mesos {
 namespace internal {
 namespace slave {
@@ -38,11 +39,35 @@ public:
 };
 
 
+// Reaper implementation. See comments of the Reaper class.
+class ReaperProcess : public process::Process<ReaperProcess>
+{
+public:
+  ReaperProcess();
+
+  void addListener(const process::PID<ProcessExitedListener>&);
+
+  process::Future<Nothing> monitor(pid_t pid);
+
+protected:
+  virtual void initialize();
+
+  void reap();
+
+  // TODO(vinod): Make 'status' an option.
+  void notify(pid_t pid, int status);
+
+private:
+  std::list<process::PID<ProcessExitedListener> > listeners;
+  std::set<pid_t> pids;
+};
+
+
 // TODO(vinod): Refactor the Reaper into 2 components:
 // 1) Reaps the status of child processes.
 // 2) Checks the exit status of requested processes.
 // Also, use Futures instead of callbacks to notify process exits.
-class Reaper : public process::Process<Reaper>
+class Reaper
 {
 public:
   Reaper();
@@ -55,19 +80,10 @@ public:
   // 1) is the parent of 'pid' or
   // 2) has the same real/effective UID as that of 'pid' or
   // 3) is run as a privileged user.
-  Try<Nothing> monitor(pid_t pid);
-
-protected:
-  virtual void initialize();
-
-  void reap();
-
-  // TOOD(vinod): Make 'status' an option.
-  void notify(pid_t pid, int status);
+  process::Future<Nothing> monitor(pid_t pid);
 
 private:
-  std::list<process::PID<ProcessExitedListener> > listeners;
-  std::set<pid_t> pids;
+  ReaperProcess* process;
 };
 
 

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/93dfc5f0/src/tests/reaper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reaper_tests.cpp b/src/tests/reaper_tests.cpp
index fbb6066..6849419 100644
--- a/src/tests/reaper_tests.cpp
+++ b/src/tests/reaper_tests.cpp
@@ -102,16 +102,15 @@ TEST(ReaperTest, NonChildProcess)
 
   // Spawn the reaper.
   Reaper reaper;
-  spawn(reaper);
 
   // Ignore the exit of the child process.
   EXPECT_CALL(listener, processExited(_,_))
     .WillRepeatedly(DoDefault());
 
-  dispatch(reaper, &Reaper::addListener, listener.self());
+  reaper.addListener(listener.self());
 
   // Ask the reaper to monitor the grand child process.
-  dispatch(reaper, &Reaper::monitor, pid);
+  reaper.monitor(pid);
 
   // Catch the exit of the grand child process.
   Future<Nothing> processExited;
@@ -134,9 +133,6 @@ TEST(ReaperTest, NonChildProcess)
   // Ensure the reaper notifies of the terminated process.
   AWAIT_READY(processExited);
 
-  terminate(reaper);
-  wait(reaper);
-
   terminate(listener);
   wait(listener);
 


[4/4] git commit: Changed reaper to use an Option for the process exit status.

Posted by bm...@apache.org.
Changed reaper to use an Option for the process exit status.

From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/11556


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/ad198fe4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/ad198fe4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/ad198fe4

Branch: refs/heads/master
Commit: ad198fe4876ce2ce027c55e04c198005294d0fc5
Parents: 4dd692f
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Fri Jun 21 10:47:44 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Fri Jun 21 10:47:44 2013 -0700

----------------------------------------------------------------------
 src/slave/cgroups_isolator.cpp | 23 ++++++++++---------
 src/slave/cgroups_isolator.hpp |  4 ++--
 src/slave/process_isolator.cpp | 18 +++++++--------
 src/slave/process_isolator.hpp |  2 +-
 src/slave/reaper.cpp           | 24 ++++++++++----------
 src/slave/reaper.hpp           | 13 ++++++-----
 src/slave/slave.cpp            | 45 ++++++++++++++++++++++---------------
 src/slave/slave.hpp            |  2 +-
 src/tests/reaper_tests.cpp     | 20 +++++++++--------
 9 files changed, 81 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/cgroups_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp
index 6b87d08..9d18886 100644
--- a/src/slave/cgroups_isolator.cpp
+++ b/src/slave/cgroups_isolator.cpp
@@ -801,25 +801,26 @@ Future<Nothing> CgroupsIsolator::recover(
 }
 
 
-void CgroupsIsolator::reaped(pid_t pid, const Future<int>& status)
+void CgroupsIsolator::reaped(pid_t pid, const Future<Option<int> >& status)
 {
-  if (status.isDiscarded()) {
-    LOG(ERROR) << "The status was discarded";
-    return;
-  }
-  if (status.isFailed()) {
-    LOG(ERROR) << status.failure();
-    return;
-  }
-
   CgroupInfo* info = findCgroupInfo(pid);
   if (info != NULL) {
     FrameworkID frameworkId = info->frameworkId;
     ExecutorID executorId = info->executorId;
 
+    if (!status.isReady()) {
+      LOG(ERROR) << "Failed to get the status for executor " << executorId
+                 << " of framework " << frameworkId << ": "
+                 << (status.isFailed() ? status.failure() : "discarded");
+      return;
+    }
+
     LOG(INFO) << "Executor " << executorId
               << " of framework " << frameworkId
-              << " terminated with status " << status.get();
+              << " terminated with status "
+              << (status.get().isSome()
+                  ? stringify(status.get().get())
+                  : "unknown");
 
     // Set the exit status, so that '_killExecutor()' can send it to the slave.
     info->status = status.get();

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/cgroups_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.hpp b/src/slave/cgroups_isolator.hpp
index 06bac98..e86062e 100644
--- a/src/slave/cgroups_isolator.hpp
+++ b/src/slave/cgroups_isolator.hpp
@@ -119,7 +119,7 @@ private:
   CgroupsIsolator(const CgroupsIsolator&);
   CgroupsIsolator& operator = (const CgroupsIsolator&);
 
-  void reaped(pid_t pid, const Future<int>& status);
+  void reaped(pid_t pid, const Future<Option<int> >& status);
 
   // The cgroup information for each live executor.
   struct CgroupInfo
@@ -163,7 +163,7 @@ private:
 
     std::string message; // The reason behind the destruction.
 
-    int status; // Exit status of the executor.
+    Option<int> status; // Exit status of the executor.
 
     Flags flags; // Slave flags.
 

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/process_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.cpp b/src/slave/process_isolator.cpp
index a3d31e5..b750b26 100644
--- a/src/slave/process_isolator.cpp
+++ b/src/slave/process_isolator.cpp
@@ -423,22 +423,20 @@ Future<ResourceStatistics> ProcessIsolator::usage(
 }
 
 
-void ProcessIsolator::reaped(pid_t pid, const Future<int>& status)
+void ProcessIsolator::reaped(pid_t pid, const Future<Option<int> >& status)
 {
-  if (status.isDiscarded()) {
-    LOG(ERROR) << "The status was discarded";
-    return;
-  }
-  if (status.isFailed()) {
-    LOG(ERROR) << status.failure();
-    return;
-  }
-
   foreachkey (const FrameworkID& frameworkId, infos) {
     foreachkey (const ExecutorID& executorId, infos[frameworkId]) {
       ProcessInfo* info = infos[frameworkId][executorId];
 
       if (info->pid.isSome() && info->pid.get() == pid) {
+        if (!status.isReady()) {
+          LOG(ERROR) << "Failed to get the status for executor '" << executorId
+                     << "' of framework " << frameworkId << ": "
+                     << (status.isFailed() ? status.failure() : "discarded");
+          return;
+        }
+
         LOG(INFO) << "Telling slave of terminated executor '" << executorId
                   << "' of framework " << frameworkId;
 

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/process_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.hpp b/src/slave/process_isolator.hpp
index a3f6c68..4ae093f 100644
--- a/src/slave/process_isolator.hpp
+++ b/src/slave/process_isolator.hpp
@@ -109,7 +109,7 @@ private:
   Reaper reaper;
   hashmap<FrameworkID, hashmap<ExecutorID, ProcessInfo*> > infos;
 
-  void reaped(pid_t pid, const Future<int>& status);
+  void reaped(pid_t pid, const Future<Option<int> >& status);
 };
 
 } // namespace slave {

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/reaper.cpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.cpp b/src/slave/reaper.cpp
index 1b19eb2..8d3101d 100644
--- a/src/slave/reaper.cpp
+++ b/src/slave/reaper.cpp
@@ -45,7 +45,7 @@ ReaperProcess::ReaperProcess()
   : ProcessBase(ID::generate("reaper")) {}
 
 
-Future<int> ReaperProcess::monitor(pid_t pid)
+Future<Option<int> > ReaperProcess::monitor(pid_t pid)
 {
   // Check to see if the current process has sufficient privileges to
   // monitor the liveness of this pid.
@@ -55,14 +55,14 @@ Future<int> ReaperProcess::monitor(pid_t pid)
     if (alive.get()) {
       // We have permissions to check the validity of the process
       // and it's alive, so add it to the promises map.
-      Owned<Promise<int> > promise(new Promise<int>());
+      Owned<Promise<Option<int> > > promise(new Promise<Option<int> >());
       promises.put(pid, promise);
       return promise->future();
     } else {
       // Process doesn't exist.
       LOG(WARNING) << "Cannot monitor process " << pid
                    << " because it doesn't exist";
-      return -1;
+      return None();
     }
   }
 
@@ -75,17 +75,17 @@ Future<int> ReaperProcess::monitor(pid_t pid)
     // The process terminated and the status was reaped.
     // Notify other listeners and return directly for this caller.
     notify(pid, status);
-    return status;
+    return Option<int>(status);
   } else if (result == 0) {
     // Child still active, add to the map.
-    Owned<Promise<int> > promise(new Promise<int>());
+    Owned<Promise<Option<int> > > promise(new Promise<Option<int> >());
     promises.put(pid, promise);
     return promise->future();
   } else {
     // Not a child nor do we have permission to for os::alive();
     // we cannot monitor this pid.
-    return Future<int>::failed("Failed to monitor process " +
-                               stringify(pid) + ": " + strerror(errno));
+    return Future<Option<int> >::failed(
+        "Failed to monitor process " + stringify(pid) + ": " + strerror(errno));
   }
 }
 
@@ -96,9 +96,9 @@ void ReaperProcess::initialize()
 }
 
 
-void ReaperProcess::notify(pid_t pid, int status)
+void ReaperProcess::notify(pid_t pid, Option<int> status)
 {
-  foreach (const Owned<Promise<int> >& promise, promises.get(pid)) {
+  foreach (const Owned<Promise<Option<int> > >& promise, promises.get(pid)) {
     promise->set(status);
   }
   promises.remove(pid);
@@ -120,7 +120,7 @@ void ReaperProcess::reap()
     // Notify the "listeners" only if they have requested to monitor
     // this pid. Otherwise the status is discarded.
     // This means if a child pid is registered via the monitor() call
-    // after it's reaped, an invalid status (-1) will be returned.
+    // after it's reaped, status 'None' will be returned.
     if (!WIFSTOPPED(status) && promises.contains(pid)) {
       notify(pid, status);
     }
@@ -145,7 +145,7 @@ void ReaperProcess::reap()
       LOG(WARNING) << "Cannot get the exit status of process " << pid
                    << " because it is not a child of the calling "
                    << "process: " << strerror(errno);
-      notify(pid, -1);
+      notify(pid, None());
     }
   }
 
@@ -168,7 +168,7 @@ Reaper::~Reaper()
 }
 
 
-Future<int> Reaper::monitor(pid_t pid)
+Future<Option<int> > Reaper::monitor(pid_t pid)
 {
   return dispatch(process, &ReaperProcess::monitor, pid);
 }

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/reaper.hpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.hpp b/src/slave/reaper.hpp
index 15ebc0d..2d6db03 100644
--- a/src/slave/reaper.hpp
+++ b/src/slave/reaper.hpp
@@ -60,8 +60,10 @@ public:
   //
   // The exit status of 'pid' can only be correctly captured if the
   // calling process is the parent of 'pid' and the process hasn't
-  // been reaped yet, otherwise -1 is returned.
-  process::Future<int> monitor(pid_t pid);
+  // been reaped yet, otherwise 'None' is returned.
+  // Note that an invalid pid does not cause a failed Future, but an
+  // empty result ('None').
+  process::Future<Option<int> > monitor(pid_t pid);
 
 private:
   ReaperProcess* process;
@@ -74,22 +76,21 @@ class ReaperProcess : public process::Process<ReaperProcess>
 public:
   ReaperProcess();
 
-  process::Future<int> monitor(pid_t pid);
+  process::Future<Option<int> > monitor(pid_t pid);
 
 protected:
   virtual void initialize();
 
   void reap();
 
-  // TODO(vinod): Make 'status' an option.
   // The notification is sent only if the pid is explicitly registered
   // via the monitor() call.
-  void notify(pid_t pid, int status);
+  void notify(pid_t pid, Option<int> status);
 
 private:
   // Mapping from the monitored pid to all promises the pid exit
   // status should be sent to.
-  multihashmap<pid_t, Owned<process::Promise<int> > > promises;
+  multihashmap<pid_t, Owned<process::Promise<Option<int> > > > promises;
 };
 
 

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 952bd14..309d1ac 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2045,19 +2045,28 @@ void _unwatch(
 void Slave::executorTerminated(
     const FrameworkID& frameworkId,
     const ExecutorID& executorId,
-    int status,
+    const Option<int>& status_,
     bool destroyed,
     const string& message)
 {
-  LOG(INFO) << "Executor '" << executorId
-            << "' of framework " << frameworkId
-            << (WIFEXITED(status)
-                ? " has exited with status '"
-                : " has terminated with signal '")
-            << (WIFEXITED(status)
-                ? stringify(WEXITSTATUS(status))
-                : strsignal(WTERMSIG(status)))
-            << "'";
+  int status;
+  if (status_.isNone()) {
+    LOG(INFO) << "Executor '" << executorId
+              << "' of framework " << frameworkId
+              << " has terminated with unknown status";
+    // Set a special status for None.
+    status = -1;
+  } else {
+    status = status_.get();
+    LOG(INFO) << "Executor '" << executorId
+              << "' of framework " << frameworkId
+              << (WIFEXITED(status)
+                  ? " has exited with status "
+                  : " has terminated with signal ")
+              << (WIFEXITED(status)
+                  ? stringify(WEXITSTATUS(status))
+                  : strsignal(WTERMSIG(status)));
+  }
 
   Framework* framework = getFramework(frameworkId);
   if (framework == NULL) {
@@ -2106,18 +2115,18 @@ void Slave::executorTerminated(
         // Transition all live launched tasks.
         foreachvalue (Task* task, utils::copy(executor->launchedTasks)) {
           if (!protobuf::isTerminalState(task->state())) {
-            mesos::TaskState status;
+            mesos::TaskState taskState;
             isCommandExecutor = !task->has_executor_id();
             if (destroyed || isCommandExecutor.get()) {
-              status = TASK_FAILED;
+              taskState = TASK_FAILED;
             } else {
-              status = TASK_LOST;
+              taskState = TASK_LOST;
             }
             statusUpdate(protobuf::createStatusUpdate(
                 frameworkId,
                 info.id(),
                 task->task_id(),
-                status,
+                taskState,
                 message,
                 executorId));
           }
@@ -2126,18 +2135,18 @@ void Slave::executorTerminated(
         // Transition all queued tasks.
         foreachvalue (const TaskInfo& task,
                       utils::copy(executor->queuedTasks)) {
-          mesos::TaskState status;
+          mesos::TaskState taskState;
           isCommandExecutor = task.has_command();
           if (destroyed || isCommandExecutor.get()) {
-            status = TASK_FAILED;
+            taskState = TASK_FAILED;
           } else {
-            status = TASK_LOST;
+            taskState = TASK_LOST;
           }
           statusUpdate(protobuf::createStatusUpdate(
               frameworkId,
               info.id(),
               task.task_id(),
-              status,
+              taskState,
               message,
               executorId));
         }

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index d1ba82e..7ef6ad8 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -171,7 +171,7 @@ public:
   void executorTerminated(
       const FrameworkID& frameworkId,
       const ExecutorID& executorId,
-      int status,
+      const Option<int>& status,
       bool destroyed,
       const std::string& message);
 

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/tests/reaper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reaper_tests.cpp b/src/tests/reaper_tests.cpp
index cfda0f4..d55ac24 100644
--- a/src/tests/reaper_tests.cpp
+++ b/src/tests/reaper_tests.cpp
@@ -29,6 +29,7 @@
 #include <process/gtest.hpp>
 
 #include <stout/exit.hpp>
+#include <stout/gtest.hpp>
 #include <stout/os.hpp>
 
 #include "slave/reaper.hpp"
@@ -99,7 +100,7 @@ TEST(ReaperTest, NonChildProcess)
   Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
 
   // Ask the reaper to monitor the grand child process.
-  Future<int> status = reaper.monitor(pid);
+  Future<Option<int> > status = reaper.monitor(pid);
 
   AWAIT_READY(monitor);
 
@@ -120,8 +121,8 @@ TEST(ReaperTest, NonChildProcess)
   // Ensure the reaper notifies of the terminated process.
   AWAIT_READY(status);
 
-  // Status is -1 because pid is not an immediate child.
-  ASSERT_EQ(-1, status.get());
+  // Status is None because pid is not an immediate child.
+  ASSERT_NONE(status.get());
 
   Clock::resume();
 }
@@ -149,7 +150,7 @@ TEST(ReaperTest, ChildProcess)
   Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
 
   // Ask the reaper to monitor the grand child process.
-  Future<int> status = reaper.monitor(pid);
+  Future<Option<int> > status = reaper.monitor(pid);
 
   AWAIT_READY(monitor);
 
@@ -168,9 +169,10 @@ TEST(ReaperTest, ChildProcess)
   AWAIT_READY(status);
 
   // Check if the status is correct.
-  int stat = status.get();
-  ASSERT_TRUE(WIFSIGNALED(stat));
-  ASSERT_EQ(SIGKILL, WTERMSIG(stat));
+  ASSERT_SOME(status.get());
+  int status_ = status.get().get();
+  ASSERT_TRUE(WIFSIGNALED(status_));
+  ASSERT_EQ(SIGKILL, WTERMSIG(status_));
 
   Clock::resume();
 }
@@ -212,7 +214,7 @@ TEST(ReaperTest, TerminatedChildProcess)
   Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
 
   // Ask the reaper to monitor the child process.
-  Future<int> status = reaper.monitor(pid);
+  Future<Option<int> > status = reaper.monitor(pid);
 
   AWAIT_READY(monitor);
 
@@ -227,7 +229,7 @@ TEST(ReaperTest, TerminatedChildProcess)
 
   // Invalid status is returned because it is reaped before being
   // monitored.
-  ASSERT_EQ(-1, status.get());
+  ASSERT_NONE(status.get());
 
   Clock::resume();
 }


[3/4] git commit: Changed reaper to use a Future as the notification mechanism. The notification is now sent only if the pid is explicitly registered via the monitor() call.

Posted by bm...@apache.org.
Changed reaper to use a Future as the notification mechanism.
The notification is now sent only if the pid is explicitly registered
via the monitor() call.

From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/11554


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/4dd692f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/4dd692f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/4dd692f1

Branch: refs/heads/master
Commit: 4dd692f19973482e145d67fc429ab931d0ecb882
Parents: ad842c3
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Fri Jun 21 10:46:33 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Fri Jun 21 10:46:33 2013 -0700

----------------------------------------------------------------------
 src/slave/cgroups_isolator.cpp |  35 ++++++---
 src/slave/cgroups_isolator.hpp |   6 +-
 src/slave/process_isolator.cpp |  36 ++++++---
 src/slave/process_isolator.hpp |   5 +-
 src/slave/reaper.cpp           | 121 ++++++++++++++++++-----------
 src/slave/reaper.hpp           |  72 +++++++++--------
 src/tests/reaper_tests.cpp     | 149 +++++++++++++++++++++++++++++-------
 7 files changed, 291 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/4dd692f1/src/slave/cgroups_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp
index 9f5de60..6b87d08 100644
--- a/src/slave/cgroups_isolator.cpp
+++ b/src/slave/cgroups_isolator.cpp
@@ -195,13 +195,7 @@ std::ostream& operator << (std::ostream& out, const Cpuset& cpuset)
 CgroupsIsolator::CgroupsIsolator()
   : ProcessBase(ID::generate("cgroups-isolator")),
     initialized(false),
-    lockFile(None())
-{
-  // Spawn the reaper, note that it might send us a message before we
-  // actually get spawned ourselves, but that's okay, the message will
-  // just get dropped.
-  reaper.addListener(this);
-}
+    lockFile(None()) {}
 
 
 void CgroupsIsolator::initialize(
@@ -540,6 +534,12 @@ void CgroupsIsolator::launchExecutor(
     // Store the pid of the leading process of the executor.
     info->pid = pid;
 
+    reaper.monitor(pid)
+      .onAny(defer(PID<CgroupsIsolator>(this),
+                   &CgroupsIsolator::reaped,
+                   pid,
+                   lambda::_1));
+
     // Tell the slave this executor has started.
     dispatch(slave,
              &Slave::executorStarted,
@@ -769,7 +769,11 @@ Future<Nothing> CgroupsIsolator::recover(
 
         // Add the pid to the reaper to monitor exit status.
         if (run.forkedPid.isSome()) {
-          reaper.monitor(run.forkedPid.get());
+          reaper.monitor(run.forkedPid.get())
+            .onAny(defer(PID<CgroupsIsolator>(this),
+                         &CgroupsIsolator::reaped,
+                         run.forkedPid.get(),
+                         lambda::_1));
         }
       }
     }
@@ -797,8 +801,17 @@ Future<Nothing> CgroupsIsolator::recover(
 }
 
 
-void CgroupsIsolator::processExited(pid_t pid, int status)
+void CgroupsIsolator::reaped(pid_t pid, const Future<int>& status)
 {
+  if (status.isDiscarded()) {
+    LOG(ERROR) << "The status was discarded";
+    return;
+  }
+  if (status.isFailed()) {
+    LOG(ERROR) << status.failure();
+    return;
+  }
+
   CgroupInfo* info = findCgroupInfo(pid);
   if (info != NULL) {
     FrameworkID frameworkId = info->frameworkId;
@@ -806,10 +819,10 @@ void CgroupsIsolator::processExited(pid_t pid, int status)
 
     LOG(INFO) << "Executor " << executorId
               << " of framework " << frameworkId
-              << " terminated with status " << status;
+              << " terminated with status " << status.get();
 
     // Set the exit status, so that '_killExecutor()' can send it to the slave.
-    info->status = status;
+    info->status = status.get();
 
     if (!info->killed) {
       killExecutor(frameworkId, executorId);

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/4dd692f1/src/slave/cgroups_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.hpp b/src/slave/cgroups_isolator.hpp
index df6069e..06bac98 100644
--- a/src/slave/cgroups_isolator.hpp
+++ b/src/slave/cgroups_isolator.hpp
@@ -76,7 +76,7 @@ private:
 };
 
 
-class CgroupsIsolator : public Isolator, public ProcessExitedListener
+class CgroupsIsolator : public Isolator
 {
 public:
   CgroupsIsolator();
@@ -114,13 +114,13 @@ public:
   virtual process::Future<Nothing> recover(
       const Option<state::SlaveState>& state);
 
-  virtual void processExited(pid_t pid, int status);
-
 private:
   // No copying, no assigning.
   CgroupsIsolator(const CgroupsIsolator&);
   CgroupsIsolator& operator = (const CgroupsIsolator&);
 
+  void reaped(pid_t pid, const Future<int>& status);
+
   // The cgroup information for each live executor.
   struct CgroupInfo
   {

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/4dd692f1/src/slave/process_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.cpp b/src/slave/process_isolator.cpp
index a3eace3..a3d31e5 100644
--- a/src/slave/process_isolator.cpp
+++ b/src/slave/process_isolator.cpp
@@ -25,12 +25,14 @@
 #include <set>
 
 #include <process/clock.hpp>
+#include <process/defer.hpp>
 #include <process/dispatch.hpp>
 #include <process/id.hpp>
 
 #include <stout/check.hpp>
 #include <stout/exit.hpp>
 #include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
 #include <stout/nothing.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
@@ -48,6 +50,7 @@ using std::map;
 using std::set;
 using std::string;
 
+using process::defer;
 using process::wait; // Necessary on some OS's to disambiguate.
 
 namespace mesos {
@@ -63,13 +66,7 @@ using state::RunState;
 
 ProcessIsolator::ProcessIsolator()
   : ProcessBase(ID::generate("process-isolator")),
-    initialized(false)
-{
-  // Spawn the reaper, note that it might send us a message before we
-  // actually get spawned ourselves, but that's okay, the message will
-  // just get dropped.
-  reaper.addListener(this);
-}
+    initialized(false) {}
 
 
 void ProcessIsolator::initialize(
@@ -166,6 +163,12 @@ void ProcessIsolator::launchExecutor(
     // Record the pid (should also be the pgid since we setsid below).
     infos[frameworkId][executorId]->pid = pid;
 
+    reaper.monitor(pid)
+      .onAny(defer(PID<ProcessIsolator>(this),
+                   &ProcessIsolator::reaped,
+                   pid,
+                   lambda::_1));
+
     // Tell the slave this executor has started.
     dispatch(slave, &Slave::executorStarted, frameworkId, executorId, pid);
   } else {
@@ -334,7 +337,11 @@ Future<Nothing> ProcessIsolator::recover(
 
       // Add the pid to the reaper to monitor exit status.
       if (run.forkedPid.isSome()) {
-        reaper.monitor(run.forkedPid.get());
+        reaper.monitor(run.forkedPid.get())
+          .onAny(defer(PID<ProcessIsolator>(this),
+                       &ProcessIsolator::reaped,
+                       run.forkedPid.get(),
+                       lambda::_1));
       }
     }
   }
@@ -416,8 +423,17 @@ Future<ResourceStatistics> ProcessIsolator::usage(
 }
 
 
-void ProcessIsolator::processExited(pid_t pid, int status)
+void ProcessIsolator::reaped(pid_t pid, const Future<int>& status)
 {
+  if (status.isDiscarded()) {
+    LOG(ERROR) << "The status was discarded";
+    return;
+  }
+  if (status.isFailed()) {
+    LOG(ERROR) << status.failure();
+    return;
+  }
+
   foreachkey (const FrameworkID& frameworkId, infos) {
     foreachkey (const ExecutorID& executorId, infos[frameworkId]) {
       ProcessInfo* info = infos[frameworkId][executorId];
@@ -430,7 +446,7 @@ void ProcessIsolator::processExited(pid_t pid, int status)
                  &Slave::executorTerminated,
                  frameworkId,
                  executorId,
-                 status,
+                 status.get(),
                  false,
                  "Executor terminated");
 

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/4dd692f1/src/slave/process_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.hpp b/src/slave/process_isolator.hpp
index c5abc6b..a3f6c68 100644
--- a/src/slave/process_isolator.hpp
+++ b/src/slave/process_isolator.hpp
@@ -41,7 +41,7 @@ namespace mesos {
 namespace internal {
 namespace slave {
 
-class ProcessIsolator : public Isolator, public ProcessExitedListener
+class ProcessIsolator : public Isolator
 {
 public:
   ProcessIsolator();
@@ -77,7 +77,6 @@ public:
   virtual process::Future<Nothing> recover(
       const Option<state::SlaveState>& state);
 
-  virtual void processExited(pid_t pid, int status);
 
 private:
   // No copying, no assigning.
@@ -109,6 +108,8 @@ private:
   bool initialized;
   Reaper reaper;
   hashmap<FrameworkID, hashmap<ExecutorID, ProcessInfo*> > infos;
+
+  void reaped(pid_t pid, const Future<int>& status);
 };
 
 } // namespace slave {

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/4dd692f1/src/slave/reaper.cpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.cpp b/src/slave/reaper.cpp
index c2e606e..1b19eb2 100644
--- a/src/slave/reaper.cpp
+++ b/src/slave/reaper.cpp
@@ -45,25 +45,48 @@ ReaperProcess::ReaperProcess()
   : ProcessBase(ID::generate("reaper")) {}
 
 
-void ReaperProcess::addListener(
-    const PID<ProcessExitedListener>& listener)
-{
-  listeners.push_back(listener);
-}
-
-
-Future<Nothing> ReaperProcess::monitor(pid_t pid)
+Future<int> ReaperProcess::monitor(pid_t pid)
 {
   // Check to see if the current process has sufficient privileges to
   // monitor the liveness of this pid.
   Try<bool> alive = os::alive(pid);
-  if (alive.isError()) {
-    return Future<Nothing>::failed("Failed to monitor process " +
-                                   stringify(pid) + ": " + alive.error());
+
+  if (alive.isSome()) {
+    if (alive.get()) {
+      // We have permissions to check the validity of the process
+      // and it's alive, so add it to the promises map.
+      Owned<Promise<int> > promise(new Promise<int>());
+      promises.put(pid, promise);
+      return promise->future();
+    } else {
+      // Process doesn't exist.
+      LOG(WARNING) << "Cannot monitor process " << pid
+                   << " because it doesn't exist";
+      return -1;
+    }
+  }
+
+  // Now we know we don't have permission for alive(), but we can
+  // still monitor it if it is our child.
+  int status;
+  pid_t result = waitpid(pid, &status, WNOHANG);
+
+  if (result > 0) {
+    // The process terminated and the status was reaped.
+    // Notify other listeners and return directly for this caller.
+    notify(pid, status);
+    return status;
+  } else if (result == 0) {
+    // Child still active, add to the map.
+    Owned<Promise<int> > promise(new Promise<int>());
+    promises.put(pid, promise);
+    return promise->future();
   } else {
-    pids.insert(pid);
+    // Not a child nor do we have permission for os::alive();
+    // we cannot monitor this pid.
+    return Future<int>::failed("Failed to monitor process " +
+                               stringify(pid) + ": " + strerror(errno));
   }
-  return Nothing();
 }
 
 
@@ -75,45 +98,57 @@ void ReaperProcess::initialize()
 
 void ReaperProcess::notify(pid_t pid, int status)
 {
-  foreach (const PID<ProcessExitedListener>& listener, listeners) {
-    dispatch(listener, &ProcessExitedListener::processExited, pid, status);
+  foreach (const Owned<Promise<int> >& promise, promises.get(pid)) {
+    promise->set(status);
   }
+  promises.remove(pid);
 }
 
 
 void ReaperProcess::reap()
 {
-  // Check whether any monitored process has exited.
-  foreach (pid_t pid, utils::copy(pids)) {
-    Try<bool> alive = os::alive(pid);
-    CHECK_SOME(alive);
-
-    if (!alive.get()) { // The process has terminated.
-      // Attempt to reap the status.
-      // If pid is not a child process of the current process, this is a no-op.
-      int status = -1;
-      if (waitpid(pid, &status, WNOHANG) < 0) {
-        LOG(WARNING) << "Cannot get the exit status of process " << pid
-                     << " because it either does not exist or"
-                     << " is not a child of the calling process: "
-                     << strerror(errno);
-      }
-
-      notify(pid, status); // Notify the listeners.
-      pids.erase(pid);
-    }
-  }
+  // This method assumes that the registered PIDs are
+  // 1) children, or
+  // 2) non-children that we have permission to check liveness, or
+  // 3) nonexistent / reaped elsewhere.
 
-  // Check whether any child processes have exited.
+  // Reap all child processes first.
   pid_t pid;
   int status;
   while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
     // Ignore this if the process has only stopped.
-    if (!WIFSTOPPED(status)) {
-      notify(pid, status); // Notify the listeners.
-      pids.erase(pid);
+    // Notify the "listeners" only if they have requested to monitor
+    // this pid. Otherwise the status is discarded.
+    // This means if a child pid is registered via the monitor() call
+    // after it's reaped, an invalid status (-1) will be returned.
+    if (!WIFSTOPPED(status) && promises.contains(pid)) {
+      notify(pid, status);
+    }
+  }
+
+  // Check whether any monitored process has exited and been reaped.
+  // 1) If a child terminates before the foreach loop but after the
+  //    while loop, it won't be reaped until the next reap() cycle
+  //    and the alive() check below returns true.
+  // 2) If a non-child process terminates and is reaped elsewhere,
+  //    e.g. by init, we notify the listeners. (We know we have
+  //    permission to check its liveness in this case.)
+  // 3) If a non-child process terminates and is not yet reaped,
+  //    alive() returns true and no notification is sent.
+  // 4) If a child terminates before the while loop above, then we've
+  //    already reaped it and have the listeners notified!
+  foreach (pid_t pid, utils::copy(promises.keys())) {
+    Try<bool> alive = os::alive(pid);
+
+    if (alive.isSome() && !alive.get()) {
+      // The process has been reaped.
+      LOG(WARNING) << "Cannot get the exit status of process " << pid
+                   << " because it is not a child of the calling "
+                   << "process: " << strerror(errno);
+      notify(pid, -1);
     }
   }
+
   delay(Seconds(1), self(), &ReaperProcess::reap); // Reap forever!
 }
 
@@ -133,13 +168,7 @@ Reaper::~Reaper()
 }
 
 
-void Reaper::addListener(const process::PID<ProcessExitedListener>& listener)
-{
-  dispatch(process, &ReaperProcess::addListener, listener);
-}
-
-
-Future<Nothing> Reaper::monitor(pid_t pid)
+Future<int> Reaper::monitor(pid_t pid)
 {
   return dispatch(process, &ReaperProcess::monitor, pid);
 }

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/4dd692f1/src/slave/reaper.hpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.hpp b/src/slave/reaper.hpp
index d599e9c..15ebc0d 100644
--- a/src/slave/reaper.hpp
+++ b/src/slave/reaper.hpp
@@ -22,32 +22,59 @@
 #include <list>
 #include <set>
 
+#include <process/future.hpp>
 #include <process/process.hpp>
 
+#include <stout/multihashmap.hpp>
 #include <stout/nothing.hpp>
+#include <stout/owned.hpp>
 #include <stout/try.hpp>
 
-
 namespace mesos {
 namespace internal {
 namespace slave {
 
-class ProcessExitedListener : public process::Process<ProcessExitedListener>
+// Forward declaration.
+class ReaperProcess;
+
+
+// TODO(vinod): Refactor the Reaper into 2 components:
+// 1) Reaps the status of child processes.
+// 2) Checks the exit status of requested processes.
+class Reaper
 {
 public:
-  virtual void processExited(pid_t pid, int status) = 0;
+  Reaper();
+  virtual ~Reaper();
+
+  // Monitor the given process and notify the caller if it terminates
+  // via a Future of the exit status.
+  //
+  // NOTE: The termination of pid can only be monitored if the
+  // calling process:
+  //   1) has the same real or effective user ID as the real or saved
+  //      set-user-ID of 'pid', or
+  //   2) is run as a privileged user, or
+  //   3) pid is a child of the current process.
+  // Otherwise a failed Future is returned.
+  //
+  // The exit status of 'pid' can only be correctly captured if the
+  // calling process is the parent of 'pid' and the process hasn't
+  // been reaped yet, otherwise -1 is returned.
+  process::Future<int> monitor(pid_t pid);
+
+private:
+  ReaperProcess* process;
 };
 
 
-// Reaper implementation. See comments of the Reaper class.
+// Reaper implementation.
 class ReaperProcess : public process::Process<ReaperProcess>
 {
 public:
   ReaperProcess();
 
-  void addListener(const process::PID<ProcessExitedListener>&);
-
-  process::Future<Nothing> monitor(pid_t pid);
+  process::Future<int> monitor(pid_t pid);
 
 protected:
   virtual void initialize();
@@ -55,35 +82,14 @@ protected:
   void reap();
 
   // TODO(vinod): Make 'status' an option.
+  // The notification is sent only if the pid is explicitly registered
+  // via the monitor() call.
   void notify(pid_t pid, int status);
 
 private:
-  std::list<process::PID<ProcessExitedListener> > listeners;
-  std::set<pid_t> pids;
-};
-
-
-// TODO(vinod): Refactor the Reaper into 2 components:
-// 1) Reaps the status of child processes.
-// 2) Checks the exit status of requested processes.
-// Also, use Futures instead of callbacks to notify process exits.
-class Reaper
-{
-public:
-  Reaper();
-  virtual ~Reaper();
-
-  void addListener(const process::PID<ProcessExitedListener>&);
-
-  // Monitor the given process and notify the listener if it terminates.
-  // NOTE: A notification is only sent if the calling process:
-  // 1) is the parent of 'pid' or
-  // 2) has the same real/effective UID as that of 'pid' or
-  // 3) is run as a privileged user.
-  process::Future<Nothing> monitor(pid_t pid);
-
-private:
-  ReaperProcess* process;
+  // Mapping from the monitored pid to all promises the pid exit
+  // status should be sent to.
+  multihashmap<pid_t, Owned<process::Promise<int> > > promises;
 };
 
 

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/4dd692f1/src/tests/reaper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reaper_tests.cpp b/src/tests/reaper_tests.cpp
index 6849419..cfda0f4 100644
--- a/src/tests/reaper_tests.cpp
+++ b/src/tests/reaper_tests.cpp
@@ -19,6 +19,8 @@
 #include <signal.h>
 #include <unistd.h>
 
+#include <sys/wait.h>
+
 #include <gtest/gtest.h>
 
 #include <process/clock.hpp>
@@ -26,6 +28,9 @@
 #include <process/gmock.hpp>
 #include <process/gtest.hpp>
 
+#include <stout/exit.hpp>
+#include <stout/os.hpp>
+
 #include "slave/reaper.hpp"
 
 using namespace mesos;
@@ -39,13 +44,6 @@ using testing::_;
 using testing::DoDefault;
 
 
-class MockProcessListener : public ProcessExitedListener
-{
-public:
-  MOCK_METHOD2(processExited, void(pid_t, int));
-};
-
-
 // This test checks that the Reaper can monitor a non-child process.
 TEST(ReaperTest, NonChildProcess)
 {
@@ -93,48 +91,143 @@ TEST(ReaperTest, NonChildProcess)
     }
   }
 
+  // In parent process.
   LOG(INFO) << "Grand child process " << pid;
 
-  MockProcessListener listener;
+  Reaper reaper;
 
-  // Spawn the listener.
-  spawn(listener);
+  Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
 
-  // Spawn the reaper.
-  Reaper reaper;
+  // Ask the reaper to monitor the grand child process.
+  Future<int> status = reaper.monitor(pid);
+
+  AWAIT_READY(monitor);
+
+  // Now kill the grand child.
+  // NOTE: We send a SIGKILL here because sometimes the grand child
+  // process seems to be in a hung state and not responding to
+  // SIGTERM/SIGINT.
+  EXPECT_EQ(0, kill(pid, SIGKILL));
+
+  Clock::pause();
+
+  // Now advance time until the reaper reaps the executor.
+  while (status.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  // Ensure the reaper notifies of the terminated process.
+  AWAIT_READY(status);
+
+  // Status is -1 because pid is not an immediate child.
+  ASSERT_EQ(-1, status.get());
+
+  Clock::resume();
+}
+
+
+// This test checks that the Reaper can monitor a child process with
+// accurate exit status returned.
+TEST(ReaperTest, ChildProcess)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  pid_t pid = fork();
+  ASSERT_NE(-1, pid);
+
+  if (pid == 0) {
+    // In child process. Keep waiting till we get a signal.
+    while (true);
+  }
+
+  // In parent process.
+  LOG(INFO) << "Child process " << pid;
 
-  // Ignore the exit of the child process.
-  EXPECT_CALL(listener, processExited(_,_))
-    .WillRepeatedly(DoDefault());
+  Reaper reaper;
 
-  reaper.addListener(listener.self());
+  Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
 
   // Ask the reaper to monitor the grand child process.
-  reaper.monitor(pid);
+  Future<int> status = reaper.monitor(pid);
 
-  // Catch the exit of the grand child process.
-  Future<Nothing> processExited;
-  EXPECT_CALL(listener, processExited(pid, _))
-    .WillOnce(FutureSatisfy(&processExited));
+  AWAIT_READY(monitor);
 
-  // Now kill the grand child.
-  // NOTE: We send a SIGKILL here because sometimes the grand child process
-  // seems to be in a hung state and not responding to SIGTERM/SIGINT.
+  // Now kill the child.
   EXPECT_EQ(0, kill(pid, SIGKILL));
 
   Clock::pause();
 
   // Now advance time until the reaper reaps the executor.
-  while (processExited.isPending()) {
+  while (status.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  // Ensure the reaper notifies of the terminated process.
+  AWAIT_READY(status);
+
+  // Check if the status is correct.
+  int stat = status.get();
+  ASSERT_TRUE(WIFSIGNALED(stat));
+  ASSERT_EQ(SIGKILL, WTERMSIG(stat));
+
+  Clock::resume();
+}
+
+
+// Check that the Reaper can monitor a child process that exits
+// before monitor() is called on it.
+TEST(ReaperTest, TerminatedChildProcess)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  pid_t pid = fork();
+  ASSERT_NE(-1, pid);
+
+  if (pid == 0) {
+    // In child process. Exit immediately.
+    exit(EXIT_SUCCESS);
+  }
+
+  // In parent process.
+  LOG(INFO) << "Child process " << pid;
+
+  Reaper reaper;
+
+  Clock::pause();
+
+  ASSERT_SOME(os::alive(pid));
+
+  // Because reaper reaps all child processes even if they aren't
+  // registered, we advance time until that happens.
+  while (os::alive(pid).get()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  // Now we request to monitor the child process which is already
+  // reaped.
+
+  Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
+
+  // Ask the reaper to monitor the child process.
+  Future<int> status = reaper.monitor(pid);
+
+  AWAIT_READY(monitor);
+
+  // Now advance time until the reaper sends the notification.
+  while (status.isPending()) {
     Clock::advance(Seconds(1));
     Clock::settle();
   }
 
   // Ensure the reaper notifies of the terminated process.
-  AWAIT_READY(processExited);
+  AWAIT_READY(status);
 
-  terminate(listener);
-  wait(listener);
+  // Invalid status is returned because it is reaped before being
+  // monitored.
+  ASSERT_EQ(-1, status.get());
 
   Clock::resume();
 }


[2/4] git commit: Added (EXPECT|ASSERT)_NONE in gtest.hpp.

Posted by bm...@apache.org.
Added (EXPECT|ASSERT)_NONE in gtest.hpp.

From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/11555


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/ad842c39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/ad842c39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/ad842c39

Branch: refs/heads/master
Commit: ad842c3998fe5e97fe178cf7ae9ffe1f890f7b4f
Parents: 93dfc5f
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Fri Jun 21 10:45:53 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Fri Jun 21 10:45:53 2013 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/3rdparty/stout/include/stout/gtest.hpp | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad842c39/3rdparty/libprocess/3rdparty/stout/include/stout/gtest.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/gtest.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/gtest.hpp
index 3c34124..1f10834 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/gtest.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/gtest.hpp
@@ -119,4 +119,12 @@ template <typename T1, typename T2>
 #define EXPECT_ERROR(actual)                    \
   EXPECT_TRUE(actual.isError())
 
+
+#define ASSERT_NONE(actual)                     \
+  ASSERT_TRUE(actual.isNone())
+
+
+#define EXPECT_NONE(actual)                     \
+  EXPECT_TRUE(actual.isNone())
+
 #endif // __STOUT_GTEST_HPP__