You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:28:34 UTC
svn commit: r1132344 - in /incubator/mesos/trunk/src: common/utils.hpp
slave/slave.cpp slave/slave.hpp
Author: benh
Date: Sun Jun 5 09:28:33 2011
New Revision: 1132344
URL: http://svn.apache.org/viewvc?rev=1132344&view=rev
Log:
Added statistics to slaves, added some http endpoints for accessing those statistics (and getting general 'vars').
Modified:
incubator/mesos/trunk/src/common/utils.hpp
incubator/mesos/trunk/src/slave/slave.cpp
incubator/mesos/trunk/src/slave/slave.hpp
Modified: incubator/mesos/trunk/src/common/utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/utils.hpp?rev=1132344&r1=1132343&r2=1132344&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/utils.hpp (original)
+++ incubator/mesos/trunk/src/common/utils.hpp Sun Jun 5 09:28:33 2011
@@ -1,10 +1,41 @@
#ifndef __UTILS_HPP__
#define __UTILS_HPP__
+#include <unistd.h>
+
+
+// Useful common macros.
#define VA_NUM_ARGS_IMPL(_1, _2, _3, _4, _5, N, ...) N
#define VA_NUM_ARGS(...) VA_NUM_ARGS_IMPL(__VA_ARGS__, 5, 4, 3, 2, 1)
#define CONCAT_IMPL(A, B) A ## B
#define CONCAT(A, B) CONCAT_IMPL(A, B)
+
+namespace mesos { namespace internal { namespace utils {
+
+inline std::string getcwd()
+{
+ size_t size = 100;
+
+ while (true) {
+ char* temp = new char[size];
+ if (::getcwd(temp, size) == temp) {
+ std::string result(temp);
+ delete[] temp;
+ return result;
+ } else {
+ delete[] temp;
+ if (errno != ERANGE) {
+ return std::string();
+ }
+ size *= 2;
+ }
+ }
+
+ return std::string();
+}
+
+}}} // namespace mesos { namespace internal { namespace utils {
+
#endif // __UTILS_HPP__
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1132344&r1=1132343&r2=1132344&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Sun Jun 5 09:28:33 2011
@@ -6,109 +6,85 @@
#include <algorithm>
#include <fstream>
+#include <google/protobuf/descriptor.h>
+
#include "slave.hpp"
#include "webui.hpp"
+#include "common/utils.hpp"
+
// There's no gethostbyname2 on Solaris, so fake it by calling gethostbyname
#ifdef __sun__
#define gethostbyname2(name, _) gethostbyname(name)
#endif
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::slave;
+
+using boost::lexical_cast;
+using boost::unordered_map;
+using boost::unordered_set;
+
+using process::HttpOKResponse;
+using process::HttpResponse;
+using process::HttpRequest;
+using process::Promise;
+using process::UPID;
+
using std::list;
using std::make_pair;
using std::ostringstream;
-using std::istringstream;
using std::pair;
using std::queue;
using std::string;
using std::vector;
-using boost::lexical_cast;
-using boost::unordered_map;
-using boost::unordered_set;
-
-using namespace mesos;
-using namespace mesos::internal;
-using namespace mesos::internal::slave;
-
-
-namespace {
-
-// Periodically sends heartbeats to the master
-class Heart : public MesosProcess
-{
-private:
- PID master;
- PID slave;
- SlaveID sid;
- double interval;
-
-protected:
- void operator () ()
- {
- link(slave);
- link(master);
- do {
- switch (receive(interval)) {
- case PROCESS_TIMEOUT:
- send(master, pack<SH2M_HEARTBEAT>(sid));
- break;
- case PROCESS_EXIT:
- return;
- }
- } while (true);
- }
-
-public:
- Heart(const PID &_master, const PID &_slave, SlaveID _sid, double _interval)
- : master(_master), slave(_slave), sid(_sid), interval(_interval) {}
-};
-
-// Default values for CPU cores and memory to include in configuration
-const int32_t DEFAULT_CPUS = 1;
-const int32_t DEFAULT_MEM = 1 * Gigabyte;
-
-
-} /* namespace */
-
-
-Slave::Slave(Resources _resources, bool _local,
+Slave::Slave(const Resources& _resources, bool _local,
IsolationModule *_isolationModule)
- : id(""), resources(_resources), local(_local),
- isolationModule(_isolationModule) {}
+ : MesosProcess<Slave>("slave"),
+ resources(_resources), local(_local), isolationModule(_isolationModule)
+{
+ initialize();
+}
-Slave::Slave(const Params& _conf, bool _local, IsolationModule *_module)
- : id(""), conf(_conf), local(_local), isolationModule(_module)
-{
- resources = Resources(conf.get<int32_t>("cpus", DEFAULT_CPUS),
- conf.get<int32_t>("mem", DEFAULT_MEM));
+Slave::Slave(const Configuration& _conf, bool _local,
+ IsolationModule* _isolationModule)
+ : MesosProcess<Slave>("slave"),
+ conf(_conf), local(_local), isolationModule(_isolationModule)
+{
+ resources =
+ Resources::parse(conf.get<string>("resources", "cpus:1;mem:1024"));
+
+ initialize();
}
-void Slave::registerOptions(Configurator* conf)
-{
- conf->addOption<int32_t>("cpus", 'c', "CPU cores for use by tasks",
- DEFAULT_CPUS);
- conf->addOption<int64_t>("mem", 'm', "Memory for use by tasks, in MB\n",
- DEFAULT_MEM);
- conf->addOption<string>("work_dir",
- "Where to place framework work directories\n"
- "(default: MESOS_HOME/work)");
- conf->addOption<string>("hadoop_home",
- "Where to find Hadoop installed (for fetching\n"
- "framework executors from HDFS)\n"
- "(default: look for HADOOP_HOME environment\n"
- "variable or find hadoop on PATH)");
- conf->addOption<bool>("switch_user",
- "Whether to run tasks as the user who\n"
- "submitted them rather than the user running\n"
- "the slave (requires setuid permission)",
- true);
- conf->addOption<string>("frameworks_home",
- "Directory prepended to relative executor\n"
- "paths (default: MESOS_HOME/frameworks)");
+void Slave::registerOptions(Configurator* configurator)
+{
+ // TODO(benh): Is there a way to specify units for the resources?
+ configurator->addOption<string>("resources",
+ "Total consumable resources per slave\n");
+// configurator->addOption<string>("attributes",
+// "Attributes of machine\n");
+ configurator->addOption<string>("work_dir",
+ "Where to place framework work directories\n"
+ "(default: MESOS_HOME/work)");
+ configurator->addOption<string>("hadoop_home",
+ "Where to find Hadoop installed (for\n"
+ "fetching framework executors from HDFS)\n"
+ "(default: look for HADOOP_HOME in\n"
+ "environment or find hadoop on PATH)");
+ configurator->addOption<bool>("switch_user",
+ "Whether to run tasks as the user who\n"
+ "submitted them rather than the user running\n"
+ "the slave (requires setuid permission)",
+ true);
+ configurator->addOption<string>("frameworks_home",
+ "Directory prepended to relative executor\n"
+ "paths (default: MESOS_HOME/frameworks)");
}
@@ -118,25 +94,63 @@ Slave::~Slave()
}
-state::SlaveState *Slave::getState()
+Promise<state::SlaveState*> Slave::getState()
{
- std::ostringstream my_pid;
- my_pid << self();
- std::ostringstream master_pid;
- master_pid << master;
- state::SlaveState *state =
- new state::SlaveState(BUILD_DATE, BUILD_USER, id, resources.cpus,
- resources.mem, my_pid.str(), master_pid.str());
-
- foreachpair(_, Framework *f, frameworks) {
- state::Framework *framework = new state::Framework(f->id, f->name,
- f->executorInfo.uri, f->executorStatus, f->resources.cpus,
- f->resources.mem);
- state->frameworks.push_back(framework);
- foreachpair(_, Task *t, f->tasks) {
- state::Task *task = new state::Task(t->id, t->name, t->state,
- t->resources.cpus, t->resources.mem);
- framework->tasks.push_back(task);
+ Resources resources(resources);
+ Resource::Scalar cpus;
+ Resource::Scalar mem;
+ cpus.set_value(-1);
+ mem.set_value(-1);
+ cpus = resources.getScalar("cpus", cpus);
+ mem = resources.getScalar("mem", mem);
+
+ state::SlaveState* state =
+ new state::SlaveState(build::DATE, build::USER, slaveId.value(),
+ cpus.value(), mem.value(), self(), master);
+
+ foreachpair (_, Framework* f, frameworks) {
+ foreachpair (_, Executor* e, f->executors) {
+ Resources resources(e->resources);
+ Resource::Scalar cpus;
+ Resource::Scalar mem;
+ cpus.set_value(-1);
+ mem.set_value(-1);
+ cpus = resources.getScalar("cpus", cpus);
+ mem = resources.getScalar("mem", mem);
+
+ // TODO(benh): For now, we will add a state::Framework object
+ // for each executor that the framework has. Therefore, we tweak
+ // the framework ID to also include the associated executor ID
+ // to differentiate them. This is so we don't have to make very
+ // many changes to the webui right now. Note that this ID
+ // construction must be identical to the directory suffix
+ // returned from Slave::getUniqueWorkDirectory.
+
+ string id = f->frameworkId.value() + "-" + e->info.executor_id().value();
+
+ state::Framework* framework =
+ new state::Framework(id, f->info.name(),
+ e->info.uri(), e->executorStatus,
+ cpus.value(), mem.value());
+
+ state->frameworks.push_back(framework);
+
+ foreachpair (_, Task* t, e->tasks) {
+ Resources resources(t->resources());
+ Resource::Scalar cpus;
+ Resource::Scalar mem;
+ cpus.set_value(-1);
+ mem.set_value(-1);
+ cpus = resources.getScalar("cpus", cpus);
+ mem = resources.getScalar("mem", mem);
+
+ state::Task* task =
+ new state::Task(t->task_id().value(), t->name(),
+ TaskState_descriptor()->FindValueByNumber(t->state())->name(),
+ cpus.value(), mem.value());
+
+ framework->tasks.push_back(task);
+ }
}
}
@@ -144,380 +158,777 @@ state::SlaveState *Slave::getState()
}
+void Slave::initialize()
+{
+ // Start all the statistics at 0.
+ statistics.launched_tasks = 0;
+ statistics.finished_tasks = 0;
+ statistics.killed_tasks = 0;
+ statistics.failed_tasks = 0;
+ statistics.lost_tasks = 0;
+ statistics.valid_status_updates = 0;
+ statistics.invalid_status_updates = 0;
+ statistics.valid_framework_messages = 0;
+ statistics.invalid_framework_messages = 0;
+
+ startTime = elapsedTime();
+
+ install(NEW_MASTER_DETECTED, &Slave::newMasterDetected,
+ &NewMasterDetectedMessage::pid);
+
+ install(NO_MASTER_DETECTED, &Slave::noMasterDetected);
+
+ install(M2S_REGISTER_REPLY, &Slave::registerReply,
+ &SlaveRegisteredMessage::slave_id);
+
+ install(M2S_REREGISTER_REPLY, &Slave::reregisterReply,
+ &SlaveRegisteredMessage::slave_id);
+
+ install(M2S_RUN_TASK, &Slave::runTask,
+ &RunTaskMessage::framework,
+ &RunTaskMessage::framework_id,
+ &RunTaskMessage::pid,
+ &RunTaskMessage::task);
+
+ install(M2S_KILL_TASK, &Slave::killTask,
+ &KillTaskMessage::framework_id,
+ &KillTaskMessage::task_id);
+
+ install(M2S_KILL_FRAMEWORK, &Slave::killFramework,
+ &KillFrameworkMessage::framework_id);
+
+ install(M2S_FRAMEWORK_MESSAGE, &Slave::schedulerMessage,
+ &FrameworkMessageMessage::slave_id,
+ &FrameworkMessageMessage::framework_id,
+ &FrameworkMessageMessage::executor_id,
+ &FrameworkMessageMessage::data);
+
+ install(M2S_UPDATE_FRAMEWORK, &Slave::updateFramework,
+ &UpdateFrameworkMessage::framework_id,
+ &UpdateFrameworkMessage::pid);
+
+ install(M2S_STATUS_UPDATE_ACK, &Slave::statusUpdateAck,
+ &StatusUpdateAckMessage::framework_id,
+ &StatusUpdateAckMessage::slave_id,
+ &StatusUpdateAckMessage::task_id);
+
+ install(E2S_REGISTER_EXECUTOR, &Slave::registerExecutor,
+ &RegisterExecutorMessage::framework_id,
+ &RegisterExecutorMessage::executor_id);
+
+ install(E2S_STATUS_UPDATE, &Slave::statusUpdate,
+ &StatusUpdateMessage::framework_id,
+ &StatusUpdateMessage::status);
+
+ install(E2S_FRAMEWORK_MESSAGE, &Slave::executorMessage,
+ &FrameworkMessageMessage::slave_id,
+ &FrameworkMessageMessage::framework_id,
+ &FrameworkMessageMessage::executor_id,
+ &FrameworkMessageMessage::data);
+
+ install(PING, &Slave::ping);
+
+ install(process::TIMEOUT, &Slave::timeout);
+
+ install(process::EXITED, &Slave::exited);
+
+ installHttpHandler("info.json", &Slave::http_info_json);
+ installHttpHandler("frameworks.json", &Slave::http_frameworks_json);
+ installHttpHandler("tasks.json", &Slave::http_tasks_json);
+ installHttpHandler("stats.json", &Slave::http_stats_json);
+ installHttpHandler("vars", &Slave::http_vars);
+}
+
+
void Slave::operator () ()
{
LOG(INFO) << "Slave started at " << self();
+ LOG(INFO) << "Slave resources: " << resources;
// Get our hostname
char buf[512];
gethostname(buf, sizeof(buf));
- hostent *he = gethostbyname2(buf, AF_INET);
+ hostent* he = gethostbyname2(buf, AF_INET);
string hostname = he->h_name;
- // Get our public DNS name. Normally this is our hostname, but on EC2
- // we look for the MESOS_PUBLIC_DNS environment variable. This allows
- // the master to display our public name in its web UI.
- string publicDns = hostname;
+ // Check and see if we have a different public DNS name. Normally
+ // this is our hostname, but on EC2 we look for the MESOS_PUBLIC_DNS
+ // environment variable. This allows the master to display our
+ // public name in its web UI.
+ string public_hostname = hostname;
if (getenv("MESOS_PUBLIC_DNS") != NULL) {
- publicDns = getenv("MESOS_PUBLIC_DNS");
+ public_hostname = getenv("MESOS_PUBLIC_DNS");
}
+ // Initialize slave info.
+ slave.set_hostname(hostname);
+ slave.set_public_hostname(public_hostname);
+ slave.mutable_resources()->MergeFrom(resources);
+
// Initialize isolation module.
isolationModule->initialize(this);
while (true) {
- switch (receive()) {
- case NEW_MASTER_DETECTED: {
- string masterSeq;
- PID masterPid;
- tie(masterSeq, masterPid) = unpack<NEW_MASTER_DETECTED>(body());
-
- LOG(INFO) << "New master at " << masterPid << " with ID:" << masterSeq;
-
- redirect(master, masterPid);
- master = masterPid;
- link(master);
-
- if (id.empty()) {
- // Slave started before master.
- send(master, pack<S2M_REGISTER_SLAVE>(hostname, publicDns, resources));
- } else {
- // Reconnecting, so reconstruct resourcesInUse for the master.
- Resources resourcesInUse;
- vector<Task> taskVec;
-
- foreachpair(_, Framework *framework, frameworks) {
- foreachpair(_, Task *task, framework->tasks) {
- resourcesInUse += task->resources;
- Task ti = *task;
- ti.slaveId = id;
- taskVec.push_back(ti);
- }
- }
+ serve(1);
+ if (name() == process::TERMINATE) {
+ LOG(INFO) << "Asked to shut down by " << from();
+ foreachpaircopy (_, Framework* framework, frameworks) {
+ killFramework(framework);
+ }
+ return;
+ }
+ }
+}
+
+
+void Slave::newMasterDetected(const string& pid)
+{
+ LOG(INFO) << "New master detected at " << pid;
- send(master, pack<S2M_REREGISTER_SLAVE>(id, hostname, publicDns, resources, taskVec));
+ master = pid;
+ link(master);
+
+ if (slaveId == "") {
+ // Slave started before master.
+ MSG<S2M_REGISTER_SLAVE> out;
+ out.mutable_slave()->MergeFrom(slave);
+ send(master, out);
+ } else {
+ // Re-registering, so send tasks running.
+ MSG<S2M_REREGISTER_SLAVE> out;
+ out.mutable_slave_id()->MergeFrom(slaveId);
+ out.mutable_slave()->MergeFrom(slave);
+
+ foreachpair (_, Framework* framework, frameworks) {
+ foreachpair (_, Executor* executor, framework->executors) {
+ foreachpair (_, Task* task, executor->tasks) {
+ out.add_tasks()->MergeFrom(*task);
}
- break;
- }
-
- case NO_MASTER_DETECTED: {
- LOG(INFO) << "Lost master(s) ... waiting";
- break;
}
+ }
- case M2S_REGISTER_REPLY: {
- double interval = 0;
- tie(this->id, interval) = unpack<M2S_REGISTER_REPLY>(body());
- LOG(INFO) << "Registered with master; given slave ID " << this->id;
- link(spawn(new Heart(master, self(), this->id, interval)));
- break;
- }
-
- case M2S_REREGISTER_REPLY: {
- SlaveID sid;
- double interval = 0;
- tie(sid, interval) = unpack<M2S_REREGISTER_REPLY>(body());
- LOG(INFO) << "RE-registered with master; given slave ID " << sid << " had "<< this->id;
- if (this->id == "")
- this->id = sid;
- CHECK(this->id == sid);
- link(spawn(new Heart(master, self(), this->id, interval)));
- break;
- }
-
- case M2S_RUN_TASK: {
- FrameworkID fid;
- TaskID tid;
- string fwName, user, taskName, taskArg;
- ExecutorInfo execInfo;
- Params params;
- PID pid;
- tie(fid, tid, fwName, user, execInfo, taskName, taskArg, params, pid) =
- unpack<M2S_RUN_TASK>(body());
- LOG(INFO) << "Got assigned task " << fid << ":" << tid;
- Resources res;
- res.cpus = params.getInt32("cpus", -1);
- res.mem = params.getInt64("mem", -1);
- Framework *framework = getFramework(fid);
- if (framework == NULL) {
- // Framework not yet created on this node - create it.
- framework = new Framework(fid, fwName, user, execInfo, pid);
- frameworks[fid] = framework;
- isolationModule->startExecutor(framework);
- }
- Task *task = framework->addTask(tid, taskName, res);
- Executor *executor = getExecutor(fid);
- if (executor) {
- send(executor->pid,
- pack<S2E_RUN_TASK>(tid, taskName, taskArg, params));
- isolationModule->resourcesChanged(framework);
- } else {
- // Executor not yet registered; queue task for when it starts up
- TaskDescription *td = new TaskDescription(
- tid, taskName, taskArg, params.str());
- framework->queuedTasks.push_back(td);
- }
- break;
- }
+ send(master, out);
+ }
+}
- case M2S_KILL_TASK: {
- FrameworkID fid;
- TaskID tid;
- tie(fid, tid) = unpack<M2S_KILL_TASK>(body());
- LOG(INFO) << "Killing task " << fid << ":" << tid;
- if (Executor *ex = getExecutor(fid)) {
- send(ex->pid, pack<S2E_KILL_TASK>(tid));
- }
- if (Framework *fw = getFramework(fid)) {
- fw->removeTask(tid);
- isolationModule->resourcesChanged(fw);
- }
- break;
- }
- case M2S_KILL_FRAMEWORK: {
- FrameworkID fid;
- tie(fid) = unpack<M2S_KILL_FRAMEWORK>(body());
- LOG(INFO) << "Asked to kill framework " << fid;
- Framework *fw = getFramework(fid);
- if (fw != NULL)
- killFramework(fw);
- break;
- }
+void Slave::noMasterDetected()
+{
+ LOG(INFO) << "Lost master(s) ... waiting";
+}
- case M2S_FRAMEWORK_MESSAGE: {
- FrameworkID fid;
- FrameworkMessage message;
- tie(fid, message) = unpack<M2S_FRAMEWORK_MESSAGE>(body());
- if (Executor *ex = getExecutor(fid)) {
- VLOG(1) << "Relaying framework message for framework " << fid;
- send(ex->pid, pack<S2E_FRAMEWORK_MESSAGE>(message));
- } else {
- VLOG(1) << "Dropping framework message for framework " << fid
- << " because its executor is not running";
- }
- // TODO(*): If executor is not started, queue framework message?
- // (It's probably okay to just drop it since frameworks can have
- // the executor send a message to the master to say when it's ready.)
- break;
- }
- case M2S_UPDATE_FRAMEWORK_PID: {
- FrameworkID fid;
- PID pid;
- tie(fid, pid) = unpack<M2S_UPDATE_FRAMEWORK_PID>(body());
- Framework *framework = getFramework(fid);
- if (framework != NULL) {
- LOG(INFO) << "Updating framework " << fid << " pid to " << pid;
- framework->pid = pid;
- }
- break;
- }
+void Slave::registerReply(const SlaveID& slaveId)
+{
+ LOG(INFO) << "Registered with master; given slave ID " << slaveId;
+ this->slaveId = slaveId;
+}
- case E2S_REGISTER_EXECUTOR: {
- FrameworkID fid;
- tie(fid) = unpack<E2S_REGISTER_EXECUTOR>(body());
- LOG(INFO) << "Got executor registration for framework " << fid;
- if (Framework *fw = getFramework(fid)) {
- if (getExecutor(fid) != 0) {
- LOG(ERROR) << "Executor for framework " << fid
- << "already exists";
- send(from(), pack<S2E_KILL_EXECUTOR>());
- break;
- }
- Executor *executor = new Executor(fid, from());
- executors[fid] = executor;
- link(from());
- // Now that the executor is up, set its resource limits
- isolationModule->resourcesChanged(fw);
- // Tell executor that it's registered and give it its queued tasks
- send(from(), pack<S2E_REGISTER_REPLY>(this->id,
- hostname,
- fw->name,
- fw->executorInfo.initArg));
- sendQueuedTasks(fw);
- } else {
- // Framework is gone; tell the executor to exit
- send(from(), pack<S2E_KILL_EXECUTOR>());
- }
- break;
- }
- case E2S_STATUS_UPDATE: {
- FrameworkID fid;
- TaskID tid;
- TaskState taskState;
- string data;
- tie(fid, tid, taskState, data) = unpack<E2S_STATUS_UPDATE>(body());
-
- Framework *framework = getFramework(fid);
- if (framework != NULL) {
- LOG(INFO) << "Got status update for task " << fid << ":" << tid;
- if (taskState == TASK_FINISHED || taskState == TASK_FAILED ||
- taskState == TASK_KILLED || taskState == TASK_LOST) {
- LOG(INFO) << "Task " << fid << ":" << tid << " done";
-
- framework->removeTask(tid);
- isolationModule->resourcesChanged(framework);
- }
-
- // Reliably send message and save sequence number for
- // canceling later.
- int seq = rsend(master, framework->pid,
- pack<S2M_STATUS_UPDATE>(id, fid, tid,
- taskState, data));
- seqs[fid].insert(seq);
- } else {
- LOG(WARNING) << "Got status update for UNKNOWN task "
- << fid << ":" << tid;
- }
- break;
- }
+void Slave::reregisterReply(const SlaveID& slaveId)
+{
+ LOG(INFO) << "Re-registered with master";
- case E2S_FRAMEWORK_MESSAGE: {
- FrameworkID fid;
- FrameworkMessage message;
- tie(fid, message) = unpack<E2S_FRAMEWORK_MESSAGE>(body());
-
- Framework *framework = getFramework(fid);
- if (framework != NULL) {
- LOG(INFO) << "Sending message for framework " << fid
- << " to " << framework->pid;
-
- // Set slave ID in case framework omitted it.
- message.slaveId = this->id;
- VLOG(1) << "Sending framework message to framework " << fid
- << " with PID " << framework->pid;
- send(framework->pid, pack<M2F_FRAMEWORK_MESSAGE>(message));
- }
- break;
- }
+ if (!(this->slaveId == slaveId)) {
+ LOG(FATAL) << "Slave re-registered but got wrong ID";
+ }
+}
- case S2S_GET_STATE: {
- send(from(), pack<S2S_GET_STATE_REPLY>(getState()));
- break;
- }
- case PROCESS_EXIT: {
- LOG(INFO) << "Process exited: " << from();
+void Slave::runTask(const FrameworkInfo& frameworkInfo,
+ const FrameworkID& frameworkId,
+ const string& pid,
+ const TaskDescription& task)
+{
+ LOG(INFO) << "Got assigned task " << task.task_id()
+ << " for framework " << frameworkId;
+
+ Framework* framework = getFramework(frameworkId);
+ if (framework == NULL) {
+ framework = new Framework(frameworkId, frameworkInfo, pid);
+ frameworks[frameworkId] = framework;
+ }
- if (from() == master) {
- LOG(WARNING) << "Master disconnected! "
- << "Waiting for a new master to be elected.";
- // TODO(benh): After so long waiting for a master, commit suicide.
- } else {
- // Check if an executor has exited (this is technically
- // redundant because the isolation module should be doing
- // this for us).
- foreachpair (_, Executor *ex, executors) {
- if (from() == ex->pid) {
- LOG(INFO) << "Executor for framework " << ex->frameworkId
- << " disconnected";
- Framework *framework = getFramework(ex->frameworkId);
- if (framework != NULL) {
- send(master, pack<S2M_LOST_EXECUTOR>(id, ex->frameworkId, -1));
- killFramework(framework);
- }
- break;
- }
- }
- }
+ // Either send the task to an executor or start a new executor
+ // and queue the task until the executor has started.
+ Executor* executor = task.has_executor()
+ ? framework->getExecutor(task.executor().executor_id())
+ : framework->getExecutor(framework->info.executor().executor_id());
+
+ if (executor != NULL) {
+ if (!executor->pid) {
+ // Queue task until the executor starts up.
+ executor->queuedTasks.push_back(task);
+ } else {
+ // Add the task to the executor.
+ executor->addTask(task);
+
+ MSG<S2E_RUN_TASK> out;
+ out.mutable_framework()->MergeFrom(framework->info);
+ out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+ out.set_pid(framework->pid);
+ out.mutable_task()->MergeFrom(task);
+ send(executor->pid, out);
+ isolationModule->resourcesChanged(framework, executor);
+ }
+ } else {
+ // Launch an executor for this task.
+ if (task.has_executor()) {
+ executor = framework->createExecutor(task.executor());
+ } else {
+ executor = framework->createExecutor(framework->info.executor());
+ }
+
+ // Queue task until the executor starts up.
+ executor->queuedTasks.push_back(task);
+ // Tell the isolation module to launch the executor.
+ isolationModule->launchExecutor(framework, executor);
+ }
+}
+
+
+void Slave::killTask(const FrameworkID& frameworkId,
+ const TaskID& taskId)
+{
+ LOG(INFO) << "Asked to kill task " << taskId
+ << " of framework " << frameworkId;
+
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL) {
+ // Tell the executor to kill the task if it is up and
+ // running, otherwise, consider the task lost.
+ Executor* executor = framework->getExecutor(taskId);
+ if (executor == NULL || !executor->pid) {
+ // Update the resources locally, if an executor comes up
+ // after this then it just won't receive this task.
+ executor->removeTask(taskId);
+ isolationModule->resourcesChanged(framework, executor);
+
+ MSG<S2M_STATUS_UPDATE> out;
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ TaskStatus *status = out.mutable_status();
+ status->mutable_task_id()->MergeFrom(taskId);
+ status->mutable_slave_id()->MergeFrom(slaveId);
+ status->set_state(TASK_LOST);
+ send(master, out);
+
+ double deadline = elapsedTime() + STATUS_UPDATE_RETRY_TIMEOUT;
+ framework->statuses[deadline][status->task_id()] = *status;
+ } else {
+ // Otherwise, send a message to the executor and wait for
+ // it to send us a status update.
+ MSG<S2E_KILL_TASK> out;
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ out.mutable_task_id()->MergeFrom(taskId);
+ send(executor->pid, out);
+ }
+ } else {
+ LOG(WARNING) << "Cannot kill task " << taskId
+ << " of framework " << frameworkId
+ << " because no such framework is running";
+
+ MSG<S2M_STATUS_UPDATE> out;
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ TaskStatus *status = out.mutable_status();
+ status->mutable_task_id()->MergeFrom(taskId);
+ status->mutable_slave_id()->MergeFrom(slaveId);
+ status->set_state(TASK_LOST);
+ send(master, out);
+
+ double deadline = elapsedTime() + STATUS_UPDATE_RETRY_TIMEOUT;
+ framework->statuses[deadline][status->task_id()] = *status;
+ }
+}
+
+
+void Slave::killFramework(const FrameworkID& frameworkId)
+{
+ LOG(INFO) << "Asked to kill framework " << frameworkId;
+
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL) {
+ killFramework(framework);
+ }
+}
+
+
+void Slave::schedulerMessage(const SlaveID& slaveId,
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const string& data)
+{
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL) {
+ Executor* executor = framework->getExecutor(executorId);
+ if (executor == NULL) {
+ LOG(WARNING) << "Dropping message for executor '"
+ << executorId << "' of framework " << frameworkId
+ << " because executor does not exist";
+ statistics.invalid_framework_messages++;
+ } else if (!executor->pid) {
+ // TODO(*): If executor is not started, queue framework message?
+ // (It's probably okay to just drop it since frameworks can have
+ // the executor send a message to the master to say when it's ready.)
+ LOG(WARNING) << "Dropping message for executor '"
+ << executorId << "' of framework " << frameworkId
+ << " because executor is not running";
+ statistics.invalid_framework_messages++;
+ } else {
+ MSG<S2E_FRAMEWORK_MESSAGE> out;
+ out.mutable_slave_id()->MergeFrom(slaveId);
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ out.mutable_executor_id()->MergeFrom(executorId);
+ out.set_data(data);
+ send(executor->pid, out);
+
+ statistics.valid_framework_messages++;
+ }
+ } else {
+ LOG(WARNING) << "Dropping message for framework "<< frameworkId
+ << " because framework does not exist";
+ statistics.invalid_framework_messages++;
+ }
+}
+
+
+void Slave::updateFramework(const FrameworkID& frameworkId,
+ const string& pid)
+{
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL) {
+ LOG(INFO) << "Updating framework " << frameworkId
+ << " pid to " <<pid;
+ framework->pid = pid;
+ }
+}
+
+
+void Slave::statusUpdateAck(const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const TaskID& taskId)
+{
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL) {
+ foreachpair (double deadline, _, framework->statuses) {
+ if (framework->statuses[deadline].count(taskId) > 0) {
+ LOG(INFO) << "Got acknowledgement of status update"
+ << " for task " << taskId
+ << " of framework " << framework->frameworkId;
+ framework->statuses[deadline].erase(taskId);
break;
}
+ }
+ }
+}
- case M2S_SHUTDOWN: {
- LOG(INFO) << "Asked to shut down by master: " << from();
- unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
- foreachpair (_, Framework *framework, frameworksCopy) {
- killFramework(framework);
- }
- return;
- }
- case S2S_SHUTDOWN: {
- LOG(INFO) << "Asked to shut down by " << from();
- unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
- foreachpair (_, Framework *framework, frameworksCopy) {
- killFramework(framework);
+void Slave::registerExecutor(const FrameworkID& frameworkId,
+ const ExecutorID& executorId)
+{
+ LOG(INFO) << "Got registration for executor '" << executorId
+ << "' of framework " << frameworkId;
+
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL) {
+ Executor* executor = framework->getExecutor(executorId);
+
+ // Check the status of the executor.
+ if (executor == NULL) {
+ LOG(WARNING) << "Not expecting executor '" << executorId
+ << "' of framework " << frameworkId;
+ send(from(), S2E_KILL_EXECUTOR);
+ } else if (executor->pid != UPID()) {
+ LOG(WARNING) << "Not good, executor '" << executorId
+ << "' of framework " << frameworkId
+ << " is already running";
+ send(from(), S2E_KILL_EXECUTOR);
+ } else {
+ // Save the pid for the executor.
+ executor->pid = from();
+
+ // Now that the executor is up, set its resource limits.
+ isolationModule->resourcesChanged(framework, executor);
+
+ // Tell executor it's registered and give it any queued tasks.
+ MSG<S2E_REGISTER_REPLY> out;
+ ExecutorArgs* args = out.mutable_args();
+ args->mutable_framework_id()->MergeFrom(framework->frameworkId);
+ args->mutable_executor_id()->MergeFrom(executor->info.executor_id());
+ args->mutable_slave_id()->MergeFrom(slaveId);
+ args->set_hostname(slave.hostname());
+ args->set_data(framework->info.executor().data());
+ send(executor->pid, out);
+ sendQueuedTasks(framework, executor);
+ }
+ } else {
+ // Framework is gone; tell the executor to exit.
+ LOG(WARNING) << "Framework " << frameworkId
+ << " does not exist (it may have been killed),"
+ << " telling executor to exit";
+
+ // TODO(benh): Don't we also want to tell the isolation
+ // module to shut this guy down!
+ send(from(), S2E_KILL_EXECUTOR);
+ }
+}
+
+
+void Slave::statusUpdate(const FrameworkID& frameworkId,
+ const TaskStatus& status)
+{
+ LOG(INFO) << "Status update: task " << status.task_id()
+ << " of framework " << frameworkId
+ << " is now in state "
+ << TaskState_descriptor()->FindValueByNumber(status.state())->name();
+
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL) {
+ Executor* executor = framework->getExecutor(status.task_id());
+ if (executor != NULL) {
+ executor->updateTaskState(status.task_id(), status.state());
+
+ // Remove the task if necessary, and update statistics.
+ switch (status.state()) {
+ case TASK_FINISHED:
+ statistics.finished_tasks++;
+ executor->removeTask(status.task_id());
+ isolationModule->resourcesChanged(framework, executor);
+ break;
+ case TASK_FAILED:
+ statistics.failed_tasks++;
+ executor->removeTask(status.task_id());
+ isolationModule->resourcesChanged(framework, executor);
+ break;
+ case TASK_KILLED:
+ statistics.killed_tasks++;
+ executor->removeTask(status.task_id());
+ isolationModule->resourcesChanged(framework, executor);
+ break;
+ case TASK_LOST:
+ statistics.lost_tasks++;
+ executor->removeTask(status.task_id());
+ isolationModule->resourcesChanged(framework, executor);
+ break;
+ }
+
+ // Send message and record the status for possible resending.
+ MSG<S2M_STATUS_UPDATE> out;
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ out.mutable_status()->MergeFrom(status);
+ send(master, out);
+
+ double deadline = elapsedTime() + STATUS_UPDATE_RETRY_TIMEOUT;
+ framework->statuses[deadline][status.task_id()] = status;
+
+ statistics.valid_status_updates++;
+ } else {
+ LOG(WARNING) << "Status update error: couldn't lookup "
+ << "executor for framework " << frameworkId;
+ statistics.invalid_status_updates++;
+ }
+ } else {
+ LOG(WARNING) << "Status update error: couldn't lookup "
+ << "framework " << frameworkId;
+ statistics.invalid_status_updates++;
+ }
+}
+
+
+void Slave::executorMessage(const SlaveID& slaveId,
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const string& data)
+{
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL) {
+ LOG(INFO) << "Sending message for framework " << frameworkId
+ << " to " << framework->pid;
+
+ // TODO(benh): This is weird, sending an M2F message.
+ MSG<M2F_FRAMEWORK_MESSAGE> out;
+ out.mutable_slave_id()->MergeFrom(slaveId);
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ out.mutable_executor_id()->MergeFrom(executorId);
+ out.set_data(data);
+ send(framework->pid, out);
+
+ statistics.valid_framework_messages++;
+ } else {
+ LOG(WARNING) << "Cannot send framework message from slave "
+ << slaveId << " to framework " << frameworkId
+ << " because framework does not exist";
+ statistics.invalid_framework_messages++;
+ }
+}
+
+
+void Slave::ping()
+{
+ send(from(), PONG);
+}
+
+
+void Slave::timeout()
+{
+ // Check and see if we should re-send any status updates.
+ foreachpair (_, Framework* framework, frameworks) {
+ foreachpair (double deadline, _, framework->statuses) {
+ if (deadline <= elapsedTime()) {
+ foreachpair (_, const TaskStatus& status, framework->statuses[deadline]) {
+ LOG(WARNING) << "Resending status update"
+ << " for task " << status.task_id()
+ << " of framework " << framework->frameworkId;
+ MSG<S2M_STATUS_UPDATE> out;
+ out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+ out.mutable_status()->MergeFrom(status);
+ send(master, out);
}
- return;
}
+ }
+ }
+}
- default: {
- LOG(ERROR) << "Received unknown message ID " << msgid()
- << " from " << from();
- break;
+void Slave::exited()
+{
+ LOG(INFO) << "Process exited: " << from();
+
+ if (from() == master) {
+ LOG(WARNING) << "Master disconnected! "
+ << "Waiting for a new master to be elected.";
+ // TODO(benh): After so long waiting for a master, commit suicide.
+ }
+}
+
+
+Promise<HttpResponse> Slave::http_info_json(const HttpRequest& request)
+{
+ LOG(INFO) << "HTTP request for '/slave/info.json'";
+
+ ostringstream out;
+
+ out <<
+ "{" <<
+ "\"built_date\":\"" << build::DATE << "\"," <<
+ "\"build_user\":\"" << build::USER << "\"," <<
+ "\"start_time\":\"" << startTime << "\"," <<
+ "\"pid\":\"" << self() << "\"" <<
+ "}";
+
+ HttpOKResponse response;
+ response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
+ response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
+ response.body = out.str().data();
+ return response;
+}
+
+
+Promise<HttpResponse> Slave::http_frameworks_json(const HttpRequest& request)
+{
+ LOG(INFO) << "HTTP request for '/slave/frameworks.json'";
+
+ ostringstream out;
+
+ out << "[";
+
+ foreachpair (_, Framework* framework, frameworks) {
+ out <<
+ "{" <<
+ "\"id\":\"" << framework->frameworkId << "\"," <<
+ "\"name\":\"" << framework->info.name() << "\"," <<
+ "\"user\":\"" << framework->info.user() << "\""
+ "},";
+ }
+
+ // Backup the put pointer to overwrite the last comma (hack).
+ if (frameworks.size() > 0) {
+ long pos = out.tellp();
+ out.seekp(pos - 1);
+ }
+
+ out << "]";
+
+ HttpOKResponse response;
+ response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
+ response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
+ response.body = out.str().data();
+ return response;
+}
+
+
+Promise<HttpResponse> Slave::http_tasks_json(const HttpRequest& request)
+{
+ LOG(INFO) << "HTTP request for '/slave/tasks.json'";
+
+ ostringstream out;
+
+ out << "[";
+
+ foreachpair (_, Framework* framework, frameworks) {
+ foreachpair (_, Executor* executor, framework->executors) {
+ foreachpair (_, Task* task, executor->tasks) {
+ // TODO(benh): Send all of the resources (as JSON).
+ Resources resources(task->resources());
+ Resource::Scalar cpus = resources.getScalar("cpus", Resource::Scalar());
+ Resource::Scalar mem = resources.getScalar("mem", Resource::Scalar());
+ const string& state =
+ TaskState_descriptor()->FindValueByNumber(task->state())->name();
+ out <<
+ "{" <<
+ "\"task_id\":\"" << task->task_id() << "\"," <<
+ "\"framework_id\":\"" << task->framework_id() << "\"," <<
+ "\"slave_id\":\"" << task->slave_id() << "\"," <<
+ "\"name\":\"" << task->name() << "\"," <<
+ "\"state\":\"" << state << "\"," <<
+ "\"cpus\":" << cpus.value() << "," <<
+ "\"mem\":" << mem.value() <<
+ "},";
}
}
}
+
+ // Backup the put pointer to overwrite the last comma (hack).
+ if (frameworks.size() > 0) {
+ long pos = out.tellp();
+ out.seekp(pos - 1);
+ }
+
+ out << "]";
+
+ HttpOKResponse response;
+ response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
+ response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
+ response.body = out.str().data();
+ return response;
}
-Framework * Slave::getFramework(FrameworkID frameworkId)
+Promise<HttpResponse> Slave::http_stats_json(const HttpRequest& request)
{
- FrameworkMap::iterator it = frameworks.find(frameworkId);
- if (it == frameworks.end()) return NULL;
- return it->second;
+ LOG(INFO) << "Http request for '/slave/stats.json'";
+
+ ostringstream out;
+
+ out <<
+ "{" <<
+ "\"uptime\":" << elapsedTime() - startTime << "," <<
+ "\"total_frameworks\":" << frameworks.size() << "," <<
+ "\"launched_tasks\":" << statistics.launched_tasks << "," <<
+ "\"finished_tasks\":" << statistics.finished_tasks << "," <<
+ "\"killed_tasks\":" << statistics.killed_tasks << "," <<
+ "\"failed_tasks\":" << statistics.failed_tasks << "," <<
+ "\"lost_tasks\":" << statistics.lost_tasks << "," <<
+ "\"valid_status_updates\":" << statistics.valid_status_updates << "," <<
+ "\"invalid_status_updates\":" << statistics.invalid_status_updates << "," <<
+ "\"valid_framework_messages\":" << statistics.valid_framework_messages << "," <<
+ "\"invalid_framework_messages\":" << statistics.invalid_framework_messages <<
+ "}";
+
+ HttpOKResponse response;
+ response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
+ response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
+ response.body = out.str().data();
+ return response;
+}
+
+
+Promise<HttpResponse> Slave::http_vars(const HttpRequest& request)
+{
+ LOG(INFO) << "HTTP request for '/slave/vars'";
+
+ ostringstream out;
+
+ out <<
+ "build_date " << build::DATE << "\n" <<
+ "build_user " << build::USER << "\n" <<
+ "build_flags " << build::FLAGS << "\n";
+
+ // Also add the configuration values.
+ foreachpair (const string& key, const string& value, conf.getMap()) {
+ out << key << " " << value << "\n";
+ }
+
+ out <<
+ "uptime " << elapsedTime() - startTime << "\n" <<
+ "total_frameworks " << frameworks.size() << "\n" <<
+ "launched_tasks " << statistics.launched_tasks << "\n" <<
+ "finished_tasks " << statistics.finished_tasks << "\n" <<
+ "killed_tasks " << statistics.killed_tasks << "\n" <<
+ "failed_tasks " << statistics.failed_tasks << "\n" <<
+ "lost_tasks " << statistics.lost_tasks << "\n" <<
+ "valid_status_updates " << statistics.valid_status_updates << "\n" <<
+ "invalid_status_updates " << statistics.invalid_status_updates << "\n" <<
+ "valid_framework_messages " << statistics.valid_framework_messages << "\n" <<
+ "invalid_framework_messages " << statistics.invalid_framework_messages << "\n";
+
+ HttpOKResponse response;
+ response.headers["Content-Type"] = "text/plain";
+ response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
+ response.body = out.str().data();
+ return response;
}
-Executor * Slave::getExecutor(FrameworkID frameworkId)
+Framework* Slave::getFramework(const FrameworkID& frameworkId)
{
- ExecutorMap::iterator it = executors.find(frameworkId);
- if (it == executors.end()) return NULL;
- return it->second;
+ if (frameworks.count(frameworkId) > 0) {
+ return frameworks[frameworkId];
+ }
+
+ return NULL;
}
// Send any tasks queued up for the given framework to its executor
// (needed if we received tasks while the executor was starting up)
-void Slave::sendQueuedTasks(Framework *framework)
+void Slave::sendQueuedTasks(Framework* framework, Executor* executor)
{
- LOG(INFO) << "Flushing queued tasks for framework " << framework->id;
- Executor *executor = getExecutor(framework->id);
- if (!executor) return;
- foreach(TaskDescription *td, framework->queuedTasks) {
- send(executor->pid,
- pack<S2E_RUN_TASK>(td->tid, td->name, td->args, td->params));
- delete td;
+ LOG(INFO) << "Flushing queued tasks for framework "
+ << framework->frameworkId;
+
+ CHECK(executor->pid != UPID());
+
+ foreach (const TaskDescription& task, executor->queuedTasks) {
+ // Add the task to the executor.
+ executor->addTask(task);
+
+ MSG<S2E_RUN_TASK> out;
+ out.mutable_framework()->MergeFrom(framework->info);
+ out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+ out.set_pid(framework->pid);
+ out.mutable_task()->MergeFrom(task);
+ send(executor->pid, out);
}
- framework->queuedTasks.clear();
+
+ executor->queuedTasks.clear();
}
// Kill a framework (including its executor if killExecutor is true).
-void Slave::killFramework(Framework *framework, bool killExecutor)
+void Slave::killFramework(Framework *framework, bool killExecutors)
{
- LOG(INFO) << "Cleaning up framework " << framework->id;
+ LOG(INFO) << "Cleaning up framework " << framework->frameworkId;
- // Cancel sending any reliable messages for this framework.
- foreach (int seq, seqs[framework->id])
- cancel(seq);
+ // Shutdown all executors of this framework.
+ foreachpaircopy (const ExecutorID& executorId, Executor* executor, framework->executors) {
+ if (killExecutors) {
+ LOG(INFO) << "Killing executor '" << executorId
+ << "' of framework " << framework->frameworkId;
- seqs.erase(framework->id);
+ send(executor->pid, S2E_KILL_EXECUTOR);
- // Remove its allocated resources.
- framework->resources = Resources();
-
- // If an executor is running, tell it to exit and kill it.
- if (Executor *ex = getExecutor(framework->id)) {
- if (killExecutor) {
- LOG(INFO) << "Killing executor for framework " << framework->id;
// TODO(benh): There really isn't ANY time between when an
// executor gets a S2E_KILL_EXECUTOR message and the isolation
// module goes and kills it. We should really think about making
// the semantics of this better.
- send(ex->pid, pack<S2E_KILL_EXECUTOR>());
- isolationModule->killExecutor(framework);
+
+ isolationModule->killExecutor(framework, executor);
}
- LOG(INFO) << "Cleaning up executor for framework " << framework->id;
- delete ex;
- executors.erase(framework->id);
+ framework->destroyExecutor(executorId);
}
- frameworks.erase(framework->id);
+ frameworks.erase(framework->frameworkId);
delete framework;
}
@@ -525,30 +936,58 @@ void Slave::killFramework(Framework *fra
// Called by isolation module when an executor process exits
// TODO(benh): Make this callback be a message so that we can avoid
// race conditions.
-void Slave::executorExited(FrameworkID fid, int status)
+void Slave::executorExited(const FrameworkID& frameworkId, const ExecutorID& executorId, int result)
{
- if (Framework *f = getFramework(fid)) {
- LOG(INFO) << "Executor for framework " << fid << " exited "
- << "with status " << status;
- send(master, pack<S2M_LOST_EXECUTOR>(id, fid, status));
- killFramework(f, false);
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL) {
+ Executor* executor = framework->getExecutor(executorId);
+ if (executor != NULL) {
+ LOG(INFO) << "Exited executor '" << executorId
+ << "' of framework " << frameworkId
+ << " with result " << result;
+
+ MSG<S2M_EXITED_EXECUTOR> out;
+ out.mutable_slave_id()->MergeFrom(slaveId);
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ out.mutable_executor_id()->MergeFrom(executorId);
+ out.set_result(result);
+ send(master, out);
+
+ framework->destroyExecutor(executorId);
+
+ // TODO(benh): When should we kill the presence of an entire
+ // framework on a slave?
+ if (framework->executors.size() == 0) {
+ killFramework(framework);
+ }
+ } else {
+ LOG(WARNING) << "UNKNOWN executor '" << executorId
+ << "' of framework " << frameworkId
+ << " has exited with result " << result;
+ }
+ } else {
+ LOG(WARNING) << "UNKNOWN executor '" << executorId
+ << "' of UNKNOWN framework " << frameworkId
+ << " has exited with result " << result;
}
};
-string Slave::getUniqueWorkDirectory(FrameworkID fid)
+string Slave::getUniqueWorkDirectory(const FrameworkID& frameworkId,
+ const ExecutorID& executorId)
{
- string workDir;
+ string workDir = ".";
if (conf.contains("work_dir")) {
- workDir = conf["work_dir"];
+ workDir = conf.get("work_dir", workDir);
} else if (conf.contains("home")) {
- workDir = conf["home"] + "/work";
- } else {
- workDir = "work";
+ workDir = conf.get("home", workDir);
}
+ workDir = workDir + "/work";
+
ostringstream os(std::ios_base::app | std::ios_base::out);
- os << workDir << "/slave-" << id << "/fw-" << fid;
+ os << workDir << "/slave-" << slaveId
+ << "/fw-" << frameworkId << "-" << executorId;
// Find a unique directory based on the path given by the slave
// (this is because we might launch multiple executors from the same
@@ -569,7 +1008,7 @@ string Slave::getUniqueWorkDirectory(Fra
}
-const Params& Slave::getConf()
+const Configuration& Slave::getConfiguration()
{
return conf;
}
Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1132344&r1=1132343&r2=1132344&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Sun Jun 5 09:28:33 2011
@@ -132,7 +132,7 @@ struct Executor
// Information about a framework.
struct Framework
{
- Framework( const FrameworkID& _frameworkId, const FrameworkInfo& _info,
+ Framework(const FrameworkID& _frameworkId, const FrameworkInfo& _info,
const process::UPID& _pid)
: frameworkId(_frameworkId), info(_info), pid(_pid) {}
@@ -200,13 +200,21 @@ public:
process::Promise<state::SlaveState*> getState();
- // Callback used by isolation module to tell us when an executor exits.
- void executorExited(const FrameworkID& frameworkId, const ExecutorID& executorId, int result);
+ // Callback used by isolation module to tell us when an executor
+ // exits.
+ void executorExited(const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ int result);
// Kill a framework (possibly killing its executor).
void killFramework(Framework *framework, bool killExecutors = true);
- std::string getUniqueWorkDirectory(const FrameworkID& frameworkId, const ExecutorID& executorId);
+ // Helper function for generating a unique work directory for this
+ // framework/executor pair (non-trivial since a framework/executor
+ // pair may be launched more than once on the same slave).
+ std::string getUniqueWorkDirectory(
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId);
const Configuration& getConfiguration();
@@ -260,7 +268,14 @@ protected:
void sendQueuedTasks(Framework* framework, Executor* executor);
private:
- Configuration conf;
+ // TODO(benh): Better naming and name scope for these http handlers.
+ process::Promise<process::HttpResponse> http_info_json(const process::HttpRequest& request);
+ process::Promise<process::HttpResponse> http_frameworks_json(const process::HttpRequest& request);
+ process::Promise<process::HttpResponse> http_tasks_json(const process::HttpRequest& request);
+ process::Promise<process::HttpResponse> http_stats_json(const process::HttpRequest& request);
+ process::Promise<process::HttpResponse> http_vars(const process::HttpRequest& request);
+
+ const Configuration conf;
SlaveInfo slave;
@@ -271,6 +286,21 @@ private:
boost::unordered_map<FrameworkID, Framework*> frameworks;
IsolationModule *isolationModule;
+
+ // Statistics (initialized in Slave::initialize).
+ struct {
+ uint64_t launched_tasks;
+ uint64_t finished_tasks;
+ uint64_t killed_tasks;
+ uint64_t failed_tasks;
+ uint64_t lost_tasks;
+ uint64_t valid_status_updates;
+ uint64_t invalid_status_updates;
+ uint64_t valid_framework_messages;
+ uint64_t invalid_framework_messages;
+ } statistics;
+
+ double startTime;
};
}}}