You are viewing a plain text version of this content. The canonical link to the original page was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/07/19 23:20:27 UTC

[2/2] git commit: Refactored Command Executor to use libprocess.

Refactored Command Executor to use libprocess.

Review: https://reviews.apache.org/r/12754


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6d8aca41
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6d8aca41
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6d8aca41

Branch: refs/heads/master
Commit: 6d8aca413d14fd81770c9adc7d8ee510701d4b5b
Parents: 898e906
Author: Vinod Kone <vi...@twitter.com>
Authored: Thu Jul 18 19:59:41 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Fri Jul 19 14:19:50 2013 -0700

----------------------------------------------------------------------
 src/launcher/executor.cpp | 242 +++++++++++++++++++++++++++++------------
 src/slave/reaper.hpp      |   4 +-
 2 files changed, 174 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6d8aca41/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index 2907ec5..c5fd78e 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -26,105 +26,81 @@
 
 #include <mesos/executor.hpp>
 
+#include <process/defer.hpp>
+#include <process/future.hpp>
+#include <process/process.hpp>
+
 #include <stout/duration.hpp>
+#include <stout/lambda.hpp>
 #include <stout/os.hpp>
 #include <stout/strings.hpp>
 
-#include "common/thread.hpp"
+#include "common/type_utils.hpp"
 
 #include "logging/logging.hpp"
 
+#include "slave/reaper.hpp"
+
+using process::wait; // Necessary on some OS's to disambiguate.
+
+using std::cout;
+using std::cerr;
+using std::endl;
 using std::string;
 
 namespace mesos {
 namespace internal {
 
-// Waits for command to finish. Note that we currently launch a thread
-// that calls this function and thus the ExecutorDriver pointer might
-// be used by multiple threads. This is not ideal, but should be
-// sufficient for now (see the comment below where we instantiate the
-// MesosSchedulerDriver for how we "get around" any issues related to
-// the driver pointer becoming a dangling reference).
-static void waiter(pid_t pid, const TaskID& taskId, ExecutorDriver* driver)
-{
-  int status;
-  while (wait(&status) != pid || WIFSTOPPED(status));
-
-  CHECK(WIFEXITED(status) || WIFSIGNALED(status));
-
-  std::cout << "Waited on process " << pid
-            << ", returned status " << status << std::endl;
-
-  TaskStatus taskStatus;
-  taskStatus.mutable_task_id()->MergeFrom(taskId);
-
-  if (WIFEXITED(status) && WEXITSTATUS(status) == 0) {
-    taskStatus.set_state(TASK_FINISHED);
-  } else {
-    taskStatus.set_state(TASK_FAILED);
-  }
-
-  Try<string> message = WIFEXITED(status)
-    ? strings::format("Command exited with status %d", WEXITSTATUS(status))
-    : strings::format("Command terminated with signal '%s'",
-                      strsignal(WTERMSIG(status)));
-
-  if (message.isSome()) {
-    taskStatus.set_message(message.get());
-  }
-
-  driver->sendStatusUpdate(taskStatus);
-
-  // A hack for now ... but we need to wait until for the status
-  // update to get sent to the slave before we shut ourselves down.
-  os::sleep(Seconds(1));
-  driver->stop();
-}
+using namespace process;
 
 
-class CommandExecutor : public Executor
+class CommandExecutorProcess : public Process<CommandExecutorProcess>
 {
 public:
-  CommandExecutor()
+  CommandExecutorProcess()
     : launched(false),
       killed(false),
       pid(-1) {}
 
-  virtual ~CommandExecutor() {}
+  virtual ~CommandExecutorProcess() {}
 
-  virtual void registered(
+  void registered(
       ExecutorDriver* driver,
       const ExecutorInfo& executorInfo,
       const FrameworkInfo& frameworkInfo,
       const SlaveInfo& slaveInfo)
   {
-    std::cout << "Registered executor on " << slaveInfo.hostname() << std::endl;
+    cout << "Registered executor on " << slaveInfo.hostname() << endl;
   }
 
-  virtual void reregistered(ExecutorDriver* driver,
-                            const SlaveInfo& slaveInfo)
+  void reregistered(
+      ExecutorDriver* driver,
+      const SlaveInfo& slaveInfo)
   {
-    std::cout << "Re-registered executor on " << slaveInfo.hostname()
-      << std::endl;
+    cout << "Re-registered executor on " << slaveInfo.hostname() << endl;
   }
 
-  virtual void disconnected(ExecutorDriver* driver) {}
+  void disconnected(ExecutorDriver* driver) {}
 
-  virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
+  void launchTask(ExecutorDriver* driver, const TaskInfo& task)
   {
     if (launched) {
       TaskStatus status;
       status.mutable_task_id()->MergeFrom(task.task_id());
       status.set_state(TASK_FAILED);
-      status.set_message("Attempted to run tasks using a \"command\" executor");
+      status.set_message(
+          "Attempted to run multiple tasks using a \"command\" executor");
+
       driver->sendStatusUpdate(status);
       return;
     }
 
-    CHECK(task.has_command()) << "Expecting task to have a command!";
+    CHECK(task.has_command()) << "Expecting task " << task.task_id()
+                              << " to have a command!";
 
-    std::cout << "Starting task " << task.task_id().value() << std::endl;
+    std::cout << "Starting task " << task.task_id() << std::endl;
 
+    // TODO(benh): Clean this up with the new 'Fork' abstraction.
     // Use pipes to determine which child has successfully changed
     // session. This is needed as the setsid call can fail from other
     // processes having the same group id.
@@ -208,8 +184,14 @@ public:
 
     std::cout << "Forked command at " << pid << std::endl;
 
-    // In parent process, fork a thread to wait for this process.
-    thread::start(std::tr1::bind(&waiter, pid, task.task_id(), driver));
+    // Monitor this process.
+    reaper.monitor(pid)
+      .onAny(defer(self(),
+                   &Self::reaped,
+                   driver,
+                   task.task_id(),
+                   pid,
+                   lambda::_1));
 
     TaskStatus status;
     status.mutable_task_id()->MergeFrom(task.task_id());
@@ -219,7 +201,7 @@ public:
     launched = true;
   }
 
-  virtual void killTask(ExecutorDriver* driver, const TaskID& taskId)
+  void killTask(ExecutorDriver* driver, const TaskID& taskId)
   {
     std::cout << "Killing command at pid " << pid << std::endl;
 
@@ -231,9 +213,9 @@ public:
     }
   }
 
-  virtual void frameworkMessage(ExecutorDriver* driver, const string& data) {}
+  void frameworkMessage(ExecutorDriver* driver, const string& data) {}
 
-  virtual void shutdown(ExecutorDriver* driver)
+  void shutdown(ExecutorDriver* driver)
   {
     std::cout << "Shutting down" << std::endl;
 
@@ -248,9 +230,137 @@ public:
   virtual void error(ExecutorDriver* driver, const string& message) {}
 
 private:
+  void reaped(
+      ExecutorDriver* driver,
+      const TaskID& taskId,
+      pid_t pid,
+      const Future<Option<int> >& status_)
+  {
+    TaskState state;
+    string message;
+
+    if (!status_.isReady()) {
+      state = TASK_FAILED;
+      message =
+        "Failed to get exit status for Command: " +
+        (status_.isFailed() ? status_.failure() : "future discarded");
+    } else if (status_.get().isNone()) {
+      state = TASK_FAILED;
+      message = "Failed to get exit status for Command";
+    } else {
+      int status = status_.get().get();
+      CHECK(WIFEXITED(status) || WIFSIGNALED(status)) << status;
+
+      if (WIFEXITED(status) && WEXITSTATUS(status) == 0) {
+        state = TASK_FINISHED;
+      } else {
+        // TODO(vinod): Send TASK_KILLED if 'killed' is true.
+        state = TASK_FAILED;
+      }
+
+      message = string("Command") +
+          (WIFEXITED(status)
+          ? " exited with status "
+          : " terminated with signal ") +
+          (WIFEXITED(status)
+          ? stringify(WEXITSTATUS(status))
+          : strsignal(WTERMSIG(status)));
+    }
+
+    cout << message << " (pid: " << pid << ")" << endl;
+
+    TaskStatus taskStatus;
+    taskStatus.mutable_task_id()->MergeFrom(taskId);
+    taskStatus.set_state(state);
+    taskStatus.set_message(message);
+
+    driver->sendStatusUpdate(taskStatus);
+
+    // A hack for now ... but we need to wait until the status update
+    // is sent to the slave before we shut ourselves down.
+    os::sleep(Seconds(1));
+    driver->stop();
+  }
+
   bool launched;
   bool killed;
   pid_t pid;
+  slave::Reaper reaper;
+};
+
+
+class CommandExecutor: public Executor
+{
+public:
+  CommandExecutor()
+  {
+    process = new CommandExecutorProcess();
+    spawn(process);
+  }
+
+  virtual ~CommandExecutor()
+  {
+    terminate(process);
+    wait(process);
+    delete process;
+  }
+
+  virtual void registered(
+        ExecutorDriver* driver,
+        const ExecutorInfo& executorInfo,
+        const FrameworkInfo& frameworkInfo,
+        const SlaveInfo& slaveInfo)
+  {
+    dispatch(process,
+             &CommandExecutorProcess::registered,
+             driver,
+             executorInfo,
+             frameworkInfo,
+             slaveInfo);
+  }
+
+  virtual void reregistered(
+      ExecutorDriver* driver,
+      const SlaveInfo& slaveInfo)
+  {
+    dispatch(process,
+             &CommandExecutorProcess::reregistered,
+             driver,
+             slaveInfo);
+  }
+
+  virtual void disconnected(ExecutorDriver* driver)
+  {
+    dispatch(process, &CommandExecutorProcess::disconnected, driver);
+  }
+
+  virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
+  {
+    dispatch(process, &CommandExecutorProcess::launchTask, driver, task);
+  }
+
+  virtual void killTask(ExecutorDriver* driver, const TaskID& taskId)
+  {
+    dispatch(process, &CommandExecutorProcess::killTask, driver, taskId);
+  }
+
+  virtual void frameworkMessage(ExecutorDriver* driver, const string& data)
+  {
+    dispatch(process, &CommandExecutorProcess::frameworkMessage, driver, data);
+  }
+
+  virtual void shutdown(ExecutorDriver* driver)
+  {
+    dispatch(process, &CommandExecutorProcess::shutdown, driver);
+  }
+
+  virtual void error(ExecutorDriver* driver, const string& data)
+  {
+    dispatch(process, &CommandExecutorProcess::error, driver, data);
+  }
+
+private:
+  CommandExecutorProcess* process;
 };
 
 } // namespace internal {
@@ -260,12 +370,6 @@ private:
 int main(int argc, char** argv)
 {
   mesos::internal::CommandExecutor executor;
-
-  // Note that we currently put the MesosSchedulerDriver on the heap
-  // so that we don't have to deal with issues created because the
-  // thread we launched is trying to use the pointer.
-  mesos::MesosExecutorDriver* driver =
-    new mesos::MesosExecutorDriver(&executor);
-
-  return driver->run() == mesos::DRIVER_STOPPED ? 0 : 1;
+  mesos::MesosExecutorDriver driver(&executor);
+  return driver.run() == mesos::DRIVER_STOPPED ? 0 : 1;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/6d8aca41/src/slave/reaper.hpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.hpp b/src/slave/reaper.hpp
index 6eb9a3a..4498139 100644
--- a/src/slave/reaper.hpp
+++ b/src/slave/reaper.hpp
@@ -38,9 +38,7 @@ namespace slave {
 class ReaperProcess;
 
 
-// TODO(vinod): Refactor the Reaper into 2 components:
-// 1) Reaps the status of child processes.
-// 2) Checks the exit status of requested processes.
+// TODO(vinod): Pull reaper into common or libprocess.
 class Reaper
 {
 public: