You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:28:34 UTC

svn commit: r1132344 - in /incubator/mesos/trunk/src: common/utils.hpp slave/slave.cpp slave/slave.hpp

Author: benh
Date: Sun Jun  5 09:28:33 2011
New Revision: 1132344

URL: http://svn.apache.org/viewvc?rev=1132344&view=rev
Log:
Added statistics to slaves, added some http endpoints for accessing those statistics (and getting general 'vars').

Modified:
    incubator/mesos/trunk/src/common/utils.hpp
    incubator/mesos/trunk/src/slave/slave.cpp
    incubator/mesos/trunk/src/slave/slave.hpp

Modified: incubator/mesos/trunk/src/common/utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/utils.hpp?rev=1132344&r1=1132343&r2=1132344&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/utils.hpp (original)
+++ incubator/mesos/trunk/src/common/utils.hpp Sun Jun  5 09:28:33 2011
@@ -1,10 +1,41 @@
 #ifndef __UTILS_HPP__
 #define __UTILS_HPP__
 
+#include <errno.h>
+#include <unistd.h>
+
+#include <string>
+#include <vector>
+
+
+// Useful common macros.
 #define VA_NUM_ARGS_IMPL(_1, _2, _3, _4, _5, N, ...) N
 #define VA_NUM_ARGS(...) VA_NUM_ARGS_IMPL(__VA_ARGS__, 5, 4, 3, 2, 1)
 
 #define CONCAT_IMPL(A, B) A ## B
 #define CONCAT(A, B) CONCAT_IMPL(A, B)
 
+
+namespace mesos { namespace internal { namespace utils {
+
+// Returns the path of the current working directory as reported by
+// ::getcwd, or an empty string if it cannot be determined.
+inline std::string getcwd()
+{
+  // Start small and double the buffer for as long as ::getcwd reports
+  // (via ERANGE) that the path did not fit. The vector owns the
+  // storage, so nothing leaks if building the result throws.
+  std::vector<char> buffer(100);
+
+  while (::getcwd(&buffer[0], buffer.size()) == NULL) {
+    // Inspect errno right away, before any other call can clobber it.
+    if (errno != ERANGE) {
+      return std::string();
+    }
+    buffer.resize(buffer.size() * 2);
+  }
+
+  return std::string(&buffer[0]);
+}
+
+}}} // namespace mesos { namespace internal { namespace utils {
+
 #endif // __UTILS_HPP__

Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1132344&r1=1132343&r2=1132344&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Sun Jun  5 09:28:33 2011
@@ -6,109 +6,85 @@
 #include <algorithm>
 #include <fstream>
 
+#include <google/protobuf/descriptor.h>
+
 #include "slave.hpp"
 #include "webui.hpp"
 
+#include "common/utils.hpp"
+
 // There's no gethostbyname2 on Solaris, so fake it by calling gethostbyname
 #ifdef __sun__
 #define gethostbyname2(name, _) gethostbyname(name)
 #endif
 
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::slave;
+
+using boost::lexical_cast;
+using boost::unordered_map;
+using boost::unordered_set;
+
+using process::HttpOKResponse;
+using process::HttpResponse;
+using process::HttpRequest;
+using process::Promise;
+using process::UPID;
+
 using std::list;
 using std::make_pair;
 using std::ostringstream;
-using std::istringstream;
 using std::pair;
 using std::queue;
 using std::string;
 using std::vector;
 
-using boost::lexical_cast;
-using boost::unordered_map;
-using boost::unordered_set;
-
-using namespace mesos;
-using namespace mesos::internal;
-using namespace mesos::internal::slave;
-
-
-namespace {
-
-// Periodically sends heartbeats to the master
-class Heart : public MesosProcess
-{
-private:
-  PID master;
-  PID slave;
-  SlaveID sid;
-  double interval;
-
-protected:
-  void operator () ()
-  {
-    link(slave);
-    link(master);
-    do {
-      switch (receive(interval)) {
-      case PROCESS_TIMEOUT:
-	send(master, pack<SH2M_HEARTBEAT>(sid));
-	break;
-      case PROCESS_EXIT:
-	return;
-      }
-    } while (true);
-  }
-
-public:
-  Heart(const PID &_master, const PID &_slave, SlaveID _sid, double _interval)
-    : master(_master), slave(_slave), sid(_sid), interval(_interval) {}
-};
-
 
-// Default values for CPU cores and memory to include in configuration
-const int32_t DEFAULT_CPUS = 1;
-const int32_t DEFAULT_MEM = 1 * Gigabyte;
-
-
-} /* namespace */
-
-
-Slave::Slave(Resources _resources, bool _local,
+Slave::Slave(const Resources& _resources, bool _local,
              IsolationModule *_isolationModule)
-  : id(""), resources(_resources), local(_local),
-    isolationModule(_isolationModule) {}
+  : MesosProcess<Slave>("slave"),
+    resources(_resources), local(_local), isolationModule(_isolationModule)
+{
+  initialize();
+}
 
 
-Slave::Slave(const Params& _conf, bool _local, IsolationModule *_module)
-  : id(""), conf(_conf), local(_local), isolationModule(_module)
-{
-  resources = Resources(conf.get<int32_t>("cpus", DEFAULT_CPUS),
-                        conf.get<int32_t>("mem", DEFAULT_MEM));
+Slave::Slave(const Configuration& _conf, bool _local,
+             IsolationModule* _isolationModule)
+  : MesosProcess<Slave>("slave"),
+    conf(_conf), local(_local), isolationModule(_isolationModule)
+{
+  resources =
+    Resources::parse(conf.get<string>("resources", "cpus:1;mem:1024"));
+
+  initialize();
 }
 
 
-void Slave::registerOptions(Configurator* conf)
-{
-  conf->addOption<int32_t>("cpus", 'c', "CPU cores for use by tasks",
-                           DEFAULT_CPUS);
-  conf->addOption<int64_t>("mem", 'm', "Memory for use by tasks, in MB\n",
-                           DEFAULT_MEM);
-  conf->addOption<string>("work_dir",
-                          "Where to place framework work directories\n"
-                          "(default: MESOS_HOME/work)");
-  conf->addOption<string>("hadoop_home",
-                          "Where to find Hadoop installed (for fetching\n"
-                          "framework executors from HDFS)\n"
-                          "(default: look for HADOOP_HOME environment\n"
-                          "variable or find hadoop on PATH)");
-  conf->addOption<bool>("switch_user", 
-                        "Whether to run tasks as the user who\n"
-                        "submitted them rather than the user running\n"
-                        "the slave (requires setuid permission)",
-                        true);
-   conf->addOption<string>("frameworks_home",
-                           "Directory prepended to relative executor\n"
-                           "paths (default: MESOS_HOME/frameworks)");
+// Registers the options the slave understands with the given
+// configurator: resources, work_dir, hadoop_home, switch_user and
+// frameworks_home.
+void Slave::registerOptions(Configurator* configurator)
+{
+  // TODO(benh): Is there a way to specify units for the resources?
+  configurator->addOption<string>("resources",
+                                  "Total consumable resources per slave\n");
+  // The "attributes" option below is currently disabled.
+//   configurator->addOption<string>("attributes",
+//                                   "Attributes of machine\n");
+  configurator->addOption<string>("work_dir",
+                                  "Where to place framework work directories\n"
+                                  "(default: MESOS_HOME/work)");
+  configurator->addOption<string>("hadoop_home",
+                                  "Where to find Hadoop installed (for\n"
+                                  "fetching framework executors from HDFS)\n"
+                                  "(default: look for HADOOP_HOME in\n"
+                                  "environment or find hadoop on PATH)");
+  // NOTE(review): the trailing 'true' is presumably the default value.
+  configurator->addOption<bool>("switch_user", 
+                                "Whether to run tasks as the user who\n"
+                                "submitted them rather than the user running\n"
+                                "the slave (requires setuid permission)",
+                                true);
+  configurator->addOption<string>("frameworks_home",
+                                  "Directory prepended to relative executor\n"
+                                  "paths (default: MESOS_HOME/frameworks)");
+}
 
 
@@ -118,25 +94,63 @@ Slave::~Slave()
 }
 
 
-state::SlaveState *Slave::getState()
+// Builds a newly allocated state::SlaveState describing this slave,
+// with one state::Framework entry per executor and one state::Task
+// entry per task of that executor. For every "cpus"/"mem" lookup a
+// scalar holding -1 is handed to getScalar (presumably the value
+// reported when the resource is absent).
+Promise<state::SlaveState*> Slave::getState()
 {
-  std::ostringstream my_pid;
-  my_pid << self();
-  std::ostringstream master_pid;
-  master_pid << master;
-  state::SlaveState *state =
-    new state::SlaveState(BUILD_DATE, BUILD_USER, id, resources.cpus, 
-        resources.mem, my_pid.str(), master_pid.str());
-
-  foreachpair(_, Framework *f, frameworks) {
-    state::Framework *framework = new state::Framework(f->id, f->name, 
-        f->executorInfo.uri, f->executorStatus, f->resources.cpus,
-        f->resources.mem);
-    state->frameworks.push_back(framework);
-    foreachpair(_, Task *t, f->tasks) {
-      state::Task *task = new state::Task(t->id, t->name, t->state,
-          t->resources.cpus, t->resources.mem);
-      framework->tasks.push_back(task);
+  // Qualify with 'this->': a bare 'resources' in the initializer would
+  // name the local being declared, i.e. copy it from itself.
+  Resources resources(this->resources);
+  Resource::Scalar cpus;
+  Resource::Scalar mem;
+  cpus.set_value(-1);
+  mem.set_value(-1);
+  cpus = resources.getScalar("cpus", cpus);
+  mem = resources.getScalar("mem", mem);
+
+  state::SlaveState* state =
+    new state::SlaveState(build::DATE, build::USER, slaveId.value(),
+                          cpus.value(), mem.value(), self(), master);
+
+  foreachpair (_, Framework* f, frameworks) {
+    foreachpair (_, Executor* e, f->executors) {
+      Resources resources(e->resources);
+      Resource::Scalar cpus;
+      Resource::Scalar mem;
+      cpus.set_value(-1);
+      mem.set_value(-1);
+      cpus = resources.getScalar("cpus", cpus);
+      mem = resources.getScalar("mem", mem);
+
+      // TODO(benh): For now, we will add a state::Framework object
+      // for each executor that the framework has. Therefore, we tweak
+      // the framework ID to also include the associated executor ID
+      // to differentiate them. This is so we don't have to make very
+      // many changes to the webui right now. Note that this ID
+      // construction must be identical to what we do for directory
+      // suffix returned from Slave::getUniqueWorkDirectory.
+
+      string id = f->frameworkId.value() + "-" + e->info.executor_id().value();
+
+      state::Framework* framework =
+        new state::Framework(id, f->info.name(),
+                             e->info.uri(), e->executorStatus,
+                             cpus.value(), mem.value());
+
+      state->frameworks.push_back(framework);
+
+      foreachpair (_, Task* t, e->tasks) {
+        Resources resources(t->resources());
+        Resource::Scalar cpus;
+        Resource::Scalar mem;
+        cpus.set_value(-1);
+        mem.set_value(-1);
+        cpus = resources.getScalar("cpus", cpus);
+        mem = resources.getScalar("mem", mem);
+
+        // The task state is reported by its enum value name.
+        state::Task* task =
+          new state::Task(t->task_id().value(), t->name(),
+                          TaskState_descriptor()->FindValueByNumber(t->state())->name(),
+                          cpus.value(), mem.value());
+
+        framework->tasks.push_back(task);
+      }
     }
   }
 
@@ -144,380 +158,777 @@ state::SlaveState *Slave::getState()
 }
 
 
+// Setup shared by both constructors: zeroes the statistics counters,
+// records the start time, and installs the message and HTTP handlers.
+void Slave::initialize()
+{
+  // Start all the statistics at 0.
+  statistics.launched_tasks = 0;
+  statistics.finished_tasks = 0;
+  statistics.killed_tasks = 0;
+  statistics.failed_tasks = 0;
+  statistics.lost_tasks = 0;
+  statistics.valid_status_updates = 0;
+  statistics.invalid_status_updates = 0;
+  statistics.valid_framework_messages = 0;
+  statistics.invalid_framework_messages = 0;
+
+  startTime = elapsedTime();
+
+  // Install one handler per message type. The member pointers listed
+  // after the handler presumably name the message fields that are
+  // passed to it as arguments, in order.
+  install(NEW_MASTER_DETECTED, &Slave::newMasterDetected,
+          &NewMasterDetectedMessage::pid);
+
+  install(NO_MASTER_DETECTED, &Slave::noMasterDetected);
+
+  install(M2S_REGISTER_REPLY, &Slave::registerReply,
+          &SlaveRegisteredMessage::slave_id);
+
+  install(M2S_REREGISTER_REPLY, &Slave::reregisterReply,
+          &SlaveRegisteredMessage::slave_id);
+
+  install(M2S_RUN_TASK, &Slave::runTask,
+          &RunTaskMessage::framework,
+          &RunTaskMessage::framework_id,
+          &RunTaskMessage::pid,
+          &RunTaskMessage::task);
+
+  install(M2S_KILL_TASK, &Slave::killTask,
+          &KillTaskMessage::framework_id,
+          &KillTaskMessage::task_id);
+
+  install(M2S_KILL_FRAMEWORK, &Slave::killFramework,
+          &KillFrameworkMessage::framework_id);
+
+  install(M2S_FRAMEWORK_MESSAGE, &Slave::schedulerMessage,
+          &FrameworkMessageMessage::slave_id,
+          &FrameworkMessageMessage::framework_id,
+          &FrameworkMessageMessage::executor_id,
+          &FrameworkMessageMessage::data);
+
+  install(M2S_UPDATE_FRAMEWORK, &Slave::updateFramework,
+          &UpdateFrameworkMessage::framework_id,
+          &UpdateFrameworkMessage::pid);
+
+  install(M2S_STATUS_UPDATE_ACK, &Slave::statusUpdateAck,
+          &StatusUpdateAckMessage::framework_id,
+          &StatusUpdateAckMessage::slave_id,
+          &StatusUpdateAckMessage::task_id);
+
+  install(E2S_REGISTER_EXECUTOR, &Slave::registerExecutor,
+          &RegisterExecutorMessage::framework_id,
+          &RegisterExecutorMessage::executor_id);
+
+  install(E2S_STATUS_UPDATE, &Slave::statusUpdate,
+          &StatusUpdateMessage::framework_id,
+          &StatusUpdateMessage::status);
+
+  install(E2S_FRAMEWORK_MESSAGE, &Slave::executorMessage,
+          &FrameworkMessageMessage::slave_id,
+          &FrameworkMessageMessage::framework_id,
+          &FrameworkMessageMessage::executor_id,
+          &FrameworkMessageMessage::data);
+
+  install(PING, &Slave::ping);
+
+  install(process::TIMEOUT, &Slave::timeout);
+
+  install(process::EXITED, &Slave::exited);
+
+  // HTTP endpoints: slave info, frameworks, tasks, statistics and vars.
+  installHttpHandler("info.json", &Slave::http_info_json);
+  installHttpHandler("frameworks.json", &Slave::http_frameworks_json);
+  installHttpHandler("tasks.json", &Slave::http_tasks_json);
+  installHttpHandler("stats.json", &Slave::http_stats_json);
+  installHttpHandler("vars", &Slave::http_vars);
+}
+
+
 void Slave::operator () ()
 {
   LOG(INFO) << "Slave started at " << self();
+  LOG(INFO) << "Slave resources: " << resources;
 
   // Get our hostname
   char buf[512];
   gethostname(buf, sizeof(buf));
-  hostent *he = gethostbyname2(buf, AF_INET);
+  hostent* he = gethostbyname2(buf, AF_INET);
   string hostname = he->h_name;
 
-  // Get our public DNS name. Normally this is our hostname, but on EC2
-  // we look for the MESOS_PUBLIC_DNS environment variable. This allows
-  // the master to display our public name in its web UI.
-  string publicDns = hostname;
+  // Check and see if we have a different public DNS name. Normally
+  // this is our hostname, but on EC2 we look for the MESOS_PUBLIC_DNS
+  // environment variable. This allows the master to display our
+  // public name in its web UI.
+  string public_hostname = hostname;
   if (getenv("MESOS_PUBLIC_DNS") != NULL) {
-    publicDns = getenv("MESOS_PUBLIC_DNS");
+    public_hostname = getenv("MESOS_PUBLIC_DNS");
   }
 
+  // Initialize slave info.
+  slave.set_hostname(hostname);
+  slave.set_public_hostname(public_hostname);
+  slave.mutable_resources()->MergeFrom(resources);
+
   // Initialize isolation module.
   isolationModule->initialize(this);
 
   while (true) {
-    switch (receive()) {
-      case NEW_MASTER_DETECTED: {
-	string masterSeq;
-	PID masterPid;
-	tie(masterSeq, masterPid) = unpack<NEW_MASTER_DETECTED>(body());
-
-	LOG(INFO) << "New master at " << masterPid << " with ID:" << masterSeq;
-
-        redirect(master, masterPid);
-	master = masterPid;
-	link(master);
-
-	if (id.empty()) {
-	  // Slave started before master.
-	  send(master, pack<S2M_REGISTER_SLAVE>(hostname, publicDns, resources));
-	} else {
-	  // Reconnecting, so reconstruct resourcesInUse for the master.
-	  Resources resourcesInUse; 
-	  vector<Task> taskVec;
-
-	  foreachpair(_, Framework *framework, frameworks) {
-	    foreachpair(_, Task *task, framework->tasks) {
-	      resourcesInUse += task->resources;
-	      Task ti = *task;
-	      ti.slaveId = id;
-	      taskVec.push_back(ti);
-	    }
-	  }
+    serve(1);
+    if (name() == process::TERMINATE) {
+      LOG(INFO) << "Asked to shut down by " << from();
+      foreachpaircopy (_, Framework* framework, frameworks) {
+        killFramework(framework);
+      }
+      return;
+    }
+  }
+}
+
+
+void Slave::newMasterDetected(const string& pid)
+{
+  LOG(INFO) << "New master detected at " << pid;
 
-	  send(master, pack<S2M_REREGISTER_SLAVE>(id, hostname, publicDns, resources, taskVec));
+  master = pid;
+  link(master);
+
+  if (slaveId == "") {
+    // Slave started before master.
+    MSG<S2M_REGISTER_SLAVE> out;
+    out.mutable_slave()->MergeFrom(slave);
+    send(master, out);
+  } else {
+    // Re-registering, so send tasks running.
+    MSG<S2M_REREGISTER_SLAVE> out;
+    out.mutable_slave_id()->MergeFrom(slaveId);
+    out.mutable_slave()->MergeFrom(slave);
+
+    foreachpair (_, Framework* framework, frameworks) {
+      foreachpair (_, Executor* executor, framework->executors) {
+	foreachpair (_, Task* task, executor->tasks) {
+	  out.add_tasks()->MergeFrom(*task);
 	}
-	break;
-      }
-	
-      case NO_MASTER_DETECTED: {
-	LOG(INFO) << "Lost master(s) ... waiting";
-	break;
       }
+    }
 
-      case M2S_REGISTER_REPLY: {
-	double interval = 0;
-        tie(this->id, interval) = unpack<M2S_REGISTER_REPLY>(body());
-        LOG(INFO) << "Registered with master; given slave ID " << this->id;
-        link(spawn(new Heart(master, self(), this->id, interval)));
-        break;
-      }
-      
-      case M2S_REREGISTER_REPLY: {
-        SlaveID sid;
-	double interval = 0;
-        tie(sid, interval) = unpack<M2S_REREGISTER_REPLY>(body());
-        LOG(INFO) << "RE-registered with master; given slave ID " << sid << " had "<< this->id;
-        if (this->id == "")
-          this->id = sid;
-        CHECK(this->id == sid);
-        link(spawn(new Heart(master, self(), this->id, interval)));
-        break;
-      }
-      
-      case M2S_RUN_TASK: {
-	FrameworkID fid;
-        TaskID tid;
-        string fwName, user, taskName, taskArg;
-        ExecutorInfo execInfo;
-        Params params;
-        PID pid;
-        tie(fid, tid, fwName, user, execInfo, taskName, taskArg, params, pid) =
-          unpack<M2S_RUN_TASK>(body());
-        LOG(INFO) << "Got assigned task " << fid << ":" << tid;
-        Resources res;
-        res.cpus = params.getInt32("cpus", -1);
-        res.mem = params.getInt64("mem", -1);
-        Framework *framework = getFramework(fid);
-        if (framework == NULL) {
-          // Framework not yet created on this node - create it.
-          framework = new Framework(fid, fwName, user, execInfo, pid);
-          frameworks[fid] = framework;
-          isolationModule->startExecutor(framework);
-        }
-        Task *task = framework->addTask(tid, taskName, res);
-        Executor *executor = getExecutor(fid);
-        if (executor) {
-          send(executor->pid,
-               pack<S2E_RUN_TASK>(tid, taskName, taskArg, params));
-          isolationModule->resourcesChanged(framework);
-        } else {
-          // Executor not yet registered; queue task for when it starts up
-          TaskDescription *td = new TaskDescription(
-              tid, taskName, taskArg, params.str());
-          framework->queuedTasks.push_back(td);
-        }
-        break;
-      }
+    send(master, out);
+  }
+}
 
-      case M2S_KILL_TASK: {
-        FrameworkID fid;
-        TaskID tid;
-        tie(fid, tid) = unpack<M2S_KILL_TASK>(body());
-        LOG(INFO) << "Killing task " << fid << ":" << tid;
-        if (Executor *ex = getExecutor(fid)) {
-          send(ex->pid, pack<S2E_KILL_TASK>(tid));
-        }
-        if (Framework *fw = getFramework(fid)) {
-          fw->removeTask(tid);
-          isolationModule->resourcesChanged(fw);
-        }
-        break;
-      }
 
-      case M2S_KILL_FRAMEWORK: {
-        FrameworkID fid;
-        tie(fid) = unpack<M2S_KILL_FRAMEWORK>(body());
-        LOG(INFO) << "Asked to kill framework " << fid;
-        Framework *fw = getFramework(fid);
-        if (fw != NULL)
-          killFramework(fw);
-        break;
-      }
+// Handler for NO_MASTER_DETECTED: only logs that the master is gone;
+// no other state is touched here.
+void Slave::noMasterDetected()
+{
+  LOG(INFO) << "Lost master(s) ... waiting";
+}
 
-      case M2S_FRAMEWORK_MESSAGE: {
-        FrameworkID fid;
-        FrameworkMessage message;
-        tie(fid, message) = unpack<M2S_FRAMEWORK_MESSAGE>(body());
-        if (Executor *ex = getExecutor(fid)) {
-          VLOG(1) << "Relaying framework message for framework " << fid;
-          send(ex->pid, pack<S2E_FRAMEWORK_MESSAGE>(message));
-        } else {
-          VLOG(1) << "Dropping framework message for framework " << fid
-                  << " because its executor is not running";
-        }
-        // TODO(*): If executor is not started, queue framework message?
-        // (It's probably okay to just drop it since frameworks can have
-        // the executor send a message to the master to say when it's ready.)
-        break;
-      }
 
-      case M2S_UPDATE_FRAMEWORK_PID: {
-        FrameworkID fid;
-        PID pid;
-        tie(fid, pid) = unpack<M2S_UPDATE_FRAMEWORK_PID>(body());
-        Framework *framework = getFramework(fid);
-        if (framework != NULL) {
-          LOG(INFO) << "Updating framework " << fid << " pid to " << pid;
-          framework->pid = pid;
-        }
-        break;
-      }
+// Handler for M2S_REGISTER_REPLY: logs and stores the slave ID carried
+// by the reply.
+void Slave::registerReply(const SlaveID& slaveId)
+{
+  LOG(INFO) << "Registered with master; given slave ID " << slaveId;
+  // The parameter shadows the member, hence the explicit 'this->'.
+  this->slaveId = slaveId;
+}
 
-      case E2S_REGISTER_EXECUTOR: {
-        FrameworkID fid;
-        tie(fid) = unpack<E2S_REGISTER_EXECUTOR>(body());
-        LOG(INFO) << "Got executor registration for framework " << fid;
-        if (Framework *fw = getFramework(fid)) {
-          if (getExecutor(fid) != 0) {
-            LOG(ERROR) << "Executor for framework " << fid
-                       << "already exists";
-            send(from(), pack<S2E_KILL_EXECUTOR>());
-            break;
-          }
-          Executor *executor = new Executor(fid, from());
-          executors[fid] = executor;
-          link(from());
-          // Now that the executor is up, set its resource limits
-          isolationModule->resourcesChanged(fw);
-          // Tell executor that it's registered and give it its queued tasks
-          send(from(), pack<S2E_REGISTER_REPLY>(this->id,
-                                                hostname,
-                                                fw->name,
-                                                fw->executorInfo.initArg));
-          sendQueuedTasks(fw);
-        } else {
-          // Framework is gone; tell the executor to exit
-          send(from(), pack<S2E_KILL_EXECUTOR>());
-        }
-        break;
-      }
 
-      case E2S_STATUS_UPDATE: {
-        FrameworkID fid;
-        TaskID tid;
-        TaskState taskState;
-        string data;
-        tie(fid, tid, taskState, data) = unpack<E2S_STATUS_UPDATE>(body());
-
-        Framework *framework = getFramework(fid);
-        if (framework != NULL) {
-	  LOG(INFO) << "Got status update for task " << fid << ":" << tid;
-	  if (taskState == TASK_FINISHED || taskState == TASK_FAILED ||
-	      taskState == TASK_KILLED || taskState == TASK_LOST) {
-	    LOG(INFO) << "Task " << fid << ":" << tid << " done";
-
-            framework->removeTask(tid);
-            isolationModule->resourcesChanged(framework);
-          }
-
-	  // Reliably send message and save sequence number for
-	  // canceling later.
-	  int seq = rsend(master, framework->pid,
-			  pack<S2M_STATUS_UPDATE>(id, fid, tid,
-                                                  taskState, data));
-	  seqs[fid].insert(seq);
-	} else {
-	  LOG(WARNING) << "Got status update for UNKNOWN task "
-		       << fid << ":" << tid;
-	}
-        break;
-      }
+void Slave::reregisterReply(const SlaveID& slaveId)
+{
+  LOG(INFO) << "Re-registered with master";
 
-      case E2S_FRAMEWORK_MESSAGE: {
-        FrameworkID fid;
-        FrameworkMessage message;
-        tie(fid, message) = unpack<E2S_FRAMEWORK_MESSAGE>(body());
-
-        Framework *framework = getFramework(fid);
-        if (framework != NULL) {
-	  LOG(INFO) << "Sending message for framework " << fid
-		    << " to " << framework->pid;
-
-          // Set slave ID in case framework omitted it.
-          message.slaveId = this->id;
-          VLOG(1) << "Sending framework message to framework " << fid
-                  << " with PID " << framework->pid;
-          send(framework->pid, pack<M2F_FRAMEWORK_MESSAGE>(message));
-        }
-        break;
-      }
+  if (!(this->slaveId == slaveId)) {
+    LOG(FATAL) << "Slave re-registered but got wrong ID";
+  }
+}
 
-      case S2S_GET_STATE: {
- 	send(from(), pack<S2S_GET_STATE_REPLY>(getState()));
-	break;
-      }
 
-      case PROCESS_EXIT: {
-        LOG(INFO) << "Process exited: " << from();
+void Slave::runTask(const FrameworkInfo& frameworkInfo,
+                    const FrameworkID& frameworkId,
+                    const string& pid,
+                    const TaskDescription& task)
+{
+  LOG(INFO) << "Got assigned task " << task.task_id()
+            << " for framework " << frameworkId;
+
+  Framework* framework = getFramework(frameworkId);
+  if (framework == NULL) {
+    framework = new Framework(frameworkId, frameworkInfo, pid);
+    frameworks[frameworkId] = framework;
+  }
 
-        if (from() == master) {
-	  LOG(WARNING) << "Master disconnected! "
-		       << "Waiting for a new master to be elected.";
-	  // TODO(benh): After so long waiting for a master, commit suicide.
-	} else {
-	  // Check if an executor has exited (this is technically
-	  // redundant because the isolation module should be doing
-	  // this for us).
-	  foreachpair (_, Executor *ex, executors) {
-	    if (from() == ex->pid) {
-	      LOG(INFO) << "Executor for framework " << ex->frameworkId
-			<< " disconnected";
-	      Framework *framework = getFramework(ex->frameworkId);
-	      if (framework != NULL) {
-		send(master, pack<S2M_LOST_EXECUTOR>(id, ex->frameworkId, -1));
-		killFramework(framework);
-	      }
-	      break;
-	    }
-	  }
-	}
+  // Either send the task to an executor or start a new executor
+  // and queue the task until the executor has started.
+  Executor* executor = task.has_executor()
+    ? framework->getExecutor(task.executor().executor_id())
+    : framework->getExecutor(framework->info.executor().executor_id());
+        
+  if (executor != NULL) {
+    if (!executor->pid) {
+      // Queue task until the executor starts up.
+      executor->queuedTasks.push_back(task);
+    } else {
+      // Add the task to the executor.
+      executor->addTask(task);
+
+      MSG<S2E_RUN_TASK> out;
+      out.mutable_framework()->MergeFrom(framework->info);
+      out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+      out.set_pid(framework->pid);
+      out.mutable_task()->MergeFrom(task);
+      send(executor->pid, out);
+      isolationModule->resourcesChanged(framework, executor);
+    }
+  } else {
+    // Launch an executor for this task.
+    if (task.has_executor()) {
+      executor = framework->createExecutor(task.executor());
+    } else {
+      executor = framework->createExecutor(framework->info.executor());
+    }
+
+    // Queue task until the executor starts up.
+    executor->queuedTasks.push_back(task);
 
+    // Tell the isolation module to launch the executor.
+    isolationModule->launchExecutor(framework, executor);
+  }
+}
+
+
+// Handles a request to kill a task. If the task's executor is up, the
+// kill is forwarded to it and we wait for its status update. Otherwise
+// (unknown framework, no executor for the task, or executor not yet
+// registered) the task is reported to the master as TASK_LOST.
+void Slave::killTask(const FrameworkID& frameworkId,
+                     const TaskID& taskId)
+{
+  LOG(INFO) << "Asked to kill task " << taskId
+            << " of framework " << frameworkId;
+
+  // Either pointer may be NULL from here on; every use is guarded.
+  Framework* framework = getFramework(frameworkId);
+  Executor* executor =
+    framework != NULL ? framework->getExecutor(taskId) : NULL;
+
+  if (executor == NULL || !executor->pid) {
+    if (framework == NULL) {
+      LOG(WARNING) << "Cannot kill task " << taskId
+                   << " of framework " << frameworkId
+                   << " because no such framework is running";
+    }
+
+    if (executor != NULL) {
+      // Update the resources locally, if an executor comes up
+      // after this then it just won't receive this task.
+      executor->removeTask(taskId);
+      isolationModule->resourcesChanged(framework, executor);
+    }
+
+    // Nobody can kill the task for us, so consider it lost.
+    MSG<S2M_STATUS_UPDATE> out;
+    out.mutable_framework_id()->MergeFrom(frameworkId);
+    TaskStatus *status = out.mutable_status();
+    status->mutable_task_id()->MergeFrom(taskId);
+    status->mutable_slave_id()->MergeFrom(slaveId);
+    status->set_state(TASK_LOST);
+    send(master, out);
+
+    // Record the update under its retry deadline. This bookkeeping
+    // lives on the framework, so it is skipped when there is none.
+    if (framework != NULL) {
+      double deadline = elapsedTime() + STATUS_UPDATE_RETRY_TIMEOUT;
+      framework->statuses[deadline][status->task_id()] = *status;
+    }
+
+    return;
+  }
+
+  // Otherwise, send a message to the executor and wait for
+  // it to send us a status update.
+  MSG<S2E_KILL_TASK> out;
+  out.mutable_framework_id()->MergeFrom(frameworkId);
+  out.mutable_task_id()->MergeFrom(taskId);
+  send(executor->pid, out);
+}
+
+
+// Kills the framework with the given ID if getFramework finds it;
+// an unknown ID is only logged.
+void Slave::killFramework(const FrameworkID& frameworkId)
+{
+  LOG(INFO) << "Asked to kill framework " << frameworkId;
+
+  Framework* target = getFramework(frameworkId);
+  if (target == NULL) {
+    return;
+  }
+
+  killFramework(target);
+}
+
+
+// Relays a framework message to the addressed executor. The message
+// is dropped (and counted as invalid) when the framework is unknown,
+// the executor is unknown, or the executor has no pid yet; otherwise
+// it is forwarded and counted as valid.
+void Slave::schedulerMessage(const SlaveID& slaveId,
+                             const FrameworkID& frameworkId,
+                             const ExecutorID& executorId,
+                             const string& data)
+{
+  Framework* framework = getFramework(frameworkId);
+  if (framework == NULL) {
+    LOG(WARNING) << "Dropping message for framework "<< frameworkId
+                 << " because framework does not exist";
+    statistics.invalid_framework_messages++;
+    return;
+  }
+
+  Executor* executor = framework->getExecutor(executorId);
+  if (executor == NULL) {
+    LOG(WARNING) << "Dropping message for executor '"
+                 << executorId << "' of framework " << frameworkId
+                 << " because executor does not exist";
+    statistics.invalid_framework_messages++;
+    return;
+  }
+
+  if (!executor->pid) {
+    // TODO(*): If executor is not started, queue framework message?
+    // (It's probably okay to just drop it since frameworks can have
+    // the executor send a message to the master to say when it's ready.)
+    LOG(WARNING) << "Dropping message for executor '"
+                 << executorId << "' of framework " << frameworkId
+                 << " because executor is not running";
+    statistics.invalid_framework_messages++;
+    return;
+  }
+
+  // Executor is up: pass the message along unchanged.
+  MSG<S2E_FRAMEWORK_MESSAGE> relayed;
+  relayed.mutable_slave_id()->MergeFrom(slaveId);
+  relayed.mutable_framework_id()->MergeFrom(frameworkId);
+  relayed.mutable_executor_id()->MergeFrom(executorId);
+  relayed.set_data(data);
+  send(executor->pid, relayed);
+
+  statistics.valid_framework_messages++;
+}
+
+
+// Stores a new pid for the given framework; does nothing if
+// getFramework does not find it.
+void Slave::updateFramework(const FrameworkID& frameworkId,
+                            const string& pid)
+{
+  Framework* known = getFramework(frameworkId);
+  if (known == NULL) {
+    return;
+  }
+
+  LOG(INFO) << "Updating framework " << frameworkId
+            << " pid to " << pid;
+  known->pid = pid;
+}
+
+
+// Handler for M2S_STATUS_UPDATE_ACK: drops the recorded status update
+// for the acknowledged task. Nothing happens if the framework is
+// unknown or no update is recorded for the task. The slaveId argument
+// is not used here.
+void Slave::statusUpdateAck(const FrameworkID& frameworkId,
+                            const SlaveID& slaveId,
+                            const TaskID& taskId)
+{
+  Framework* framework = getFramework(frameworkId);
+  if (framework != NULL) {
+    // Updates are grouped by deadline; look in each group for the task.
+    foreachpair (double deadline, _, framework->statuses) {
+      if (framework->statuses[deadline].count(taskId) > 0) {
+        LOG(INFO) << "Got acknowledgement of status update"
+                  << " for task " << taskId
+                  << " of framework " << framework->frameworkId;
+        framework->statuses[deadline].erase(taskId);
+        // Only the first matching entry is removed.
         break;
       }
+    }
+  }
+}
 
-      case M2S_SHUTDOWN: {
-        LOG(INFO) << "Asked to shut down by master: " << from();
-        unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
-        foreachpair (_, Framework *framework, frameworksCopy) {
-          killFramework(framework);
-        }
-        return;
-      }
 
-      case S2S_SHUTDOWN: {
-        LOG(INFO) << "Asked to shut down by " << from();
-        unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
-        foreachpair (_, Framework *framework, frameworksCopy) {
-          killFramework(framework);
+// Handles a registration request sent by an executor process.
+//
+// The sender is told to exit (S2E_KILL_EXECUTOR) when the framework is
+// unknown, when the executor is unknown, or when the executor already
+// has a pid recorded. Otherwise the sender's pid is stored, the
+// isolation module is notified, a S2E_REGISTER_REPLY is sent back and
+// any queued tasks are flushed to the executor.
+void Slave::registerExecutor(const FrameworkID& frameworkId,
+                             const ExecutorID& executorId)
+{
+  LOG(INFO) << "Got registration for executor '" << executorId
+            << "' of framework " << frameworkId;
+
+  Framework* framework = getFramework(frameworkId);
+  if (framework == NULL) {
+    // Framework is gone; tell the executor to exit.
+    LOG(WARNING) << "Framework " << frameworkId
+                 << " does not exist (it may have been killed),"
+                 << " telling executor to exit";
+
+    // TODO(benh): Don't we also want to tell the isolation
+    // module to shut this guy down!
+    send(from(), S2E_KILL_EXECUTOR);
+    return;
+  }
+
+  Executor* executor = framework->getExecutor(executorId);
+
+  // Reject executors we never launched.
+  if (executor == NULL) {
+    LOG(WARNING) << "Not expecting executor '" << executorId
+                 << "' of framework " << frameworkId;
+    send(from(), S2E_KILL_EXECUTOR);
+    return;
+  }
+
+  // Reject a second registration for an executor that already has a pid.
+  if (executor->pid != UPID()) {
+    LOG(WARNING) << "Not good, executor '" << executorId
+                 << "' of framework " << frameworkId
+                 << " is already running";
+    send(from(), S2E_KILL_EXECUTOR);
+    return;
+  }
+
+  // Save the pid for the executor.
+  executor->pid = from();
+
+  // Now that the executor is up, set its resource limits.
+  isolationModule->resourcesChanged(framework, executor);
+
+  // Tell executor it's registered and give it any queued tasks.
+  MSG<S2E_REGISTER_REPLY> reply;
+  ExecutorArgs* args = reply.mutable_args();
+  args->mutable_framework_id()->MergeFrom(framework->frameworkId);
+  args->mutable_executor_id()->MergeFrom(executor->info.executor_id());
+  args->mutable_slave_id()->MergeFrom(slaveId);
+  args->set_hostname(slave.hostname());
+  args->set_data(framework->info.executor().data());
+  send(executor->pid, reply);
+  sendQueuedTasks(framework, executor);
+}
+
+
+// Processes a task status update for a framework.
+//
+// The task's state is updated on its executor; for the states
+// TASK_FINISHED, TASK_FAILED, TASK_KILLED and TASK_LOST the matching
+// counter is bumped, the task is removed from the executor and the
+// isolation module is told resources changed. The update is then
+// forwarded to the master and remembered under a resend deadline.
+// Updates for an unknown framework or executor are counted as invalid
+// and dropped.
+void Slave::statusUpdate(const FrameworkID& frameworkId,
+                         const TaskStatus& status)
+{
+  LOG(INFO) << "Status update: task " << status.task_id()
+            << " of framework " << frameworkId
+            << " is now in state "
+            << TaskState_descriptor()->FindValueByNumber(status.state())->name();
+
+  Framework* framework = getFramework(frameworkId);
+  if (framework == NULL) {
+    LOG(WARNING) << "Status update error: couldn't lookup "
+                 << "framework " << frameworkId;
+    statistics.invalid_status_updates++;
+    return;
+  }
+
+  Executor* executor = framework->getExecutor(status.task_id());
+  if (executor == NULL) {
+    LOG(WARNING) << "Status update error: couldn't lookup "
+                 << "executor for framework " << frameworkId;
+    statistics.invalid_status_updates++;
+    return;
+  }
+
+  executor->updateTaskState(status.task_id(), status.state());
+
+  // Update statistics and decide whether the task must be removed.
+  bool removeFromExecutor = true;
+  switch (status.state()) {
+    case TASK_FINISHED:
+      statistics.finished_tasks++;
+      break;
+    case TASK_FAILED:
+      statistics.failed_tasks++;
+      break;
+    case TASK_KILLED:
+      statistics.killed_tasks++;
+      break;
+    case TASK_LOST:
+      statistics.lost_tasks++;
+      break;
+    default:
+      removeFromExecutor = false;
+      break;
+  }
+
+  if (removeFromExecutor) {
+    executor->removeTask(status.task_id());
+    isolationModule->resourcesChanged(framework, executor);
+  }
+
+  // Send message and record the status for possible resending.
+  MSG<S2M_STATUS_UPDATE> update;
+  update.mutable_framework_id()->MergeFrom(frameworkId);
+  update.mutable_status()->MergeFrom(status);
+  send(master, update);
+
+  const double deadline = elapsedTime() + STATUS_UPDATE_RETRY_TIMEOUT;
+  framework->statuses[deadline][status.task_id()] = status;
+
+  statistics.valid_status_updates++;
+}
+
+
+// Forwards a message originating from an executor to the pid recorded
+// for its framework. Messages for unknown frameworks are logged,
+// counted as invalid and dropped.
+void Slave::executorMessage(const SlaveID& slaveId,
+                            const FrameworkID& frameworkId,
+                            const ExecutorID& executorId,
+                            const string& data)
+{
+  Framework* framework = getFramework(frameworkId);
+  if (framework == NULL) {
+    LOG(WARNING) << "Cannot send framework message from slave "
+                 << slaveId << " to framework " << frameworkId
+                 << " because framework does not exist";
+    statistics.invalid_framework_messages++;
+    return;
+  }
+
+  LOG(INFO) << "Sending message for framework " << frameworkId
+            << " to " << framework->pid;
+
+  // TODO(benh): This is weird, sending an M2F message.
+  MSG<M2F_FRAMEWORK_MESSAGE> message;
+  message.mutable_slave_id()->MergeFrom(slaveId);
+  message.mutable_framework_id()->MergeFrom(frameworkId);
+  message.mutable_executor_id()->MergeFrom(executorId);
+  message.set_data(data);
+  send(framework->pid, message);
+
+  statistics.valid_framework_messages++;
+}
+
+
+// Answers a ping by sending PONG back to the sender of the current
+// message.
+void Slave::ping()
+{
+  send(from(), PONG);
+}
+
+
+// Re-sends to the master every recorded status update whose resend
+// deadline has passed, across all frameworks on this slave.
+//
+// NOTE(review): nothing here removes or re-keys an entry after it is
+// re-sent, so an overdue update appears to be re-sent on every call
+// until Slave::statusUpdateAck() erases it -- confirm this is the
+// intended retry cadence.
+void Slave::timeout()
+{
+  // Check and see if we should re-send any status updates.
+  foreachpair (_, Framework* framework, frameworks) {
+    // 'statuses' maps a deadline to the statuses recorded under it.
+    foreachpair (double deadline, _, framework->statuses) {
+      if (deadline <= elapsedTime()) {
+        foreachpair (_, const TaskStatus& status, framework->statuses[deadline]) {
+          LOG(WARNING) << "Resending status update"
+                       << " for task " << status.task_id()
+                       << " of framework " << framework->frameworkId;
+          MSG<S2M_STATUS_UPDATE> out;
+          out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+          out.mutable_status()->MergeFrom(status);
+          send(master, out);
         }
-        return;
       }
+    }
+  }
+}
 
-      default: {
-        LOG(ERROR) << "Received unknown message ID " << msgid()
-                   << " from " << from();
-        break;
+// Invoked when a process has exited; the process is identified by
+// from(). Every exit is logged, and an exit of the current master
+// additionally produces a warning. No other action is taken.
+void Slave::exited()
+{
+  LOG(INFO) << "Process exited: " << from();
+
+  if (!(from() == master)) {
+    return;
+  }
+
+  LOG(WARNING) << "Master disconnected! "
+               << "Waiting for a new master to be elected.";
+  // TODO(benh): After so long waiting for a master, commit suicide.
+}
+
+
+// Serves '/slave/info.json': one JSON object holding the build date,
+// build user, start time and pid of this slave.
+//
+// NOTE(review): the first key is spelled "built_date" while
+// Slave::http_vars emits "build_date"; confirm with consumers before
+// changing either spelling.
+Promise<HttpResponse> Slave::http_info_json(const HttpRequest& request)
+{
+  LOG(INFO) << "HTTP request for '/slave/info.json'";
+
+  ostringstream out;
+  out << "{";
+  out << "\"built_date\":\"" << build::DATE << "\",";
+  out << "\"build_user\":\"" << build::USER << "\",";
+  out << "\"start_time\":\"" << startTime << "\",";
+  out << "\"pid\":\"" << self() << "\"";
+  out << "}";
+
+  // Snapshot the stream once so the advertised length and the body are
+  // taken from the same string.
+  const string json = out.str();
+
+  HttpOKResponse response;
+  response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
+  response.headers["Content-Length"] = lexical_cast<string>(json.size());
+  response.body = json.data();
+  return response;
+}
+
+
+// Serves '/slave/frameworks.json': a JSON array with one object (id,
+// name, user) per framework currently known to this slave.
+Promise<HttpResponse> Slave::http_frameworks_json(const HttpRequest& request)
+{
+  LOG(INFO) << "HTTP request for '/slave/frameworks.json'";
+
+  ostringstream out;
+  out << "[";
+
+  // Write the separator in front of every element but the first, so
+  // no trailing comma ever has to be removed.
+  bool needSeparator = false;
+  foreachpair (_, Framework* framework, frameworks) {
+    if (needSeparator) {
+      out << ",";
+    }
+    needSeparator = true;
+
+    out << "{";
+    out << "\"id\":\"" << framework->frameworkId << "\",";
+    out << "\"name\":\"" << framework->info.name() << "\",";
+    out << "\"user\":\"" << framework->info.user() << "\"";
+    out << "}";
+  }
+
+  out << "]";
+
+  const string json = out.str();
+
+  HttpOKResponse response;
+  response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
+  response.headers["Content-Length"] = lexical_cast<string>(json.size());
+  response.body = json.data();
+  return response;
+}
+
+
+// Serves '/slave/tasks.json': a JSON array with one object per task of
+// every executor of every framework on this slave. Each object carries
+// the task id, framework id, slave id, name, state and the scalar
+// "cpus" and "mem" resources.
+//
+// Elements are separated by writing a comma in front of every element
+// except the first. The earlier approach rewound the stream by one
+// character whenever at least one framework existed; with frameworks
+// present but no tasks that overwrote the opening '[' and produced the
+// invalid body "]".
+Promise<HttpResponse> Slave::http_tasks_json(const HttpRequest& request)
+{
+  LOG(INFO) << "HTTP request for '/slave/tasks.json'";
+
+  ostringstream out;
+
+  out << "[";
+
+  // True until the first task object has been written.
+  bool firstTask = true;
+
+  foreachpair (_, Framework* framework, frameworks) {
+    foreachpair (_, Executor* executor, framework->executors) {
+      foreachpair (_, Task* task, executor->tasks) {
+        // TODO(benh): Send all of the resources (as JSON).
+        Resources resources(task->resources());
+        Resource::Scalar cpus = resources.getScalar("cpus", Resource::Scalar());
+        Resource::Scalar mem = resources.getScalar("mem", Resource::Scalar());
+        const string& state =
+          TaskState_descriptor()->FindValueByNumber(task->state())->name();
+
+        if (!firstTask) {
+          out << ",";
+        }
+        firstTask = false;
+
+        out <<
+          "{" <<
+          "\"task_id\":\"" << task->task_id() << "\"," <<
+          "\"framework_id\":\"" << task->framework_id() << "\"," <<
+          "\"slave_id\":\"" << task->slave_id() << "\"," <<
+          "\"name\":\"" << task->name() << "\"," <<
+          "\"state\":\"" << state << "\"," <<
+          "\"cpus\":" << cpus.value() << "," <<
+          "\"mem\":" << mem.value() <<
+          "}";
       }
     }
   }
+
+  out << "]";
+
+  HttpOKResponse response;
+  response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
+  response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
+  response.body = out.str().data();
+  return response;
 }
 
 
-Framework * Slave::getFramework(FrameworkID frameworkId)
+// Serves '/slave/stats.json': one JSON object with the uptime
+// (elapsedTime() - startTime), the number of known frameworks and
+// every counter of the 'statistics' member, all emitted as bare
+// (unquoted) numbers.
+//
+// NOTE(review): the log line below says "Http" where the sibling
+// handlers say "HTTP".
+Promise<HttpResponse> Slave::http_stats_json(const HttpRequest& request)
 {
-  FrameworkMap::iterator it = frameworks.find(frameworkId);
-  if (it == frameworks.end()) return NULL;
-  return it->second;
+  LOG(INFO) << "Http request for '/slave/stats.json'";
+
+  ostringstream out;
+
+  out <<
+    "{" <<
+    "\"uptime\":" << elapsedTime() - startTime << "," <<
+    "\"total_frameworks\":" << frameworks.size() << "," <<
+    "\"launched_tasks\":" << statistics.launched_tasks << "," <<
+    "\"finished_tasks\":" << statistics.finished_tasks << "," <<
+    "\"killed_tasks\":" << statistics.killed_tasks << "," <<
+    "\"failed_tasks\":" << statistics.failed_tasks << "," <<
+    "\"lost_tasks\":" << statistics.lost_tasks << "," <<
+    "\"valid_status_updates\":" << statistics.valid_status_updates << "," <<
+    "\"invalid_status_updates\":" << statistics.invalid_status_updates << "," <<
+    "\"valid_framework_messages\":" << statistics.valid_framework_messages << "," <<
+    "\"invalid_framework_messages\":" << statistics.invalid_framework_messages <<
+    "}";
+
+  // Content-Length is taken from the full serialized string.
+  HttpOKResponse response;
+  response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
+  response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
+  response.body = out.str().data();
+  return response;
+}
+
+
+// Serves '/slave/vars' as plain text, one "key value" pair per line:
+// build information first, then every configuration entry, then the
+// uptime, framework count and the 'statistics' counters.
+Promise<HttpResponse> Slave::http_vars(const HttpRequest& request)
+{
+  LOG(INFO) << "HTTP request for '/slave/vars'";
+
+  ostringstream out;
+
+  // Build information.
+  out << "build_date " << build::DATE << "\n";
+  out << "build_user " << build::USER << "\n";
+  out << "build_flags " << build::FLAGS << "\n";
+
+  // Also add the configuration values.
+  foreachpair (const string& key, const string& value, conf.getMap()) {
+    out << key << " " << value << "\n";
+  }
+
+  // Runtime counters.
+  out << "uptime " << elapsedTime() - startTime << "\n";
+  out << "total_frameworks " << frameworks.size() << "\n";
+  out << "launched_tasks " << statistics.launched_tasks << "\n";
+  out << "finished_tasks " << statistics.finished_tasks << "\n";
+  out << "killed_tasks " << statistics.killed_tasks << "\n";
+  out << "failed_tasks " << statistics.failed_tasks << "\n";
+  out << "lost_tasks " << statistics.lost_tasks << "\n";
+  out << "valid_status_updates " << statistics.valid_status_updates << "\n";
+  out << "invalid_status_updates " << statistics.invalid_status_updates << "\n";
+  out << "valid_framework_messages " << statistics.valid_framework_messages << "\n";
+  out << "invalid_framework_messages " << statistics.invalid_framework_messages << "\n";
+
+  const string text = out.str();
+
+  HttpOKResponse response;
+  response.headers["Content-Type"] = "text/plain";
+  response.headers["Content-Length"] = lexical_cast<string>(text.size());
+  response.body = text.data();
+  return response;
 }
 
 
-Executor * Slave::getExecutor(FrameworkID frameworkId)
+// Returns the Framework stored under 'frameworkId' in 'frameworks',
+// or NULL when no such entry exists.
+Framework* Slave::getFramework(const FrameworkID& frameworkId)
 {
-  ExecutorMap::iterator it = executors.find(frameworkId);
-  if (it == executors.end()) return NULL;
-  return it->second;
+  // Test for presence first so the operator[] below is only used on a
+  // key that exists (and therefore never adds an entry).
+  if (frameworks.count(frameworkId) > 0) {
+    return frameworks[frameworkId];
+  }
+
+  return NULL;
 }
 
 
 // Send any tasks queued up for the given framework to its executor
 // (needed if we received tasks while the executor was starting up)
-void Slave::sendQueuedTasks(Framework *framework)
+// Registers every task in executor->queuedTasks with the executor,
+// sends it to the executor's pid as an S2E_RUN_TASK message, and then
+// empties the queue. The executor must already have a pid (CHECKed).
+void Slave::sendQueuedTasks(Framework* framework, Executor* executor)
 {
-  LOG(INFO) << "Flushing queued tasks for framework " << framework->id;
-  Executor *executor = getExecutor(framework->id);
-  if (!executor) return;
-  foreach(TaskDescription *td, framework->queuedTasks) {
-    send(executor->pid,
-        pack<S2E_RUN_TASK>(td->tid, td->name, td->args, td->params));
-    delete td;
+  LOG(INFO) << "Flushing queued tasks for framework "
+            << framework->frameworkId;
+
+  // A default-constructed UPID means the executor has not registered.
+  CHECK(executor->pid != UPID());
+
+  foreach (const TaskDescription& task, executor->queuedTasks) {
+    // Add the task to the executor.
+    executor->addTask(task);
+
+    // The message carries the framework info, id and pid along with
+    // the task description itself.
+    MSG<S2E_RUN_TASK> out;
+    out.mutable_framework()->MergeFrom(framework->info);
+    out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+    out.set_pid(framework->pid);
+    out.mutable_task()->MergeFrom(task);
+    send(executor->pid, out);
   }
-  framework->queuedTasks.clear();
+
+  executor->queuedTasks.clear();
 }
 
 
 // Kill a framework (including its executor if killExecutor is true).
-void Slave::killFramework(Framework *framework, bool killExecutor)
+// Removes a framework from this slave. Every executor of the
+// framework is destroyed; when 'killExecutors' is true each one is
+// first sent S2E_KILL_EXECUTOR and handed to the isolation module to
+// be killed. Finally the framework is erased from 'frameworks' and
+// deleted, so the 'framework' pointer must not be used afterwards.
+void Slave::killFramework(Framework *framework, bool killExecutors)
 {
-  LOG(INFO) << "Cleaning up framework " << framework->id;
+  LOG(INFO) << "Cleaning up framework " << framework->frameworkId;
 
-  // Cancel sending any reliable messages for this framework.
-  foreach (int seq, seqs[framework->id])
-    cancel(seq);
+  // Shutdown all executors of this framework.
+  // NOTE(review): foreachpaircopy presumably iterates over a copy of
+  // the map, since destroyExecutor() below is called while looping --
+  // confirm against the macro definition.
+  foreachpaircopy (const ExecutorID& executorId, Executor* executor, framework->executors) {
+    if (killExecutors) {
+      LOG(INFO) << "Killing executor '" << executorId
+                << "' of framework " << framework->frameworkId;
 
-  seqs.erase(framework->id);
+      send(executor->pid, S2E_KILL_EXECUTOR);
 
-  // Remove its allocated resources.
-  framework->resources = Resources();
-
-  // If an executor is running, tell it to exit and kill it.
-  if (Executor *ex = getExecutor(framework->id)) {
-    if (killExecutor) {
-      LOG(INFO) << "Killing executor for framework " << framework->id;
       // TODO(benh): There really isn't ANY time between when an
       // executor gets a S2E_KILL_EXECUTOR message and the isolation
       // module goes and kills it. We should really think about making
       // the semantics of this better.
-      send(ex->pid, pack<S2E_KILL_EXECUTOR>());
-      isolationModule->killExecutor(framework);
+
+      isolationModule->killExecutor(framework, executor);
     }
 
-    LOG(INFO) << "Cleaning up executor for framework " << framework->id;
-    delete ex;
-    executors.erase(framework->id);
+    // The executor is destroyed whether or not it was killed above.
+    framework->destroyExecutor(executorId);
   }
 
-  frameworks.erase(framework->id);
+  frameworks.erase(framework->frameworkId);
   delete framework;
 }
 
@@ -525,30 +936,58 @@ void Slave::killFramework(Framework *fra
 // Called by isolation module when an executor process exits
 // TODO(benh): Make this callback be a message so that we can avoid
 // race conditions.
-void Slave::executorExited(FrameworkID fid, int status)
+// Reports an exited executor to the master (S2M_EXITED_EXECUTOR with
+// the given 'result') and destroys the executor. If that leaves the
+// framework with no executors, the whole framework is removed via
+// killFramework(). Exits of unknown executors or frameworks are only
+// logged.
+void Slave::executorExited(const FrameworkID& frameworkId, const ExecutorID& executorId, int result)
 {
-  if (Framework *f = getFramework(fid)) {
-    LOG(INFO) << "Executor for framework " << fid << " exited "
-              << "with status " << status;
-    send(master, pack<S2M_LOST_EXECUTOR>(id, fid, status));
-    killFramework(f, false);
+  Framework* framework = getFramework(frameworkId);
+  if (framework != NULL) {
+    Executor* executor = framework->getExecutor(executorId);
+    if (executor != NULL) {
+      LOG(INFO) << "Exited executor '" << executorId
+                << "' of framework " << frameworkId
+                << " with result " << result;
+
+      MSG<S2M_EXITED_EXECUTOR> out;
+      out.mutable_slave_id()->MergeFrom(slaveId);
+      out.mutable_framework_id()->MergeFrom(frameworkId);
+      out.mutable_executor_id()->MergeFrom(executorId);
+      out.set_result(result);
+      send(master, out);
+
+      framework->destroyExecutor(executorId);
+
+      // TODO(benh): When should we kill the presence of an entire
+      // framework on a slave?
+      // No executors remain at this point, so killFramework() has no
+      // executor left to kill and only removes the framework itself.
+      if (framework->executors.size() == 0) {
+        killFramework(framework);
+      }
+    } else {
+      LOG(WARNING) << "UNKNOWN executor '" << executorId
+                   << "' of framework " << frameworkId
+                   << " has exited with result " << result;
+    }
+  } else {
+    LOG(WARNING) << "UNKNOWN executor '" << executorId
+                 << "' of UNKNOWN framework " << frameworkId
+                 << " has exited with result " << result;
   }
 };
 
 
-string Slave::getUniqueWorkDirectory(FrameworkID fid)
+string Slave::getUniqueWorkDirectory(const FrameworkID& frameworkId,
+                                     const ExecutorID& executorId)
 {
-  string workDir;
+  string workDir = ".";
   if (conf.contains("work_dir")) {
-    workDir = conf["work_dir"];
+    workDir = conf.get("work_dir", workDir);
   } else if (conf.contains("home")) {
-    workDir = conf["home"] + "/work";
-  } else {
-    workDir = "work";
+    workDir = conf.get("home", workDir);
   }
 
+  workDir = workDir + "/work";
+
   ostringstream os(std::ios_base::app | std::ios_base::out);
-  os << workDir << "/slave-" << id << "/fw-" << fid;
+  os << workDir << "/slave-" << slaveId
+     << "/fw-" << frameworkId << "-" << executorId;
 
   // Find a unique directory based on the path given by the slave
   // (this is because we might launch multiple executors from the same
@@ -569,7 +1008,7 @@ string Slave::getUniqueWorkDirectory(Fra
 }
 
 
-const Params& Slave::getConf()
+// Returns a read-only reference to this slave's configuration ('conf').
+const Configuration& Slave::getConfiguration()
 {
   return conf;
 }

Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1132344&r1=1132343&r2=1132344&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Sun Jun  5 09:28:33 2011
@@ -132,7 +132,7 @@ struct Executor
 // Information about a framework.
 struct Framework
 {
-  Framework( const FrameworkID& _frameworkId, const FrameworkInfo& _info,
+  Framework(const FrameworkID& _frameworkId, const FrameworkInfo& _info,
             const process::UPID& _pid)
     : frameworkId(_frameworkId), info(_info), pid(_pid) {}
 
@@ -200,13 +200,21 @@ public:
 
   process::Promise<state::SlaveState*> getState();
 
-  // Callback used by isolation module to tell us when an executor exits.
-  void executorExited(const FrameworkID& frameworkId, const ExecutorID& executorId, int result);
+  // Callback used by isolation module to tell us when an executor
+  // exits.
+  void executorExited(const FrameworkID& frameworkId,
+                      const ExecutorID& executorId,
+                      int result);
 
   // Kill a framework (possibly killing its executor).
   void killFramework(Framework *framework, bool killExecutors = true);
 
-  std::string getUniqueWorkDirectory(const FrameworkID& frameworkId, const ExecutorID& executorId);
+  // Helper function for generating a unique work directory for this
+  // framework/executor pair (non-trivial since a framework/executor
+  // pair may be launched more than once on the same slave).
+  std::string getUniqueWorkDirectory(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId);
 
   const Configuration& getConfiguration();
 
@@ -260,7 +268,14 @@ protected:
   void sendQueuedTasks(Framework* framework, Executor* executor);
 
 private:
-  Configuration conf;
+  // TODO(benh): Better naming and name scope for these http handlers.
+  process::Promise<process::HttpResponse> http_info_json(const process::HttpRequest& request);
+  process::Promise<process::HttpResponse> http_frameworks_json(const process::HttpRequest& request);
+  process::Promise<process::HttpResponse> http_tasks_json(const process::HttpRequest& request);
+  process::Promise<process::HttpResponse> http_stats_json(const process::HttpRequest& request);
+  process::Promise<process::HttpResponse> http_vars(const process::HttpRequest& request);
+
+  const Configuration conf;
 
   SlaveInfo slave;
 
@@ -271,6 +286,21 @@ private:
   boost::unordered_map<FrameworkID, Framework*> frameworks;
 
   IsolationModule *isolationModule;
+
+  // Statistics (initialized in Slave::initialize).
+  // Exposed through the '/slave/stats.json' and '/slave/vars' handlers.
+  struct {
+    // NOTE(review): presumably counts tasks launched on this slave;
+    // the increment is not in the handlers shown alongside -- confirm.
+    uint64_t launched_tasks;
+    // Status updates seen with state TASK_FINISHED / TASK_KILLED /
+    // TASK_FAILED / TASK_LOST respectively (see Slave::statusUpdate).
+    uint64_t finished_tasks;
+    uint64_t killed_tasks;
+    uint64_t failed_tasks;
+    uint64_t lost_tasks;
+    // Status updates whose framework and executor were both found,
+    // versus those dropped because one of them was unknown.
+    uint64_t valid_status_updates;
+    uint64_t invalid_status_updates;
+    // Framework messages that were forwarded, versus those dropped
+    // because their destination could not be resolved.
+    uint64_t valid_framework_messages;
+    uint64_t invalid_framework_messages;
+  } statistics;
+
+  double startTime;
 };
 
 }}}