You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:21:46 UTC

svn commit: r1131553 - in /incubator/mesos/trunk: include/nexus_exec.hpp src/nexus_exec.cpp

Author: benh
Date: Sun Jun  5 03:21:46 2011
New Revision: 1131553

URL: http://svn.apache.org/viewvc?rev=1131553&view=rev
Log:
Initial work on adding drivers to API (#6); frameworks not yet updated

Modified:
    incubator/mesos/trunk/include/nexus_exec.hpp
    incubator/mesos/trunk/src/nexus_exec.cpp

Modified: incubator/mesos/trunk/include/nexus_exec.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/include/nexus_exec.hpp?rev=1131553&r1=1131552&r2=1131553&view=diff
==============================================================================
--- incubator/mesos/trunk/include/nexus_exec.hpp (original)
+++ incubator/mesos/trunk/include/nexus_exec.hpp Sun Jun  5 03:21:46 2011
@@ -5,10 +5,17 @@
 
 #include <nexus.hpp>
 
+
 namespace nexus {
 
+class ExecutorDriver;
+
 namespace internal { class ExecutorProcess; }
 
+
+/**
+ * Arguments passed to executors on initialization.
+ */
 struct ExecutorArgs
 {
   ExecutorArgs() {}
@@ -25,29 +32,69 @@ struct ExecutorArgs
 };
 
 
+/**
+ * Callback interface to be implemented by frameworks' executors.
+ */
 class Executor
 {
 public:
-  Executor();
-  virtual ~Executor();
+  virtual ~Executor() {}
+
+  virtual void init(ExecutorDriver* d, const ExecutorArgs& args) {}
+  virtual void startTask(ExecutorDriver* d, const TaskDescription& task) {}
+  virtual void killTask(ExecutorDriver* d, TaskID taskId) {}
+  virtual void frameworkMessage(ExecutorDriver* d,
+                                const FrameworkMessage& message) {}
+  virtual void shutdown(ExecutorDriver* d) {}
+  virtual void error(ExecutorDriver* d, int code, const std::string& message);
+};
+
+
+/**
+ * Abstract interface for driving an executor connected to Nexus.
+ * This interface is used both to start the executor running (and
+ * communicating with the slave) and to send information from the executor
+ * to Nexus (such as status updates). Concrete implementations of
+ * ExecutorDriver will take an Executor as a parameter in order to make
+ * callbacks into it on various events.
+ */
+class ExecutorDriver
+{
+public:
+  virtual ~ExecutorDriver() {}
 
-  // Overridable callbacks
-  virtual void init(const ExecutorArgs& args) {}
-  virtual void startTask(const TaskDescription& task) {}
-  virtual void killTask(TaskID taskId) {}
-  virtual void frameworkMessage(const FrameworkMessage& message) {}
-  virtual void shutdown();
-  virtual void error(int code, const std::string& message);
-
-  // Non-overridable lifecycle methods
-  void run();
-
-  // Non-overridable communication methods
-  void sendStatusUpdate(const TaskStatus &status);
-  void sendFrameworkMessage(const FrameworkMessage &message);
+  // Connect to a slave and run the executor until it is shut down
+  virtual void run() {}
+
+  // Communication methods from executor to Nexus
+  virtual void sendStatusUpdate(const TaskStatus& status) {}
+  virtual void sendFrameworkMessage(const FrameworkMessage& message) {}
+};
+
+
+/**
+ * Concrete implementation of ExecutorDriver that communicates with a
+ * Nexus slave. The slave's location is read from environment variables
+ * set by it when it execs the user's executor script; users only need
+ * to create the NexusExecutorDriver and call run() on it.
+ */
+class NexusExecutorDriver : public ExecutorDriver
+{
+public:
+  NexusExecutorDriver(Executor* executor);
+  virtual ~NexusExecutorDriver();
+
+  virtual void run();
+  virtual void sendStatusUpdate(const TaskStatus& status);
+  virtual void sendFrameworkMessage(const FrameworkMessage& message);
 
 private:
   friend class internal::ExecutorProcess;
+
+  Executor* executor;
+
+  // LibProcess process for communicating with slave
+  internal::ExecutorProcess* process;
   
   // Mutex to enforce all non-callbacks are execute serially
   pthread_mutex_t mutex;

Modified: incubator/mesos/trunk/src/nexus_exec.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_exec.cpp?rev=1131553&r1=1131552&r2=1131553&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_exec.cpp (original)
+++ incubator/mesos/trunk/src/nexus_exec.cpp Sun Jun  5 03:21:46 2011
@@ -11,6 +11,7 @@
 #include "nexus_exec.h"
 
 #include "fatal.hpp"
+#include "lock.hpp"
 #include "messages.hpp"
 #include "nexus_exec.hpp"
 
@@ -30,17 +31,21 @@ namespace nexus { namespace internal {
 class ExecutorProcess : public Tuple<Process>
 {
 public:
-  friend class nexus::Executor;
+  friend class nexus::NexusExecutorDriver;
   
 protected:
   PID slave;
+  NexusExecutorDriver* driver;
+  Executor* executor;
   FrameworkID fid;
   SlaveID sid;
-  Executor* executor;
 
 public:
-  ExecutorProcess(const PID& _slave, FrameworkID _fid, Executor* _executor)
-    : slave(_slave), fid(_fid), executor(_executor) {}
+  ExecutorProcess(const PID& _slave,
+                  NexusExecutorDriver* _driver,
+                  Executor* _executor,
+                  FrameworkID _fid)
+    : slave(_slave), driver(_driver), executor(_executor), fid(_fid) {}
 
 protected:
   void operator() ()
@@ -53,8 +58,8 @@ protected:
           string name;
           string args;
           unpack<S2E_REGISTER_REPLY>(sid, name, args);
-          ExecutorArgs ea(sid, fid, name, args);
-          invoke(bind(&Executor::init, executor, ea));
+          ExecutorArgs execArg(sid, fid, name, args);
+          invoke(bind(&Executor::init, executor, driver, ref(execArg)));
           break;
         }
 
@@ -66,34 +71,37 @@ protected:
           unpack<S2E_RUN_TASK>(tid, name, args, params);
           TaskDescription task(tid, sid, name, params.getMap(), args);
           send(slave, pack<E2S_STATUS_UPDATE>(fid, tid, TASK_RUNNING, ""));
-          invoke(bind(&Executor::startTask, executor, ref(task)));
+          invoke(bind(&Executor::startTask, executor, driver, ref(task)));
           break;
         }
 
         case S2E_KILL_TASK: {
           TaskID tid;
           unpack<S2E_KILL_TASK>(tid);
-          invoke(bind(&Executor::killTask, executor, tid));
+          invoke(bind(&Executor::killTask, executor, driver, tid));
           break;
         }
 
         case S2E_FRAMEWORK_MESSAGE: {
-          FrameworkMessage message;
-          unpack<S2E_FRAMEWORK_MESSAGE>(message);
-          invoke(bind(&Executor::frameworkMessage, executor, ref(message)));
+          FrameworkMessage msg;
+          unpack<S2E_FRAMEWORK_MESSAGE>(msg);
+          invoke(bind(&Executor::frameworkMessage, executor, driver, ref(msg)));
           break;
         }
 
         case S2E_KILL_EXECUTOR: {
-          invoke(bind(&Executor::shutdown, executor));
-          return; // Shut down this libpocess process
+          invoke(bind(&Executor::shutdown, executor, driver));
+          exit(0);
         }
 
         case PROCESS_EXIT: {
+          // TODO: Pass an argument to shutdown to tell it this is abnormal?
+          invoke(bind(&Executor::shutdown, executor, driver));
           exit(1);
         }
 
         default: {
+          // TODO: Is this serious enough to exit?
           cerr << "Received unknown message ID " << msgid()
                << " from " << from() << endl;
           break;
@@ -106,28 +114,23 @@ protected:
 }} /* namespace nexus { namespace internal { */
 
 
-static ExecutorProcess* _process = NULL;
-
-
-namespace {
-
-// RAII class for locking mutexes
-struct Lock
-{
-  pthread_mutex_t* m;
-  Lock(pthread_mutex_t* _m): m(_m) { pthread_mutex_lock(m); }
-  ~Lock() { pthread_mutex_unlock(m); }
-};
-
-} /* namespace { */
-
-
 /*
  * Implementation of C++ API.
  */
 
 
-Executor::Executor()
+// Default implementation of error() that logs to stderr and exits
+void Executor::error(ExecutorDriver*, int code, const string &message)
+{
+  cerr << "Nexus error: " << message
+       << " (error code: " << code << ")" << endl;
+  // TODO(*): Don't exit here, let errors be recoverable. (?)
+  exit(1);
+}
+
+
+NexusExecutorDriver::NexusExecutorDriver(Executor* _executor)
+  : executor(_executor)
 {
   // Create mutex and condition variable
   pthread_mutexattr_t attr;
@@ -138,13 +141,13 @@ Executor::Executor()
 }
 
 
-Executor::~Executor()
+NexusExecutorDriver::~NexusExecutorDriver()
 {
   pthread_mutex_destroy(&mutex);
 }
 
 
-void Executor::run()
+void NexusExecutorDriver::run()
 {
   // Set stream buffering mode to flush on newlines so that we capture logs
   // from user processes even when output is redirected to a file.
@@ -180,88 +183,71 @@ void Executor::run()
   if (!(iss >> fid))
     fatal("cannot parse NEXUS_FRAMEWORK_ID");
 
-  _process = new ExecutorProcess(slave, fid, this);
+  process = new ExecutorProcess(slave, this, executor, fid);
 
-  Process::wait(Process::spawn(_process));
+  Process::wait(Process::spawn(process));
 
-  _process = NULL;
+  process = NULL;
 }
 
 
-void Executor::sendStatusUpdate(const TaskStatus &status)
+void NexusExecutorDriver::sendStatusUpdate(const TaskStatus &status)
 {
   Lock lock(&mutex);
 
   /* TODO(benh): Increment ref count on process. */
   
-  if (!_process)
-    error(EINVAL, "Executor has exited");
+  if (!process)
+    executor->error(this, EINVAL, "Executor has exited");
 
-  _process->send(_process->slave,
-                 _process->pack<E2S_STATUS_UPDATE>(_process->fid,
-                                                   status.taskId,
-                                                   status.state,
-                                                   status.data));
+  process->send(process->slave,
+                process->pack<E2S_STATUS_UPDATE>(process->fid,
+                                                 status.taskId,
+                                                 status.state,
+                                                 status.data));
 
   /* TODO(benh): Decrement ref count on process. */
 }
 
 
-void Executor::sendFrameworkMessage(const FrameworkMessage &message)
+void NexusExecutorDriver::sendFrameworkMessage(const FrameworkMessage &message)
 {
   Lock lock(&mutex);
 
   /* TODO(benh): Increment ref count on process. */
   
-  if (!_process)
-    error(EINVAL, "Executor has exited");
+  if (!process)
+    executor->error(this, EINVAL, "Executor has exited");
 
-  _process->send(_process->slave,
-                 _process->pack<E2S_FRAMEWORK_MESSAGE>(_process->fid,
-                                                       message));
+  process->send(process->slave,
+                process->pack<E2S_FRAMEWORK_MESSAGE>(process->fid,
+                                                     message));
 
   /* TODO(benh): Decrement ref count on process. */
 }
 
 
-// Default implementation of error() that logs to stderr and exits
-void Executor::error(int code, const string &message)
-{
-  cerr << "Nexus error: " << message
-       << " (error code: " << code << ")" << endl;
-  // TODO(*): Don't exit here, let errors be recoverable.
-  exit(1);
-}
-
-
-// Default implementation of shutdown() that just exits
-void Executor::shutdown()
-{
-  exit(0);
-}
-
-
 /*
  * Implementation of C API.
  */
 
 
-namespace nexus {
+namespace nexus { namespace internal {
 
 /*
  * We wrap calls from the C API into the C++ API with the following
  * specialized implementation of Executor.
  */
 class CExecutor : public Executor {
-private:
+public:
   nexus_exec* exec;
+  ExecutorDriver* driver; // Set externally after object is created
   
-public:
-  CExecutor(nexus_exec* _exec) : exec(_exec) {}
+  CExecutor(nexus_exec* _exec) : exec(_exec), driver(NULL) {}
 
   virtual ~CExecutor() {}
 
-  virtual void init(const ExecutorArgs& args)
+  virtual void init(ExecutorDriver*, const ExecutorArgs& args)
   {
     exec->init(exec,
                args.slaveId.c_str(),
@@ -271,7 +257,7 @@ public:
                args.data.size());
   }
 
-  virtual void startTask(const TaskDescription& task)
+  virtual void startTask(ExecutorDriver*, const TaskDescription& task)
   {
     // Convert params to key=value list
     Params paramsObj(task.params);
@@ -285,12 +271,13 @@ public:
     exec->run(exec, &td);
   }
 
-  virtual void killTask(TaskID taskId)
+  virtual void killTask(ExecutorDriver*, TaskID taskId)
   {
     exec->kill(exec, taskId);
   }
   
-  virtual void frameworkMessage(const FrameworkMessage& message)
+  virtual void frameworkMessage(ExecutorDriver*,
+                                const FrameworkMessage& message)
   {
     nexus_framework_message msg = { message.slaveId.c_str(),
                                     message.taskId,
@@ -299,24 +286,27 @@ public:
     exec->message(exec, &msg);
   }
   
-  virtual void shutdown()
+  virtual void shutdown(ExecutorDriver*)
   {
     exec->shutdown(exec);
   }
   
-  virtual void error(int code, const std::string& message)
+  virtual void error(ExecutorDriver*, int code, const std::string& message)
   {
     exec->error(exec, code, message.c_str());
   }
 };
 
-} /* namespace nexus { */
-
 
 /*
  * A single CExecutor instance used with the C API.
+ *
+ * TODO: Is this a good idea? How can one unit-test C frameworks? It might
+ *       be better to have a hashtable as in the scheduler API eventually.
  */
-CExecutor* _executor = NULL;
+CExecutor* c_executor = NULL;
+
+}} /* namespace nexus { namespace internal {*/
 
 
 extern "C" {
@@ -324,16 +314,19 @@ extern "C" {
 
 int nexus_exec_run(struct nexus_exec* exec)
 {
-  if (exec == NULL || _executor != NULL) {
+  if (exec == NULL || c_executor != NULL) {
     errno = EINVAL;
     return -1;
   }
 
-  _executor = new CExecutor(exec);
+  CExecutor executor(exec);
+  c_executor = &executor;
   
-  _executor->run();
+  NexusExecutorDriver driver(&executor);
+  executor.driver = &driver;
+  driver.run();
 
-  _executor = NULL;
+  c_executor = NULL;
 
   return 0;
 }
@@ -342,7 +335,7 @@ int nexus_exec_run(struct nexus_exec* ex
 int nexus_exec_send_message(struct nexus_exec* exec,
                             struct nexus_framework_message* msg)
 {
-  if (exec == NULL || _executor == NULL || msg == NULL) {
+  if (exec == NULL || c_executor == NULL || msg == NULL) {
     errno = EINVAL;
     return -1;
   }
@@ -350,7 +343,7 @@ int nexus_exec_send_message(struct nexus
   string data((char*) msg->data, msg->data_len);
   FrameworkMessage message(string(msg->sid), msg->tid, data);
 
-  _executor->sendFrameworkMessage(message);
+  c_executor->driver->sendFrameworkMessage(message);
 
   return 0;
 }
@@ -360,7 +353,7 @@ int nexus_exec_status_update(struct nexu
                              struct nexus_task_status* status)
 {
 
-  if (exec == NULL || _executor == NULL || status == NULL) {
+  if (exec == NULL || c_executor == NULL || status == NULL) {
     errno = EINVAL;
     return -1;
   }
@@ -368,7 +361,7 @@ int nexus_exec_status_update(struct nexu
   string data((char*) status->data, status->data_len);
   TaskStatus ts(status->tid, status->state, data);
 
-  _executor->sendStatusUpdate(ts);
+  c_executor->driver->sendStatusUpdate(ts);
 
   return 0;
 }