You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/27 08:08:39 UTC
svn commit: r1140024 [5/15] - in /incubator/mesos/trunk: ./ ec2/
ec2/deploy.karmic64/ ec2/deploy.solaris/ frameworks/torque/nexus-hpl/
include/mesos/ src/ src/common/ src/configurator/ src/detector/
src/examples/ src/examples/java/ src/examples/python/...
Modified: incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp Mon Jun 27 06:08:33 2011
@@ -1,33 +1,46 @@
#ifndef __PROCESS_BASED_ISOLATION_MODULE_HPP__
#define __PROCESS_BASED_ISOLATION_MODULE_HPP__
-#include <sys/types.h>
+#include <string>
-#include <boost/unordered_map.hpp>
+#include <sys/types.h>
#include "isolation_module.hpp"
+#include "reaper.hpp"
#include "slave.hpp"
-#include "launcher/launcher.hpp"
+#include "common/hashmap.hpp"
-#include "messaging/messages.hpp"
+#include "launcher/launcher.hpp"
namespace mesos { namespace internal { namespace slave {
-class ProcessBasedIsolationModule : public IsolationModule {
+class ProcessBasedIsolationModule
+ : public IsolationModule, public ProcessExitedListener
+{
public:
ProcessBasedIsolationModule();
virtual ~ProcessBasedIsolationModule();
- virtual void initialize(Slave *slave);
+ virtual void initialize(const Configuration& conf,
+ bool local,
+ const process::PID<Slave>& slave);
+
+ virtual void launchExecutor(const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory);
+
+ virtual void killExecutor(const FrameworkID& frameworkId,
+ const ExecutorID& executorId);
+
+ virtual void resourcesChanged(const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const Resources& resources);
- virtual void launchExecutor(Framework* framework, Executor* executor);
-
- virtual void killExecutor(Framework* framework, Executor* executor);
-
- virtual void resourcesChanged(Framework* framework, Executor* executor);
+ virtual void processExited(pid_t pid, int status);
protected:
// Main method executed after a fork() to create a Launcher for launching
@@ -37,24 +50,24 @@ protected:
// Subclasses of ProcessBasedIsolationModule that wish to override the
// default launching behavior should override createLauncher() and return
// their own Launcher object (including possibly a subclass of Launcher).
- virtual launcher::ExecutorLauncher* createExecutorLauncher(Framework* framework, Executor* executor);
+ virtual launcher::ExecutorLauncher* createExecutorLauncher(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory);
private:
- // Reaps child processes and tells the slave if they exit
- class Reaper : public process::Process<Reaper> {
- ProcessBasedIsolationModule* module;
-
- protected:
- virtual void operator () ();
-
- public:
- Reaper(ProcessBasedIsolationModule* module);
- };
-
+ // No copying, no assigning.
+ ProcessBasedIsolationModule(const ProcessBasedIsolationModule&);
+ ProcessBasedIsolationModule& operator = (const ProcessBasedIsolationModule&);
+
+ // TODO(benh): Make variables const by passing them via constructor.
+ Configuration conf;
+ bool local;
+ process::PID<Slave> slave;
bool initialized;
- Slave* slave;
- boost::unordered_map<FrameworkID, boost::unordered_map<ExecutorID, pid_t> > pgids;
Reaper* reaper;
+ hashmap<FrameworkID, hashmap<ExecutorID, pid_t> > pgids;
};
}}}
Modified: incubator/mesos/trunk/src/slave/projd.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/projd.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/projd.cpp (original)
+++ incubator/mesos/trunk/src/slave/projd.cpp Mon Jun 27 06:08:33 2011
@@ -7,7 +7,7 @@
#include "common/fatal.hpp"
-#include "messaging/messages.hpp"
+#include "messages/messages.hpp"
namespace mesos { namespace internal { namespace projd {
Added: incubator/mesos/trunk/src/slave/reaper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/reaper.cpp?rev=1140024&view=auto
==============================================================================
--- incubator/mesos/trunk/src/slave/reaper.cpp (added)
+++ incubator/mesos/trunk/src/slave/reaper.cpp Mon Jun 27 06:08:33 2011
@@ -0,0 +1,57 @@
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include <process/dispatch.hpp>
+
+#include "reaper.hpp"
+
+#include "common/foreach.hpp"
+
+using namespace process;
+
+
+namespace mesos { namespace internal { namespace slave {
+
+Reaper::Reaper() {}
+
+
+Reaper::~Reaper() {}
+
+
+// Registers 'listener' to receive processExited() for every child reaped
+// from now on. Listeners live in a std::set, so registering the same PID
+// twice has no additional effect.
+void Reaper::addProcessExitedListener(
+    const PID<ProcessExitedListener>& listener)
+{
+  listeners.insert(listener);
+}
+
+
+// Main loop. Each time serve(1) returns with TIMEOUT, every child that
+// has exited is reaped with a non-blocking waitpid() and
+// ProcessExitedListener::processExited(pid, status) is dispatched to
+// each registered listener. Returns on TERMINATE.
+void Reaper::operator () ()
+{
+  while (true) {
+    serve(1);
+    if (name() == TIMEOUT) {
+      // Drain all exited children rather than just one: several may exit
+      // between two timeouts, and reaping only one per tick would leave
+      // zombies behind and delay their notifications.
+      pid_t pid;
+      int status;
+      while ((pid = waitpid((pid_t) -1, &status, WNOHANG)) > 0) {
+        foreach (const PID<ProcessExitedListener>& listener, listeners) {
+          dispatch(listener, &ProcessExitedListener::processExited,
+                   pid, status);
+        }
+      }
+    } else if (name() == TERMINATE) {
+      return;
+    }
+  }
+}
+
+}}} // namespace mesos { namespace internal { namespace slave {
Added: incubator/mesos/trunk/src/slave/reaper.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/reaper.hpp?rev=1140024&view=auto
==============================================================================
--- incubator/mesos/trunk/src/slave/reaper.hpp (added)
+++ incubator/mesos/trunk/src/slave/reaper.hpp Mon Jun 27 06:08:33 2011
@@ -0,0 +1,49 @@
+#ifndef __REAPER_HPP__
+#define __REAPER_HPP__
+
+#include <sys/types.h> // For pid_t.
+
+#include <set>
+
+#include <process/process.hpp>
+
+
+namespace mesos { namespace internal { namespace slave {
+
+// Interface for processes that want to hear about child processes
+// collected by a Reaper; notifications arrive as a dispatch of
+// processExited() to the listener's PID.
+class ProcessExitedListener : public process::Process<ProcessExitedListener>
+{
+public:
+  virtual ~ProcessExitedListener() {}
+
+  // Called with the PID of a reaped child and the status word that
+  // waitpid(2) reported for it (decode with WIFEXITED() etc.).
+  virtual void processExited(pid_t pid, int status) = 0;
+};
+
+
+// Process that periodically reaps exited child processes and notifies
+// every registered ProcessExitedListener. Register listeners with
+// addProcessExitedListener().
+class Reaper : public process::Process<Reaper>
+{
+public:
+  Reaper();
+  virtual ~Reaper();
+
+  // Adds a listener to be notified of every subsequently reaped child.
+  void addProcessExitedListener(const process::PID<ProcessExitedListener>&);
+
+protected:
+  virtual void operator () ();
+
+private:
+  std::set<process::PID<ProcessExitedListener> > listeners;
+};
+
+
+}}} // namespace mesos { namespace internal { namespace slave {
+
+#endif // __REAPER_HPP__
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Mon Jun 27 06:08:33 2011
@@ -1,59 +1,205 @@
-#include <dirent.h>
#include <errno.h>
-#include <stdio.h>
-#include <string.h>
#include <algorithm>
-#include <fstream>
+#include <iomanip>
-#include <google/protobuf/descriptor.h>
+#include <process/timer.hpp>
#include "slave.hpp"
-#include "webui.hpp"
+#include "common/build.hpp"
+#include "common/type_utils.hpp"
#include "common/utils.hpp"
-// There's no gethostbyname2 on Solaris, so fake it by calling gethostbyname
-#ifdef __sun__
-#define gethostbyname2(name, _) gethostbyname(name)
-#endif
-
-using namespace mesos;
-using namespace mesos::internal;
-using namespace mesos::internal::slave;
-
-using boost::lexical_cast;
-using boost::unordered_map;
-using boost::unordered_set;
-
-using process::HttpOKResponse;
-using process::HttpResponse;
-using process::HttpRequest;
-using process::Promise;
-using process::UPID;
-
-using std::list;
-using std::make_pair;
-using std::ostringstream;
-using std::pair;
-using std::queue;
+using namespace process;
+
using std::string;
-using std::vector;
+using process::wait; // Necessary on some OS's to disambiguate.
-Slave::Slave(const Resources& _resources, bool _local,
- IsolationModule *_isolationModule)
- : MesosProcess<Slave>("slave"),
- resources(_resources), local(_local), isolationModule(_isolationModule)
+
+namespace mesos { namespace internal { namespace slave {
+
+// Information describing an executor (goes away if executor crashes).
+struct Executor
{
- initialize();
-}
+  // Owns the Task objects in 'launchedTasks' (deleted in ~Executor and
+  // removeTask), so an Executor must not be copied.
+  Executor(const FrameworkID& _frameworkId,
+           const ExecutorInfo& _info,
+           const string& _directory)
+    : id(_info.executor_id()),  // Initializers follow declaration order.
+      info(_info),
+      frameworkId(_frameworkId),
+      directory(_directory),
+      uuid(UUID::random()),
+      pid(UPID()),
+      shutdown(false) {}
+
+  ~Executor()
+  {
+    // Delete the tasks.
+    foreachvalue (Task* task, launchedTasks) {
+      delete task;
+    }
+  }
+
+  // Records 'task' as launched, charges its resources to this executor
+  // and returns the Task created for it (owned by 'launchedTasks').
+  Task* addTask(const TaskDescription& task)
+  {
+    // The master should enforce unique task IDs, but just in case
+    // maybe we shouldn't make this a fatal error.
+    CHECK(!launchedTasks.contains(task.task_id()));
+
+    Task* t = new Task();
+    t->mutable_framework_id()->MergeFrom(frameworkId);
+    t->mutable_executor_id()->MergeFrom(id);
+    t->set_state(TASK_STARTING);
+    t->set_name(task.name());
+    t->mutable_task_id()->MergeFrom(task.task_id());
+    t->mutable_slave_id()->MergeFrom(task.slave_id());
+    t->mutable_resources()->MergeFrom(task.resources());
+
+    launchedTasks[task.task_id()] = t;
+    resources += task.resources();
+    return t;
+  }
+
+  // Forgets 'taskId' whether it is queued or launched; a launched task's
+  // resources are released and its Task object is freed.
+  void removeTask(const TaskID& taskId)
+  {
+    // Remove the task if it's queued.
+    queuedTasks.erase(taskId);
+
+    // Update the resources if it's been launched.
+    if (launchedTasks.contains(taskId)) {
+      Task* task = launchedTasks[taskId];
+      foreach (const Resource& resource, task->resources()) {
+        resources -= resource;
+      }
+      launchedTasks.erase(taskId);
+      delete task;
+    }
+  }
+
+  void updateTaskState(const TaskID& taskId, TaskState state)
+  {
+    if (launchedTasks.contains(taskId)) {
+      launchedTasks[taskId]->set_state(state);
+    }
+  }
+
+  const ExecutorID id;
+  const ExecutorInfo info;
+  const FrameworkID frameworkId;
+  const string directory;
+  const UUID uuid; // Distinguishes executor instances with same ExecutorID.
+  UPID pid; // Empty until the executor is up; tasks are queued meanwhile.
+  bool shutdown; // Indicates if executor is being shut down.
+  Resources resources; // Currently consumed resources.
+  hashmap<TaskID, TaskDescription> queuedTasks; // Not yet sent to executor.
+  hashmap<TaskID, Task*> launchedTasks;
+};
+
+
+// Information about a framework.
+struct Framework
+{
+  Framework(const FrameworkID& _id,
+            const FrameworkInfo& _info,
+            const UPID& _pid)
+    : id(_id),
+      info(_info),
+      pid(_pid) {}
+
+  ~Framework() {}
+
+  // Allocates an Executor for 'executorInfo' (working in 'directory'),
+  // files it under its ExecutorID and returns it. CHECK-fails if that
+  // ExecutorID is already in use by this framework.
+  Executor* createExecutor(const ExecutorInfo& executorInfo,
+                           const string& directory)
+  {
+    Executor* created = new Executor(id, executorInfo, directory);
+    const ExecutorID& key = executorInfo.executor_id();
+    CHECK(!executors.contains(key));
+    executors[key] = created;
+    return created;
+  }
+
+  // Unregisters and frees the executor with the given ID; unknown IDs
+  // are ignored.
+  void destroyExecutor(const ExecutorID& executorId)
+  {
+    if (!executors.contains(executorId)) {
+      return;
+    }
+
+    Executor* doomed = executors[executorId];
+    executors.erase(executorId);
+    delete doomed;
+  }
+
+  // Looks an executor up by ID; NULL when there is none.
+  Executor* getExecutor(const ExecutorID& executorId)
+  {
+    return executors.contains(executorId) ? executors[executorId] : NULL;
+  }
+
+  // Finds the executor that has 'taskId' queued or launched; NULL if no
+  // executor of this framework knows the task.
+  Executor* getExecutor(const TaskID& taskId)
+  {
+    foreachvalue (Executor* candidate, executors) {
+      const bool knowsTask =
+        candidate->queuedTasks.contains(taskId) ||
+        candidate->launchedTasks.contains(taskId);
+      if (knowsTask) {
+        return candidate;
+      }
+    }
+
+    return NULL;
+  }
+
+  const FrameworkID id;
+  const FrameworkInfo info;
+
+  UPID pid;
+
+  // Executors currently running, keyed by ExecutorID. Allocated by
+  // createExecutor and freed only by destroyExecutor (~Framework does
+  // not delete them).
+  hashmap<ExecutorID, Executor*> executors;
+
+  // Status updates keyed by uuid.
+  hashmap<UUID, StatusUpdate> updates;
+};
+
+
+// // Represents a pending status update that has been sent and we are
+// // waiting for an acknowledgement.
+
+// // stream of status updates for a framework/task. Note
+// // that these are stored in the slave rather than per Framework
+// // because a framework might go away before all of the status
+// // updates have been sent and acknowledged.
+// struct Slave::StatusUpdateStream
+// {
+// StatusUpdateStreamID streamId;
+// string directory;
+// FILE* updates;
+// FILE* acknowledged;
+// queue<StatusUpdate> pending;
+// double timeout;
+// };
+
+
+// StatusUpdateStreamID id;
+
+
+
+// queue<StatusUpdate> pending;
+// double timeout;
+// };
-Slave::Slave(const Configuration& _conf, bool _local,
+Slave::Slave(const Configuration& _conf,
+ bool _local,
IsolationModule* _isolationModule)
- : MesosProcess<Slave>("slave"),
- conf(_conf), local(_local), isolationModule(_isolationModule)
+ : ProcessBase("slave"),
+ conf(_conf),
+ local(_local),
+ isolationModule(_isolationModule)
{
resources =
Resources::parse(conf.get<string>("resources", "cpus:1;mem:1024"));
@@ -62,36 +208,66 @@ Slave::Slave(const Configuration& _conf,
}
-void Slave::registerOptions(Configurator* configurator)
+Slave::Slave(const Resources& _resources,
+ bool _local,
+ IsolationModule *_isolationModule)
+ : ProcessBase("slave"),
+ resources(_resources),
+ local(_local),
+ isolationModule(_isolationModule)
{
- // TODO(benh): Is there a way to specify units for the resources?
- configurator->addOption<string>("resources",
- "Total consumable resources per slave\n");
-// configurator->addOption<string>("attributes",
-// "Attributes of machine\n");
- configurator->addOption<string>("work_dir",
- "Where to place framework work directories\n"
- "(default: MESOS_HOME/work)");
- configurator->addOption<string>("hadoop_home",
- "Where to find Hadoop installed (for\n"
- "fetching framework executors from HDFS)\n"
- "(default: look for HADOOP_HOME in\n"
- "environment or find hadoop on PATH)");
- configurator->addOption<bool>("switch_user",
- "Whether to run tasks as the user who\n"
- "submitted them rather than the user running\n"
- "the slave (requires setuid permission)",
- true);
- configurator->addOption<string>("frameworks_home",
- "Directory prepended to relative executor\n"
- "paths (default: MESOS_HOME/frameworks)");
+ initialize();
}
Slave::~Slave()
{
+ // TODO(benh): Shut down and free frameworks?
+
// TODO(benh): Shut down and free executors? The executor should get
- // an "exited" event and initiate shutdown itself.
+ // an "exited" event and initiate a shut down itself.
+}
+
+
+// Registers every option the slave understands with 'configurator':
+// name, help text and, for some options, a trailing default argument.
+void Slave::registerOptions(Configurator* configurator)
+{
+ // TODO(benh): Is there a way to specify units for the resources?
+ // No default is registered here; the Configuration-based constructor
+ // falls back to "cpus:1;mem:1024" when "resources" is unset.
+ configurator->addOption<string>(
+ "resources",
+ "Total consumable resources per slave\n");
+
+ configurator->addOption<string>(
+ "attributes",
+ "Attributes of machine\n");
+
+ configurator->addOption<string>(
+ "work_dir",
+ "Where to place framework work directories\n"
+ "(default: MESOS_HOME/work)");
+
+ configurator->addOption<string>(
+ "hadoop_home",
+ "Where to find Hadoop installed (for\n"
+ "fetching framework executors from HDFS)\n"
+ "(default: look for HADOOP_HOME in\n"
+ "environment or find hadoop on PATH)");
+
+ // The trailing 'true' is presumably this option's default value.
+ configurator->addOption<bool>(
+ "switch_user",
+ "Whether to run tasks as the user who\n"
+ "submitted them rather than the user running\n"
+ "the slave (requires setuid permission)",
+ true);
+
+ configurator->addOption<string>(
+ "frameworks_home",
+ "Directory prepended to relative executor\n"
+ "paths (default: MESOS_HOME/frameworks)");
+
+ // Default is the EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS constant, which is
+ // defined outside this section.
+ configurator->addOption<double>(
+ "executor_shutdown_timeout_seconds",
+ "Amount of time (in seconds) to wait for an executor to shut down\n",
+ EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS);
}
@@ -105,12 +281,12 @@ Promise<state::SlaveState*> Slave::getSt
cpus = resources.getScalar("cpus", cpus);
mem = resources.getScalar("mem", mem);
- state::SlaveState* state =
- new state::SlaveState(build::DATE, build::USER, slaveId.value(),
- cpus.value(), mem.value(), self(), master);
+ state::SlaveState* state = new state::SlaveState(
+ build::DATE, build::USER, id.value(),
+ cpus.value(), mem.value(), self(), master);
- foreachpair (_, Framework* f, frameworks) {
- foreachpair (_, Executor* e, f->executors) {
+ foreachvalue (Framework* f, frameworks) {
+ foreachvalue (Executor* e, f->executors) {
Resources resources(e->resources);
Resource::Scalar cpus;
Resource::Scalar mem;
@@ -127,16 +303,16 @@ Promise<state::SlaveState*> Slave::getSt
// construction must be identical to what we do for directory
// suffix returned from Slave::getUniqueWorkDirectory.
- string id = f->frameworkId.value() + "-" + e->info.executor_id().value();
+ string id = f->id.value() + "-" + e->id.value();
- state::Framework* framework =
- new state::Framework(id, f->info.name(),
- e->info.uri(), e->executorStatus,
- cpus.value(), mem.value());
+ state::Framework* framework = new state::Framework(
+ id, f->info.name(),
+ e->info.uri(), "",
+ cpus.value(), mem.value());
state->frameworks.push_back(framework);
- foreachpair (_, Task* t, e->tasks) {
+ foreachvalue (Task* t, e->launchedTasks) {
Resources resources(t->resources());
Resource::Scalar cpus;
Resource::Scalar mem;
@@ -145,10 +321,10 @@ Promise<state::SlaveState*> Slave::getSt
cpus = resources.getScalar("cpus", cpus);
mem = resources.getScalar("mem", mem);
- state::Task* task =
- new state::Task(t->task_id().value(), t->name(),
- TaskState_descriptor()->FindValueByNumber(t->state())->name(),
- cpus.value(), mem.value());
+ state::Task* task = new state::Task(
+ t->task_id().value(), t->name(),
+ TaskState_Name(t->state()),
+ cpus.value(), mem.value());
framework->tasks.push_back(task);
}
@@ -162,77 +338,93 @@ Promise<state::SlaveState*> Slave::getSt
void Slave::initialize()
{
// Start all the statistics at 0.
- statistics.launched_tasks = 0;
- statistics.finished_tasks = 0;
- statistics.killed_tasks = 0;
- statistics.failed_tasks = 0;
- statistics.lost_tasks = 0;
- statistics.valid_status_updates = 0;
- statistics.invalid_status_updates = 0;
- statistics.valid_framework_messages = 0;
- statistics.invalid_framework_messages = 0;
+ CHECK(TASK_STARTING == TaskState_MIN);
+ CHECK(TASK_LOST == TaskState_MAX);
+ stats.tasks[TASK_STARTING] = 0;
+ stats.tasks[TASK_RUNNING] = 0;
+ stats.tasks[TASK_FINISHED] = 0;
+ stats.tasks[TASK_FAILED] = 0;
+ stats.tasks[TASK_KILLED] = 0;
+ stats.tasks[TASK_LOST] = 0;
+ stats.validStatusUpdates = 0;
+ stats.invalidStatusUpdates = 0;
+ stats.validFrameworkMessages = 0;
+ stats.invalidFrameworkMessages = 0;
startTime = elapsedTime();
- install(NEW_MASTER_DETECTED, &Slave::newMasterDetected,
- &NewMasterDetectedMessage::pid);
-
- install(NO_MASTER_DETECTED, &Slave::noMasterDetected);
-
- install(M2S_REGISTER_REPLY, &Slave::registerReply,
- &SlaveRegisteredMessage::slave_id);
-
- install(M2S_REREGISTER_REPLY, &Slave::reregisterReply,
- &SlaveRegisteredMessage::slave_id);
-
- install(M2S_RUN_TASK, &Slave::runTask,
- &RunTaskMessage::framework,
- &RunTaskMessage::framework_id,
- &RunTaskMessage::pid,
- &RunTaskMessage::task);
-
- install(M2S_KILL_TASK, &Slave::killTask,
- &KillTaskMessage::framework_id,
- &KillTaskMessage::task_id);
-
- install(M2S_KILL_FRAMEWORK, &Slave::killFramework,
- &KillFrameworkMessage::framework_id);
-
- install(M2S_FRAMEWORK_MESSAGE, &Slave::schedulerMessage,
- &FrameworkMessageMessage::slave_id,
- &FrameworkMessageMessage::framework_id,
- &FrameworkMessageMessage::executor_id,
- &FrameworkMessageMessage::data);
-
- install(M2S_UPDATE_FRAMEWORK, &Slave::updateFramework,
- &UpdateFrameworkMessage::framework_id,
- &UpdateFrameworkMessage::pid);
-
- install(M2S_STATUS_UPDATE_ACK, &Slave::statusUpdateAck,
- &StatusUpdateAckMessage::framework_id,
- &StatusUpdateAckMessage::slave_id,
- &StatusUpdateAckMessage::task_id);
-
- install(E2S_REGISTER_EXECUTOR, &Slave::registerExecutor,
- &RegisterExecutorMessage::framework_id,
- &RegisterExecutorMessage::executor_id);
-
- install(E2S_STATUS_UPDATE, &Slave::statusUpdate,
- &StatusUpdateMessage::framework_id,
- &StatusUpdateMessage::status);
-
- install(E2S_FRAMEWORK_MESSAGE, &Slave::executorMessage,
- &FrameworkMessageMessage::slave_id,
- &FrameworkMessageMessage::framework_id,
- &FrameworkMessageMessage::executor_id,
- &FrameworkMessageMessage::data);
-
- install(PING, &Slave::ping);
-
- install(process::TIMEOUT, &Slave::timeout);
-
- install(process::EXITED, &Slave::exited);
+ // Install protobuf handlers.
+ installProtobufHandler<NewMasterDetectedMessage>(
+ &Slave::newMasterDetected,
+ &NewMasterDetectedMessage::pid);
+
+ installProtobufHandler<NoMasterDetectedMessage>(
+ &Slave::noMasterDetected);
+
+ installProtobufHandler<SlaveRegisteredMessage>(
+ &Slave::registered,
+ &SlaveRegisteredMessage::slave_id);
+
+ installProtobufHandler<SlaveReregisteredMessage>(
+ &Slave::reregistered,
+ &SlaveReregisteredMessage::slave_id);
+
+ installProtobufHandler<RunTaskMessage>(
+ &Slave::runTask,
+ &RunTaskMessage::framework,
+ &RunTaskMessage::framework_id,
+ &RunTaskMessage::pid,
+ &RunTaskMessage::task);
+
+ installProtobufHandler<KillTaskMessage>(
+ &Slave::killTask,
+ &KillTaskMessage::framework_id,
+ &KillTaskMessage::task_id);
+
+ installProtobufHandler<ShutdownFrameworkMessage>(
+ &Slave::shutdownFramework,
+ &ShutdownFrameworkMessage::framework_id);
+
+ installProtobufHandler<FrameworkToExecutorMessage>(
+ &Slave::schedulerMessage,
+ &FrameworkToExecutorMessage::slave_id,
+ &FrameworkToExecutorMessage::framework_id,
+ &FrameworkToExecutorMessage::executor_id,
+ &FrameworkToExecutorMessage::data);
+
+ installProtobufHandler<UpdateFrameworkMessage>(
+ &Slave::updateFramework,
+ &UpdateFrameworkMessage::framework_id,
+ &UpdateFrameworkMessage::pid);
+
+ installProtobufHandler<StatusUpdateAcknowledgementMessage>(
+ &Slave::statusUpdateAcknowledgement,
+ &StatusUpdateAcknowledgementMessage::slave_id,
+ &StatusUpdateAcknowledgementMessage::framework_id,
+ &StatusUpdateAcknowledgementMessage::task_id,
+ &StatusUpdateAcknowledgementMessage::uuid);
+
+ installProtobufHandler<RegisterExecutorMessage>(
+ &Slave::registerExecutor,
+ &RegisterExecutorMessage::framework_id,
+ &RegisterExecutorMessage::executor_id);
+
+ installProtobufHandler<StatusUpdateMessage>(
+ &Slave::statusUpdate,
+ &StatusUpdateMessage::update);
+
+ installProtobufHandler<ExecutorToFrameworkMessage>(
+ &Slave::executorMessage,
+ &ExecutorToFrameworkMessage::slave_id,
+ &ExecutorToFrameworkMessage::framework_id,
+ &ExecutorToFrameworkMessage::executor_id,
+ &ExecutorToFrameworkMessage::data);
+
+ // Install some message handlers.
+ installMessageHandler(process::EXITED, &Slave::exited);
+ installMessageHandler("PING", &Slave::ping);
+ // Install some HTTP handlers.
installHttpHandler("info.json", &Slave::http_info_json);
installHttpHandler("frameworks.json", &Slave::http_frameworks_json);
installHttpHandler("tasks.json", &Slave::http_tasks_json);
@@ -246,11 +438,15 @@ void Slave::operator () ()
LOG(INFO) << "Slave started at " << self();
LOG(INFO) << "Slave resources: " << resources;
- // Get our hostname
- char buf[512];
- gethostname(buf, sizeof(buf));
- hostent* he = gethostbyname2(buf, AF_INET);
- string hostname = he->h_name;
+ Result<string> result = utils::os::hostname();
+
+ if (result.isError()) {
+ LOG(FATAL) << "Failed to get hostname: " << result.error();
+ }
+
+ CHECK(result.isSome());
+
+ string hostname = result.get();
// Check and see if we have a different public DNS name. Normally
// this is our hostname, but on EC2 we look for the MESOS_PUBLIC_DNS
@@ -262,53 +458,69 @@ void Slave::operator () ()
}
// Initialize slave info.
- slave.set_hostname(hostname);
- slave.set_public_hostname(public_hostname);
- slave.mutable_resources()->MergeFrom(resources);
-
- // Initialize isolation module.
- isolationModule->initialize(this);
+ info.set_hostname(hostname);
+ info.set_public_hostname(public_hostname);
+ info.mutable_resources()->MergeFrom(resources);
+
+ // Spawn and initialize the isolation module.
+ spawn(isolationModule);
+ dispatch(isolationModule,
+ &IsolationModule::initialize,
+ conf, local, self());
while (true) {
serve(1);
- if (name() == process::TERMINATE) {
- LOG(INFO) << "Asked to shut down by " << from();
- foreachpaircopy (_, Framework* framework, frameworks) {
- killFramework(framework);
+ if (name() == TERMINATE) {
+ LOG(INFO) << "Asked to terminate by " << from();
+ foreachkey (const FrameworkID& frameworkId, frameworks) {
+ // TODO(benh): Because a shut down isn't instantaneous (but has
+ // a shut down/kill phases) we might not actually propogate all
+ // the status updates appropriately here. Consider providing
+ // an alternative function which skips the shut down phase and
+ // simply does a kill (sending all status updates
+ // immediately). Of course, this still isn't sufficient
+ // because those status updates might get lost and we won't
+ // resend them unless we build that into the system.
+ shutdownFramework(frameworkId);
}
- return;
+ break;
}
}
+
+ // Stop the isolation module.
+ terminate(isolationModule);
+ wait(isolationModule);
}
-void Slave::newMasterDetected(const string& pid)
+void Slave::newMasterDetected(const UPID& pid)
{
LOG(INFO) << "New master detected at " << pid;
master = pid;
link(master);
- if (slaveId == "") {
+ if (id == "") {
// Slave started before master.
- MSG<S2M_REGISTER_SLAVE> out;
- out.mutable_slave()->MergeFrom(slave);
- send(master, out);
+ RegisterSlaveMessage message;
+ message.mutable_slave()->MergeFrom(info);
+ send(master, message);
} else {
// Re-registering, so send tasks running.
- MSG<S2M_REREGISTER_SLAVE> out;
- out.mutable_slave_id()->MergeFrom(slaveId);
- out.mutable_slave()->MergeFrom(slave);
-
- foreachpair (_, Framework* framework, frameworks) {
- foreachpair (_, Executor* executor, framework->executors) {
- foreachpair (_, Task* task, executor->tasks) {
- out.add_tasks()->MergeFrom(*task);
+ ReregisterSlaveMessage message;
+ message.mutable_slave_id()->MergeFrom(id);
+ message.mutable_slave()->MergeFrom(info);
+
+ foreachvalue (Framework* framework, frameworks) {
+ foreachvalue (Executor* executor, framework->executors) {
+ foreachvalue (Task* task, executor->launchedTasks) {
+ // TODO(benh): Also need to send queued tasks here ...
+ message.add_tasks()->MergeFrom(*task);
}
}
}
- send(master, out);
+ send(master, message);
}
}
@@ -319,18 +531,18 @@ void Slave::noMasterDetected()
}
-void Slave::registerReply(const SlaveID& slaveId)
+void Slave::registered(const SlaveID& slaveId)
{
LOG(INFO) << "Registered with master; given slave ID " << slaveId;
- this->slaveId = slaveId;
+ id = slaveId;
}
-void Slave::reregisterReply(const SlaveID& slaveId)
+void Slave::reregistered(const SlaveID& slaveId)
{
LOG(INFO) << "Re-registered with master";
- if (!(this->slaveId == slaveId)) {
+ if (!(id == slaveId)) {
LOG(FATAL) << "Slave re-registered but got wrong ID";
}
}
@@ -350,41 +562,77 @@ void Slave::runTask(const FrameworkInfo&
frameworks[frameworkId] = framework;
}
+ const ExecutorInfo& executorInfo = task.has_executor()
+ ? task.executor()
+ : framework->info.executor();
+
+ const ExecutorID& executorId = executorInfo.executor_id();
+
// Either send the task to an executor or start a new executor
// and queue the task until the executor has started.
- Executor* executor = task.has_executor()
- ? framework->getExecutor(task.executor().executor_id())
- : framework->getExecutor(framework->info.executor().executor_id());
-
+ Executor* executor = framework->getExecutor(executorId);
+
if (executor != NULL) {
- if (!executor->pid) {
+ if (executor->shutdown) {
+ LOG(WARNING) << "WARNING! Asked to run task '" << task.task_id()
+ << "' for framework " << frameworkId
+ << " with executor '" << executorId
+ << "' which is being shut down";
+
+ StatusUpdateMessage message;
+ StatusUpdate* update = message.mutable_update();
+ update->mutable_framework_id()->MergeFrom(frameworkId);
+ update->mutable_slave_id()->MergeFrom(id);
+ TaskStatus* status = update->mutable_status();
+ status->mutable_task_id()->MergeFrom(task.task_id());
+ status->set_state(TASK_LOST);
+ update->set_timestamp(elapsedTime());
+ update->set_uuid(UUID::random().toBytes());
+ send(master, message);
+ } else if (!executor->pid) {
// Queue task until the executor starts up.
- executor->queuedTasks.push_back(task);
+ LOG(INFO) << "Queuing task '" << task.task_id()
+ << "' for executor " << executorId
+ << " of framework '" << frameworkId;
+ executor->queuedTasks[task.task_id()] = task;
} else {
- // Add the task to the executor.
+ // Add the task and send it to the executor.
executor->addTask(task);
- MSG<S2E_RUN_TASK> out;
- out.mutable_framework()->MergeFrom(framework->info);
- out.mutable_framework_id()->MergeFrom(framework->frameworkId);
- out.set_pid(framework->pid);
- out.mutable_task()->MergeFrom(task);
- send(executor->pid, out);
- isolationModule->resourcesChanged(framework, executor);
+ stats.tasks[TASK_STARTING]++;
+
+ RunTaskMessage message;
+ message.mutable_framework()->MergeFrom(framework->info);
+ message.mutable_framework_id()->MergeFrom(framework->id);
+ message.set_pid(framework->pid);
+ message.mutable_task()->MergeFrom(task);
+ send(executor->pid, message);
+
+ // Now update the resources.
+ dispatch(isolationModule,
+ &IsolationModule::resourcesChanged,
+ framework->id, executor->id, executor->resources);
}
} else {
// Launch an executor for this task.
- if (task.has_executor()) {
- executor = framework->createExecutor(task.executor());
- } else {
- executor = framework->createExecutor(framework->info.executor());
- }
+ const string& directory = getUniqueWorkDirectory(framework->id, executorId);
+
+ LOG(INFO) << "Using '" << directory
+ << "' as work directory for executor '" << executorId
+ << "' of framework " << framework->id;
+
+ executor = framework->createExecutor(executorInfo, directory);
// Queue task until the executor starts up.
- executor->queuedTasks.push_back(task);
+ executor->queuedTasks[task.task_id()] = task;
+
+ // Tell the isolation module to launch the executor. (TODO(benh):
+ // Make the isolation module a process so that it can block while
+ // trying to launch the executor.)
+ dispatch(isolationModule,
+ &IsolationModule::launchExecutor,
+ framework->id, framework->info, executor->info, directory);
- // Tell the isolation module to launch the executor.
- isolationModule->launchExecutor(framework, executor);
}
}
@@ -396,60 +644,92 @@ void Slave::killTask(const FrameworkID&
<< " of framework " << frameworkId;
Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- // Tell the executor to kill the task if it is up and
- // running, otherwise, consider the task lost.
- Executor* executor = framework->getExecutor(taskId);
- if (executor == NULL || !executor->pid) {
- // Update the resources locally, if an executor comes up
- // after this then it just won't receive this task.
- executor->removeTask(taskId);
- isolationModule->resourcesChanged(framework, executor);
-
- MSG<S2M_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(frameworkId);
- TaskStatus *status = out.mutable_status();
- status->mutable_task_id()->MergeFrom(taskId);
- status->mutable_slave_id()->MergeFrom(slaveId);
- status->set_state(TASK_LOST);
- send(master, out);
-
- double deadline = elapsedTime() + STATUS_UPDATE_RETRY_TIMEOUT;
- framework->statuses[deadline][status->task_id()] = *status;
- } else {
- // Otherwise, send a message to the executor and wait for
- // it to send us a status update.
- MSG<S2E_KILL_TASK> out;
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.mutable_task_id()->MergeFrom(taskId);
- send(executor->pid, out);
- }
- } else {
- LOG(WARNING) << "Cannot kill task " << taskId
+ if (framework == NULL) {
+ LOG(WARNING) << "WARNING! Cannot kill task " << taskId
<< " of framework " << frameworkId
<< " because no such framework is running";
- MSG<S2M_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(frameworkId);
- TaskStatus *status = out.mutable_status();
+ StatusUpdateMessage message;
+ StatusUpdate* update = message.mutable_update();
+ update->mutable_framework_id()->MergeFrom(frameworkId);
+ update->mutable_slave_id()->MergeFrom(id);
+ TaskStatus* status = update->mutable_status();
status->mutable_task_id()->MergeFrom(taskId);
- status->mutable_slave_id()->MergeFrom(slaveId);
status->set_state(TASK_LOST);
- send(master, out);
+ update->set_timestamp(elapsedTime());
+ update->set_uuid(UUID::random().toBytes());
+ send(master, message);
- double deadline = elapsedTime() + STATUS_UPDATE_RETRY_TIMEOUT;
- framework->statuses[deadline][status->task_id()] = *status;
+ return;
+ }
+
+
+ // Tell the executor to kill the task if it is up and
+ // running, otherwise, consider the task lost.
+ Executor* executor = framework->getExecutor(taskId);
+ if (executor == NULL) {
+ LOG(WARNING) << "WARNING! Cannot kill task " << taskId
+ << " of framework " << frameworkId
+ << " because no such task is running";
+
+ StatusUpdateMessage message;
+ StatusUpdate* update = message.mutable_update();
+ update->mutable_framework_id()->MergeFrom(framework->id);
+ update->mutable_slave_id()->MergeFrom(id);
+ TaskStatus* status = update->mutable_status();
+ status->mutable_task_id()->MergeFrom(taskId);
+ status->set_state(TASK_LOST);
+ update->set_timestamp(elapsedTime());
+ update->set_uuid(UUID::random().toBytes());
+ send(master, message);
+ } else if (!executor->pid) {
+ // Remove the task.
+ executor->removeTask(taskId);
+
+ // Tell the isolation module to update the resources.
+ dispatch(isolationModule,
+ &IsolationModule::resourcesChanged,
+ framework->id, executor->id, executor->resources);
+
+ StatusUpdateMessage message;
+ StatusUpdate* update = message.mutable_update();
+ update->mutable_framework_id()->MergeFrom(framework->id);
+ update->mutable_executor_id()->MergeFrom(executor->id);
+ update->mutable_slave_id()->MergeFrom(id);
+ TaskStatus* status = update->mutable_status();
+ status->mutable_task_id()->MergeFrom(taskId);
+ status->set_state(TASK_KILLED);
+ update->set_timestamp(elapsedTime());
+ update->set_uuid(UUID::random().toBytes());
+ send(master, message);
+ } else {
+ // Otherwise, send a message to the executor and wait for
+ // it to send us a status update.
+ KillTaskMessage message;
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ message.mutable_task_id()->MergeFrom(taskId);
+ send(executor->pid, message);
}
}
-void Slave::killFramework(const FrameworkID& frameworkId)
+// TODO(benh): Consider sending a boolean that specifies if the
+// shut down should be graceful or immediate. Likewise, consider
+// sending back a shut down acknowledgement, because otherwise you
+// couuld get into a state where a shut down was sent, dropped, and
+// therefore never processed.
+void Slave::shutdownFramework(const FrameworkID& frameworkId)
{
- LOG(INFO) << "Asked to kill framework " << frameworkId;
+ LOG(INFO) << "Asked to shut down framework " << frameworkId;
Framework* framework = getFramework(frameworkId);
if (framework != NULL) {
- killFramework(framework);
+ LOG(INFO) << "Shutting down framework " << framework->id;
+
+ // Shut down all executors of this framework.
+ foreachvalue (Executor* executor, framework->executors) {
+ shutdownExecutor(framework, executor);
+ }
}
}
@@ -460,35 +740,36 @@ void Slave::schedulerMessage(const Slave
const string& data)
{
Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- Executor* executor = framework->getExecutor(executorId);
- if (executor == NULL) {
- LOG(WARNING) << "Dropping message for executor '"
- << executorId << "' of framework " << frameworkId
- << " because executor does not exist";
- statistics.invalid_framework_messages++;
- } else if (!executor->pid) {
- // TODO(*): If executor is not started, queue framework message?
- // (It's probably okay to just drop it since frameworks can have
- // the executor send a message to the master to say when it's ready.)
- LOG(WARNING) << "Dropping message for executor '"
- << executorId << "' of framework " << frameworkId
- << " because executor is not running";
- statistics.invalid_framework_messages++;
- } else {
- MSG<S2E_FRAMEWORK_MESSAGE> out;
- out.mutable_slave_id()->MergeFrom(slaveId);
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.mutable_executor_id()->MergeFrom(executorId);
- out.set_data(data);
- send(executor->pid, out);
-
- statistics.valid_framework_messages++;
- }
- } else {
+ if (framework == NULL) {
LOG(WARNING) << "Dropping message for framework "<< frameworkId
<< " because framework does not exist";
- statistics.invalid_framework_messages++;
+ stats.invalidFrameworkMessages++;
+ return;
+ }
+
+ Executor* executor = framework->getExecutor(executorId);
+ if (executor == NULL) {
+ LOG(WARNING) << "Dropping message for executor '"
+ << executorId << "' of framework " << frameworkId
+ << " because executor does not exist";
+ stats.invalidFrameworkMessages++;
+ } else if (!executor->pid) {
+ // TODO(*): If executor is not started, queue framework message?
+ // (It's probably okay to just drop it since frameworks can have
+ // the executor send a message to the master to say when it's ready.)
+ LOG(WARNING) << "Dropping message for executor '"
+ << executorId << "' of framework " << frameworkId
+ << " because executor is not running";
+ stats.invalidFrameworkMessages++;
+ } else {
+ FrameworkToExecutorMessage message;
+ message.mutable_slave_id()->MergeFrom(slaveId);
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ message.mutable_executor_id()->MergeFrom(executorId);
+ message.set_data(data);
+ send(executor->pid, message);
+
+ stats.validFrameworkMessages++;
}
}
@@ -505,25 +786,94 @@ void Slave::updateFramework(const Framew
}
-void Slave::statusUpdateAck(const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const TaskID& taskId)
+void Slave::statusUpdateAcknowledgement(const SlaveID& slaveId,
+ const FrameworkID& frameworkId,
+ const TaskID& taskId,
+ const string& uuid)
{
Framework* framework = getFramework(frameworkId);
if (framework != NULL) {
- foreachpair (double deadline, _, framework->statuses) {
- if (framework->statuses[deadline].count(taskId) > 0) {
- LOG(INFO) << "Got acknowledgement of status update"
- << " for task " << taskId
- << " of framework " << framework->frameworkId;
- framework->statuses[deadline].erase(taskId);
- break;
- }
+ if (framework->updates.contains(UUID::fromBytes(uuid))) {
+ LOG(INFO) << "Got acknowledgement of status update"
+ << " for task " << taskId
+ << " of framework " << frameworkId;
+ framework->updates.erase(UUID::fromBytes(uuid));
}
}
}
+// void Slave::statusUpdateAcknowledged(const SlaveID& slaveId,
+// const FrameworkID& frameworkId,
+// const TaskID& taskId,
+// uint32_t sequence)
+// {
+// StatusUpdateStreamID id(frameworkId, taskId);
+// StatusUpdateStream* stream = getStatusUpdateStream(id);
+
+// if (stream == NULL) {
+// LOG(WARNING) << "WARNING! Received unexpected status update"
+// << " acknowledgement for task " << taskId
+// << " of framework " << frameworkId;
+// return;
+// }
+
+// CHECK(!stream->pending.empty());
+
+// const StatusUpdate& update = stream->pending.front();
+
+// if (update->sequence() != sequence) {
+// LOG(WARNING) << "WARNING! Received status update acknowledgement"
+// << " with bad sequence number (received " << sequence
+// << ", expecting " << update->sequence()
+// << ") for task " << taskId
+// << " of framework " << frameworkId;
+// } else {
+// LOG(INFO) << "Received status update acknowledgement for task "
+// << taskId << " of framework " << frameworkId;
+
+// // Write the update out to disk.
+// CHECK(stream->acknowledged != NULL);
+
+// Result<bool> result =
+// utils::protobuf::write(stream->acknowledged, update);
+
+// if (result.isError()) {
+// // Failing here is rather dramatic, but so is not being able to
+// // write to disk ... seems like failing early and often might do
+// // more benefit than harm.
+// LOG(FATAL) << "Failed to write status update to "
+// << stream->directory << "/acknowledged: "
+// << result.message();
+// }
+
+// stream->pending.pop();
+
+// bool empty = stream->pending.empty();
+
+// bool terminal =
+// update.status().state() == TASK_FINISHED &&
+// update.status().state() == TASK_FAILED &&
+// update.status().state() == TASK_KILLED &&
+// update.status().state() == TASK_LOST;
+
+// if (empty && terminal) {
+// cleanupStatusUpdateStream(stream);
+// } else if (!empty && terminal) {
+// LOG(WARNING) << "WARNING! Acknowledged a \"terminal\""
+// << " task status but updates are still pending";
+// } else if (!empty) {
+// StatusUpdateMessage message;
+// message.mutable_update()->MergeFrom(stream->pending.front());
+// message.set_reliable(true);
+// send(master, message);
+
+// stream->timeout = elapsedTime() + STATUS_UPDATE_RETRY_INTERVAL;
+// }
+// }
+// }
+
+
void Slave::registerExecutor(const FrameworkID& frameworkId,
const ExecutorID& executorId)
{
@@ -531,172 +881,334 @@ void Slave::registerExecutor(const Frame
<< "' of framework " << frameworkId;
Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- Executor* executor = framework->getExecutor(executorId);
-
- // Check the status of the executor.
- if (executor == NULL) {
- LOG(WARNING) << "Not expecting executor '" << executorId
- << "' of framework " << frameworkId;
- send(from(), S2E_KILL_EXECUTOR);
- } else if (executor->pid != UPID()) {
- LOG(WARNING) << "Not good, executor '" << executorId
- << "' of framework " << frameworkId
- << " is already running";
- send(from(), S2E_KILL_EXECUTOR);
- } else {
- // Save the pid for the executor.
- executor->pid = from();
-
- // Now that the executor is up, set its resource limits.
- isolationModule->resourcesChanged(framework, executor);
-
- // Tell executor it's registered and give it any queued tasks.
- MSG<S2E_REGISTER_REPLY> out;
- ExecutorArgs* args = out.mutable_args();
- args->mutable_framework_id()->MergeFrom(framework->frameworkId);
- args->mutable_executor_id()->MergeFrom(executor->info.executor_id());
- args->mutable_slave_id()->MergeFrom(slaveId);
- args->set_hostname(slave.hostname());
- args->set_data(framework->info.executor().data());
- send(executor->pid, out);
- sendQueuedTasks(framework, executor);
- }
- } else {
+ if (framework == NULL) {
// Framework is gone; tell the executor to exit.
LOG(WARNING) << "Framework " << frameworkId
<< " does not exist (it may have been killed),"
<< " telling executor to exit";
+ send(from(), ShutdownExecutorMessage());
+ return;
+ }
+
+ Executor* executor = framework->getExecutor(executorId);
- // TODO(benh): Don't we also want to tell the isolation
- // module to shut this guy down!
- send(from(), S2E_KILL_EXECUTOR);
+ // Check the status of the executor.
+ if (executor == NULL) {
+ LOG(WARNING) << "WARNING! Unexpected executor '" << executorId
+ << "' registering for framework " << frameworkId;
+ send(from(), ShutdownExecutorMessage());
+ } else if (executor->pid) {
+ LOG(WARNING) << "WARNING! executor '" << executorId
+ << "' of framework " << frameworkId
+ << " is already running";
+ send(from(), ShutdownExecutorMessage());
+ } else {
+ // Save the pid for the executor.
+ executor->pid = from();
+
+ // Now that the executor is up, set its resource limits.
+ dispatch(isolationModule,
+ &IsolationModule::resourcesChanged,
+ framework->id, executor->id, executor->resources);
+
+ // Tell executor it's registered and give it any queued tasks.
+ ExecutorRegisteredMessage message;
+ ExecutorArgs* args = message.mutable_args();
+ args->mutable_framework_id()->MergeFrom(framework->id);
+ args->mutable_executor_id()->MergeFrom(executor->id);
+ args->mutable_slave_id()->MergeFrom(id);
+ args->set_hostname(info.hostname());
+ args->set_data(executor->info.data());
+ send(executor->pid, message);
+
+ LOG(INFO) << "Flushing queued tasks for framework " << framework->id;
+
+ foreachvalue (const TaskDescription& task, executor->queuedTasks) {
+ // Add the task to the executor.
+ executor->addTask(task);
+
+ stats.tasks[TASK_STARTING]++;
+
+ RunTaskMessage message;
+ message.mutable_framework_id()->MergeFrom(framework->id);
+ message.mutable_framework()->MergeFrom(framework->info);
+ message.set_pid(framework->pid);
+ message.mutable_task()->MergeFrom(task);
+ send(executor->pid, message);
+ }
+
+ executor->queuedTasks.clear();
}
}
-void Slave::statusUpdate(const FrameworkID& frameworkId,
- const TaskStatus& status)
+// void Slave::statusUpdate(const StatusUpdate& update)
+// {
+// LOG(INFO) << "Received update that task " << update.status().task_id()
+// << " of framework " << update.framework_id()
+// << " is now in state " << update.status().state();
+
+// Framework* framework = getFramework(update.framework_id());
+// if (framework == NULL) {
+// LOG(WARNING) << "WARNING! Failed to lookup"
+// << " framework " << update.framework_id()
+// << " of received status update";
+// stats.invalidStatusUpdates++;
+// return;
+// }
+
+// Executor* executor = framework->getExecutor(update.status().task_id());
+// if (executor == NULL) {
+// LOG(WARNING) << "WARNING! Failed to lookup executor"
+// << " for framework " << update.framework_id()
+// << " of received status update";
+// stats.invalidStatusUpdates++;
+// return;
+// }
+
+// // Create/Get the status update stream for this framework/task.
+// StatusUpdateStreamID id(update.framework_id(), update.status().task_id());
+
+// if (!statusUpdateStreams.contains(id)) {
+// StatusUpdateStream* stream =
+// createStatusUpdateStream(id, executor->directory);
+
+// if (stream == NULL) {
+// LOG(WARNING) << "WARNING! Failed to create status update"
+// << " stream for task " << update.status().task_id()
+// << " of framework " << update.framework_id()
+// << " ... removing executor!";
+// removeExecutor(framework, executor);
+// return;
+// }
+// }
+
+// StatusUpdateStream* stream = getStatusUpdateStream(id);
+
+// CHECK(stream != NULL);
+
+// // If we are already waiting on an acknowledgement, check that this
+// // update (coming from the executor), is the same one that we are
+// // waiting on being acknowledged.
+
+// // Check that this is status update has not already been
+// // acknowledged. this could happen because a slave writes the
+// // acknowledged message but then fails before it can pass the
+// // message on to the executor, so the executor tries again.
+
+// returnhere;
+
+// // TODO(benh): Check that this update hasn't already been received
+// // or acknowledged! This could happen if a slave receives a status
+// // update from an executor, then crashes after it writes it to disk
+// // but before it sends an ack back to
+
+// // Okay, record this update as received.
+// CHECK(stream->received != NULL);
+
+// Result<bool> result =
+// utils::protobuf::write(stream->received, &update);
+
+// if (result.isError()) {
+// // Failing here is rather dramatic, but so is not being able to
+// // write to disk ... seems like failing early and often might do
+// // more benefit than harm.
+// LOG(FATAL) << "Failed to write status update to "
+// << stream->directory << "/received: "
+// << result.message();
+// }
+
+// // Now acknowledge the executor.
+// StatusUpdateAcknowledgementMessage message;
+// message.mutable_framework_id()->MergeFrom(update.framework_id());
+// message.mutable_slave_id()->MergeFrom(update.slave_id());
+// message.mutable_task_id()->MergeFrom(update.status().task_id());
+// send(executor->pid, message);
+
+// executor->updateTaskState(
+// update.status().task_id(),
+// update.status().state());
+
+// // Remove the task if it's reached a terminal state.
+// bool terminal =
+// update.status().state() == TASK_FINISHED &&
+// update.status().state() == TASK_FAILED &&
+// update.status().state() == TASK_KILLED &&
+// update.status().state() == TASK_LOST;
+
+// if (terminal) {
+// executor->removeTask(update.status().task_id());
+// isolationModule->resourcesChanged(
+// framework->id, framework->info,
+// executor->info, executor->resources);
+// }
+
+// stream->pending.push(update);
+
+// // Send the status update if this is the first in the
+// // stream. Subsequent status updates will get sent in
+// // Slave::statusUpdateAcknowledged.
+// if (stream->pending.size() == 1) {
+// CHECK(stream->timeout == -1);
+// StatusUpdateMessage message;
+// message.mutable_update()->MergeFrom(update);
+// message.set_reliable(true);
+// send(master, message);
+
+// stream->timeout = elapsedTime() + STATUS_UPDATE_RETRY_INTERVAL;
+// }
+
+// stats.tasks[status.state()]++;
+// stats.validStatusUpdates++;
+// }
+
+void Slave::statusUpdate(const StatusUpdate& update)
{
+ const TaskStatus& status = update.status();
+
LOG(INFO) << "Status update: task " << status.task_id()
- << " of framework " << frameworkId
- << " is now in state "
- << TaskState_descriptor()->FindValueByNumber(status.state())->name();
+ << " of framework " << update.framework_id()
+ << " is now in state " << status.state();
- Framework* framework = getFramework(frameworkId);
+ Framework* framework = getFramework(update.framework_id());
if (framework != NULL) {
Executor* executor = framework->getExecutor(status.task_id());
if (executor != NULL) {
executor->updateTaskState(status.task_id(), status.state());
- // Remove the task if necessary, and update statistics.
- switch (status.state()) {
- case TASK_FINISHED:
- statistics.finished_tasks++;
- executor->removeTask(status.task_id());
- isolationModule->resourcesChanged(framework, executor);
- break;
- case TASK_FAILED:
- statistics.failed_tasks++;
- executor->removeTask(status.task_id());
- isolationModule->resourcesChanged(framework, executor);
- break;
- case TASK_KILLED:
- statistics.killed_tasks++;
- executor->removeTask(status.task_id());
- isolationModule->resourcesChanged(framework, executor);
- break;
- case TASK_LOST:
- statistics.lost_tasks++;
- executor->removeTask(status.task_id());
- isolationModule->resourcesChanged(framework, executor);
- break;
+ // Handle the task appropriately if it's terminated.
+ if (status.state() == TASK_FINISHED ||
+ status.state() == TASK_FAILED ||
+ status.state() == TASK_KILLED ||
+ status.state() == TASK_LOST) {
+ executor->removeTask(status.task_id());
+
+ dispatch(isolationModule,
+ &IsolationModule::resourcesChanged,
+ framework->id, executor->id, executor->resources);
}
// Send message and record the status for possible resending.
- MSG<S2M_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.mutable_status()->MergeFrom(status);
- send(master, out);
+ StatusUpdateMessage message;
+ message.mutable_update()->MergeFrom(update);
+ message.set_pid(self());
+ send(master, message);
- double deadline = elapsedTime() + STATUS_UPDATE_RETRY_TIMEOUT;
- framework->statuses[deadline][status.task_id()] = status;
+ UUID uuid = UUID::fromBytes(update.uuid());
- statistics.valid_status_updates++;
+ // Send us a message to try and resend after some delay.
+ delay(STATUS_UPDATE_RETRY_INTERVAL_SECONDS,
+ self(), &Slave::statusUpdateTimeout,
+ framework->id, uuid);
+
+ framework->updates[uuid] = update;
+
+ stats.tasks[status.state()]++;
+
+ stats.validStatusUpdates++;
} else {
LOG(WARNING) << "Status update error: couldn't lookup "
- << "executor for framework " << frameworkId;
- statistics.invalid_status_updates++;
+ << "executor for framework " << update.framework_id();
+ stats.invalidStatusUpdates++;
}
} else {
LOG(WARNING) << "Status update error: couldn't lookup "
- << "framework " << frameworkId;
- statistics.invalid_status_updates++;
+ << "framework " << update.framework_id();
+ stats.invalidStatusUpdates++;
}
}
void Slave::executorMessage(const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
const string& data)
{
Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- LOG(INFO) << "Sending message for framework " << frameworkId
- << " to " << framework->pid;
-
- // TODO(benh): This is weird, sending an M2F message.
- MSG<M2F_FRAMEWORK_MESSAGE> out;
- out.mutable_slave_id()->MergeFrom(slaveId);
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.mutable_executor_id()->MergeFrom(executorId);
- out.set_data(data);
- send(framework->pid, out);
-
- statistics.valid_framework_messages++;
- } else {
+ if (framework == NULL) {
LOG(WARNING) << "Cannot send framework message from slave "
<< slaveId << " to framework " << frameworkId
<< " because framework does not exist";
- statistics.invalid_framework_messages++;
+ stats.invalidFrameworkMessages++;
+ return;
}
+
+ LOG(INFO) << "Sending message for framework " << frameworkId
+ << " to " << framework->pid;
+
+ ExecutorToFrameworkMessage message;
+ message.mutable_slave_id()->MergeFrom(slaveId);
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ message.mutable_executor_id()->MergeFrom(executorId);
+ message.set_data(data);
+ send(framework->pid, message);
+
+ stats.validFrameworkMessages++;
}
void Slave::ping()
{
- send(from(), PONG);
+ send(from(), "PONG");
}
-void Slave::timeout()
+void Slave::statusUpdateTimeout(
+ const FrameworkID& frameworkId,
+ const UUID& uuid)
{
- // Check and see if we should re-send any status updates.
- foreachpair (_, Framework* framework, frameworks) {
- foreachpair (double deadline, _, framework->statuses) {
- if (deadline <= elapsedTime()) {
- foreachpair (_, const TaskStatus& status, framework->statuses[deadline]) {
- LOG(WARNING) << "Resending status update"
- << " for task " << status.task_id()
- << " of framework " << framework->frameworkId;
- MSG<S2M_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(framework->frameworkId);
- out.mutable_status()->MergeFrom(status);
- send(master, out);
- }
- }
+ // Check and see if we still need to send this update.
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL) {
+ if (framework->updates.contains(uuid)) {
+ const StatusUpdate& update = framework->updates[uuid];
+
+ LOG(INFO) << "Resending status update"
+ << " for task " << update.status().task_id()
+ << " of framework " << update.framework_id();
+
+ StatusUpdateMessage message;
+ message.mutable_update()->MergeFrom(update);
+ message.set_pid(self());
+ send(master, message);
}
}
}
+
+// void Slave::timeout()
+// {
+// // Check and see if we should re-send any status updates.
+// double now = elapsedTime();
+
+// foreachvalue (StatusUpdateStream* stream, statusUpdateStreams) {
+// CHECK(stream->timeout > 0);
+// if (stream->timeout < now) {
+// CHECK(!stream->pending.empty());
+// const StatusUpdate& update = stream->pending.front();
+
+// LOG(WARNING) << "WARNING! Resending status update"
+// << " for task " << update.status().task_id()
+// << " of framework " << update.framework_id();
+
+// StatusUpdateMessage message;
+// message.mutable_update()->MergeFrom(update);
+// message.set_reliable(true);
+// send(master, message);
+
+// stream->timeout = now + STATUS_UPDATE_RETRY_INTERVAL;
+// }
+// }
+// }
+
+
void Slave::exited()
{
LOG(INFO) << "Process exited: " << from();
if (from() == master) {
- LOG(WARNING) << "Master disconnected! "
- << "Waiting for a new master to be elected.";
+ LOG(WARNING) << "WARNING! Master disconnected!"
+ << " Waiting for a new master to be elected.";
// TODO(benh): After so long waiting for a master, commit suicide.
}
}
@@ -706,7 +1218,7 @@ Promise<HttpResponse> Slave::http_info_j
{
LOG(INFO) << "HTTP request for '/slave/info.json'";
- ostringstream out;
+ std::ostringstream out;
out <<
"{" <<
@@ -718,7 +1230,7 @@ Promise<HttpResponse> Slave::http_info_j
HttpOKResponse response;
response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
- response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
+ response.headers["Content-Length"] = utils::stringify(out.str().size());
response.body = out.str().data();
return response;
}
@@ -728,14 +1240,14 @@ Promise<HttpResponse> Slave::http_framew
{
LOG(INFO) << "HTTP request for '/slave/frameworks.json'";
- ostringstream out;
+ std::ostringstream out;
out << "[";
- foreachpair (_, Framework* framework, frameworks) {
+ foreachvalue (Framework* framework, frameworks) {
out <<
"{" <<
- "\"id\":\"" << framework->frameworkId << "\"," <<
+ "\"id\":\"" << framework->id << "\"," <<
"\"name\":\"" << framework->info.name() << "\"," <<
"\"user\":\"" << framework->info.user() << "\""
"},";
@@ -751,7 +1263,7 @@ Promise<HttpResponse> Slave::http_framew
HttpOKResponse response;
response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
- response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
+ response.headers["Content-Length"] = utils::stringify(out.str().size());
response.body = out.str().data();
return response;
}
@@ -761,26 +1273,24 @@ Promise<HttpResponse> Slave::http_tasks_
{
LOG(INFO) << "HTTP request for '/slave/tasks.json'";
- ostringstream out;
+ std::ostringstream out;
out << "[";
- foreachpair (_, Framework* framework, frameworks) {
- foreachpair (_, Executor* executor, framework->executors) {
- foreachpair (_, Task* task, executor->tasks) {
+ foreachvalue (Framework* framework, frameworks) {
+ foreachvalue (Executor* executor, framework->executors) {
+ foreachvalue (Task* task, executor->launchedTasks) {
// TODO(benh): Send all of the resources (as JSON).
Resources resources(task->resources());
Resource::Scalar cpus = resources.getScalar("cpus", Resource::Scalar());
Resource::Scalar mem = resources.getScalar("mem", Resource::Scalar());
- const string& state =
- TaskState_descriptor()->FindValueByNumber(task->state())->name();
out <<
"{" <<
"\"task_id\":\"" << task->task_id() << "\"," <<
"\"framework_id\":\"" << task->framework_id() << "\"," <<
"\"slave_id\":\"" << task->slave_id() << "\"," <<
"\"name\":\"" << task->name() << "\"," <<
- "\"state\":\"" << state << "\"," <<
+ "\"state\":\"" << task->state() << "\"," <<
"\"cpus\":" << cpus.value() << "," <<
"\"mem\":" << mem.value() <<
"},";
@@ -798,7 +1308,7 @@ Promise<HttpResponse> Slave::http_tasks_
HttpOKResponse response;
response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
- response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
+ response.headers["Content-Length"] = utils::stringify(out.str().size());
response.body = out.str().data();
return response;
}
@@ -808,26 +1318,28 @@ Promise<HttpResponse> Slave::http_stats_
{
LOG(INFO) << "Http request for '/slave/stats.json'";
- ostringstream out;
+ std::ostringstream out;
+
+ out << std::setprecision(10);
out <<
"{" <<
"\"uptime\":" << elapsedTime() - startTime << "," <<
"\"total_frameworks\":" << frameworks.size() << "," <<
- "\"launched_tasks\":" << statistics.launched_tasks << "," <<
- "\"finished_tasks\":" << statistics.finished_tasks << "," <<
- "\"killed_tasks\":" << statistics.killed_tasks << "," <<
- "\"failed_tasks\":" << statistics.failed_tasks << "," <<
- "\"lost_tasks\":" << statistics.lost_tasks << "," <<
- "\"valid_status_updates\":" << statistics.valid_status_updates << "," <<
- "\"invalid_status_updates\":" << statistics.invalid_status_updates << "," <<
- "\"valid_framework_messages\":" << statistics.valid_framework_messages << "," <<
- "\"invalid_framework_messages\":" << statistics.invalid_framework_messages <<
+ "\"started_tasks\":" << stats.tasks[TASK_STARTING] << "," <<
+ "\"finished_tasks\":" << stats.tasks[TASK_FINISHED] << "," <<
+ "\"killed_tasks\":" << stats.tasks[TASK_KILLED] << "," <<
+ "\"failed_tasks\":" << stats.tasks[TASK_FAILED] << "," <<
+ "\"lost_tasks\":" << stats.tasks[TASK_LOST] << "," <<
+ "\"valid_status_updates\":" << stats.validStatusUpdates << "," <<
+ "\"invalid_status_updates\":" << stats.invalidStatusUpdates << "," <<
+ "\"valid_framework_messages\":" << stats.validFrameworkMessages << "," <<
+ "\"invalid_framework_messages\":" << stats.invalidFrameworkMessages <<
"}";
HttpOKResponse response;
response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
- response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
+ response.headers["Content-Length"] = utils::stringify(out.str().size());
response.body = out.str().data();
return response;
}
@@ -837,7 +1349,7 @@ Promise<HttpResponse> Slave::http_vars(c
{
LOG(INFO) << "HTTP request for '/slave/vars'";
- ostringstream out;
+ std::ostringstream out;
out <<
"build_date " << build::DATE << "\n" <<
@@ -849,22 +1361,24 @@ Promise<HttpResponse> Slave::http_vars(c
out << key << " " << value << "\n";
}
+ out << std::setprecision(10);
+
out <<
"uptime " << elapsedTime() - startTime << "\n" <<
"total_frameworks " << frameworks.size() << "\n" <<
- "launched_tasks " << statistics.launched_tasks << "\n" <<
- "finished_tasks " << statistics.finished_tasks << "\n" <<
- "killed_tasks " << statistics.killed_tasks << "\n" <<
- "failed_tasks " << statistics.failed_tasks << "\n" <<
- "lost_tasks " << statistics.lost_tasks << "\n" <<
- "valid_status_updates " << statistics.valid_status_updates << "\n" <<
- "invalid_status_updates " << statistics.invalid_status_updates << "\n" <<
- "valid_framework_messages " << statistics.valid_framework_messages << "\n" <<
- "invalid_framework_messages " << statistics.invalid_framework_messages << "\n";
+ "started_tasks " << stats.tasks[TASK_STARTING] << "\n" <<
+ "finished_tasks " << stats.tasks[TASK_FINISHED] << "\n" <<
+ "killed_tasks " << stats.tasks[TASK_KILLED] << "\n" <<
+ "failed_tasks " << stats.tasks[TASK_FAILED] << "\n" <<
+ "lost_tasks " << stats.tasks[TASK_LOST] << "\n" <<
+ "valid_status_updates " << stats.validStatusUpdates << "\n" <<
+ "invalid_status_updates " << stats.invalidStatusUpdates << "\n" <<
+ "valid_framework_messages " << stats.validFrameworkMessages << "\n" <<
+ "invalid_framework_messages " << stats.invalidFrameworkMessages << "\n";
HttpOKResponse response;
response.headers["Content-Type"] = "text/plain";
- response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
+ response.headers["Content-Length"] = utils::stringify(out.str().size());
response.body = out.str().data();
return response;
}
@@ -880,103 +1394,318 @@ Framework* Slave::getFramework(const Fra
}
-// Send any tasks queued up for the given framework to its executor
-// (needed if we received tasks while the executor was starting up)
-void Slave::sendQueuedTasks(Framework* framework, Executor* executor)
+// StatusUpdateStream* Slave::getStatusUpdateStream(const StatusUpdateStreamID& id)
+// {
+//   if (streams.contains(id)) {
+//     return streams[id];
+// }
+
+// return NULL;
+// }
+
+
+// StatusUpdateStream* Slave::createStatusUpdateStream(
+// const FrameworkID& frameworkId,
+// const TaskID& taskId,
+// const string& directory)
+// {
+//   StatusUpdateStream* stream = new StatusUpdateStream();
+//   stream->id = std::make_pair(frameworkId, taskId);
+//   stream->directory = directory;
+//   stream->received = NULL;
+//   stream->acknowledged = NULL;
+//   stream->timeout = -1;
+
+//   streams[stream->id] = stream;
+
+//   // Open file descriptors for "received" and "acknowledged".
+// string path;
+// Result<int> result;
+
+// path = stream->directory + "/received";
+// result = utils::os::open(path, O_CREAT | O_RDWR | O_SYNC);
+// if (result.isError() || result.isNone()) {
+// LOG(WARNING) << "Failed to open " << path
+// << " for storing received status updates";
+// cleanupStatusUpdateStream(stream);
+// return NULL;
+// }
+
+// stream->received = result.get();
+
+//   path = stream->directory + "/acknowledged";
+// result = utils::os::open(path, O_CREAT | O_RDWR | O_SYNC);
+// if (result.isError() || result.isNone()) {
+//     LOG(WARNING) << "Failed to open " << path
+// << " for storing acknowledged status updates";
+// cleanupStatusUpdateStream(stream);
+// return NULL;
+// }
+
+// stream->acknowledged = result.get();
+
+// // Replay the status updates. This is necessary because the slave
+// // might have crashed but was restarted before the executors
+// // died. Or another task with the same id as before got run again on
+// // the same executor.
+// bool replayed = replayStatusUpdateStream(stream);
+
+// if (!replayed) {
+// LOG(WARNING) << "Failed to correctly replay status updates"
+// << " for task " << taskId
+// << " of framework " << frameworkId
+// << " found at " << path;
+// cleanupStatusUpdateStream(stream);
+// return NULL;
+// }
+
+// // Start sending any pending status updates. In this case, the slave
+// // probably died after it sent the status update and never received
+// // the acknowledgement.
+// if (!stream->pending.empty()) {
+//     const StatusUpdate& update = stream->pending.front();
+//     StatusUpdateMessage message;
+//     message.mutable_update()->MergeFrom(update);
+// message.set_reliable(true);
+// send(master, message);
+
+// stream->timeout = elapsedTime() + STATUS_UPDATE_RETRY_INTERVAL;
+// }
+
+// return stream;
+// }
+
+
+// bool Slave::replayStatusUpdateStream(StatusUpdateStream* stream)
+// {
+// CHECK(stream->received != NULL);
+// CHECK(stream->acknowledged != NULL);
+
+// Result<StatusUpdate*> result;
+
+//   // Okay, now read all the received status updates.
+// hashmap<uint32_t, StatusUpdate> pending;
+
+// result = utils::protobuf::read(stream->received);
+// while (result.isSome()) {
+// StatusUpdate* update = result.get();
+// CHECK(!pending.contains(update->sequence()));
+// pending[update->sequence()] = *update;
+// delete update;
+// result = utils::protobuf::read(stream->received);
+// }
+
+// if (result.isError()) {
+// return false;
+// }
+
+// CHECK(result.isNone());
+
+// LOG(INFO) << "Recovered " << pending.size()
+// << " TOTAL status updates for task "
+// << stream->id.second << " of framework "
+// << stream->id.first;
+
+// // Okay, now get all the acknowledged status updates.
+// result = utils::protobuf::read(stream->acknowledged);
+// while (result.isSome()) {
+// StatusUpdate* update = result.get();
+// stream->sequence = std::max(stream->sequence, update->sequence());
+// CHECK(pending.contains(update->sequence()));
+// pending.erase(update->sequence());
+// delete update;
+// result = utils::protobuf::read(stream->acknowledged);
+// }
+
+// if (result.isError()) {
+// return false;
+// }
+
+// CHECK(result.isNone());
+
+// LOG(INFO) << "Recovered " << pending.size()
+// << " PENDING status updates for task "
+// << stream->id.second << " of framework "
+// << stream->id.first;
+
+// // Add the pending status updates in sorted order.
+// uint32_t sequence = 0;
+
+// while (!pending.empty()) {
+//     sequence = UINT32_MAX; // Reseed, then find the smallest sequence number.
+// foreachvalue (const StatusUpdate& update, pending) {
+// sequence = std::min(sequence, update.sequence());
+// }
+
+// // Push that update and remove it from pending.
+// stream->pending.push(pending[sequence]);
+// pending.erase(sequence);
+// }
+
+// return true;
+// }
+
+
+// void Slave::cleanupStatusUpdateStream(StatusUpdateStream* stream)
+// {
+// if (stream->received != NULL) {
+// fclose(stream->received);
+// }
+
+// if (stream->acknowledged != NULL) {
+// fclose(stream->acknowledged);
+// }
+
+// streams.erase(stream->id);
+
+// delete stream;
+// }
+
+
+// N.B. When the slave runs in "local" mode the pid is not meaningful
+// (and relying on it could cause bugs).
+void Slave::executorStarted(const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ pid_t pid)
+{
+ LOG(INFO) << "Executor '" << executorId << "' of framework "
+ << frameworkId << " has started at " << pid;
+}
+
+
+// Called by the isolation module when an executor process exits.
+void Slave::executorExited(const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ int status)
{
- LOG(INFO) << "Flushing queued tasks for framework "
- << framework->frameworkId;
+ Framework* framework = getFramework(frameworkId);
+ if (framework == NULL) {
+ LOG(WARNING) << "WARNING! Unknown executor '" << executorId
+ << "' of unknown framework " << frameworkId
+ << " has exited with status " << status;
+ return;
+ }
- CHECK(executor->pid != UPID());
+ Executor* executor = framework->getExecutor(executorId);
+ if (executor == NULL) {
+ LOG(WARNING) << "WARNING! UNKNOWN executor '" << executorId
+ << "' of framework " << frameworkId
+ << " has exited with status " << status;
+ return;
+ }
- foreach (const TaskDescription& task, executor->queuedTasks) {
- // Add the task to the executor.
- executor->addTask(task);
+ LOG(INFO) << "Executor '" << executorId
+ << "' of framework " << frameworkId
+ << " has exited with status " << status;
- MSG<S2E_RUN_TASK> out;
- out.mutable_framework()->MergeFrom(framework->info);
- out.mutable_framework_id()->MergeFrom(framework->frameworkId);
- out.set_pid(framework->pid);
- out.mutable_task()->MergeFrom(task);
- send(executor->pid, out);
- }
+ ExitedExecutorMessage message;
+ message.mutable_slave_id()->MergeFrom(id);
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ message.mutable_executor_id()->MergeFrom(executorId);
+ message.set_status(status);
+ send(master, message);
- executor->queuedTasks.clear();
-}
+ // TODO(benh): Send status updates for remaining tasks here rather
+ // than at the master! As in, eliminate the code in
+ // Master::exitedExecutor and put it here.
+ framework->destroyExecutor(executor->id);
-// Kill a framework (including its executor if killExecutor is true).
-void Slave::killFramework(Framework *framework, bool killExecutors)
-{
- LOG(INFO) << "Cleaning up framework " << framework->frameworkId;
+ // Cleanup if this framework has nothing running.
+ if (framework->executors.size() == 0) {
+ // TODO(benh): But there might be some remaining status updates
+ // that haven't been acknowledged!
+ frameworks.erase(framework->id);
+ delete framework;
+ }
+}
- // Shutdown all executors of this framework.
- foreachpaircopy (const ExecutorID& executorId, Executor* executor, framework->executors) {
- if (killExecutors) {
- LOG(INFO) << "Killing executor '" << executorId
- << "' of framework " << framework->frameworkId;
- send(executor->pid, S2E_KILL_EXECUTOR);
+void Slave::shutdownExecutor(Framework* framework, Executor* executor)
+{
+ LOG(INFO) << "Shutting down executor '" << executor->id
+ << "' of framework " << framework->id;
- // TODO(benh): There really isn't ANY time between when an
- // executor gets a S2E_KILL_EXECUTOR message and the isolation
- // module goes and kills it. We should really think about making
- // the semantics of this better.
+ send(executor->pid, ShutdownExecutorMessage());
- isolationModule->killExecutor(framework, executor);
- }
+ executor->shutdown = true;
- framework->destroyExecutor(executorId);
- }
+ // Prepare for sending a kill if the executor doesn't comply.
+ double timeout = conf.get<double>("executor_shutdown_timeout_seconds",
+ EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS);
- frameworks.erase(framework->frameworkId);
- delete framework;
+ delay(timeout, self(),
+ &Slave::shutdownExecutorTimeout,
+ framework->id, executor->id, executor->uuid);
}
-// Called by isolation module when an executor process exits
-// TODO(benh): Make this callback be a message so that we can avoid
-// race conditions.
-void Slave::executorExited(const FrameworkID& frameworkId, const ExecutorID& executorId, int result)
+void Slave::shutdownExecutorTimeout(const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const UUID& uuid)
{
Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- Executor* executor = framework->getExecutor(executorId);
- if (executor != NULL) {
- LOG(INFO) << "Exited executor '" << executorId
- << "' of framework " << frameworkId
- << " with result " << result;
-
- MSG<S2M_EXITED_EXECUTOR> out;
- out.mutable_slave_id()->MergeFrom(slaveId);
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.mutable_executor_id()->MergeFrom(executorId);
- out.set_result(result);
- send(master, out);
-
- framework->destroyExecutor(executorId);
-
- // TODO(benh): When should we kill the presence of an entire
- // framework on a slave?
- if (framework->executors.size() == 0) {
- killFramework(framework);
- }
- } else {
- LOG(WARNING) << "UNKNOWN executor '" << executorId
- << "' of framework " << frameworkId
- << " has exited with result " << result;
+ if (framework == NULL) {
+ return;
+ }
+
+ Executor* executor = framework->getExecutor(executorId);
+ if (executor == NULL) {
+ return;
+ }
+
+  // Ignore stale timeouts (the executor's uuid changed since scheduling).
+ if (executor->uuid == uuid) {
+ LOG(INFO) << "Killing executor '" << executor->id
+ << "' of framework " << framework->id;
+
+ dispatch(isolationModule,
+ &IsolationModule::killExecutor,
+ framework->id, executor->id);
+
+ ExitedExecutorMessage message;
+ message.mutable_slave_id()->MergeFrom(id);
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ message.mutable_executor_id()->MergeFrom(executorId);
+ message.set_status(-1);
+ send(master, message);
+
+ // TODO(benh): Send status updates for remaining tasks here rather
+ // than at the master! As in, eliminate the code in
+ // Master::exitedExecutor and put it here.
+
+ framework->destroyExecutor(executor->id);
+
+ // Cleanup if this framework has nothing running.
+ if (framework->executors.size() == 0) {
+ // TODO(benh): But there might be some remaining status updates
+ // that haven't been acknowledged!
+ frameworks.erase(framework->id);
+ delete framework;
}
- } else {
- LOG(WARNING) << "UNKNOWN executor '" << executorId
- << "' of UNKNOWN framework " << frameworkId
- << " has exited with result " << result;
}
-};
+}
+
+
+// void Slave::recover()
+// {
+//   // If we find an executor that is no longer running and its last
+//   // acknowledged task statuses are not terminal, create a
+//   // StatusUpdateStream for each task and try to reliably send
+//   // TASK_LOST updates.
+
+//   // Otherwise, once we reconnect, the executor will just start sending
+//   // us status updates that we need to forward, wait for an ack, write
+//   // to disk, and then respond.
+// }
string Slave::getUniqueWorkDirectory(const FrameworkID& frameworkId,
const ExecutorID& executorId)
{
+ LOG(INFO) << "Generating a unique work directory for executor '"
+ << executorId << "' of framework " << frameworkId;
+
string workDir = ".";
if (conf.contains("work_dir")) {
workDir = conf.get("work_dir", workDir);
@@ -986,30 +1715,32 @@ string Slave::getUniqueWorkDirectory(con
workDir = workDir + "/work";
- ostringstream os(std::ios_base::app | std::ios_base::out);
- os << workDir << "/slave-" << slaveId
+ std::ostringstream out(std::ios_base::app | std::ios_base::out);
+ out << workDir << "/slave-" << id
<< "/fw-" << frameworkId << "-" << executorId;
+  // TODO(benh): Put each executor id in its own directory.
+
// Find a unique directory based on the path given by the slave
// (this is because we might launch multiple executors from the same
// framework on this slave).
- os << "/";
+ out << "/";
string dir;
- dir = os.str();
+ dir = out.str();
for (int i = 0; i < INT_MAX; i++) {
- os << i;
- if (opendir(os.str().c_str()) == NULL && errno == ENOENT)
+ out << i;
+ if (opendir(out.str().c_str()) == NULL && errno == ENOENT)
break;
- os.str(dir);
+
+    // TODO(benh): Yes -- a successful opendir's DIR* is never closedir'd (leak).
+
+ out.str(dir);
}
- return os.str();
+ return out.str();
}
-const Configuration& Slave::getConfiguration()
-{
- return conf;
-}
+}}} // namespace mesos { namespace internal { namespace slave {