You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink whose target was not preserved in this plain-text copy.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:07:21 UTC
svn commit: r1132249 - /incubator/mesos/trunk/src/slave/slave.cpp
Author: benh
Date: Sun Jun 5 09:07:21 2011
New Revision: 1132249
URL: http://svn.apache.org/viewvc?rev=1132249&view=rev
Log:
Don't use HOST_NAME_MAX because it is not defined on all platforms
Modified:
incubator/mesos/trunk/src/slave/slave.cpp
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1132249&r1=1132248&r2=1132249&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Sun Jun 5 09:07:21 2011
@@ -6,8 +6,6 @@
#include <algorithm>
#include <fstream>
-#include <google/protobuf/descriptor.h>
-
#include "slave.hpp"
#include "webui.hpp"
@@ -16,61 +14,101 @@
#define gethostbyname2(name, _) gethostbyname(name)
#endif
-using namespace mesos;
-using namespace mesos::internal;
-using namespace mesos::internal::slave;
-
-using boost::unordered_map;
-using boost::unordered_set;
-
using std::list;
using std::make_pair;
using std::ostringstream;
+using std::istringstream;
using std::pair;
using std::queue;
using std::string;
using std::vector;
+using boost::lexical_cast;
+using boost::unordered_map;
+using boost::unordered_set;
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::slave;
+
-Slave::Slave(const Resources& _resources, bool _local,
+namespace {
+
+// Periodically sends heartbeats to the master
+class Heart : public MesosProcess
+{
+private:
+ PID master;
+ PID slave;
+ SlaveID sid;
+ double interval;
+
+protected:
+ void operator () ()
+ {
+ link(slave);
+ link(master);
+ do {
+ switch (receive(interval)) {
+ case PROCESS_TIMEOUT:
+ send(master, pack<SH2M_HEARTBEAT>(sid));
+ break;
+ case PROCESS_EXIT:
+ return;
+ }
+ } while (true);
+ }
+
+public:
+ Heart(const PID &_master, const PID &_slave, SlaveID _sid, double _interval)
+ : master(_master), slave(_slave), sid(_sid), interval(_interval) {}
+};
+
+
+// Default values for CPU cores and memory to include in configuration
+const int32_t DEFAULT_CPUS = 1;
+const int32_t DEFAULT_MEM = 1 * Gigabyte;
+
+
+} /* namespace */
+
+
+Slave::Slave(Resources _resources, bool _local,
IsolationModule *_isolationModule)
- : resources(_resources), local(_local),
- isolationModule(_isolationModule), heart(NULL) {}
+ : id(""), resources(_resources), local(_local),
+ isolationModule(_isolationModule) {}
-Slave::Slave(const Configuration& _conf, bool _local,
- IsolationModule* _isolationModule)
- : conf(_conf), local(_local),
- isolationModule(_isolationModule), heart(NULL)
-{
- resources =
- Resources::parse(conf.get<string>("resources", "cpus:1;mem:1024"));
+Slave::Slave(const Params& _conf, bool _local, IsolationModule *_module)
+ : id(""), conf(_conf), local(_local), isolationModule(_module)
+{
+ resources = Resources(conf.get<int32_t>("cpus", DEFAULT_CPUS),
+ conf.get<int32_t>("mem", DEFAULT_MEM));
}
-void Slave::registerOptions(Configurator* configurator)
-{
- // TODO(benh): Is there a way to specify units for the resources?
- configurator->addOption<string>("resources",
- "Total consumable resources per slave\n");
-// configurator->addOption<string>("attributes",
-// "Attributes of machine\n");
- configurator->addOption<string>("work_dir",
- "Where to place framework work directories\n"
- "(default: MESOS_HOME/work)");
- configurator->addOption<string>("hadoop_home",
- "Where to find Hadoop installed (for\n"
- "fetching framework executors from HDFS)\n"
- "(default: look for HADOOP_HOME in\n"
- "environment or find hadoop on PATH)");
- configurator->addOption<bool>("switch_user",
- "Whether to run tasks as the user who\n"
- "submitted them rather than the user running\n"
- "the slave (requires setuid permission)",
- true);
- configurator->addOption<string>("frameworks_home",
- "Directory prepended to relative executor\n"
- "paths (default: MESOS_HOME/frameworks)");
+void Slave::registerOptions(Configurator* conf)
+{
+ conf->addOption<int32_t>("cpus", 'c', "CPU cores for use by tasks",
+ DEFAULT_CPUS);
+ conf->addOption<int64_t>("mem", 'm', "Memory for use by tasks, in MB\n",
+ DEFAULT_MEM);
+ conf->addOption<string>("work_dir",
+ "Where to place framework work directories\n"
+ "(default: MESOS_HOME/work)");
+ conf->addOption<string>("hadoop_home",
+ "Where to find Hadoop installed (for fetching\n"
+ "framework executors from HDFS)\n"
+ "(default: look for HADOOP_HOME environment\n"
+ "variable or find hadoop on PATH)");
+ conf->addOption<bool>("switch_user",
+ "Whether to run tasks as the user who\n"
+ "submitted them rather than the user running\n"
+ "the slave (requires setuid permission)",
+ true);
+ conf->addOption<string>("frameworks_home",
+ "Directory prepended to relative executor\n"
+ "paths (default: MESOS_HOME/frameworks)");
}
@@ -82,54 +120,25 @@ Slave::~Slave()
state::SlaveState *Slave::getState()
{
- Resources resources(resources);
- Resource::Scalar cpus;
- Resource::Scalar mem;
- cpus.set_value(-1);
- mem.set_value(-1);
- cpus = resources.getScalar("cpus", cpus);
- mem = resources.getScalar("mem", mem);
-
+ std::ostringstream my_pid;
+ my_pid << self();
+ std::ostringstream master_pid;
+ master_pid << master;
state::SlaveState *state =
- new state::SlaveState(BUILD_DATE, BUILD_USER, slaveId.value(),
- cpus.value(), mem.value(), self(), master);
-
-// foreachpair (_, Framework *f, frameworks) {
-
-// foreachpair (_, Executor* e, f->executors) {
+ new state::SlaveState(BUILD_DATE, BUILD_USER, id, resources.cpus,
+ resources.mem, my_pid.str(), master_pid.str());
-// Resources resources(e->resources);
-// Resource::Scalar cpus;
-// Resource::Scalar mem;
-// cpus.set_value(-1);
-// mem.set_value(-1);
-// cpus = resources.getScalar("cpus", cpus);
-// mem = resources.getScalar("mem", mem);
-
-// state::Framework *framework =
-// new state::Framework(f->frameworkId.value(), f->info.name(),
-// f->info.executor().uri(), f->executorStatus,
-// cpus.value(), mem.value());
-
-// state->frameworks.push_back(framework);
-
-// foreachpair(_, Task *t, f->tasks) {
-// Resources resources(t->resources());
-// Resource::Scalar cpus;
-// Resource::Scalar mem;
-// cpus.set_value(-1);
-// mem.set_value(-1);
-// cpus = resources.getScalar("cpus", cpus);
-// mem = resources.getScalar("mem", mem);
-
-// state::Task *task =
-// new state::Task(t->task_id().value(), t->name(),
-// TaskState_descriptor()->FindValueByNumber(t->state())->name(),
-// cpus.value(), mem.value());
-
-// framework->tasks.push_back(task);
-// }
-// }
+ foreachpair(_, Framework *f, frameworks) {
+ state::Framework *framework = new state::Framework(f->id, f->name,
+ f->executorInfo.uri, f->executorStatus, f->resources.cpus,
+ f->resources.mem);
+ state->frameworks.push_back(framework);
+ foreachpair(_, Task *t, f->tasks) {
+ state::Task *task = new state::Task(t->id, t->name, t->state,
+ t->resources.cpus, t->resources.mem);
+ framework->tasks.push_back(task);
+ }
+ }
return state;
}
@@ -138,60 +147,61 @@ state::SlaveState *Slave::getState()
void Slave::operator () ()
{
LOG(INFO) << "Slave started at " << self();
- LOG(INFO) << "Slave resources: " << resources;
// Get our hostname
- char buf[256];
+ char buf[512];
gethostname(buf, sizeof(buf));
hostent *he = gethostbyname2(buf, AF_INET);
string hostname = he->h_name;
- // Get our public DNS name. Normally this is our hostname, but on EC2
+ // Get our public Web UI URL. Normally this is our hostname, but on EC2
// we look for the MESOS_PUBLIC_DNS environment variable. This allows
// the master to display our public name in its web UI.
- string public_hostname = hostname;
+ LOG(INFO) << "setting up webUIUrl on port " << conf["webui_port"];
+ string webUIUrl;
if (getenv("MESOS_PUBLIC_DNS") != NULL) {
- public_hostname = getenv("MESOS_PUBLIC_DNS");
+ webUIUrl = getenv("MESOS_PUBLIC_DNS");
+ } else {
+ webUIUrl = hostname;
}
-
- SlaveInfo slave;
- slave.set_hostname(hostname);
- slave.set_public_hostname(public_hostname);
- slave.mutable_resources()->MergeFrom(resources);
+#ifdef MESOS_WEBUI
+ webUIUrl += ":" + conf["webui_port"];
+#endif
// Initialize isolation module.
isolationModule->initialize(this);
while (true) {
- switch (receive(1)) {
+ switch (receive()) {
case NEW_MASTER_DETECTED: {
- const Message<NEW_MASTER_DETECTED>& msg = message();
+ string masterSeq;
+ PID masterPid;
+ tie(masterSeq, masterPid) = unpack<NEW_MASTER_DETECTED>(body());
- LOG(INFO) << "New master at " << msg.pid();
+ LOG(INFO) << "New master at " << masterPid << " with ID:" << masterSeq;
- master = msg.pid();
+ redirect(master, masterPid);
+ master = masterPid;
link(master);
- if (slaveId == "") {
+ if (id.empty()) {
// Slave started before master.
- Message<S2M_REGISTER_SLAVE> out;
- out.mutable_slave()->MergeFrom(slave);
- send(master, out);
+ send(master, pack<S2M_REGISTER_SLAVE>(hostname, webUIUrl, resources));
} else {
- // Re-registering, so send tasks running.
- Message<S2M_REREGISTER_SLAVE> out;
- out.mutable_slave_id()->MergeFrom(slaveId);
- out.mutable_slave()->MergeFrom(slave);
-
- foreachpair (_, Framework* framework, frameworks) {
- foreachpair (_, Executor* executor, framework->executors) {
- foreachpair (_, Task* task, executor->tasks) {
- out.add_task()->MergeFrom(*task);
- }
- }
- }
+ // Reconnecting, so reconstruct resourcesInUse for the master.
+ Resources resourcesInUse;
+ vector<Task> taskVec;
+
+ foreachpair(_, Framework *framework, frameworks) {
+ foreachpair(_, Task *task, framework->tasks) {
+ resourcesInUse += task->resources;
+ Task ti = *task;
+ ti.slaveId = id;
+ taskVec.push_back(ti);
+ }
+ }
- send(master, out);
+ send(master, pack<S2M_REREGISTER_SLAVE>(id, hostname, webUIUrl, resources, taskVec));
}
break;
}
@@ -201,351 +211,231 @@ void Slave::operator () ()
break;
}
- case MASTER_DETECTION_FAILURE: {
- LOG(FATAL) << "Cannot reliably detect master ... committing suicide!";
- break;
- }
-
case M2S_REGISTER_REPLY: {
- const Message<M2S_REGISTER_REPLY>& msg = message();
- slaveId = msg.slave_id();
-
- LOG(INFO) << "Registered with master; given slave ID " << slaveId;
-
- heart = new Heart(master, self(), slaveId, msg.heartbeat_interval());
- link(spawn(heart));
+ double interval = 0;
+ tie(this->id, interval) = unpack<M2S_REGISTER_REPLY>(body());
+ LOG(INFO) << "Registered with master; given slave ID " << this->id;
+ link(spawn(new Heart(master, self(), this->id, interval)));
break;
}
case M2S_REREGISTER_REPLY: {
- const Message<M2S_REREGISTER_REPLY>& msg = message();
-
- LOG(INFO) << "Re-registered with master";
-
- if (!(slaveId == msg.slave_id())) {
- LOG(FATAL) << "Slave re-registered but got wrong ID";
- }
-
- if (heart != NULL) {
- send(heart->self(), MESOS_MSGID);
- wait(heart->self());
- delete heart;
- }
-
- heart = new Heart(master, self(), slaveId, msg.heartbeat_interval());
- link(spawn(heart));
+ SlaveID sid;
+ double interval = 0;
+ tie(sid, interval) = unpack<M2S_REREGISTER_REPLY>(body());
+ LOG(INFO) << "RE-registered with master; given slave ID " << sid << " had "<< this->id;
+ if (this->id == "")
+ this->id = sid;
+ CHECK(this->id == sid);
+ link(spawn(new Heart(master, self(), this->id, interval)));
break;
}
case M2S_RUN_TASK: {
- const Message<M2S_RUN_TASK>& msg = message();
-
- const TaskDescription& task = msg.task();
-
- LOG(INFO) << "Got assigned task " << task.task_id()
- << " for framework " << msg.framework_id();
-
- Framework *framework = getFramework(msg.framework_id());
+ FrameworkID fid;
+ TaskID tid;
+ string fwName, user, taskName, taskArg;
+ ExecutorInfo execInfo;
+ Params params;
+ PID pid;
+ tie(fid, tid, fwName, user, execInfo, taskName, taskArg, params, pid) =
+ unpack<M2S_RUN_TASK>(body());
+ LOG(INFO) << "Got assigned task " << fid << ":" << tid;
+ Resources res;
+ res.cpus = params.getInt32("cpus", -1);
+ res.mem = params.getInt64("mem", -1);
+ Framework *framework = getFramework(fid);
if (framework == NULL) {
- framework =
- new Framework(msg.framework_id(), msg.framework(), msg.pid());
- frameworks[msg.framework_id()] = framework;
- }
-
- // Either send the task to an executor or start a new executor
- // and queue the task until the executor has started.
- Executor* executor = task.has_executor()
- ? framework->getExecutor(task.executor().executor_id())
- : framework->getExecutor(framework->info.executor().executor_id());
-
- if (executor != NULL) {
- if (!executor->pid) {
- // Queue task until the executor starts up.
- executor->queuedTasks.push_back(task);
- } else {
- // Add the task to the executor.
- executor->addTask(task);
-
- Message<S2E_RUN_TASK> out;
- out.mutable_framework()->MergeFrom(framework->info);
- out.mutable_framework_id()->MergeFrom(framework->frameworkId);
- out.set_pid(framework->pid);
- out.mutable_task()->MergeFrom(task);
- send(executor->pid, out);
- isolationModule->resourcesChanged(framework, executor);
- }
+ // Framework not yet created on this node - create it.
+ framework = new Framework(fid, fwName, user, execInfo, pid);
+ frameworks[fid] = framework;
+ isolationModule->startExecutor(framework);
+ }
+ Task *task = framework->addTask(tid, taskName, res);
+ Executor *executor = getExecutor(fid);
+ if (executor) {
+ send(executor->pid,
+ pack<S2E_RUN_TASK>(tid, taskName, taskArg, params));
+ isolationModule->resourcesChanged(framework);
} else {
- // Launch an executor for this task.
- if (task.has_executor()) {
- executor = framework->createExecutor(task.executor());
- } else {
- executor = framework->createExecutor(framework->info.executor());
- }
-
- // Queue task until the executor starts up.
- executor->queuedTasks.push_back(task);
-
- // Tell the isolation module to launch the executor.
- isolationModule->launchExecutor(framework, executor);
+ // Executor not yet registered; queue task for when it starts up
+ TaskDescription *td = new TaskDescription(
+ tid, taskName, taskArg, params.str());
+ framework->queuedTasks.push_back(td);
}
break;
}
case M2S_KILL_TASK: {
- const Message<M2S_KILL_TASK>& msg = message();
-
- LOG(INFO) << "Asked to kill task " << msg.task_id()
- << " of framework " << msg.framework_id();
-
- Framework* framework = getFramework(msg.framework_id());
- if (framework != NULL) {
- // Tell the executor to kill the task if it is up and
- // running, otherwise, consider the task lost.
- Executor* executor = framework->getExecutor(msg.task_id());
- if (executor == NULL || !executor->pid) {
- // Update the resources locally, if an executor comes up
- // after this then it just won't receive this task.
- executor->removeTask(msg.task_id());
- isolationModule->resourcesChanged(framework, executor);
-
- Message<S2M_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(msg.framework_id());
- TaskStatus *status = out.mutable_status();
- status->mutable_task_id()->MergeFrom(msg.task_id());
- status->mutable_slave_id()->MergeFrom(slaveId);
- status->set_state(TASK_LOST);
- send(master, out);
-
- double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
- framework->statuses[deadline][status->task_id()] = *status;
- } else {
- // Otherwise, send a message to the executor and wait for
- // it to send us a status update.
- Message<S2E_KILL_TASK> out;
- out.mutable_framework_id()->MergeFrom(msg.framework_id());
- out.mutable_task_id()->MergeFrom(msg.task_id());
- send(executor->pid, out);
- }
- } else {
- LOG(WARNING) << "Cannot kill task " << msg.task_id()
- << " of framework " << msg.framework_id()
- << " because no such framework is running";
-
- Message<S2M_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(msg.framework_id());
- TaskStatus *status = out.mutable_status();
- status->mutable_task_id()->MergeFrom(msg.task_id());
- status->mutable_slave_id()->MergeFrom(slaveId);
- status->set_state(TASK_LOST);
- send(master, out);
-
- double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
- framework->statuses[deadline][status->task_id()] = *status;
+ FrameworkID fid;
+ TaskID tid;
+ tie(fid, tid) = unpack<M2S_KILL_TASK>(body());
+ LOG(INFO) << "Killing task " << fid << ":" << tid;
+ if (Executor *ex = getExecutor(fid)) {
+ send(ex->pid, pack<S2E_KILL_TASK>(tid));
+ }
+ if (Framework *fw = getFramework(fid)) {
+ fw->removeTask(tid);
+ isolationModule->resourcesChanged(fw);
}
break;
}
case M2S_KILL_FRAMEWORK: {
- const Message<M2S_KILL_FRAMEWORK>&msg = message();
-
- LOG(INFO) << "Asked to kill framework " << msg.framework_id();
-
- Framework *framework = getFramework(msg.framework_id());
- if (framework != NULL)
- killFramework(framework);
+ FrameworkID fid;
+ tie(fid) = unpack<M2S_KILL_FRAMEWORK>(body());
+ LOG(INFO) << "Asked to kill framework " << fid;
+ Framework *fw = getFramework(fid);
+ if (fw != NULL)
+ killFramework(fw);
break;
}
case M2S_FRAMEWORK_MESSAGE: {
- const Message<M2S_FRAMEWORK_MESSAGE>&msg = message();
-
- Framework* framework = getFramework(msg.framework_id());
- if (framework != NULL) {
- const FrameworkMessage& message = msg.message();
-
- Executor* executor = framework->getExecutor(message.executor_id());
- if (executor == NULL) {
- LOG(WARNING) << "Dropping message for executor "
- << message.executor_id() << " of framework "
- << msg.framework_id()
- << " because executor does not exist";
- } else if (!executor->pid) {
- // TODO(*): If executor is not started, queue framework message?
- // (It's probably okay to just drop it since frameworks can have
- // the executor send a message to the master to say when it's ready.)
- LOG(WARNING) << "Dropping message for executor "
- << message.executor_id() << " of framework "
- << msg.framework_id()
- << " because executor is not running";
- } else {
- Message<S2E_FRAMEWORK_MESSAGE> out;
- out.mutable_framework_id()->MergeFrom(msg.framework_id());
- out.mutable_message()->MergeFrom(message);
- send(executor->pid, out);
- }
+ FrameworkID fid;
+ FrameworkMessage message;
+ tie(fid, message) = unpack<M2S_FRAMEWORK_MESSAGE>(body());
+ if (Executor *ex = getExecutor(fid)) {
+ VLOG(1) << "Relaying framework message for framework " << fid;
+ send(ex->pid, pack<S2E_FRAMEWORK_MESSAGE>(message));
} else {
- LOG(WARNING) << "Dropping message for framework "
- << msg.framework_id()
- << " because it does not exist";
- }
- break;
- }
-
- case M2S_UPDATE_FRAMEWORK: {
- const Message<M2S_UPDATE_FRAMEWORK>&msg = message();
-
- Framework *framework = getFramework(msg.framework_id());
- if (framework != NULL) {
- LOG(INFO) << "Updating framework " << msg.framework_id()
- << " pid to " << msg.pid();
- framework->pid = msg.pid();
+ VLOG(1) << "Dropping framework message for framework " << fid
+ << " because its executor is not running";
}
+ // TODO(*): If executor is not started, queue framework message?
+ // (It's probably okay to just drop it since frameworks can have
+ // the executor send a message to the master to say when it's ready.)
break;
}
- case M2S_STATUS_UPDATE_ACK: {
- const Message<M2S_STATUS_UPDATE_ACK>& msg = message();
-
- Framework* framework = getFramework(msg.framework_id());
+ case M2S_UPDATE_FRAMEWORK_PID: {
+ FrameworkID fid;
+ PID pid;
+ tie(fid, pid) = unpack<M2S_UPDATE_FRAMEWORK_PID>(body());
+ Framework *framework = getFramework(fid);
if (framework != NULL) {
- foreachpair (double deadline, _, framework->statuses) {
- if (framework->statuses[deadline].count(msg.task_id()) > 0) {
- LOG(INFO) << "Got acknowledgement of status update"
- << " for task " << msg.task_id()
- << " of framework " << framework->frameworkId;
- framework->statuses[deadline].erase(msg.task_id());
- break;
- }
- }
+ LOG(INFO) << "Updating framework " << fid << " pid to " << pid;
+ framework->pid = pid;
}
break;
}
case E2S_REGISTER_EXECUTOR: {
- const Message<E2S_REGISTER_EXECUTOR>& msg = message();
-
- LOG(INFO) << "Got registration for executor "
- << msg.executor_id() << " of framework "
- << msg.framework_id();
-
- Framework* framework = getFramework(msg.framework_id());
- if (framework != NULL) {
- Executor* executor = framework->getExecutor(msg.executor_id());
-
- // Check the status of the executor.
- if (executor == NULL) {
- LOG(WARNING) << "Not expecting executor " << msg.executor_id()
- << " of framework " << msg.framework_id();
- send(from(), S2E_KILL_EXECUTOR);
- } else if (executor->pid != PID()) {
- LOG(WARNING) << "Not good, executor " << msg.executor_id()
- << " of framework " << msg.framework_id()
- << " is already running";
- send(from(), S2E_KILL_EXECUTOR);
- } else {
- // Save the pid for the executor.
- executor->pid = from();
-
- // Now that the executor is up, set its resource limits.
- isolationModule->resourcesChanged(framework, executor);
-
- // Tell executor it's registered and give it any queued tasks.
- Message<S2E_REGISTER_REPLY> out;
- ExecutorArgs* args = out.mutable_args();
- args->mutable_framework_id()->MergeFrom(framework->frameworkId);
- args->set_name(framework->info.name());
- args->mutable_slave_id()->MergeFrom(slaveId);
- args->set_hostname(hostname);
- args->set_data(framework->info.executor().data());
- send(executor->pid, out);
- sendQueuedTasks(framework, executor);
+ FrameworkID fid;
+ tie(fid) = unpack<E2S_REGISTER_EXECUTOR>(body());
+ LOG(INFO) << "Got executor registration for framework " << fid;
+ if (Framework *fw = getFramework(fid)) {
+ if (getExecutor(fid) != 0) {
+ LOG(ERROR) << "Executor for framework " << fid
+ << "already exists";
+ send(from(), pack<S2E_KILL_EXECUTOR>());
+ break;
}
+ Executor *executor = new Executor(fid, from());
+ executors[fid] = executor;
+ link(from());
+ // Now that the executor is up, set its resource limits
+ isolationModule->resourcesChanged(fw);
+ // Tell executor that it's registered and give it its queued tasks
+ send(from(), pack<S2E_REGISTER_REPLY>(this->id,
+ hostname,
+ fw->name,
+ fw->executorInfo.initArg));
+ sendQueuedTasks(fw);
} else {
- // Framework is gone; tell the executor to exit.
- LOG(WARNING) << "Framework " << msg.framework_id()
- << " does not exist (it may have been killed),"
- << " telling executor to exit";
-
- // TODO(benh): Don't we also want to tell the isolation
- // module to shut this guy down!
- send(from(), S2E_KILL_EXECUTOR);
+ // Framework is gone; tell the executor to exit
+ send(from(), pack<S2E_KILL_EXECUTOR>());
}
break;
}
case E2S_STATUS_UPDATE: {
- const Message<E2S_STATUS_UPDATE>& msg = message();
-
- const TaskStatus& status = msg.status();
+ FrameworkID fid;
+ TaskID tid;
+ TaskState taskState;
+ string data;
+ tie(fid, tid, taskState, data) = unpack<E2S_STATUS_UPDATE>(body());
- LOG(INFO) << "Status update: task " << status.task_id()
- << " of framework " << msg.framework_id()
- << " is now in state "
- << TaskState_descriptor()->FindValueByNumber(status.state())->name();
-
- Framework *framework = getFramework(msg.framework_id());
+ Framework *framework = getFramework(fid);
if (framework != NULL) {
- Executor* executor = framework->getExecutor(status.task_id());
- if (executor != NULL) {
- if (status.state() == TASK_FINISHED ||
- status.state() == TASK_FAILED ||
- status.state() == TASK_KILLED ||
- status.state() == TASK_LOST) {
- executor->removeTask(status.task_id());
- isolationModule->resourcesChanged(framework, executor);
- }
-
- // Send message and record the status for possible resending.
- Message<S2M_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(msg.framework_id());
- out.mutable_status()->MergeFrom(status);
- send(master, out);
-
- double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
- framework->statuses[deadline][status.task_id()] = status;
- } else {
- LOG(WARNING) << "Status update error: couldn't lookup "
- << "executor for framework " << msg.framework_id();
+ LOG(INFO) << "Got status update for task " << fid << ":" << tid;
+ if (taskState == TASK_FINISHED || taskState == TASK_FAILED ||
+ taskState == TASK_KILLED || taskState == TASK_LOST) {
+ LOG(INFO) << "Task " << fid << ":" << tid << " done";
+
+ framework->removeTask(tid);
+ isolationModule->resourcesChanged(framework);
}
+
+ // Reliably send message and save sequence number for
+ // canceling later.
+ int seq = rsend(master, framework->pid,
+ pack<S2M_STATUS_UPDATE>(id, fid, tid,
+ taskState, data));
+ seqs[fid].insert(seq);
} else {
- LOG(WARNING) << "Status update error: couldn't lookup "
- << "framework " << msg.framework_id();
+ LOG(WARNING) << "Got status update for UNKNOWN task "
+ << fid << ":" << tid;
}
break;
}
case E2S_FRAMEWORK_MESSAGE: {
- const Message<E2S_FRAMEWORK_MESSAGE>& msg = message();
-
- const FrameworkMessage& message = msg.message();
+ FrameworkID fid;
+ FrameworkMessage message;
+ tie(fid, message) = unpack<E2S_FRAMEWORK_MESSAGE>(body());
- Framework *framework = getFramework(msg.framework_id());
+ Framework *framework = getFramework(fid);
if (framework != NULL) {
- LOG(INFO) << "Sending message for framework "
- << framework->frameworkId
+ LOG(INFO) << "Sending message for framework " << fid
<< " to " << framework->pid;
- // TODO(benh): This is weird, sending an M2F message.
- Message<M2F_FRAMEWORK_MESSAGE> out;
- out.mutable_framework_id()->MergeFrom(msg.framework_id());
- out.mutable_message()->MergeFrom(message);
- out.mutable_message()->mutable_slave_id()->MergeFrom(slaveId);
- send(framework->pid, out);
+ // Set slave ID in case framework omitted it.
+ message.slaveId = this->id;
+ VLOG(1) << "Sending framework message to framework " << fid
+ << " with PID " << framework->pid;
+ send(framework->pid, pack<M2F_FRAMEWORK_MESSAGE>(message));
}
break;
}
case S2S_GET_STATE: {
- state::SlaveState *state = getState();
- Message<S2S_GET_STATE_REPLY> out;
- out.set_pointer((char *) &state, sizeof(state));
- send(from(), out);
+ send(from(), pack<S2S_GET_STATE_REPLY>(getState()));
+ break;
+ }
+
+ case PROCESS_EXIT: {
+ LOG(INFO) << "Process exited: " << from();
+
+ if (from() == master) {
+ LOG(WARNING) << "Master disconnected! "
+ << "Waiting for a new master to be elected.";
+ // TODO(benh): After so long waiting for a master, commit suicide.
+ } else {
+ // Check if an executor has exited (this is technically
+ // redundant because the isolation module should be doing
+ // this for us).
+ foreachpair (_, Executor *ex, executors) {
+ if (from() == ex->pid) {
+ LOG(INFO) << "Executor for framework " << ex->frameworkId
+ << " disconnected";
+ Framework *framework = getFramework(ex->frameworkId);
+ if (framework != NULL) {
+ send(master, pack<S2M_LOST_EXECUTOR>(id, ex->frameworkId, -1));
+ killFramework(framework);
+ }
+ break;
+ }
+ }
+ }
+
break;
}
case M2S_SHUTDOWN: {
LOG(INFO) << "Asked to shut down by master: " << from();
- foreachpaircopy (_, Framework *framework, frameworks) {
+ unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
+ foreachpair (_, Framework *framework, frameworksCopy) {
killFramework(framework);
}
return;
@@ -553,46 +443,16 @@ void Slave::operator () ()
case S2S_SHUTDOWN: {
LOG(INFO) << "Asked to shut down by " << from();
- foreachpaircopy (_, Framework *framework, frameworks) {
+ unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
+ foreachpair (_, Framework *framework, frameworksCopy) {
killFramework(framework);
}
return;
}
- case PROCESS_EXIT: {
- LOG(INFO) << "Process exited: " << from();
-
- if (from() == master) {
- LOG(WARNING) << "Master disconnected! "
- << "Waiting for a new master to be elected.";
- // TODO(benh): After so long waiting for a master, commit suicide.
- }
- break;
- }
-
- case PROCESS_TIMEOUT: {
- // Check and see if we should re-send any status updates.
- foreachpair (_, Framework* framework, frameworks) {
- foreachpair (double deadline, _, framework->statuses) {
- if (deadline <= elapsed()) {
- foreachpair (_, const TaskStatus& status, framework->statuses[deadline]) {
- LOG(WARNING) << "Resending status update"
- << " for task " << status.task_id()
- << " of framework " << framework->frameworkId;
- Message<S2M_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(framework->frameworkId);
- out.mutable_status()->MergeFrom(status);
- send(master, out);
- }
- }
- }
- }
- break;
- }
-
default: {
- LOG(ERROR) << "Received unknown message (" << msgid()
- << ") from " << from();
+ LOG(ERROR) << "Received unknown message ID " << msgid()
+ << " from " << from();
break;
}
}
@@ -600,66 +460,70 @@ void Slave::operator () ()
}
-Framework* Slave::getFramework(const FrameworkID& frameworkId)
+Framework * Slave::getFramework(FrameworkID frameworkId)
{
- if (frameworks.count(frameworkId) > 0) {
- return frameworks[frameworkId];
- }
+ FrameworkMap::iterator it = frameworks.find(frameworkId);
+ if (it == frameworks.end()) return NULL;
+ return it->second;
+}
+
- return NULL;
+Executor * Slave::getExecutor(FrameworkID frameworkId)
+{
+ ExecutorMap::iterator it = executors.find(frameworkId);
+ if (it == executors.end()) return NULL;
+ return it->second;
}
// Send any tasks queued up for the given framework to its executor
// (needed if we received tasks while the executor was starting up)
-void Slave::sendQueuedTasks(Framework* framework, Executor* executor)
+void Slave::sendQueuedTasks(Framework *framework)
{
- LOG(INFO) << "Flushing queued tasks for framework "
- << framework->frameworkId;
-
- CHECK(executor->pid != PID());
-
- foreach (const TaskDescription& task, executor->queuedTasks) {
- // Add the task to the executor.
- executor->addTask(task);
-
- Message<S2E_RUN_TASK> out;
- out.mutable_framework()->MergeFrom(framework->info);
- out.mutable_framework_id()->MergeFrom(framework->frameworkId);
- out.set_pid(framework->pid);
- out.mutable_task()->MergeFrom(task);
- send(executor->pid, out);
+ LOG(INFO) << "Flushing queued tasks for framework " << framework->id;
+ Executor *executor = getExecutor(framework->id);
+ if (!executor) return;
+ foreach(TaskDescription *td, framework->queuedTasks) {
+ send(executor->pid,
+ pack<S2E_RUN_TASK>(td->tid, td->name, td->args, td->params));
+ delete td;
}
-
- executor->queuedTasks.clear();
+ framework->queuedTasks.clear();
}
// Kill a framework (including its executor if killExecutor is true).
-void Slave::killFramework(Framework *framework, bool killExecutors)
+void Slave::killFramework(Framework *framework, bool killExecutor)
{
- LOG(INFO) << "Cleaning up framework " << framework->frameworkId;
+ LOG(INFO) << "Cleaning up framework " << framework->id;
- // Shutdown all executors of this framework.
- foreachpaircopy (const ExecutorID& executorId, Executor* executor, framework->executors) {
- if (killExecutors) {
- LOG(INFO) << "Killing executor " << executorId
- << " of framework " << framework->frameworkId;
+ // Cancel sending any reliable messages for this framework.
+ foreach (int seq, seqs[framework->id])
+ cancel(seq);
- send(executor->pid, S2E_KILL_EXECUTOR);
+ seqs.erase(framework->id);
+ // Remove its allocated resources.
+ framework->resources = Resources();
+
+ // If an executor is running, tell it to exit and kill it.
+ if (Executor *ex = getExecutor(framework->id)) {
+ if (killExecutor) {
+ LOG(INFO) << "Killing executor for framework " << framework->id;
// TODO(benh): There really isn't ANY time between when an
// executor gets a S2E_KILL_EXECUTOR message and the isolation
// module goes and kills it. We should really think about making
// the semantics of this better.
-
- isolationModule->killExecutor(framework, executor);
+ send(ex->pid, pack<S2E_KILL_EXECUTOR>());
+ isolationModule->killExecutor(framework);
}
- framework->destroyExecutor(executorId);
+ LOG(INFO) << "Cleaning up executor for framework " << framework->id;
+ delete ex;
+ executors.erase(framework->id);
}
- frameworks.erase(framework->frameworkId);
+ frameworks.erase(framework->id);
delete framework;
}
@@ -667,44 +531,18 @@ void Slave::killFramework(Framework *fra
// Called by isolation module when an executor process exits
// TODO(benh): Make this callback be a message so that we can avoid
// race conditions.
-void Slave::executorExited(const FrameworkID& frameworkId, const ExecutorID& executorId, int status)
+void Slave::executorExited(FrameworkID fid, int status)
{
- Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- Executor* executor = framework->getExecutor(executorId);
- if (executor != NULL) {
- LOG(INFO) << "Exited executor " << executorId
- << " of framework " << frameworkId
- << " with status " << status;
-
- Message<S2M_EXITED_EXECUTOR> out;
- out.mutable_slave_id()->MergeFrom(slaveId);
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.mutable_executor_id()->MergeFrom(executorId);
- out.set_status(status);
- send(master, out);
-
- framework->destroyExecutor(executorId);
-
- // TODO(benh): When should we kill the presence of an entire
- // framework on a slave?
- if (framework->executors.size() == 0) {
- killFramework(framework);
- }
- } else {
- LOG(WARNING) << "UNKNOWN executor " << executorId
- << " of framework " << frameworkId
- << " has exited with status " << status;
- }
- } else {
- LOG(WARNING) << "UNKNOWN executor " << executorId
- << " of UNKNOWN framework " << frameworkId
- << " has exited with status " << status;
+ if (Framework *f = getFramework(fid)) {
+ LOG(INFO) << "Executor for framework " << fid << " exited "
+ << "with status " << status;
+ send(master, pack<S2M_LOST_EXECUTOR>(id, fid, status));
+ killFramework(f, false);
}
};
-string Slave::getUniqueWorkDirectory(const FrameworkID& frameworkId)
+string Slave::getUniqueWorkDirectory(FrameworkID fid)
{
string workDir;
if (conf.contains("work_dir")) {
@@ -716,7 +554,7 @@ string Slave::getUniqueWorkDirectory(con
}
ostringstream os(std::ios_base::app | std::ios_base::out);
- os << workDir << "/slave-" << slaveId << "/fw-" << frameworkId;
+ os << workDir << "/slave-" << id << "/fw-" << fid;
// Find a unique directory based on the path given by the slave
// (this is because we might launch multiple executors from the same
@@ -737,7 +575,7 @@ string Slave::getUniqueWorkDirectory(con
}
-const Configuration& Slave::getConfiguration()
+const Params& Slave::getConf()
{
return conf;
}