You are viewing a plain-text version of this content. (The link to the canonical version is not preserved in this copy.)
Posted to commits@mesos.apache.org by bm...@apache.org on 2013/06/21 19:55:17 UTC

[4/4] git commit: Changed reaper to use an Option for the process exit status.

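Changed the reaper to report the process exit status as an Option<int>, returning None instead of the -1 sentinel when the status cannot be determined (e.g. the pid does not exist, is not a child, or was already reaped).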

From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/11556


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/ad198fe4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/ad198fe4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/ad198fe4

Branch: refs/heads/master
Commit: ad198fe4876ce2ce027c55e04c198005294d0fc5
Parents: 4dd692f
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Fri Jun 21 10:47:44 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Fri Jun 21 10:47:44 2013 -0700

----------------------------------------------------------------------
 src/slave/cgroups_isolator.cpp | 23 ++++++++++---------
 src/slave/cgroups_isolator.hpp |  4 ++--
 src/slave/process_isolator.cpp | 18 +++++++--------
 src/slave/process_isolator.hpp |  2 +-
 src/slave/reaper.cpp           | 24 ++++++++++----------
 src/slave/reaper.hpp           | 13 ++++++-----
 src/slave/slave.cpp            | 45 ++++++++++++++++++++++---------------
 src/slave/slave.hpp            |  2 +-
 src/tests/reaper_tests.cpp     | 20 +++++++++--------
 9 files changed, 81 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/cgroups_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp
index 6b87d08..9d18886 100644
--- a/src/slave/cgroups_isolator.cpp
+++ b/src/slave/cgroups_isolator.cpp
@@ -801,25 +801,26 @@ Future<Nothing> CgroupsIsolator::recover(
 }
 
 
-void CgroupsIsolator::reaped(pid_t pid, const Future<int>& status)
+void CgroupsIsolator::reaped(pid_t pid, const Future<Option<int> >& status)
 {
-  if (status.isDiscarded()) {
-    LOG(ERROR) << "The status was discarded";
-    return;
-  }
-  if (status.isFailed()) {
-    LOG(ERROR) << status.failure();
-    return;
-  }
-
   CgroupInfo* info = findCgroupInfo(pid);
   if (info != NULL) {
     FrameworkID frameworkId = info->frameworkId;
     ExecutorID executorId = info->executorId;
 
+    if (!status.isReady()) {
+      LOG(ERROR) << "Failed to get the status for executor " << executorId
+                 << " of framework " << frameworkId << ": "
+                 << (status.isFailed() ? status.failure() : "discarded");
+      return;
+    }
+
     LOG(INFO) << "Executor " << executorId
               << " of framework " << frameworkId
-              << " terminated with status " << status.get();
+              << " terminated with status "
+              << (status.get().isSome()
+                  ? stringify(status.get().get())
+                  : "unknown");
 
     // Set the exit status, so that '_killExecutor()' can send it to the slave.
     info->status = status.get();

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/cgroups_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.hpp b/src/slave/cgroups_isolator.hpp
index 06bac98..e86062e 100644
--- a/src/slave/cgroups_isolator.hpp
+++ b/src/slave/cgroups_isolator.hpp
@@ -119,7 +119,7 @@ private:
   CgroupsIsolator(const CgroupsIsolator&);
   CgroupsIsolator& operator = (const CgroupsIsolator&);
 
-  void reaped(pid_t pid, const Future<int>& status);
+  void reaped(pid_t pid, const Future<Option<int> >& status);
 
   // The cgroup information for each live executor.
   struct CgroupInfo
@@ -163,7 +163,7 @@ private:
 
     std::string message; // The reason behind the destruction.
 
-    int status; // Exit status of the executor.
+    Option<int> status; // Exit status of the executor.
 
     Flags flags; // Slave flags.
 

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/process_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.cpp b/src/slave/process_isolator.cpp
index a3d31e5..b750b26 100644
--- a/src/slave/process_isolator.cpp
+++ b/src/slave/process_isolator.cpp
@@ -423,22 +423,20 @@ Future<ResourceStatistics> ProcessIsolator::usage(
 }
 
 
-void ProcessIsolator::reaped(pid_t pid, const Future<int>& status)
+void ProcessIsolator::reaped(pid_t pid, const Future<Option<int> >& status)
 {
-  if (status.isDiscarded()) {
-    LOG(ERROR) << "The status was discarded";
-    return;
-  }
-  if (status.isFailed()) {
-    LOG(ERROR) << status.failure();
-    return;
-  }
-
   foreachkey (const FrameworkID& frameworkId, infos) {
     foreachkey (const ExecutorID& executorId, infos[frameworkId]) {
       ProcessInfo* info = infos[frameworkId][executorId];
 
       if (info->pid.isSome() && info->pid.get() == pid) {
+        if (!status.isReady()) {
+          LOG(ERROR) << "Failed to get the status for executor '" << executorId
+                     << "' of framework " << frameworkId << ": "
+                     << (status.isFailed() ? status.failure() : "discarded");
+          return;
+        }
+
         LOG(INFO) << "Telling slave of terminated executor '" << executorId
                   << "' of framework " << frameworkId;
 

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/process_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.hpp b/src/slave/process_isolator.hpp
index a3f6c68..4ae093f 100644
--- a/src/slave/process_isolator.hpp
+++ b/src/slave/process_isolator.hpp
@@ -109,7 +109,7 @@ private:
   Reaper reaper;
   hashmap<FrameworkID, hashmap<ExecutorID, ProcessInfo*> > infos;
 
-  void reaped(pid_t pid, const Future<int>& status);
+  void reaped(pid_t pid, const Future<Option<int> >& status);
 };
 
 } // namespace slave {

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/reaper.cpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.cpp b/src/slave/reaper.cpp
index 1b19eb2..8d3101d 100644
--- a/src/slave/reaper.cpp
+++ b/src/slave/reaper.cpp
@@ -45,7 +45,7 @@ ReaperProcess::ReaperProcess()
   : ProcessBase(ID::generate("reaper")) {}
 
 
-Future<int> ReaperProcess::monitor(pid_t pid)
+Future<Option<int> > ReaperProcess::monitor(pid_t pid)
 {
   // Check to see if the current process has sufficient privileges to
   // monitor the liveness of this pid.
@@ -55,14 +55,14 @@ Future<int> ReaperProcess::monitor(pid_t pid)
     if (alive.get()) {
       // We have permissions to check the validity of the process
       // and it's alive, so add it to the promises map.
-      Owned<Promise<int> > promise(new Promise<int>());
+      Owned<Promise<Option<int> > > promise(new Promise<Option<int> >());
       promises.put(pid, promise);
       return promise->future();
     } else {
       // Process doesn't exist.
       LOG(WARNING) << "Cannot monitor process " << pid
                    << " because it doesn't exist";
-      return -1;
+      return None();
     }
   }
 
@@ -75,17 +75,17 @@ Future<int> ReaperProcess::monitor(pid_t pid)
     // The process terminated and the status was reaped.
     // Notify other listeners and return directly for this caller.
     notify(pid, status);
-    return status;
+    return Option<int>(status);
   } else if (result == 0) {
     // Child still active, add to the map.
-    Owned<Promise<int> > promise(new Promise<int>());
+    Owned<Promise<Option<int> > > promise(new Promise<Option<int> >());
     promises.put(pid, promise);
     return promise->future();
   } else {
     // Not a child nor do we have permission to for os::alive();
     // we cannot monitor this pid.
-    return Future<int>::failed("Failed to monitor process " +
-                               stringify(pid) + ": " + strerror(errno));
+    return Future<Option<int> >::failed(
+        "Failed to monitor process " + stringify(pid) + ": " + strerror(errno));
   }
 }
 
@@ -96,9 +96,9 @@ void ReaperProcess::initialize()
 }
 
 
-void ReaperProcess::notify(pid_t pid, int status)
+void ReaperProcess::notify(pid_t pid, Option<int> status)
 {
-  foreach (const Owned<Promise<int> >& promise, promises.get(pid)) {
+  foreach (const Owned<Promise<Option<int> > >& promise, promises.get(pid)) {
     promise->set(status);
   }
   promises.remove(pid);
@@ -120,7 +120,7 @@ void ReaperProcess::reap()
     // Notify the "listeners" only if they have requested to monitor
     // this pid. Otherwise the status is discarded.
     // This means if a child pid is registered via the monitor() call
-    // after it's reaped, an invalid status (-1) will be returned.
+    // after it's reaped, status 'None' will be returned.
     if (!WIFSTOPPED(status) && promises.contains(pid)) {
       notify(pid, status);
     }
@@ -145,7 +145,7 @@ void ReaperProcess::reap()
       LOG(WARNING) << "Cannot get the exit status of process " << pid
                    << " because it is not a child of the calling "
                    << "process: " << strerror(errno);
-      notify(pid, -1);
+      notify(pid, None());
     }
   }
 
@@ -168,7 +168,7 @@ Reaper::~Reaper()
 }
 
 
-Future<int> Reaper::monitor(pid_t pid)
+Future<Option<int> > Reaper::monitor(pid_t pid)
 {
   return dispatch(process, &ReaperProcess::monitor, pid);
 }

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/reaper.hpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.hpp b/src/slave/reaper.hpp
index 15ebc0d..2d6db03 100644
--- a/src/slave/reaper.hpp
+++ b/src/slave/reaper.hpp
@@ -60,8 +60,10 @@ public:
   //
   // The exit status of 'pid' can only be correctly captured if the
   // calling process is the parent of 'pid' and the process hasn't
-  // been reaped yet, otherwise -1 is returned.
-  process::Future<int> monitor(pid_t pid);
+  // been reaped yet, otherwise 'None' is returned.
+  // Note that an invalid pid does not cause a failed Future, but an
+  // empty result ('None').
+  process::Future<Option<int> > monitor(pid_t pid);
 
 private:
   ReaperProcess* process;
@@ -74,22 +76,21 @@ class ReaperProcess : public process::Process<ReaperProcess>
 public:
   ReaperProcess();
 
-  process::Future<int> monitor(pid_t pid);
+  process::Future<Option<int> > monitor(pid_t pid);
 
 protected:
   virtual void initialize();
 
   void reap();
 
-  // TODO(vinod): Make 'status' an option.
   // The notification is sent only if the pid is explicitly registered
   // via the monitor() call.
-  void notify(pid_t pid, int status);
+  void notify(pid_t pid, Option<int> status);
 
 private:
   // Mapping from the monitored pid to all promises the pid exit
   // status should be sent to.
-  multihashmap<pid_t, Owned<process::Promise<int> > > promises;
+  multihashmap<pid_t, Owned<process::Promise<Option<int> > > > promises;
 };
 
 

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 952bd14..309d1ac 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2045,19 +2045,28 @@ void _unwatch(
 void Slave::executorTerminated(
     const FrameworkID& frameworkId,
     const ExecutorID& executorId,
-    int status,
+    const Option<int>& status_,
     bool destroyed,
     const string& message)
 {
-  LOG(INFO) << "Executor '" << executorId
-            << "' of framework " << frameworkId
-            << (WIFEXITED(status)
-                ? " has exited with status '"
-                : " has terminated with signal '")
-            << (WIFEXITED(status)
-                ? stringify(WEXITSTATUS(status))
-                : strsignal(WTERMSIG(status)))
-            << "'";
+  int status;
+  if (status_.isNone()) {
+    LOG(INFO) << "Executor '" << executorId
+              << "' of framework " << frameworkId
+              << " has terminated with unknown status";
+    // Set a special status for None.
+    status = -1;
+  } else {
+    status = status_.get();
+    LOG(INFO) << "Executor '" << executorId
+              << "' of framework " << frameworkId
+              << (WIFEXITED(status)
+                  ? " has exited with status "
+                  : " has terminated with signal ")
+              << (WIFEXITED(status)
+                  ? stringify(WEXITSTATUS(status))
+                  : strsignal(WTERMSIG(status)));
+  }
 
   Framework* framework = getFramework(frameworkId);
   if (framework == NULL) {
@@ -2106,18 +2115,18 @@ void Slave::executorTerminated(
         // Transition all live launched tasks.
         foreachvalue (Task* task, utils::copy(executor->launchedTasks)) {
           if (!protobuf::isTerminalState(task->state())) {
-            mesos::TaskState status;
+            mesos::TaskState taskState;
             isCommandExecutor = !task->has_executor_id();
             if (destroyed || isCommandExecutor.get()) {
-              status = TASK_FAILED;
+              taskState = TASK_FAILED;
             } else {
-              status = TASK_LOST;
+              taskState = TASK_LOST;
             }
             statusUpdate(protobuf::createStatusUpdate(
                 frameworkId,
                 info.id(),
                 task->task_id(),
-                status,
+                taskState,
                 message,
                 executorId));
           }
@@ -2126,18 +2135,18 @@ void Slave::executorTerminated(
         // Transition all queued tasks.
         foreachvalue (const TaskInfo& task,
                       utils::copy(executor->queuedTasks)) {
-          mesos::TaskState status;
+          mesos::TaskState taskState;
           isCommandExecutor = task.has_command();
           if (destroyed || isCommandExecutor.get()) {
-            status = TASK_FAILED;
+            taskState = TASK_FAILED;
           } else {
-            status = TASK_LOST;
+            taskState = TASK_LOST;
           }
           statusUpdate(protobuf::createStatusUpdate(
               frameworkId,
               info.id(),
               task.task_id(),
-              status,
+              taskState,
               message,
               executorId));
         }

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index d1ba82e..7ef6ad8 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -171,7 +171,7 @@ public:
   void executorTerminated(
       const FrameworkID& frameworkId,
       const ExecutorID& executorId,
-      int status,
+      const Option<int>& status,
       bool destroyed,
       const std::string& message);
 

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/tests/reaper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reaper_tests.cpp b/src/tests/reaper_tests.cpp
index cfda0f4..d55ac24 100644
--- a/src/tests/reaper_tests.cpp
+++ b/src/tests/reaper_tests.cpp
@@ -29,6 +29,7 @@
 #include <process/gtest.hpp>
 
 #include <stout/exit.hpp>
+#include <stout/gtest.hpp>
 #include <stout/os.hpp>
 
 #include "slave/reaper.hpp"
@@ -99,7 +100,7 @@ TEST(ReaperTest, NonChildProcess)
   Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
 
   // Ask the reaper to monitor the grand child process.
-  Future<int> status = reaper.monitor(pid);
+  Future<Option<int> > status = reaper.monitor(pid);
 
   AWAIT_READY(monitor);
 
@@ -120,8 +121,8 @@ TEST(ReaperTest, NonChildProcess)
   // Ensure the reaper notifies of the terminated process.
   AWAIT_READY(status);
 
-  // Status is -1 because pid is not an immediate child.
-  ASSERT_EQ(-1, status.get());
+  // Status is None because pid is not an immediate child.
+  ASSERT_NONE(status.get());
 
   Clock::resume();
 }
@@ -149,7 +150,7 @@ TEST(ReaperTest, ChildProcess)
   Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
 
   // Ask the reaper to monitor the grand child process.
-  Future<int> status = reaper.monitor(pid);
+  Future<Option<int> > status = reaper.monitor(pid);
 
   AWAIT_READY(monitor);
 
@@ -168,9 +169,10 @@ TEST(ReaperTest, ChildProcess)
   AWAIT_READY(status);
 
   // Check if the status is correct.
-  int stat = status.get();
-  ASSERT_TRUE(WIFSIGNALED(stat));
-  ASSERT_EQ(SIGKILL, WTERMSIG(stat));
+  ASSERT_SOME(status.get());
+  int status_ = status.get().get();
+  ASSERT_TRUE(WIFSIGNALED(status_));
+  ASSERT_EQ(SIGKILL, WTERMSIG(status_));
 
   Clock::resume();
 }
@@ -212,7 +214,7 @@ TEST(ReaperTest, TerminatedChildProcess)
   Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
 
   // Ask the reaper to monitor the child process.
-  Future<int> status = reaper.monitor(pid);
+  Future<Option<int> > status = reaper.monitor(pid);
 
   AWAIT_READY(monitor);
 
@@ -227,7 +229,7 @@ TEST(ReaperTest, TerminatedChildProcess)
 
   // Invalid status is returned because it is reaped before being
   // monitored.
-  ASSERT_EQ(-1, status.get());
+  ASSERT_NONE(status.get());
 
   Clock::resume();
 }