You are viewing a plain text version of this content. The link to its canonical version (originally attached to the word "here") was lost in this plain-text conversion.
Posted to commits@mesos.apache.org by mp...@apache.org on 2017/03/03 11:46:02 UTC
[3/3] mesos git commit: Re-checkpointed the `Executor`s and `Task`s during agent recovery.
Re-checkpointed the `Executor`s and `Task`s during agent recovery.
Review: https://reviews.apache.org/r/57109
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ed520374
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ed520374
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ed520374
Branch: refs/heads/master
Commit: ed52037457e471a26e00888bd11efda12c0593d2
Parents: 4912f34
Author: Michael Park <mp...@apache.org>
Authored: Mon Feb 13 14:38:49 2017 -0800
Committer: Michael Park <mp...@apache.org>
Committed: Fri Mar 3 03:39:59 2017 -0800
----------------------------------------------------------------------
src/slave/slave.cpp | 81 +++++++++++++++++++++++++++++++++++-------------
src/slave/slave.hpp | 17 ++++++++--
2 files changed, 73 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ed520374/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 4db367c..775f43b 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -5295,6 +5295,7 @@ Future<Nothing> Slave::recover(const Try<state::State>& state)
const FrameworkInfo& frameworkInfo) {
set<string> roles = protobuf::framework::getRoles(frameworkInfo);
+ bool injectedAllocationInfo = false;
foreach (Resource& resource, *resources) {
if (!resource.has_allocation_info()) {
if (roles.size() != 1) {
@@ -5304,14 +5305,22 @@ Future<Nothing> Slave::recover(const Try<state::State>& state)
}
resource.mutable_allocation_info()->set_role(*roles.begin());
+ injectedAllocationInfo = true;
}
}
+
+ return injectedAllocationInfo;
};
- // TODO(bmahler): We currently don't allow frameworks to
- // change their roles so we do not need to re-persist the
- // resources with `AllocationInfo` injected for existing
- // tasks and executors.
+ // In order to allow frameworks to change their role(s), we need to keep
+ // track of the fact that the resources used to be implicitly allocated to
+ // `FrameworkInfo.role` before the agent upgrade. To this end, we inject
+ // the `AllocationInfo` to the resources in `ExecutorState` and `TaskState`,
+ // and re-checkpoint them if necessary.
+
+ hashset<ExecutorID> injectedExecutors;
+ hashmap<ExecutorID, hashset<TaskID>> injectedTasks;
+
if (slaveState.isSome()) {
foreachvalue (FrameworkState& frameworkState, slaveState->frameworks) {
if (!frameworkState.info.isSome()) {
@@ -5323,9 +5332,11 @@ Future<Nothing> Slave::recover(const Try<state::State>& state)
continue;
}
- injectAllocationInfo(
- executorState.info->mutable_resources(),
- frameworkState.info.get());
+ if (injectAllocationInfo(
+ executorState.info->mutable_resources(),
+ frameworkState.info.get())) {
+ injectedExecutors.insert(executorState.id);
+ }
foreachvalue (RunState& runState, executorState.runs) {
foreachvalue (TaskState& taskState, runState.tasks) {
@@ -5333,9 +5344,11 @@ Future<Nothing> Slave::recover(const Try<state::State>& state)
continue;
}
- injectAllocationInfo(
- taskState.info->mutable_resources(),
- frameworkState.info.get());
+ if (injectAllocationInfo(
+ taskState.info->mutable_resources(),
+ frameworkState.info.get())) {
+ injectedTasks[executorState.id].insert(taskState.id);
+ }
}
}
}
@@ -5452,7 +5465,7 @@ Future<Nothing> Slave::recover(const Try<state::State>& state)
// Recover the frameworks.
foreachvalue (const FrameworkState& frameworkState,
slaveState.get().frameworks) {
- recoverFramework(frameworkState);
+ recoverFramework(frameworkState, injectedExecutors, injectedTasks);
}
}
@@ -5638,7 +5651,10 @@ void Slave::__recover(const Future<Nothing>& future)
}
-void Slave::recoverFramework(const FrameworkState& state)
+void Slave::recoverFramework(
+ const FrameworkState& state,
+ const hashset<ExecutorID>& executorsToRecheckpoint,
+ const hashmap<ExecutorID, hashset<TaskID>>& tasksToRecheckpoint)
{
LOG(INFO) << "Recovering framework " << state.id;
@@ -5692,7 +5708,12 @@ void Slave::recoverFramework(const FrameworkState& state)
// Now recover the executors for this framework.
foreachvalue (const ExecutorState& executorState, state.executors) {
- framework->recoverExecutor(executorState);
+ framework->recoverExecutor(
+ executorState,
+ executorsToRecheckpoint.contains(executorState.id),
+ tasksToRecheckpoint.contains(executorState.id)
+ ? tasksToRecheckpoint.at(executorState.id)
+ : hashset<TaskID>{});
}
// Remove the framework in case we didn't recover any executors.
@@ -6642,7 +6663,10 @@ Executor* Slave::getExecutor(const ContainerID& containerId) const
}
-void Framework::recoverExecutor(const ExecutorState& state)
+void Framework::recoverExecutor(
+ const ExecutorState& state,
+ bool recheckpointExecutor,
+ const hashset<TaskID>& tasksToRecheckpoint)
{
LOG(INFO) << "Recovering executor '" << state.id
<< "' of framework " << id();
@@ -6738,7 +6762,9 @@ void Framework::recoverExecutor(const ExecutorState& state)
// And finally recover all the executor's tasks.
foreachvalue (const TaskState& taskState, run.get().tasks) {
- executor->recoverTask(taskState);
+ executor->recoverTask(
+ taskState,
+ tasksToRecheckpoint.contains(taskState.id));
}
ExecutorID executorId = state.id;
@@ -6762,6 +6788,9 @@ void Framework::recoverExecutor(const ExecutorState& state)
// Add the executor to the framework.
executors[executor->id] = executor;
+ if (recheckpointExecutor) {
+ executor->checkpointExecutor();
+ }
// If the latest run of the executor was completed (i.e., terminated
// and all updates are acknowledged) in the previous run, we
@@ -6890,8 +6919,6 @@ void Executor::checkpointExecutor()
{
CHECK(checkpoint);
- CHECK_NE(slave->state, slave->RECOVERING);
-
// Checkpoint the executor info.
const string path = paths::getExecutorInfoPath(
slave->metaDir, slave->info.id(), frameworkId, id);
@@ -6908,23 +6935,28 @@ void Executor::checkpointExecutor()
void Executor::checkpointTask(const TaskInfo& task)
{
+ checkpointTask(protobuf::createTask(task, TASK_STAGING, frameworkId));
+}
+
+
+void Executor::checkpointTask(const Task& task)
+{
CHECK(checkpoint);
- const Task t = protobuf::createTask(task, TASK_STAGING, frameworkId);
const string path = paths::getTaskInfoPath(
slave->metaDir,
slave->info.id(),
frameworkId,
id,
containerId,
- t.task_id());
+ task.task_id());
VLOG(1) << "Checkpointing TaskInfo to '" << path << "'";
- CHECK_SOME(state::checkpoint(path, t));
+ CHECK_SOME(state::checkpoint(path, task));
}
-void Executor::recoverTask(const TaskState& state)
+void Executor::recoverTask(const TaskState& state, bool recheckpointTask)
{
if (state.info.isNone()) {
LOG(WARNING) << "Skipping recovery of task " << state.id
@@ -6938,7 +6970,12 @@ void Executor::recoverTask(const TaskState& state)
CHECK(resource.has_allocation_info());
}
- launchedTasks[state.id] = new Task(state.info.get());
+ Task* task = new Task(state.info.get());
+ if (recheckpointTask) {
+ checkpointTask(*task);
+ }
+
+ launchedTasks[state.id] = task;
// NOTE: Since some tasks might have been terminated when the
// slave was down, the executor resources we capture here is an
http://git-wip-us.apache.org/repos/asf/mesos/blob/ed520374/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index e6fac20..857338c 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -411,7 +411,10 @@ public:
virtual void __recover(const process::Future<Nothing>& future);
// Helper to recover a framework from the specified state.
- void recoverFramework(const state::FrameworkState& state);
+ void recoverFramework(
+ const state::FrameworkState& state,
+ const hashset<ExecutorID>& executorsToRecheckpoint,
+ const hashmap<ExecutorID, hashset<TaskID>>& tasksToRecheckpoint);
// Removes and garbage collects the executor.
void removeExecutor(Framework* framework, Executor* executor);
@@ -890,7 +893,10 @@ struct Executor
void completeTask(const TaskID& taskId);
void checkpointExecutor();
void checkpointTask(const TaskInfo& task);
- void recoverTask(const state::TaskState& state);
+ void checkpointTask(const Task& task);
+
+ void recoverTask(const state::TaskState& state, bool recheckpointTask);
+
Try<Nothing> updateTaskState(const TaskStatus& status);
// Returns true if there are any queued/launched/terminated tasks.
@@ -1050,7 +1056,12 @@ struct Framework
void destroyExecutor(const ExecutorID& executorId);
Executor* getExecutor(const ExecutorID& executorId) const;
Executor* getExecutor(const TaskID& taskId) const;
- void recoverExecutor(const state::ExecutorState& state);
+
+ void recoverExecutor(
+ const state::ExecutorState& state,
+ bool recheckpointExecutor,
+ const hashset<TaskID>& tasksToRecheckpoint);
+
void checkpointFramework() const;
const FrameworkID id() const { return info.id(); }