You are viewing a plain-text version of this content. The canonical link to it ("here" in the original page) was not preserved in this rendering.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/04/17 09:08:41 UTC
svn commit: r1468770 - in /incubator/mesos/trunk/src/slave: slave.cpp slave.hpp
Author: vinodkone
Date: Wed Apr 17 07:08:40 2013
New Revision: 1468770
URL: http://svn.apache.org/r1468770
Log:
Moved slave::Framework/Executor implementation into slave.cpp.
Review: https://reviews.apache.org/r/10112
Modified:
incubator/mesos/trunk/src/slave/slave.cpp
incubator/mesos/trunk/src/slave/slave.hpp
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1468770&r1=1468769&r2=1468770&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Wed Apr 17 07:08:40 2013
@@ -162,7 +162,7 @@ Slave::Slave(const slave::Flags& _flags,
}
}
- std::string ports;
+ string ports;
if (resources.ports().isSome()) {
// TODO(vinod): Validate the ports range.
ports = stringify(resources.ports().get());
@@ -441,7 +441,7 @@ void Slave::fileAttached(const Future<No
}
-void Slave::detachFile(const Future<Nothing>& result, const std::string& path)
+void Slave::detachFile(const Future<Nothing>& result, const string& path)
{
CHECK(!result.isDiscarded());
files->detach(path);
@@ -702,7 +702,7 @@ void Slave::runTask(
stats.tasks[TASK_STAGING]++;
- // Queue task if the executor is not yet registered.
+ // Queue task if the executor has not yet registered.
if (executor->state == Executor::REGISTERING) {
LOG(INFO) << "Queuing task '" << task.task_id()
<< "' for executor " << executorId
@@ -1143,12 +1143,10 @@ void Slave::reregisterExecutorTimeout()
// reaper) and cleaned up!
// TODO(benh/vinod): Check executor state.
if (!executor->pid) {
- LOG(INFO) << "Shutting down un-reregistered executor "
- << executor->id << " of framework " << framework->id;
+ LOG(INFO) << "Killing an un-reregistered executor " << executor->id
+ << " of framework " << framework->id;
- // TODO(vinod): Call shutdownExecutor() when it supports
- // immediate shutdown of the executor.
- shutdownExecutorTimeout(framework->id, executor->id, executor->uuid);
+ dispatch(isolator, &Isolator::killExecutor, framework->id, executor->id);
}
}
}
@@ -1202,11 +1200,12 @@ void Slave::statusUpdate(const StatusUpd
}
// Forward the update to the status update manager.
- // NOTE: We forward the update even if the framework/executor is unknown
- // because currently there is no persistent state in the master.
- // The lack of persistence might lead frameworks to use out-of-band means
- // to figure out the task state mismatch and use status updates to reconcile.
- // We need to revisit this issue once master has persistent state.
+ // NOTE: We forward the update even if the framework/executor is
+ // unknown because currently there is no persistent state in the
+ // master. The lack of persistence might lead frameworks to use
+ // out-of-band means to figure out the task state mismatch and use
+ // status updates to reconcile. We need to revisit this issue once
+ // master has persistent state.
forwardUpdate(update, executor);
}
@@ -1868,7 +1867,7 @@ void Slave::recover(const FrameworkState
CHECK_SOME(state.info);
CHECK_SOME(state.pid);
- CHECK(!frameworks.contains(state.id)); // TODO(vinod): Is this correct?
+ CHECK(!frameworks.contains(state.id));
Framework* framework = new Framework(
info.id(), state.id, state.info.get(), state.pid.get(), flags);
@@ -1908,7 +1907,9 @@ void Slave::recover(const FrameworkState
message.mutable_slave_id()->MergeFrom(info.id());
send(executor->pid, message);
} else {
- // TODO(vinod): What's supposed to happen here?
+ LOG(INFO) << "Unable to reconnect to executor " << executor->id
+ << " of framework " << framework->id
+ << " because no libprocess PID was found";
}
} else {
if (executor->pid) {
@@ -1919,12 +1920,374 @@ void Slave::recover(const FrameworkState
shutdownExecutor(framework, executor);
} else {
- // TODO(vinod): What's supposed to happen here?
+ LOG(INFO) << "Killing executor " << executor->id
+ << " of framework " << framework->id
+ << " because no libprocess PID was found";
+ dispatch(isolator, &Isolator::killExecutor, framework->id, executor->id);
+ }
+ }
+ }
+}
+
+
+
+// Sets up the slave's record of a framework. When the framework has
+// enabled checkpointing, its FrameworkInfo and PID are also written
+// under the slave's meta directory.
+Framework::Framework(
+    const SlaveID& _slaveId,
+    const FrameworkID& _id,
+    const FrameworkInfo& _info,
+    const UPID& _pid,
+    const Flags& _flags)
+  : state(RUNNING), // TODO(benh): Skipping INITIALIZING for now.
+    slaveId(_slaveId),
+    id(_id),
+    info(_info),
+    pid(_pid),
+    flags(_flags),
+    completedExecutors(MAX_COMPLETED_EXECUTORS_PER_FRAMEWORK)
+{
+  if (!info.checkpoint()) {
+    return; // Nothing to persist for a non-checkpointing framework.
+  }
+
+  // Persist the FrameworkInfo first ...
+  const string infoPath = paths::getFrameworkInfoPath(
+      paths::getMetaRootDir(flags.work_dir), slaveId, id);
+  CHECK_SOME(state::checkpoint(infoPath, info));
+
+  // ... then the framework's libprocess PID.
+  const string pidPath = paths::getFrameworkPidPath(
+      paths::getMetaRootDir(flags.work_dir), slaveId, id);
+  CHECK_SOME(state::checkpoint(pidPath, pid));
+}
+
+
+Framework::~Framework()
+{
+  // Executors still in 'executors' are owned through raw pointers and
+  // must be freed here. Completed executors are held via Owned<> in
+  // 'completedExecutors' and are not deleted by this loop.
+  foreachvalue (Executor* active, executors) {
+    delete active;
+  }
+}
+
+
+// Returns the ExecutorInfo to run 'task' with. A task that carries its
+// own ExecutorInfo gets it back unchanged. A task that carries a
+// CommandInfo instead gets a synthesized "command executor" whose ID is
+// the task ID and whose command runs 'mesos-executor' from
+// flags.launcher_dir.
+ExecutorInfo Framework::getExecutorInfo(const TaskInfo& task)
+{
+  // Exactly one of 'executor' and 'command' must be set.
+  CHECK(task.has_executor() != task.has_command());
+
+  if (!task.has_command()) {
+    return task.executor();
+  }
+
+  ExecutorInfo executor;
+
+  // Command executors share the same id as the task.
+  executor.mutable_executor_id()->set_value(task.task_id().value());
+
+  // Prepare an executor name which includes information on the
+  // command being launched (commands longer than 15 characters are
+  // cut to their first 12 followed by "...").
+  const string& command = task.command().value();
+  string name =
+    "(Task: " + task.task_id().value() + ") " + "(Command: sh -c '";
+  if (command.length() > 15) {
+    name += command.substr(0, 12) + "...')";
+  } else {
+    name += command + "')";
+  }
+
+  executor.set_name("Command Executor " + name);
+  executor.set_source(task.task_id().value());
+
+  // Copy the CommandInfo to get the URIs and environment, but
+  // update it to invoke 'mesos-executor' (unless we couldn't
+  // resolve 'mesos-executor' via 'realpath', in which case just
+  // echo the error and exit).
+  executor.mutable_command()->MergeFrom(task.command());
+
+  Try<string> path = os::realpath(
+      path::join(flags.launcher_dir, "mesos-executor"));
+
+  if (path.isSome()) {
+    executor.mutable_command()->set_value(path.get());
+  } else {
+    // The fallback command is a shell snippet that wraps the error in
+    // single quotes. Rewrite every embedded single quote as '\''
+    // (close the quote, emit a literal quote, reopen the quote).
+    // Otherwise a quote in the message would end the echo argument
+    // early and the rest would be interpreted as shell syntax.
+    const string error = path.error();
+    string quoted;
+    for (size_t i = 0; i < error.size(); ++i) {
+      if (error[i] == '\'') {
+        quoted += "'\\''";
+      } else {
+        quoted += error[i];
+      }
+    }
+    executor.mutable_command()->set_value(
+        "echo '" + quoted + "'; exit 1");
+  }
+
+  // TODO(benh): Set some resources for the executor so that a task
+  // doesn't end up getting killed because the amount of resources
+  // of the executor went over those allocated. Note that this might
+  // mean that the number of resources on the machine will actually
+  // be slightly oversubscribed, so we'll need to reevaluate with
+  // respect to resources that can't be oversubscribed.
+  return executor;
+}
+
+
+// Starts tracking a new run of the executor described by
+// 'executorInfo': allocates a fresh run UUID and work directory, builds
+// the Executor, and registers it in 'executors'. The returned pointer
+// is owned by this Framework.
+Executor* Framework::createExecutor(const ExecutorInfo& executorInfo)
+{
+  // Check the invariant before any side effects. Creating the run
+  // directory and constructing the Executor both touch disk; for
+  // checkpointing frameworks the constructor also writes the executor
+  // info and creates the 'latest' symlink in the meta directory.
+  // Doing that first could overwrite the checkpointed state of a
+  // still-active executor with the same ID just before the CHECK
+  // aborts the slave.
+  CHECK(!executors.contains(executorInfo.executor_id()));
+
+  // We create a UUID for the new executor. The UUID uniquely
+  // identifies this new instance of the executor across executors
+  // sharing the same executorID that may have previously run. It
+  // also provides a means for the executor to have a unique
+  // directory.
+  UUID uuid = UUID::random();
+
+  // Create a directory for the executor.
+  const string directory = paths::createExecutorDirectory(
+      flags.work_dir, slaveId, id, executorInfo.executor_id(), uuid);
+
+  Executor* executor = new Executor(
+      slaveId, id, executorInfo, uuid, directory, flags, info.checkpoint());
+
+  executors[executorInfo.executor_id()] = executor;
+  return executor;
+}
+
+
+// Retires an active executor: it is removed from 'executors' and handed
+// to 'completedExecutors'. Unknown executor IDs are ignored.
+void Framework::destroyExecutor(const ExecutorID& executorId)
+{
+  if (!executors.contains(executorId)) {
+    return;
+  }
+
+  Executor* finished = executors[executorId];
+  executors.erase(executorId);
+
+  // 'completedExecutors' takes over ownership of the pointer.
+  completedExecutors.push_back(Owned<Executor>(finished));
+}
+
+
+// Looks up an active executor by ID. Returns NULL when no executor with
+// that ID is in 'executors' (completed executors are not searched).
+Executor* Framework::getExecutor(const ExecutorID& executorId)
+{
+  return executors.contains(executorId) ? executors[executorId] : NULL;
+}
+
+
+// Finds the active executor that knows about 'taskId', either as a
+// queued task, a launched task, or a task with entries in 'updates'.
+// Returns the first match in iteration order, or NULL if none.
+Executor* Framework::getExecutor(const TaskID& taskId)
+{
+  Executor* match = NULL;
+
+  foreachvalue (Executor* candidate, executors) {
+    const bool knowsTask =
+      candidate->queuedTasks.contains(taskId) ||
+      candidate->launchedTasks.contains(taskId) ||
+      candidate->updates.contains(taskId);
+
+    if (knowsTask) {
+      match = candidate;
+      break;
+    }
+  }
+
+  return match;
+}
+
+
+// Rebuilds the Executor for the latest run recorded in 'state' and adds
+// it to this framework's active executors. Returns NULL, recovering
+// nothing, when the executor's info or its latest run is missing.
+Executor* Framework::recoverExecutor(const state::ExecutorState& state)
+{
+  LOG(INFO) << "Recovering executor '" << state.id
+            << "' of framework " << id;
+
+  if (state.info.isNone()) {
+    LOG(WARNING) << "Skipping recovery of executor '" << state.id
+                 << "' of framework " << id
+                 << " because its info cannot be recovered";
+    return NULL;
+  }
+
+  if (state.latest.isNone()) {
+    LOG(WARNING) << "Skipping recovery of executor '" << state.id
+                 << "' of framework " << id
+                 << " because its latest run cannot be recovered";
+    return NULL;
+  }
+
+  // We are only interested in the latest run of the executor!
+  const UUID& uuid = state.latest.get();
+
+  // Validate before constructing the Executor. Its constructor
+  // checkpoints for checkpointing frameworks, and an existing entry in
+  // 'executors' would otherwise be silently overwritten below, leaking
+  // the previously recovered Executor.
+  CHECK(state.runs.contains(uuid));
+  CHECK(!executors.contains(state.info.get().executor_id()));
+
+  // Keep the Option in a named local so that 'run' stays valid for the
+  // rest of this function, even if Option::get() returns a reference
+  // into the Option rather than a copy.
+  const Option<state::RunState> latestRun = state.runs.get(uuid);
+  const state::RunState& run = latestRun.get();
+
+  // Create executor.
+  const string directory = paths::getExecutorRunPath(
+      flags.work_dir, slaveId, id, state.id, uuid);
+
+  Executor* executor = new Executor(
+      slaveId, id, state.info.get(), uuid, directory, flags, info.checkpoint());
+
+  // Recover the libprocess PID if possible.
+  if (run.libprocessPid.isSome()) {
+    // When recovering in unsafe mode, the assumption is that the
+    // slave can die after checkpointing the forked pid but before the
+    // libprocess pid. So, it is not possible for libprocess pid to
+    // exist but not forked pid. If so, it is a really bad situation
+    // (file corruption).
+    CHECK_SOME(run.forkedPid);
+    executor->pid = run.libprocessPid.get();
+  }
+
+  // And finally recover all the executor's tasks.
+  foreachvalue (const state::TaskState& taskState, run.tasks) {
+    executor->recoverTask(taskState);
+  }
+
+  // Add the executor to the framework.
+  executors[executor->id] = executor;
+
+  return executor;
+}
+
+
+// Sets up the slave's record of one executor run. When '_checkpoint' is
+// set, it also writes the ExecutorInfo and creates the run's meta
+// directory.
+Executor::Executor(
+    const SlaveID& _slaveId,
+    const FrameworkID& _frameworkId,
+    const ExecutorInfo& _info,
+    const UUID& _uuid,
+    const string& _directory,
+    const Flags& _flags,
+    bool _checkpoint)
+  : state(REGISTERING), // TODO(benh): Skipping INITIALIZING for now.
+    slaveId(_slaveId),
+    id(_info.executor_id()),
+    info(_info),
+    frameworkId(_frameworkId),
+    uuid(_uuid),
+    directory(_directory),
+    flags(_flags),
+    checkpoint(_checkpoint),
+    pid(UPID()),
+    resources(_info.resources()),
+    completedTasks(MAX_COMPLETED_TASKS_PER_EXECUTOR)
+{
+  if (!checkpoint) {
+    return; // Nothing is persisted unless checkpointing is enabled.
+  }
+
+  // Checkpoint the executor info.
+  const string infoPath = paths::getExecutorInfoPath(
+      paths::getMetaRootDir(flags.work_dir), slaveId, frameworkId, id);
+  CHECK_SOME(state::checkpoint(infoPath, info));
+
+  // Create the meta executor directory for this run.
+  // NOTE: This creates the 'latest' symlink in the meta directory.
+  paths::createExecutorDirectory(
+      paths::getMetaRootDir(flags.work_dir), slaveId, frameworkId, id, uuid);
+}
+
+
+Executor::~Executor()
+{
+  // Launched tasks are heap-allocated Task objects owned by this
+  // executor, so free whatever is still in the map.
+  foreachvalue (Task* launched, launchedTasks) {
+    delete launched;
+  }
+}
+
+
+// Records 'task' as launched on this executor in TASK_STAGING state and
+// adds its resources to the executor's total. Returns the new Task,
+// which the executor owns.
+Task* Executor::addTask(const TaskInfo& task)
+{
+  const TaskID& taskId = task.task_id();
+
+  // The master should enforce unique task IDs, but just in case
+  // maybe we shouldn't make this a fatal error.
+  CHECK(!launchedTasks.contains(taskId));
+
+  Task* launched =
+    new Task(protobuf::createTask(task, TASK_STAGING, id, frameworkId));
+  launchedTasks[taskId] = launched;
+
+  resources += task.resources();
+
+  return launched;
+}
+
+
+// Forgets 'taskId'. Drops it from the queue, and if it was launched,
+// returns its resources, copies it into 'completedTasks', and frees it.
+void Executor::removeTask(const TaskID& taskId)
+{
+  // Erasing a task that is not queued is a no-op.
+  queuedTasks.erase(taskId);
+
+  if (!launchedTasks.contains(taskId)) {
+    return;
+  }
+
+  Task* task = launchedTasks[taskId];
+
+  // Give back the resources this task was holding.
+  foreach (const Resource& resource, task->resources()) {
+    resources -= resource;
+  }
+
+  launchedTasks.erase(taskId);
+
+  // Keep a copy for the completed-task history, then free the original.
+  completedTasks.push_back(*task);
+  delete task;
+}
+
+
+// When checkpointing is enabled, persists 'task' (as a Task in
+// TASK_STAGING state) under this executor run's meta directory.
+void Executor::checkpointTask(const TaskInfo& task)
+{
+  if (!checkpoint) {
+    return;
+  }
+
+  const string path = paths::getTaskInfoPath(
+      paths::getMetaRootDir(flags.work_dir),
+      slaveId, frameworkId, id, uuid, task.task_id());
+
+  const Task staged =
+    protobuf::createTask(task, TASK_STAGING, id, frameworkId);
+
+  CHECK_SOME(state::checkpoint(path, staged));
+}
+
+
+// Rebuilds the bookkeeping for one recovered task of this executor run.
+// It re-adds the task as launched, then replays its recorded status
+// updates in order, stopping at (and removing the task on) the first
+// terminal one.
+//
+// The parameter type is spelled 'state::TaskState', exactly as
+// slave.hpp declares it. This file also uses plain 'TaskState' for the
+// 'mesos::TaskState' enum.
+void Executor::recoverTask(const state::TaskState& state)
+{
+  if (state.info.isNone()) {
+    LOG(WARNING) << "Skipping recovery of task " << state.id
+                 << " because its info cannot be recovered";
+    return;
+  }
+
+  launchedTasks[state.id] = new Task(state.info.get());
+
+  // NOTE: Since some tasks might have been terminated when the
+  // slave was down, the executor resources we capture here is an
+  // upper-bound. The actual resources needed (for live tasks) by
+  // the isolator will be calculated when the executor re-registers.
+  resources += state.info.get().resources();
+
+  // Read updates to get the latest state of the task.
+  foreach (const StatusUpdate& update, state.updates) {
+    updateTaskState(state.id, update.status().state());
+    updates.put(state.id, UUID::fromBytes(update.uuid()));
+
+    // NOTE(review): only the terminal update's acknowledgement is
+    // consulted below. Acknowledged non-terminal updates stay in
+    // 'updates'. Confirm this is intended.
+
+    // Remove the task if it received a terminal update.
+    if (protobuf::isTerminalState(update.status().state())) {
+      removeTask(state.id);
+
+      // If the terminal update has been acknowledged, remove it
+      // from pending tasks.
+      if (state.acks.contains(UUID::fromBytes(update.uuid()))) {
+        updates.remove(state.id, UUID::fromBytes(update.uuid()));
       }
+      break;
     }
   }
 }
+
+// Sets the state of a launched task. IDs not in 'launchedTasks' are
+// ignored.
+//
+// The parameter is spelled 'mesos::TaskState', the enum that
+// Task::set_state() takes. A bare 'TaskState' here could resolve to
+// the recovery struct (which slave.hpp spells 'state::TaskState'). The
+// definition would then not match its declaration in slave.hpp.
+void Executor::updateTaskState(const TaskID& taskId, mesos::TaskState state)
+{
+  if (launchedTasks.contains(taskId)) {
+    launchedTasks[taskId]->set_state(state);
+  }
+}
+
+
} // namespace slave {
} // namespace internal {
} // namespace mesos {
Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1468770&r1=1468769&r2=1468770&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Wed Apr 17 07:08:40 2013
@@ -309,147 +309,21 @@ private:
// Information describing an executor.
struct Executor
{
- Executor(const SlaveID& _slaveId,
- const FrameworkID& _frameworkId,
- const ExecutorInfo& _info,
- const UUID& _uuid,
- const std::string& _directory,
- const Flags& _flags,
- bool _checkpoint)
- : state(REGISTERING), // TODO(benh): Skipping INITIALIZING for now.
- slaveId(_slaveId),
- id(_info.executor_id()),
- info(_info),
- frameworkId(_frameworkId),
- uuid(_uuid),
- directory(_directory),
- flags(_flags),
- checkpoint(_checkpoint),
- pid(UPID()),
- resources(_info.resources()),
- completedTasks(MAX_COMPLETED_TASKS_PER_EXECUTOR)
- {
- if (checkpoint) {
- // Checkpoint the executor info.
- const std::string& path = paths::getExecutorInfoPath(
- paths::getMetaRootDir(flags.work_dir),
- slaveId,
- frameworkId,
- id);
-
- CHECK_SOME(state::checkpoint(path, info));
-
- // Create the meta executor directory.
- // NOTE: This creates the 'latest' symlink in the meta directory.
- paths::createExecutorDirectory(
- paths::getMetaRootDir(flags.work_dir),
- slaveId,
- frameworkId,
- id,
- uuid);
- }
- }
-
- ~Executor()
- {
- // Delete the tasks.
- foreachvalue (Task* task, launchedTasks) {
- delete task;
- }
- }
-
- Task* addTask(const TaskInfo& task)
- {
- // The master should enforce unique task IDs, but just in case
- // maybe we shouldn't make this a fatal error.
- CHECK(!launchedTasks.contains(task.task_id()));
-
- Task* t = new Task(
- protobuf::createTask(task, TASK_STAGING, id, frameworkId));
-
- launchedTasks[task.task_id()] = t;
- resources += task.resources();
- return t;
- }
-
- void removeTask(const TaskID& taskId)
- {
- // Remove the task if it's queued.
- queuedTasks.erase(taskId);
-
- // Update the resources if it's been launched.
- if (launchedTasks.contains(taskId)) {
- Task* task = launchedTasks[taskId];
- foreach (const Resource& resource, task->resources()) {
- resources -= resource;
- }
- launchedTasks.erase(taskId);
-
- completedTasks.push_back(*task);
-
- delete task;
- }
- }
-
- void checkpointTask(const TaskInfo& task)
- {
- if (checkpoint) {
- const std::string& path = paths::getTaskInfoPath(
- paths::getMetaRootDir(flags.work_dir),
- slaveId,
- frameworkId,
- id,
- uuid,
- task.task_id());
-
- const Task& t = protobuf::createTask(
- task, TASK_STAGING, id, frameworkId);
-
- CHECK_SOME(state::checkpoint(path, t));
- }
- }
-
- void recoverTask(const state::TaskState& state)
- {
- if (state.info.isNone()) {
- LOG(WARNING) << "Skipping recovery of task " << state.id
- << " because its info cannot be recovered";
- return;
- }
-
- launchedTasks[state.id] = new Task(state.info.get());
-
- // NOTE: Since some tasks might have been terminated when the
- // slave was down, the executor resources we capture here is an
- // upper-bound. The actual resources needed (for live tasks) by
- // the isolator will be calculated when the executor re-registers.
- resources += state.info.get().resources();
-
- // Read updates to get the latest state of the task.
- foreach (const StatusUpdate& update, state.updates) {
- updateTaskState(state.id, update.status().state());
- updates.put(state.id, UUID::fromBytes(update.uuid()));
-
- // Remove the task if it received a terminal update.
- if (protobuf::isTerminalState(update.status().state())) {
- removeTask(state.id);
-
- // If the terminal update has been acknowledged, remove it
- // from pending tasks.
- if (state.acks.contains(UUID::fromBytes(update.uuid()))) {
- updates.remove(state.id, UUID::fromBytes(update.uuid()));
- }
- break;
- }
- }
- }
-
- void updateTaskState(const TaskID& taskId, TaskState state)
- {
- if (launchedTasks.contains(taskId)) {
- launchedTasks[taskId]->set_state(state);
- }
- }
+ Executor(
+ const SlaveID& _slaveId,
+ const FrameworkID& _frameworkId,
+ const ExecutorInfo& _info,
+ const UUID& _uuid,
+ const std::string& _directory,
+ const Flags& _flags,
+ bool _checkpoint);
+ ~Executor();
+
+ Task* addTask(const TaskInfo& task);
+ void removeTask(const TaskID& taskId);
+ void checkpointTask(const TaskInfo& task);
+ void recoverTask(const state::TaskState& state);
+ void updateTaskState(const TaskID& taskId, TaskState state);
enum {
INITIALIZING,
@@ -494,212 +368,23 @@ private:
// Information about a framework.
struct Framework
{
- Framework(const SlaveID& _slaveId,
- const FrameworkID& _id,
- const FrameworkInfo& _info,
- const UPID& _pid,
- const Flags& _flags)
- : state(RUNNING), // TODO(benh): Skipping INITIALIZING for now.
- slaveId(_slaveId),
- id(_id),
- info(_info),
- pid(_pid),
- flags(_flags),
- completedExecutors(MAX_COMPLETED_EXECUTORS_PER_FRAMEWORK)
- {
- if (info.checkpoint()) {
- // Checkpoint the framework info.
- std::string path = paths::getFrameworkInfoPath(
- paths::getMetaRootDir(flags.work_dir),
- slaveId,
- id);
-
- CHECK_SOME(state::checkpoint(path, info));
-
- // Checkpoint the framework pid.
- path = paths::getFrameworkPidPath(
- paths::getMetaRootDir(flags.work_dir),
- slaveId,
- id);
-
- CHECK_SOME(state::checkpoint(path, pid));
- }
- }
-
- ~Framework()
- {
- // We own the non-completed executor pointers, so they need to be deleted.
- foreachvalue (Executor* executor, executors) {
- delete executor;
- }
- }
+ Framework(
+ const SlaveID& _slaveId,
+ const FrameworkID& _id,
+ const FrameworkInfo& _info,
+ const UPID& _pid,
+ const Flags& _flags);
+
+ ~Framework();
// Returns an ExecutorInfo for a TaskInfo (possibly
// constructing one if the task has a CommandInfo).
- ExecutorInfo getExecutorInfo(const TaskInfo& task)
- {
- CHECK(task.has_executor() != task.has_command());
-
- if (task.has_command()) {
- ExecutorInfo executor;
-
- // Command executors share the same id as the task.
- executor.mutable_executor_id()->set_value(task.task_id().value());
-
- // Prepare an executor name which includes information on the
- // command being launched.
- std::string name =
- "(Task: " + task.task_id().value() + ") " + "(Command: sh -c '";
- if (task.command().value().length() > 15) {
- name += task.command().value().substr(0, 12) + "...')";
- } else {
- name += task.command().value() + "')";
- }
-
- executor.set_name("Command Executor " + name);
- executor.set_source(task.task_id().value());
-
- // Copy the CommandInfo to get the URIs and environment, but
- // update it to invoke 'mesos-executor' (unless we couldn't
- // resolve 'mesos-executor' via 'realpath', in which case just
- // echo the error and exit).
- executor.mutable_command()->MergeFrom(task.command());
-
- Try<std::string> path = os::realpath(
- path::join(flags.launcher_dir, "mesos-executor"));
-
- if (path.isSome()) {
- executor.mutable_command()->set_value(path.get());
- } else {
- executor.mutable_command()->set_value(
- "echo '" + path.error() + "'; exit 1");
- }
-
- // TODO(benh): Set some resources for the executor so that a task
- // doesn't end up getting killed because the amount of resources of
- // the executor went over those allocated. Note that this might mean
- // that the number of resources on the machine will actually be
- // slightly oversubscribed, so we'll need to reevaluate with respect
- // to resources that can't be oversubscribed.
- return executor;
- }
-
- return task.executor();
- }
-
- Executor* createExecutor(const ExecutorInfo& executorInfo)
- {
- // We create a UUID for the new executor. The UUID uniquely
- // identifies this new instance of the executor across executors
- // sharing the same executorID that may have previously run. It
- // also provides a means for the executor to have a unique
- // directory.
- UUID uuid = UUID::random();
-
- // Create a directory for the executor.
- const std::string& directory = paths::createExecutorDirectory(
- flags.work_dir, slaveId, id, executorInfo.executor_id(), uuid);
-
- Executor* executor = new Executor(
- slaveId,
- id,
- executorInfo,
- uuid,
- directory,
- flags,
- info.checkpoint());
-
- CHECK(!executors.contains(executorInfo.executor_id()));
- executors[executorInfo.executor_id()] = executor;
- return executor;
- }
-
- void destroyExecutor(const ExecutorID& executorId)
- {
- if (executors.contains(executorId)) {
- Executor* executor = executors[executorId];
- executors.erase(executorId);
-
- // Pass ownership of the executor pointer.
- completedExecutors.push_back(Owned<Executor>(executor));
- }
- }
-
- Executor* getExecutor(const ExecutorID& executorId)
- {
- if (executors.contains(executorId)) {
- return executors[executorId];
- }
-
- return NULL;
- }
-
- Executor* getExecutor(const TaskID& taskId)
- {
- foreachvalue (Executor* executor, executors) {
- if (executor->queuedTasks.contains(taskId) ||
- executor->launchedTasks.contains(taskId) ||
- executor->updates.contains(taskId)) {
- return executor;
- }
- }
- return NULL;
- }
-
- Executor* recoverExecutor(const state::ExecutorState& state)
- {
- LOG(INFO) << "Recovering executor '" << state.id
- << "' of framework " << id;
-
- if (state.info.isNone()) {
- LOG(WARNING) << "Skipping recovery of executor '" << state.id
- << "' of framework " << id
- << " because its info cannot be recovered";
- return NULL;
- }
-
- if (state.latest.isNone()) {
- LOG(WARNING) << "Skipping recovery of executor '" << state.id
- << "' of framework " << id
- << " because its latest run cannot be recovered";
- return NULL;
- }
-
- // We are only interested in the latest run of the executor!
- const UUID& uuid = state.latest.get();
-
- // Create executor.
- const std::string& directory = paths::getExecutorRunPath(
- flags.work_dir, slaveId, id, state.id, uuid);
-
- Executor* executor = new Executor(
- slaveId,
- id,
- state.info.get(),
- uuid,
- directory,
- flags,
- info.checkpoint());
-
- CHECK(state.runs.contains(uuid));
- const state::RunState& run = state.runs.get(uuid).get();
-
- // Recover the libprocess PID if possible.
- if (run.libprocessPid.isSome()) {
- CHECK_SOME(run.forkedPid); // TODO(vinod): Why this check?
- executor->pid = run.libprocessPid.get();
- }
-
- // And finally recover all the executor's tasks.
- foreachvalue (const state::TaskState& taskState, run.tasks) {
- executor->recoverTask(taskState);
- }
-
- // Add the executor to the framework.
- executors[executor->id] = executor;
-
- return executor;
- }
+ ExecutorInfo getExecutorInfo(const TaskInfo& task);
+ Executor* createExecutor(const ExecutorInfo& executorInfo);
+ void destroyExecutor(const ExecutorID& executorId);
+ Executor* getExecutor(const ExecutorID& executorId);
+ Executor* getExecutor(const TaskID& taskId);
+ Executor* recoverExecutor(const state::ExecutorState& state);
enum {
INITIALIZING,