You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:25:43 UTC

svn commit: r1132329 [6/6] - in /incubator/mesos/trunk: ./ src/ src/common/ src/config/ src/event_history/ src/examples/ src/local/ src/master/ src/messaging/ src/slave/ src/tests/ third_party/sqlite-3.6.23.1/

Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1132329&r1=1132328&r2=1132329&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Sun Jun  5 09:25:41 2011
@@ -6,8 +6,6 @@
 #include <algorithm>
 #include <fstream>
 
-#include <google/protobuf/descriptor.h>
-
 #include "slave.hpp"
 #include "webui.hpp"
 
@@ -16,69 +14,101 @@
 #define gethostbyname2(name, _) gethostbyname(name)
 #endif
 
-using namespace mesos;
-using namespace mesos::internal;
-using namespace mesos::internal::slave;
-
-using boost::unordered_map;
-using boost::unordered_set;
-
-using process::Promise;
-using process::UPID;
-
 using std::list;
 using std::make_pair;
 using std::ostringstream;
+using std::istringstream;
 using std::pair;
 using std::queue;
 using std::string;
 using std::vector;
 
+using boost::lexical_cast;
+using boost::unordered_map;
+using boost::unordered_set;
 
-Slave::Slave(const Resources& _resources, bool _local,
-             IsolationModule *_isolationModule)
-  : resources(_resources), local(_local),
-    isolationModule(_isolationModule)
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::slave;
+
+
+namespace {
+
+// Heartbeat process: sends SH2M_HEARTBEAT(sid) to the master each time receive(interval) times out.
+class Heart : public MesosProcess
 {
-  initialize();
-}
+private:
+  PID master;  // Destination of the heartbeat messages.
+  PID slave;  // Only linked in operator()(); nothing is sent to it here.
+  SlaveID sid;  // Slave ID packed into every heartbeat.
+  double interval;  // Timeout handed to receive(); presumably seconds -- TODO confirm.
+
+protected:
+  void operator () ()  // Process body: loops until PROCESS_EXIT is received.
+  {
+    link(slave);  // Linked (with master below) presumably so a peer's exit arrives as PROCESS_EXIT -- TODO confirm.
+    link(master);
+    do {
+      switch (receive(interval)) {
+      case PROCESS_TIMEOUT:  // Nothing arrived within `interval`: send a heartbeat.
+	send(master, pack<SH2M_HEARTBEAT>(sid));
+	break;
+      case PROCESS_EXIT:  // Stop heartbeating; any other message falls through and is ignored.
+	return;
+      }
+    } while (true);
+  }
+
+public:
+  Heart(const PID &_master, const PID &_slave, SlaveID _sid, double _interval)
+    : master(_master), slave(_slave), sid(_sid), interval(_interval) {}
+};
+
 
+// Defaults for the "cpus" and "mem" options: registered in registerOptions and used as fallbacks by Slave(const Params&, ...).
+const int32_t DEFAULT_CPUS = 1;
+const int32_t DEFAULT_MEM = 1 * Gigabyte;  // NOTE(review): "mem" help text says MB and the option is registered as int64_t -- confirm units and type.
 
-Slave::Slave(const Configuration& _conf, bool _local,
-             IsolationModule* _isolationModule)
-  : conf(_conf), local(_local),
-    isolationModule(_isolationModule)
-{
-  resources =
-    Resources::parse(conf.get<string>("resources", "cpus:1;mem:1024"));
-
-  initialize();
+
+} /* namespace */
+
+// Builds a slave from explicitly supplied resources; id starts empty and conf is not set by this constructor.
+Slave::Slave(Resources _resources, bool _local,
+             IsolationModule *_isolationModule)
+  : id(""), resources(_resources), local(_local),
+    isolationModule(_isolationModule) {}
+
+// Builds a slave from configuration: resources come from the "cpus" and "mem" options, falling back to DEFAULT_CPUS / DEFAULT_MEM.
+Slave::Slave(const Params& _conf, bool _local, IsolationModule *_module)
+  : id(""), conf(_conf), local(_local), isolationModule(_module)
+{
+  resources = Resources(conf.get<int32_t>("cpus", DEFAULT_CPUS),
+                        conf.get<int32_t>("mem", DEFAULT_MEM));  // NOTE(review): registerOptions registers "mem" as int64_t -- confirm the int32_t read is intended.
 }
 
 
-void Slave::registerOptions(Configurator* configurator)
-{
-  // TODO(benh): Is there a way to specify units for the resources?
-  configurator->addOption<string>("resources",
-                                  "Total consumable resources per slave\n");
-//   configurator->addOption<string>("attributes",
-//                                   "Attributes of machine\n");
-  configurator->addOption<string>("work_dir",
-                                  "Where to place framework work directories\n"
-                                  "(default: MESOS_HOME/work)");
-  configurator->addOption<string>("hadoop_home",
-                                  "Where to find Hadoop installed (for\n"
-                                  "fetching framework executors from HDFS)\n"
-                                  "(default: look for HADOOP_HOME in\n"
-                                  "environment or find hadoop on PATH)");
-  configurator->addOption<bool>("switch_user", 
-                                "Whether to run tasks as the user who\n"
-                                "submitted them rather than the user running\n"
-                                "the slave (requires setuid permission)",
-                                true);
-  configurator->addOption<string>("frameworks_home",
-                                  "Directory prepended to relative executor\n"
-                                  "paths (default: MESOS_HOME/frameworks)");
+void Slave::registerOptions(Configurator* conf)  // Declares the slave's configuration options on `conf`.
+{
+  conf->addOption<int32_t>("cpus", 'c', "CPU cores for use by tasks",  // 'c' is presumably a short flag -- TODO confirm against Configurator.
+                           DEFAULT_CPUS);
+  conf->addOption<int64_t>("mem", 'm', "Memory for use by tasks, in MB\n",  // NOTE(review): stray trailing "\n"; int64_t option but DEFAULT_MEM is int32_t.
+                           DEFAULT_MEM);
+  conf->addOption<string>("work_dir",  // No default value registered; the fallback is only described in the help text.
+                          "Where to place framework work directories\n"
+                          "(default: MESOS_HOME/work)");
+  conf->addOption<string>("hadoop_home",  // No default value registered; the fallback is only described in the help text.
+                          "Where to find Hadoop installed (for fetching\n"
+                          "framework executors from HDFS)\n"
+                          "(default: look for HADOOP_HOME environment\n"
+                          "variable or find hadoop on PATH)");
+  conf->addOption<bool>("switch_user", 
+                        "Whether to run tasks as the user who\n"
+                        "submitted them rather than the user running\n"
+                        "the slave (requires setuid permission)",
+                        true);  // Registered default: true.
+   conf->addOption<string>("frameworks_home",  // No default value registered; the fallback is only described in the help text.
+                           "Directory prepended to relative executor\n"
+                           "paths (default: MESOS_HOME/frameworks)");
 }
 
 
@@ -88,63 +118,25 @@ Slave::~Slave()
 }
 
 
-Promise<state::SlaveState*> Slave::getState()
+state::SlaveState *Slave::getState()
 {
-  Resources resources(resources);
-  Resource::Scalar cpus;
-  Resource::Scalar mem;
-  cpus.set_value(-1);
-  mem.set_value(-1);
-  cpus = resources.getScalar("cpus", cpus);
-  mem = resources.getScalar("mem", mem);
-
-  state::SlaveState* state =
-    new state::SlaveState(build::DATE, build::USER, slaveId.value(),
-                          cpus.value(), mem.value(), self(), master);
-
-  foreachpair (_, Framework* f, frameworks) {
-    foreachpair (_, Executor* e, f->executors) {
-      Resources resources(e->resources);
-      Resource::Scalar cpus;
-      Resource::Scalar mem;
-      cpus.set_value(-1);
-      mem.set_value(-1);
-      cpus = resources.getScalar("cpus", cpus);
-      mem = resources.getScalar("mem", mem);
-
-      // TOOD(benh): For now, we will add a state::Framework object
-      // for each executor that the framework has. Therefore, we tweak
-      // the framework ID to also include the associated executor ID
-      // to differentiate them. This is so we don't have to make very
-      // many changes to the webui right now. Note that this ID
-      // construction must be identical to what we do for directory
-      // suffix returned from Slave::getUniqueWorkDirectory.
-
-      string id = f->frameworkId.value() + "-" + e->info.executor_id().value();
-
-      state::Framework* framework =
-        new state::Framework(id, f->info.name(),
-                             e->info.uri(), e->executorStatus,
-                             cpus.value(), mem.value());
-
-      state->frameworks.push_back(framework);
-
-      foreachpair (_, Task* t, e->tasks) {
-        Resources resources(t->resources());
-        Resource::Scalar cpus;
-        Resource::Scalar mem;
-        cpus.set_value(-1);
-        mem.set_value(-1);
-        cpus = resources.getScalar("cpus", cpus);
-        mem = resources.getScalar("mem", mem);
-
-        state::Task* task =
-          new state::Task(t->task_id().value(), t->name(),
-                          TaskState_descriptor()->FindValueByNumber(t->state())->name(),
-                          cpus.value(), mem.value());
-
-        framework->tasks.push_back(task);
-      }
+  std::ostringstream my_pid;
+  my_pid << self();
+  std::ostringstream master_pid;
+  master_pid << master;
+  state::SlaveState *state =
+    new state::SlaveState(BUILD_DATE, BUILD_USER, id, resources.cpus, 
+        resources.mem, my_pid.str(), master_pid.str());
+
+  foreachpair(_, Framework *f, frameworks) {
+    state::Framework *framework = new state::Framework(f->id, f->name, 
+        f->executorInfo.uri, f->executorStatus, f->resources.cpus,
+        f->resources.mem);
+    state->frameworks.push_back(framework);
+    foreachpair(_, Task *t, f->tasks) {
+      state::Task *task = new state::Task(t->id, t->name, t->state,
+          t->resources.cpus, t->resources.mem);
+      framework->tasks.push_back(task);
     }
   }
 
@@ -155,556 +147,377 @@ Promise<state::SlaveState*> Slave::getSt
 void Slave::operator () ()
 {
   LOG(INFO) << "Slave started at " << self();
-  LOG(INFO) << "Slave resources: " << resources;
 
   // Get our hostname
-  char buf[256];
+  char buf[512];
   gethostname(buf, sizeof(buf));
-  hostent* he = gethostbyname2(buf, AF_INET);
+  hostent *he = gethostbyname2(buf, AF_INET);
   string hostname = he->h_name;
 
-  // Check and see if we have a different public DNS name. Normally
-  // this is our hostname, but on EC2 we look for the MESOS_PUBLIC_DNS
-  // environment variable. This allows the master to display our
-  // public name in its web UI.
-  string public_hostname = hostname;
+  // Get our public DNS name. Normally this is our hostname, but on EC2
+  // we look for the MESOS_PUBLIC_DNS environment variable. This allows
+  // the master to display our public name in its web UI.
+  string publicDns = hostname;
   if (getenv("MESOS_PUBLIC_DNS") != NULL) {
-    public_hostname = getenv("MESOS_PUBLIC_DNS");
+    publicDns = getenv("MESOS_PUBLIC_DNS");
   }
 
-  // Initialize slave info.
-  slave.set_hostname(hostname);
-  slave.set_public_hostname(public_hostname);
-  slave.mutable_resources()->MergeFrom(resources);
-
   // Initialize isolation module.
   isolationModule->initialize(this);
 
   while (true) {
-    serve(1);
-    if (name() == process::TERMINATE) {
-      LOG(INFO) << "Asked to shut down by " << from();
-      foreachpaircopy (_, Framework* framework, frameworks) {
-        killFramework(framework);
-      }
-      return;
-    }
-  }
-}
-
-
-void Slave::initialize()
-{
-  install(NEW_MASTER_DETECTED, &Slave::newMasterDetected,
-          &NewMasterDetectedMessage::pid);
-
-  install(NO_MASTER_DETECTED, &Slave::noMasterDetected);
-
-  install(M2S_REGISTER_REPLY, &Slave::registerReply,
-          &SlaveRegisteredMessage::slave_id);
-
-  install(M2S_REREGISTER_REPLY, &Slave::reregisterReply,
-          &SlaveRegisteredMessage::slave_id);
-
-  install(M2S_RUN_TASK, &Slave::runTask,
-          &RunTaskMessage::framework,
-          &RunTaskMessage::framework_id,
-          &RunTaskMessage::pid,
-          &RunTaskMessage::task);
-
-  install(M2S_KILL_TASK, &Slave::killTask,
-          &KillTaskMessage::framework_id,
-          &KillTaskMessage::task_id);
-
-  install(M2S_KILL_FRAMEWORK, &Slave::killFramework,
-          &KillFrameworkMessage::framework_id);
+    switch (receive()) {
+      case NEW_MASTER_DETECTED: {
+	string masterSeq;
+	PID masterPid;
+	tie(masterSeq, masterPid) = unpack<NEW_MASTER_DETECTED>(body());
+
+	LOG(INFO) << "New master at " << masterPid << " with ID:" << masterSeq;
+
+        redirect(master, masterPid);
+	master = masterPid;
+	link(master);
+
+	if (id.empty()) {
+	  // Slave started before master.
+	  send(master, pack<S2M_REGISTER_SLAVE>(hostname, publicDns, resources));
+	} else {
+	  // Reconnecting, so reconstruct resourcesInUse for the master.
+	  Resources resourcesInUse; 
+	  vector<Task> taskVec;
+
+	  foreachpair(_, Framework *framework, frameworks) {
+	    foreachpair(_, Task *task, framework->tasks) {
+	      resourcesInUse += task->resources;
+	      Task ti = *task;
+	      ti.slaveId = id;
+	      taskVec.push_back(ti);
+	    }
+	  }
 
-  install(M2S_FRAMEWORK_MESSAGE, &Slave::schedulerMessage,
-          &FrameworkMessageMessage::slave_id,
-          &FrameworkMessageMessage::framework_id,
-          &FrameworkMessageMessage::executor_id,
-          &FrameworkMessageMessage::data);
-
-  install(M2S_UPDATE_FRAMEWORK, &Slave::updateFramework,
-          &UpdateFrameworkMessage::framework_id,
-          &UpdateFrameworkMessage::pid);
-
-  install(M2S_STATUS_UPDATE_ACK, &Slave::statusUpdateAck,
-          &StatusUpdateAckMessage::framework_id,
-          &StatusUpdateAckMessage::slave_id,
-          &StatusUpdateAckMessage::task_id);
-
-  install(E2S_REGISTER_EXECUTOR, &Slave::registerExecutor,
-          &RegisterExecutorMessage::framework_id,
-          &RegisterExecutorMessage::executor_id);
-
-  install(E2S_STATUS_UPDATE, &Slave::statusUpdate,
-          &StatusUpdateMessage::framework_id,
-          &StatusUpdateMessage::status);
-
-  install(E2S_FRAMEWORK_MESSAGE, &Slave::executorMessage,
-          &FrameworkMessageMessage::slave_id,
-          &FrameworkMessageMessage::framework_id,
-          &FrameworkMessageMessage::executor_id,
-          &FrameworkMessageMessage::data);
-
-  install(PING, &Slave::ping);
-
-  install(process::TIMEOUT, &Slave::timeout);
-
-  install(process::EXITED, &Slave::exited);
-}
-
-
-void Slave::newMasterDetected(const string& pid)
-{
-  LOG(INFO) << "New master detected at " << pid;
-
-  master = pid;
-  link(master);
-
-  if (slaveId == "") {
-    // Slave started before master.
-    MSG<S2M_REGISTER_SLAVE> out;
-    out.mutable_slave()->MergeFrom(slave);
-    send(master, out);
-  } else {
-    // Re-registering, so send tasks running.
-    MSG<S2M_REREGISTER_SLAVE> out;
-    out.mutable_slave_id()->MergeFrom(slaveId);
-    out.mutable_slave()->MergeFrom(slave);
-
-    foreachpair (_, Framework* framework, frameworks) {
-      foreachpair (_, Executor* executor, framework->executors) {
-	foreachpair (_, Task* task, executor->tasks) {
-	  out.add_tasks()->MergeFrom(*task);
+	  send(master, pack<S2M_REREGISTER_SLAVE>(id, hostname, publicDns, resources, taskVec));
 	}
+	break;
+      }
+	
+      case NO_MASTER_DETECTED: {
+	LOG(INFO) << "Lost master(s) ... waiting";
+	break;
       }
-    }
-
-    send(master, out);
-  }
-}
-
-
-void Slave::noMasterDetected()
-{
-  LOG(INFO) << "Lost master(s) ... waiting";
-}
-
-
-void Slave::registerReply(const SlaveID& slaveId)
-{
-  LOG(INFO) << "Registered with master; given slave ID " << slaveId;
-  this->slaveId = slaveId;
-}
-
-
-void Slave::reregisterReply(const SlaveID& slaveId)
-{
-  LOG(INFO) << "Re-registered with master";
-
-  if (!(this->slaveId == slaveId)) {
-    LOG(FATAL) << "Slave re-registered but got wrong ID";
-  }
-}
-
-
-void Slave::runTask(const FrameworkInfo& frameworkInfo,
-                    const FrameworkID& frameworkId,
-                    const string& pid,
-                    const TaskDescription& task)
-{
-  LOG(INFO) << "Got assigned task " << task.task_id()
-            << " for framework " << frameworkId;
-
-  Framework* framework = getFramework(frameworkId);
-  if (framework == NULL) {
-    framework = new Framework(frameworkId, frameworkInfo, pid);
-    frameworks[frameworkId] = framework;
-  }
-
-  // Either send the task to an executor or start a new executor
-  // and queue the task until the executor has started.
-  Executor* executor = task.has_executor()
-    ? framework->getExecutor(task.executor().executor_id())
-    : framework->getExecutor(framework->info.executor().executor_id());
-        
-  if (executor != NULL) {
-    if (!executor->pid) {
-      // Queue task until the executor starts up.
-      executor->queuedTasks.push_back(task);
-    } else {
-      // Add the task to the executor.
-      executor->addTask(task);
-
-      MSG<S2E_RUN_TASK> out;
-      out.mutable_framework()->MergeFrom(framework->info);
-      out.mutable_framework_id()->MergeFrom(framework->frameworkId);
-      out.set_pid(framework->pid);
-      out.mutable_task()->MergeFrom(task);
-      send(executor->pid, out);
-      isolationModule->resourcesChanged(framework, executor);
-    }
-  } else {
-    // Launch an executor for this task.
-    if (task.has_executor()) {
-      executor = framework->createExecutor(task.executor());
-    } else {
-      executor = framework->createExecutor(framework->info.executor());
-    }
-
-    // Queue task until the executor starts up.
-    executor->queuedTasks.push_back(task);
-
-    // Tell the isolation module to launch the executor.
-    isolationModule->launchExecutor(framework, executor);
-  }
-}
-
-
-void Slave::killTask(const FrameworkID& frameworkId,
-                     const TaskID& taskId)
-{
-  LOG(INFO) << "Asked to kill task " << taskId
-            << " of framework " << frameworkId;
-
-  Framework* framework = getFramework(frameworkId);
-  if (framework != NULL) {
-    // Tell the executor to kill the task if it is up and
-    // running, otherwise, consider the task lost.
-    Executor* executor = framework->getExecutor(taskId);
-    if (executor == NULL || !executor->pid) {
-      // Update the resources locally, if an executor comes up
-      // after this then it just won't receive this task.
-      executor->removeTask(taskId);
-      isolationModule->resourcesChanged(framework, executor);
-
-      MSG<S2M_STATUS_UPDATE> out;
-      out.mutable_framework_id()->MergeFrom(frameworkId);
-      TaskStatus *status = out.mutable_status();
-      status->mutable_task_id()->MergeFrom(taskId);
-      status->mutable_slave_id()->MergeFrom(slaveId);
-      status->set_state(TASK_LOST);
-      send(master, out);
-
-      double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
-      framework->statuses[deadline][status->task_id()] = *status;
-    } else {
-      // Otherwise, send a message to the executor and wait for
-      // it to send us a status update.
-      MSG<S2E_KILL_TASK> out;
-      out.mutable_framework_id()->MergeFrom(frameworkId);
-      out.mutable_task_id()->MergeFrom(taskId);
-      send(executor->pid, out);
-    }
-  } else {
-    LOG(WARNING) << "Cannot kill task " << taskId
-                 << " of framework " << frameworkId
-                 << " because no such framework is running";
-
-    MSG<S2M_STATUS_UPDATE> out;
-    out.mutable_framework_id()->MergeFrom(frameworkId);
-    TaskStatus *status = out.mutable_status();
-    status->mutable_task_id()->MergeFrom(taskId);
-    status->mutable_slave_id()->MergeFrom(slaveId);
-    status->set_state(TASK_LOST);
-    send(master, out);
-
-    double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
-    framework->statuses[deadline][status->task_id()] = *status;
-  }
-}
-
-
-void Slave::killFramework(const FrameworkID& frameworkId)
-{
-  LOG(INFO) << "Asked to kill framework " << frameworkId;
-
-  Framework* framework = getFramework(frameworkId);
-  if (framework != NULL) {
-    killFramework(framework);
-  }
-}
-
-
-void Slave::schedulerMessage(const SlaveID& slaveId,
-			     const FrameworkID& frameworkId,
-			     const ExecutorID& executorId,
-                             const string& data)
-{
-  Framework* framework = getFramework(frameworkId);
-  if (framework != NULL) {
-    Executor* executor = framework->getExecutor(executorId);
-    if (executor == NULL) {
-      LOG(WARNING) << "Dropping message for executor '"
-                   << executorId << "' of framework " << frameworkId
-                   << " because executor does not exist";
-    } else if (!executor->pid) {
-      // TODO(*): If executor is not started, queue framework message?
-      // (It's probably okay to just drop it since frameworks can have
-      // the executor send a message to the master to say when it's ready.)
-      LOG(WARNING) << "Dropping message for executor '"
-                   << executorId << "' of framework " << frameworkId
-                   << " because executor is not running";
-    } else {
-      MSG<S2E_FRAMEWORK_MESSAGE> out;
-      out.mutable_slave_id()->MergeFrom(slaveId);
-      out.mutable_framework_id()->MergeFrom(frameworkId);
-      out.mutable_executor_id()->MergeFrom(executorId);
-      out.set_data(data);
-      send(executor->pid, out);
-    }
-  } else {
-    LOG(WARNING) << "Dropping message for framework "<< frameworkId
-                 << " because it does not exist";
-  }
-}
-
 
-void Slave::updateFramework(const FrameworkID& frameworkId,
-                            const string& pid)
-{
-  Framework* framework = getFramework(frameworkId);
-  if (framework != NULL) {
-    LOG(INFO) << "Updating framework " << frameworkId
-              << " pid to " <<pid;
-    framework->pid = pid;
-  }
-}
+      case M2S_REGISTER_REPLY: {
+	double interval = 0;
+        tie(this->id, interval) = unpack<M2S_REGISTER_REPLY>(body());
+        LOG(INFO) << "Registered with master; given slave ID " << this->id;
+        link(spawn(new Heart(master, self(), this->id, interval)));
+        break;
+      }
+      
+      case M2S_REREGISTER_REPLY: {
+        SlaveID sid;
+	double interval = 0;
+        tie(sid, interval) = unpack<M2S_REREGISTER_REPLY>(body());
+        LOG(INFO) << "RE-registered with master; given slave ID " << sid << " had "<< this->id;
+        if (this->id == "")
+          this->id = sid;
+        CHECK(this->id == sid);
+        link(spawn(new Heart(master, self(), this->id, interval)));
+        break;
+      }
+      
+      case M2S_RUN_TASK: {
+	FrameworkID fid;
+        TaskID tid;
+        string fwName, user, taskName, taskArg;
+        ExecutorInfo execInfo;
+        Params params;
+        PID pid;
+        tie(fid, tid, fwName, user, execInfo, taskName, taskArg, params, pid) =
+          unpack<M2S_RUN_TASK>(body());
+        LOG(INFO) << "Got assigned task " << fid << ":" << tid;
+        Resources res;
+        res.cpus = params.getInt32("cpus", -1);
+        res.mem = params.getInt64("mem", -1);
+        Framework *framework = getFramework(fid);
+        if (framework == NULL) {
+          // Framework not yet created on this node - create it.
+          framework = new Framework(fid, fwName, user, execInfo, pid);
+          frameworks[fid] = framework;
+          isolationModule->startExecutor(framework);
+        }
+        Task *task = framework->addTask(tid, taskName, res);
+        Executor *executor = getExecutor(fid);
+        if (executor) {
+          send(executor->pid,
+               pack<S2E_RUN_TASK>(tid, taskName, taskArg, params));
+          isolationModule->resourcesChanged(framework);
+        } else {
+          // Executor not yet registered; queue task for when it starts up
+          TaskDescription *td = new TaskDescription(
+              tid, taskName, taskArg, params.str());
+          framework->queuedTasks.push_back(td);
+        }
+        break;
+      }
 
+      case M2S_KILL_TASK: {
+        FrameworkID fid;
+        TaskID tid;
+        tie(fid, tid) = unpack<M2S_KILL_TASK>(body());
+        LOG(INFO) << "Killing task " << fid << ":" << tid;
+        if (Executor *ex = getExecutor(fid)) {
+          send(ex->pid, pack<S2E_KILL_TASK>(tid));
+        }
+        if (Framework *fw = getFramework(fid)) {
+          fw->removeTask(tid);
+          isolationModule->resourcesChanged(fw);
+        }
+        break;
+      }
 
-void Slave::statusUpdateAck(const FrameworkID& frameworkId,
-                            const SlaveID& slaveId,
-                            const TaskID& taskId)
-{
-  Framework* framework = getFramework(frameworkId);
-  if (framework != NULL) {
-    foreachpair (double deadline, _, framework->statuses) {
-      if (framework->statuses[deadline].count(taskId) > 0) {
-        LOG(INFO) << "Got acknowledgement of status update"
-                  << " for task " << taskId
-                  << " of framework " << framework->frameworkId;
-        framework->statuses[deadline].erase(taskId);
+      case M2S_KILL_FRAMEWORK: {
+        FrameworkID fid;
+        tie(fid) = unpack<M2S_KILL_FRAMEWORK>(body());
+        LOG(INFO) << "Asked to kill framework " << fid;
+        Framework *fw = getFramework(fid);
+        if (fw != NULL)
+          killFramework(fw);
         break;
       }
-    }
-  }
-}
 
+      case M2S_FRAMEWORK_MESSAGE: {
+        FrameworkID fid;
+        FrameworkMessage message;
+        tie(fid, message) = unpack<M2S_FRAMEWORK_MESSAGE>(body());
+        if (Executor *ex = getExecutor(fid)) {
+          VLOG(1) << "Relaying framework message for framework " << fid;
+          send(ex->pid, pack<S2E_FRAMEWORK_MESSAGE>(message));
+        } else {
+          VLOG(1) << "Dropping framework message for framework " << fid
+                  << " because its executor is not running";
+        }
+        // TODO(*): If executor is not started, queue framework message?
+        // (It's probably okay to just drop it since frameworks can have
+        // the executor send a message to the master to say when it's ready.)
+        break;
+      }
 
-void Slave::registerExecutor(const FrameworkID& frameworkId,
-                             const ExecutorID& executorId)
-{
-  LOG(INFO) << "Got registration for executor '" << executorId
-            << "' of framework " << frameworkId;
+      case M2S_UPDATE_FRAMEWORK_PID: {
+        FrameworkID fid;
+        PID pid;
+        tie(fid, pid) = unpack<M2S_UPDATE_FRAMEWORK_PID>(body());
+        Framework *framework = getFramework(fid);
+        if (framework != NULL) {
+          LOG(INFO) << "Updating framework " << fid << " pid to " << pid;
+          framework->pid = pid;
+        }
+        break;
+      }
 
-  Framework* framework = getFramework(frameworkId);
-  if (framework != NULL) {
-    Executor* executor = framework->getExecutor(executorId);
-
-    // Check the status of the executor.
-    if (executor == NULL) {
-      LOG(WARNING) << "Not expecting executor '" << executorId
-                   << "' of framework " << frameworkId;
-      send(from(), S2E_KILL_EXECUTOR);
-    } else if (executor->pid != UPID()) {
-      LOG(WARNING) << "Not good, executor '" << executorId
-                   << "' of framework " << frameworkId
-                   << " is already running";
-      send(from(), S2E_KILL_EXECUTOR);
-    } else {
-      // Save the pid for the executor.
-      executor->pid = from();
-
-      // Now that the executor is up, set its resource limits.
-      isolationModule->resourcesChanged(framework, executor);
-
-      // Tell executor it's registered and give it any queued tasks.
-      MSG<S2E_REGISTER_REPLY> out;
-      ExecutorArgs* args = out.mutable_args();
-      args->mutable_framework_id()->MergeFrom(framework->frameworkId);
-      args->mutable_executor_id()->MergeFrom(executor->info.executor_id());
-      args->mutable_slave_id()->MergeFrom(slaveId);
-      args->set_hostname(slave.hostname());
-      args->set_data(framework->info.executor().data());
-      send(executor->pid, out);
-      sendQueuedTasks(framework, executor);
-    }
-  } else {
-    // Framework is gone; tell the executor to exit.
-    LOG(WARNING) << "Framework " << frameworkId
-                 << " does not exist (it may have been killed),"
-                 << " telling executor to exit";
-
-    // TODO(benh): Don't we also want to tell the isolation
-    // module to shut this guy down!
-    send(from(), S2E_KILL_EXECUTOR);
-  }
-}
+      case E2S_REGISTER_EXECUTOR: {
+        FrameworkID fid;
+        tie(fid) = unpack<E2S_REGISTER_EXECUTOR>(body());
+        LOG(INFO) << "Got executor registration for framework " << fid;
+        if (Framework *fw = getFramework(fid)) {
+          if (getExecutor(fid) != 0) {
+            LOG(ERROR) << "Executor for framework " << fid
+                       << "already exists";
+            send(from(), pack<S2E_KILL_EXECUTOR>());
+            break;
+          }
+          Executor *executor = new Executor(fid, from());
+          executors[fid] = executor;
+          link(from());
+          // Now that the executor is up, set its resource limits
+          isolationModule->resourcesChanged(fw);
+          // Tell executor that it's registered and give it its queued tasks
+          send(from(), pack<S2E_REGISTER_REPLY>(this->id,
+                                                hostname,
+                                                fw->name,
+                                                fw->executorInfo.initArg));
+          sendQueuedTasks(fw);
+        } else {
+          // Framework is gone; tell the executor to exit
+          send(from(), pack<S2E_KILL_EXECUTOR>());
+        }
+        break;
+      }
 
+      case E2S_STATUS_UPDATE: {
+        FrameworkID fid;
+        TaskID tid;
+        TaskState taskState;
+        string data;
+        tie(fid, tid, taskState, data) = unpack<E2S_STATUS_UPDATE>(body());
+
+        Framework *framework = getFramework(fid);
+        if (framework != NULL) {
+	  LOG(INFO) << "Got status update for task " << fid << ":" << tid;
+	  if (taskState == TASK_FINISHED || taskState == TASK_FAILED ||
+	      taskState == TASK_KILLED || taskState == TASK_LOST) {
+	    LOG(INFO) << "Task " << fid << ":" << tid << " done";
+
+            framework->removeTask(tid);
+            isolationModule->resourcesChanged(framework);
+          }
+
+	  // Reliably send message and save sequence number for
+	  // canceling later.
+	  int seq = rsend(master, framework->pid,
+			  pack<S2M_STATUS_UPDATE>(id, fid, tid,
+                                                  taskState, data));
+	  seqs[fid].insert(seq);
+	} else {
+	  LOG(WARNING) << "Got status update for UNKNOWN task "
+		       << fid << ":" << tid;
+	}
+        break;
+      }
 
-void Slave::statusUpdate(const FrameworkID& frameworkId,
-                         const TaskStatus& status)
-{
-  LOG(INFO) << "Status update: task " << status.task_id()
-            << " of framework " << frameworkId
-            << " is now in state "
-            << TaskState_descriptor()->FindValueByNumber(status.state())->name();
-
-  Framework* framework = getFramework(frameworkId);
-  if (framework != NULL) {
-    Executor* executor = framework->getExecutor(status.task_id());
-    if (executor != NULL) {
-      executor->updateTaskState(status.task_id(), status.state());
-      if (status.state() == TASK_FINISHED ||
-          status.state() == TASK_FAILED ||
-          status.state() == TASK_KILLED ||
-          status.state() == TASK_LOST) {
-        executor->removeTask(status.task_id());
-        isolationModule->resourcesChanged(framework, executor);
-      }
-
-      // Send message and record the status for possible resending.
-      MSG<S2M_STATUS_UPDATE> out;
-      out.mutable_framework_id()->MergeFrom(frameworkId);
-      out.mutable_status()->MergeFrom(status);
-      send(master, out);
-
-      double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
-      framework->statuses[deadline][status.task_id()] = status;
-    } else {
-      LOG(WARNING) << "Status update error: couldn't lookup "
-                   << "executor for framework " << frameworkId;
-    }
-  } else {
-    LOG(WARNING) << "Status update error: couldn't lookup "
-                 << "framework " << frameworkId;
-  }
-}
+      case E2S_FRAMEWORK_MESSAGE: {
+        FrameworkID fid;
+        FrameworkMessage message;
+        tie(fid, message) = unpack<E2S_FRAMEWORK_MESSAGE>(body());
+
+        Framework *framework = getFramework(fid);
+        if (framework != NULL) {
+	  LOG(INFO) << "Sending message for framework " << fid
+		    << " to " << framework->pid;
+
+          // Set slave ID in case framework omitted it.
+          message.slaveId = this->id;
+          VLOG(1) << "Sending framework message to framework " << fid
+                  << " with PID " << framework->pid;
+          send(framework->pid, pack<M2F_FRAMEWORK_MESSAGE>(message));
+        }
+        break;
+      }
 
+      case S2S_GET_STATE: {
+ 	send(from(), pack<S2S_GET_STATE_REPLY>(getState()));
+	break;
+      }
 
-void Slave::executorMessage(const SlaveID& slaveId,
-			    const FrameworkID& frameworkId,
-			    const ExecutorID& executorId,
-                            const string& data)
-{
-  Framework* framework = getFramework(frameworkId);
-  if (framework != NULL) {
-    LOG(INFO) << "Sending message for framework " << frameworkId
-              << " to " << framework->pid;
-
-    // TODO(benh): This is weird, sending an M2F message.
-    MSG<M2F_FRAMEWORK_MESSAGE> out;
-    out.mutable_slave_id()->MergeFrom(slaveId);
-    out.mutable_framework_id()->MergeFrom(frameworkId);
-    out.mutable_executor_id()->MergeFrom(executorId);
-    out.set_data(data);
-    send(framework->pid, out);
-  }
-}
+      case PROCESS_EXIT: {
+        LOG(INFO) << "Process exited: " << from();
 
+        if (from() == master) {
+	  LOG(WARNING) << "Master disconnected! "
+		       << "Waiting for a new master to be elected.";
+	  // TODO(benh): After so long waiting for a master, commit suicide.
+	} else {
+	  // Check if an executor has exited (this is technically
+	  // redundant because the isolation module should be doing
+	  // this for us).
+	  foreachpair (_, Executor *ex, executors) {
+	    if (from() == ex->pid) {
+	      LOG(INFO) << "Executor for framework " << ex->frameworkId
+			<< " disconnected";
+	      Framework *framework = getFramework(ex->frameworkId);
+	      if (framework != NULL) {
+		send(master, pack<S2M_LOST_EXECUTOR>(id, ex->frameworkId, -1));
+		killFramework(framework);
+	      }
+	      break;
+	    }
+	  }
+	}
 
-void Slave::ping()
-{
-  send(from(), PONG);
-}
+        break;
+      }
 
+      case M2S_SHUTDOWN: {
+        LOG(INFO) << "Asked to shut down by master: " << from();
+        unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
+        foreachpair (_, Framework *framework, frameworksCopy) {
+          killFramework(framework);
+        }
+        return;
+      }
 
-void Slave::timeout()
-{
-  // Check and see if we should re-send any status updates.
-  foreachpair (_, Framework* framework, frameworks) {
-    foreachpair (double deadline, _, framework->statuses) {
-      if (deadline <= elapsed()) {
-        foreachpair (_, const TaskStatus& status, framework->statuses[deadline]) {
-          LOG(WARNING) << "Resending status update"
-                       << " for task " << status.task_id()
-                       << " of framework " << framework->frameworkId;
-          MSG<S2M_STATUS_UPDATE> out;
-          out.mutable_framework_id()->MergeFrom(framework->frameworkId);
-          out.mutable_status()->MergeFrom(status);
-          send(master, out);
+      case S2S_SHUTDOWN: {
+        LOG(INFO) << "Asked to shut down by " << from();
+        unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
+        foreachpair (_, Framework *framework, frameworksCopy) {
+          killFramework(framework);
         }
+        return;
+      }
+
+      default: {
+        LOG(ERROR) << "Received unknown message ID " << msgid()
+                   << " from " << from();
+        break;
       }
     }
   }
 }
 
-void Slave::exited()
-{
-  LOG(INFO) << "Process exited: " << from();
 
-  if (from() == master) {
-    LOG(WARNING) << "Master disconnected! "
-                 << "Waiting for a new master to be elected.";
-    // TODO(benh): After so long waiting for a master, commit suicide.
-  }
+Framework * Slave::getFramework(FrameworkID frameworkId)
+{  // Returns the Framework stored in `frameworks` under frameworkId, or NULL.
+  FrameworkMap::iterator it = frameworks.find(frameworkId);
+  if (it == frameworks.end()) return NULL;  // No entry for this framework ID.
+  return it->second;  // The stored pointer itself; nothing is copied.
 }
 
 
-
-
-Framework* Slave::getFramework(const FrameworkID& frameworkId)
+Executor * Slave::getExecutor(FrameworkID frameworkId)  // Returns the executor stored for frameworkId, or NULL.
 {
-  if (frameworks.count(frameworkId) > 0) {
-    return frameworks[frameworkId];
-  }
-
-  return NULL;
+  ExecutorMap::iterator it = executors.find(frameworkId);  // `executors` is looked up by framework ID.
+  if (it == executors.end()) return NULL;  // This framework has no executor entry.
+  return it->second;
 }
 
 
 // Send any tasks queued up for the given framework to its executor
 // (needed if we received tasks while the executor was starting up)
-void Slave::sendQueuedTasks(Framework* framework, Executor* executor)
+void Slave::sendQueuedTasks(Framework *framework)
 {
-  LOG(INFO) << "Flushing queued tasks for framework "
-            << framework->frameworkId;
-
-  CHECK(executor->pid != UPID());
-
-  foreach (const TaskDescription& task, executor->queuedTasks) {
-    // Add the task to the executor.
-    executor->addTask(task);
-
-    MSG<S2E_RUN_TASK> out;
-    out.mutable_framework()->MergeFrom(framework->info);
-    out.mutable_framework_id()->MergeFrom(framework->frameworkId);
-    out.set_pid(framework->pid);
-    out.mutable_task()->MergeFrom(task);
-    send(executor->pid, out);
+  LOG(INFO) << "Flushing queued tasks for framework " << framework->id;
+  Executor *executor = getExecutor(framework->id);
+  if (!executor) return;  // No executor entry: the queue is left untouched (the log line above is still emitted).
+  foreach(TaskDescription *td, framework->queuedTasks) {
+    send(executor->pid,
+        pack<S2E_RUN_TASK>(td->tid, td->name, td->args, td->params));
+    delete td;  // Each queued description is freed once its message has been passed to send().
   }
-
-  executor->queuedTasks.clear();
+  framework->queuedTasks.clear();  // Drops the pointers that were just deleted.
 }
 
 
 // Kill a framework (including its executor if killExecutor is true).
-void Slave::killFramework(Framework *framework, bool killExecutors)
+void Slave::killFramework(Framework *framework, bool killExecutor)
 {
-  LOG(INFO) << "Cleaning up framework " << framework->frameworkId;
+  LOG(INFO) << "Cleaning up framework " << framework->id;
+
+  // Cancel sending any reliable messages for this framework.
+  foreach (int seq, seqs[framework->id])
+    cancel(seq);
 
-  // Shutdown all executors of this framework.
-  foreachpaircopy (const ExecutorID& executorId, Executor* executor, framework->executors) {
-    if (killExecutors) {
-      LOG(INFO) << "Killing executor '" << executorId
-                << "' of framework " << framework->frameworkId;
+  seqs.erase(framework->id);
 
-      send(executor->pid, S2E_KILL_EXECUTOR);
+  // Remove its allocated resources.
+  framework->resources = Resources();
 
+  // If an executor is running, tell it to exit and kill it.
+  if (Executor *ex = getExecutor(framework->id)) {
+    if (killExecutor) {
+      LOG(INFO) << "Killing executor for framework " << framework->id;
       // TODO(benh): There really isn't ANY time between when an
       // executor gets a S2E_KILL_EXECUTOR message and the isolation
       // module goes and kills it. We should really think about making
       // the semantics of this better.
-
-      isolationModule->killExecutor(framework, executor);
+      send(ex->pid, pack<S2E_KILL_EXECUTOR>());
+      isolationModule->killExecutor(framework);
     }
 
-    framework->destroyExecutor(executorId);
+    LOG(INFO) << "Cleaning up executor for framework " << framework->id;
+    delete ex;  // Runs even when killExecutor is false: the Executor record is always freed.
+    executors.erase(framework->id);
   }
 
-  frameworks.erase(framework->frameworkId);
+  frameworks.erase(framework->id);  // Must come before the delete below, which invalidates framework->id.
   delete framework;
 }
 
@@ -712,45 +525,18 @@ void Slave::killFramework(Framework *fra
 // Called by isolation module when an executor process exits
 // TODO(benh): Make this callback be a message so that we can avoid
 // race conditions.
-void Slave::executorExited(const FrameworkID& frameworkId, const ExecutorID& executorId, int result)
+void Slave::executorExited(FrameworkID fid, int status)
 {
-  Framework* framework = getFramework(frameworkId);
-  if (framework != NULL) {
-    Executor* executor = framework->getExecutor(executorId);
-    if (executor != NULL) {
-      LOG(INFO) << "Exited executor '" << executorId
-                << "' of framework " << frameworkId
-                << " with result " << result;
-
-      MSG<S2M_EXITED_EXECUTOR> out;
-      out.mutable_slave_id()->MergeFrom(slaveId);
-      out.mutable_framework_id()->MergeFrom(frameworkId);
-      out.mutable_executor_id()->MergeFrom(executorId);
-      out.set_result(result);
-      send(master, out);
-
-      framework->destroyExecutor(executorId);
-
-      // TODO(benh): When should we kill the presence of an entire
-      // framework on a slave?
-      if (framework->executors.size() == 0) {
-        killFramework(framework);
-      }
-    } else {
-      LOG(WARNING) << "UNKNOWN executor '" << executorId
-                   << "' of framework " << frameworkId
-                   << " has exited with result " << result;
-    }
-  } else {
-    LOG(WARNING) << "UNKNOWN executor '" << executorId
-                 << "' of UNKNOWN framework " << frameworkId
-                 << " has exited with result " << result;
+  if (Framework *f = getFramework(fid)) {  // An exit for an unknown framework is ignored without logging.
+    LOG(INFO) << "Executor for framework " << fid << " exited "
+              << "with status " << status;
+    send(master, pack<S2M_LOST_EXECUTOR>(id, fid, status));
+    killFramework(f, false);  // killExecutor=false: skip the kill message and isolation-module kill; the framework is still torn down.
   }
 };
 
 
-string Slave::getUniqueWorkDirectory(const FrameworkID& frameworkId,
-                                     const ExecutorID& executorId)
+string Slave::getUniqueWorkDirectory(FrameworkID fid)
 {
   string workDir;
   if (conf.contains("work_dir")) {
@@ -762,8 +548,7 @@ string Slave::getUniqueWorkDirectory(con
   }
 
   ostringstream os(std::ios_base::app | std::ios_base::out);
-  os << workDir << "/slave-" << slaveId
-     << "/fw-" << frameworkId << "-" << executorId;
+  os << workDir << "/slave-" << id << "/fw-" << fid;
 
   // Find a unique directory based on the path given by the slave
   // (this is because we might launch multiple executors from the same
@@ -784,7 +569,7 @@ string Slave::getUniqueWorkDirectory(con
 }
 
 
-const Configuration& Slave::getConfiguration()
+const Params& Slave::getConf()  // Read-only reference to `conf`; no copy is made.
 {
   return conf;
 }

Modified: incubator/mesos/trunk/src/slave/webui.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/webui.cpp?rev=1132329&r1=1132328&r2=1132329&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/webui.cpp (original)
+++ incubator/mesos/trunk/src/slave/webui.cpp Sun Jun  5 09:25:41 2011
@@ -6,29 +6,28 @@
 #include "state.hpp"
 #include "webui.hpp"
 
-#include "configurator/configuration.hpp"
-
 #ifdef MESOS_WEBUI
 
 #include <Python.h>
 
-using process::PID;
-
 using std::string;
 
 
 extern "C" void init_slave();  // Initializer for the Python slave module
 
+namespace {
 
-namespace mesos { namespace internal { namespace slave {
+PID slave;
+string webuiPort;
+string logDir;
+string workDir;
 
-static PID<Slave> slave;
-static string webuiPort;
-static string logDir;
-static string workDir;
+}
+
+namespace mesos { namespace internal { namespace slave {
 
 
-void* runSlaveWebUI(void*)
+void *runSlaveWebUI(void *)
 {
   LOG(INFO) << "Web UI thread started";
   Py_Initialize();
@@ -51,19 +50,19 @@ void* runSlaveWebUI(void*)
 }
 
 
-void startSlaveWebUI(const PID<Slave>& _slave, const Configuration& conf)
+void startSlaveWebUI(const PID &slave, const Params &params)
 {
   // TODO(*): See the note in master/webui.cpp about having to
   // determine default values. These should be set by now and can just
   // be used! For example, what happens when the slave code changes
   // their default location for the work directory, it might not get
   // changed here!
-  webuiPort = conf.get("webui_port", "8081");
-  logDir = conf.get("log_dir", FLAGS_log_dir);
-  if (conf.contains("work_dir")) {
-    workDir = conf.get("work_dir", "");
-  } else if (conf.contains("home")) {
-    workDir = conf.get("home", "") + "/work";
+  webuiPort = params.get("webui_port", "8081");
+  logDir = params.get("log_dir", FLAGS_log_dir);
+  if (params.contains("work_dir")) {
+    workDir = params.get("work_dir", "");
+  } else if (params.contains("home")) {
+    workDir = params.get("home", "") + "/work";
   } else {
     workDir = "work";
   }
@@ -72,7 +71,7 @@ void startSlaveWebUI(const PID<Slave>& _
 
   LOG(INFO) << "Starting slave web UI on port " << webuiPort;
 
-  slave = _slave;
+  ::slave = slave;
   pthread_t thread;
   pthread_create(&thread, 0, runSlaveWebUI, NULL);
 }
@@ -80,15 +79,36 @@ void startSlaveWebUI(const PID<Slave>& _
 
 namespace state {
 
-// From slave_state.hpp.
-SlaveState* get_slave()
+class StateGetter : public MesosProcess  // One-shot process that asks ::slave for its state.
+{
+public:
+  SlaveState *slaveState;  // Assigned in operator(); the constructor leaves it uninitialized.
+
+  StateGetter() {}
+  ~StateGetter() {}
+
+  void operator () ()
+  {
+    send(::slave, pack<S2S_GET_STATE>());
+    receive();
+    CHECK(msgid() == S2S_GET_STATE_REPLY);  // The message just received must be the state reply.
+    slaveState = unpack<S2S_GET_STATE_REPLY, 0>(body());  // Element 0 of the reply is kept as the SlaveState pointer.
+  }
+};
+
+
+// From slave_state.hpp
+SlaveState *get_slave()
 {
-  return process::call(slave, &Slave::getState);
+  StateGetter getter;
+  PID pid = Process::spawn(&getter);
+  Process::wait(pid);  // getter is a local, so wait for the spawned process before this frame returns.
+  return getter.slaveState;  // NOTE(review): ownership of the returned pointer is not visible here; confirm who frees it.
 }
 
-} // namespace state {
+} /* namespace state { */
 
-}}} // namespace mesos { namespace internal { namespace slave {
+}}} /* namespace mesos { namespace internal { namespace slave { */
 
 
-#endif // MESOS_WEBUI
+#endif /* MESOS_WEBUI */

Modified: incubator/mesos/trunk/src/tests/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/Makefile.in?rev=1132329&r1=1132328&r2=1132329&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/Makefile.in (original)
+++ incubator/mesos/trunk/src/tests/Makefile.in Sun Jun  5 09:25:41 2011
@@ -2,8 +2,6 @@
 
 SHELL = '/bin/sh'
 
-SRCDIR = @srcdir@
-INCLUDEDIR = @top_builddir@/include
 BINDIR = @top_builddir@/bin
 LIBDIR = @top_builddir@/lib
 
@@ -20,11 +18,12 @@ WITH_ZOOKEEPER = @WITH_ZOOKEEPER@
 WITH_INCLUDED_ZOOKEEPER = @WITH_INCLUDED_ZOOKEEPER@
 
 LIBPROCESS = third_party/libprocess
+
 LIBEV = $(LIBPROCESS)/third_party/libev-3.8
-BOOST = third_party/boost-1.37.0
-PROTOBUF = third_party/protobuf-2.3.0
+
 GLOG = third_party/glog-0.3.1
 GMOCK = third_party/gmock-1.5.0
+
 ZOOKEEPER = third_party/zookeeper-3.3.1/src/c
 
 # Ensure that we get better debugging info.
@@ -35,17 +34,13 @@ CXXFLAGS += -g
 CFLAGS += -I@srcdir@/.. -I..
 CXXFLAGS += -I@srcdir@/.. -I..
 
-# Add include and build include to CFLAGS and CXXFLAGS.
-CFLAGS += -I@top_srcdir@/include -I$(INCLUDEDIR)
-CXXFLAGS += -I@top_srcdir@/include -I$(INCLUDEDIR)
+# Add include to CFLAGS and CXXFLAGS.
+CFLAGS += -I@top_srcdir@/include
+CXXFLAGS += -I@top_srcdir@/include
 
 # Add boost to CFLAGS and CXXFLAGS.
-CFLAGS += -I@top_srcdir@/$(BOOST)
-CXXFLAGS += -I@top_srcdir@/$(BOOST)
-
-# Add protobuf to include and lib paths.
-CXXFLAGS += -I@top_srcdir@/$(PROTOBUF)/src
-LDFLAGS += -L@top_builddir@/$(PROTOBUF)/src/.libs
+CFLAGS += -I@top_srcdir@/third_party/boost-1.37.0
+CXXFLAGS += -I@top_srcdir@/third_party/boost-1.37.0
 
 # Add libprocess to CFLAGS, CXXFLAGS, and LDFLAGS.
 CFLAGS += -I@top_srcdir@/$(LIBPROCESS)
@@ -69,12 +64,20 @@ endif
 CFLAGS += -MMD -MP
 CXXFLAGS += -MMD -MP
 
-# Add protobuf, glog, gmock, gtest, libev, libprocess, pthread, and dl to LIBS.
-LIBS += -lprotobuf -lglog -lgmock -lgtest -lprocess -lev -lpthread -ldl
+# Add build date to CFLAGS, CXXFLAGS
+CFLAGS += -DBUILD_DATE="\"$$(date '+%Y-%m-%d %H:%M:%S')\""
+CXXFLAGS += -DBUILD_DATE="\"$$(date '+%Y-%m-%d %H:%M:%S')\""
+
+# Add build user to CFLAGS, CXXFLAGS
+CFLAGS += -DBUILD_USER="\"$$USER\""
+CXXFLAGS += -DBUILD_USER="\"$$USER\""
+
+# Add glog, gmock, gtest, libev, libprocess, pthread, and dl to LIBS.
+LIBS += -lglog -lgmock -lgtest -lprocess -lev -lpthread -ldl
 
 # Add ZooKeeper if necessary.
 ifeq ($(WITH_ZOOKEEPER),1)
-  LIBS += -lzookeeper_mt
+  LIBS += -lzookeeper_st
 endif
 
 SCHED_LIB = $(LIBDIR)/libmesos_sched.a
@@ -82,7 +85,7 @@ EXEC_LIB = $(LIBDIR)/libmesos_exec.a
 
 TESTS_OBJ = main.o utils.o master_test.o offer_reply_errors_test.o	\
 	    resources_test.o external_test.o sample_frameworks_test.o	\
-	    configurator_test.o string_utils_test.o multimap_test.o	\
+	    configurator_test.o string_utils_test.o			\
 	    lxc_isolation_test.o
 
 ALLTESTS_EXE = $(BINDIR)/tests/all-tests
@@ -92,6 +95,7 @@ EXTERNAL_SCRIPTS =							\
   $(BINDIR)/tests/external/LxcIsolation/ScaleUpAndDown.sh		\
   $(BINDIR)/tests/external/LxcIsolation/TwoSeparateTasks.sh		\
   $(BINDIR)/tests/external/LxcIsolation/run_scheduled_memhog_test.sh	\
+  $(BINDIR)/tests/external/SampleFrameworks/CFramework.sh		\
   $(BINDIR)/tests/external/SampleFrameworks/CFrameworkCmdlineParsing.sh	\
   $(BINDIR)/tests/external/SampleFrameworks/CFrameworkInvalidCmdline.sh	\
   $(BINDIR)/tests/external/SampleFrameworks/CFrameworkInvalidEnv.sh	\

Modified: incubator/mesos/trunk/src/tests/master_test.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/master_test.cpp?rev=1132329&r1=1132328&r2=1132329&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/master_test.cpp (original)
+++ incubator/mesos/trunk/src/tests/master_test.cpp Sun Jun  5 09:25:41 2011
@@ -5,8 +5,6 @@
 
 #include <boost/lexical_cast.hpp>
 
-#include <detector/detector.hpp>
-
 #include <local/local.hpp>
 
 #include <master/master.hpp>
@@ -24,12 +22,10 @@ using namespace mesos::internal::test;
 using boost::lexical_cast;
 
 using mesos::internal::master::Master;
-
 using mesos::internal::slave::Slave;
+using mesos::internal::slave::Framework;
+using mesos::internal::slave::IsolationModule;
 using mesos::internal::slave::ProcessBasedIsolationModule;
-using mesos::internal::slave::STATUS_UPDATE_RETRY_TIMEOUT;
-
-using process::PID;
 
 using std::string;
 using std::map;
@@ -49,59 +45,42 @@ using testing::Sequence;
 using testing::StrEq;
 
 
-class TestingIsolationModule : public slave::IsolationModule
+class LocalIsolationModule : public IsolationModule
 {
 public:
-  TestingIsolationModule(const map<ExecutorID, Executor*>& _executors)
-    : executors(_executors) {}
+  Executor *executor;  // Executor passed to every driver this module creates.
+  MesosExecutorDriver *driver;  // Driver created by startExecutor(); NULL until then.
+  string pid;  // Slave's PID as a string, captured in initialize().
 
-  virtual ~TestingIsolationModule() {}
+  LocalIsolationModule(Executor *_executor)
+    : executor(_executor), driver(NULL) {}
 
-  virtual void initialize(Slave* _slave)
-  {
-    slave = _slave;
-  }
+  virtual ~LocalIsolationModule() {}
 
-  virtual void launchExecutor(slave::Framework* f, slave::Executor* e)
-  {
-    if (executors.count(e->info.executor_id()) > 0) {
-      Executor* executor = executors[e->info.executor_id()];
-      MesosExecutorDriver* driver = new MesosExecutorDriver(executor);
-      drivers[e->info.executor_id()] = driver;
-
-      setenv("MESOS_LOCAL", "1", 1);
-      setenv("MESOS_SLAVE_PID", string(slave->self()).c_str(), 1);
-      setenv("MESOS_FRAMEWORK_ID", f->frameworkId.value().c_str(), 1);
-      setenv("MESOS_EXECUTOR_ID", e->info.executor_id().value().c_str(), 1);
-
-      driver->start();
-
-      unsetenv("MESOS_LOCAL");
-      unsetenv("MESOS_SLAVE_PID");
-      unsetenv("MESOS_FRAMEWORK_ID");
-      unsetenv("MESOS_EXECUTOR_ID");
-    } else {
-      FAIL() << "Cannot launch executor";
-    }
+  virtual void initialize(Slave *slave) {
+    pid = slave->self();
   }
 
-  virtual void killExecutor(slave::Framework* f, slave::Executor* e)
-  {
-    if (drivers.count(e->info.executor_id()) > 0) {
-      MesosExecutorDriver* driver = drivers[e->info.executor_id()];
-      driver->stop();
-      driver->join();
-      delete driver;
-      drivers.erase(e->info.executor_id());
-    } else {
-      FAIL() << "Cannot kill executor";
-    }
+  virtual void startExecutor(Framework *framework) {
+    // TODO(benh): Cleanup the way we launch local drivers!
+    setenv("MESOS_LOCAL", "1", 1);  // These variables stay set until killExecutor() unsets them.
+    setenv("MESOS_SLAVE_PID", pid.c_str(), 1);
+    setenv("MESOS_FRAMEWORK_ID", framework->id.c_str(), 1);
+
+    driver = new MesosExecutorDriver(executor);  // NOTE(review): overwrites any earlier driver without deleting it.
+    driver->start();
   }
 
-private:
-  map<ExecutorID, Executor*> executors;
-  map<ExecutorID, MesosExecutorDriver*> drivers;
-  Slave* slave;
+  virtual void killExecutor(Framework* framework) {
+    driver->stop();  // NOTE(review): dereferences driver unchecked; it is NULL if startExecutor() never ran.
+    driver->join();
+    delete driver;  // NOTE(review): driver is not reset, so a second killExecutor() would delete it again.
+
+    // TODO(benh): Cleanup the way we launch local drivers!
+    unsetenv("MESOS_LOCAL");
+    unsetenv("MESOS_SLAVE_PID");
+    unsetenv("MESOS_FRAMEWORK_ID");
+  }
 };
 
 
@@ -109,7 +88,7 @@ TEST(MasterTest, ResourceOfferWithMultip
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  PID<Master> master = local::launch(10, 2, 1 * Gigabyte, false, false);
+  PID master = local::launch(10, 2, 1 * Gigabyte, false, false);
 
   MockScheduler sched;
   MesosSchedulerDriver driver(&sched, master);
@@ -122,14 +101,13 @@ TEST(MasterTest, ResourceOfferWithMultip
     .WillOnce(Return(""));
 
   EXPECT_CALL(sched, getExecutorInfo(&driver))
-    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
   EXPECT_CALL(sched, registered(&driver, _))
     .Times(1);
 
   EXPECT_CALL(sched, resourceOffer(&driver, _, _))
-    .WillOnce(DoAll(SaveArg<2>(&offers), Trigger(&resourceOfferCall)))
-    .WillRepeatedly(Return());
+    .WillOnce(DoAll(SaveArg<2>(&offers), Trigger(&resourceOfferCall)));
 
   EXPECT_CALL(sched, offerRescinded(&driver, _))
     .Times(AtMost(1));
@@ -141,9 +119,8 @@ TEST(MasterTest, ResourceOfferWithMultip
   EXPECT_NE(0, offers.size());
   EXPECT_GE(10, offers.size());
 
-  Resources resources(offers[0].resources());
-  EXPECT_EQ(2, resources.getScalar("cpus", Resource::Scalar()).value());
-  EXPECT_EQ(1024, resources.getScalar("mem", Resource::Scalar()).value());
+  EXPECT_EQ("2", offers[0].params["cpus"]);
+  EXPECT_EQ("1024", offers[0].params["mem"]);
 
   driver.stop();
   driver.join();
@@ -156,7 +133,7 @@ TEST(MasterTest, ResourcesReofferedAfter
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false, false);
+  PID master = local::launch(10, 2, 1 * Gigabyte, false, false);
 
   MockScheduler sched1;
   MesosSchedulerDriver driver1(&sched1, master);
@@ -169,20 +146,19 @@ TEST(MasterTest, ResourcesReofferedAfter
     .WillOnce(Return(""));
 
   EXPECT_CALL(sched1, getExecutorInfo(&driver1))
-    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
   EXPECT_CALL(sched1, registered(&driver1, _))
     .Times(1);
 
   EXPECT_CALL(sched1, resourceOffer(&driver1, _, _))
-    .WillOnce(DoAll(SaveArg<1>(&offerId), Trigger(&sched1ResourceOfferCall)))
-    .WillRepeatedly(Return());
+    .WillOnce(DoAll(SaveArg<1>(&offerId), Trigger(&sched1ResourceOfferCall)));
 
   driver1.start();
 
   WAIT_UNTIL(sched1ResourceOfferCall);
 
-  driver1.replyToOffer(offerId, vector<TaskDescription>());
+  driver1.replyToOffer(offerId, vector<TaskDescription>(), map<string, string>());
 
   driver1.stop();
   driver1.join();
@@ -196,14 +172,13 @@ TEST(MasterTest, ResourcesReofferedAfter
     .WillOnce(Return(""));
 
   EXPECT_CALL(sched2, getExecutorInfo(&driver2))
-    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
   EXPECT_CALL(sched2, registered(&driver2, _))
     .Times(1);
 
   EXPECT_CALL(sched2, resourceOffer(&driver2, _, _))
-    .WillOnce(Trigger(&sched2ResourceOfferCall))
-    .WillRepeatedly(Return());
+    .WillOnce(Trigger(&sched2ResourceOfferCall));
 
   EXPECT_CALL(sched2, offerRescinded(&driver2, _))
     .Times(AtMost(1));
@@ -223,7 +198,7 @@ TEST(MasterTest, ResourcesReofferedAfter
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false, false);
+  PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
 
   MockScheduler sched1;
   MesosSchedulerDriver driver1(&sched1, master);
@@ -237,15 +212,14 @@ TEST(MasterTest, ResourcesReofferedAfter
     .WillOnce(Return(""));
 
   EXPECT_CALL(sched1, getExecutorInfo(&driver1))
-    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
   EXPECT_CALL(sched1, registered(&driver1, _))
     .Times(1);
 
   EXPECT_CALL(sched1, resourceOffer(&driver1, _, ElementsAre(_)))
     .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
-                    Trigger(&sched1ResourceOfferCall)))
-    .WillRepeatedly(Return());
+                    Trigger(&sched1ResourceOfferCall)));
 
   driver1.start();
 
@@ -253,34 +227,22 @@ TEST(MasterTest, ResourcesReofferedAfter
 
   EXPECT_NE(0, offers.size());
 
-  TaskDescription task;
-  task.set_name("");
-  task.mutable_task_id()->set_value("1");
-  task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
-
-  Resource* cpus = task.add_resources();
-  cpus->set_name("cpus");
-  cpus->set_type(Resource::SCALAR);
-  cpus->mutable_scalar()->set_value(0);
-
-  Resource* mem = task.add_resources();
-  mem->set_name("mem");
-  mem->set_type(Resource::SCALAR);
-  mem->mutable_scalar()->set_value(1 * Gigabyte);
+  map<string, string> params;
+  params["cpus"] = "0";
+  params["mem"] = lexical_cast<string>(1 * Gigabyte);
 
   vector<TaskDescription> tasks;
-  tasks.push_back(task);
+  tasks.push_back(TaskDescription(1, offers[0].slaveId, "", params, bytes()));
 
   trigger sched1ErrorCall;
 
-  EXPECT_CALL(sched1,
-              error(&driver1, _, "Invalid resources for task"))
+  EXPECT_CALL(sched1, error(&driver1, _, "Invalid task size: <0 CPUs, 1024 MEM>"))
     .WillOnce(Trigger(&sched1ErrorCall));
 
   EXPECT_CALL(sched1, offerRescinded(&driver1, offerId))
     .Times(AtMost(1));
 
-  driver1.replyToOffer(offerId, tasks);
+  driver1.replyToOffer(offerId, tasks, map<string, string>());
 
   WAIT_UNTIL(sched1ErrorCall);
 
@@ -296,14 +258,13 @@ TEST(MasterTest, ResourcesReofferedAfter
     .WillOnce(Return(""));
 
   EXPECT_CALL(sched2, getExecutorInfo(&driver2))
-    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
   EXPECT_CALL(sched2, registered(&driver2, _))
     .Times(1);
 
   EXPECT_CALL(sched2, resourceOffer(&driver2, _, _))
-    .WillOnce(Trigger(&sched2ResourceOfferCall))
-    .WillRepeatedly(Return());
+    .WillOnce(Trigger(&sched2ResourceOfferCall));
 
   EXPECT_CALL(sched2, offerRescinded(&driver2, _))
     .Times(AtMost(1));
@@ -324,14 +285,11 @@ TEST(MasterTest, SlaveLost)
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
   Master m;
-  PID<Master> master = process::spawn(&m);
-
-  Resources resources = Resources::parse("cpus:2;mem:1024");
+  PID master = Process::spawn(&m);
 
   ProcessBasedIsolationModule isolationModule;
-  
-  Slave s(resources, true, &isolationModule);
-  PID<Slave> slave = process::spawn(&s);
+  Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
+  PID slave = Process::spawn(&s);
 
   BasicMasterDetector detector(master, slave, true);
 
@@ -347,15 +305,14 @@ TEST(MasterTest, SlaveLost)
     .WillOnce(Return(""));
 
   EXPECT_CALL(sched, getExecutorInfo(&driver))
-    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
   EXPECT_CALL(sched, registered(&driver, _))
     .Times(1);
 
   EXPECT_CALL(sched, resourceOffer(&driver, _, _))
     .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
-                    Trigger(&resourceOfferCall)))
-    .WillRepeatedly(Return());
+                    Trigger(&resourceOfferCall)));
 
   driver.start();
 
@@ -368,10 +325,10 @@ TEST(MasterTest, SlaveLost)
   EXPECT_CALL(sched, offerRescinded(&driver, offerId))
     .WillOnce(Trigger(&offerRescindedCall));
 
-  EXPECT_CALL(sched, slaveLost(&driver, offers[0].slave_id()))
+  EXPECT_CALL(sched, slaveLost(&driver, offers[0].slaveId))
     .WillOnce(Trigger(&slaveLostCall));
 
-  process::post(slave, process::TERMINATE);
+  MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
 
   WAIT_UNTIL(offerRescindedCall);
   WAIT_UNTIL(slaveLostCall);
@@ -379,10 +336,10 @@ TEST(MasterTest, SlaveLost)
   driver.stop();
   driver.join();
 
-  process::wait(slave);
+  Process::wait(slave);
 
-  process::post(master, process::TERMINATE);
-  process::wait(master);
+  MesosProcess::post(master, pack<M2M_SHUTDOWN>());
+  Process::wait(master);
 }
 
 
@@ -390,7 +347,7 @@ TEST(MasterTest, SchedulerFailover)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false, false);
+  PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
 
   // Launch the first (i.e., failing) scheduler and wait until
   // registered gets called to launch the second (i.e., failover)
@@ -407,13 +364,13 @@ TEST(MasterTest, SchedulerFailover)
     .WillOnce(Return(""));
 
   EXPECT_CALL(sched1, getExecutorInfo(&driver1))
-    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
   EXPECT_CALL(sched1, registered(&driver1, _))
     .WillOnce(DoAll(SaveArg<1>(&frameworkId), Trigger(&sched1RegisteredCall)));
 
   EXPECT_CALL(sched1, resourceOffer(&driver1, _, _))
-    .WillRepeatedly(Return());
+    .Times(AtMost(1));
 
   EXPECT_CALL(sched1, offerRescinded(&driver1, _))
     .Times(AtMost(1));
@@ -438,13 +395,13 @@ TEST(MasterTest, SchedulerFailover)
     .WillOnce(Return(""));
 
   EXPECT_CALL(sched2, getExecutorInfo(&driver2))
-    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
   EXPECT_CALL(sched2, registered(&driver2, frameworkId))
     .WillOnce(Trigger(&sched2RegisteredCall));
 
   EXPECT_CALL(sched2, resourceOffer(&driver2, _, _))
-    .WillRepeatedly(Return());
+    .Times(AtMost(1));
 
   EXPECT_CALL(sched2, offerRescinded(&driver2, _))
     .Times(AtMost(1));
@@ -467,15 +424,15 @@ TEST(MasterTest, SlavePartitioned)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  process::Clock::pause();
+  Clock::pause();
 
   MockFilter filter;
-  process::filter(&filter);
+  Process::filter(&filter);
 
   EXPECT_MSG(filter, _, _, _)
     .WillRepeatedly(Return(false));
 
-  PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false, false);
+  PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
 
   MockScheduler sched;
   MesosSchedulerDriver driver(&sched, master);
@@ -486,13 +443,13 @@ TEST(MasterTest, SlavePartitioned)
     .WillOnce(Return(""));
 
   EXPECT_CALL(sched, getExecutorInfo(&driver))
-    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
   EXPECT_CALL(sched, registered(&driver, _))
     .Times(1);
 
   EXPECT_CALL(sched, resourceOffer(&driver, _, _))
-    .WillRepeatedly(Return());
+    .Times(AtMost(1));
 
   EXPECT_CALL(sched, offerRescinded(&driver, _))
     .Times(AtMost(1));
@@ -500,14 +457,12 @@ TEST(MasterTest, SlavePartitioned)
   EXPECT_CALL(sched, slaveLost(&driver, _))
     .WillOnce(Trigger(&slaveLostCall));
 
-  EXPECT_MSG(filter, Eq(PONG), _, _)
+  EXPECT_MSG(filter, Eq(SH2M_HEARTBEAT), _, _)
     .WillRepeatedly(Return(true));
 
   driver.start();
 
-  double secs = master::SLAVE_PONG_TIMEOUT * master::MAX_SLAVE_TIMEOUTS;
-
-  process::Clock::advance(secs);
+  Clock::advance(master::HEARTBEAT_TIMEOUT);
 
   WAIT_UNTIL(slaveLostCall);
 
@@ -516,9 +471,9 @@ TEST(MasterTest, SlavePartitioned)
 
   local::shutdown();
 
-  process::filter(NULL);
+  Process::filter(NULL);
 
-  process::Clock::resume();
+  Clock::resume();
 }
 
 
@@ -527,9 +482,7 @@ TEST(MasterTest, TaskRunning)
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
   Master m;
-  PID<Master> master = process::spawn(&m);
-
-  Resources resources = Resources::parse("cpus:2;mem:1024");
+  PID master = Process::spawn(&m);
 
   MockExecutor exec;
 
@@ -542,13 +495,10 @@ TEST(MasterTest, TaskRunning)
   EXPECT_CALL(exec, shutdown(_))
     .Times(1);
 
-  map<ExecutorID, Executor*> execs;
-  execs[DEFAULT_EXECUTOR_ID] = &exec;
-
-  TestingIsolationModule isolationModule(execs);
+  LocalIsolationModule isolationModule(&exec);
 
-  Slave s(resources, true, &isolationModule);
-  PID<Slave> slave = process::spawn(&s);
+  Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
+  PID slave = Process::spawn(&s);
 
   BasicMasterDetector detector(master, slave, true);
 
@@ -565,15 +515,14 @@ TEST(MasterTest, TaskRunning)
     .WillOnce(Return(""));
 
   EXPECT_CALL(sched, getExecutorInfo(&driver))
-    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
   EXPECT_CALL(sched, registered(&driver, _))
     .Times(1);
 
   EXPECT_CALL(sched, resourceOffer(&driver, _, _))
     .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
-                    Trigger(&resourceOfferCall)))
-    .WillRepeatedly(Return());
+                    Trigger(&resourceOfferCall)));
 
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillOnce(DoAll(SaveArg<1>(&status), Trigger(&statusUpdateCall)));
@@ -584,129 +533,23 @@ TEST(MasterTest, TaskRunning)
 
   EXPECT_NE(0, offers.size());
 
-  TaskDescription task;
-  task.set_name("");
-  task.mutable_task_id()->set_value("1");
-  task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
-  task.mutable_resources()->MergeFrom(offers[0].resources());
-
   vector<TaskDescription> tasks;
-  tasks.push_back(task);
+  tasks.push_back(TaskDescription(1, offers[0].slaveId, "", offers[0].params, ""));
 
-  driver.replyToOffer(offerId, tasks);
+  driver.replyToOffer(offerId, tasks, map<string, string>());
 
   WAIT_UNTIL(statusUpdateCall);
 
-  EXPECT_EQ(TASK_RUNNING, status.state());
+  EXPECT_EQ(TASK_RUNNING, status.state);
 
   driver.stop();
   driver.join();
 
-  process::post(slave, process::TERMINATE);
-  process::wait(slave);
+  MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
+  Process::wait(slave);
 
-  process::post(master, process::TERMINATE);
-  process::wait(master);
-}
-
-
-TEST(MasterTest, KillTask)
-{
-  ASSERT_TRUE(GTEST_IS_THREADSAFE);
-
-  Master m;
-  PID<Master> master = process::spawn(&m);
-
-  Resources resources = Resources::parse("cpus:2;mem:1024");
-
-  MockExecutor exec;
-
-  trigger killTaskCall;
-
-  EXPECT_CALL(exec, init(_, _))
-    .Times(1);
-
-  EXPECT_CALL(exec, launchTask(_, _))
-    .Times(1);
-
-  EXPECT_CALL(exec, killTask(_, _))
-    .WillOnce(Trigger(&killTaskCall));
-
-  EXPECT_CALL(exec, shutdown(_))
-    .Times(1);
-
-  map<ExecutorID, Executor*> execs;
-  execs[DEFAULT_EXECUTOR_ID] = &exec;
-
-  TestingIsolationModule isolationModule(execs);
-
-  Slave s(resources, true, &isolationModule);
-  PID<Slave> slave = process::spawn(&s);
-
-  BasicMasterDetector detector(master, slave, true);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(&sched, master);
-
-  OfferID offerId;
-  vector<SlaveOffer> offers;
-  TaskStatus status;
-
-  trigger resourceOfferCall, statusUpdateCall;
-
-  EXPECT_CALL(sched, getFrameworkName(&driver))
-    .WillOnce(Return(""));
-
-  EXPECT_CALL(sched, getExecutorInfo(&driver))
-    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
-
-  EXPECT_CALL(sched, registered(&driver, _))
-    .Times(1);
-
-  EXPECT_CALL(sched, resourceOffer(&driver, _, _))
-    .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
-                    Trigger(&resourceOfferCall)))
-    .WillRepeatedly(Return());
-
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(DoAll(SaveArg<1>(&status), Trigger(&statusUpdateCall)));
-
-  driver.start();
-
-  WAIT_UNTIL(resourceOfferCall);
-
-  EXPECT_NE(0, offers.size());
-
-  TaskID taskId;
-  taskId.set_value("1");
-
-  TaskDescription task;
-  task.set_name("");
-  task.mutable_task_id()->MergeFrom(taskId);
-  task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
-  task.mutable_resources()->MergeFrom(offers[0].resources());
-
-  vector<TaskDescription> tasks;
-  tasks.push_back(task);
-
-  driver.replyToOffer(offerId, tasks);
-
-  WAIT_UNTIL(statusUpdateCall);
-
-  EXPECT_EQ(TASK_RUNNING, status.state());
-
-  driver.killTask(taskId);
-
-  WAIT_UNTIL(killTaskCall);
-
-  driver.stop();
-  driver.join();
-
-  process::post(slave, process::TERMINATE);
-  process::wait(slave);
-
-  process::post(master, process::TERMINATE);
-  process::wait(master);
+  MesosProcess::post(master, pack<M2M_SHUTDOWN>());
+  Process::wait(master);
 }
 
 
@@ -714,19 +557,14 @@ TEST(MasterTest, SchedulerFailoverStatus
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  process::Clock::pause();
+  Clock::pause();
 
   MockFilter filter;
-  process::filter(&filter);
+  Process::filter(&filter);
 
   EXPECT_MSG(filter, _, _, _)
     .WillRepeatedly(Return(false));
 
-  Master m;
-  PID<Master> master = process::spawn(&m);
-
-  Resources resources = Resources::parse("cpus:2;mem:1024");
-
   MockExecutor exec;
 
   EXPECT_CALL(exec, init(_, _))
@@ -738,13 +576,13 @@ TEST(MasterTest, SchedulerFailoverStatus
   EXPECT_CALL(exec, shutdown(_))
     .Times(1);
 
-  map<ExecutorID, Executor*> execs;
-  execs[DEFAULT_EXECUTOR_ID] = &exec;
+  LocalIsolationModule isolationModule(&exec);
 
-  TestingIsolationModule isolationModule(execs);
+  Master m;
+  PID master = Process::spawn(&m);
 
-  Slave s(resources, true, &isolationModule);
-  PID<Slave> slave = process::spawn(&s);
+  Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
+  PID slave = Process::spawn(&s);
 
   BasicMasterDetector detector(master, slave, true);
 
@@ -764,15 +602,14 @@ TEST(MasterTest, SchedulerFailoverStatus
     .WillOnce(Return(""));
 
   EXPECT_CALL(sched1, getExecutorInfo(&driver1))
-    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
   EXPECT_CALL(sched1, registered(&driver1, _))
     .WillOnce(SaveArg<1>(&frameworkId));
 
   EXPECT_CALL(sched1, resourceOffer(&driver1, _, _))
     .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
-                    Trigger(&resourceOfferCall)))
-    .WillRepeatedly(Return());
+                    Trigger(&resourceOfferCall)));
 
   EXPECT_CALL(sched1, statusUpdate(&driver1, _))
     .Times(0);
@@ -790,16 +627,10 @@ TEST(MasterTest, SchedulerFailoverStatus
 
   EXPECT_NE(0, offers.size());
 
-  TaskDescription task;
-  task.set_name("");
-  task.mutable_task_id()->set_value("1");
-  task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
-  task.mutable_resources()->MergeFrom(offers[0].resources());
-
   vector<TaskDescription> tasks;
-  tasks.push_back(task);
+  tasks.push_back(TaskDescription(1, offers[0].slaveId, "", offers[0].params, ""));
 
-  driver1.replyToOffer(offerId, tasks);
+  driver1.replyToOffer(offerId, tasks, map<string, string>());
 
   WAIT_UNTIL(statusUpdateMsg);
 
@@ -817,7 +648,7 @@ TEST(MasterTest, SchedulerFailoverStatus
     .WillOnce(Return(""));
 
   EXPECT_CALL(sched2, getExecutorInfo(&driver2))
-    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
   EXPECT_CALL(sched2, registered(&driver2, frameworkId))
     .WillOnce(Trigger(&registeredCall));
@@ -829,7 +660,7 @@ TEST(MasterTest, SchedulerFailoverStatus
 
   WAIT_UNTIL(registeredCall);
 
-  process::Clock::advance(STATUS_UPDATE_RETRY_TIMEOUT);
+  Clock::advance(RELIABLE_TIMEOUT);
 
   WAIT_UNTIL(statusUpdateCall);
 
@@ -839,15 +670,15 @@ TEST(MasterTest, SchedulerFailoverStatus
   driver1.join();
   driver2.join();
 
-  process::post(slave, process::TERMINATE);
-  process::wait(slave);
+  MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
+  Process::wait(slave);
 
-  process::post(master, process::TERMINATE);
-  process::wait(master);
+  MesosProcess::post(master, pack<M2M_SHUTDOWN>());
+  Process::wait(master);
 
-  process::filter(NULL);
+  Process::filter(NULL);
 
-  process::Clock::resume();
+  Clock::resume();
 }
 
 
@@ -855,16 +686,11 @@ TEST(MasterTest, FrameworkMessage)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  Master m;
-  PID<Master> master = process::spawn(&m);
-
-  Resources resources = Resources::parse("cpus:2;mem:1024");
-
   MockExecutor exec;
 
-  ExecutorDriver* execDriver;
+  ExecutorDriver *execDriver;
   ExecutorArgs args;
-  string execData;
+  FrameworkMessage execMessage;
 
   trigger execFrameworkMessageCall;
 
@@ -875,19 +701,19 @@ TEST(MasterTest, FrameworkMessage)
     .Times(1);
 
   EXPECT_CALL(exec, frameworkMessage(_, _))
-    .WillOnce(DoAll(SaveArg<1>(&execData),
+    .WillOnce(DoAll(SaveArg<1>(&execMessage),
                     Trigger(&execFrameworkMessageCall)));
 
   EXPECT_CALL(exec, shutdown(_))
     .Times(1);
 
-  map<ExecutorID, Executor*> execs;
-  execs[DEFAULT_EXECUTOR_ID] = &exec;
+  LocalIsolationModule isolationModule(&exec);
 
-  TestingIsolationModule isolationModule(execs);
+  Master m;
+  PID master = Process::spawn(&m);
 
-  Slave s(resources, true, &isolationModule);
-  PID<Slave> slave = process::spawn(&s);
+  Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
+  PID slave = Process::spawn(&s);
 
   BasicMasterDetector detector(master, slave, true);
 
@@ -900,7 +726,7 @@ TEST(MasterTest, FrameworkMessage)
   OfferID offerId;
   vector<SlaveOffer> offers;
   TaskStatus status;
-  string schedData;
+  FrameworkMessage schedMessage;
 
   trigger resourceOfferCall, statusUpdateCall, schedFrameworkMessageCall;
 
@@ -908,21 +734,20 @@ TEST(MasterTest, FrameworkMessage)
     .WillOnce(Return(""));
 
   EXPECT_CALL(sched, getExecutorInfo(&schedDriver))
-    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
   EXPECT_CALL(sched, registered(&schedDriver, _))
     .Times(1);
 
   EXPECT_CALL(sched, resourceOffer(&schedDriver, _, _))
     .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
-                    Trigger(&resourceOfferCall)))
-    .WillRepeatedly(Return());
+                    Trigger(&resourceOfferCall)));
 
   EXPECT_CALL(sched, statusUpdate(&schedDriver, _))
     .WillOnce(DoAll(SaveArg<1>(&status), Trigger(&statusUpdateCall)));
 
-  EXPECT_CALL(sched, frameworkMessage(&schedDriver, _, _, _))
-    .WillOnce(DoAll(SaveArg<3>(&schedData),
+  EXPECT_CALL(sched, frameworkMessage(&schedDriver, _))
+    .WillOnce(DoAll(SaveArg<1>(&schedMessage),
                     Trigger(&schedFrameworkMessageCall)));
 
   schedDriver.start();
@@ -931,47 +756,37 @@ TEST(MasterTest, FrameworkMessage)
 
   EXPECT_NE(0, offers.size());
 
-  TaskDescription task;
-  task.set_name("");
-  task.mutable_task_id()->set_value("1");
-  task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
-  task.mutable_resources()->MergeFrom(offers[0].resources());
-
   vector<TaskDescription> tasks;
-  tasks.push_back(task);
+  tasks.push_back(TaskDescription(1, offers[0].slaveId, "", offers[0].params, ""));
 
-  schedDriver.replyToOffer(offerId, tasks);
+  schedDriver.replyToOffer(offerId, tasks, map<string, string>());
 
   WAIT_UNTIL(statusUpdateCall);
 
-  EXPECT_EQ(TASK_RUNNING, status.state());
+  EXPECT_EQ(TASK_RUNNING, status.state);
 
-  string hello = "hello";
-
-  schedDriver.sendFrameworkMessage(offers[0].slave_id(),
-				   DEFAULT_EXECUTOR_ID,
-				   hello);
+  FrameworkMessage hello(offers[0].slaveId, 1, "hello");
+  schedDriver.sendFrameworkMessage(hello);
 
   WAIT_UNTIL(execFrameworkMessageCall);
 
-  EXPECT_EQ(hello, execData);
-
-  string reply = "reply";
+  EXPECT_EQ("hello", execMessage.data);
 
+  FrameworkMessage reply(args.slaveId, 1, "reply");
   execDriver->sendFrameworkMessage(reply);
 
   WAIT_UNTIL(schedFrameworkMessageCall);
 
-  EXPECT_EQ(reply, schedData);
+  EXPECT_EQ("reply", schedMessage.data);
 
   schedDriver.stop();
   schedDriver.join();
 
-  process::post(slave, process::TERMINATE);
-  process::wait(slave);
+  MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
+  Process::wait(slave);
 
-  process::post(master, process::TERMINATE);
-  process::wait(master);
+  MesosProcess::post(master, pack<M2M_SHUTDOWN>());
+  Process::wait(master);
 }
 
 
@@ -979,14 +794,9 @@ TEST(MasterTest, SchedulerFailoverFramew
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  Master m;
-  PID<Master> master = process::spawn(&m);
-
-  Resources resources = Resources::parse("cpus:2;mem:1024");
-
   MockExecutor exec;
 
-  ExecutorDriver* execDriver;
+  ExecutorDriver *execDriver;
 
   EXPECT_CALL(exec, init(_, _))
     .WillOnce(SaveArg<0>(&execDriver));
@@ -997,13 +807,13 @@ TEST(MasterTest, SchedulerFailoverFramew
   EXPECT_CALL(exec, shutdown(_))
     .Times(1);
 
-  map<ExecutorID, Executor*> execs;
-  execs[DEFAULT_EXECUTOR_ID] = &exec;
+  LocalIsolationModule isolationModule(&exec);
 
-  TestingIsolationModule isolationModule(execs);
+  Master m;
+  PID master = Process::spawn(&m);
 
-  Slave s(resources, true, &isolationModule);
-  PID<Slave> slave = process::spawn(&s);
+  Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
+  PID slave = Process::spawn(&s);
 
   BasicMasterDetector detector(master, slave, true);
 
@@ -1021,7 +831,7 @@ TEST(MasterTest, SchedulerFailoverFramew
     .WillOnce(Return(""));
 
   EXPECT_CALL(sched1, getExecutorInfo(&driver1))
-    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
   EXPECT_CALL(sched1, registered(&driver1, _))
     .WillOnce(SaveArg<1>(&frameworkId));
@@ -1031,8 +841,7 @@ TEST(MasterTest, SchedulerFailoverFramew
 
   EXPECT_CALL(sched1, resourceOffer(&driver1, _, ElementsAre(_)))
     .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
-                    Trigger(&sched1ResourceOfferCall)))
-    .WillRepeatedly(Return());
+                    Trigger(&sched1ResourceOfferCall)));
 
   EXPECT_CALL(sched1, error(&driver1, _, "Framework failover"))
     .Times(1);
@@ -1043,20 +852,14 @@ TEST(MasterTest, SchedulerFailoverFramew
 
   EXPECT_NE(0, offers.size());
 
-  TaskDescription task;
-  task.set_name("");
-  task.mutable_task_id()->set_value("1");
-  task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
-  task.mutable_resources()->MergeFrom(offers[0].resources());
-
   vector<TaskDescription> tasks;
-  tasks.push_back(task);
+  tasks.push_back(TaskDescription(1, offers[0].slaveId, "", offers[0].params, ""));
 
-  driver1.replyToOffer(offerId, tasks);
+  driver1.replyToOffer(offerId, tasks, map<string, string>());
 
   WAIT_UNTIL(sched1StatusUpdateCall);
 
-  EXPECT_EQ(TASK_RUNNING, status.state());
+  EXPECT_EQ(TASK_RUNNING, status.state);
 
   MockScheduler sched2;
   MesosSchedulerDriver driver2(&sched2, master, frameworkId);
@@ -1067,19 +870,19 @@ TEST(MasterTest, SchedulerFailoverFramew
     .WillOnce(Return(""));
 
   EXPECT_CALL(sched2, getExecutorInfo(&driver2))
-    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
   EXPECT_CALL(sched2, registered(&driver2, frameworkId))
     .WillOnce(Trigger(&sched2RegisteredCall));
 
-  EXPECT_CALL(sched2, frameworkMessage(&driver2, _, _, _))
+  EXPECT_CALL(sched2, frameworkMessage(&driver2, _))
     .WillOnce(Trigger(&sched2FrameworkMessageCall));
 
   driver2.start();
 
   WAIT_UNTIL(sched2RegisteredCall);
 
-  execDriver->sendFrameworkMessage("");
+  execDriver->sendFrameworkMessage(FrameworkMessage());
 
   WAIT_UNTIL(sched2FrameworkMessageCall);
 
@@ -1089,145 +892,9 @@ TEST(MasterTest, SchedulerFailoverFramew
   driver1.join();
   driver2.join();
 
-  process::post(slave, process::TERMINATE);
-  process::wait(slave);
-
-  process::post(master, process::TERMINATE);
-  process::wait(master);
-}
-
-
-TEST(MasterTest, MultipleExecutors)
-{
-  ASSERT_TRUE(GTEST_IS_THREADSAFE);
-
-  Master m;
-  PID<Master> master = process::spawn(&m);
-
-  Resources resources = Resources::parse("cpus:2;mem:1024");
-
-  MockExecutor exec1;
-  TaskDescription exec1Task;
-  trigger exec1LaunchTaskCall;
-
-  EXPECT_CALL(exec1, init(_, _))
-    .Times(1);
-
-  EXPECT_CALL(exec1, launchTask(_, _))
-    .WillOnce(DoAll(SaveArg<1>(&exec1Task),
-                    Trigger(&exec1LaunchTaskCall)));
-
-  EXPECT_CALL(exec1, shutdown(_))
-    .Times(1);
-
-  MockExecutor exec2;
-  TaskDescription exec2Task;
-  trigger exec2LaunchTaskCall;
-
-  EXPECT_CALL(exec2, init(_, _))
-    .Times(1);
-
-  EXPECT_CALL(exec2, launchTask(_, _))
-    .WillOnce(DoAll(SaveArg<1>(&exec2Task),
-                    Trigger(&exec2LaunchTaskCall)));
-
-  EXPECT_CALL(exec2, shutdown(_))
-    .Times(1);
-
-  ExecutorID executorId1;
-  executorId1.set_value("executor-1");
-
-  ExecutorID executorId2;
-  executorId2.set_value("executor-2");
-
-  map<ExecutorID, Executor*> execs;
-  execs[executorId1] = &exec1;
-  execs[executorId2] = &exec2;
-
-  TestingIsolationModule isolationModule(execs);
-
-  Slave s(resources, true, &isolationModule);
-  PID<Slave> slave = process::spawn(&s);
-
-  BasicMasterDetector detector(master, slave, true);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(&sched, master);
-
-  OfferID offerId;
-  vector<SlaveOffer> offers;
-  TaskStatus status1, status2;
-
-  trigger resourceOfferCall, statusUpdateCall1, statusUpdateCall2;
-
-  EXPECT_CALL(sched, getFrameworkName(&driver))
-    .WillOnce(Return(""));
-
-  EXPECT_CALL(sched, getExecutorInfo(&driver))
-    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
-
-  EXPECT_CALL(sched, registered(&driver, _))
-    .Times(1);
-
-  EXPECT_CALL(sched, resourceOffer(&driver, _, _))
-    .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
-                    Trigger(&resourceOfferCall)))
-    .WillRepeatedly(Return());
-
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(DoAll(SaveArg<1>(&status1), Trigger(&statusUpdateCall1)))
-    .WillOnce(DoAll(SaveArg<1>(&status2), Trigger(&statusUpdateCall2)));
-
-  driver.start();
-
-  WAIT_UNTIL(resourceOfferCall);
-
-  ASSERT_NE(0, offers.size());
-
-  TaskDescription task1;
-  task1.set_name("");
-  task1.mutable_task_id()->set_value("1");
-  task1.mutable_slave_id()->MergeFrom(offers[0].slave_id());
-  task1.mutable_resources()->MergeFrom(Resources::parse("cpus:1;mem:512"));
-  task1.mutable_executor()->mutable_executor_id()->MergeFrom(executorId1);
-  task1.mutable_executor()->set_uri("noexecutor");
-
-  TaskDescription task2;
-  task2.set_name("");
-  task2.mutable_task_id()->set_value("2");
-  task2.mutable_slave_id()->MergeFrom(offers[0].slave_id());
-  task2.mutable_resources()->MergeFrom(Resources::parse("cpus:1;mem:512"));
-  task2.mutable_executor()->mutable_executor_id()->MergeFrom(executorId2);
-  task2.mutable_executor()->set_uri("noexecutor");
-
-  vector<TaskDescription> tasks;
-  tasks.push_back(task1);
-  tasks.push_back(task2);
-
-  driver.replyToOffer(offerId, tasks);
-
-  WAIT_UNTIL(statusUpdateCall1);
-
-  EXPECT_EQ(TASK_RUNNING, status1.state());
-
-  WAIT_UNTIL(statusUpdateCall2);
-
-  EXPECT_EQ(TASK_RUNNING, status2.state());
-
-  WAIT_UNTIL(exec1LaunchTaskCall);
-
-  EXPECT_EQ(task1.task_id(), exec1Task.task_id());
-
-  WAIT_UNTIL(exec2LaunchTaskCall);
-
-  EXPECT_EQ(task2.task_id(), exec2Task.task_id());
-
-  driver.stop();
-  driver.join();
-
-  process::post(slave, process::TERMINATE);
-  process::wait(slave);
+  MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
+  Process::wait(slave);
 
-  process::post(master, process::TERMINATE);
-  process::wait(master);
+  MesosProcess::post(master, pack<M2M_SHUTDOWN>());
+  Process::wait(master);
 }