You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2014/10/10 23:11:17 UTC
git commit: Moved executor checkpointing code from the Executor
constructor.
Repository: mesos
Updated Branches:
refs/heads/master 07bc6734c -> 2d78336ea
Moved executor checkpointing code from the Executor constructor.
Review: https://reviews.apache.org/r/26525
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2d78336e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2d78336e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2d78336e
Branch: refs/heads/master
Commit: 2d78336ea7e484d1bf45679a31de9ee263332b9d
Parents: 07bc673
Author: Jie Yu <yu...@gmail.com>
Authored: Thu Oct 9 14:14:15 2014 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Oct 10 13:49:26 2014 -0700
----------------------------------------------------------------------
src/slave/slave.cpp | 76 +++++++++++++++++++++++++++---------------------
src/slave/slave.hpp | 1 +
2 files changed, 44 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d78336e/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index f677adf..2b55050 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1292,9 +1292,10 @@ void Slave::_runTask(
break;
}
case Executor::REGISTERING:
- // Checkpoint the task before we do anything else (this is a no-op
- // if the framework doesn't have checkpointing enabled).
- executor->checkpointTask(task);
+ // Checkpoint the task before we do anything else.
+ if (executor->checkpoint) {
+ executor->checkpointTask(task);
+ }
stats.tasks[TASK_STAGING]++;
@@ -1306,9 +1307,10 @@ void Slave::_runTask(
executor->queuedTasks[task.task_id()] = task;
break;
case Executor::RUNNING: {
- // Checkpoint the task before we do anything else (this is a no-op
- // if the framework doesn't have checkpointing enabled).
- executor->checkpointTask(task);
+ // Checkpoint the task before we do anything else.
+ if (executor->checkpoint) {
+ executor->checkpointTask(task);
+ }
stats.tasks[TASK_STAGING]++;
@@ -3719,6 +3721,10 @@ Executor* Framework::launchExecutor(
Executor* executor = new Executor(
slave, id, executorInfo, containerId, directory, info.checkpoint());
+ if (executor->checkpoint) {
+ executor->checkpointExecutor();
+ }
+
CHECK(!executors.contains(executorInfo.executor_id()))
<< "Unknown executor " << executorInfo.executor_id();
@@ -3994,20 +4000,6 @@ Executor::Executor(
commandExecutor =
strings::contains(info.command().value(), executorPath.get());
}
-
- if (checkpoint && slave->state != slave->RECOVERING) {
- // Checkpoint the executor info.
- const string& path = paths::getExecutorInfoPath(
- slave->metaDir, slave->info.id(), frameworkId, id);
-
- VLOG(1) << "Checkpointing ExecutorInfo to '" << path << "'";
- CHECK_SOME(state::checkpoint(path, info));
-
- // Create the meta executor directory.
- // NOTE: This creates the 'latest' symlink in the meta directory.
- paths::createExecutorDirectory(
- slave->metaDir, slave->info.id(), frameworkId, id, containerId);
- }
}
@@ -4096,23 +4088,41 @@ void Executor::completeTask(const TaskID& taskId)
}
+void Executor::checkpointExecutor()
+{
+ CHECK(checkpoint);
+
+ CHECK_NE(slave->state, slave->RECOVERING);
+
+ // Checkpoint the executor info.
+ const string& path = paths::getExecutorInfoPath(
+ slave->metaDir, slave->info.id(), frameworkId, id);
+
+ VLOG(1) << "Checkpointing ExecutorInfo to '" << path << "'";
+ CHECK_SOME(state::checkpoint(path, info));
+
+ // Create the meta executor directory.
+ // NOTE: This creates the 'latest' symlink in the meta directory.
+ paths::createExecutorDirectory(
+ slave->metaDir, slave->info.id(), frameworkId, id, containerId);
+}
+
+
void Executor::checkpointTask(const TaskInfo& task)
{
- if (checkpoint) {
- CHECK_NOTNULL(slave);
+ CHECK(checkpoint);
- const Task& t = protobuf::createTask(task, TASK_STAGING, frameworkId);
- const string& path = paths::getTaskInfoPath(
- slave->metaDir,
- slave->info.id(),
- frameworkId,
- id,
- containerId,
- t.task_id());
+ const Task& t = protobuf::createTask(task, TASK_STAGING, frameworkId);
+ const string& path = paths::getTaskInfoPath(
+ slave->metaDir,
+ slave->info.id(),
+ frameworkId,
+ id,
+ containerId,
+ t.task_id());
- VLOG(1) << "Checkpointing TaskInfo to '" << path << "'";
- CHECK_SOME(state::checkpoint(path, t));
- }
+ VLOG(1) << "Checkpointing TaskInfo to '" << path << "'";
+ CHECK_SOME(state::checkpoint(path, t));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d78336e/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index cb879cb..efd309e 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -500,6 +500,7 @@ struct Executor
Task* addTask(const TaskInfo& task);
void terminateTask(const TaskID& taskId, const mesos::TaskState& state);
void completeTask(const TaskID& taskId);
+ void checkpointExecutor();
void checkpointTask(const TaskInfo& task);
void recoverTask(const state::TaskState& state);
void updateTaskState(const TaskStatus& status);