You are viewing a plain text version of this content; the canonical version is available at the original archive location.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/04/17 09:08:36 UTC
svn commit: r1468769 - in /incubator/mesos/trunk/src/slave:
cgroups_isolator.cpp cgroups_isolator.hpp http.cpp isolator.hpp
process_isolator.cpp process_isolator.hpp slave.cpp slave.hpp
Author: vinodkone
Date: Wed Apr 17 07:08:36 2013
New Revision: 1468769
URL: http://svn.apache.org/r1468769
Log:
Refactored slave::Framework/Executor: introduced explicit "states" and
added recovery methods.
Review: https://reviews.apache.org/r/10111
Modified:
incubator/mesos/trunk/src/slave/cgroups_isolator.cpp
incubator/mesos/trunk/src/slave/cgroups_isolator.hpp
incubator/mesos/trunk/src/slave/http.cpp
incubator/mesos/trunk/src/slave/isolator.hpp
incubator/mesos/trunk/src/slave/process_isolator.cpp
incubator/mesos/trunk/src/slave/process_isolator.hpp
incubator/mesos/trunk/src/slave/slave.cpp
incubator/mesos/trunk/src/slave/slave.hpp
Modified: incubator/mesos/trunk/src/slave/cgroups_isolator.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/cgroups_isolator.cpp?rev=1468769&r1=1468768&r2=1468769&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/cgroups_isolator.cpp (original)
+++ incubator/mesos/trunk/src/slave/cgroups_isolator.cpp Wed Apr 17 07:08:36 2013
@@ -507,15 +507,10 @@ void CgroupsIsolator::launchExecutor(
const ExecutorInfo& executorInfo,
const UUID& uuid,
const string& directory,
- const Resources& resources,
- const Option<string>& path)
+ const Resources& resources)
{
CHECK(initialized) << "Cannot launch executors before initialization";
- bool checkpoint = frameworkInfo.checkpoint();
- CHECK(!(checkpoint && path.isNone()))
- << "Asked to checkpoint forked pid without providing a path";
-
const ExecutorID& executorId = executorInfo.executor_id();
// Register the cgroup information.
@@ -593,9 +588,16 @@ void CgroupsIsolator::launchExecutor(
// have a chance to write the pid to disk. That would result in an
// orphaned executor process unknown to the slave when doing
// recovery.
- if (checkpoint) {
+ if (frameworkInfo.checkpoint()) {
+ const string& path = paths::getForkedPidPath(
+ paths::getMetaRootDir(flags.work_dir),
+ slaveId,
+ frameworkId,
+ executorId,
+ uuid);
+
std::cout << "Checkpointing forked pid " << getpid() << std::endl;
- state::checkpoint(path.get(), stringify(getpid()));
+ state::checkpoint(path, stringify(getpid()));
}
// Put self into the newly created cgroup.
Modified: incubator/mesos/trunk/src/slave/cgroups_isolator.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/cgroups_isolator.hpp?rev=1468769&r1=1468768&r2=1468769&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/cgroups_isolator.hpp (original)
+++ incubator/mesos/trunk/src/slave/cgroups_isolator.hpp Wed Apr 17 07:08:36 2013
@@ -105,8 +105,7 @@ public:
const ExecutorInfo& executorInfo,
const UUID& uuid,
const std::string& directory,
- const Resources& resources,
- const Option<std::string>& path);
+ const Resources& resources);
virtual void killExecutor(
const FrameworkID& frameworkId,
Modified: incubator/mesos/trunk/src/slave/http.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/http.cpp?rev=1468769&r1=1468768&r2=1468769&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/http.cpp (original)
+++ incubator/mesos/trunk/src/slave/http.cpp Wed Apr 17 07:08:36 2013
@@ -236,8 +236,7 @@ JSON::Object model(const Framework& fram
object.values["executors"] = executors;
JSON::Array completedExecutors;
- foreach (const std::tr1::shared_ptr<Executor>& executor,
- framework.completedExecutors) {
+ foreach (const Owned<Executor>& executor, framework.completedExecutors) {
completedExecutors.values.push_back(model(*executor));
}
object.values["completed_executors"] = completedExecutors;
@@ -334,8 +333,7 @@ Future<Response> state(
object.values["frameworks"] = frameworks;
JSON::Array completedFrameworks;
- foreach (const std::tr1::shared_ptr<Framework>& framework,
- slave.completedFrameworks) {
+ foreach (const Owned<Framework>& framework, slave.completedFrameworks) {
completedFrameworks.values.push_back(model(*framework));
}
object.values["completed_frameworks"] = completedFrameworks;
Modified: incubator/mesos/trunk/src/slave/isolator.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/isolator.hpp?rev=1468769&r1=1468768&r2=1468769&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/isolator.hpp (original)
+++ incubator/mesos/trunk/src/slave/isolator.hpp Wed Apr 17 07:08:36 2013
@@ -69,14 +69,13 @@ public:
// If 'checkpoint' is true, the isolator is expected to checkpoint
// the executor pid to the 'path'.
virtual void launchExecutor(
- const SlaveID& slaveId,
+ const SlaveID& slaveId, // TODO(vinod): Why not pass this to initialize?
const FrameworkID& frameworkId,
const FrameworkInfo& frameworkInfo,
const ExecutorInfo& executorInfo,
const UUID& uuid,
const std::string& directory,
- const Resources& resources,
- const Option<std::string>& path) = 0;
+ const Resources& resources) = 0;
// Terminate a framework's executor, if it is still running.
// The executor is expected to be gone after this method exits.
Modified: incubator/mesos/trunk/src/slave/process_isolator.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_isolator.cpp?rev=1468769&r1=1468768&r2=1468769&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_isolator.cpp (original)
+++ incubator/mesos/trunk/src/slave/process_isolator.cpp Wed Apr 17 07:08:36 2013
@@ -109,17 +109,12 @@ void ProcessIsolator::launchExecutor(
const FrameworkID& frameworkId,
const FrameworkInfo& frameworkInfo,
const ExecutorInfo& executorInfo,
- const UUID& _,
+ const UUID& uuid,
const string& directory,
- const Resources& resources,
- const Option<string>& path)
+ const Resources& resources)
{
CHECK(initialized) << "Cannot launch executors before initialization!";
- bool checkpoint = frameworkInfo.checkpoint();
- CHECK(!(checkpoint && path.isNone()))
- << "Asked to checkpoint forked pid without providing a path";
-
const ExecutorID& executorId = executorInfo.executor_id();
LOG(INFO) << "Launching " << executorId
@@ -206,9 +201,16 @@ void ProcessIsolator::launchExecutor(
// have a chance to write the pid to disk. That would result in an
// orphaned executor process unknown to the slave when doing
// recovery.
- if (checkpoint) {
+ if (frameworkInfo.checkpoint()) {
+ const string& path = paths::getForkedPidPath(
+ paths::getMetaRootDir(flags.work_dir),
+ slaveId,
+ frameworkId,
+ executorId,
+ uuid);
+
std::cout << "Checkpointing forked pid " << getpid() << std::endl;
- state::checkpoint(path.get(), stringify(getpid()));
+ state::checkpoint(path, stringify(getpid()));
}
ExecutorLauncher* launcher = createExecutorLauncher(
Modified: incubator/mesos/trunk/src/slave/process_isolator.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_isolator.hpp?rev=1468769&r1=1468768&r2=1468769&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_isolator.hpp (original)
+++ incubator/mesos/trunk/src/slave/process_isolator.hpp Wed Apr 17 07:08:36 2013
@@ -67,8 +67,7 @@ public:
const ExecutorInfo& executorInfo,
const UUID& uuid,
const std::string& directory,
- const Resources& resources,
- const Option<std::string>& path);
+ const Resources& resources);
virtual void killExecutor(
const FrameworkID& frameworkId,
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1468769&r1=1468768&r2=1468769&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Wed Apr 17 07:08:36 2013
@@ -364,11 +364,11 @@ void Slave::initialize()
// Start recovery.
recover(flags.recover == "reconnect", flags.safe)
- .onAny(defer(self(), &Slave::_recover, params::_1));
+ .onAny(defer(self(), &Slave::_initialize, params::_1));
}
-void Slave::_recover(const Future<Nothing>& future)
+void Slave::_initialize(const Future<Nothing>& future)
{
if (!future.isReady()) {
LOG(FATAL) << "Recovery failure: " << future.failure();
@@ -606,42 +606,35 @@ void Slave::runTask(
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
- framework = new Framework(frameworkId, frameworkInfo, pid, flags);
+ framework = new Framework(
+ info.id(), frameworkId, frameworkInfo, pid, flags);
frameworks[frameworkId] = framework;
+ }
- if (frameworkInfo.checkpoint()) {
- // Checkpoint the framework info.
- string path = paths::getFrameworkInfoPath(
- paths::getMetaRootDir(flags.work_dir),
- info.id(),
- frameworkId);
+ CHECK_NOTNULL(framework);
- CHECK_SOME(state::checkpoint(path, frameworkInfo));
+ if (framework->state == Framework::INITIALIZING) {
+ LOG(INFO) << "Enqueuing task " << task.task_id()
+ << " until framework " << frameworkId
+ << " is initialized";
+ framework->pending.push_back(task);
+ return;
+ }
- // Checkpoint the framework pid.
- path = paths::getFrameworkPidPath(
- paths::getMetaRootDir(flags.work_dir),
- info.id(),
- frameworkId);
+ if (framework->state == Framework::TERMINATING) {
+ LOG(WARNING) << "WARNING! Asked to run task '" << task.task_id()
+ << "' for framework " << frameworkId
+ << " which is terminating";
- CHECK_SOME(state::checkpoint(path, framework->pid));
- }
- } else {
- if (framework->shutdown) {
- LOG(WARNING) << "WARNING! Asked to run task '" << task.task_id()
- << "' for framework " << frameworkId
- << " which is being shut down";
-
- const StatusUpdate& update = protobuf::createStatusUpdate(
- frameworkId,
- info.id(),
- task.task_id(),
- TASK_LOST,
- "Framework shutting down");
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ frameworkId,
+ info.id(),
+ task.task_id(),
+ TASK_LOST,
+ "Framework terminating");
- statusUpdate(update);
- return;
- }
+ statusUpdate(update);
+ return;
}
const ExecutorInfo& executorInfo = framework->getExecutorInfo(task);
@@ -651,91 +644,9 @@ void Slave::runTask(
// and queue the task until the executor has started.
Executor* executor = framework->getExecutor(executorId);
- if (executor != NULL) {
- if (executor->shutdown) {
- LOG(WARNING) << "WARNING! Asked to run task '" << task.task_id()
- << "' for framework " << frameworkId
- << " with executor '" << executorId
- << "' which is being shut down";
-
- const StatusUpdate& update = protobuf::createStatusUpdate(
- frameworkId,
- info.id(),
- task.task_id(),
- TASK_LOST,
- "Executor shutting down");
-
- statusUpdate(update);
- } else if (!executor->pid) {
- // Queue task until the executor starts up.
- LOG(INFO) << "Queuing task '" << task.task_id()
- << "' for executor " << executorId
- << " of framework '" << frameworkId;
-
- if (frameworkInfo.checkpoint()) {
- // Checkpoint the task.
- // TODO(vinod): Consider moving these 3 statements into
- // a helper function, since its used 3 times in this function!
- const string& path = paths::getTaskInfoPath(
- paths::getMetaRootDir(flags.work_dir),
- info.id(),
- executor->frameworkId,
- executor->id,
- executor->uuid,
- task.task_id());
-
- const Task& t = protobuf::createTask(
- task, TASK_STAGING, executor->id, executor->frameworkId);
-
- CHECK_SOME(state::checkpoint(path, t));
- }
-
- executor->queuedTasks[task.task_id()] = task;
- } else {
- if (frameworkInfo.checkpoint()) {
- // Checkpoint the task.
- const string& path = paths::getTaskInfoPath(
- paths::getMetaRootDir(flags.work_dir),
- info.id(),
- executor->frameworkId,
- executor->id,
- executor->uuid,
- task.task_id());
-
- const Task& t = protobuf::createTask(
- task, TASK_STAGING, executor->id, executor->frameworkId);
-
- CHECK_SOME(state::checkpoint(path, t));
- }
-
- // Add the task and send it to the executor.
- executor->addTask(task);
-
- stats.tasks[TASK_STAGING]++;
-
- // Update the resources.
- // TODO(Charles Reiss): The isolator is not guaranteed to update
- // the resources before the executor acts on its RunTaskMessage.
- dispatch(isolator,
- &Isolator::resourcesChanged,
- framework->id,
- executor->id,
- executor->resources);
-
- LOG(INFO) << "Sending task '" << task.task_id()
- << "' to executor '" << executorId
- << "' of framework " << framework->id;
-
- RunTaskMessage message;
- message.mutable_framework()->MergeFrom(framework->info);
- message.mutable_framework_id()->MergeFrom(framework->id);
- message.set_pid(framework->pid);
- message.mutable_task()->MergeFrom(task);
- send(executor->pid, message);
- }
- } else {
+ if (executor == NULL) {
// Launch an executor for this task.
- executor = framework->createExecutor(info.id(), executorInfo);
+ executor = framework->createExecutor(executorInfo);
files->attach(executor->directory, executor->directory)
.onAny(defer(self(),
@@ -743,52 +654,16 @@ void Slave::runTask(
params::_1,
executor->directory));
- // Check to see if we need to checkpoint this executor and the task.
- Option<string> pidPath;
- if (frameworkInfo.checkpoint()) {
- // Checkpoint the executor info.
- string path = paths::getExecutorInfoPath(
- paths::getMetaRootDir(flags.work_dir),
- info.id(),
- executor->frameworkId,
- executor->id);
-
- CHECK_SOME(state::checkpoint(path, executor->info));
-
- // Create the meta executor directory.
- // NOTE: This creates the 'latest' symlink in the meta directory.
- paths::createExecutorDirectory(
- paths::getMetaRootDir(flags.work_dir),
- info.id(),
- framework->id,
- executor->id,
- executor->uuid);
-
- // Checkpoint the task.
- path = paths::getTaskInfoPath(
- paths::getMetaRootDir(flags.work_dir),
- info.id(),
- executor->frameworkId,
- executor->id,
- executor->uuid,
- task.task_id());
-
- const Task& t = protobuf::createTask(
- task, TASK_STAGING, executor->id, executor->frameworkId);
-
- CHECK_SOME(state::checkpoint(path, t));
-
- // Get the path for isolator to checkpoint the forked pid.
- pidPath = paths::getForkedPidPath(
- paths::getMetaRootDir(flags.work_dir),
- info.id(),
- framework->id,
- executor->id,
- executor->uuid);
- }
-
- // Queue task until the executor starts up.
- executor->queuedTasks[task.task_id()] = task;
+ // Tell the isolator to launch the executor.
+ dispatch(isolator,
+ &Isolator::launchExecutor,
+ info.id(),
+ framework->id,
+ framework->info,
+ executor->info,
+ executor->uuid,
+ executor->directory,
+ executor->resources);
// Make sure the executor registers within the given timeout.
// NOTE: We send this message before dispatching the launchExecutor to
@@ -799,21 +674,66 @@ void Slave::runTask(
framework->id,
executor->id,
executor->uuid);
+ }
- // Tell the isolator to launch the executor. (TODO(benh): Make the
- // isolator a process so that it can block while trying to launch
- // the executor.)
- dispatch(isolator,
- &Isolator::launchExecutor,
- info.id(),
- framework->id,
- framework->info,
- executor->info,
- executor->uuid,
- executor->directory,
- executor->resources,
- pidPath);
+ CHECK_NOTNULL(executor);
+
+ if (executor->state == Executor::TERMINATING ||
+ executor->state == Executor::TERMINATED) {
+ LOG(WARNING) << "WARNING! Asked to run task '" << task.task_id()
+ << "' for framework " << frameworkId
+ << " with executor '" << executorId
+ << "' which is terminating/terminated";
+
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ frameworkId,
+ info.id(),
+ task.task_id(),
+ TASK_LOST,
+ "Executor terminating/terminated");
+
+ statusUpdate(update);
+ return;
}
+
+ // Checkpoint the task before we do anything else (this is a no-op
+ // if the framework doesn't have checkpointing enabled).
+ executor->checkpointTask(task);
+
+ stats.tasks[TASK_STAGING]++;
+
+ // Queue task if the executor is not yet registered.
+ if (executor->state == Executor::REGISTERING) {
+ LOG(INFO) << "Queuing task '" << task.task_id()
+ << "' for executor " << executorId
+ << " of framework '" << frameworkId;
+
+ executor->queuedTasks[task.task_id()] = task;
+ return;
+ }
+
+ // Add the task and send it to the executor.
+ executor->addTask(task);
+
+ // Update the resources.
+ // TODO(Charles Reiss): The isolator is not guaranteed to update
+ // the resources before the executor acts on its RunTaskMessage.
+ dispatch(isolator,
+ &Isolator::resourcesChanged,
+ framework->id,
+ executor->id,
+ executor->resources);
+
+ LOG(INFO) << "Sending task '" << task.task_id()
+ << "' to executor '" << executorId
+ << "' of framework " << frameworkId;
+
+ RunTaskMessage message;
+ message.mutable_framework()->MergeFrom(framework->info);
+ message.mutable_framework_id()->MergeFrom(framework->id);
+ message.set_pid(framework->pid);
+ message.mutable_task()->MergeFrom(task);
+ send(executor->pid, message);
}
@@ -835,6 +755,8 @@ void Slave::killTask(const FrameworkID&
return;
}
+ // TODO(benh/vinod): Check framework state.
+
// Tell the executor to kill the task if it is up and
// running, otherwise, consider the task lost.
Executor* executor = framework->getExecutor(taskId);
@@ -847,7 +769,7 @@ void Slave::killTask(const FrameworkID&
frameworkId, info.id(), taskId, TASK_LOST, "Cannot find executor");
statusUpdate(update);
- } else if (!executor->pid) {
+ } else if (executor->state == Executor::REGISTERING) {
// We are here, if the executor hasn't registered with the slave yet.
const StatusUpdate& update = protobuf::createStatusUpdate(
frameworkId,
@@ -893,11 +815,12 @@ void Slave::shutdownFramework(const Fram
if (framework != NULL) {
LOG(INFO) << "Shutting down framework " << framework->id;
- framework->shutdown = true;
+ framework->state = Framework::TERMINATING;
// Shut down all executors of this framework.
- // Note that the framework and its corresponding executors are removed from
- // the frameworks map by shutdownExecutorTimeout() or executorTerminated().
+ // Note that the framework and its corresponding executors are
+ // removed from the frameworks map by shutdownExecutorTimeout() or
+ // executorTerminated().
foreachvalue (Executor* executor, framework->executors) {
shutdownExecutor(framework, executor);
}
@@ -922,21 +845,26 @@ void Slave::schedulerMessage(
return;
}
+ // TODO(benh/vinod): Check framework state.
+
Executor* executor = framework->getExecutor(executorId);
if (executor == NULL) {
LOG(WARNING) << "Dropping message for executor '"
<< executorId << "' of framework " << frameworkId
<< " because executor does not exist";
stats.invalidFrameworkMessages++;
- } else if (!executor->pid) {
- // TODO(*): If executor is not started, queue framework message?
- // (It's probably okay to just drop it since frameworks can have
- // the executor send a message to the master to say when it's ready.)
+ } else if (executor->state == Executor::REGISTERING) {
+ // TODO(*): If executor is not yet registered, queue framework
+ // message? It's probably okay to just drop it since frameworks
+ // can have the executor send a message to the master to say when
+ // it's ready.
LOG(WARNING) << "Dropping message for executor '"
<< executorId << "' of framework " << frameworkId
<< " because executor is not running";
stats.invalidFrameworkMessages++;
} else {
+ // TODO(benh/vinod): Check executor state.
+
FrameworkToExecutorMessage message;
message.mutable_slave_id()->MergeFrom(slaveId);
message.mutable_framework_id()->MergeFrom(frameworkId);
@@ -953,8 +881,10 @@ void Slave::updateFramework(const Framew
{
Framework* framework = getFramework(frameworkId);
if (framework != NULL) {
+ // TODO(benh/vinod): Check framework state.
+
LOG(INFO) << "Updating framework " << frameworkId
- << " pid to " <<pid;
+ << " pid to " << pid;
framework->pid = pid;
if (framework->info.checkpoint()) {
@@ -1024,6 +954,8 @@ void Slave::_statusUpdateAcknowledgement
return;
}
+ // TODO(benh/vinod): Check framework state.
+
// Find the executor that has this update.
Executor* executor = framework->getExecutor(taskId);
if (executor == NULL) {
@@ -1032,6 +964,8 @@ void Slave::_statusUpdateAcknowledgement
return;
}
+ // TODO(benh/vinod): Check executor state.
+
executor->updates.remove(taskId, uuid);
// Cleanup the executor and framework, if possible.
@@ -1056,6 +990,8 @@ void Slave::registerExecutor(
return;
}
+ // TODO(benh/vinod): Check framework state.
+
Executor* executor = framework->getExecutor(executorId);
// Check the status of the executor.
@@ -1063,23 +999,20 @@ void Slave::registerExecutor(
LOG(WARNING) << "WARNING! Unexpected executor '" << executorId
<< "' registering for framework " << frameworkId;
reply(ShutdownExecutorMessage());
- } else if (executor->pid) {
- LOG(WARNING) << "WARNING! executor '" << executorId
+ } else if (executor->state != Executor::REGISTERING) {
+ LOG(WARNING) << "WARNING! Executor '" << executorId
<< "' of framework " << frameworkId
- << " is already running";
- reply(ShutdownExecutorMessage());
- } else if (executor->shutdown) {
- LOG(WARNING) << "WARNING! executor '" << executorId
- << "' of framework " << frameworkId
- << " should be shutting down";
+ << " is not expected to be registering";
reply(ShutdownExecutorMessage());
} else {
// Save the pid for the executor.
executor->pid = from;
+ executor->state = Executor::RUNNING;
+
if (framework->info.checkpoint()) {
- // TODO(vinod): This checkpointing should be done asynchronously as it is
- // in the fast path of the slave!
+ // TODO(vinod): This checkpointing should be done asynchronously
+ // as it is in the fast path of the slave!
// Checkpoint the libprocess pid.
string path = paths::getLibprocessPidPath(
@@ -1153,6 +1086,8 @@ void Slave::reregisterExecutor(
executor->pid = from; // Update the pid, to signal re-registration.
+ executor->state = Executor::RUNNING;
+
// Send re-registration message to the executor.
ExecutorReregisteredMessage message;
message.mutable_slave_id()->MergeFrom(info.id());
@@ -1206,6 +1141,7 @@ void Slave::reregisterExecutorTimeout()
// exited! This is because, if the executor properly exited, it
// should have already been identified by the isolator (via
// reaper) and cleaned up!
+ // TODO(benh/vinod): Check executor state.
if (!executor->pid) {
LOG(INFO) << "Shutting down un-reregistered executor "
<< executor->id << " of framework " << framework->id;
@@ -1231,13 +1167,13 @@ void Slave::statusUpdate(const StatusUpd
LOG(INFO) << "Handling status update " << update;
- Framework* framework = getFramework(update.framework_id());
- Executor* executor = NULL;
+ Executor* executor = NULL; // Need to pass to forwardUpdate.
+ Framework* framework = getFramework(update.framework_id());
if (framework != NULL) {
executor = framework->getExecutor(status.task_id());
-
if (executor != NULL) {
+ // TODO(benh/vinod): Check executor state.
executor->updateTaskState(status.task_id(), status.state());
executor->updates.put(status.task_id(), UUID::fromBytes(update.uuid()));
@@ -1313,8 +1249,13 @@ void Slave::forwardUpdate(const StatusUp
stats.tasks[update.status().state()]++;
stats.validStatusUpdates++;
+ // TODO(benh/vinod): Have the StatusUpdateManager just take
+ // checkpoint and determine the path itself. It can log a warning if
+ // it can't generate the path because there is no executor ID. How
+ // else can we persist status updates for tasks that don't have an
+ // executor ID?
statusUpdateManager->update(update, checkpoint, path)
- .onAny(defer(self(), &Slave::_forwardUpdate, params::_1, update, pid));;
+ .onAny(defer(self(), &Slave::_forwardUpdate, params::_1, update, pid));
}
@@ -1367,6 +1308,8 @@ void Slave::executorMessage(
return;
}
+ // TODO(benh/vinod): Check framework state.
+
LOG(INFO) << "Sending message for framework " << frameworkId
<< " to " << framework->pid;
@@ -1430,6 +1373,8 @@ void Slave::executorStarted(
return;
}
+ // TODO(benh/vinod): Check framework state.
+
Executor* executor = framework->getExecutor(executorId);
if (executor == NULL) {
LOG(WARNING) << "Invalid executor '" << executorId
@@ -1438,6 +1383,8 @@ void Slave::executorStarted(
return;
}
+ // TODO(benh/vinod): Check executor state.
+
monitor.watch(
frameworkId,
executorId,
@@ -1497,6 +1444,8 @@ void Slave::executorTerminated(
return;
}
+ // TODO(benh/vinod): Check framework state.
+
Executor* executor = framework->getExecutor(executorId);
if (executor == NULL) {
LOG(WARNING) << "Invalid executor '" << executorId
@@ -1505,7 +1454,9 @@ void Slave::executorTerminated(
return;
}
- executor->terminated = true;
+ // TODO(benh/vinod): Check executor state.
+
+ executor->state = Executor::TERMINATED;
bool isCommandExecutor = false;
@@ -1513,12 +1464,12 @@ void Slave::executorTerminated(
// If the isolator destroyed the executor (e.g., due to OOM event)
// or if this is a command executor, we send TASK_FAILED status updates
// instead of TASK_LOST.
- // NOTE: We don't send updates if the framework is shutting down,
+ // NOTE: We don't send updates if the framework is terminating
// because we don't want the status update manager to keep retrying
- // these updates for the lack for ACKs from the scheduler. Also,
+ // these updates since it won't receive ACKs from the scheduler. Also,
// the status update manager should have already cleaned up all the
- // status update streams for a framework that is shutting down.
- if (!framework->shutdown) {
+ // status update streams for a framework that is terminating.
+ if (framework->state != Framework::TERMINATING) {
StatusUpdate update;
// Transition all live launched tasks.
@@ -1592,11 +1543,12 @@ void Slave::cleanup(Framework* framework
CHECK_NOTNULL(executor);
// Cleanup this executor if it has terminated and either has no
- // pending updates or the framework is shutting down. We don't
- // care for pending updates when framework is shutting down
+ // pending updates or the framework is terminating. We don't
+ // care for pending updates when a framework is terminating
// because the framework cannot ACK them.
- if (executor->terminated &&
- (executor->updates.empty() || framework->shutdown)) {
+ if (executor->state == Executor::TERMINATED &&
+ (executor->updates.empty() ||
+ framework->state == Framework::TERMINATING)) {
// Schedule the executor directory to get garbage collected.
gc.schedule(flags.gc_delay, executor->directory)
@@ -1634,8 +1586,8 @@ void Slave::cleanup()
string metaDir = paths::getMetaRootDir(flags.work_dir);
// Archive and delete the meta directory, to allow incompatible upgrades.
- LOG(INFO) << "Archiving and deleting the meta directory '" << metaDir
- << "' to allow incompatible upgrade!";
+ LOG(INFO) << "Archiving and deleting the meta directory '"
+ << metaDir << "' to allow incompatible upgrade!";
// Create the archive directory, if it doesn't exist.
Try<Nothing> result = os::mkdir(archiveDir);
@@ -1644,12 +1596,12 @@ void Slave::cleanup()
metaDir, path::join(archiveDir, info.id().value() + ".tar.gz"));
if (result.isError()) {
- LOG(ERROR) << "Failed to archive meta directory '" << archiveDir
- << "': " << result.error();
+ LOG(ERROR) << "Failed to archive meta directory '"
+ << archiveDir << "': " << result.error();
}
} else {
- LOG(ERROR) << "Failed to create archive directory '" << archiveDir
- << ": " << result.error();
+ LOG(ERROR) << "Failed to create archive directory '"
+ << archiveDir << ": " << result.error();
}
result = os::rmdir(metaDir);
@@ -1681,7 +1633,7 @@ void Slave::shutdownExecutor(Framework*
LOG(INFO) << "Shutting down executor '" << executor->id
<< "' of framework " << framework->id;
- executor->shutdown = true;
+ executor->state = Executor::TERMINATING;
// If the executor hasn't yet registered, this message
// will be dropped to the floor!
@@ -1710,6 +1662,8 @@ void Slave::shutdownExecutorTimeout(
return;
}
+ // TODO(benh/vinod): Check framework state.
+
Executor* executor = framework->getExecutor(executorId);
if (executor == NULL) {
LOG(INFO) << "Executor '" << executorId
@@ -1718,6 +1672,8 @@ void Slave::shutdownExecutorTimeout(
return;
}
+ // TODO(benh/vinod): Check executor state.
+
// Make sure this timeout is valid.
if (executor->uuid != uuid ) {
LOG(INFO) << "A new executor '" << executorId
@@ -1748,6 +1704,8 @@ void Slave::registerExecutorTimeout(
return;
}
+ // TODO(benh/vinod): Check framework state.
+
Executor* executor = framework->getExecutor(executorId);
if (executor == NULL) {
LOG(INFO) << "Executor '" << executorId
@@ -1756,24 +1714,26 @@ void Slave::registerExecutorTimeout(
return;
}
+ // TODO(benh/vinod): Check executor state.
+
// Make sure this timeout is valid.
if (executor->uuid != uuid ) {
- LOG(INFO) << "A new executor '" << executorId
- << "' of framework " << frameworkId
- << " with run " << executor->uuid
- << " seems to be active. Ignoring the shutdown timeout"
- << " for the old executor run " << uuid;
- return;
+ LOG(INFO) << "A new executor '" << executorId
+ << "' of framework " << frameworkId
+ << " with run " << executor->uuid
+ << " seems to be active. Ignoring the shutdown timeout"
+ << " for the old executor run " << uuid;
+ return;
}
- // Shutdown the executor if it has not registered yet.
+ // Terminate the executor if it has not registered yet.
if (!executor->pid) {
- LOG(INFO) << "Shutting down executor " << executor->id
+ LOG(INFO) << "Terminating executor " << executor->id
<< " of framework " << framework->id
<< " because it did not register within "
<< flags.executor_registration_timeout;
- executor->shutdown = true;
+ executor->state = Executor::TERMINATING;
// Immediately kill the executor.
dispatch(isolator, &Isolator::killExecutor, framework->id, executor->id);
@@ -1875,149 +1835,94 @@ Future<Nothing> Slave::recover(bool reco
return statusUpdateManager->recover(metaDir, state.get())
.then(defer(isolator, &Isolator::recover, state.get()))
.then(defer(self(),
- &Self::recoverExecutors,
+ &Self::_recover,
state.get(),
reconnect));
}
-Future<Nothing> Slave::recoverExecutors(
- const SlaveState& state,
- bool reconnect)
-{
- LOG(INFO) << "Recovering executors";
-
- foreachvalue (const FrameworkState& framework_, state.frameworks) {
- foreachvalue (const ExecutorState& executor_, framework_.executors) {
- LOG(INFO) << "Recovering executor '" << executor_.id
- << "' of framework " << framework_.id;
-
- if (executor_.info.isNone()) {
- LOG(WARNING) << "Skipping recovery of executor '" << executor_.id
- << "' of framework " << framework_.id
- << " because its info cannot be recovered";
- continue;
- }
+Future<Nothing> Slave::_recover(const SlaveState& state, bool reconnect)
+{
+ foreachvalue (const FrameworkState& frameworkState, state.frameworks) {
+ recover(frameworkState, reconnect);
+ }
- if (executor_.latest.isNone()) {
- LOG(WARNING) << "Skipping recovery of executor '" << executor_.id
- << "' of framework " << framework_.id
- << " because its latest run cannot be recovered";
- continue;
- }
+ if (reconnect) {
+    // Clean up unregistered executors after a delay.
+ delay(EXECUTOR_REREGISTER_TIMEOUT,
+ self(),
+ &Slave::reregisterExecutorTimeout);
- // We are only interested in the latest run of the executor!
- const UUID& uuid = executor_.latest.get();
- CHECK(executor_.runs.contains(uuid));
- const RunState& run = executor_.runs.get(uuid).get();
-
- // Create framework, if necessary.
- Framework* framework = getFramework(framework_.id);
- if (framework == NULL) {
- CHECK_SOME(framework_.info);
- CHECK_SOME(framework_.pid);
+      // We set the 'recovered' flag inside reregisterExecutorTimeout(),
+      // so that when the slave re-registers with the master it can
+      // correctly inform the master about the launched tasks.
+ return recovered.future();
+ }
- framework = new Framework(
- framework_.id, framework_.info.get(), framework_.pid.get(), flags);
+ return Nothing();
+}
- frameworks[framework_.id] = framework;
- }
- // Create executor.
- const string& directory = paths::getExecutorRunPath(
- flags.work_dir, info.id(), framework_.id, executor_.id, uuid);
-
- Executor* executor =
- new Executor(framework_.id, executor_.info.get(), uuid, directory);
-
- // Recover the tasks.
- foreachvalue (const TaskState& task, run.tasks) {
- if (task.info.isNone()) {
- LOG(WARNING) << "Skipping recovery of task " << task.id
- << " because its info cannot be recovered";
- continue;
- }
+void Slave::recover(const FrameworkState& state, bool reconnect)
+{
+ CHECK_SOME(state.info);
+ CHECK_SOME(state.pid);
- executor->launchedTasks[task.id] = new Task(task.info.get());
+ CHECK(!frameworks.contains(state.id)); // TODO(vinod): Is this correct?
+ Framework* framework = new Framework(
+ info.id(), state.id, state.info.get(), state.pid.get(), flags);
- // NOTE: Since some tasks might have been terminated when the slave
- // was down, the executor resources we capture here is an upper-bound.
- // The actual resources needed (for live tasks) by the isolator
- // will be calculated when the executor re-registers.
- executor->resources += task.info.get().resources();
-
- // Read updates to get the latest state of the task.
- foreach (const StatusUpdate& update, task.updates) {
- executor->updateTaskState(task.id, update.status().state());
- executor->updates.put(task.id, UUID::fromBytes(update.uuid()));
-
- // Remove the task if it received a terminal update.
- if (protobuf::isTerminalState(update.status().state())) {
- executor->removeTask(task.id);
-
- // If the terminal update has been acknowledged, remove
- // it from pending tasks.
- if (task.acks.contains(UUID::fromBytes(update.uuid()))) {
- executor->updates.remove(task.id, UUID::fromBytes(update.uuid()));
- }
- break;
- }
- }
- }
+ frameworks[framework->id] = framework;
- // Add the executor to the framework.
- framework->executors[executor_.id] = executor;
+ // Now recover the executors for this framework.
+ foreachvalue (const ExecutorState& executorState, state.executors) {
+ Executor* executor = framework->recoverExecutor(executorState);
- files->attach(executor->directory, executor->directory)
- .onAny(defer(self(),
- &Self::fileAttached,
- params::_1,
- executor->directory));
-
- // Reconnect with executor, if possible.
- if (reconnect && run.libprocessPid.isSome()) {
- CHECK_SOME(run.forkedPid);
-
- LOG(INFO) << "Sending reconnect request to executor " << executor_.id
- << " of framework " << framework_.id
- << " at " << run.libprocessPid.get();
+ // Continue to next executor if this one couldn't be recovered.
+ if (executor == NULL) {
+ continue;
+ }
+
+ // Expose the executor's files.
+ files->attach(executor->directory, executor->directory)
+ .onAny(defer(self(),
+ &Self::fileAttached,
+ params::_1,
+ executor->directory));
+
+ // And monitor the executor.
+ monitor.watch(
+ framework->id,
+ executor->id,
+ executor->info,
+ flags.resource_monitoring_interval)
+ .onAny(lambda::bind(_watch, lambda::_1, framework->id, executor->id));
+
+ if (reconnect) {
+ if (executor->pid) {
+ LOG(INFO) << "Sending reconnect request to executor " << executor->id
+ << " of framework " << framework->id
+ << " at " << executor->pid;
ReconnectExecutorMessage message;
message.mutable_slave_id()->MergeFrom(info.id());
- send(run.libprocessPid.get(), message);
- } else if (run.libprocessPid.isSome()) {
+ send(executor->pid, message);
+ } else {
+ // TODO(vinod): What's supposed to happen here?
+ }
+ } else {
+ if (executor->pid) {
// Cleanup executors.
- LOG(INFO) << "Sending shutdown to executor " << executor_.id
- << " of framework " << framework_.id
- << " at " << run.libprocessPid.get();
+ LOG(INFO) << "Sending shutdown to executor " << executor->id
+ << " of framework " << framework->id
+ << " to " << executor->pid;
- executor->pid = run.libprocessPid.get();
shutdownExecutor(framework, executor);
+ } else {
+ // TODO(vinod): What's supposed to happen here?
}
-
- // Beging monitoring the executor.
- monitor.watch(
- framework_.id,
- executor_.id,
- executor_.info.get(),
- flags.resource_monitoring_interval)
- .onAny(lambda::bind(_watch, lambda::_1, framework_.id, executor_.id));
}
}
-
- if (reconnect) {
- // Cleanup unregistered executors after a delay.
- delay(EXECUTOR_REREGISTER_TIMEOUT,
- self(),
- &Slave::reregisterExecutorTimeout);
-
- // We set 'recovered' flag inside reregisterExecutorTimeout(),
- // so that when the slave re-registers with master it can
- // correctly inform the master about the launched tasks.
- return recovered.future();
- }
-
- return Nothing();
}
} // namespace slave {
Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1468769&r1=1468768&r2=1468769&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Wed Apr 17 07:08:36 2013
@@ -181,6 +181,9 @@ protected:
virtual void finalize();
virtual void exited(const UPID& pid);
+ // This is called when recovery finishes.
+ void _initialize(const Future<Nothing>& future);
+
void fileAttached(const Future<Nothing>& result, const std::string& path);
void detachFile(const Future<Nothing>& result, const std::string& path);
@@ -223,14 +226,10 @@ protected:
// live executors. Otherwise, the slave attempts to shutdown/kill them.
// If 'safe' is true, any recovery errors are considered fatal.
Future<Nothing> recover(bool reconnect, bool safe);
+ Future<Nothing> _recover(const state::SlaveState& state, bool reconnect);
- // This is called when recovery finishes.
- void _recover(const Future<Nothing>& future);
-
- // Recovers executors by reconnecting/killing as necessary.
- Future<Nothing> recoverExecutors(
- const state::SlaveState& state,
- bool reconnect);
+ // Helper to recover a framework from the specified state.
+ void recover(const state::FrameworkState& state, bool reconnect);
// Called when an executor terminates or a status update
// acknowledgement is handled by the status update manager.
@@ -310,20 +309,46 @@ private:
// Information describing an executor.
struct Executor
{
- Executor(const FrameworkID& _frameworkId,
+ Executor(const SlaveID& _slaveId,
+ const FrameworkID& _frameworkId,
const ExecutorInfo& _info,
const UUID& _uuid,
- const std::string& _directory)
- : id(_info.executor_id()),
+ const std::string& _directory,
+ const Flags& _flags,
+ bool _checkpoint)
+ : state(REGISTERING), // TODO(benh): Skipping INITIALIZING for now.
+ slaveId(_slaveId),
+ id(_info.executor_id()),
info(_info),
frameworkId(_frameworkId),
- directory(_directory),
uuid(_uuid),
+ directory(_directory),
+ flags(_flags),
+ checkpoint(_checkpoint),
pid(UPID()),
- shutdown(false),
- terminated(false),
resources(_info.resources()),
- completedTasks(MAX_COMPLETED_TASKS_PER_EXECUTOR) {}
+ completedTasks(MAX_COMPLETED_TASKS_PER_EXECUTOR)
+ {
+ if (checkpoint) {
+ // Checkpoint the executor info.
+ const std::string& path = paths::getExecutorInfoPath(
+ paths::getMetaRootDir(flags.work_dir),
+ slaveId,
+ frameworkId,
+ id);
+
+ CHECK_SOME(state::checkpoint(path, info));
+
+ // Create the meta executor directory.
+ // NOTE: This creates the 'latest' symlink in the meta directory.
+ paths::createExecutorDirectory(
+ paths::getMetaRootDir(flags.work_dir),
+ slaveId,
+ frameworkId,
+ id,
+ uuid);
+ }
+ }
~Executor()
{
@@ -339,8 +364,8 @@ struct Executor
// maybe we shouldn't make this a fatal error.
CHECK(!launchedTasks.contains(task.task_id()));
- Task* t =
- new Task(protobuf::createTask(task, TASK_STAGING, id, frameworkId));
+ Task* t = new Task(
+ protobuf::createTask(task, TASK_STAGING, id, frameworkId));
launchedTasks[task.task_id()] = t;
resources += task.resources();
@@ -366,6 +391,59 @@ struct Executor
}
}
+ void checkpointTask(const TaskInfo& task)
+ {
+ if (checkpoint) {
+ const std::string& path = paths::getTaskInfoPath(
+ paths::getMetaRootDir(flags.work_dir),
+ slaveId,
+ frameworkId,
+ id,
+ uuid,
+ task.task_id());
+
+ const Task& t = protobuf::createTask(
+ task, TASK_STAGING, id, frameworkId);
+
+ CHECK_SOME(state::checkpoint(path, t));
+ }
+ }
+
+ void recoverTask(const state::TaskState& state)
+ {
+ if (state.info.isNone()) {
+ LOG(WARNING) << "Skipping recovery of task " << state.id
+ << " because its info cannot be recovered";
+ return;
+ }
+
+ launchedTasks[state.id] = new Task(state.info.get());
+
+    // NOTE: Since some tasks might have been terminated when the
+    // slave was down, the executor resources we capture here are an
+    // upper bound. The actual resources needed (for live tasks) by
+    // the isolator will be calculated when the executor re-registers.
+ resources += state.info.get().resources();
+
+ // Read updates to get the latest state of the task.
+ foreach (const StatusUpdate& update, state.updates) {
+ updateTaskState(state.id, update.status().state());
+ updates.put(state.id, UUID::fromBytes(update.uuid()));
+
+ // Remove the task if it received a terminal update.
+ if (protobuf::isTerminalState(update.status().state())) {
+ removeTask(state.id);
+
+ // If the terminal update has been acknowledged, remove it
+ // from pending tasks.
+ if (state.acks.contains(UUID::fromBytes(update.uuid()))) {
+ updates.remove(state.id, UUID::fromBytes(update.uuid()));
+ }
+ break;
+ }
+ }
+ }
+
void updateTaskState(const TaskID& taskId, TaskState state)
{
if (launchedTasks.contains(taskId)) {
@@ -373,23 +451,30 @@ struct Executor
}
}
+ enum {
+ INITIALIZING,
+ REGISTERING,
+ RUNNING,
+ TERMINATING,
+ TERMINATED,
+ } state;
+
+ const SlaveID slaveId;
+
const ExecutorID id;
const ExecutorInfo info;
const FrameworkID frameworkId;
- const std::string directory;
-
const UUID uuid; // Distinguishes executor instances with same ExecutorID.
- UPID pid;
+ const std::string directory;
- bool shutdown; // Indicates if executor is being shut down.
+ const Flags flags;
- // Indicates if the executor has terminated. We need this
- // because a terminated executor might still have pending
- // status updates that are not yet acknowledged.
- bool terminated;
+ const bool checkpoint;
+
+ UPID pid;
Resources resources; // Currently consumed resources.
@@ -409,16 +494,37 @@ private:
// Information about a framework.
struct Framework
{
- Framework(const FrameworkID& _id,
+ Framework(const SlaveID& _slaveId,
+ const FrameworkID& _id,
const FrameworkInfo& _info,
const UPID& _pid,
const Flags& _flags)
- : id(_id),
+ : state(RUNNING), // TODO(benh): Skipping INITIALIZING for now.
+ slaveId(_slaveId),
+ id(_id),
info(_info),
pid(_pid),
flags(_flags),
- shutdown(false),
- completedExecutors(MAX_COMPLETED_EXECUTORS_PER_FRAMEWORK) {}
+ completedExecutors(MAX_COMPLETED_EXECUTORS_PER_FRAMEWORK)
+ {
+ if (info.checkpoint()) {
+ // Checkpoint the framework info.
+ std::string path = paths::getFrameworkInfoPath(
+ paths::getMetaRootDir(flags.work_dir),
+ slaveId,
+ id);
+
+ CHECK_SOME(state::checkpoint(path, info));
+
+ // Checkpoint the framework pid.
+ path = paths::getFrameworkPidPath(
+ paths::getMetaRootDir(flags.work_dir),
+ slaveId,
+ id);
+
+ CHECK_SOME(state::checkpoint(path, pid));
+ }
+ }
~Framework()
{
@@ -481,21 +587,28 @@ struct Framework
return task.executor();
}
- Executor* createExecutor(const SlaveID& slaveId,
- const ExecutorInfo& executorInfo)
+ Executor* createExecutor(const ExecutorInfo& executorInfo)
{
- // We create a UUID for the new executor. The UUID uniquely identifies this
- // new instance of the executor across executors sharing the same executorID
- // that may have previously run. It also provides a means for the executor
- // to have a unique directory.
- UUID executorUUID = UUID::random();
+ // We create a UUID for the new executor. The UUID uniquely
+ // identifies this new instance of the executor across executors
+ // sharing the same executorID that may have previously run. It
+ // also provides a means for the executor to have a unique
+ // directory.
+ UUID uuid = UUID::random();
// Create a directory for the executor.
const std::string& directory = paths::createExecutorDirectory(
- flags.work_dir, slaveId, id, executorInfo.executor_id(), executorUUID);
+ flags.work_dir, slaveId, id, executorInfo.executor_id(), uuid);
+
+ Executor* executor = new Executor(
+ slaveId,
+ id,
+ executorInfo,
+ uuid,
+ directory,
+ flags,
+ info.checkpoint());
- Executor* executor =
- new Executor(id, executorInfo, executorUUID, directory);
CHECK(!executors.contains(executorInfo.executor_id()));
executors[executorInfo.executor_id()] = executor;
return executor;
@@ -533,6 +646,70 @@ struct Framework
return NULL;
}
+ Executor* recoverExecutor(const state::ExecutorState& state)
+ {
+ LOG(INFO) << "Recovering executor '" << state.id
+ << "' of framework " << id;
+
+ if (state.info.isNone()) {
+ LOG(WARNING) << "Skipping recovery of executor '" << state.id
+ << "' of framework " << id
+ << " because its info cannot be recovered";
+ return NULL;
+ }
+
+ if (state.latest.isNone()) {
+ LOG(WARNING) << "Skipping recovery of executor '" << state.id
+ << "' of framework " << id
+ << " because its latest run cannot be recovered";
+ return NULL;
+ }
+
+ // We are only interested in the latest run of the executor!
+ const UUID& uuid = state.latest.get();
+
+ // Create executor.
+ const std::string& directory = paths::getExecutorRunPath(
+ flags.work_dir, slaveId, id, state.id, uuid);
+
+ Executor* executor = new Executor(
+ slaveId,
+ id,
+ state.info.get(),
+ uuid,
+ directory,
+ flags,
+ info.checkpoint());
+
+ CHECK(state.runs.contains(uuid));
+ const state::RunState& run = state.runs.get(uuid).get();
+
+ // Recover the libprocess PID if possible.
+ if (run.libprocessPid.isSome()) {
+ CHECK_SOME(run.forkedPid); // TODO(vinod): Why this check?
+ executor->pid = run.libprocessPid.get();
+ }
+
+ // And finally recover all the executor's tasks.
+ foreachvalue (const state::TaskState& taskState, run.tasks) {
+ executor->recoverTask(taskState);
+ }
+
+ // Add the executor to the framework.
+ executors[executor->id] = executor;
+
+ return executor;
+ }
+
+ enum {
+ INITIALIZING,
+ RUNNING,
+ TERMINATING,
+ TERMINATED,
+ } state;
+
+ const SlaveID slaveId;
+
const FrameworkID id;
const FrameworkInfo info;
@@ -540,7 +717,7 @@ struct Framework
const Flags flags;
- bool shutdown; // Indicates if framework is being shut down.
+ std::vector<TaskInfo> pending; // Pending tasks (used while INITIALIZING).
// Current running executors.
hashmap<ExecutorID, Executor*> executors;