Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/27 08:08:39 UTC

svn commit: r1140024 [5/15] - in /incubator/mesos/trunk: ./ ec2/ ec2/deploy.karmic64/ ec2/deploy.solaris/ frameworks/torque/nexus-hpl/ include/mesos/ src/ src/common/ src/configurator/ src/detector/ src/examples/ src/examples/java/ src/examples/python/...

Modified: incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp Mon Jun 27 06:08:33 2011
@@ -1,33 +1,46 @@
 #ifndef __PROCESS_BASED_ISOLATION_MODULE_HPP__
 #define __PROCESS_BASED_ISOLATION_MODULE_HPP__
 
-#include <sys/types.h>
+#include <string>
 
-#include <boost/unordered_map.hpp>
+#include <sys/types.h>
 
 #include "isolation_module.hpp"
+#include "reaper.hpp"
 #include "slave.hpp"
 
-#include "launcher/launcher.hpp"
+#include "common/hashmap.hpp"
 
-#include "messaging/messages.hpp"
+#include "launcher/launcher.hpp"
 
 
 namespace mesos { namespace internal { namespace slave {
 
-class ProcessBasedIsolationModule : public IsolationModule {
+class ProcessBasedIsolationModule
+  : public IsolationModule, public ProcessExitedListener
+{
 public:
   ProcessBasedIsolationModule();
 
   virtual ~ProcessBasedIsolationModule();
 
-  virtual void initialize(Slave *slave);
+  virtual void initialize(const Configuration& conf,
+                          bool local,
+                          const process::PID<Slave>& slave);
+
+  virtual void launchExecutor(const FrameworkID& frameworkId,
+                              const FrameworkInfo& frameworkInfo,
+                              const ExecutorInfo& executorInfo,
+                              const std::string& directory);
+
+  virtual void killExecutor(const FrameworkID& frameworkId,
+                            const ExecutorID& executorId);
+
+  virtual void resourcesChanged(const FrameworkID& frameworkId,
+                                const ExecutorID& executorId,
+                                const Resources& resources);
 
-  virtual void launchExecutor(Framework* framework, Executor* executor);
-
-  virtual void killExecutor(Framework* framework, Executor* executor);
-
-  virtual void resourcesChanged(Framework* framework, Executor* executor);
+  virtual void processExited(pid_t pid, int status);
 
 protected:
   // Main method executed after a fork() to create a Launcher for launching
@@ -37,24 +50,24 @@ protected:
   // Subclasses of ProcessBasedIsolationModule that wish to override the
   // default launching behavior should override createLauncher() and return
   // their own Launcher object (including possibly a subclass of Launcher).
-  virtual launcher::ExecutorLauncher* createExecutorLauncher(Framework* framework, Executor* executor);
+  virtual launcher::ExecutorLauncher* createExecutorLauncher(
+      const FrameworkID& frameworkId,
+      const FrameworkInfo& frameworkInfo,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory);
 
 private:
-  // Reaps child processes and tells the slave if they exit
-  class Reaper : public process::Process<Reaper> {
-    ProcessBasedIsolationModule* module;
-
-  protected:
-    virtual void operator () ();
-
-  public:
-    Reaper(ProcessBasedIsolationModule* module);
-  };
-
+  // No copying, no assigning.
+  ProcessBasedIsolationModule(const ProcessBasedIsolationModule&);
+  ProcessBasedIsolationModule& operator = (const ProcessBasedIsolationModule&);
+
+  // TODO(benh): Make variables const by passing them via constructor.
+  Configuration conf;
+  bool local;
+  process::PID<Slave> slave;
   bool initialized;
-  Slave* slave;
-  boost::unordered_map<FrameworkID, boost::unordered_map<ExecutorID, pid_t> > pgids;
   Reaper* reaper;
+  hashmap<FrameworkID, hashmap<ExecutorID, pid_t> > pgids;
 };
 
 }}}

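For illustration, a minimal sketch (not part of this commit) of how the nested pgids map above might be used; contains() and operator[] mirror the hashmap usage elsewhere in this change, while killpg() and the signal choice are assumptions:

    // Record an executor's process group id at launch time ...
    pgids[frameworkId][executorId] = pgid;

    // ... and signal the whole group at kill time, guarding both lookups.
    if (pgids.contains(frameworkId) &&
        pgids[frameworkId].contains(executorId)) {
      killpg(pgids[frameworkId][executorId], SIGKILL); // Assumed signal.
      pgids[frameworkId].erase(executorId);
    }
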
Modified: incubator/mesos/trunk/src/slave/projd.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/projd.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/projd.cpp (original)
+++ incubator/mesos/trunk/src/slave/projd.cpp Mon Jun 27 06:08:33 2011
@@ -7,7 +7,7 @@
 
 #include "common/fatal.hpp"
 
-#include "messaging/messages.hpp"
+#include "messages/messages.hpp"
 
 
 namespace mesos { namespace internal { namespace projd {

Added: incubator/mesos/trunk/src/slave/reaper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/reaper.cpp?rev=1140024&view=auto
==============================================================================
--- incubator/mesos/trunk/src/slave/reaper.cpp (added)
+++ incubator/mesos/trunk/src/slave/reaper.cpp Mon Jun 27 06:08:33 2011
@@ -0,0 +1,48 @@
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include <process/dispatch.hpp>
+
+#include "reaper.hpp"
+
+#include "common/foreach.hpp"
+
+using namespace process;
+
+
+namespace mesos { namespace internal { namespace slave {
+
+Reaper::Reaper() {}
+
+
+Reaper::~Reaper() {}
+
+
+void Reaper::addProcessExitedListener(
+    const PID<ProcessExitedListener>& listener)
+{
+  listeners.insert(listener);
+}
+
+
+void Reaper::operator () ()
+{
+  while (true) {
+    serve(1);
+    if (name() == TIMEOUT) {
+      // Check whether any child process has exited.
+      pid_t pid;
+      int status;
+      if ((pid = waitpid((pid_t) -1, &status, WNOHANG)) > 0) {
+        foreach (const PID<ProcessExitedListener>& listener, listeners) {
+          dispatch(listener, &ProcessExitedListener::processExited,
+                   pid, status);
+        }
+      }
+    } else if (name() == TERMINATE) {
+      return;
+    }
+  }
+}
+
+}}} // namespace mesos { namespace internal { namespace slave {

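Note that the reap loop above collects at most one exited child per one-second timeout, because waitpid() is called only once per TIMEOUT. A sketch (not in this commit) that drains every pending child on each tick would loop until waitpid() returns 0 or -1:

    // Reap all children that have exited since the last timeout.
    pid_t pid;
    int status;
    while ((pid = waitpid((pid_t) -1, &status, WNOHANG)) > 0) {
      foreach (const PID<ProcessExitedListener>& listener, listeners) {
        dispatch(listener, &ProcessExitedListener::processExited,
                 pid, status);
      }
    }
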
Added: incubator/mesos/trunk/src/slave/reaper.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/reaper.hpp?rev=1140024&view=auto
==============================================================================
--- incubator/mesos/trunk/src/slave/reaper.hpp (added)
+++ incubator/mesos/trunk/src/slave/reaper.hpp Mon Jun 27 06:08:33 2011
@@ -0,0 +1,36 @@
+#ifndef __REAPER_HPP__
+#define __REAPER_HPP__
+
+#include <set>
+
+#include <process/process.hpp>
+
+
+namespace mesos { namespace internal { namespace slave {
+
+class ProcessExitedListener : public process::Process<ProcessExitedListener>
+{
+public:
+  virtual void processExited(pid_t pid, int status) = 0;
+};
+
+
+class Reaper : public process::Process<Reaper>
+{
+public:
+  Reaper();
+  virtual ~Reaper();
+
+  void addProcessExitedListener(const process::PID<ProcessExitedListener>&);
+
+protected:
+  virtual void operator () ();
+
+private:
+  std::set<process::PID<ProcessExitedListener> > listeners;
+};
+
+
+}}} // namespace mesos { namespace internal { namespace slave {
+
+#endif // __REAPER_HPP__

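For context, a sketch of the intended wiring (assumed usage, based only on the declarations above and the spawn/dispatch/terminate/wait primitives used elsewhere in this commit):

    // Spawn the reaper and subscribe for exit notifications; self() is
    // assumed to be the PID of a process implementing processExited().
    Reaper* reaper = new Reaper();
    spawn(reaper);
    dispatch(reaper, &Reaper::addProcessExitedListener, self());

    // Later, at shutdown, mirroring the pattern in Slave::operator()().
    terminate(reaper);
    wait(reaper);
    delete reaper;
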
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Mon Jun 27 06:08:33 2011
@@ -1,59 +1,205 @@
-#include <dirent.h>
 #include <errno.h>
-#include <stdio.h>
-#include <string.h>
 
 #include <algorithm>
-#include <fstream>
+#include <iomanip>
 
-#include <google/protobuf/descriptor.h>
+#include <process/timer.hpp>
 
 #include "slave.hpp"
-#include "webui.hpp"
 
+#include "common/build.hpp"
+#include "common/type_utils.hpp"
 #include "common/utils.hpp"
 
-// There's no gethostbyname2 on Solaris, so fake it by calling gethostbyname
-#ifdef __sun__
-#define gethostbyname2(name, _) gethostbyname(name)
-#endif
-
-using namespace mesos;
-using namespace mesos::internal;
-using namespace mesos::internal::slave;
-
-using boost::lexical_cast;
-using boost::unordered_map;
-using boost::unordered_set;
-
-using process::HttpOKResponse;
-using process::HttpResponse;
-using process::HttpRequest;
-using process::Promise;
-using process::UPID;
-
-using std::list;
-using std::make_pair;
-using std::ostringstream;
-using std::pair;
-using std::queue;
+using namespace process;
+
 using std::string;
-using std::vector;
 
+using process::wait; // Necessary on some OSes to disambiguate.
 
-Slave::Slave(const Resources& _resources, bool _local,
-             IsolationModule *_isolationModule)
-  : MesosProcess<Slave>("slave"),
-    resources(_resources), local(_local), isolationModule(_isolationModule)
+
+namespace mesos { namespace internal { namespace slave {
+
+// Information describing an executor (goes away if executor crashes).
+struct Executor
 {
-  initialize();
-}
+  Executor(const FrameworkID& _frameworkId,
+           const ExecutorInfo& _info,
+           const string& _directory)
+    : frameworkId(_frameworkId),
+      info(_info),
+      directory(_directory),
+      id(_info.executor_id()),
+      uuid(UUID::random()),
+      pid(UPID()),
+      shutdown(false) {}
+
+  ~Executor()
+  {
+    // Delete the tasks.
+    foreachvalue (Task* task, launchedTasks) {
+      delete task;
+    }
+  }
+
+  Task* addTask(const TaskDescription& task)
+  {
+    // The master should enforce unique task IDs, but just in case
+    // it doesn't, maybe this shouldn't be a fatal error.
+    CHECK(!launchedTasks.contains(task.task_id()));
+
+    Task *t = new Task();
+    t->mutable_framework_id()->MergeFrom(frameworkId);
+    t->mutable_executor_id()->MergeFrom(id);
+    t->set_state(TASK_STARTING);
+    t->set_name(task.name());
+    t->mutable_task_id()->MergeFrom(task.task_id());
+    t->mutable_slave_id()->MergeFrom(task.slave_id());
+    t->mutable_resources()->MergeFrom(task.resources());
+
+    launchedTasks[task.task_id()] = t;
+    resources += task.resources();
+    return t;
+  }
+
+  void removeTask(const TaskID& taskId)
+  {
+    // Remove the task if it's queued.
+    queuedTasks.erase(taskId);
+
+    // Update the resources if it's been launched.
+    if (launchedTasks.contains(taskId)) {
+      Task* task = launchedTasks[taskId];
+      foreach (const Resource& resource, task->resources()) {
+        resources -= resource;
+      }
+      launchedTasks.erase(taskId);
+      delete task;
+    }
+  }
+
+  void updateTaskState(const TaskID& taskId, TaskState state)
+  {
+    if (launchedTasks.contains(taskId)) {
+      launchedTasks[taskId]->set_state(state);
+    }
+  }
+
+  const ExecutorID id;
+  const ExecutorInfo info;
+
+  const FrameworkID frameworkId;
+
+  const string directory;
+
+  const UUID uuid; // Distinguishes executor instances with same ExecutorID.
+
+  UPID pid;
+
+  bool shutdown; // Indicates if executor is being shut down.
+
+  Resources resources; // Currently consumed resources.
+
+  hashmap<TaskID, TaskDescription> queuedTasks;
+  hashmap<TaskID, Task*> launchedTasks;
+};
+
+
+// Information about a framework.
+struct Framework
+{
+  Framework(const FrameworkID& _id,
+            const FrameworkInfo& _info,
+            const UPID& _pid)
+    : id(_id), info(_info), pid(_pid) {}
+
+  ~Framework() {}
+
+  Executor* createExecutor(const ExecutorInfo& executorInfo,
+                           const string& directory)
+  {
+    Executor* executor = new Executor(id, executorInfo, directory);
+    CHECK(!executors.contains(executorInfo.executor_id()));
+    executors[executorInfo.executor_id()] = executor;
+    return executor;
+  }
+
+  void destroyExecutor(const ExecutorID& executorId)
+  {
+    if (executors.contains(executorId)) {
+      Executor* executor = executors[executorId];
+      executors.erase(executorId);
+      delete executor;
+    }
+  }
+
+  Executor* getExecutor(const ExecutorID& executorId)
+  {
+    if (executors.contains(executorId)) {
+      return executors[executorId];
+    }
+
+    return NULL;
+  }
+
+  Executor* getExecutor(const TaskID& taskId)
+  {
+    foreachvalue (Executor* executor, executors) {
+      if (executor->queuedTasks.contains(taskId) ||
+          executor->launchedTasks.contains(taskId)) {
+        return executor;
+      }
+    }
+
+    return NULL;
+  }
+
+  const FrameworkID id;
+  const FrameworkInfo info;
+
+  UPID pid;
+
+  // Current running executors.
+  hashmap<ExecutorID, Executor*> executors;
+
+  // Status updates keyed by uuid.
+  hashmap<UUID, StatusUpdate> updates;
+};
+
+
+// // Represents a pending status update that has been sent and we are
+// // waiting for an acknowledgement. In pa
+
+// // stream of status updates for a framework/task. Note
+// // that these are stored in the slave rather than per Framework
+// // because a framework might go away before all of the status
+// // updates have been sent and acknowledged.
+// struct Slave::StatusUpdateStream
+// {
+//   StatusUpdateStreamID streamId;
+//   string directory;
+//   FILE* updates;
+//   FILE* acknowledged;
+//   queue<StatusUpdate> pending;
+//   double timeout;
+// };
+
+
+//   StatusUpdateStreamID id;
+
+
+
+//   queue<StatusUpdate> pending;
+//   double timeout;
+// };
 
 
-Slave::Slave(const Configuration& _conf, bool _local,
+Slave::Slave(const Configuration& _conf,
+             bool _local,
              IsolationModule* _isolationModule)
-  : MesosProcess<Slave>("slave"),
-    conf(_conf), local(_local), isolationModule(_isolationModule)
+  : ProcessBase("slave"),
+    conf(_conf),
+    local(_local),
+    isolationModule(_isolationModule)
 {
   resources =
     Resources::parse(conf.get<string>("resources", "cpus:1;mem:1024"));
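
The default above implies the format Resources::parse() expects: semicolon-separated name:value scalar pairs. For example (values hypothetical):

    Resources resources = Resources::parse("cpus:8;mem:16384");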
@@ -62,36 +208,66 @@ Slave::Slave(const Configuration& _conf,
 }
 
 
-void Slave::registerOptions(Configurator* configurator)
+Slave::Slave(const Resources& _resources,
+             bool _local,
+             IsolationModule *_isolationModule)
+  : ProcessBase("slave"),
+    resources(_resources),
+    local(_local),
+    isolationModule(_isolationModule)
 {
-  // TODO(benh): Is there a way to specify units for the resources?
-  configurator->addOption<string>("resources",
-                                  "Total consumable resources per slave\n");
-//   configurator->addOption<string>("attributes",
-//                                   "Attributes of machine\n");
-  configurator->addOption<string>("work_dir",
-                                  "Where to place framework work directories\n"
-                                  "(default: MESOS_HOME/work)");
-  configurator->addOption<string>("hadoop_home",
-                                  "Where to find Hadoop installed (for\n"
-                                  "fetching framework executors from HDFS)\n"
-                                  "(default: look for HADOOP_HOME in\n"
-                                  "environment or find hadoop on PATH)");
-  configurator->addOption<bool>("switch_user", 
-                                "Whether to run tasks as the user who\n"
-                                "submitted them rather than the user running\n"
-                                "the slave (requires setuid permission)",
-                                true);
-  configurator->addOption<string>("frameworks_home",
-                                  "Directory prepended to relative executor\n"
-                                  "paths (default: MESOS_HOME/frameworks)");
+  initialize();
 }
 
 
 Slave::~Slave()
 {
+  // TODO(benh): Shut down and free frameworks?
+
   // TODO(benh): Shut down and free executors? The executor should get
-  // an "exited" event and initiate shutdown itself.
+  // an "exited" event and initiate a shut down itself.
+}
+
+
+void Slave::registerOptions(Configurator* configurator)
+{
+  // TODO(benh): Is there a way to specify units for the resources?
+  configurator->addOption<string>(
+      "resources",
+      "Total consumable resources per slave\n");
+
+  configurator->addOption<string>(
+      "attributes",
+      "Attributes of machine\n");
+
+  configurator->addOption<string>(
+      "work_dir",
+      "Where to place framework work directories\n"
+      "(default: MESOS_HOME/work)");
+
+  configurator->addOption<string>(
+      "hadoop_home",
+      "Where to find Hadoop installed (for\n"
+      "fetching framework executors from HDFS)\n"
+      "(default: look for HADOOP_HOME in\n"
+      "environment or find hadoop on PATH)");
+
+  configurator->addOption<bool>(
+      "switch_user", 
+      "Whether to run tasks as the user who\n"
+      "submitted them rather than the user running\n"
+      "the slave (requires setuid permission)",
+      true);
+
+  configurator->addOption<string>(
+      "frameworks_home",
+      "Directory prepended to relative executor\n"
+      "paths (default: MESOS_HOME/frameworks)");
+
+  configurator->addOption<double>(
+      "executor_shutdown_timeout_seconds",
+      "Amount of time (in seconds) to wait for an executor to shut down\n",
+      EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS);
 }
 
 
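A sketch of reading these options back, mirroring the conf.get<>() call in the constructor above; the option names come from the registrations, while the get<bool>/get<double> instantiations and the defaults shown are assumptions:

    bool switchUser = conf.get<bool>("switch_user", true);
    string workDir = conf.get<string>("work_dir", "/tmp/mesos");
    double timeout = conf.get<double>("executor_shutdown_timeout_seconds",
                                      EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS);
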
@@ -105,12 +281,12 @@ Promise<state::SlaveState*> Slave::getSt
   cpus = resources.getScalar("cpus", cpus);
   mem = resources.getScalar("mem", mem);
 
-  state::SlaveState* state =
-    new state::SlaveState(build::DATE, build::USER, slaveId.value(),
-                          cpus.value(), mem.value(), self(), master);
+  state::SlaveState* state = new state::SlaveState(
+      build::DATE, build::USER, id.value(),
+      cpus.value(), mem.value(), self(), master);
 
-  foreachpair (_, Framework* f, frameworks) {
-    foreachpair (_, Executor* e, f->executors) {
+  foreachvalue (Framework* f, frameworks) {
+    foreachvalue (Executor* e, f->executors) {
       Resources resources(e->resources);
       Resource::Scalar cpus;
       Resource::Scalar mem;
@@ -127,16 +303,16 @@ Promise<state::SlaveState*> Slave::getSt
       // construction must be identical to what we do for directory
       // suffix returned from Slave::getUniqueWorkDirectory.
 
-      string id = f->frameworkId.value() + "-" + e->info.executor_id().value();
+      string id = f->id.value() + "-" + e->id.value();
 
-      state::Framework* framework =
-        new state::Framework(id, f->info.name(),
-                             e->info.uri(), e->executorStatus,
-                             cpus.value(), mem.value());
+      state::Framework* framework = new state::Framework(
+          id, f->info.name(),
+          e->info.uri(), "",
+          cpus.value(), mem.value());
 
       state->frameworks.push_back(framework);
 
-      foreachpair (_, Task* t, e->tasks) {
+      foreachvalue (Task* t, e->launchedTasks) {
         Resources resources(t->resources());
         Resource::Scalar cpus;
         Resource::Scalar mem;
@@ -145,10 +321,10 @@ Promise<state::SlaveState*> Slave::getSt
         cpus = resources.getScalar("cpus", cpus);
         mem = resources.getScalar("mem", mem);
 
-        state::Task* task =
-          new state::Task(t->task_id().value(), t->name(),
-                          TaskState_descriptor()->FindValueByNumber(t->state())->name(),
-                          cpus.value(), mem.value());
+        state::Task* task = new state::Task(
+            t->task_id().value(), t->name(),
+            TaskState_Name(t->state()),
+            cpus.value(), mem.value());
 
         framework->tasks.push_back(task);
       }
@@ -162,77 +338,93 @@ Promise<state::SlaveState*> Slave::getSt
 void Slave::initialize()
 {
   // Start all the statistics at 0.
-  statistics.launched_tasks = 0;
-  statistics.finished_tasks = 0;
-  statistics.killed_tasks = 0;
-  statistics.failed_tasks = 0;
-  statistics.lost_tasks = 0;
-  statistics.valid_status_updates = 0;
-  statistics.invalid_status_updates = 0;
-  statistics.valid_framework_messages = 0;
-  statistics.invalid_framework_messages = 0;
+  CHECK(TASK_STARTING == TaskState_MIN);
+  CHECK(TASK_LOST == TaskState_MAX);
+  stats.tasks[TASK_STARTING] = 0;
+  stats.tasks[TASK_RUNNING] = 0;
+  stats.tasks[TASK_FINISHED] = 0;
+  stats.tasks[TASK_FAILED] = 0;
+  stats.tasks[TASK_KILLED] = 0;
+  stats.tasks[TASK_LOST] = 0;
+  stats.validStatusUpdates = 0;
+  stats.invalidStatusUpdates = 0;
+  stats.validFrameworkMessages = 0;
+  stats.invalidFrameworkMessages = 0;
 
   startTime = elapsedTime();
 
-  install(NEW_MASTER_DETECTED, &Slave::newMasterDetected,
-          &NewMasterDetectedMessage::pid);
-
-  install(NO_MASTER_DETECTED, &Slave::noMasterDetected);
-
-  install(M2S_REGISTER_REPLY, &Slave::registerReply,
-          &SlaveRegisteredMessage::slave_id);
-
-  install(M2S_REREGISTER_REPLY, &Slave::reregisterReply,
-          &SlaveRegisteredMessage::slave_id);
-
-  install(M2S_RUN_TASK, &Slave::runTask,
-          &RunTaskMessage::framework,
-          &RunTaskMessage::framework_id,
-          &RunTaskMessage::pid,
-          &RunTaskMessage::task);
-
-  install(M2S_KILL_TASK, &Slave::killTask,
-          &KillTaskMessage::framework_id,
-          &KillTaskMessage::task_id);
-
-  install(M2S_KILL_FRAMEWORK, &Slave::killFramework,
-          &KillFrameworkMessage::framework_id);
-
-  install(M2S_FRAMEWORK_MESSAGE, &Slave::schedulerMessage,
-          &FrameworkMessageMessage::slave_id,
-          &FrameworkMessageMessage::framework_id,
-          &FrameworkMessageMessage::executor_id,
-          &FrameworkMessageMessage::data);
-
-  install(M2S_UPDATE_FRAMEWORK, &Slave::updateFramework,
-          &UpdateFrameworkMessage::framework_id,
-          &UpdateFrameworkMessage::pid);
-
-  install(M2S_STATUS_UPDATE_ACK, &Slave::statusUpdateAck,
-          &StatusUpdateAckMessage::framework_id,
-          &StatusUpdateAckMessage::slave_id,
-          &StatusUpdateAckMessage::task_id);
-
-  install(E2S_REGISTER_EXECUTOR, &Slave::registerExecutor,
-          &RegisterExecutorMessage::framework_id,
-          &RegisterExecutorMessage::executor_id);
-
-  install(E2S_STATUS_UPDATE, &Slave::statusUpdate,
-          &StatusUpdateMessage::framework_id,
-          &StatusUpdateMessage::status);
-
-  install(E2S_FRAMEWORK_MESSAGE, &Slave::executorMessage,
-          &FrameworkMessageMessage::slave_id,
-          &FrameworkMessageMessage::framework_id,
-          &FrameworkMessageMessage::executor_id,
-          &FrameworkMessageMessage::data);
-
-  install(PING, &Slave::ping);
-
-  install(process::TIMEOUT, &Slave::timeout);
-
-  install(process::EXITED, &Slave::exited);
+  // Install protobuf handlers.
+  installProtobufHandler<NewMasterDetectedMessage>(
+      &Slave::newMasterDetected,
+      &NewMasterDetectedMessage::pid);
+
+  installProtobufHandler<NoMasterDetectedMessage>(
+      &Slave::noMasterDetected);
+
+  installProtobufHandler<SlaveRegisteredMessage>(
+      &Slave::registered,
+      &SlaveRegisteredMessage::slave_id);
+
+  installProtobufHandler<SlaveReregisteredMessage>(
+      &Slave::reregistered,
+      &SlaveReregisteredMessage::slave_id);
+
+  installProtobufHandler<RunTaskMessage>(
+      &Slave::runTask,
+      &RunTaskMessage::framework,
+      &RunTaskMessage::framework_id,
+      &RunTaskMessage::pid,
+      &RunTaskMessage::task);
+
+  installProtobufHandler<KillTaskMessage>(
+      &Slave::killTask,
+      &KillTaskMessage::framework_id,
+      &KillTaskMessage::task_id);
+
+  installProtobufHandler<ShutdownFrameworkMessage>(
+      &Slave::shutdownFramework,
+      &ShutdownFrameworkMessage::framework_id);
+
+  installProtobufHandler<FrameworkToExecutorMessage>(
+      &Slave::schedulerMessage,
+      &FrameworkToExecutorMessage::slave_id,
+      &FrameworkToExecutorMessage::framework_id,
+      &FrameworkToExecutorMessage::executor_id,
+      &FrameworkToExecutorMessage::data);
+
+  installProtobufHandler<UpdateFrameworkMessage>(
+      &Slave::updateFramework,
+      &UpdateFrameworkMessage::framework_id,
+      &UpdateFrameworkMessage::pid);
+
+  installProtobufHandler<StatusUpdateAcknowledgementMessage>(
+      &Slave::statusUpdateAcknowledgement,
+      &StatusUpdateAcknowledgementMessage::slave_id,
+      &StatusUpdateAcknowledgementMessage::framework_id,
+      &StatusUpdateAcknowledgementMessage::task_id,
+      &StatusUpdateAcknowledgementMessage::uuid);
+
+  installProtobufHandler<RegisterExecutorMessage>(
+      &Slave::registerExecutor,
+      &RegisterExecutorMessage::framework_id,
+      &RegisterExecutorMessage::executor_id);
+
+  installProtobufHandler<StatusUpdateMessage>(
+      &Slave::statusUpdate,
+      &StatusUpdateMessage::update);
+
+  installProtobufHandler<ExecutorToFrameworkMessage>(
+      &Slave::executorMessage,
+      &ExecutorToFrameworkMessage::slave_id,
+      &ExecutorToFrameworkMessage::framework_id,
+      &ExecutorToFrameworkMessage::executor_id,
+      &ExecutorToFrameworkMessage::data);
+
+  // Install some message handlers.
+  installMessageHandler(process::EXITED, &Slave::exited);
+  installMessageHandler("PING", &Slave::ping);
 
+  // Install some HTTP handlers.
   installHttpHandler("info.json", &Slave::http_info_json);
   installHttpHandler("frameworks.json", &Slave::http_frameworks_json);
   installHttpHandler("tasks.json", &Slave::http_tasks_json);
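
Conceptually, each installProtobufHandler<T> registration arranges for an incoming message body to be parsed as T and its selected fields forwarded, in registration order, to the member function. A hand-written sketch of what dispatching a KillTaskMessage amounts to (hypothetical helper, not the actual libprocess machinery):

    void handleKillTask(const string& body)
    {
      KillTaskMessage message;
      CHECK(message.ParseFromString(body)); // Protobuf deserialization.
      killTask(message.framework_id(),      // Fields in registration order.
               message.task_id());
    }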
@@ -246,11 +438,15 @@ void Slave::operator () ()
   LOG(INFO) << "Slave started at " << self();
   LOG(INFO) << "Slave resources: " << resources;
 
-  // Get our hostname
-  char buf[512];
-  gethostname(buf, sizeof(buf));
-  hostent* he = gethostbyname2(buf, AF_INET);
-  string hostname = he->h_name;
+  Result<string> result = utils::os::hostname();
+
+  if (result.isError()) {
+    LOG(FATAL) << "Failed to get hostname: " << result.error();
+  }
+
+  CHECK(result.isSome());
+
+  string hostname = result.get();
 
   // Check and see if we have a different public DNS name. Normally
   // this is our hostname, but on EC2 we look for the MESOS_PUBLIC_DNS
@@ -262,53 +458,69 @@ void Slave::operator () ()
   }
 
   // Initialize slave info.
-  slave.set_hostname(hostname);
-  slave.set_public_hostname(public_hostname);
-  slave.mutable_resources()->MergeFrom(resources);
-
-  // Initialize isolation module.
-  isolationModule->initialize(this);
+  info.set_hostname(hostname);
+  info.set_public_hostname(public_hostname);
+  info.mutable_resources()->MergeFrom(resources);
+
+  // Spawn and initialize the isolation module.
+  spawn(isolationModule);
+  dispatch(isolationModule,
+           &IsolationModule::initialize,
+           conf, local, self());
 
   while (true) {
     serve(1);
-    if (name() == process::TERMINATE) {
-      LOG(INFO) << "Asked to shut down by " << from();
-      foreachpaircopy (_, Framework* framework, frameworks) {
-        killFramework(framework);
+    if (name() == TERMINATE) {
+      LOG(INFO) << "Asked to terminate by " << from();
+      foreachkey (const FrameworkID& frameworkId, frameworks) {
+        // TODO(benh): Because a shut down isn't instantaneous (but has
+        // shut down/kill phases) we might not actually propagate all
+        // the status updates appropriately here. Consider providing
+        // an alternative function which skips the shut down phase and
+        // simply does a kill (sending all status updates
+        // immediately). Of course, this still isn't sufficient
+        // because those status updates might get lost and we won't
+        // resend them unless we build that into the system.
+        shutdownFramework(frameworkId);
       }
-      return;
+      break;
     }
   }
+
+  // Stop the isolation module.
+  terminate(isolationModule);
+  wait(isolationModule);
 }
 
 
-void Slave::newMasterDetected(const string& pid)
+void Slave::newMasterDetected(const UPID& pid)
 {
   LOG(INFO) << "New master detected at " << pid;
 
   master = pid;
   link(master);
 
-  if (slaveId == "") {
+  if (id == "") {
     // Slave started before master.
-    MSG<S2M_REGISTER_SLAVE> out;
-    out.mutable_slave()->MergeFrom(slave);
-    send(master, out);
+    RegisterSlaveMessage message;
+    message.mutable_slave()->MergeFrom(info);
+    send(master, message);
   } else {
     // Re-registering, so send tasks running.
-    MSG<S2M_REREGISTER_SLAVE> out;
-    out.mutable_slave_id()->MergeFrom(slaveId);
-    out.mutable_slave()->MergeFrom(slave);
-
-    foreachpair (_, Framework* framework, frameworks) {
-      foreachpair (_, Executor* executor, framework->executors) {
-	foreachpair (_, Task* task, executor->tasks) {
-	  out.add_tasks()->MergeFrom(*task);
+    ReregisterSlaveMessage message;
+    message.mutable_slave_id()->MergeFrom(id);
+    message.mutable_slave()->MergeFrom(info);
+
+    foreachvalue (Framework* framework, frameworks) {
+      foreachvalue (Executor* executor, framework->executors) {
+	foreachvalue (Task* task, executor->launchedTasks) {
+          // TODO(benh): Also need to send queued tasks here ...
+	  message.add_tasks()->MergeFrom(*task);
 	}
       }
     }
 
-    send(master, out);
+    send(master, message);
   }
 }
 
@@ -319,18 +531,18 @@ void Slave::noMasterDetected()
 }
 
 
-void Slave::registerReply(const SlaveID& slaveId)
+void Slave::registered(const SlaveID& slaveId)
 {
   LOG(INFO) << "Registered with master; given slave ID " << slaveId;
-  this->slaveId = slaveId;
+  id = slaveId;
 }
 
 
-void Slave::reregisterReply(const SlaveID& slaveId)
+void Slave::reregistered(const SlaveID& slaveId)
 {
   LOG(INFO) << "Re-registered with master";
 
-  if (!(this->slaveId == slaveId)) {
+  if (!(id == slaveId)) {
     LOG(FATAL) << "Slave re-registered but got wrong ID";
   }
 }
@@ -350,41 +562,77 @@ void Slave::runTask(const FrameworkInfo&
     frameworks[frameworkId] = framework;
   }
 
+  const ExecutorInfo& executorInfo = task.has_executor()
+    ? task.executor()
+    : framework->info.executor();
+
+  const ExecutorID& executorId = executorInfo.executor_id();
+
   // Either send the task to an executor or start a new executor
   // and queue the task until the executor has started.
-  Executor* executor = task.has_executor()
-    ? framework->getExecutor(task.executor().executor_id())
-    : framework->getExecutor(framework->info.executor().executor_id());
-        
+  Executor* executor = framework->getExecutor(executorId);
+
   if (executor != NULL) {
-    if (!executor->pid) {
+    if (executor->shutdown) {
+      LOG(WARNING) << "WARNING! Asked to run task '" << task.task_id()
+                   << "' for framework " << frameworkId
+                   << " with executor '" << executorId
+                   << "' which is being shut down";
+
+      StatusUpdateMessage message;
+      StatusUpdate* update = message.mutable_update();
+      update->mutable_framework_id()->MergeFrom(frameworkId);
+      update->mutable_slave_id()->MergeFrom(id);
+      TaskStatus* status = update->mutable_status();
+      status->mutable_task_id()->MergeFrom(task.task_id());
+      status->set_state(TASK_LOST);
+      update->set_timestamp(elapsedTime());
+      update->set_uuid(UUID::random().toBytes());
+      send(master, message);
+    } else if (!executor->pid) {
       // Queue task until the executor starts up.
-      executor->queuedTasks.push_back(task);
+      LOG(INFO) << "Queuing task '" << task.task_id()
+                << "' for executor '" << executorId
+                << "' of framework " << frameworkId;
+      executor->queuedTasks[task.task_id()] = task;
     } else {
-      // Add the task to the executor.
+      // Add the task and send it to the executor.
       executor->addTask(task);
 
-      MSG<S2E_RUN_TASK> out;
-      out.mutable_framework()->MergeFrom(framework->info);
-      out.mutable_framework_id()->MergeFrom(framework->frameworkId);
-      out.set_pid(framework->pid);
-      out.mutable_task()->MergeFrom(task);
-      send(executor->pid, out);
-      isolationModule->resourcesChanged(framework, executor);
+      stats.tasks[TASK_STARTING]++;
+
+      RunTaskMessage message;
+      message.mutable_framework()->MergeFrom(framework->info);
+      message.mutable_framework_id()->MergeFrom(framework->id);
+      message.set_pid(framework->pid);
+      message.mutable_task()->MergeFrom(task);
+      send(executor->pid, message);
+
+      // Now update the resources.
+      dispatch(isolationModule,
+               &IsolationModule::resourcesChanged,
+               framework->id, executor->id, executor->resources);
     }
   } else {
     // Launch an executor for this task.
-    if (task.has_executor()) {
-      executor = framework->createExecutor(task.executor());
-    } else {
-      executor = framework->createExecutor(framework->info.executor());
-    }
+    const string& directory = getUniqueWorkDirectory(framework->id, executorId);
+
+    LOG(INFO) << "Using '" << directory
+              << "' as work directory for executor '" << executorId
+              << "' of framework " << framework->id;
+
+    executor = framework->createExecutor(executorInfo, directory);
 
     // Queue task until the executor starts up.
-    executor->queuedTasks.push_back(task);
+    executor->queuedTasks[task.task_id()] = task;
+
+    // Tell the isolation module to launch the executor. (TODO(benh):
+    // Make the isolation module a process so that it can block while
+    // trying to launch the executor.)
+    dispatch(isolationModule,
+             &IsolationModule::launchExecutor,
+             framework->id, framework->info, executor->info, directory);
 
-    // Tell the isolation module to launch the executor.
-    isolationModule->launchExecutor(framework, executor);
   }
 }
 
@@ -396,60 +644,92 @@ void Slave::killTask(const FrameworkID& 
             << " of framework " << frameworkId;
 
   Framework* framework = getFramework(frameworkId);
-  if (framework != NULL) {
-    // Tell the executor to kill the task if it is up and
-    // running, otherwise, consider the task lost.
-    Executor* executor = framework->getExecutor(taskId);
-    if (executor == NULL || !executor->pid) {
-      // Update the resources locally, if an executor comes up
-      // after this then it just won't receive this task.
-      executor->removeTask(taskId);
-      isolationModule->resourcesChanged(framework, executor);
-
-      MSG<S2M_STATUS_UPDATE> out;
-      out.mutable_framework_id()->MergeFrom(frameworkId);
-      TaskStatus *status = out.mutable_status();
-      status->mutable_task_id()->MergeFrom(taskId);
-      status->mutable_slave_id()->MergeFrom(slaveId);
-      status->set_state(TASK_LOST);
-      send(master, out);
-
-      double deadline = elapsedTime() + STATUS_UPDATE_RETRY_TIMEOUT;
-      framework->statuses[deadline][status->task_id()] = *status;
-    } else {
-      // Otherwise, send a message to the executor and wait for
-      // it to send us a status update.
-      MSG<S2E_KILL_TASK> out;
-      out.mutable_framework_id()->MergeFrom(frameworkId);
-      out.mutable_task_id()->MergeFrom(taskId);
-      send(executor->pid, out);
-    }
-  } else {
-    LOG(WARNING) << "Cannot kill task " << taskId
+  if (framework == NULL) {
+    LOG(WARNING) << "WARNING! Cannot kill task " << taskId
                  << " of framework " << frameworkId
                  << " because no such framework is running";
 
-    MSG<S2M_STATUS_UPDATE> out;
-    out.mutable_framework_id()->MergeFrom(frameworkId);
-    TaskStatus *status = out.mutable_status();
+    StatusUpdateMessage message;
+    StatusUpdate* update = message.mutable_update();
+    update->mutable_framework_id()->MergeFrom(frameworkId);
+    update->mutable_slave_id()->MergeFrom(id);
+    TaskStatus* status = update->mutable_status();
     status->mutable_task_id()->MergeFrom(taskId);
-    status->mutable_slave_id()->MergeFrom(slaveId);
     status->set_state(TASK_LOST);
-    send(master, out);
+    update->set_timestamp(elapsedTime());
+    update->set_uuid(UUID::random().toBytes());
+    send(master, message);
 
-    double deadline = elapsedTime() + STATUS_UPDATE_RETRY_TIMEOUT;
-    framework->statuses[deadline][status->task_id()] = *status;
+    return;
+  }
+
+
+  // Tell the executor to kill the task if it is up and
+  // running, otherwise, consider the task lost.
+  Executor* executor = framework->getExecutor(taskId);
+  if (executor == NULL) {
+    LOG(WARNING) << "WARNING! Cannot kill task " << taskId
+                 << " of framework " << frameworkId
+                 << " because no such task is running";
+
+    StatusUpdateMessage message;
+    StatusUpdate* update = message.mutable_update();
+    update->mutable_framework_id()->MergeFrom(framework->id);
+    update->mutable_slave_id()->MergeFrom(id);
+    TaskStatus* status = update->mutable_status();
+    status->mutable_task_id()->MergeFrom(taskId);
+    status->set_state(TASK_LOST);
+    update->set_timestamp(elapsedTime());
+    update->set_uuid(UUID::random().toBytes());
+    send(master, message);
+  } else if (!executor->pid) {
+    // Remove the task.
+    executor->removeTask(taskId);
+
+    // Tell the isolation module to update the resources.
+    dispatch(isolationModule,
+             &IsolationModule::resourcesChanged,
+             framework->id, executor->id, executor->resources);
+
+    StatusUpdateMessage message;
+    StatusUpdate* update = message.mutable_update();
+    update->mutable_framework_id()->MergeFrom(framework->id);
+    update->mutable_executor_id()->MergeFrom(executor->id);
+    update->mutable_slave_id()->MergeFrom(id);
+    TaskStatus* status = update->mutable_status();
+    status->mutable_task_id()->MergeFrom(taskId);
+    status->set_state(TASK_KILLED);
+    update->set_timestamp(elapsedTime());
+    update->set_uuid(UUID::random().toBytes());
+    send(master, message);
+  } else {
+    // Otherwise, send a message to the executor and wait for
+    // it to send us a status update.
+    KillTaskMessage message;
+    message.mutable_framework_id()->MergeFrom(frameworkId);
+    message.mutable_task_id()->MergeFrom(taskId);
+    send(executor->pid, message);
   }
 }
 
 
-void Slave::killFramework(const FrameworkID& frameworkId)
+// TODO(benh): Consider sending a boolean that specifies if the
+// shut down should be graceful or immediate. Likewise, consider
+// sending back a shut down acknowledgement, because otherwise you
+// could get into a state where a shut down was sent, dropped, and
+// therefore never processed.
+void Slave::shutdownFramework(const FrameworkID& frameworkId)
 {
-  LOG(INFO) << "Asked to kill framework " << frameworkId;
+  LOG(INFO) << "Asked to shut down framework " << frameworkId;
 
   Framework* framework = getFramework(frameworkId);
   if (framework != NULL) {
-    killFramework(framework);
+    LOG(INFO) << "Shutting down framework " << framework->id;
+
+    // Shut down all executors of this framework.
+    foreachvalue (Executor* executor, framework->executors) {
+      shutdownExecutor(framework, executor);
+    }
   }
 }
 
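The TASK_LOST / TASK_KILLED update construction above now appears several times in this file; a hypothetical helper (not part of this commit) could factor it out:

    // Build a StatusUpdateMessage exactly as the call sites above do.
    StatusUpdateMessage statusUpdateMessage(const FrameworkID& frameworkId,
                                            const SlaveID& slaveId,
                                            const TaskID& taskId,
                                            TaskState state,
                                            double timestamp)
    {
      StatusUpdateMessage message;
      StatusUpdate* update = message.mutable_update();
      update->mutable_framework_id()->MergeFrom(frameworkId);
      update->mutable_slave_id()->MergeFrom(slaveId);
      TaskStatus* status = update->mutable_status();
      status->mutable_task_id()->MergeFrom(taskId);
      status->set_state(state);
      update->set_timestamp(timestamp);
      update->set_uuid(UUID::random().toBytes());
      return message;
    }
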
@@ -460,35 +740,36 @@ void Slave::schedulerMessage(const Slave
                              const string& data)
 {
   Framework* framework = getFramework(frameworkId);
-  if (framework != NULL) {
-    Executor* executor = framework->getExecutor(executorId);
-    if (executor == NULL) {
-      LOG(WARNING) << "Dropping message for executor '"
-                   << executorId << "' of framework " << frameworkId
-                   << " because executor does not exist";
-      statistics.invalid_framework_messages++;
-    } else if (!executor->pid) {
-      // TODO(*): If executor is not started, queue framework message?
-      // (It's probably okay to just drop it since frameworks can have
-      // the executor send a message to the master to say when it's ready.)
-      LOG(WARNING) << "Dropping message for executor '"
-                   << executorId << "' of framework " << frameworkId
-                   << " because executor is not running";
-      statistics.invalid_framework_messages++;
-    } else {
-      MSG<S2E_FRAMEWORK_MESSAGE> out;
-      out.mutable_slave_id()->MergeFrom(slaveId);
-      out.mutable_framework_id()->MergeFrom(frameworkId);
-      out.mutable_executor_id()->MergeFrom(executorId);
-      out.set_data(data);
-      send(executor->pid, out);
-
-      statistics.valid_framework_messages++;
-    }
-  } else {
+  if (framework == NULL) {
     LOG(WARNING) << "Dropping message for framework "<< frameworkId
                  << " because framework does not exist";
-    statistics.invalid_framework_messages++;
+    stats.invalidFrameworkMessages++;
+    return;
+  }
+
+  Executor* executor = framework->getExecutor(executorId);
+  if (executor == NULL) {
+    LOG(WARNING) << "Dropping message for executor '"
+                 << executorId << "' of framework " << frameworkId
+                 << " because executor does not exist";
+    stats.invalidFrameworkMessages++;
+  } else if (!executor->pid) {
+    // TODO(*): If executor is not started, queue framework message?
+    // (It's probably okay to just drop it since frameworks can have
+    // the executor send a message to the master to say when it's ready.)
+    LOG(WARNING) << "Dropping message for executor '"
+                 << executorId << "' of framework " << frameworkId
+                 << " because executor is not running";
+    stats.invalidFrameworkMessages++;
+  } else {
+    FrameworkToExecutorMessage message;
+    message.mutable_slave_id()->MergeFrom(slaveId);
+    message.mutable_framework_id()->MergeFrom(frameworkId);
+    message.mutable_executor_id()->MergeFrom(executorId);
+    message.set_data(data);
+    send(executor->pid, message);
+
+    stats.validFrameworkMessages++;
   }
 }
 
@@ -505,25 +786,94 @@ void Slave::updateFramework(const Framew
 }
 
 
-void Slave::statusUpdateAck(const FrameworkID& frameworkId,
-                            const SlaveID& slaveId,
-                            const TaskID& taskId)
+void Slave::statusUpdateAcknowledgement(const SlaveID& slaveId,
+                                        const FrameworkID& frameworkId,
+                                        const TaskID& taskId,
+                                        const string& uuid)
 {
   Framework* framework = getFramework(frameworkId);
   if (framework != NULL) {
-    foreachpair (double deadline, _, framework->statuses) {
-      if (framework->statuses[deadline].count(taskId) > 0) {
-        LOG(INFO) << "Got acknowledgement of status update"
-                  << " for task " << taskId
-                  << " of framework " << framework->frameworkId;
-        framework->statuses[deadline].erase(taskId);
-        break;
-      }
+    if (framework->updates.contains(UUID::fromBytes(uuid))) {
+      LOG(INFO) << "Got acknowledgement of status update"
+                << " for task " << taskId
+                << " of framework " << frameworkId;
+      framework->updates.erase(UUID::fromBytes(uuid));
     }
   }
 }
 
 
+// void Slave::statusUpdateAcknowledged(const SlaveID& slaveId,
+//                                      const FrameworkID& frameworkId,
+//                                      const TaskID& taskId,
+//                                      uint32_t sequence)
+// {
+//   StatusUpdateStreamID id(frameworkId, taskId);
+//   StatusUpdateStream* stream = getStatusUpdateStream(id);
+
+//   if (stream == NULL) {
+//     LOG(WARNING) << "WARNING! Received unexpected status update"
+//                  << " acknowledgement for task " << taskId
+//                  << " of framework " << frameworkId;
+//     return;
+//   }
+
+//   CHECK(!stream->pending.empty());
+
+//   const StatusUpdate& update = stream->pending.front();
+
+//   if (update.sequence() != sequence) {
+//     LOG(WARNING) << "WARNING! Received status update acknowledgement"
+//                  << " with bad sequence number (received " << sequence
+//                  << ", expecting " << update.sequence()
+//                  << ") for task " << taskId
+//                  << " of framework " << frameworkId;
+//   } else {
+//     LOG(INFO) << "Received status update acknowledgement for task "
+//               << taskId << " of framework " << frameworkId;
+
+//     // Write the update out to disk.
+//     CHECK(stream->acknowledged != NULL);
+
+//     Result<bool> result =
+//       utils::protobuf::write(stream->acknowledged, update);
+
+//     if (result.isError()) {
+//       // Failing here is rather dramatic, but so is not being able to
+//       // write to disk ... seems like failing early and often might do
+//       // more benefit than harm.
+//       LOG(FATAL) << "Failed to write status update to "
+//                  << stream->directory << "/acknowledged: "
+//                  << result.message();
+//     }
+
+//     stream->pending.pop();
+
+//     bool empty = stream->pending.empty();
+
+//     bool terminal =
+//       update.status().state() == TASK_FINISHED ||
+//       update.status().state() == TASK_FAILED ||
+//       update.status().state() == TASK_KILLED ||
+//       update.status().state() == TASK_LOST;
+
+//     if (empty && terminal) {
+//       cleanupStatusUpdateStream(stream);
+//     } else if (!empty && terminal) {
+//       LOG(WARNING) << "WARNING! Acknowledged a \"terminal\""
+//                    << " task status but updates are still pending";
+//     } else if (!empty) {
+//       StatusUpdateMessage message;
+//       message.mutable_update()->MergeFrom(stream->pending.front());
+//       message.set_reliable(true);
+//       send(master, message);
+
+//       stream->timeout = elapsedTime() + STATUS_UPDATE_RETRY_INTERVAL;
+//     }
+//   }
+// }
+
+
 void Slave::registerExecutor(const FrameworkID& frameworkId,
                              const ExecutorID& executorId)
 {
@@ -531,172 +881,334 @@ void Slave::registerExecutor(const Frame
             << "' of framework " << frameworkId;
 
   Framework* framework = getFramework(frameworkId);
-  if (framework != NULL) {
-    Executor* executor = framework->getExecutor(executorId);
-
-    // Check the status of the executor.
-    if (executor == NULL) {
-      LOG(WARNING) << "Not expecting executor '" << executorId
-                   << "' of framework " << frameworkId;
-      send(from(), S2E_KILL_EXECUTOR);
-    } else if (executor->pid != UPID()) {
-      LOG(WARNING) << "Not good, executor '" << executorId
-                   << "' of framework " << frameworkId
-                   << " is already running";
-      send(from(), S2E_KILL_EXECUTOR);
-    } else {
-      // Save the pid for the executor.
-      executor->pid = from();
-
-      // Now that the executor is up, set its resource limits.
-      isolationModule->resourcesChanged(framework, executor);
-
-      // Tell executor it's registered and give it any queued tasks.
-      MSG<S2E_REGISTER_REPLY> out;
-      ExecutorArgs* args = out.mutable_args();
-      args->mutable_framework_id()->MergeFrom(framework->frameworkId);
-      args->mutable_executor_id()->MergeFrom(executor->info.executor_id());
-      args->mutable_slave_id()->MergeFrom(slaveId);
-      args->set_hostname(slave.hostname());
-      args->set_data(framework->info.executor().data());
-      send(executor->pid, out);
-      sendQueuedTasks(framework, executor);
-    }
-  } else {
+  if (framework == NULL) {
     // Framework is gone; tell the executor to exit.
     LOG(WARNING) << "Framework " << frameworkId
                  << " does not exist (it may have been killed),"
                  << " telling executor to exit";
+    send(from(), ShutdownExecutorMessage());
+    return;
+  }
+
+  Executor* executor = framework->getExecutor(executorId);
 
-    // TODO(benh): Don't we also want to tell the isolation
-    // module to shut this guy down!
-    send(from(), S2E_KILL_EXECUTOR);
+  // Check the status of the executor.
+  if (executor == NULL) {
+    LOG(WARNING) << "WARNING! Unexpected executor '" << executorId
+                 << "' registering for framework " << frameworkId;
+    send(from(), ShutdownExecutorMessage());
+  } else if (executor->pid) {
+    LOG(WARNING) << "WARNING! Executor '" << executorId
+                 << "' of framework " << frameworkId
+                 << " is already running";
+    send(from(), ShutdownExecutorMessage());
+  } else {
+    // Save the pid for the executor.
+    executor->pid = from();
+
+    // Now that the executor is up, set its resource limits.
+    dispatch(isolationModule,
+             &IsolationModule::resourcesChanged,
+             framework->id, executor->id, executor->resources);
+
+    // Tell executor it's registered and give it any queued tasks.
+    ExecutorRegisteredMessage message;
+    ExecutorArgs* args = message.mutable_args();
+    args->mutable_framework_id()->MergeFrom(framework->id);
+    args->mutable_executor_id()->MergeFrom(executor->id);
+    args->mutable_slave_id()->MergeFrom(id);
+    args->set_hostname(info.hostname());
+    args->set_data(executor->info.data());
+    send(executor->pid, message);
+
+    LOG(INFO) << "Flushing queued tasks for framework " << framework->id;
+
+    foreachvalue (const TaskDescription& task, executor->queuedTasks) {
+      // Add the task to the executor.
+      executor->addTask(task);
+
+      stats.tasks[TASK_STARTING]++;
+
+      RunTaskMessage message;
+      message.mutable_framework_id()->MergeFrom(framework->id);
+      message.mutable_framework()->MergeFrom(framework->info);
+      message.set_pid(framework->pid);
+      message.mutable_task()->MergeFrom(task);
+      send(executor->pid, message);
+    }
+
+    executor->queuedTasks.clear();
   }
 }
 
 
-void Slave::statusUpdate(const FrameworkID& frameworkId,
-                         const TaskStatus& status)
+// void Slave::statusUpdate(const StatusUpdate& update)
+// {
+//   LOG(INFO) << "Received update that task " << update.status().task_id()
+//             << " of framework " << update.framework_id()
+//             << " is now in state " << update.status().state();
+
+//   Framework* framework = getFramework(update.framework_id());
+//   if (framework == NULL) {
+//     LOG(WARNING) << "WARNING! Failed to lookup"
+//                  << " framework " << update.framework_id()
+//                  << " of received status update";
+//     stats.invalidStatusUpdates++;
+//     return;
+//   }
+
+//   Executor* executor = framework->getExecutor(update.status().task_id());
+//   if (executor == NULL) {
+//     LOG(WARNING) << "WARNING! Failed to lookup executor"
+//                  << " for framework " << update.framework_id()
+//                  << " of received status update";
+//     stats.invalidStatusUpdates++;
+//     return;
+//   }
+
+//   // Create/Get the status update stream for this framework/task.
+//   StatusUpdateStreamID id(update.framework_id(), update.status().task_id());
+
+//   if (!statusUpdateStreams.contains(id)) {
+//     StatusUpdateStream* stream =
+//       createStatusUpdateStream(id, executor->directory);
+
+//     if (stream == NULL) {
+//       LOG(WARNING) << "WARNING! Failed to create status update"
+//                    << " stream for task " << update.status().task_id()
+//                    << " of framework " << update.framework_id()
+//                    << " ... removing executor!";
+//       removeExecutor(framework, executor);
+//       return;
+//     }
+//   }
+
+//   StatusUpdateStream* stream = getStatusUpdateStream(id);
+
+//   CHECK(stream != NULL);
+
+//   // If we are already waiting on an acknowledgement, check that this
+//   // update (coming from the executor), is the same one that we are
+//   // waiting on being acknowledged.
+
+//   // Check that this status update has not already been
+//   // acknowledged. This could happen because a slave writes the
+//   // acknowledged message but then fails before it can pass the
+//   // message on to the executor, so the executor tries again.
+
+//   returnhere;
+
+//   // TODO(benh): Check that this update hasn't already been received
+//   // or acknowledged! This could happen if a slave receives a status
+//   // update from an executor, then crashes after it writes it to disk
+//   // but before it sends an ack back to 
+
+//   // Okay, record this update as received.
+//   CHECK(stream->received != NULL);
+
+//   Result<bool> result =
+//     utils::protobuf::write(stream->received, &update);
+
+//   if (result.isError()) {
+//     // Failing here is rather dramatic, but so is not being able to
+//     // write to disk ... seems like failing early and often might do
+//     // more benefit than harm.
+//     LOG(FATAL) << "Failed to write status update to "
+//                << stream->directory << "/received: "
+//                << result.message();
+//   }
+
+//   // Now acknowledge the executor.
+//   StatusUpdateAcknowledgementMessage message;
+//   message.mutable_framework_id()->MergeFrom(update.framework_id());
+//   message.mutable_slave_id()->MergeFrom(update.slave_id());
+//   message.mutable_task_id()->MergeFrom(update.status().task_id());
+//   send(executor->pid, message);
+
+//   executor->updateTaskState(
+//       update.status().task_id(),
+//       update.status().state());
+
+//   // Remove the task if it's reached a terminal state.
+//   bool terminal =
+//     update.status().state() == TASK_FINISHED ||
+//     update.status().state() == TASK_FAILED ||
+//     update.status().state() == TASK_KILLED ||
+//     update.status().state() == TASK_LOST;
+
+//   if (terminal) {
+//     executor->removeTask(update.status().task_id());
+//     isolationModule->resourcesChanged(
+//         framework->id, framework->info,
+//         executor->info, executor->resources);
+//   }
+
+//   stream->pending.push(update);
+
+//   // Send the status update if this is the first in the
+//   // stream. Subsequent status updates will get sent in
+//   // Slave::statusUpdateAcknowledged.
+//   if (stream->pending.size() == 1) {
+//     CHECK(stream->timeout == -1);
+//     StatusUpdateMessage message;
+//     message.mutable_update()->MergeFrom(update);
+//     message.set_reliable(true);
+//     send(master, message);
+
+//     stream->timeout = elapsedTime() + STATUS_UPDATE_RETRY_INTERVAL;
+//   }
+
+//   stats.tasks[status.state()]++;
+//   stats.validStatusUpdates++;
+// }
+
+void Slave::statusUpdate(const StatusUpdate& update)
 {
+  const TaskStatus& status = update.status();
+
   LOG(INFO) << "Status update: task " << status.task_id()
-            << " of framework " << frameworkId
-            << " is now in state "
-            << TaskState_descriptor()->FindValueByNumber(status.state())->name();
+            << " of framework " << update.framework_id()
+            << " is now in state " << status.state();
 
-  Framework* framework = getFramework(frameworkId);
+  Framework* framework = getFramework(update.framework_id());
   if (framework != NULL) {
     Executor* executor = framework->getExecutor(status.task_id());
     if (executor != NULL) {
       executor->updateTaskState(status.task_id(), status.state());
 
-      // Remove the task if necessary, and update statistics.
-      switch (status.state()) {
-        case TASK_FINISHED:
-          statistics.finished_tasks++;
-          executor->removeTask(status.task_id());
-          isolationModule->resourcesChanged(framework, executor);
-          break;
-        case TASK_FAILED:
-          statistics.failed_tasks++;
-          executor->removeTask(status.task_id());
-          isolationModule->resourcesChanged(framework, executor);
-          break;
-       case TASK_KILLED:
-         statistics.killed_tasks++;
-         executor->removeTask(status.task_id());
-         isolationModule->resourcesChanged(framework, executor);
-         break;
-        case TASK_LOST:
-          statistics.lost_tasks++;
-          executor->removeTask(status.task_id());
-          isolationModule->resourcesChanged(framework, executor);
-          break;
+      // Handle the task appropriately if it's terminated.
+      if (status.state() == TASK_FINISHED ||
+          status.state() == TASK_FAILED ||
+          status.state() == TASK_KILLED ||
+          status.state() == TASK_LOST) {
+        executor->removeTask(status.task_id());
+
+        dispatch(isolationModule,
+                 &IsolationModule::resourcesChanged,
+                 framework->id, executor->id, executor->resources);
       }
 
       // Send message and record the status for possible resending.
-      MSG<S2M_STATUS_UPDATE> out;
-      out.mutable_framework_id()->MergeFrom(frameworkId);
-      out.mutable_status()->MergeFrom(status);
-      send(master, out);
+      StatusUpdateMessage message;
+      message.mutable_update()->MergeFrom(update);
+      message.set_pid(self());
+      send(master, message);
 
-      double deadline = elapsedTime() + STATUS_UPDATE_RETRY_TIMEOUT;
-      framework->statuses[deadline][status.task_id()] = status;
+      UUID uuid = UUID::fromBytes(update.uuid());
 
-      statistics.valid_status_updates++;
+      // Send us a message to try and resend after some delay.
+      delay(STATUS_UPDATE_RETRY_INTERVAL_SECONDS,
+            self(), &Slave::statusUpdateTimeout,
+            framework->id, uuid);
+
+      framework->updates[uuid] = update;
+
+      stats.tasks[status.state()]++;
+
+      stats.validStatusUpdates++;
     } else {
       LOG(WARNING) << "Status update error: couldn't lookup "
-                   << "executor for framework " << frameworkId;
-      statistics.invalid_status_updates++;
+                   << "executor for framework " << update.framework_id();
+      stats.invalidStatusUpdates++;
     }
   } else {
     LOG(WARNING) << "Status update error: couldn't lookup "
-                 << "framework " << frameworkId;
-    statistics.invalid_status_updates++;
+                 << "framework " << update.framework_id();
+    stats.invalidStatusUpdates++;
   }
 }
 
 
 void Slave::executorMessage(const SlaveID& slaveId,
-			    const FrameworkID& frameworkId,
-			    const ExecutorID& executorId,
+                            const FrameworkID& frameworkId,
+                            const ExecutorID& executorId,
                             const string& data)
 {
   Framework* framework = getFramework(frameworkId);
-  if (framework != NULL) {
-    LOG(INFO) << "Sending message for framework " << frameworkId
-              << " to " << framework->pid;
-
-    // TODO(benh): This is weird, sending an M2F message.
-    MSG<M2F_FRAMEWORK_MESSAGE> out;
-    out.mutable_slave_id()->MergeFrom(slaveId);
-    out.mutable_framework_id()->MergeFrom(frameworkId);
-    out.mutable_executor_id()->MergeFrom(executorId);
-    out.set_data(data);
-    send(framework->pid, out);
-
-    statistics.valid_framework_messages++;
-  } else {
+  if (framework == NULL) {
     LOG(WARNING) << "Cannot send framework message from slave "
                  << slaveId << " to framework " << frameworkId
                  << " because framework does not exist";
-    statistics.invalid_framework_messages++;
+    stats.invalidFrameworkMessages++;
+    return;
   }
+
+  LOG(INFO) << "Sending message for framework " << frameworkId
+            << " to " << framework->pid;
+
+  ExecutorToFrameworkMessage message;
+  message.mutable_slave_id()->MergeFrom(slaveId);
+  message.mutable_framework_id()->MergeFrom(frameworkId);
+  message.mutable_executor_id()->MergeFrom(executorId);
+  message.set_data(data);
+  send(framework->pid, message);
+
+  stats.validFrameworkMessages++;
 }
 
 
 void Slave::ping()
 {
-  send(from(), PONG);
+  send(from(), "PONG");
 }
 
 
-void Slave::timeout()
+void Slave::statusUpdateTimeout(
+    const FrameworkID& frameworkId,
+    const UUID& uuid)
 {
-  // Check and see if we should re-send any status updates.
-  foreachpair (_, Framework* framework, frameworks) {
-    foreachpair (double deadline, _, framework->statuses) {
-      if (deadline <= elapsedTime()) {
-        foreachpair (_, const TaskStatus& status, framework->statuses[deadline]) {
-          LOG(WARNING) << "Resending status update"
-                       << " for task " << status.task_id()
-                       << " of framework " << framework->frameworkId;
-          MSG<S2M_STATUS_UPDATE> out;
-          out.mutable_framework_id()->MergeFrom(framework->frameworkId);
-          out.mutable_status()->MergeFrom(status);
-          send(master, out);
-        }
-      }
+  // Check and see if we still need to send this update.
+  Framework* framework = getFramework(frameworkId);
+  if (framework != NULL) {
+    if (framework->updates.contains(uuid)) {
+      const StatusUpdate& update = framework->updates[uuid];
+
+      LOG(INFO) << "Resending status update"
+                << " for task " << update.status().task_id()
+                << " of framework " << update.framework_id();
+
+      StatusUpdateMessage message;
+      message.mutable_update()->MergeFrom(update);
+      message.set_pid(self());
+      send(master, message);
     }
   }
 }
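
The resend scheduled above keeps firing only while the update is still
outstanding: statusUpdate() records each update in framework->updates
keyed by uuid, and statusUpdateTimeout() resends whatever is still
there. The acknowledgement path is not part of this hunk; a minimal
sketch of what it presumably does, assuming the master echoes back the
framework id and the update's uuid:

    // Hypothetical sketch (the actual handler is outside this diff).
    // Erasing the update turns any pending statusUpdateTimeout into a
    // no-op, which is what terminates the retry loop.
    void Slave::statusUpdateAcknowledged(const FrameworkID& frameworkId,
                                         const string& uuid)
    {
      Framework* framework = getFramework(frameworkId);
      if (framework != NULL) {
        framework->updates.erase(UUID::fromBytes(uuid));
      }
    }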
 
+
+// void Slave::timeout()
+// {
+//   // Check and see if we should re-send any status updates.
+//   double now = elapsedTime();
+
+//   foreachvalue (StatusUpdateStream* stream, statusUpdateStreams) {
+//     CHECK(stream->timeout > 0);
+//     if (stream->timeout < now) {
+//       CHECK(!stream->pending.empty());
+//       const StatusUpdate& update = stream->pending.front();
+
+//       LOG(WARNING) << "WARNING! Resending status update"
+//                    << " for task " << update.status().task_id()
+//                    << " of framework " << update.framework_id();
+
+//       StatusUpdateMessage message;
+//       message.mutable_update()->MergeFrom(update);
+//       message.set_reliable(true);
+//       send(master, message);
+
+//       stream->timeout = now + STATUS_UPDATE_RETRY_INTERVAL;
+//     }
+//   }
+// }
+
+
 void Slave::exited()
 {
   LOG(INFO) << "Process exited: " << from();
 
   if (from() == master) {
-    LOG(WARNING) << "Master disconnected! "
-                 << "Waiting for a new master to be elected.";
+    LOG(WARNING) << "WARNING! Master disconnected!"
+                 << " Waiting for a new master to be elected.";
     // TODO(benh): After so long waiting for a master, commit suicide.
   }
 }
@@ -706,7 +1218,7 @@ Promise<HttpResponse> Slave::http_info_j
 {
   LOG(INFO) << "HTTP request for '/slave/info.json'";
 
-  ostringstream out;
+  std::ostringstream out;
 
   out <<
     "{" <<
@@ -718,7 +1230,7 @@ Promise<HttpResponse> Slave::http_info_j
 
   HttpOKResponse response;
   response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
-  response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
+  response.headers["Content-Length"] = utils::stringify(out.str().size());
   response.body = out.str().data();
   return response;
 }
@@ -728,14 +1240,14 @@ Promise<HttpResponse> Slave::http_framew
 {
   LOG(INFO) << "HTTP request for '/slave/frameworks.json'";
 
-  ostringstream out;
+  std::ostringstream out;
 
   out << "[";
 
-  foreachpair (_, Framework* framework, frameworks) {
+  foreachvalue (Framework* framework, frameworks) {
     out <<
       "{" <<
-      "\"id\":\"" << framework->frameworkId << "\"," <<
+      "\"id\":\"" << framework->id << "\"," <<
       "\"name\":\"" << framework->info.name() << "\"," <<
       "\"user\":\"" << framework->info.user() << "\""
       "},";
@@ -751,7 +1263,7 @@ Promise<HttpResponse> Slave::http_framew
 
   HttpOKResponse response;
   response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
-  response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
+  response.headers["Content-Length"] = utils::stringify(out.str().size());
   response.body = out.str().data();
   return response;
 }
@@ -761,26 +1273,24 @@ Promise<HttpResponse> Slave::http_tasks_
 {
   LOG(INFO) << "HTTP request for '/slave/tasks.json'";
 
-  ostringstream out;
+  std::ostringstream out;
 
   out << "[";
 
-  foreachpair (_, Framework* framework, frameworks) {
-    foreachpair (_, Executor* executor, framework->executors) {
-      foreachpair (_, Task* task, executor->tasks) {
+  foreachvalue (Framework* framework, frameworks) {
+    foreachvalue (Executor* executor, framework->executors) {
+      foreachvalue (Task* task, executor->launchedTasks) {
         // TODO(benh): Send all of the resources (as JSON).
         Resources resources(task->resources());
         Resource::Scalar cpus = resources.getScalar("cpus", Resource::Scalar());
         Resource::Scalar mem = resources.getScalar("mem", Resource::Scalar());
-        const string& state =
-          TaskState_descriptor()->FindValueByNumber(task->state())->name();
         out <<
           "{" <<
           "\"task_id\":\"" << task->task_id() << "\"," <<
           "\"framework_id\":\"" << task->framework_id() << "\"," <<
           "\"slave_id\":\"" << task->slave_id() << "\"," <<
           "\"name\":\"" << task->name() << "\"," <<
-          "\"state\":\"" << state << "\"," <<
+          "\"state\":\"" << task->state() << "\"," <<
           "\"cpus\":" << cpus.value() << "," <<
           "\"mem\":" << mem.value() <<
           "},";
@@ -798,7 +1308,7 @@ Promise<HttpResponse> Slave::http_tasks_
 
   HttpOKResponse response;
   response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
-  response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
+  response.headers["Content-Length"] = utils::stringify(out.str().size());
   response.body = out.str().data();
   return response;
 }
@@ -808,26 +1318,28 @@ Promise<HttpResponse> Slave::http_stats_
 {
   LOG(INFO) << "Http request for '/slave/stats.json'";
 
-  ostringstream out;
+  std::ostringstream out;
+
+  out << std::setprecision(10);
 
   out <<
     "{" <<
     "\"uptime\":" << elapsedTime() - startTime << "," <<
     "\"total_frameworks\":" << frameworks.size() << "," <<
-    "\"launched_tasks\":" << statistics.launched_tasks << "," <<
-    "\"finished_tasks\":" << statistics.finished_tasks << "," <<
-    "\"killed_tasks\":" << statistics.killed_tasks << "," <<
-    "\"failed_tasks\":" << statistics.failed_tasks << "," <<
-    "\"lost_tasks\":" << statistics.lost_tasks << "," <<
-    "\"valid_status_updates\":" << statistics.valid_status_updates << "," <<
-    "\"invalid_status_updates\":" << statistics.invalid_status_updates << "," <<
-    "\"valid_framework_messages\":" << statistics.valid_framework_messages << "," <<
-    "\"invalid_framework_messages\":" << statistics.invalid_framework_messages <<
+    "\"started_tasks\":" << stats.tasks[TASK_STARTING] << "," <<
+    "\"finished_tasks\":" << stats.tasks[TASK_FINISHED] << "," <<
+    "\"killed_tasks\":" << stats.tasks[TASK_KILLED] << "," <<
+    "\"failed_tasks\":" << stats.tasks[TASK_FAILED] << "," <<
+    "\"lost_tasks\":" << stats.tasks[TASK_LOST] << "," <<
+    "\"valid_status_updates\":" << stats.validStatusUpdates << "," <<
+    "\"invalid_status_updates\":" << stats.invalidStatusUpdates << "," <<
+    "\"valid_framework_messages\":" << stats.validFrameworkMessages << "," <<
+    "\"invalid_framework_messages\":" << stats.invalidFrameworkMessages <<
     "}";
 
   HttpOKResponse response;
   response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
-  response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
+  response.headers["Content-Length"] = utils::stringify(out.str().size());
   response.body = out.str().data();
   return response;
 }
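
For reference, the handler above emits a single-line JSON object of
this shape (wrapped here for readability; the numbers are purely
illustrative):

    {"uptime":42.5,"total_frameworks":1,"started_tasks":3,
     "finished_tasks":2,"killed_tasks":0,"failed_tasks":0,
     "lost_tasks":1,"valid_status_updates":6,"invalid_status_updates":0,
     "valid_framework_messages":4,"invalid_framework_messages":0}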
@@ -837,7 +1349,7 @@ Promise<HttpResponse> Slave::http_vars(c
 {
   LOG(INFO) << "HTTP request for '/slave/vars'";
 
-  ostringstream out;
+  std::ostringstream out;
 
   out <<
     "build_date " << build::DATE << "\n" <<
@@ -849,22 +1361,24 @@ Promise<HttpResponse> Slave::http_vars(c
     out << key << " " << value << "\n";
   }
 
+  out << std::setprecision(10);
+
   out <<
     "uptime " << elapsedTime() - startTime << "\n" <<
     "total_frameworks " << frameworks.size() << "\n" <<
-    "launched_tasks " << statistics.launched_tasks << "\n" <<
-    "finished_tasks " << statistics.finished_tasks << "\n" <<
-    "killed_tasks " << statistics.killed_tasks << "\n" <<
-    "failed_tasks " << statistics.failed_tasks << "\n" <<
-    "lost_tasks " << statistics.lost_tasks << "\n" <<
-    "valid_status_updates " << statistics.valid_status_updates << "\n" <<
-    "invalid_status_updates " << statistics.invalid_status_updates << "\n" <<
-    "valid_framework_messages " << statistics.valid_framework_messages << "\n" <<
-    "invalid_framework_messages " << statistics.invalid_framework_messages << "\n";
+    "started_tasks " << stats.tasks[TASK_STARTING] << "\n" <<
+    "finished_tasks " << stats.tasks[TASK_FINISHED] << "\n" <<
+    "killed_tasks " << stats.tasks[TASK_KILLED] << "\n" <<
+    "failed_tasks " << stats.tasks[TASK_FAILED] << "\n" <<
+    "lost_tasks " << stats.tasks[TASK_LOST] << "\n" <<
+    "valid_status_updates " << stats.validStatusUpdates << "\n" <<
+    "invalid_status_updates " << stats.invalidStatusUpdates << "\n" <<
+    "valid_framework_messages " << stats.validFrameworkMessages << "\n" <<
+    "invalid_framework_messages " << stats.invalidFrameworkMessages << "\n";
 
   HttpOKResponse response;
   response.headers["Content-Type"] = "text/plain";
-  response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
+  response.headers["Content-Length"] = utils::stringify(out.str().size());
   response.body = out.str().data();
   return response;
 }
@@ -880,103 +1394,318 @@ Framework* Slave::getFramework(const Fra
 }
 
 
-// Send any tasks queued up for the given framework to its executor
-// (needed if we received tasks while the executor was starting up)
-void Slave::sendQueuedTasks(Framework* framework, Executor* executor)
+// StatusUpdateStream* Slave::getStatusUpdateStream(const StatusUpdateStreamID& id)
+// {
+//   if (statusUpdateStreams.contains(id)) {
+//     return statusUpdateStreams[id];
+//   }
+
+//   return NULL;
+// }
+
+
+// StatusUpdateStream* Slave::createStatusUpdateStream(
+//     const FrameworkID& frameworkId,
+//     const TaskID& taskId,
+//     const string& directory)
+// {
+//   StatusUpdateStream* stream = new StatusUpdateStream();
+//   stream->id = std::make_pair(frameworkId, taskId);
+//   stream->directory = directory;
+//   stream->received = -1;
+//   stream->acknowledged = -1;
+//   stream->timeout = -1;
+
+//   statusUpdateStreams[stream->id] = stream;
+
+//   // Open file descriptors for "updates" and "acknowledged".
+//   string path;
+//   Result<int> result;
+
+//   path = stream->directory + "/received";
+//   result = utils::os::open(path, O_CREAT | O_RDWR | O_SYNC);
+//   if (result.isError() || result.isNone()) {
+//     LOG(WARNING) << "Failed to open " << path
+//                  << " for storing received status updates";
+//     cleanupStatusUpdateStream(stream);
+//     return NULL;
+//   }
+
+//   stream->received = result.get();
+
+//   path = stream->directory + "/acknowledged";
+//   result = utils::os::open(path, O_CREAT | O_RDWR | O_SYNC);
+//   if (result.isError() || result.isNone()) {
+//     LOG(WARNING) << "Failed to open " << path
+//                  << " for storing acknowledged status updates";
+//     cleanupStatusUpdateStream(stream);
+//     return NULL;
+//   }
+
+//   stream->acknowledged = result.get();
+
+//   // Replay the status updates. This is necessary because the slave
+//   // might have crashed but was restarted before the executors
+//   // died. Or another task with the same id as before got run again on
+//   // the same executor.
+//   bool replayed = replayStatusUpdateStream(stream);
+
+//   if (!replayed) {
+//     LOG(WARNING) << "Failed to correctly replay status updates"
+//                  << " for task " << taskId
+//                  << " of framework " << frameworkId
+//                  << " found at " << path;
+//     cleanupStatusUpdateStream(stream);
+//     return NULL;
+//   }
+
+//   // Start sending any pending status updates. In this case, the slave
+//   // probably died after it sent the status update and never received
+//   // the acknowledgement.
+//   if (!stream->pending.empty()) {
+//     const StatusUpdate& update = stream->pending.front();
+//     StatusUpdateMessage message;
+//     message.mutable_update()->MergeFrom(update);
+//     message.set_reliable(true);
+//     send(master, message);
+
+//     stream->timeout = elapsedTime() + STATUS_UPDATE_RETRY_INTERVAL;
+//   }
+
+//   return stream;
+// }
+
+
+// bool Slave::replayStatusUpdateStream(StatusUpdateStream* stream)
+// {
+//   CHECK(stream->received >= 0);
+//   CHECK(stream->acknowledged >= 0);
+
+//   Result<StatusUpdate*> result;
+
+//   // Okay, now read all the received status updates.
+//   hashmap<uint32_t, StatusUpdate> pending;
+
+//   result = utils::protobuf::read(stream->received);
+//   while (result.isSome()) {
+//     StatusUpdate* update = result.get();
+//     CHECK(!pending.contains(update->sequence()));
+//     pending[update->sequence()] = *update;
+//     delete update;
+//     result = utils::protobuf::read(stream->received);
+//   }
+
+//   if (result.isError()) {
+//     return false;
+//   }
+
+//   CHECK(result.isNone());
+
+//   LOG(INFO) << "Recovered " << pending.size()
+//             << " TOTAL status updates for task "
+//             << stream->id.second << " of framework "
+//             << stream->id.first;
+
+//   // Okay, now get all the acknowledged status updates.
+//   result = utils::protobuf::read(stream->acknowledged);
+//   while (result.isSome()) {
+//     StatusUpdate* update = result.get();
+//     stream->sequence = std::max(stream->sequence, update->sequence());
+//     CHECK(pending.contains(update->sequence()));
+//     pending.erase(update->sequence());
+//     delete update;
+//     result = utils::protobuf::read(stream->acknowledged);
+//   }
+
+//   if (result.isError()) {
+//     return false;
+//   }
+
+//   CHECK(result.isNone());
+
+//   LOG(INFO) << "Recovered " << pending.size()
+//             << " PENDING status updates for task "
+//             << stream->id.second << " of framework "
+//             << stream->id.first;
+
+//   // Add the pending status updates in sorted order.
+//   while (!pending.empty()) {
+//     // Find the smallest remaining sequence number, starting the
+//     // search from the maximum each pass (requires <limits>).
+//     uint32_t sequence = std::numeric_limits<uint32_t>::max();
+//     foreachvalue (const StatusUpdate& update, pending) {
+//       sequence = std::min(sequence, update.sequence());
+//     }
+
+//     // Push that update and remove it from pending.
+//     stream->pending.push(pending[sequence]);
+//     pending.erase(sequence);
+//   }
+
+//   return true;
+// }
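
A design note on the replay above: the recovered updates live in a
hashmap, so finding the next-smallest sequence number costs a full scan
per update. Keeping them in a std::map instead would give ordered
iteration for free; a sketch under that assumption (requires <map>):

    // Sketch only: std::map iterates in ascending key order, so the
    // recovered updates come out already sorted by sequence number.
    std::map<uint32_t, StatusUpdate> recovered;  // Populated as above.

    foreachpair (uint32_t sequence, const StatusUpdate& update, recovered) {
      stream->pending.push(update);
    }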
+
+
+// void Slave::cleanupStatusUpdateStream(StatusUpdateStream* stream)
+// {
+//   // These are file descriptors from utils::os::open(), so use
+//   // close(2) rather than fclose(3).
+//   if (stream->received >= 0) {
+//     close(stream->received);
+//   }
+
+//   if (stream->acknowledged >= 0) {
+//     close(stream->acknowledged);
+//   }
+
+//   statusUpdateStreams.erase(stream->id);
+
+//   delete stream;
+// }
+
+
+// N.B. When the slave is running in "local" mode then the pid is
+// uninteresting (and possibly could cause bugs).
+void Slave::executorStarted(const FrameworkID& frameworkId,
+                            const ExecutorID& executorId,
+                            pid_t pid)
+{
+  LOG(INFO) << "Executor '" << executorId << "' of framework "
+            << frameworkId << " has started at " << pid;
+}
+
+
+// Called by the isolation module when an executor process exits.
+void Slave::executorExited(const FrameworkID& frameworkId,
+                           const ExecutorID& executorId,
+                           int status)
 {
-  LOG(INFO) << "Flushing queued tasks for framework "
-            << framework->frameworkId;
+  Framework* framework = getFramework(frameworkId);
+  if (framework == NULL) {
+    LOG(WARNING) << "WARNING! Unknown executor '" << executorId
+                 << "' of unknown framework " << frameworkId
+                 << " has exited with status " << status;
+    return;
+  }
 
-  CHECK(executor->pid != UPID());
+  Executor* executor = framework->getExecutor(executorId);
+  if (executor == NULL) {
+    LOG(WARNING) << "WARNING! UNKNOWN executor '" << executorId
+                 << "' of framework " << frameworkId
+                 << " has exited with status " << status;
+    return;
+  }
 
-  foreach (const TaskDescription& task, executor->queuedTasks) {
-    // Add the task to the executor.
-    executor->addTask(task);
+  LOG(INFO) << "Executor '" << executorId
+            << "' of framework " << frameworkId
+            << " has exited with status " << status;
 
-    MSG<S2E_RUN_TASK> out;
-    out.mutable_framework()->MergeFrom(framework->info);
-    out.mutable_framework_id()->MergeFrom(framework->frameworkId);
-    out.set_pid(framework->pid);
-    out.mutable_task()->MergeFrom(task);
-    send(executor->pid, out);
-  }
+  ExitedExecutorMessage message;
+  message.mutable_slave_id()->MergeFrom(id);
+  message.mutable_framework_id()->MergeFrom(frameworkId);
+  message.mutable_executor_id()->MergeFrom(executorId);
+  message.set_status(status);
+  send(master, message);
 
-  executor->queuedTasks.clear();
-}
+  // TODO(benh): Send status updates for remaining tasks here rather
+  // than at the master! As in, eliminate the code in
+  // Master::exitedExecutor and put it here.
 
+  framework->destroyExecutor(executor->id);
 
-// Kill a framework (including its executor if killExecutor is true).
-void Slave::killFramework(Framework *framework, bool killExecutors)
-{
-  LOG(INFO) << "Cleaning up framework " << framework->frameworkId;
+  // Cleanup if this framework has nothing running.
+  if (framework->executors.size() == 0) {
+    // TODO(benh): But there might be some remaining status updates
+    // that haven't been acknowledged!
+    frameworks.erase(framework->id);
+    delete framework;
+  }
+}
 
-  // Shutdown all executors of this framework.
-  foreachpaircopy (const ExecutorID& executorId, Executor* executor, framework->executors) {
-    if (killExecutors) {
-      LOG(INFO) << "Killing executor '" << executorId
-                << "' of framework " << framework->frameworkId;
 
-      send(executor->pid, S2E_KILL_EXECUTOR);
+void Slave::shutdownExecutor(Framework* framework, Executor* executor)
+{
+  LOG(INFO) << "Shutting down executor '" << executor->id
+            << "' of framework " << framework->id;
 
-      // TODO(benh): There really isn't ANY time between when an
-      // executor gets a S2E_KILL_EXECUTOR message and the isolation
-      // module goes and kills it. We should really think about making
-      // the semantics of this better.
+  send(executor->pid, ShutdownExecutorMessage());
 
-      isolationModule->killExecutor(framework, executor);
-    }
+  executor->shutdown = true;
 
-    framework->destroyExecutor(executorId);
-  }
+  // Prepare for sending a kill if the executor doesn't comply.
+  double timeout = conf.get<double>("executor_shutdown_timeout_seconds",
+                                    EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS);
 
-  frameworks.erase(framework->frameworkId);
-  delete framework;
+  delay(timeout, self(),
+        &Slave::shutdownExecutorTimeout,
+        framework->id, executor->id, executor->uuid);
 }
 
 
-// Called by isolation module when an executor process exits
-// TODO(benh): Make this callback be a message so that we can avoid
-// race conditions.
-void Slave::executorExited(const FrameworkID& frameworkId, const ExecutorID& executorId, int result)
+void Slave::shutdownExecutorTimeout(const FrameworkID& frameworkId,
+                                    const ExecutorID& executorId,
+                                    const UUID& uuid)
 {
   Framework* framework = getFramework(frameworkId);
-  if (framework != NULL) {
-    Executor* executor = framework->getExecutor(executorId);
-    if (executor != NULL) {
-      LOG(INFO) << "Exited executor '" << executorId
-                << "' of framework " << frameworkId
-                << " with result " << result;
-
-      MSG<S2M_EXITED_EXECUTOR> out;
-      out.mutable_slave_id()->MergeFrom(slaveId);
-      out.mutable_framework_id()->MergeFrom(frameworkId);
-      out.mutable_executor_id()->MergeFrom(executorId);
-      out.set_result(result);
-      send(master, out);
-
-      framework->destroyExecutor(executorId);
-
-      // TODO(benh): When should we kill the presence of an entire
-      // framework on a slave?
-      if (framework->executors.size() == 0) {
-        killFramework(framework);
-      }
-    } else {
-      LOG(WARNING) << "UNKNOWN executor '" << executorId
-                   << "' of framework " << frameworkId
-                   << " has exited with result " << result;
+  if (framework == NULL) {
+    return;
+  }
+
+  Executor* executor = framework->getExecutor(executorId);
+  if (executor == NULL) {
+    return;
+  }
+
+  // Make sure this timeout is valid.
+  if (executor->uuid == uuid) {
+    LOG(INFO) << "Killing executor '" << executor->id
+              << "' of framework " << framework->id;
+
+    dispatch(isolationModule,
+             &IsolationModule::killExecutor,
+             framework->id, executor->id);
+
+    ExitedExecutorMessage message;
+    message.mutable_slave_id()->MergeFrom(id);
+    message.mutable_framework_id()->MergeFrom(frameworkId);
+    message.mutable_executor_id()->MergeFrom(executorId);
+    message.set_status(-1);
+    send(master, message);
+
+    // TODO(benh): Send status updates for remaining tasks here rather
+    // than at the master! As in, eliminate the code in
+    // Master::exitedExecutor and put it here.
+
+    framework->destroyExecutor(executor->id);
+
+    // Cleanup if this framework has nothing running.
+    if (framework->executors.size() == 0) {
+      // TODO(benh): But there might be some remaining status updates
+      // that haven't been acknowledged!
+      frameworks.erase(framework->id);
+      delete framework;
     }
-  } else {
-    LOG(WARNING) << "UNKNOWN executor '" << executorId
-                 << "' of UNKNOWN framework " << frameworkId
-                 << " has exited with result " << result;
   }
-};
+}
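
The executor->uuid == uuid comparison above is what renders stale
timeouts harmless: if the executor already exited and a replacement was
launched in the meantime, the replacement presumably carries a fresh
uuid, so the delayed dispatch scheduled against the old one matches
nothing. A hypothetical illustration of that invariant (assuming a
UUID::random() helper alongside the UUID::fromBytes used earlier):

    // Not the actual Executor class; just the uuid-guard idea.
    struct Incarnation
    {
      Incarnation() : uuid(UUID::random()) {}  // Fresh id per launch.
      UUID uuid;
    };

    // A timeout captures the uuid it was scheduled against; a newer
    // incarnation fails the comparison and the kill is skipped.
    bool timeoutStillValid(const Incarnation& executor, const UUID& scheduled)
    {
      return executor.uuid == scheduled;
    }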
+
+
+// void Slave::recover()
+// {
+//   // If we find an executor that is no longer running and its last
+//   // acknowledged task statuses are not terminal, create a
+//   // StatusUpdateStream for each task and try and reliably send
+//   // TASK_LOST updates.
+
+//   // Otherwise, once we reconnect, the executor will just start
+//   // sending us status updates that we need to send, wait for an
+//   // ack, write to disk, and then respond.
+// }
 
 
 string Slave::getUniqueWorkDirectory(const FrameworkID& frameworkId,
                                      const ExecutorID& executorId)
 {
+  LOG(INFO) << "Generating a unique work directory for executor '"
+            << executorId << "' of framework " << frameworkId;
+
   string workDir = ".";
   if (conf.contains("work_dir")) {
     workDir = conf.get("work_dir", workDir);
@@ -986,30 +1715,32 @@ string Slave::getUniqueWorkDirectory(con
 
   workDir = workDir + "/work";
 
-  ostringstream os(std::ios_base::app | std::ios_base::out);
-  os << workDir << "/slave-" << slaveId
+  std::ostringstream out(std::ios_base::app | std::ios_base::out);
+  out << workDir << "/slave-" << id
      << "/fw-" << frameworkId << "-" << executorId;
 
+  // TODO(benh): Make executor id be in its own directory.
+
   // Find a unique directory based on the path given by the slave
   // (this is because we might launch multiple executors from the same
   // framework on this slave).
-  os << "/";
+  out << "/";
 
   string dir;
-  dir = os.str();
+  dir = out.str();
 
   for (int i = 0; i < INT_MAX; i++) {
-    os << i;
-    if (opendir(os.str().c_str()) == NULL && errno == ENOENT)
-      break;
-    os.str(dir);
+    out << i;
+
+    DIR* d = opendir(out.str().c_str());
+    if (d == NULL && errno == ENOENT)
+      break;
+
+    // Close the directory so the handle doesn't leak.
+    if (d != NULL) {
+      closedir(d);
+    }
+
+    out.str(dir);
   }
 
-  return os.str();
+  return out.str();
 }
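
Note that getUniqueWorkDirectory() only computes a path; creating the
directory is left to the caller. A minimal sketch of doing that with
plain POSIX calls, mkdir -p style (the real code may well use a
utils::os helper instead):

    #include <errno.h>
    #include <sys/stat.h>

    #include <string>

    // Create 'path' and any missing parents, tolerating components
    // that already exist. Returns true if every mkdir succeeded or
    // failed with EEXIST.
    static bool createDirectory(const std::string& path)
    {
      for (size_t i = 1; i <= path.size(); i++) {
        if (i == path.size() || path[i] == '/') {
          const std::string prefix = path.substr(0, i);
          if (mkdir(prefix.c_str(), 0755) < 0 && errno != EEXIST) {
            return false;
          }
        }
      }
      return true;
    }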
 
 
-const Configuration& Slave::getConfiguration()
-{
-  return conf;
-}
+}}} // namespace mesos { namespace internal { namespace slave {