You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:21:46 UTC
svn commit: r1131553 - in /incubator/mesos/trunk: include/nexus_exec.hpp
src/nexus_exec.cpp
Author: benh
Date: Sun Jun 5 03:21:46 2011
New Revision: 1131553
URL: http://svn.apache.org/viewvc?rev=1131553&view=rev
Log:
Initial work on adding drivers to API (#6); frameworks not yet updated
Modified:
incubator/mesos/trunk/include/nexus_exec.hpp
incubator/mesos/trunk/src/nexus_exec.cpp
Modified: incubator/mesos/trunk/include/nexus_exec.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/include/nexus_exec.hpp?rev=1131553&r1=1131552&r2=1131553&view=diff
==============================================================================
--- incubator/mesos/trunk/include/nexus_exec.hpp (original)
+++ incubator/mesos/trunk/include/nexus_exec.hpp Sun Jun 5 03:21:46 2011
@@ -5,10 +5,17 @@
#include <nexus.hpp>
+
namespace nexus {
+class ExecutorDriver;
+
namespace internal { class ExecutorProcess; }
+
+/**
+ * Arguments passed to executors on initialization.
+ */
struct ExecutorArgs
{
ExecutorArgs() {}
@@ -25,29 +32,69 @@ struct ExecutorArgs
};
+/**
+ * Callback interface to be implemented by frameworks' executors.
+ */
class Executor
{
public:
- Executor();
- virtual ~Executor();
+ virtual ~Executor() {}
+ // Callbacks invoked by the driver; all except error() default to no-ops.
+ virtual void init(ExecutorDriver* d, const ExecutorArgs& args) {}
+ virtual void startTask(ExecutorDriver* d, const TaskDescription& task) {}
+ virtual void killTask(ExecutorDriver* d, TaskID taskId) {}
+ virtual void frameworkMessage(ExecutorDriver* d,
+ const FrameworkMessage& message) {}
+ virtual void shutdown(ExecutorDriver* d) {}
+ virtual void error(ExecutorDriver* d, int code, const std::string& message); // default: print to stderr, exit(1)
+};
+
+
+/**
+ * Abstract interface for driving an executor connected to Nexus.
+ * This interface is used both to start the executor running (and
+ * communicating with the slave) and to send information from the executor
+ * to Nexus (such as status updates). Concrete implementations of
+ * ExecutorDriver will take an Executor as a parameter in order to make
+ * callbacks into it on various events.
+ */
+class ExecutorDriver
+{
+public:
+ virtual ~ExecutorDriver() {}
- // Overridable callbacks
- virtual void init(const ExecutorArgs& args) {}
- virtual void startTask(const TaskDescription& task) {}
- virtual void killTask(TaskID taskId) {}
- virtual void frameworkMessage(const FrameworkMessage& message) {}
- virtual void shutdown();
- virtual void error(int code, const std::string& message);
-
- // Non-overridable lifecycle methods
- void run();
-
- // Non-overridable communication methods
- void sendStatusUpdate(const TaskStatus &status);
- void sendFrameworkMessage(const FrameworkMessage &message);
+ // Connect to a slave and run the executor until it is shut down
+ virtual void run() {} // base-class default does nothing
+
+ // Communication methods from executor to Nexus (base-class defaults are no-ops)
+ virtual void sendStatusUpdate(const TaskStatus& status) {}
+ virtual void sendFrameworkMessage(const FrameworkMessage& message) {}
+};
+
+
+/**
+ * Concrete implementation of ExecutorDriver that communicates with a
+ * Nexus slave. The slave's location is read from environment variables
+ * set by it when it execs the user's executor script; users only need
+ * to create the NexusExecutorDriver and call run() on it.
+ */
+class NexusExecutorDriver : public ExecutorDriver
+{
+public:
+ NexusExecutorDriver(Executor* executor);
+ virtual ~NexusExecutorDriver();
+
+ virtual void run();
+ virtual void sendStatusUpdate(const TaskStatus& status);
+ virtual void sendFrameworkMessage(const FrameworkMessage& message);
private:
friend class internal::ExecutorProcess;
+
+ Executor* executor;
+
+ // LibProcess process for communicating with slave
+ internal::ExecutorProcess* process;
// Mutex to enforce all non-callbacks are execute serially
pthread_mutex_t mutex;
Modified: incubator/mesos/trunk/src/nexus_exec.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_exec.cpp?rev=1131553&r1=1131552&r2=1131553&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_exec.cpp (original)
+++ incubator/mesos/trunk/src/nexus_exec.cpp Sun Jun 5 03:21:46 2011
@@ -11,6 +11,7 @@
#include "nexus_exec.h"
#include "fatal.hpp"
+#include "lock.hpp"
#include "messages.hpp"
#include "nexus_exec.hpp"
@@ -30,17 +31,21 @@ namespace nexus { namespace internal {
class ExecutorProcess : public Tuple<Process>
{
public:
- friend class nexus::Executor;
+ friend class nexus::NexusExecutorDriver;
protected:
PID slave;
+ NexusExecutorDriver* driver;
+ Executor* executor;
FrameworkID fid;
SlaveID sid;
- Executor* executor;
public:
- ExecutorProcess(const PID& _slave, FrameworkID _fid, Executor* _executor)
- : slave(_slave), fid(_fid), executor(_executor) {}
+ ExecutorProcess(const PID& _slave,
+ NexusExecutorDriver* _driver,
+ Executor* _executor,
+ FrameworkID _fid)
+ : slave(_slave), driver(_driver), executor(_executor), fid(_fid) {}
protected:
void operator() ()
@@ -53,8 +58,8 @@ protected:
string name;
string args;
unpack<S2E_REGISTER_REPLY>(sid, name, args);
- ExecutorArgs ea(sid, fid, name, args);
- invoke(bind(&Executor::init, executor, ea));
+ ExecutorArgs execArg(sid, fid, name, args);
+ invoke(bind(&Executor::init, executor, driver, ref(execArg)));
break;
}
@@ -66,34 +71,37 @@ protected:
unpack<S2E_RUN_TASK>(tid, name, args, params);
TaskDescription task(tid, sid, name, params.getMap(), args);
send(slave, pack<E2S_STATUS_UPDATE>(fid, tid, TASK_RUNNING, ""));
- invoke(bind(&Executor::startTask, executor, ref(task)));
+ invoke(bind(&Executor::startTask, executor, driver, ref(task)));
break;
}
case S2E_KILL_TASK: {
TaskID tid;
unpack<S2E_KILL_TASK>(tid);
- invoke(bind(&Executor::killTask, executor, tid));
+ invoke(bind(&Executor::killTask, executor, driver, tid));
break;
}
case S2E_FRAMEWORK_MESSAGE: {
- FrameworkMessage message;
- unpack<S2E_FRAMEWORK_MESSAGE>(message);
- invoke(bind(&Executor::frameworkMessage, executor, ref(message)));
+ FrameworkMessage msg;
+ unpack<S2E_FRAMEWORK_MESSAGE>(msg);
+ invoke(bind(&Executor::frameworkMessage, executor, driver, ref(msg)));
break;
}
case S2E_KILL_EXECUTOR: {
- invoke(bind(&Executor::shutdown, executor));
- return; // Shut down this libpocess process
+ invoke(bind(&Executor::shutdown, executor, driver));
+ exit(0);
}
case PROCESS_EXIT: {
+ // TODO: Pass an argument to shutdown to tell it this is abnormal?
+ invoke(bind(&Executor::shutdown, executor, driver));
exit(1);
}
default: {
+ // TODO: Is this serious enough to exit?
cerr << "Received unknown message ID " << msgid()
<< " from " << from() << endl;
break;
@@ -106,28 +114,23 @@ protected:
}} /* namespace nexus { namespace internal { */
-static ExecutorProcess* _process = NULL;
-
-
-namespace {
-
-// RAII class for locking mutexes
-struct Lock
-{
- pthread_mutex_t* m;
- Lock(pthread_mutex_t* _m): m(_m) { pthread_mutex_lock(m); }
- ~Lock() { pthread_mutex_unlock(m); }
-};
-
-} /* namespace { */
-
-
/*
* Implementation of C++ API.
*/
-Executor::Executor()
+// Default implementation of error() that logs to stderr and exits
+void Executor::error(ExecutorDriver*, int code, const string &message)
+{
+ cerr << "Nexus error: " << message
+ << " (error code: " << code << ")" << endl;
+ // TODO(*): Don't exit here, let errors be recoverable. (?)
+ exit(1);
+}
+
+
+NexusExecutorDriver::NexusExecutorDriver(Executor* _executor)
+ : executor(_executor)
{
// Create mutex and condition variable
pthread_mutexattr_t attr;
@@ -138,13 +141,13 @@ Executor::Executor()
}
-Executor::~Executor()
+NexusExecutorDriver::~NexusExecutorDriver()
{
pthread_mutex_destroy(&mutex);
}
-void Executor::run()
+void NexusExecutorDriver::run()
{
// Set stream buffering mode to flush on newlines so that we capture logs
// from user processes even when output is redirected to a file.
@@ -180,88 +183,71 @@ void Executor::run()
if (!(iss >> fid))
fatal("cannot parse NEXUS_FRAMEWORK_ID");
- _process = new ExecutorProcess(slave, fid, this);
+ process = new ExecutorProcess(slave, this, executor, fid);
- Process::wait(Process::spawn(_process));
+ Process::wait(Process::spawn(process));
- _process = NULL;
+ process = NULL;
}
-void Executor::sendStatusUpdate(const TaskStatus &status)
+void NexusExecutorDriver::sendStatusUpdate(const TaskStatus &status)
{
Lock lock(&mutex);
/* TODO(benh): Increment ref count on process. */
- if (!_process)
- error(EINVAL, "Executor has exited");
+ if (!process) // error() need not exit (CExecutor's doesn't), so never fall through to a NULL process
+ return executor->error(this, EINVAL, "Executor has exited");
- _process->send(_process->slave,
- _process->pack<E2S_STATUS_UPDATE>(_process->fid,
- status.taskId,
- status.state,
- status.data));
+ process->send(process->slave,
+ process->pack<E2S_STATUS_UPDATE>(process->fid,
+ status.taskId,
+ status.state,
+ status.data));
/* TODO(benh): Decrement ref count on process. */
}
-void Executor::sendFrameworkMessage(const FrameworkMessage &message)
+void NexusExecutorDriver::sendFrameworkMessage(const FrameworkMessage &message)
{
Lock lock(&mutex);
/* TODO(benh): Increment ref count on process. */
- if (!_process)
- error(EINVAL, "Executor has exited");
+ if (!process)
+ executor->error(this, EINVAL, "Executor has exited");
- _process->send(_process->slave,
- _process->pack<E2S_FRAMEWORK_MESSAGE>(_process->fid,
- message));
+ process->send(process->slave,
+ process->pack<E2S_FRAMEWORK_MESSAGE>(process->fid,
+ message));
/* TODO(benh): Decrement ref count on process. */
}
-// Default implementation of error() that logs to stderr and exits
-void Executor::error(int code, const string &message)
-{
- cerr << "Nexus error: " << message
- << " (error code: " << code << ")" << endl;
- // TODO(*): Don't exit here, let errors be recoverable.
- exit(1);
-}
-
-
-// Default implementation of shutdown() that just exits
-void Executor::shutdown()
-{
- exit(0);
-}
-
-
/*
* Implementation of C API.
*/
-namespace nexus {
+namespace nexus { namespace internal {
/*
* We wrap calls from the C API into the C++ API with the following
* specialized implementation of Executor.
*/
class CExecutor : public Executor {
-private:
+public:
nexus_exec* exec;
+ ExecutorDriver* driver; // Set externally after object is created
-public:
- CExecutor(nexus_exec* _exec) : exec(_exec) {}
+ CExecutor(nexus_exec* _exec) : exec(_exec), driver(NULL) {}
virtual ~CExecutor() {}
- virtual void init(const ExecutorArgs& args)
+ virtual void init(ExecutorDriver*, const ExecutorArgs& args)
{
exec->init(exec,
args.slaveId.c_str(),
@@ -271,7 +257,7 @@ public:
args.data.size());
}
- virtual void startTask(const TaskDescription& task)
+ virtual void startTask(ExecutorDriver*, const TaskDescription& task)
{
// Convert params to key=value list
Params paramsObj(task.params);
@@ -285,12 +271,13 @@ public:
exec->run(exec, &td);
}
- virtual void killTask(TaskID taskId)
+ virtual void killTask(ExecutorDriver*, TaskID taskId)
{
exec->kill(exec, taskId);
}
- virtual void frameworkMessage(const FrameworkMessage& message)
+ virtual void frameworkMessage(ExecutorDriver*,
+ const FrameworkMessage& message)
{
nexus_framework_message msg = { message.slaveId.c_str(),
message.taskId,
@@ -299,24 +286,27 @@ public:
exec->message(exec, &msg);
}
- virtual void shutdown()
+ virtual void shutdown(ExecutorDriver*)
{
exec->shutdown(exec);
}
- virtual void error(int code, const std::string& message)
+ virtual void error(ExecutorDriver*, int code, const std::string& message)
{
exec->error(exec, code, message.c_str());
}
};
-} /* namespace nexus { */
-
/*
* A single CExecutor instance used with the C API.
+ *
+ * TODO: Is this a good idea? How can one unit-test C frameworks? It might
+ * be better to have a hashtable as in the scheduler API eventually.
*/
-CExecutor* _executor = NULL;
+CExecutor* c_executor = NULL;
+
+}} /* namespace nexus { namespace internal {*/
extern "C" {
@@ -324,16 +314,19 @@ extern "C" {
int nexus_exec_run(struct nexus_exec* exec)
{
- if (exec == NULL || _executor != NULL) {
+ if (exec == NULL || c_executor != NULL) { // only one C executor may run at a time
errno = EINVAL;
return -1;
}
- _executor = new CExecutor(exec);
+ CExecutor executor(exec); // stack object: c_executor is reset to NULL below, before it goes out of scope
+ c_executor = &executor;
- _executor->run();
+ NexusExecutorDriver driver(&executor);
+ executor.driver = &driver; // lets nexus_exec_send_message / nexus_exec_status_update reach the driver
+ driver.run(); // blocks until the driver's run() returns
- _executor = NULL;
+ c_executor = NULL;
return 0;
}
@@ -342,7 +335,7 @@ int nexus_exec_run(struct nexus_exec* ex
int nexus_exec_send_message(struct nexus_exec* exec,
struct nexus_framework_message* msg)
{
- if (exec == NULL || _executor == NULL || msg == NULL) {
+ if (exec == NULL || c_executor == NULL || msg == NULL) {
errno = EINVAL;
return -1;
}
@@ -350,7 +343,7 @@ int nexus_exec_send_message(struct nexus
string data((char*) msg->data, msg->data_len);
FrameworkMessage message(string(msg->sid), msg->tid, data);
- _executor->sendFrameworkMessage(message);
+ c_executor->driver->sendFrameworkMessage(message);
return 0;
}
@@ -360,7 +353,7 @@ int nexus_exec_status_update(struct nexu
struct nexus_task_status* status)
{
- if (exec == NULL || _executor == NULL || status == NULL) {
+ if (exec == NULL || c_executor == NULL || status == NULL) {
errno = EINVAL;
return -1;
}
@@ -368,7 +361,7 @@ int nexus_exec_status_update(struct nexu
string data((char*) status->data, status->data_len);
TaskStatus ts(status->tid, status->state, data);
- _executor->sendStatusUpdate(ts);
+ c_executor->driver->sendStatusUpdate(ts);
return 0;
}