Posted to commits@mesos.apache.org by vi...@apache.org on 2013/04/17 09:08:36 UTC

svn commit: r1468769 - in /incubator/mesos/trunk/src/slave: cgroups_isolator.cpp cgroups_isolator.hpp http.cpp isolator.hpp process_isolator.cpp process_isolator.hpp slave.cpp slave.hpp

Author: vinodkone
Date: Wed Apr 17 07:08:36 2013
New Revision: 1468769

URL: http://svn.apache.org/r1468769
Log:
Refactored slave::Framework/Executor: introduced explicit "states" and
added recovery methods.

Review: https://reviews.apache.org/r/10111
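
The heart of the change: the ad hoc 'shutdown' and 'terminated' booleans
on Framework/Executor are replaced by explicit state machines, and the
monolithic recovery loop in slave.cpp is split into per-framework,
per-executor, and per-task recovery methods. A rough sketch of the new
lifecycles, distilled from the enums added in slave.hpp below (the
transition triggers are inferred from the handlers changed in slave.cpp,
not spelled out by the commit itself):

    // Framework: INITIALIZING -> RUNNING -> TERMINATING (-> TERMINATED)
    //   New frameworks skip INITIALIZING and start in RUNNING;
    //   shutdownFramework() moves them to TERMINATING. Tasks arriving
    //   while INITIALIZING are held in 'pending'. TERMINATED is
    //   declared but not yet assigned anywhere in this commit.
    //
    // Executor: REGISTERING -> RUNNING -> TERMINATING -> TERMINATED
    //   New executors skip INITIALIZING and start in REGISTERING;
    //   registerExecutor()/reregisterExecutor() move them to RUNNING,
    //   shutdownExecutor() and registerExecutorTimeout() to
    //   TERMINATING, and executorTerminated() to TERMINATED.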

Modified:
    incubator/mesos/trunk/src/slave/cgroups_isolator.cpp
    incubator/mesos/trunk/src/slave/cgroups_isolator.hpp
    incubator/mesos/trunk/src/slave/http.cpp
    incubator/mesos/trunk/src/slave/isolator.hpp
    incubator/mesos/trunk/src/slave/process_isolator.cpp
    incubator/mesos/trunk/src/slave/process_isolator.hpp
    incubator/mesos/trunk/src/slave/slave.cpp
    incubator/mesos/trunk/src/slave/slave.hpp

Modified: incubator/mesos/trunk/src/slave/cgroups_isolator.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/cgroups_isolator.cpp?rev=1468769&r1=1468768&r2=1468769&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/cgroups_isolator.cpp (original)
+++ incubator/mesos/trunk/src/slave/cgroups_isolator.cpp Wed Apr 17 07:08:36 2013
@@ -507,15 +507,10 @@ void CgroupsIsolator::launchExecutor(
     const ExecutorInfo& executorInfo,
     const UUID& uuid,
     const string& directory,
-    const Resources& resources,
-    const Option<string>& path)
+    const Resources& resources)
 {
   CHECK(initialized) << "Cannot launch executors before initialization";
 
-  bool checkpoint = frameworkInfo.checkpoint();
-  CHECK(!(checkpoint && path.isNone()))
-    << "Asked to checkpoint forked pid without providing a path";
-
   const ExecutorID& executorId = executorInfo.executor_id();
 
   // Register the cgroup information.
@@ -593,9 +588,16 @@ void CgroupsIsolator::launchExecutor(
     // have a chance to write the pid to disk. That would result in an
     // orphaned executor process unknown to the slave when doing
     // recovery.
-    if (checkpoint) {
+    if (frameworkInfo.checkpoint()) {
+      const string& path = paths::getForkedPidPath(
+          paths::getMetaRootDir(flags.work_dir),
+          slaveId,
+          frameworkId,
+          executorId,
+          uuid);
+
       std::cout << "Checkpointing forked pid " << getpid() << std::endl;
-      state::checkpoint(path.get(), stringify(getpid()));
+      state::checkpoint(path, stringify(getpid()));
     }
 
     // Put self into the newly created cgroup.
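
The comment kept in this hunk is the invariant that matters here: the
forked pid must reach disk before exec(), or a crash in that window
leaves an executor process the recovering slave knows nothing about. A
minimal sketch of the ordering inside the isolator's fork (names
simplified; the real code also enters the cgroup and goes through the
executor launcher):

    pid_t pid = fork();
    if (pid == 0) {
      // Child: persist our own pid first, so that a restarted slave
      // can find this executor during recovery...
      state::checkpoint(path, stringify(getpid()));
      // ...and only then hand control over to the executor.
      execvp(argv[0], argv);
    }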

Modified: incubator/mesos/trunk/src/slave/cgroups_isolator.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/cgroups_isolator.hpp?rev=1468769&r1=1468768&r2=1468769&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/cgroups_isolator.hpp (original)
+++ incubator/mesos/trunk/src/slave/cgroups_isolator.hpp Wed Apr 17 07:08:36 2013
@@ -105,8 +105,7 @@ public:
       const ExecutorInfo& executorInfo,
       const UUID& uuid,
       const std::string& directory,
-      const Resources& resources,
-      const Option<std::string>& path);
+      const Resources& resources);
 
   virtual void killExecutor(
       const FrameworkID& frameworkId,

Modified: incubator/mesos/trunk/src/slave/http.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/http.cpp?rev=1468769&r1=1468768&r2=1468769&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/http.cpp (original)
+++ incubator/mesos/trunk/src/slave/http.cpp Wed Apr 17 07:08:36 2013
@@ -236,8 +236,7 @@ JSON::Object model(const Framework& fram
   object.values["executors"] = executors;
 
   JSON::Array completedExecutors;
-  foreach (const std::tr1::shared_ptr<Executor>& executor,
-           framework.completedExecutors) {
+  foreach (const Owned<Executor>& executor, framework.completedExecutors) {
     completedExecutors.values.push_back(model(*executor));
   }
   object.values["completed_executors"] = completedExecutors;
@@ -334,8 +333,7 @@ Future<Response> state(
   object.values["frameworks"] = frameworks;
 
   JSON::Array completedFrameworks;
-  foreach (const std::tr1::shared_ptr<Framework>& framework,
-           slave.completedFrameworks) {
+  foreach (const Owned<Framework>& framework, slave.completedFrameworks) {
     completedFrameworks.values.push_back(model(*framework));
   }
   object.values["completed_frameworks"] = completedFrameworks;

Modified: incubator/mesos/trunk/src/slave/isolator.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/isolator.hpp?rev=1468769&r1=1468768&r2=1468769&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/isolator.hpp (original)
+++ incubator/mesos/trunk/src/slave/isolator.hpp Wed Apr 17 07:08:36 2013
@@ -69,14 +69,13 @@ public:
   // If 'checkpoint' is true, the isolator is expected to checkpoint
   // the executor pid to the 'path'.
   virtual void launchExecutor(
-      const SlaveID& slaveId,
+      const SlaveID& slaveId, // TODO(vinod): Why not pass this to initialize?
       const FrameworkID& frameworkId,
       const FrameworkInfo& frameworkInfo,
       const ExecutorInfo& executorInfo,
       const UUID& uuid,
       const std::string& directory,
-      const Resources& resources,
-      const Option<std::string>& path) = 0;
+      const Resources& resources) = 0;
 
   // Terminate a framework's executor, if it is still running.
   // The executor is expected to be gone after this method exits.
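
Note that the 'path' in the comment above launchExecutor() refers to
the parameter this commit removes: since the isolators are already
handed the slave, framework, and executor IDs plus the run UUID, they
can reconstruct the forked-pid checkpoint location themselves, as both
isolator hunks in this commit do:

    // Inside launchExecutor(), when frameworkInfo.checkpoint() is true:
    const string& path = paths::getForkedPidPath(
        paths::getMetaRootDir(flags.work_dir),
        slaveId, frameworkId, executorId, uuid);
    state::checkpoint(path, stringify(getpid()));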

Modified: incubator/mesos/trunk/src/slave/process_isolator.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_isolator.cpp?rev=1468769&r1=1468768&r2=1468769&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_isolator.cpp (original)
+++ incubator/mesos/trunk/src/slave/process_isolator.cpp Wed Apr 17 07:08:36 2013
@@ -109,17 +109,12 @@ void ProcessIsolator::launchExecutor(
     const FrameworkID& frameworkId,
     const FrameworkInfo& frameworkInfo,
     const ExecutorInfo& executorInfo,
-    const UUID& _,
+    const UUID& uuid,
     const string& directory,
-    const Resources& resources,
-    const Option<string>& path)
+    const Resources& resources)
 {
   CHECK(initialized) << "Cannot launch executors before initialization!";
 
-  bool checkpoint = frameworkInfo.checkpoint();
-  CHECK(!(checkpoint && path.isNone()))
-    << "Asked to checkpoint forked pid without providing a path";
-
   const ExecutorID& executorId = executorInfo.executor_id();
 
   LOG(INFO) << "Launching " << executorId
@@ -206,9 +201,16 @@ void ProcessIsolator::launchExecutor(
     // have a chance to write the pid to disk. That would result in an
     // orphaned executor process unknown to the slave when doing
     // recovery.
-    if (checkpoint) {
+    if (frameworkInfo.checkpoint()) {
+      const string& path = paths::getForkedPidPath(
+          paths::getMetaRootDir(flags.work_dir),
+          slaveId,
+          frameworkId,
+          executorId,
+          uuid);
+
       std::cout << "Checkpointing forked pid " << getpid() << std::endl;
-      state::checkpoint(path.get(), stringify(getpid()));
+      state::checkpoint(path, stringify(getpid()));
     }
 
     ExecutorLauncher* launcher = createExecutorLauncher(

Modified: incubator/mesos/trunk/src/slave/process_isolator.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_isolator.hpp?rev=1468769&r1=1468768&r2=1468769&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_isolator.hpp (original)
+++ incubator/mesos/trunk/src/slave/process_isolator.hpp Wed Apr 17 07:08:36 2013
@@ -67,8 +67,7 @@ public:
       const ExecutorInfo& executorInfo,
       const UUID& uuid,
       const std::string& directory,
-      const Resources& resources,
-      const Option<std::string>& path);
+      const Resources& resources);
 
   virtual void killExecutor(
       const FrameworkID& frameworkId,

Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1468769&r1=1468768&r2=1468769&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Wed Apr 17 07:08:36 2013
@@ -364,11 +364,11 @@ void Slave::initialize()
 
   // Start recovery.
   recover(flags.recover == "reconnect", flags.safe)
-   .onAny(defer(self(), &Slave::_recover, params::_1));
+    .onAny(defer(self(), &Slave::_initialize, params::_1));
 }
 
 
-void Slave::_recover(const Future<Nothing>& future)
+void Slave::_initialize(const Future<Nothing>& future)
 {
   if (!future.isReady()) {
     LOG(FATAL) << "Recovery failure: " << future.failure();
@@ -606,42 +606,35 @@ void Slave::runTask(
 
   Framework* framework = getFramework(frameworkId);
   if (framework == NULL) {
-    framework = new Framework(frameworkId, frameworkInfo, pid, flags);
+    framework = new Framework(
+        info.id(), frameworkId, frameworkInfo, pid, flags);
     frameworks[frameworkId] = framework;
+  }
 
-    if (frameworkInfo.checkpoint()) {
-      // Checkpoint the framework info.
-      string path = paths::getFrameworkInfoPath(
-          paths::getMetaRootDir(flags.work_dir),
-          info.id(),
-          frameworkId);
+  CHECK_NOTNULL(framework);
 
-      CHECK_SOME(state::checkpoint(path, frameworkInfo));
+  if (framework->state == Framework::INITIALIZING) {
+    LOG(INFO) << "Enqueuing task " << task.task_id()
+              << " until framework " << frameworkId
+              << " is initialized";
+    framework->pending.push_back(task);
+    return;
+  }
 
-      // Checkpoint the framework pid.
-      path = paths::getFrameworkPidPath(
-          paths::getMetaRootDir(flags.work_dir),
-          info.id(),
-          frameworkId);
+  if (framework->state == Framework::TERMINATING) {
+    LOG(WARNING) << "WARNING! Asked to run task '" << task.task_id()
+                 << "' for framework " << frameworkId
+                 << " which is terminating";
 
-      CHECK_SOME(state::checkpoint(path, framework->pid));
-    }
-  } else {
-    if (framework->shutdown) {
-      LOG(WARNING) << "WARNING! Asked to run task '" << task.task_id()
-                   << "' for framework " << frameworkId
-                   << " which is being shut down";
-
-      const StatusUpdate& update = protobuf::createStatusUpdate(
-          frameworkId,
-          info.id(),
-          task.task_id(),
-          TASK_LOST,
-          "Framework shutting down");
+    const StatusUpdate& update = protobuf::createStatusUpdate(
+        frameworkId,
+        info.id(),
+        task.task_id(),
+        TASK_LOST,
+        "Framework terminating");
 
-      statusUpdate(update);
-      return;
-    }
+    statusUpdate(update);
+    return;
   }
 
   const ExecutorInfo& executorInfo = framework->getExecutorInfo(task);
@@ -651,91 +644,9 @@ void Slave::runTask(
   // and queue the task until the executor has started.
   Executor* executor = framework->getExecutor(executorId);
 
-  if (executor != NULL) {
-    if (executor->shutdown) {
-      LOG(WARNING) << "WARNING! Asked to run task '" << task.task_id()
-                   << "' for framework " << frameworkId
-                   << " with executor '" << executorId
-                   << "' which is being shut down";
-
-      const StatusUpdate& update = protobuf::createStatusUpdate(
-          frameworkId,
-          info.id(),
-          task.task_id(),
-          TASK_LOST,
-          "Executor shutting down");
-
-      statusUpdate(update);
-    } else if (!executor->pid) {
-      // Queue task until the executor starts up.
-      LOG(INFO) << "Queuing task '" << task.task_id()
-                << "' for executor " << executorId
-                << " of framework '" << frameworkId;
-
-      if (frameworkInfo.checkpoint()) {
-        // Checkpoint the task.
-        // TODO(vinod): Consider moving these 3 statements into
-        // a helper function, since its used 3 times in this function!
-        const string& path = paths::getTaskInfoPath(
-            paths::getMetaRootDir(flags.work_dir),
-            info.id(),
-            executor->frameworkId,
-            executor->id,
-            executor->uuid,
-            task.task_id());
-
-        const Task& t = protobuf::createTask(
-            task, TASK_STAGING, executor->id, executor->frameworkId);
-
-        CHECK_SOME(state::checkpoint(path, t));
-      }
-
-      executor->queuedTasks[task.task_id()] = task;
-    } else {
-      if (frameworkInfo.checkpoint()) {
-        // Checkpoint the task.
-        const string& path = paths::getTaskInfoPath(
-            paths::getMetaRootDir(flags.work_dir),
-            info.id(),
-            executor->frameworkId,
-            executor->id,
-            executor->uuid,
-            task.task_id());
-
-        const Task& t = protobuf::createTask(
-            task, TASK_STAGING, executor->id, executor->frameworkId);
-
-        CHECK_SOME(state::checkpoint(path, t));
-      }
-
-      // Add the task and send it to the executor.
-      executor->addTask(task);
-
-      stats.tasks[TASK_STAGING]++;
-
-      // Update the resources.
-      // TODO(Charles Reiss): The isolator is not guaranteed to update
-      // the resources before the executor acts on its RunTaskMessage.
-      dispatch(isolator,
-               &Isolator::resourcesChanged,
-               framework->id,
-               executor->id,
-               executor->resources);
-
-      LOG(INFO) << "Sending task '" << task.task_id()
-                << "' to executor '" << executorId
-                << "' of framework " << framework->id;
-
-      RunTaskMessage message;
-      message.mutable_framework()->MergeFrom(framework->info);
-      message.mutable_framework_id()->MergeFrom(framework->id);
-      message.set_pid(framework->pid);
-      message.mutable_task()->MergeFrom(task);
-      send(executor->pid, message);
-    }
-  } else {
+  if (executor == NULL) {
     // Launch an executor for this task.
-    executor = framework->createExecutor(info.id(), executorInfo);
+    executor = framework->createExecutor(executorInfo);
 
     files->attach(executor->directory, executor->directory)
       .onAny(defer(self(),
@@ -743,52 +654,16 @@ void Slave::runTask(
                    params::_1,
                    executor->directory));
 
-    // Check to see if we need to checkpoint this executor and the task.
-    Option<string> pidPath;
-    if (frameworkInfo.checkpoint()) {
-      // Checkpoint the executor info.
-      string path = paths::getExecutorInfoPath(
-          paths::getMetaRootDir(flags.work_dir),
-          info.id(),
-          executor->frameworkId,
-          executor->id);
-
-      CHECK_SOME(state::checkpoint(path, executor->info));
-
-      // Create the meta executor directory.
-      // NOTE: This creates the 'latest' symlink in the meta directory.
-      paths::createExecutorDirectory(
-          paths::getMetaRootDir(flags.work_dir),
-          info.id(),
-          framework->id,
-          executor->id,
-          executor->uuid);
-
-      // Checkpoint the task.
-      path = paths::getTaskInfoPath(
-          paths::getMetaRootDir(flags.work_dir),
-          info.id(),
-          executor->frameworkId,
-          executor->id,
-          executor->uuid,
-          task.task_id());
-
-      const Task& t = protobuf::createTask(
-          task, TASK_STAGING, executor->id, executor->frameworkId);
-
-      CHECK_SOME(state::checkpoint(path, t));
-
-      // Get the path for isolator to checkpoint the forked pid.
-      pidPath = paths::getForkedPidPath(
-          paths::getMetaRootDir(flags.work_dir),
-          info.id(),
-          framework->id,
-          executor->id,
-          executor->uuid);
-    }
-
-    // Queue task until the executor starts up.
-    executor->queuedTasks[task.task_id()] = task;
+    // Tell the isolator to launch the executor.
+    dispatch(isolator,
+             &Isolator::launchExecutor,
+             info.id(),
+             framework->id,
+             framework->info,
+             executor->info,
+             executor->uuid,
+             executor->directory,
+             executor->resources);
 
     // Make sure the executor registers within the given timeout.
     // NOTE: We send this message before dispatching the launchExecutor to
@@ -799,21 +674,66 @@ void Slave::runTask(
           framework->id,
           executor->id,
           executor->uuid);
+  }
 
-    // Tell the isolator to launch the executor. (TODO(benh): Make the
-    // isolator a process so that it can block while trying to launch
-    // the executor.)
-    dispatch(isolator,
-             &Isolator::launchExecutor,
-             info.id(),
-             framework->id,
-             framework->info,
-             executor->info,
-             executor->uuid,
-             executor->directory,
-             executor->resources,
-             pidPath);
+  CHECK_NOTNULL(executor);
+
+  if (executor->state == Executor::TERMINATING ||
+      executor->state == Executor::TERMINATED) {
+    LOG(WARNING) << "WARNING! Asked to run task '" << task.task_id()
+                 << "' for framework " << frameworkId
+                 << " with executor '" << executorId
+                 << "' which is terminating/terminated";
+
+    const StatusUpdate& update = protobuf::createStatusUpdate(
+        frameworkId,
+        info.id(),
+        task.task_id(),
+        TASK_LOST,
+        "Executor terminating/terminated");
+
+    statusUpdate(update);
+    return;
   }
+
+  // Checkpoint the task before we do anything else (this is a no-op
+  // if the framework doesn't have checkpointing enabled).
+  executor->checkpointTask(task);
+
+  stats.tasks[TASK_STAGING]++;
+
+  // Queue task if the executor is not yet registered.
+  if (executor->state == Executor::REGISTERING) {
+    LOG(INFO) << "Queuing task '" << task.task_id()
+              << "' for executor " << executorId
+              << " of framework " << frameworkId;
+
+    executor->queuedTasks[task.task_id()] = task;
+    return;
+  }
+
+  // Add the task and send it to the executor.
+  executor->addTask(task);
+
+  // Update the resources.
+  // TODO(Charles Reiss): The isolator is not guaranteed to update
+  // the resources before the executor acts on its RunTaskMessage.
+  dispatch(isolator,
+           &Isolator::resourcesChanged,
+           framework->id,
+           executor->id,
+           executor->resources);
+
+  LOG(INFO) << "Sending task '" << task.task_id()
+            << "' to executor '" << executorId
+            << "' of framework " << frameworkId;
+
+  RunTaskMessage message;
+  message.mutable_framework()->MergeFrom(framework->info);
+  message.mutable_framework_id()->MergeFrom(framework->id);
+  message.set_pid(framework->pid);
+  message.mutable_task()->MergeFrom(task);
+  send(executor->pid, message);
 }
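
With the rewrite above, runTask() becomes a flat sequence of state
checks instead of nested if/else on 'shutdown' and 'pid': hold the task
while the framework is INITIALIZING, drop it with TASK_LOST while
TERMINATING, create and launch an executor if none exists, drop if the
executor is TERMINATING/TERMINATED, checkpoint, and finally either
queue (REGISTERING) or forward (RUNNING). A self-contained toy model of
that dispatch (illustrative only, not Mesos code):

    #include <iostream>
    #include <queue>

    enum ExecutorState { REGISTERING, RUNNING, TERMINATING, TERMINATED };

    // Consult the state first, then queue, drop, or forward -- the
    // same shape as the new runTask().
    void runTask(ExecutorState state, int taskId, std::queue<int>& queued)
    {
      if (state == TERMINATING || state == TERMINATED) {
        std::cout << "TASK_LOST " << taskId << std::endl;      // drop
      } else if (state == REGISTERING) {
        queued.push(taskId);                // hold until registration
      } else {
        std::cout << "RunTaskMessage " << taskId << std::endl; // forward
      }
    }

    int main()
    {
      std::queue<int> queued;
      runTask(REGISTERING, 1, queued); // queued
      runTask(RUNNING, 2, queued);     // forwarded to the executor
      runTask(TERMINATED, 3, queued);  // dropped as TASK_LOST
      return 0;
    }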
 
 
@@ -835,6 +755,8 @@ void Slave::killTask(const FrameworkID& 
     return;
   }
 
+  // TODO(benh/vinod): Check framework state.
+
   // Tell the executor to kill the task if it is up and
   // running, otherwise, consider the task lost.
   Executor* executor = framework->getExecutor(taskId);
@@ -847,7 +769,7 @@ void Slave::killTask(const FrameworkID& 
         frameworkId, info.id(), taskId, TASK_LOST, "Cannot find executor");
 
     statusUpdate(update);
-  } else if (!executor->pid) {
+  } else if (executor->state == Executor::REGISTERING) {
     // We are here, if the executor hasn't registered with the slave yet.
     const StatusUpdate& update = protobuf::createStatusUpdate(
         frameworkId,
@@ -893,11 +815,12 @@ void Slave::shutdownFramework(const Fram
   if (framework != NULL) {
     LOG(INFO) << "Shutting down framework " << framework->id;
 
-    framework->shutdown = true;
+    framework->state = Framework::TERMINATING;
 
     // Shut down all executors of this framework.
-    // Note that the framework and its corresponding executors are removed from
-    // the frameworks map by shutdownExecutorTimeout() or executorTerminated().
+    // Note that the framework and its corresponding executors are
+    // removed from the frameworks map by shutdownExecutorTimeout() or
+    // executorTerminated().
     foreachvalue (Executor* executor, framework->executors) {
       shutdownExecutor(framework, executor);
     }
@@ -922,21 +845,26 @@ void Slave::schedulerMessage(
     return;
   }
 
+  // TODO(benh/vinod): Check framework state.
+
   Executor* executor = framework->getExecutor(executorId);
   if (executor == NULL) {
     LOG(WARNING) << "Dropping message for executor '"
                  << executorId << "' of framework " << frameworkId
                  << " because executor does not exist";
     stats.invalidFrameworkMessages++;
-  } else if (!executor->pid) {
-    // TODO(*): If executor is not started, queue framework message?
-    // (It's probably okay to just drop it since frameworks can have
-    // the executor send a message to the master to say when it's ready.)
+  } else if (executor->state == Executor::REGISTERING) {
+    // TODO(*): If executor is not yet registered, queue framework
+    // message? It's probably okay to just drop it since frameworks
+    // can have the executor send a message to the master to say when
+    // it's ready.
     LOG(WARNING) << "Dropping message for executor '"
                  << executorId << "' of framework " << frameworkId
                  << " because executor is not running";
     stats.invalidFrameworkMessages++;
   } else {
+    // TODO(benh/vinod): Check executor state.
+
     FrameworkToExecutorMessage message;
     message.mutable_slave_id()->MergeFrom(slaveId);
     message.mutable_framework_id()->MergeFrom(frameworkId);
@@ -953,8 +881,10 @@ void Slave::updateFramework(const Framew
 {
   Framework* framework = getFramework(frameworkId);
   if (framework != NULL) {
+    // TODO(benh/vinod): Check framework state.
+
     LOG(INFO) << "Updating framework " << frameworkId
-              << " pid to " <<pid;
+              << " pid to " << pid;
     framework->pid = pid;
 
     if (framework->info.checkpoint()) {
@@ -1024,6 +954,8 @@ void Slave::_statusUpdateAcknowledgement
     return;
   }
 
+  // TODO(benh/vinod): Check framework state.
+
   // Find the executor that has this update.
   Executor* executor = framework->getExecutor(taskId);
   if (executor == NULL) {
@@ -1032,6 +964,8 @@ void Slave::_statusUpdateAcknowledgement
     return;
   }
 
+  // TODO(benh/vinod): Check executor state.
+
   executor->updates.remove(taskId, uuid);
 
   // Cleanup the executor and framework, if possible.
@@ -1056,6 +990,8 @@ void Slave::registerExecutor(
     return;
   }
 
+  // TODO(benh/vinod): Check framework state.
+
   Executor* executor = framework->getExecutor(executorId);
 
   // Check the status of the executor.
@@ -1063,23 +999,20 @@ void Slave::registerExecutor(
     LOG(WARNING) << "WARNING! Unexpected executor '" << executorId
                  << "' registering for framework " << frameworkId;
     reply(ShutdownExecutorMessage());
-  } else if (executor->pid) {
-    LOG(WARNING) << "WARNING! executor '" << executorId
+  } else if (executor->state != Executor::REGISTERING) {
+    LOG(WARNING) << "WARNING! Executor '" << executorId
                  << "' of framework " << frameworkId
-                 << " is already running";
-    reply(ShutdownExecutorMessage());
-  } else if (executor->shutdown) {
-    LOG(WARNING) << "WARNING! executor '" << executorId
-                 << "' of framework " << frameworkId
-                 << " should be shutting down";
+                 << " is not expected to be registering";
     reply(ShutdownExecutorMessage());
   } else {
     // Save the pid for the executor.
     executor->pid = from;
 
+    executor->state = Executor::RUNNING;
+
     if (framework->info.checkpoint()) {
-      // TODO(vinod): This checkpointing should be done asynchronously as it is
-      // in the fast path of the slave!
+      // TODO(vinod): This checkpointing should be done asynchronously
+      // as it is in the fast path of the slave!
 
       // Checkpoint the libprocess pid.
       string path = paths::getLibprocessPidPath(
@@ -1153,6 +1086,8 @@ void Slave::reregisterExecutor(
 
   executor->pid = from; // Update the pid, to signal re-registration.
 
+  executor->state = Executor::RUNNING;
+
   // Send re-registration message to the executor.
   ExecutorReregisteredMessage message;
   message.mutable_slave_id()->MergeFrom(info.id());
@@ -1206,6 +1141,7 @@ void Slave::reregisterExecutorTimeout()
       // exited! This is because, if the executor properly exited, it
       // should have already been identified by the isolator (via
       // reaper) and cleaned up!
+      // TODO(benh/vinod): Check executor state.
       if (!executor->pid) {
         LOG(INFO) << "Shutting down un-reregistered executor "
                   << executor->id << " of framework " << framework->id;
@@ -1231,13 +1167,13 @@ void Slave::statusUpdate(const StatusUpd
 
   LOG(INFO) << "Handling status update " << update;
 
-  Framework* framework = getFramework(update.framework_id());
-  Executor* executor = NULL;
+  Executor* executor = NULL; // Need to pass to forwardUpdate.
 
+  Framework* framework = getFramework(update.framework_id());
   if (framework != NULL) {
     executor = framework->getExecutor(status.task_id());
-
     if (executor != NULL) {
+      // TODO(benh/vinod): Check executor state.
       executor->updateTaskState(status.task_id(), status.state());
       executor->updates.put(status.task_id(), UUID::fromBytes(update.uuid()));
 
@@ -1313,8 +1249,13 @@ void Slave::forwardUpdate(const StatusUp
   stats.tasks[update.status().state()]++;
   stats.validStatusUpdates++;
 
+  // TODO(benh/vinod): Have the StatusUpdateManager just take
+  // checkpoint and determine the path itself. It can log a warning if
+  // it can't generate the path because there is no executor ID. How
+  // else can we persist status updates for tasks that don't have an
+  // executor ID?
   statusUpdateManager->update(update, checkpoint, path)
-    .onAny(defer(self(), &Slave::_forwardUpdate, params::_1, update, pid));;
+    .onAny(defer(self(), &Slave::_forwardUpdate, params::_1, update, pid));
 }
 
 
@@ -1367,6 +1308,8 @@ void Slave::executorMessage(
     return;
   }
 
+  // TODO(benh/vinod): Check framework state.
+
   LOG(INFO) << "Sending message for framework " << frameworkId
             << " to " << framework->pid;
 
@@ -1430,6 +1373,8 @@ void Slave::executorStarted(
     return;
   }
 
+  // TODO(benh/vinod): Check framework state.
+
   Executor* executor = framework->getExecutor(executorId);
   if (executor == NULL) {
     LOG(WARNING) << "Invalid executor '" << executorId
@@ -1438,6 +1383,8 @@ void Slave::executorStarted(
     return;
   }
 
+  // TODO(benh/vinod): Check executor state.
+
   monitor.watch(
       frameworkId,
       executorId,
@@ -1497,6 +1444,8 @@ void Slave::executorTerminated(
     return;
   }
 
+  // TODO(benh/vinod): Check framework state.
+
   Executor* executor = framework->getExecutor(executorId);
   if (executor == NULL) {
     LOG(WARNING) << "Invalid executor '" << executorId
@@ -1505,7 +1454,9 @@ void Slave::executorTerminated(
     return;
   }
 
-  executor->terminated = true;
+  // TODO(benh/vinod): Check executor state.
+
+  executor->state = Executor::TERMINATED;
 
   bool isCommandExecutor = false;
 
@@ -1513,12 +1464,12 @@ void Slave::executorTerminated(
   // If the isolator destroyed the executor (e.g., due to OOM event)
   // or if this is a command executor, we send TASK_FAILED status updates
   // instead of TASK_LOST.
-  // NOTE: We don't send updates if the framework is shutting down,
+  // NOTE: We don't send updates if the framework is terminating
   // because we don't want the status update manager to keep retrying
-  // these updates for the lack for ACKs from the scheduler. Also,
+  // these updates since it won't receive ACKs from the scheduler. Also,
   // the status update manager should have already cleaned up all the
-  // status update streams for a framework that is shutting down.
-  if (!framework->shutdown) {
+  // status update streams for a framework that is terminating.
+  if (framework->state != Framework::TERMINATING) {
     StatusUpdate update;
 
     // Transition all live launched tasks.
@@ -1592,11 +1543,12 @@ void Slave::cleanup(Framework* framework
   CHECK_NOTNULL(executor);
 
   // Cleanup this executor if it has terminated and either has no
-  // pending updates or the framework is shutting down. We don't
-  // care for pending updates when framework is shutting down
+  // pending updates or the framework is terminating. We don't
+  // care for pending updates when a framework is terminating
   // because the framework cannot ACK them.
-  if (executor->terminated &&
-      (executor->updates.empty() || framework->shutdown)) {
+  if (executor->state == Executor::TERMINATED &&
+      (executor->updates.empty() ||
+       framework->state == Framework::TERMINATING)) {
 
     // Schedule the executor directory to get garbage collected.
     gc.schedule(flags.gc_delay, executor->directory)
@@ -1634,8 +1586,8 @@ void Slave::cleanup()
   string metaDir = paths::getMetaRootDir(flags.work_dir);
 
   // Archive and delete the meta directory, to allow incompatible upgrades.
-  LOG(INFO) << "Archiving and deleting the meta directory '" << metaDir
-            << "' to allow incompatible upgrade!";
+  LOG(INFO) << "Archiving and deleting the meta directory '"
+            << metaDir << "' to allow incompatible upgrade!";
 
   // Create the archive directory, if it doesn't exist.
   Try<Nothing> result = os::mkdir(archiveDir);
@@ -1644,12 +1596,12 @@ void Slave::cleanup()
         metaDir, path::join(archiveDir, info.id().value() + ".tar.gz"));
 
     if (result.isError()) {
-      LOG(ERROR) << "Failed to archive meta directory '" << archiveDir
-                 << "': " << result.error();
+      LOG(ERROR) << "Failed to archive meta directory '"
+                 << archiveDir << "': " << result.error();
     }
   } else {
-    LOG(ERROR) << "Failed to create archive directory '" << archiveDir
-               << ": " << result.error();
+    LOG(ERROR) << "Failed to create archive directory '"
+               << archiveDir << ": " << result.error();
   }
 
   result = os::rmdir(metaDir);
@@ -1681,7 +1633,7 @@ void Slave::shutdownExecutor(Framework* 
   LOG(INFO) << "Shutting down executor '" << executor->id
             << "' of framework " << framework->id;
 
-  executor->shutdown = true;
+  executor->state = Executor::TERMINATING;
 
   // If the executor hasn't yet registered, this message
   // will be dropped to the floor!
@@ -1710,6 +1662,8 @@ void Slave::shutdownExecutorTimeout(
     return;
   }
 
+  // TODO(benh/vinod): Check framework state.
+
   Executor* executor = framework->getExecutor(executorId);
   if (executor == NULL) {
     LOG(INFO) << "Executor '" << executorId
@@ -1718,6 +1672,8 @@ void Slave::shutdownExecutorTimeout(
     return;
   }
 
+  // TODO(benh/vinod): Check executor state.
+
   // Make sure this timeout is valid.
   if (executor->uuid != uuid ) {
       LOG(INFO) << "A new executor '" << executorId
@@ -1748,6 +1704,8 @@ void Slave::registerExecutorTimeout(
     return;
   }
 
+  // TODO(benh/vinod): Check framework state.
+
   Executor* executor = framework->getExecutor(executorId);
   if (executor == NULL) {
     LOG(INFO) << "Executor '" << executorId
@@ -1756,24 +1714,26 @@ void Slave::registerExecutorTimeout(
     return;
   }
 
+  // TODO(benh/vinod): Check executor state.
+
   // Make sure this timeout is valid.
   if (executor->uuid != uuid ) {
-      LOG(INFO) << "A new executor '" << executorId
-                << "' of framework " << frameworkId
-                << " with run " << executor->uuid
-                << " seems to be active. Ignoring the shutdown timeout"
-                << " for the old executor run " << uuid;
-      return;
+    LOG(INFO) << "A new executor '" << executorId
+              << "' of framework " << frameworkId
+              << " with run " << executor->uuid
+              << " seems to be active. Ignoring the shutdown timeout"
+              << " for the old executor run " << uuid;
+    return;
   }
 
-  // Shutdown the executor if it has not registered yet.
+  // Terminate the executor if it has not registered yet.
   if (!executor->pid) {
-    LOG(INFO) << "Shutting down executor " << executor->id
+    LOG(INFO) << "Terminating executor " << executor->id
               << " of framework " << framework->id
               << " because it did not register within "
               << flags.executor_registration_timeout;
 
-    executor->shutdown = true;
+    executor->state = Executor::TERMINATING;
 
     // Immediately kill the executor.
     dispatch(isolator, &Isolator::killExecutor, framework->id, executor->id);
@@ -1875,149 +1835,94 @@ Future<Nothing> Slave::recover(bool reco
   return statusUpdateManager->recover(metaDir, state.get())
            .then(defer(isolator, &Isolator::recover, state.get()))
            .then(defer(self(),
-                       &Self::recoverExecutors,
+                       &Self::_recover,
                        state.get(),
                        reconnect));
 }
 
 
-Future<Nothing> Slave::recoverExecutors(
-    const SlaveState& state,
-    bool reconnect)
-{
-  LOG(INFO) << "Recovering executors";
-
-  foreachvalue (const FrameworkState& framework_, state.frameworks) {
-    foreachvalue (const ExecutorState& executor_, framework_.executors) {
-      LOG(INFO) << "Recovering executor '" << executor_.id
-                << "' of framework " << framework_.id;
-
-      if (executor_.info.isNone()) {
-        LOG(WARNING) << "Skipping recovery of executor '" << executor_.id
-                     << "' of framework " << framework_.id
-                     << " because its info cannot be recovered";
-        continue;
-      }
+Future<Nothing> Slave::_recover(const SlaveState& state, bool reconnect)
+{
+  foreachvalue (const FrameworkState& frameworkState, state.frameworks) {
+    recover(frameworkState, reconnect);
+  }
 
-      if (executor_.latest.isNone()) {
-        LOG(WARNING) << "Skipping recovery of executor '" << executor_.id
-                     << "' of framework " << framework_.id
-                     << " because its latest run cannot be recovered";
-        continue;
-      }
+  if (reconnect) {
+    // Cleanup unregistered executors after a delay.
+    delay(EXECUTOR_REREGISTER_TIMEOUT,
+          self(),
+          &Slave::reregisterExecutorTimeout);
 
-      // We are only interested in the latest run of the executor!
-      const UUID& uuid = executor_.latest.get();
-      CHECK(executor_.runs.contains(uuid));
-      const RunState& run  = executor_.runs.get(uuid).get();
-
-      // Create framework, if necessary.
-      Framework* framework = getFramework(framework_.id);
-      if (framework == NULL) {
-        CHECK_SOME(framework_.info);
-        CHECK_SOME(framework_.pid);
+    // We set 'recovered' flag inside reregisterExecutorTimeout(),
+    // so that when the slave re-registers with master it can
+    // correctly inform the master about the launched tasks.
+    return recovered.future();
+  }
 
-        framework = new Framework(
-            framework_.id, framework_.info.get(), framework_.pid.get(), flags);
+  return Nothing();
+}
 
-        frameworks[framework_.id] = framework;
-      }
 
-      // Create executor.
-      const string& directory = paths::getExecutorRunPath(
-          flags.work_dir, info.id(), framework_.id, executor_.id, uuid);
-
-      Executor* executor =
-          new Executor(framework_.id, executor_.info.get(), uuid, directory);
-
-      // Recover the tasks.
-      foreachvalue (const TaskState& task, run.tasks) {
-        if (task.info.isNone()) {
-          LOG(WARNING) << "Skipping recovery of task " << task.id
-                       << " because its info cannot be recovered";
-          continue;
-        }
+void Slave::recover(const FrameworkState& state, bool reconnect)
+{
+  CHECK_SOME(state.info);
+  CHECK_SOME(state.pid);
 
-        executor->launchedTasks[task.id] = new Task(task.info.get());
+  CHECK(!frameworks.contains(state.id)); // TODO(vinod): Is this correct?
+  Framework* framework = new Framework(
+      info.id(), state.id, state.info.get(), state.pid.get(), flags);
 
-        // NOTE: Since some tasks might have been terminated when the slave
-        // was down, the executor resources we capture here is an upper-bound.
-        // The actual resources needed (for live tasks) by the isolator
-        // will be calculated when the executor re-registers.
-        executor->resources += task.info.get().resources();
-
-        // Read updates to get the latest state of the task.
-        foreach (const StatusUpdate& update, task.updates) {
-          executor->updateTaskState(task.id, update.status().state());
-          executor->updates.put(task.id, UUID::fromBytes(update.uuid()));
-
-          // Remove the task if it received a terminal update.
-          if (protobuf::isTerminalState(update.status().state())) {
-            executor->removeTask(task.id);
-
-            // If the terminal update has been acknowledged, remove
-            // it from pending tasks.
-            if (task.acks.contains(UUID::fromBytes(update.uuid()))) {
-              executor->updates.remove(task.id, UUID::fromBytes(update.uuid()));
-            }
-            break;
-          }
-        }
-      }
+  frameworks[framework->id] = framework;
 
-      // Add the executor to the framework.
-      framework->executors[executor_.id] = executor;
+  // Now recover the executors for this framework.
+  foreachvalue (const ExecutorState& executorState, state.executors) {
+    Executor* executor = framework->recoverExecutor(executorState);
 
-      files->attach(executor->directory, executor->directory)
-        .onAny(defer(self(),
-                     &Self::fileAttached,
-                     params::_1,
-                     executor->directory));
-
-      // Reconnect with executor, if possible.
-      if (reconnect && run.libprocessPid.isSome()) {
-        CHECK_SOME(run.forkedPid);
-
-        LOG(INFO) << "Sending reconnect request to executor " << executor_.id
-                  << " of framework " << framework_.id
-                  << " at " << run.libprocessPid.get();
+    // Continue to next executor if this one couldn't be recovered.
+    if (executor == NULL) {
+      continue;
+    }
+
+    // Expose the executor's files.
+    files->attach(executor->directory, executor->directory)
+      .onAny(defer(self(),
+                   &Self::fileAttached,
+                   params::_1,
+                   executor->directory));
+
+    // And monitor the executor.
+    monitor.watch(
+        framework->id,
+        executor->id,
+        executor->info,
+        flags.resource_monitoring_interval)
+      .onAny(lambda::bind(_watch, lambda::_1, framework->id, executor->id));
+
+    if (reconnect) {
+      if (executor->pid) {
+        LOG(INFO) << "Sending reconnect request to executor " << executor->id
+                  << " of framework " << framework->id
+                  << " at " << executor->pid;
 
         ReconnectExecutorMessage message;
         message.mutable_slave_id()->MergeFrom(info.id());
-        send(run.libprocessPid.get(), message);
-      } else if (run.libprocessPid.isSome()) {
+        send(executor->pid, message);
+      } else {
+        // TODO(vinod): What's supposed to happen here?
+      }
+    } else {
+      if (executor->pid) {
         // Cleanup executors.
-        LOG(INFO) << "Sending shutdown to executor " << executor_.id
-                  << " of framework " << framework_.id
-                  << " at " << run.libprocessPid.get();
+        LOG(INFO) << "Sending shutdown to executor " << executor->id
+                  << " of framework " << framework->id
+                  << " to " << executor->pid;
 
-        executor->pid = run.libprocessPid.get();
         shutdownExecutor(framework, executor);
+      } else {
+        // TODO(vinod): What's supposed to happen here?
       }
-
-      // Beging monitoring the executor.
-      monitor.watch(
-          framework_.id,
-          executor_.id,
-          executor_.info.get(),
-          flags.resource_monitoring_interval)
-        .onAny(lambda::bind(_watch, lambda::_1, framework_.id, executor_.id));
     }
   }
-
-  if (reconnect) {
-    // Cleanup unregistered executors after a delay.
-    delay(EXECUTOR_REREGISTER_TIMEOUT,
-          self(),
-          &Slave::reregisterExecutorTimeout);
-
-    // We set 'recovered' flag inside reregisterExecutorTimeout(),
-    // so that when the slave re-registers with master it can
-    // correctly inform the master about the launched tasks.
-    return recovered.future();
-  }
-
-  return Nothing();
 }
 
 } // namespace slave {
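
The net effect on recovery: the single Slave::recoverExecutors() loop
becomes a layered call chain, with per-object logic moving onto the
objects themselves (the new Framework/Executor methods appear in the
slave.hpp diff below). Roughly:

    // Slave::recover(reconnect, safe)
    //   .then statusUpdateManager->recover(...)
    //   .then isolator->recover(...)
    //   .then Slave::_recover(state, reconnect)
    //           for each framework:
    //             Slave::recover(FrameworkState)     // creates Framework
    //               Framework::recoverExecutor(...)  // creates Executor
    //                 Executor::recoverTask(...)     // replays updates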

Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1468769&r1=1468768&r2=1468769&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Wed Apr 17 07:08:36 2013
@@ -181,6 +181,9 @@ protected:
   virtual void finalize();
   virtual void exited(const UPID& pid);
 
+  // This is called when recovery finishes.
+  void _initialize(const Future<Nothing>& future);
+
   void fileAttached(const Future<Nothing>& result, const std::string& path);
 
   void detachFile(const Future<Nothing>& result, const std::string& path);
@@ -223,14 +226,10 @@ protected:
   // live executors. Otherwise, the slave attempts to shutdown/kill them.
   // If 'safe' is true, any recovery errors are considered fatal.
   Future<Nothing> recover(bool reconnect, bool safe);
+  Future<Nothing> _recover(const state::SlaveState& state, bool reconnect);
 
-  // This is called when recovery finishes.
-  void _recover(const Future<Nothing>& future);
-
-  // Recovers executors by reconnecting/killing as necessary.
-  Future<Nothing> recoverExecutors(
-      const state::SlaveState& state,
-      bool reconnect);
+  // Helper to recover a framework from the specified state.
+  void recover(const state::FrameworkState& state, bool reconnect);
 
   // Called when an executor terminates or a status update
   // acknowledgement is handled by the status update manager.
@@ -310,20 +309,46 @@ private:
 // Information describing an executor.
 struct Executor
 {
-  Executor(const FrameworkID& _frameworkId,
+  Executor(const SlaveID& _slaveId,
+           const FrameworkID& _frameworkId,
            const ExecutorInfo& _info,
            const UUID& _uuid,
-           const std::string& _directory)
-    : id(_info.executor_id()),
+           const std::string& _directory,
+           const Flags& _flags,
+           bool _checkpoint)
+    : state(REGISTERING), // TODO(benh): Skipping INITIALIZING for now.
+      slaveId(_slaveId),
+      id(_info.executor_id()),
       info(_info),
       frameworkId(_frameworkId),
-      directory(_directory),
       uuid(_uuid),
+      directory(_directory),
+      flags(_flags),
+      checkpoint(_checkpoint),
       pid(UPID()),
-      shutdown(false),
-      terminated(false),
       resources(_info.resources()),
-      completedTasks(MAX_COMPLETED_TASKS_PER_EXECUTOR) {}
+      completedTasks(MAX_COMPLETED_TASKS_PER_EXECUTOR)
+  {
+    if (checkpoint) {
+      // Checkpoint the executor info.
+      const std::string& path = paths::getExecutorInfoPath(
+          paths::getMetaRootDir(flags.work_dir),
+          slaveId,
+          frameworkId,
+          id);
+
+      CHECK_SOME(state::checkpoint(path, info));
+
+      // Create the meta executor directory.
+      // NOTE: This creates the 'latest' symlink in the meta directory.
+      paths::createExecutorDirectory(
+          paths::getMetaRootDir(flags.work_dir),
+          slaveId,
+          frameworkId,
+          id,
+          uuid);
+    }
+  }
 
   ~Executor()
   {
@@ -339,8 +364,8 @@ struct Executor
     // maybe we shouldn't make this a fatal error.
     CHECK(!launchedTasks.contains(task.task_id()));
 
-    Task* t =
-        new Task(protobuf::createTask(task, TASK_STAGING, id, frameworkId));
+    Task* t = new Task(
+        protobuf::createTask(task, TASK_STAGING, id, frameworkId));
 
     launchedTasks[task.task_id()] = t;
     resources += task.resources();
@@ -366,6 +391,59 @@ struct Executor
     }
   }
 
+  void checkpointTask(const TaskInfo& task)
+  {
+    if (checkpoint) {
+      const std::string& path = paths::getTaskInfoPath(
+          paths::getMetaRootDir(flags.work_dir),
+          slaveId,
+          frameworkId,
+          id,
+          uuid,
+          task.task_id());
+
+      const Task& t = protobuf::createTask(
+          task, TASK_STAGING, id, frameworkId);
+
+      CHECK_SOME(state::checkpoint(path, t));
+    }
+  }
+
+  void recoverTask(const state::TaskState& state)
+  {
+    if (state.info.isNone()) {
+      LOG(WARNING) << "Skipping recovery of task " << state.id
+                   << " because its info cannot be recovered";
+      return;
+    }
+
+    launchedTasks[state.id] = new Task(state.info.get());
+
+    // NOTE: Since some tasks might have been terminated when the
+    // slave was down, the executor resources we capture here is an
+    // upper-bound. The actual resources needed (for live tasks) by
+    // the isolator will be calculated when the executor re-registers.
+    resources += state.info.get().resources();
+
+    // Read updates to get the latest state of the task.
+    foreach (const StatusUpdate& update, state.updates) {
+      updateTaskState(state.id, update.status().state());
+      updates.put(state.id, UUID::fromBytes(update.uuid()));
+
+      // Remove the task if it received a terminal update.
+      if (protobuf::isTerminalState(update.status().state())) {
+        removeTask(state.id);
+
+        // If the terminal update has been acknowledged, remove it
+        // from pending tasks.
+        if (state.acks.contains(UUID::fromBytes(update.uuid()))) {
+          updates.remove(state.id, UUID::fromBytes(update.uuid()));
+        }
+        break;
+      }
+    }
+  }
+
   void updateTaskState(const TaskID& taskId, TaskState state)
   {
     if (launchedTasks.contains(taskId)) {
@@ -373,23 +451,30 @@ struct Executor
     }
   }
 
+  enum {
+    INITIALIZING,
+    REGISTERING,
+    RUNNING,
+    TERMINATING,
+    TERMINATED,
+  } state;
+
+  const SlaveID slaveId;
+
   const ExecutorID id;
   const ExecutorInfo info;
 
   const FrameworkID frameworkId;
 
-  const std::string directory;
-
   const UUID uuid; // Distinguishes executor instances with same ExecutorID.
 
-  UPID pid;
+  const std::string directory;
 
-  bool shutdown; // Indicates if executor is being shut down.
+  const Flags flags;
 
-  // Indicates if the executor has terminated. We need this
-  // because a terminated executor might still have pending
-  // status updates that are not yet acknowledged.
-  bool terminated;
+  const bool checkpoint;
+
+  UPID pid;
 
   Resources resources; // Currently consumed resources.
 
@@ -409,16 +494,37 @@ private:
 // Information about a framework.
 struct Framework
 {
-  Framework(const FrameworkID& _id,
+  Framework(const SlaveID& _slaveId,
+            const FrameworkID& _id,
             const FrameworkInfo& _info,
             const UPID& _pid,
             const Flags& _flags)
-    : id(_id),
+    : state(RUNNING), // TODO(benh): Skipping INITIALIZING for now.
+      slaveId(_slaveId),
+      id(_id),
       info(_info),
       pid(_pid),
       flags(_flags),
-      shutdown(false),
-      completedExecutors(MAX_COMPLETED_EXECUTORS_PER_FRAMEWORK) {}
+      completedExecutors(MAX_COMPLETED_EXECUTORS_PER_FRAMEWORK)
+  {
+    if (info.checkpoint()) {
+      // Checkpoint the framework info.
+      std::string path = paths::getFrameworkInfoPath(
+          paths::getMetaRootDir(flags.work_dir),
+          slaveId,
+          id);
+
+      CHECK_SOME(state::checkpoint(path, info));
+
+      // Checkpoint the framework pid.
+      path = paths::getFrameworkPidPath(
+          paths::getMetaRootDir(flags.work_dir),
+          slaveId,
+          id);
+
+      CHECK_SOME(state::checkpoint(path, pid));
+    }
+  }
 
   ~Framework()
   {
@@ -481,21 +587,28 @@ struct Framework
     return task.executor();
   }
 
-  Executor* createExecutor(const SlaveID& slaveId,
-                           const ExecutorInfo& executorInfo)
+  Executor* createExecutor(const ExecutorInfo& executorInfo)
   {
-    // We create a UUID for the new executor. The UUID uniquely identifies this
-    // new instance of the executor across executors sharing the same executorID
-    // that may have previously run. It also provides a means for the executor
-    // to have a unique directory.
-    UUID executorUUID = UUID::random();
+    // We create a UUID for the new executor. The UUID uniquely
+    // identifies this new instance of the executor across executors
+    // sharing the same executorID that may have previously run. It
+    // also provides a means for the executor to have a unique
+    // directory.
+    UUID uuid = UUID::random();
 
     // Create a directory for the executor.
     const std::string& directory = paths::createExecutorDirectory(
-        flags.work_dir, slaveId, id, executorInfo.executor_id(), executorUUID);
+        flags.work_dir, slaveId, id, executorInfo.executor_id(), uuid);
+
+    Executor* executor = new Executor(
+        slaveId,
+        id,
+        executorInfo,
+        uuid,
+        directory,
+        flags,
+        info.checkpoint());
 
-    Executor* executor =
-      new Executor(id, executorInfo, executorUUID, directory);
     CHECK(!executors.contains(executorInfo.executor_id()));
     executors[executorInfo.executor_id()] = executor;
     return executor;
@@ -533,6 +646,70 @@ struct Framework
     return NULL;
   }
 
+  Executor* recoverExecutor(const state::ExecutorState& state)
+  {
+    LOG(INFO) << "Recovering executor '" << state.id
+              << "' of framework " << id;
+
+    if (state.info.isNone()) {
+      LOG(WARNING) << "Skipping recovery of executor '" << state.id
+                   << "' of framework " << id
+                   << " because its info cannot be recovered";
+      return NULL;
+    }
+
+    if (state.latest.isNone()) {
+      LOG(WARNING) << "Skipping recovery of executor '" << state.id
+                   << "' of framework " << id
+                   << " because its latest run cannot be recovered";
+      return NULL;
+    }
+
+    // We are only interested in the latest run of the executor!
+    const UUID& uuid = state.latest.get();
+
+    // Create executor.
+    const std::string& directory = paths::getExecutorRunPath(
+        flags.work_dir, slaveId, id, state.id, uuid);
+
+    Executor* executor = new Executor(
+        slaveId,
+        id,
+        state.info.get(),
+        uuid,
+        directory,
+        flags,
+        info.checkpoint());
+
+    CHECK(state.runs.contains(uuid));
+    const state::RunState& run = state.runs.get(uuid).get();
+
+    // Recover the libprocess PID if possible.
+    if (run.libprocessPid.isSome()) {
+      CHECK_SOME(run.forkedPid); // TODO(vinod): Why this check?
+      executor->pid = run.libprocessPid.get();
+    }
+
+    // And finally recover all the executor's tasks.
+    foreachvalue (const state::TaskState& taskState, run.tasks) {
+      executor->recoverTask(taskState);
+    }
+
+    // Add the executor to the framework.
+    executors[executor->id] = executor;
+
+    return executor;
+  }
+
+  enum {
+    INITIALIZING,
+    RUNNING,
+    TERMINATING,
+    TERMINATED,
+  } state;
+
+  const SlaveID slaveId;
+
   const FrameworkID id;
   const FrameworkInfo info;
 
@@ -540,7 +717,7 @@ struct Framework
 
   const Flags flags;
 
-  bool shutdown; // Indicates if framework is being shut down.
+  std::vector<TaskInfo> pending; // Pending tasks (used while INITIALIZING).
 
   // Current running executors.
   hashmap<ExecutorID, Executor*> executors;
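
Finally, note where the checkpointing went: writing the FrameworkInfo
and pid, the ExecutorInfo, and the meta-directory 'latest' symlink now
happens inside the Framework and Executor constructors, so a
checkpointed object cannot exist without its on-disk record. Creating a
framework at a call site reduces to (as in the runTask() hunk above):

    Framework* framework = new Framework(
        info.id(), frameworkId, frameworkInfo, pid, flags);
    frameworks[frameworkId] = framework;
    // The constructor has already checkpointed info and pid when
    // frameworkInfo.checkpoint() is true.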