You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/04/17 09:08:41 UTC

svn commit: r1468770 - in /incubator/mesos/trunk/src/slave: slave.cpp slave.hpp

Author: vinodkone
Date: Wed Apr 17 07:08:40 2013
New Revision: 1468770

URL: http://svn.apache.org/r1468770
Log:
Moved slave::Framework/Executor implementation into slave.cpp.

Review: https://reviews.apache.org/r/10112

Modified:
    incubator/mesos/trunk/src/slave/slave.cpp
    incubator/mesos/trunk/src/slave/slave.hpp

Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1468770&r1=1468769&r2=1468770&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Wed Apr 17 07:08:40 2013
@@ -162,7 +162,7 @@ Slave::Slave(const slave::Flags& _flags,
     }
   }
 
-  std::string ports;
+  string ports;
   if (resources.ports().isSome()) {
     // TODO(vinod): Validate the ports range.
     ports = stringify(resources.ports().get());
@@ -441,7 +441,7 @@ void Slave::fileAttached(const Future<No
 }
 
 
-void Slave::detachFile(const Future<Nothing>& result, const std::string& path)
+void Slave::detachFile(const Future<Nothing>& result, const string& path)
 {
   CHECK(!result.isDiscarded());
   files->detach(path);
@@ -702,7 +702,7 @@ void Slave::runTask(
 
   stats.tasks[TASK_STAGING]++;
 
-  // Queue task if the executor is not yet registered.
+  // Queue task if the executor has not yet registered.
   if (executor->state == Executor::REGISTERING) {
     LOG(INFO) << "Queuing task '" << task.task_id()
               << "' for executor " << executorId
@@ -1143,12 +1143,10 @@ void Slave::reregisterExecutorTimeout()
       // reaper) and cleaned up!
       // TODO(benh/vinod): Check executor state.
       if (!executor->pid) {
-        LOG(INFO) << "Shutting down un-reregistered executor "
-                  << executor->id << " of framework " << framework->id;
+        LOG(INFO) << "Killing an un-reregistered executor " << executor->id
+                  << " of framework " << framework->id;
 
-        // TODO(vinod): Call shutdownExecutor() when it supports
-        // immediate shutdown of the executor.
-        shutdownExecutorTimeout(framework->id, executor->id, executor->uuid);
+        dispatch(isolator, &Isolator::killExecutor, framework->id, executor->id);
       }
     }
   }
@@ -1202,11 +1200,12 @@ void Slave::statusUpdate(const StatusUpd
   }
 
   // Forward the update to the status update manager.
-  // NOTE: We forward the update even if the framework/executor is unknown
-  // because currently there is no persistent state in the master.
-  // The lack of persistence might lead frameworks to use out-of-band means
-  // to figure out the task state mismatch and use status updates to reconcile.
-  // We need to revisit this issue once master has persistent state.
+  // NOTE: We forward the update even if the framework/executor is
+  // unknown because currently there is no persistent state in the
+  // master. The lack of persistence might lead frameworks to use
+  // out-of-band means to figure out the task state mismatch and use
+  // status updates to reconcile. We need to revisit this issue once
+  // master has persistent state.
   forwardUpdate(update, executor);
 }
 
@@ -1868,7 +1867,7 @@ void Slave::recover(const FrameworkState
   CHECK_SOME(state.info);
   CHECK_SOME(state.pid);
 
-  CHECK(!frameworks.contains(state.id)); // TODO(vinod): Is this correct?
+  CHECK(!frameworks.contains(state.id));
   Framework* framework = new Framework(
       info.id(), state.id, state.info.get(), state.pid.get(), flags);
 
@@ -1908,7 +1907,9 @@ void Slave::recover(const FrameworkState
         message.mutable_slave_id()->MergeFrom(info.id());
         send(executor->pid, message);
       } else {
-        // TODO(vinod): What's supposed to happen here?
+        LOG(INFO) << "Unable to reconnect to executor " << executor->id
+                  << " of framework " << framework->id
+                  << " because no libprocess PID was found";
       }
     } else {
       if (executor->pid) {
@@ -1919,12 +1920,374 @@ void Slave::recover(const FrameworkState
 
         shutdownExecutor(framework, executor);
       } else {
-        // TODO(vinod): What's supposed to happen here?
+        LOG(INFO) << "Killing executor " << executor->id
+                  << " of framework " << framework->id
+                  << " because no libprocess PID was found";
+        dispatch(isolator, &Isolator::killExecutor, framework->id, executor->id);
+      }
+    }
+  }
+}
+
+
+
+Framework::Framework(
+    const SlaveID& _slaveId,
+    const FrameworkID& _id,
+    const FrameworkInfo& _info,
+    const UPID& _pid,
+    const Flags& _flags)
+  : state(RUNNING), // TODO(benh): Skipping INITIALIZING for now.
+    slaveId(_slaveId),
+    id(_id),
+    info(_info),
+    pid(_pid),
+    flags(_flags),
+    completedExecutors(MAX_COMPLETED_EXECUTORS_PER_FRAMEWORK)
+{
+  if (info.checkpoint()) {
+    // Checkpoint the framework info.
+    string path = paths::getFrameworkInfoPath(
+        paths::getMetaRootDir(flags.work_dir),
+        slaveId,
+        id);
+
+    CHECK_SOME(state::checkpoint(path, info));
+
+    // Checkpoint the framework pid.
+    path = paths::getFrameworkPidPath(
+        paths::getMetaRootDir(flags.work_dir),
+        slaveId,
+        id);
+
+    CHECK_SOME(state::checkpoint(path, pid));
+  }
+}
+
+
+Framework::~Framework()
+{
+  // We own the non-completed executor pointers, so they need to be deleted.
+  foreachvalue (Executor* executor, executors) {
+    delete executor;
+  }
+}
+
+
+ExecutorInfo Framework::getExecutorInfo(const TaskInfo& task)
+{
+  CHECK(task.has_executor() != task.has_command());
+
+  if (task.has_command()) {
+    ExecutorInfo executor;
+
+    // Command executors share the same id as the task.
+    executor.mutable_executor_id()->set_value(task.task_id().value());
+
+    // Prepare an executor name which includes information on the
+    // command being launched.
+    string name =
+      "(Task: " + task.task_id().value() + ") " + "(Command: sh -c '";
+    if (task.command().value().length() > 15) {
+      name += task.command().value().substr(0, 12) + "...')";
+    } else {
+      name += task.command().value() + "')";
+    }
+
+    executor.set_name("Command Executor " + name);
+    executor.set_source(task.task_id().value());
+
+    // Copy the CommandInfo to get the URIs and environment, but
+    // update it to invoke 'mesos-executor' (unless we couldn't
+    // resolve 'mesos-executor' via 'realpath', in which case just
+    // echo the error and exit).
+    executor.mutable_command()->MergeFrom(task.command());
+
+    Try<string> path = os::realpath(
+        path::join(flags.launcher_dir, "mesos-executor"));
+
+    if (path.isSome()) {
+      executor.mutable_command()->set_value(path.get());
+    } else {
+      executor.mutable_command()->set_value(
+          "echo '" + path.error() + "'; exit 1");
+    }
+
+    // TODO(benh): Set some resources for the executor so that a task
+    // doesn't end up getting killed because the amount of resources
+    // of the executor went over those allocated. Note that this might
+    // mean that the number of resources on the machine will actually
+    // be slightly oversubscribed, so we'll need to reevaluate with
+    // respect to resources that can't be oversubscribed.
+    return executor;
+  }
+
+  return task.executor();
+}
+
+
+Executor* Framework::createExecutor(const ExecutorInfo& executorInfo)
+{
+  // We create a UUID for the new executor. The UUID uniquely
+  // identifies this new instance of the executor across executors
+  // sharing the same executorID that may have previously run. It
+  // also provides a means for the executor to have a unique
+  // directory.
+  UUID uuid = UUID::random();
+
+  // Create a directory for the executor.
+  const string& directory = paths::createExecutorDirectory(
+      flags.work_dir, slaveId, id, executorInfo.executor_id(), uuid);
+
+  Executor* executor = new Executor(
+      slaveId, id, executorInfo, uuid, directory, flags, info.checkpoint());
+
+  CHECK(!executors.contains(executorInfo.executor_id()));
+  executors[executorInfo.executor_id()] = executor;
+  return executor;
+}
+
+
+void Framework::destroyExecutor(const ExecutorID& executorId)
+{
+  if (executors.contains(executorId)) {
+    Executor* executor = executors[executorId];
+    executors.erase(executorId);
+
+    // Pass ownership of the executor pointer.
+    completedExecutors.push_back(Owned<Executor>(executor));
+  }
+}
+
+
+Executor* Framework::getExecutor(const ExecutorID& executorId)
+{
+  if (executors.contains(executorId)) {
+    return executors[executorId];
+  }
+
+  return NULL;
+}
+
+
+Executor* Framework::getExecutor(const TaskID& taskId)
+{
+  foreachvalue (Executor* executor, executors) {
+    if (executor->queuedTasks.contains(taskId) ||
+        executor->launchedTasks.contains(taskId) ||
+        executor->updates.contains(taskId)) {
+      return executor;
+    }
+  }
+  return NULL;
+}
+
+
+Executor* Framework::recoverExecutor(const ExecutorState& state)
+{
+  LOG(INFO) << "Recovering executor '" << state.id
+            << "' of framework " << id;
+
+  if (state.info.isNone()) {
+    LOG(WARNING) << "Skipping recovery of executor '" << state.id
+                 << "' of framework " << id
+                 << " because its info cannot be recovered";
+    return NULL;
+  }
+
+  if (state.latest.isNone()) {
+    LOG(WARNING) << "Skipping recovery of executor '" << state.id
+                 << "' of framework " << id
+                 << " because its latest run cannot be recovered";
+    return NULL;
+  }
+
+  // We are only interested in the latest run of the executor!
+  const UUID& uuid = state.latest.get();
+
+  // Create executor.
+  const string& directory = paths::getExecutorRunPath(
+      flags.work_dir, slaveId, id, state.id, uuid);
+
+  Executor* executor = new Executor(
+      slaveId, id, state.info.get(), uuid, directory, flags, info.checkpoint());
+
+  CHECK(state.runs.contains(uuid));
+  const RunState& run = state.runs.get(uuid).get();
+
+  // Recover the libprocess PID if possible.
+  if (run.libprocessPid.isSome()) {
+    // When recovering in unsafe mode, the assumption is that the
+    // slave can die after checkpointing the forked pid but before
+    // checkpointing the libprocess pid. So the libprocess pid should
+    // never exist without the forked pid; if it does, something is
+    // badly wrong (file corruption).
+    CHECK_SOME(run.forkedPid);
+    executor->pid = run.libprocessPid.get();
+  }
+
+  // And finally recover all the executor's tasks.
+  foreachvalue (const TaskState& taskState, run.tasks) {
+    executor->recoverTask(taskState);
+  }
+
+  // Add the executor to the framework.
+  executors[executor->id] = executor;
+
+  return executor;
+}
+
+
+Executor::Executor(
+    const SlaveID& _slaveId,
+    const FrameworkID& _frameworkId,
+    const ExecutorInfo& _info,
+    const UUID& _uuid,
+    const string& _directory,
+    const Flags& _flags,
+    bool _checkpoint)
+  : state(REGISTERING), // TODO(benh): Skipping INITIALIZING for now.
+    slaveId(_slaveId),
+    id(_info.executor_id()),
+    info(_info),
+    frameworkId(_frameworkId),
+    uuid(_uuid),
+    directory(_directory),
+    flags(_flags),
+    checkpoint(_checkpoint),
+    pid(UPID()),
+    resources(_info.resources()),
+    completedTasks(MAX_COMPLETED_TASKS_PER_EXECUTOR)
+{
+  if (checkpoint) {
+    // Checkpoint the executor info.
+    const string& path = paths::getExecutorInfoPath(
+        paths::getMetaRootDir(flags.work_dir),
+        slaveId,
+        frameworkId,
+        id);
+
+    CHECK_SOME(state::checkpoint(path, info));
+
+    // Create the meta executor directory.
+    // NOTE: This creates the 'latest' symlink in the meta directory.
+    paths::createExecutorDirectory(
+        paths::getMetaRootDir(flags.work_dir),
+        slaveId,
+        frameworkId,
+        id,
+        uuid);
+  }
+}
+
+
+Executor::~Executor()
+{
+  // Delete the tasks.
+  foreachvalue (Task* task, launchedTasks) {
+    delete task;
+  }
+}
+
+
+Task* Executor::addTask(const TaskInfo& task)
+{
+  // The master should enforce unique task IDs, but just in case
+  // maybe we shouldn't make this a fatal error.
+  CHECK(!launchedTasks.contains(task.task_id()));
+
+  Task* t = new Task(
+      protobuf::createTask(task, TASK_STAGING, id, frameworkId));
+
+  launchedTasks[task.task_id()] = t;
+  resources += task.resources();
+  return t;
+}
+
+
+void Executor::removeTask(const TaskID& taskId)
+{
+  // Remove the task if it's queued.
+  queuedTasks.erase(taskId);
+
+  // Update the resources if it's been launched.
+  if (launchedTasks.contains(taskId)) {
+    Task* task = launchedTasks[taskId];
+    foreach (const Resource& resource, task->resources()) {
+      resources -= resource;
+    }
+    launchedTasks.erase(taskId);
+
+    completedTasks.push_back(*task);
+
+    delete task;
+  }
+}
+
+
+void Executor::checkpointTask(const TaskInfo& task)
+{
+  if (checkpoint) {
+    const string& path = paths::getTaskInfoPath(
+        paths::getMetaRootDir(flags.work_dir),
+        slaveId,
+        frameworkId,
+        id,
+        uuid,
+        task.task_id());
+
+    const Task& t = protobuf::createTask(
+        task, TASK_STAGING, id, frameworkId);
+
+    CHECK_SOME(state::checkpoint(path, t));
+  }
+}
+
+
+void Executor::recoverTask(const TaskState& state)
+{
+  if (state.info.isNone()) {
+    LOG(WARNING) << "Skipping recovery of task " << state.id
+                 << " because its info cannot be recovered";
+    return;
+  }
+
+  launchedTasks[state.id] = new Task(state.info.get());
+
+  // NOTE: Since some tasks might have been terminated when the
+  // slave was down, the executor resources we capture here are an
+  // upper bound. The actual resources needed (for live tasks) by
+  // the isolator will be calculated when the executor re-registers.
+  resources += state.info.get().resources();
+
+  // Read updates to get the latest state of the task.
+  foreach (const StatusUpdate& update, state.updates) {
+    updateTaskState(state.id, update.status().state());
+    updates.put(state.id, UUID::fromBytes(update.uuid()));
+
+    // Remove the task if it received a terminal update.
+    if (protobuf::isTerminalState(update.status().state())) {
+      removeTask(state.id);
+
+      // If the terminal update has been acknowledged, remove it
+      // from pending tasks.
+      if (state.acks.contains(UUID::fromBytes(update.uuid()))) {
+        updates.remove(state.id, UUID::fromBytes(update.uuid()));
       }
+      break;
     }
   }
 }
 
+
+void Executor::updateTaskState(const TaskID& taskId, TaskState state)
+{
+  if (launchedTasks.contains(taskId)) {
+    launchedTasks[taskId]->set_state(state);
+  }
+}
+
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1468770&r1=1468769&r2=1468770&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Wed Apr 17 07:08:40 2013
@@ -309,147 +309,21 @@ private:
 // Information describing an executor.
 struct Executor
 {
-  Executor(const SlaveID& _slaveId,
-           const FrameworkID& _frameworkId,
-           const ExecutorInfo& _info,
-           const UUID& _uuid,
-           const std::string& _directory,
-           const Flags& _flags,
-           bool _checkpoint)
-    : state(REGISTERING), // TODO(benh): Skipping INITIALIZING for now.
-      slaveId(_slaveId),
-      id(_info.executor_id()),
-      info(_info),
-      frameworkId(_frameworkId),
-      uuid(_uuid),
-      directory(_directory),
-      flags(_flags),
-      checkpoint(_checkpoint),
-      pid(UPID()),
-      resources(_info.resources()),
-      completedTasks(MAX_COMPLETED_TASKS_PER_EXECUTOR)
-  {
-    if (checkpoint) {
-      // Checkpoint the executor info.
-      const std::string& path = paths::getExecutorInfoPath(
-          paths::getMetaRootDir(flags.work_dir),
-          slaveId,
-          frameworkId,
-          id);
-
-      CHECK_SOME(state::checkpoint(path, info));
-
-      // Create the meta executor directory.
-      // NOTE: This creates the 'latest' symlink in the meta directory.
-      paths::createExecutorDirectory(
-          paths::getMetaRootDir(flags.work_dir),
-          slaveId,
-          frameworkId,
-          id,
-          uuid);
-    }
-  }
-
-  ~Executor()
-  {
-    // Delete the tasks.
-    foreachvalue (Task* task, launchedTasks) {
-      delete task;
-    }
-  }
-
-  Task* addTask(const TaskInfo& task)
-  {
-    // The master should enforce unique task IDs, but just in case
-    // maybe we shouldn't make this a fatal error.
-    CHECK(!launchedTasks.contains(task.task_id()));
-
-    Task* t = new Task(
-        protobuf::createTask(task, TASK_STAGING, id, frameworkId));
-
-    launchedTasks[task.task_id()] = t;
-    resources += task.resources();
-    return t;
-  }
-
-  void removeTask(const TaskID& taskId)
-  {
-    // Remove the task if it's queued.
-    queuedTasks.erase(taskId);
-
-    // Update the resources if it's been launched.
-    if (launchedTasks.contains(taskId)) {
-      Task* task = launchedTasks[taskId];
-      foreach (const Resource& resource, task->resources()) {
-        resources -= resource;
-      }
-      launchedTasks.erase(taskId);
-
-      completedTasks.push_back(*task);
-
-      delete task;
-    }
-  }
-
-  void checkpointTask(const TaskInfo& task)
-  {
-    if (checkpoint) {
-      const std::string& path = paths::getTaskInfoPath(
-          paths::getMetaRootDir(flags.work_dir),
-          slaveId,
-          frameworkId,
-          id,
-          uuid,
-          task.task_id());
-
-      const Task& t = protobuf::createTask(
-          task, TASK_STAGING, id, frameworkId);
-
-      CHECK_SOME(state::checkpoint(path, t));
-    }
-  }
-
-  void recoverTask(const state::TaskState& state)
-  {
-    if (state.info.isNone()) {
-      LOG(WARNING) << "Skipping recovery of task " << state.id
-                   << " because its info cannot be recovered";
-      return;
-    }
-
-    launchedTasks[state.id] = new Task(state.info.get());
-
-    // NOTE: Since some tasks might have been terminated when the
-    // slave was down, the executor resources we capture here is an
-    // upper-bound. The actual resources needed (for live tasks) by
-    // the isolator will be calculated when the executor re-registers.
-    resources += state.info.get().resources();
-
-    // Read updates to get the latest state of the task.
-    foreach (const StatusUpdate& update, state.updates) {
-      updateTaskState(state.id, update.status().state());
-      updates.put(state.id, UUID::fromBytes(update.uuid()));
-
-      // Remove the task if it received a terminal update.
-      if (protobuf::isTerminalState(update.status().state())) {
-        removeTask(state.id);
-
-        // If the terminal update has been acknowledged, remove it
-        // from pending tasks.
-        if (state.acks.contains(UUID::fromBytes(update.uuid()))) {
-          updates.remove(state.id, UUID::fromBytes(update.uuid()));
-        }
-        break;
-      }
-    }
-  }
-
-  void updateTaskState(const TaskID& taskId, TaskState state)
-  {
-    if (launchedTasks.contains(taskId)) {
-      launchedTasks[taskId]->set_state(state);
-    }
-  }
+  Executor(
+      const SlaveID& _slaveId,
+      const FrameworkID& _frameworkId,
+      const ExecutorInfo& _info,
+      const UUID& _uuid,
+      const std::string& _directory,
+      const Flags& _flags,
+      bool _checkpoint);
+  ~Executor();
+
+  Task* addTask(const TaskInfo& task);
+  void removeTask(const TaskID& taskId);
+  void checkpointTask(const TaskInfo& task);
+  void recoverTask(const state::TaskState& state);
+  void updateTaskState(const TaskID& taskId, TaskState state);
 
   enum {
     INITIALIZING,
@@ -494,212 +368,23 @@ private:
 // Information about a framework.
 struct Framework
 {
-  Framework(const SlaveID& _slaveId,
-            const FrameworkID& _id,
-            const FrameworkInfo& _info,
-            const UPID& _pid,
-            const Flags& _flags)
-    : state(RUNNING), // TODO(benh): Skipping INITIALIZING for now.
-      slaveId(_slaveId),
-      id(_id),
-      info(_info),
-      pid(_pid),
-      flags(_flags),
-      completedExecutors(MAX_COMPLETED_EXECUTORS_PER_FRAMEWORK)
-  {
-    if (info.checkpoint()) {
-      // Checkpoint the framework info.
-      std::string path = paths::getFrameworkInfoPath(
-          paths::getMetaRootDir(flags.work_dir),
-          slaveId,
-          id);
-
-      CHECK_SOME(state::checkpoint(path, info));
-
-      // Checkpoint the framework pid.
-      path = paths::getFrameworkPidPath(
-          paths::getMetaRootDir(flags.work_dir),
-          slaveId,
-          id);
-
-      CHECK_SOME(state::checkpoint(path, pid));
-    }
-  }
-
-  ~Framework()
-  {
-    // We own the non-completed executor pointers, so they need to be deleted.
-    foreachvalue (Executor* executor, executors) {
-      delete executor;
-    }
-  }
+  Framework(
+      const SlaveID& _slaveId,
+      const FrameworkID& _id,
+      const FrameworkInfo& _info,
+      const UPID& _pid,
+      const Flags& _flags);
+
+  ~Framework();
 
   // Returns an ExecutorInfo for a TaskInfo (possibly
   // constructing one if the task has a CommandInfo).
-  ExecutorInfo getExecutorInfo(const TaskInfo& task)
-  {
-    CHECK(task.has_executor() != task.has_command());
-
-    if (task.has_command()) {
-      ExecutorInfo executor;
-
-      // Command executors share the same id as the task.
-      executor.mutable_executor_id()->set_value(task.task_id().value());
-
-      // Prepare an executor name which includes information on the
-      // command being launched.
-      std::string name =
-        "(Task: " + task.task_id().value() + ") " + "(Command: sh -c '";
-      if (task.command().value().length() > 15) {
-        name += task.command().value().substr(0, 12) + "...')";
-      } else {
-        name += task.command().value() + "')";
-      }
-
-      executor.set_name("Command Executor " + name);
-      executor.set_source(task.task_id().value());
-
-      // Copy the CommandInfo to get the URIs and environment, but
-      // update it to invoke 'mesos-executor' (unless we couldn't
-      // resolve 'mesos-executor' via 'realpath', in which case just
-      // echo the error and exit).
-      executor.mutable_command()->MergeFrom(task.command());
-
-      Try<std::string> path = os::realpath(
-          path::join(flags.launcher_dir, "mesos-executor"));
-
-      if (path.isSome()) {
-        executor.mutable_command()->set_value(path.get());
-      } else {
-        executor.mutable_command()->set_value(
-            "echo '" + path.error() + "'; exit 1");
-      }
-
-      // TODO(benh): Set some resources for the executor so that a task
-      // doesn't end up getting killed because the amount of resources of
-      // the executor went over those allocated. Note that this might mean
-      // that the number of resources on the machine will actually be
-      // slightly oversubscribed, so we'll need to reevaluate with respect
-      // to resources that can't be oversubscribed.
-      return executor;
-    }
-
-    return task.executor();
-  }
-
-  Executor* createExecutor(const ExecutorInfo& executorInfo)
-  {
-    // We create a UUID for the new executor. The UUID uniquely
-    // identifies this new instance of the executor across executors
-    // sharing the same executorID that may have previously run. It
-    // also provides a means for the executor to have a unique
-    // directory.
-    UUID uuid = UUID::random();
-
-    // Create a directory for the executor.
-    const std::string& directory = paths::createExecutorDirectory(
-        flags.work_dir, slaveId, id, executorInfo.executor_id(), uuid);
-
-    Executor* executor = new Executor(
-        slaveId,
-        id,
-        executorInfo,
-        uuid,
-        directory,
-        flags,
-        info.checkpoint());
-
-    CHECK(!executors.contains(executorInfo.executor_id()));
-    executors[executorInfo.executor_id()] = executor;
-    return executor;
-  }
-
-  void destroyExecutor(const ExecutorID& executorId)
-  {
-    if (executors.contains(executorId)) {
-      Executor* executor = executors[executorId];
-      executors.erase(executorId);
-
-      // Pass ownership of the executor pointer.
-      completedExecutors.push_back(Owned<Executor>(executor));
-    }
-  }
-
-  Executor* getExecutor(const ExecutorID& executorId)
-  {
-    if (executors.contains(executorId)) {
-      return executors[executorId];
-    }
-
-    return NULL;
-  }
-
-  Executor* getExecutor(const TaskID& taskId)
-  {
-    foreachvalue (Executor* executor, executors) {
-      if (executor->queuedTasks.contains(taskId) ||
-          executor->launchedTasks.contains(taskId) ||
-          executor->updates.contains(taskId)) {
-        return executor;
-      }
-    }
-    return NULL;
-  }
-
-  Executor* recoverExecutor(const state::ExecutorState& state)
-  {
-    LOG(INFO) << "Recovering executor '" << state.id
-              << "' of framework " << id;
-
-    if (state.info.isNone()) {
-      LOG(WARNING) << "Skipping recovery of executor '" << state.id
-                   << "' of framework " << id
-                   << " because its info cannot be recovered";
-      return NULL;
-    }
-
-    if (state.latest.isNone()) {
-      LOG(WARNING) << "Skipping recovery of executor '" << state.id
-                   << "' of framework " << id
-                   << " because its latest run cannot be recovered";
-      return NULL;
-    }
-
-    // We are only interested in the latest run of the executor!
-    const UUID& uuid = state.latest.get();
-
-    // Create executor.
-    const std::string& directory = paths::getExecutorRunPath(
-        flags.work_dir, slaveId, id, state.id, uuid);
-
-    Executor* executor = new Executor(
-        slaveId,
-        id,
-        state.info.get(),
-        uuid,
-        directory,
-        flags,
-        info.checkpoint());
-
-    CHECK(state.runs.contains(uuid));
-    const state::RunState& run = state.runs.get(uuid).get();
-
-    // Recover the libprocess PID if possible.
-    if (run.libprocessPid.isSome()) {
-      CHECK_SOME(run.forkedPid); // TODO(vinod): Why this check?
-      executor->pid = run.libprocessPid.get();
-    }
-
-    // And finally recover all the executor's tasks.
-    foreachvalue (const state::TaskState& taskState, run.tasks) {
-      executor->recoverTask(taskState);
-    }
-
-    // Add the executor to the framework.
-    executors[executor->id] = executor;
-
-    return executor;
-  }
+  ExecutorInfo getExecutorInfo(const TaskInfo& task);
+  Executor* createExecutor(const ExecutorInfo& executorInfo);
+  void destroyExecutor(const ExecutorID& executorId);
+  Executor* getExecutor(const ExecutorID& executorId);
+  Executor* getExecutor(const TaskID& taskId);
+  Executor* recoverExecutor(const state::ExecutorState& state);
 
   enum {
     INITIALIZING,