You are viewing a plain-text version of this content; the canonical link to the original page was not preserved in this copy.
Posted to commits@mesos.apache.org by ji...@apache.org on 2014/10/10 23:11:17 UTC

git commit: Moved executor checkpointing code from the Executor constructor.

Repository: mesos
Updated Branches:
  refs/heads/master 07bc6734c -> 2d78336ea


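Moved executor checkpointing code out of the Executor constructor. ExecutorInfo is now checkpointed by the new Executor::checkpointExecutor(), which Framework::launchExecutor() calls when checkpointing is enabled. Callers of Executor::checkpointTask() now check `executor->checkpoint` themselves, and both methods CHECK that checkpointing is enabled.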

Review: https://reviews.apache.org/r/26525


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2d78336e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2d78336e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2d78336e

Branch: refs/heads/master
Commit: 2d78336ea7e484d1bf45679a31de9ee263332b9d
Parents: 07bc673
Author: Jie Yu <yu...@gmail.com>
Authored: Thu Oct 9 14:14:15 2014 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Oct 10 13:49:26 2014 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 76 +++++++++++++++++++++++++++---------------------
 src/slave/slave.hpp |  1 +
 2 files changed, 44 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2d78336e/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index f677adf..2b55050 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1292,9 +1292,10 @@ void Slave::_runTask(
       break;
     }
     case Executor::REGISTERING:
-      // Checkpoint the task before we do anything else (this is a no-op
-      // if the framework doesn't have checkpointing enabled).
-      executor->checkpointTask(task);
+      // Checkpoint the task before we do anything else.
+      if (executor->checkpoint) {
+        executor->checkpointTask(task);
+      }
 
       stats.tasks[TASK_STAGING]++;
 
@@ -1306,9 +1307,10 @@ void Slave::_runTask(
       executor->queuedTasks[task.task_id()] = task;
       break;
     case Executor::RUNNING: {
-      // Checkpoint the task before we do anything else (this is a no-op
-      // if the framework doesn't have checkpointing enabled).
-      executor->checkpointTask(task);
+      // Checkpoint the task before we do anything else.
+      if (executor->checkpoint) {
+        executor->checkpointTask(task);
+      }
 
       stats.tasks[TASK_STAGING]++;
 
@@ -3719,6 +3721,10 @@ Executor* Framework::launchExecutor(
   Executor* executor = new Executor(
       slave, id, executorInfo, containerId, directory, info.checkpoint());
 
+  if (executor->checkpoint) {
+    executor->checkpointExecutor();
+  }
+
   CHECK(!executors.contains(executorInfo.executor_id()))
     << "Unknown executor " << executorInfo.executor_id();
 
@@ -3994,20 +4000,6 @@ Executor::Executor(
     commandExecutor =
       strings::contains(info.command().value(), executorPath.get());
   }
-
-  if (checkpoint && slave->state != slave->RECOVERING) {
-    // Checkpoint the executor info.
-    const string& path = paths::getExecutorInfoPath(
-        slave->metaDir, slave->info.id(), frameworkId, id);
-
-    VLOG(1) << "Checkpointing ExecutorInfo to '" << path << "'";
-    CHECK_SOME(state::checkpoint(path, info));
-
-    // Create the meta executor directory.
-    // NOTE: This creates the 'latest' symlink in the meta directory.
-    paths::createExecutorDirectory(
-        slave->metaDir, slave->info.id(), frameworkId, id, containerId);
-  }
 }
 
 
@@ -4096,23 +4088,41 @@ void Executor::completeTask(const TaskID& taskId)
 }
 
 
+void Executor::checkpointExecutor()
+{
+  CHECK(checkpoint);
+
+  CHECK_NE(slave->state, slave->RECOVERING);
+
+  // Checkpoint the executor info.
+  const string& path = paths::getExecutorInfoPath(
+      slave->metaDir, slave->info.id(), frameworkId, id);
+
+  VLOG(1) << "Checkpointing ExecutorInfo to '" << path << "'";
+  CHECK_SOME(state::checkpoint(path, info));
+
+  // Create the meta executor directory.
+  // NOTE: This creates the 'latest' symlink in the meta directory.
+  paths::createExecutorDirectory(
+      slave->metaDir, slave->info.id(), frameworkId, id, containerId);
+}
+
+
 void Executor::checkpointTask(const TaskInfo& task)
 {
-  if (checkpoint) {
-    CHECK_NOTNULL(slave);
+  CHECK(checkpoint);
 
-    const Task& t = protobuf::createTask(task, TASK_STAGING, frameworkId);
-    const string& path = paths::getTaskInfoPath(
-        slave->metaDir,
-        slave->info.id(),
-        frameworkId,
-        id,
-        containerId,
-        t.task_id());
+  const Task& t = protobuf::createTask(task, TASK_STAGING, frameworkId);
+  const string& path = paths::getTaskInfoPath(
+      slave->metaDir,
+      slave->info.id(),
+      frameworkId,
+      id,
+      containerId,
+      t.task_id());
 
-    VLOG(1) << "Checkpointing TaskInfo to '" << path << "'";
-    CHECK_SOME(state::checkpoint(path, t));
-  }
+  VLOG(1) << "Checkpointing TaskInfo to '" << path << "'";
+  CHECK_SOME(state::checkpoint(path, t));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d78336e/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index cb879cb..efd309e 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -500,6 +500,7 @@ struct Executor
   Task* addTask(const TaskInfo& task);
   void terminateTask(const TaskID& taskId, const mesos::TaskState& state);
   void completeTask(const TaskID& taskId);
+  void checkpointExecutor();
   void checkpointTask(const TaskInfo& task);
   void recoverTask(const state::TaskState& state);
   void updateTaskState(const TaskStatus& status);