Posted to commits@mesos.apache.org by be...@apache.org on 2012/01/27 02:25:15 UTC

svn commit: r1236485 [2/7] - in /incubator/mesos/trunk: ./ include/mesos/ src/common/ src/exec/ src/local/ src/log/ src/master/ src/python/native/ src/sched/ src/slave/ src/tests/ src/zookeeper/ third_party/libprocess/ third_party/libprocess/include/pr...

Modified: incubator/mesos/trunk/src/master/slaves_manager.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/slaves_manager.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/slaves_manager.cpp (original)
+++ incubator/mesos/trunk/src/master/slaves_manager.cpp Fri Jan 27 01:25:13 2012
@@ -42,6 +42,7 @@ using namespace mesos::internal::master;
 using boost::bad_lexical_cast;
 using boost::lexical_cast;
 
+using process::Future;
 using process::HttpInternalServerErrorResponse;
 using process::HttpNotFoundResponse;
 using process::HttpOKResponse;
@@ -49,7 +50,6 @@ using process::HttpResponse;
 using process::HttpRequest;
 using process::PID;
 using process::Process;
-using process::Promise;
 using process::UPID;
 
 using std::ostringstream;
@@ -71,16 +71,16 @@ public:
 
   virtual ~ZooKeeperSlavesManagerStorage();
 
-  virtual Promise<bool> add(const string& hostname, uint16_t port);
-  virtual Promise<bool> remove(const string& hostname, uint16_t port);
-  virtual Promise<bool> activate(const string& hostname, uint16_t port);
-  virtual Promise<bool> deactivate(const string& hostname, uint16_t port);
-
-  Promise<bool> connected();
-  Promise<bool> reconnecting();
-  Promise<bool> reconnected();
-  Promise<bool> expired();
-  Promise<bool> updated(const string& path);
+  virtual Future<bool> add(const string& hostname, uint16_t port);
+  virtual Future<bool> remove(const string& hostname, uint16_t port);
+  virtual Future<bool> activate(const string& hostname, uint16_t port);
+  virtual Future<bool> deactivate(const string& hostname, uint16_t port);
+
+  Future<bool> connected();
+  Future<bool> reconnecting();
+  Future<bool> reconnected();
+  Future<bool> expired();
+  Future<bool> updated(const string& path);
 
 private:
   bool parse(const string& key,
@@ -159,7 +159,7 @@ ZooKeeperSlavesManagerStorage::~ZooKeepe
 }
 
 
-Promise<bool> ZooKeeperSlavesManagerStorage::add(const string& hostname, uint16_t port)
+Future<bool> ZooKeeperSlavesManagerStorage::add(const string& hostname, uint16_t port)
 {
   // TODO(benh): Use ZooKeeperSlavesManagerStorage::parse to clean up code.
   int ret;
@@ -214,7 +214,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
 }
 
 
-Promise<bool> ZooKeeperSlavesManagerStorage::remove(const string& hostname, uint16_t port)
+Future<bool> ZooKeeperSlavesManagerStorage::remove(const string& hostname, uint16_t port)
 {
   // TODO(benh): Use ZooKeeperSlavesManagerStorage::parse to clean up code.
   int ret;
@@ -269,7 +269,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
 }
 
 
-Promise<bool> ZooKeeperSlavesManagerStorage::activate(const string& hostname, uint16_t port)
+Future<bool> ZooKeeperSlavesManagerStorage::activate(const string& hostname, uint16_t port)
 {
   // TODO(benh): Use ZooKeeperSlavesManagerStorage::parse to clean up code.
   int ret;
@@ -350,7 +350,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
 }
 
 
-Promise<bool> ZooKeeperSlavesManagerStorage::deactivate(const string& hostname, uint16_t port)
+Future<bool> ZooKeeperSlavesManagerStorage::deactivate(const string& hostname, uint16_t port)
 {
   // TODO(benh): Use ZooKeeperSlavesManagerStorage::parse to clean up code.
   int ret;
@@ -431,7 +431,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
 }
 
 
-Promise<bool> ZooKeeperSlavesManagerStorage::connected()
+Future<bool> ZooKeeperSlavesManagerStorage::connected()
 {
   int ret;
 
@@ -467,7 +467,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
 }
 
 
-Promise<bool> ZooKeeperSlavesManagerStorage::reconnecting()
+Future<bool> ZooKeeperSlavesManagerStorage::reconnecting()
 {
   LOG(INFO) << "Slaves manager storage lost connection to ZooKeeper, "
 	    << "attempting to reconnect ...";
@@ -475,7 +475,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
 }
 
 
-Promise<bool> ZooKeeperSlavesManagerStorage::reconnected()
+Future<bool> ZooKeeperSlavesManagerStorage::reconnected()
 {
   LOG(INFO) << "Slaves manager storage has reconnected ...";
 
@@ -485,7 +485,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
 }
 
 
-Promise<bool> ZooKeeperSlavesManagerStorage::expired()
+Future<bool> ZooKeeperSlavesManagerStorage::expired()
 {
   LOG(WARNING) << "Slaves manager storage session expired!";
 
@@ -501,7 +501,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
 }
 
 
-Promise<bool> ZooKeeperSlavesManagerStorage::updated(const string& path)
+Future<bool> ZooKeeperSlavesManagerStorage::updated(const string& path)
 {
   int ret;
   string result;
@@ -636,19 +636,19 @@ SlavesManager::SlavesManager(const Confi
     process::spawn(storage);
   }
 
-  // Set up our HTTP endpoints.
-  installHttpHandler("add", &SlavesManager::add);
-  installHttpHandler("remove", &SlavesManager::remove);
-  installHttpHandler("activate", &SlavesManager::activate);
-  installHttpHandler("deactivate", &SlavesManager::deactivate);
-  installHttpHandler("activated", &SlavesManager::activated);
-  installHttpHandler("deactivated", &SlavesManager::deactivated);
+  // Set up our HTTP endpoints.
+  route("add", &SlavesManager::add);
+  route("remove", &SlavesManager::remove);
+  route("activate", &SlavesManager::activate);
+  route("deactivate", &SlavesManager::deactivate);
+  route("activated", &SlavesManager::activated);
+  route("deactivated", &SlavesManager::deactivated);
 }
 
 
 SlavesManager::~SlavesManager()
 {
-  process::post(storage->self(), process::TERMINATE);
+  process::terminate(storage->self());
   process::wait(storage->self());
   delete storage;
 }
@@ -668,7 +668,7 @@ bool SlavesManager::add(const string& ho
   // Ignore request if slave is already active.
   if (active.contains(hostname, port)) {
     LOG(WARNING) << "Attempted to add an already added slave!";
-    return true;
+    return false;
   }
 
   // Make sure this slave is not currently deactivated.
@@ -679,8 +679,13 @@ bool SlavesManager::add(const string& ho
   }
 
   // Ask the storage system to persist the addition.
-  if (process::call(storage->self(), &SlavesManagerStorage::add,
-                    hostname, port)) {
+  Future<bool> added =
+    process::dispatch(storage->self(), &SlavesManagerStorage::add,
+                      hostname, port);
+
+  added.await();
+
+  if (added.isReady() && added.get()) {
     active.put(hostname, port);
 
     // Tell the master that this slave is now active.
@@ -704,8 +709,13 @@ bool SlavesManager::remove(const string&
   }
 
   // Get the storage system to persist the removal.
-  if (process::call(storage->self(), &SlavesManagerStorage::remove,
-                    hostname, port)) {
+  Future<bool> removed =
+    process::dispatch(storage->self(), &SlavesManagerStorage::remove,
+                      hostname, port);
+
+  removed.await();
+
+  if (removed.isReady() && removed.get()) {
     active.remove(hostname, port);
     inactive.remove(hostname, port);
 
@@ -725,8 +735,13 @@ bool SlavesManager::activate(const strin
   // Make sure the slave is currently deactivated.
   if (inactive.contains(hostname, port)) {
     // Get the storage system to persist the activation.
-    if (process::call(storage->self(), &SlavesManagerStorage::activate,
-                      hostname, port)) {
+    Future<bool> activated =
+      process::dispatch(storage->self(), &SlavesManagerStorage::activate,
+                        hostname, port);
+
+    activated.await();
+
+    if (activated.isReady() && activated.get()) {
       active.put(hostname, port);
       inactive.remove(hostname, port);
 
@@ -747,8 +762,13 @@ bool SlavesManager::deactivate(const str
   // Make sure the slave is currently activated.
   if (active.contains(hostname, port)) {
     // Get the storage system to persist the deactivation.
-    if (process::call(storage->self(), &SlavesManagerStorage::deactivate,
-                      hostname, port)) {
+    Future<bool> deactivated =
+      process::dispatch(storage->self(), &SlavesManagerStorage::deactivate,
+                        hostname, port);
+
+    deactivated.await();
+
+    if (deactivated.isReady() && deactivated.get()) {
       active.remove(hostname, port);
       inactive.put(hostname, port);
 
@@ -795,7 +815,7 @@ void SlavesManager::updateInactive(
 }
 
 
-Promise<HttpResponse> SlavesManager::add(const HttpRequest& request)
+Future<HttpResponse> SlavesManager::add(const HttpRequest& request)
 {
   // Parse the query to get out the slave hostname and port.
   string hostname = "";
@@ -838,7 +858,7 @@ Promise<HttpResponse> SlavesManager::add
 }
 
 
-Promise<HttpResponse> SlavesManager::remove(const HttpRequest& request)
+Future<HttpResponse> SlavesManager::remove(const HttpRequest& request)
 {
   // Parse the query to get out the slave hostname and port.
   string hostname = "";
@@ -881,7 +901,7 @@ Promise<HttpResponse> SlavesManager::rem
 }
 
 
-Promise<HttpResponse> SlavesManager::activate(const HttpRequest& request)
+Future<HttpResponse> SlavesManager::activate(const HttpRequest& request)
 {
   // Parse the query to get out the slave hostname and port.
   string hostname = "";
@@ -924,7 +944,7 @@ Promise<HttpResponse> SlavesManager::act
 }
 
 
-Promise<HttpResponse> SlavesManager::deactivate(const HttpRequest& request)
+Future<HttpResponse> SlavesManager::deactivate(const HttpRequest& request)
 {
   // Parse the query to get out the slave hostname and port.
   string hostname = "";
@@ -967,7 +987,7 @@ Promise<HttpResponse> SlavesManager::dea
 }
 
 
-Promise<HttpResponse> SlavesManager::activated(const HttpRequest& request)
+Future<HttpResponse> SlavesManager::activated(const HttpRequest& request)
 {
   LOG(INFO) << "Slaves manager received HTTP request for activated slaves";
 
@@ -985,7 +1005,7 @@ Promise<HttpResponse> SlavesManager::act
 }
 
 
-Promise<HttpResponse> SlavesManager::deactivated(const HttpRequest& request)
+Future<HttpResponse> SlavesManager::deactivated(const HttpRequest& request)
 {
   LOG(INFO) << "Slaves manager received HTTP request for deactivated slaves";
 

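The recurring change in this file swaps the old blocking process::call for process::dispatch, which returns a process::Future<bool> that the caller must now wait on explicitly. A minimal sketch of the new calling convention, using only the operations this commit itself exercises (await, isReady, get):

    // Hypothetical caller; 'storage' is a spawned SlavesManagerStorage.
    Future<bool> persisted =
      process::dispatch(storage->self(), &SlavesManagerStorage::add,
                        hostname, port);

    persisted.await();  // block until the future is ready (or discarded)

    if (persisted.isReady() && persisted.get()) {
      // The addition was persisted; update local bookkeeping.
    }

Note the behavioral difference: a failed or discarded future now falls through the isReady() check instead of being conflated with a false return value.
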
Modified: incubator/mesos/trunk/src/master/slaves_manager.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/slaves_manager.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/slaves_manager.hpp (original)
+++ incubator/mesos/trunk/src/master/slaves_manager.hpp Fri Jan 27 01:25:13 2012
@@ -36,10 +36,10 @@ class Master;
 class SlavesManagerStorage : public process::Process<SlavesManagerStorage>
 {
 public:
-  virtual process::Promise<bool> add(const std::string& hostname, uint16_t port) { return true; }
-  virtual process::Promise<bool> remove(const std::string& hostname, uint16_t port) { return true; }
-  virtual process::Promise<bool> activate(const std::string& hostname, uint16_t port) { return true; }
-  virtual process::Promise<bool> deactivate(const std::string& hostname, uint16_t port) { return true; }
+  virtual process::Future<bool> add(const std::string& hostname, uint16_t port) { return true; }
+  virtual process::Future<bool> remove(const std::string& hostname, uint16_t port) { return true; }
+  virtual process::Future<bool> activate(const std::string& hostname, uint16_t port) { return true; }
+  virtual process::Future<bool> deactivate(const std::string& hostname, uint16_t port) { return true; }
 };
 
 
@@ -61,12 +61,12 @@ public:
   void updateInactive(const multihashmap<std::string, uint16_t>& updated);
 
 private:
-  process::Promise<process::HttpResponse> add(const process::HttpRequest& request);
-  process::Promise<process::HttpResponse> remove(const process::HttpRequest& request);
-  process::Promise<process::HttpResponse> activate(const process::HttpRequest& request);
-  process::Promise<process::HttpResponse> deactivate(const process::HttpRequest& request);
-  process::Promise<process::HttpResponse> activated(const process::HttpRequest& request);
-  process::Promise<process::HttpResponse> deactivated(const process::HttpRequest& request);
+  process::Future<process::HttpResponse> add(const process::HttpRequest& request);
+  process::Future<process::HttpResponse> remove(const process::HttpRequest& request);
+  process::Future<process::HttpResponse> activate(const process::HttpRequest& request);
+  process::Future<process::HttpResponse> deactivate(const process::HttpRequest& request);
+  process::Future<process::HttpResponse> activated(const process::HttpRequest& request);
+  process::Future<process::HttpResponse> deactivated(const process::HttpRequest& request);
 
   const process::PID<Master> master;
 

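The inline bodies above still just "return true;": that compiles because libprocess's Future<T> is implicitly constructible from a T, yielding an already-ready future. A one-line illustration (hypothetical function name):

    process::Future<bool> alwaysSucceeds() { return true; }  // ready future
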
Modified: incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.cpp (original)
+++ incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.cpp Fri Jan 27 01:25:13 2012
@@ -175,13 +175,23 @@ void MesosExecutorDriverImpl_dealloc(Mes
 {
   if (self->driver != NULL) {
     self->driver->stop();
+    // We need to wrap the driver destructor in an "allow threads"
+    // macro since the MesosExecutorDriver destructor waits for the
+    // ExecutorProcess to terminate and there might be a thread that
+    // is trying to acquire the GIL to call through the
+    // ProxyExecutor. It will only be after this thread executes that
+    // the ExecutorProcess might actually get a terminate.
+    Py_BEGIN_ALLOW_THREADS
     delete self->driver;
+    Py_END_ALLOW_THREADS
     self->driver = NULL;
   }
+
   if (self->proxyExecutor != NULL) {
     delete self->proxyExecutor;
     self->proxyExecutor = NULL;
   }
+
   MesosExecutorDriverImpl_clear(self);
   self->ob_type->tp_free((PyObject*) self);
 }

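For reference, Py_BEGIN_ALLOW_THREADS / Py_END_ALLOW_THREADS are the standard CPython macros for releasing the GIL around blocking C or C++ work; the block above expands to roughly the following (standard CPython semantics, not Mesos code):

    {
      PyThreadState* _save = PyEval_SaveThread();  // release the GIL
      delete self->driver;  // may block waiting for the ExecutorProcess
      PyEval_RestoreThread(_save);                 // reacquire the GIL
    }

Releasing the GIL here is what lets the thread calling through ProxyExecutor make progress, which in turn lets the ExecutorProcess terminate and the destructor return.
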
Modified: incubator/mesos/trunk/src/python/native/mesos_scheduler_driver_impl.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/python/native/mesos_scheduler_driver_impl.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/python/native/mesos_scheduler_driver_impl.cpp (original)
+++ incubator/mesos/trunk/src/python/native/mesos_scheduler_driver_impl.cpp Fri Jan 27 01:25:13 2012
@@ -215,7 +215,15 @@ void MesosSchedulerDriverImpl_dealloc(Me
 {
   if (self->driver != NULL) {
     self->driver->stop();
+    // We need to wrap the driver destructor in an "allow threads"
+    // macro since the MesosSchedulerDriver destructor waits for the
+    // SchedulerProcess to terminate and there might be a thread that
+    // is trying to acquire the GIL to call through the
+    // ProxyScheduler. It will only be after this thread executes that
+    // the SchedulerProcess might actually get a terminate.
+    Py_BEGIN_ALLOW_THREADS
     delete self->driver;
+    Py_END_ALLOW_THREADS
     self->driver = NULL;
   }
 
@@ -273,9 +281,9 @@ PyObject* MesosSchedulerDriverImpl_stop(
     return NULL;
   }
 
-  bool failover = false;
+  bool failover = false; // Should match default in mesos.py.
 
-  if (!PyArg_ParseTuple(args, "b", &failover)) {
+  if (!PyArg_ParseTuple(args, "|b", &failover)) {
     return NULL;
   }
 

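The format-string change from "b" to "|b" is the substance of the stop() fix: in PyArg_ParseTuple, everything after '|' is optional, so the C-side default survives a no-argument call. A minimal sketch of the pattern (hypothetical wrapper function):

    #include <Python.h>

    static PyObject* stop(PyObject* self, PyObject* args)
    {
      bool failover = false;  // default when Python calls driver.stop()

      // "|b" accepts zero or one byte-sized argument; a plain "b" would
      // raise TypeError for stop() called with no arguments.
      if (!PyArg_ParseTuple(args, "|b", &failover)) {
        return NULL;
      }

      Py_RETURN_NONE;
    }
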
Modified: incubator/mesos/trunk/src/python/native/proxy_scheduler.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/python/native/proxy_scheduler.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/python/native/proxy_scheduler.cpp (original)
+++ incubator/mesos/trunk/src/python/native/proxy_scheduler.cpp Fri Jan 27 01:25:13 2012
@@ -91,6 +91,7 @@ void ProxyScheduler::resourceOffers(Sche
                             (char*) "OO",
                             impl,
                             list);
+
   if (res == NULL) {
     cerr << "Failed to call scheduler's resourceOffer" << endl;
     goto cleanup;

Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Fri Jan 27 01:25:13 2012
@@ -30,8 +30,6 @@
 #include <string>
 #include <sstream>
 
-#include <tr1/functional>
-
 #include <mesos/scheduler.hpp>
 
 #include <process/dispatch.hpp>
@@ -65,10 +63,9 @@ using std::vector;
 
 using process::wait; // Necessary on some OS's to disambiguate.
 
-using std::tr1::bind;
-
 
-namespace mesos { namespace internal {
+namespace mesos {
+namespace internal {
 
 // The scheduler process (below) is responsible for interacting with
 // the master and responding to Mesos API calls from scheduler
@@ -80,59 +77,63 @@ class SchedulerProcess : public Protobuf
 {
 public:
   SchedulerProcess(MesosSchedulerDriver* _driver,
-                   Scheduler* _sched,
+                   Scheduler* _scheduler,
                    const FrameworkID& _frameworkId,
-                   const FrameworkInfo& _framework)
+                   const FrameworkInfo& _framework,
+                   pthread_mutex_t* _mutex,
+                   pthread_cond_t* _cond)
     : driver(_driver),
-      sched(_sched),
+      scheduler(_scheduler),
       frameworkId(_frameworkId),
       framework(_framework),
+      mutex(_mutex),
+      cond(_cond),
       master(UPID()),
       failover(!(_frameworkId == "")),
       connected(false),
       aborted(false)
   {
-    installProtobufHandler<NewMasterDetectedMessage>(
+    install<NewMasterDetectedMessage>(
         &SchedulerProcess::newMasterDetected,
         &NewMasterDetectedMessage::pid);
 
-    installProtobufHandler<NoMasterDetectedMessage>(
+    install<NoMasterDetectedMessage>(
         &SchedulerProcess::noMasterDetected);
 
-    installProtobufHandler<FrameworkRegisteredMessage>(
+    install<FrameworkRegisteredMessage>(
         &SchedulerProcess::registered,
         &FrameworkRegisteredMessage::framework_id);
 
-    installProtobufHandler<FrameworkReregisteredMessage>(
+    install<FrameworkReregisteredMessage>(
         &SchedulerProcess::reregistered,
         &FrameworkReregisteredMessage::framework_id);
 
-    installProtobufHandler<ResourceOffersMessage>(
+    install<ResourceOffersMessage>(
         &SchedulerProcess::resourceOffers,
         &ResourceOffersMessage::offers,
         &ResourceOffersMessage::pids);
 
-    installProtobufHandler<RescindResourceOfferMessage>(
+    install<RescindResourceOfferMessage>(
         &SchedulerProcess::rescindOffer,
         &RescindResourceOfferMessage::offer_id);
 
-    installProtobufHandler<StatusUpdateMessage>(
+    install<StatusUpdateMessage>(
         &SchedulerProcess::statusUpdate,
         &StatusUpdateMessage::update,
         &StatusUpdateMessage::pid);
 
-    installProtobufHandler<LostSlaveMessage>(
+    install<LostSlaveMessage>(
         &SchedulerProcess::lostSlave,
         &LostSlaveMessage::slave_id);
 
-    installProtobufHandler<ExecutorToFrameworkMessage>(
+    install<ExecutorToFrameworkMessage>(
         &SchedulerProcess::frameworkMessage,
         &ExecutorToFrameworkMessage::slave_id,
         &ExecutorToFrameworkMessage::framework_id,
         &ExecutorToFrameworkMessage::executor_id,
         &ExecutorToFrameworkMessage::data);
 
-    installProtobufHandler<FrameworkErrorMessage>(
+    install<FrameworkErrorMessage>(
         &SchedulerProcess::error,
         &FrameworkErrorMessage::code,
         &FrameworkErrorMessage::message);
@@ -176,7 +177,7 @@ protected:
     connected = true;
     failover = false;
 
-    invoke(bind(&Scheduler::registered, sched, driver, frameworkId));
+    scheduler->registered(driver, frameworkId);
   }
 
   void reregistered(const FrameworkID& frameworkId)
@@ -237,7 +238,7 @@ protected:
       }
     }
 
-    invoke(bind(&Scheduler::resourceOffers, sched, driver, offers));
+    scheduler->resourceOffers(driver, offers);
   }
 
   void rescindOffer(const OfferID& offerId)
@@ -251,7 +252,8 @@ protected:
     VLOG(1) << "Rescinded offer " << offerId;
 
     savedOffers.erase(offerId);
-    invoke(bind(&Scheduler::offerRescinded, sched, driver, offerId));
+
+    scheduler->offerRescinded(driver, offerId);
   }
 
   void statusUpdate(const StatusUpdate& update, const UPID& pid)
@@ -279,7 +281,7 @@ protected:
     // multiple times (of course, if a scheduler re-uses a TaskID,
     // that could be bad).
 
-    invoke(bind(&Scheduler::statusUpdate, sched, driver, status));
+    scheduler->statusUpdate(driver, status);
 
     // Send a status update acknowledgement ONLY if not aborted!
     if (!aborted && pid) {
@@ -306,7 +308,8 @@ protected:
     VLOG(1) << "Lost slave " << slaveId;
 
     savedSlavePids.erase(slaveId);
-    invoke(bind(&Scheduler::slaveLost, sched, driver, slaveId));
+
+    scheduler->slaveLost(driver, slaveId);
   }
 
   void frameworkMessage(const SlaveID& slaveId,
@@ -321,8 +324,7 @@ protected:
 
     VLOG(1) << "Received framework message";
 
-    invoke(bind(&Scheduler::frameworkMessage,
-                sched, driver, slaveId, executorId, data));
+    scheduler->frameworkMessage(driver, slaveId, executorId, data);
   }
 
   void error(int32_t code, const string& message)
@@ -336,7 +338,7 @@ protected:
 
     driver->abort();
 
-    invoke(bind(&Scheduler::error, sched, driver, code, message));
+    scheduler->error(driver, code, message);
   }
 
   void stop(bool failover)
@@ -352,14 +354,21 @@ protected:
       message.mutable_framework_id()->MergeFrom(frameworkId);
       send(master, message);
     }
+
+    Lock lock(mutex);
+    pthread_cond_signal(cond);
   }
 
-  // NOTE: This function stops any further callbacks from reaching the
-  // scheduler by informing the master. The abort flag stops
-  // those callbacks that are already enqueued.
+  // NOTE: This function informs the master to stop attempting to send
+  // messages to this scheduler. The abort flag stops any already
+  // enqueued messages or messages in flight from being handled. We
+  // don't want to terminate the process because one might do a
+  // MesosSchedulerDriver::stop later, which dispatches to
+  // SchedulerProcess::stop.
   void abort()
   {
     VLOG(1) << "Aborting the framework";
+
     aborted = true;
 
     if (!connected) {
@@ -373,6 +382,8 @@ protected:
     message.mutable_framework_id()->MergeFrom(frameworkId);
     send(master, message);
 
+    Lock lock(mutex);
+    pthread_cond_signal(cond);
   }
 
   void killTask(const TaskID& taskId)
@@ -424,7 +435,7 @@ protected:
         status->mutable_task_id()->MergeFrom(task.task_id());
         status->set_state(TASK_LOST);
         status->set_message("Master Disconnected");
-        update.set_timestamp(elapsedTime());
+        update.set_timestamp(Clock::now());
         update.set_uuid(UUID::random().toBytes());
 
         statusUpdate(update, UPID());
@@ -517,9 +528,11 @@ private:
   friend class mesos::MesosSchedulerDriver;
 
   MesosSchedulerDriver* driver;
-  Scheduler* sched;
+  Scheduler* scheduler;
   FrameworkID frameworkId;
   FrameworkInfo framework;
+  pthread_mutex_t* mutex;
+  pthread_cond_t* cond;
   bool failover;
   UPID master;
 
@@ -530,7 +543,8 @@ private:
   hashmap<SlaveID, UPID> savedSlavePids;
 };
 
-}} // namespace mesos { namespace internal {
+} // namespace internal {
+} // namespace mesos {
 
 
 // Implementation of C++ API.
@@ -546,7 +560,7 @@ private:
 // (2) There is a variable called state, that represents the current
 //     state of the driver and is used to enforce its state transitions.
 
-MesosSchedulerDriver::MesosSchedulerDriver(Scheduler* sched,
+MesosSchedulerDriver::MesosSchedulerDriver(Scheduler* scheduler,
                                            const std::string& frameworkName,
                                            const ExecutorInfo& executorInfo,
                                            const string& url,
@@ -563,15 +577,15 @@ MesosSchedulerDriver::MesosSchedulerDriv
   } catch (ConfigurationException& e) {
     // TODO(benh|matei): Are error callbacks not fatal!?
     string message = string("Configuration error: ") + e.what();
-    sched->error(this, 2, message);
+    scheduler->error(this, 2, message);
     conf = new Configuration();
   }
   conf->set("url", url); // Override URL param with the one from the user
-  init(sched, conf, frameworkId, frameworkName, executorInfo);
+  init(scheduler, conf, frameworkId, frameworkName, executorInfo);
 }
 
 
-MesosSchedulerDriver::MesosSchedulerDriver(Scheduler* sched,
+MesosSchedulerDriver::MesosSchedulerDriver(Scheduler* scheduler,
                                            const std::string& frameworkName,
                                            const ExecutorInfo& executorInfo,
                                            const map<string, string> &params,
@@ -588,14 +602,14 @@ MesosSchedulerDriver::MesosSchedulerDriv
   } catch (ConfigurationException& e) {
     // TODO(benh|matei): Are error callbacks not fatal?
     string message = string("Configuration error: ") + e.what();
-    sched->error(this, 2, message);
+    scheduler->error(this, 2, message);
     conf = new Configuration();
   }
-  init(sched, conf, frameworkId, frameworkName, executorInfo);
+  init(scheduler, conf, frameworkId, frameworkName, executorInfo);
 }
 
 
-MesosSchedulerDriver::MesosSchedulerDriver(Scheduler* sched,
+MesosSchedulerDriver::MesosSchedulerDriver(Scheduler* scheduler,
                                            const std::string& frameworkName,
                                            const ExecutorInfo& executorInfo,
                                            int argc,
@@ -612,14 +626,14 @@ MesosSchedulerDriver::MesosSchedulerDriv
     conf = new Configuration(configurator.load(argc, argv, false));
   } catch (ConfigurationException& e) {
     string message = string("Configuration error: ") + e.what();
-    sched->error(this, 2, message);
+    scheduler->error(this, 2, message);
     conf = new Configuration();
   }
-  init(sched, conf, frameworkId, frameworkName, executorInfo);
+  init(scheduler, conf, frameworkId, frameworkName, executorInfo);
 }
 
 
-void MesosSchedulerDriver::init(Scheduler* _sched,
+void MesosSchedulerDriver::init(Scheduler* _scheduler,
                                 Configuration* _conf,
                                 const FrameworkID& _frameworkId,
                                 const std::string& _frameworkName,
@@ -627,7 +641,7 @@ void MesosSchedulerDriver::init(Schedule
 {
   GOOGLE_PROTOBUF_VERIFY_VERSION;
 
-  sched = _sched;
+  scheduler = _scheduler;
   conf = _conf;
   frameworkId = _frameworkId;
   frameworkName = _frameworkName;
@@ -667,7 +681,9 @@ MesosSchedulerDriver::~MesosSchedulerDri
   // that we could add a method to libprocess that told us whether or
   // not this was about to be deadlock, and possibly report this back
   // to the user somehow. Note that we will also wait forever if
-  // MesosSchedulerDriver::stop was never called.
+  // MesosSchedulerDriver::stop was never called. It might make sense
+  // to try and add some more debug output for the case where we wait
+  // indefinitely due to deadlock ...
   if (process != NULL) {
     wait(process);
     delete process;
@@ -714,7 +730,10 @@ Status MesosSchedulerDriver::start()
 
   CHECK(process == NULL);
 
-  process = new SchedulerProcess(this, sched, frameworkId, framework);
+  // TODO(benh): Consider using a libprocess Latch rather than a
+  // pthread mutex and condition variable for signaling.
+  process = new SchedulerProcess(this, scheduler, frameworkId,
+                                 framework, &mutex, &cond);
 
   UPID pid = spawn(process);
 
@@ -755,8 +774,6 @@ Status MesosSchedulerDriver::stop(bool f
   // local clusters to exist (i.e. not use global vars in local.cpp) so that
   // ours can just be an instance variable in MesosSchedulerDriver.
 
-  pthread_cond_signal(&cond);
-
   return OK;
 }
 
@@ -779,8 +796,6 @@ Status MesosSchedulerDriver::abort()
 
   state = ABORTED;
 
-  pthread_cond_signal(&cond);
-
   return OK;
 }
 
@@ -801,11 +816,30 @@ Status MesosSchedulerDriver::join()
     pthread_cond_wait(&cond, &mutex);
   }
 
-  if (state == ABORTED)
+  if (state == ABORTED) {
     return DRIVER_ABORTED;
+  }
 
   CHECK(state == STOPPED);
 
+//   // If the driver has been stopped, then we wait for SchedulerProcess
+//   // to terminate. This is necessary to support languages like Python
+//   // that use reference counting on their objects. Without doing this
+//   // the Python interpreter might attempt to delete the
+//   // MesosSchedulerDriver instance from within the context of the
+//   // SchedulerProcess (because a thread is trying to call into a
+//   // scheduler but there are no more references to
+//   // MesosSchedulerDriver), which means we would deadlock and wait
+//   // forever since the MesosSchedulerDriver destructor waits for the
+//   // process to terminate forever. Note that we don't need to wait if
+//   // the driver has been aborted because we won't actually call back
+//   // in to the driver in that case, and thus we won't have 
+
+//   lock.unlock(); // Let a thread in SchedulerProcess use the driver.
+
+//   CHECK(process != NULL);
+//   wait(process);
+
   return OK;
 }
 
@@ -895,12 +929,6 @@ Status MesosSchedulerDriver::sendFramewo
 }
 
 
-void MesosSchedulerDriver::error(int code, const string& message)
-{
-  sched->error(this, code, message);
-}
-
-
 Status MesosSchedulerDriver::requestResources(
     const vector<ResourceRequest>& requests)
 {

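The pthread mutex and condition variable now threaded into SchedulerProcess move the signaling out of MesosSchedulerDriver::stop/abort (where the pthread_cond_signal calls are deleted above) and into the process itself, so join() cannot observe the signal before the process has actually handled the stop. The underlying handshake is the standard condition-variable pattern, sketched here with raw pthreads (simplified; the Lock type used in the diff is evidently an RAII wrapper around the mutex):

    #include <pthread.h>

    enum State { RUNNING, STOPPED, ABORTED };

    static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    static State state = RUNNING;

    // Runs inside the process (cf. SchedulerProcess::stop above).
    void signalStopped()
    {
      pthread_mutex_lock(&mutex);
      state = STOPPED;
      pthread_cond_signal(&cond);
      pthread_mutex_unlock(&mutex);
    }

    // Runs in the user's thread (cf. MesosSchedulerDriver::join above).
    State join()
    {
      pthread_mutex_lock(&mutex);
      while (state == RUNNING) {
        pthread_cond_wait(&cond, &mutex);  // unlocks, sleeps, relocks
      }
      State result = state;
      pthread_mutex_unlock(&mutex);
      return result;
    }
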
Modified: incubator/mesos/trunk/src/slave/http.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/http.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/http.cpp (original)
+++ incubator/mesos/trunk/src/slave/http.cpp Fri Jan 27 01:25:13 2012
@@ -30,9 +30,9 @@
 #include "slave/http.hpp"
 #include "slave/slave.hpp"
 
+using process::Future;
 using process::HttpResponse;
 using process::HttpRequest;
-using process::Promise;
 
 using std::string;
 
@@ -113,7 +113,7 @@ JSON::Object model(const Framework& fram
 
 namespace http {
 
-Promise<HttpResponse> vars(
+Future<HttpResponse> vars(
     const Slave& slave,
     const HttpRequest& request)
 {
@@ -146,7 +146,7 @@ Promise<HttpResponse> vars(
 
 namespace json {
 
-Promise<HttpResponse> stats(
+Future<HttpResponse> stats(
     const Slave& slave,
     const HttpRequest& request)
 {
@@ -175,7 +175,7 @@ Promise<HttpResponse> stats(
 }
 
 
-Promise<HttpResponse> state(
+Future<HttpResponse> state(
     const Slave& slave,
     const HttpRequest& request)
 {

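Only the handler signatures change in this file; the bodies stay synchronous. That works because a concrete HttpResponse converts into an already-ready Future<HttpResponse>. In outline (hypothetical endpoint, and assuming HttpOKResponse is default-constructible):

    Future<HttpResponse> ping(const Slave& slave, const HttpRequest& request)
    {
      return HttpOKResponse();  // returned as a ready Future<HttpResponse>
    }
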
Modified: incubator/mesos/trunk/src/slave/http.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/http.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/http.hpp (original)
+++ incubator/mesos/trunk/src/slave/http.hpp Fri Jan 27 01:25:13 2012
@@ -34,7 +34,7 @@ namespace http {
 
 // Returns current vars in "key value\n" format (keys do not contain
 // spaces, values may contain spaces but are ended by a newline).
-process::Promise<process::HttpResponse> vars(
+process::Future<process::HttpResponse> vars(
     const Slave& slave,
     const process::HttpRequest& request);
 
@@ -42,13 +42,13 @@ process::Promise<process::HttpResponse> 
 namespace json {
 
 // Returns current statistics of the slave.
-process::Promise<process::HttpResponse> stats(
+process::Future<process::HttpResponse> stats(
     const Slave& slave,
     const process::HttpRequest& request);
 
 
 // Returns current state of the cluster that the slave knows about.
-process::Promise<process::HttpResponse> state(
+process::Future<process::HttpResponse> state(
     const Slave& slave,
     const process::HttpRequest& request);
 

Modified: incubator/mesos/trunk/src/slave/reaper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/reaper.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/reaper.cpp (original)
+++ incubator/mesos/trunk/src/slave/reaper.cpp Fri Jan 27 01:25:13 2012
@@ -20,6 +20,7 @@
 #include <sys/wait.h>
 
 #include <process/dispatch.hpp>
+#include <process/timer.hpp>
 
 #include "reaper.hpp"
 
@@ -28,7 +29,9 @@
 using namespace process;
 
 
-namespace mesos { namespace internal { namespace slave {
+namespace mesos {
+namespace internal {
+namespace slave {
 
 Reaper::Reaper() {}
 
@@ -43,24 +46,28 @@ void Reaper::addProcessExitedListener(
 }
 
 
-void Reaper::operator () ()
+void Reaper::initialize()
 {
-  while (true) {
-    serve(1);
-    if (name() == TIMEOUT) {
-      // Check whether any child process has exited.
-      pid_t pid;
-      int status;
-      if ((pid = waitpid((pid_t) -1, &status, WNOHANG)) > 0) {
-        foreach (const PID<ProcessExitedListener>& listener, listeners) {
-          dispatch(listener, &ProcessExitedListener::processExited,
-                   pid, status);
-        }
-      }
-    } else if (name() == TERMINATE) {
-      return;
+  delay(1.0, self(), &Reaper::reap);
+}
+
+
+void Reaper::reap()
+{
+  // Check whether any child process has exited.
+  pid_t pid;
+  int status;
+  if ((pid = waitpid((pid_t) -1, &status, WNOHANG)) > 0) {
+    foreach (const PID<ProcessExitedListener>& listener, listeners) {
+      dispatch(listener, &ProcessExitedListener::processExited, pid, status);
     }
   }
+
+  delay(1.0, self(), &Reaper::reap); // Reap forever!
 }
 
-}}} // namespace mesos { namespace internal { namespace slave {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+

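One subtlety of the rewritten reap(): waitpid is called once per tick, so at most one exited child is collected per second. Since WNOHANG makes waitpid non-blocking, a variant that drains every pending child in a single pass would look like this (a sketch, not the committed code):

    #include <sys/wait.h>

    void reapAll()
    {
      pid_t pid;
      int status;
      // waitpid(-1, ..., WNOHANG) returns a reaped pid, 0 if children
      // remain but none have exited yet, or -1 if there are no children.
      while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
        // dispatch processExited(pid, status) to each listener ...
      }
    }
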
Modified: incubator/mesos/trunk/src/slave/reaper.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/reaper.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/reaper.hpp (original)
+++ incubator/mesos/trunk/src/slave/reaper.hpp Fri Jan 27 01:25:13 2012
@@ -24,7 +24,9 @@
 #include <process/process.hpp>
 
 
-namespace mesos { namespace internal { namespace slave {
+namespace mesos {
+namespace internal {
+namespace slave {
 
 class ProcessExitedListener : public process::Process<ProcessExitedListener>
 {
@@ -42,13 +44,17 @@ public:
   void addProcessExitedListener(const process::PID<ProcessExitedListener>&);
 
 protected:
-  virtual void operator () ();
+  virtual void initialize();
+
+  void reap();
 
 private:
   std::set<process::PID<ProcessExitedListener> > listeners;
 };
 
 
-}}} // namespace mesos { namespace internal { namespace slave {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
 
 #endif // __REAPER_HPP__

Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Fri Jan 27 01:25:13 2012
@@ -76,9 +76,7 @@ Slave::Slave(const Resources& _resources
     resources(_resources),
     local(_local),
     isolationModule(_isolationModule)
-{
-  initialize();
-}
+{}
 
 
 Slave::Slave(const Configuration& _conf,
@@ -94,8 +92,6 @@ Slave::Slave(const Configuration& _conf,
 
   attributes =
     Attributes::parse(conf.get<string>("attributes", ""));
-
-  initialize();
 }
 
 
@@ -152,6 +148,43 @@ void Slave::registerOptions(Configurator
 
 void Slave::initialize()
 {
+  LOG(INFO) << "Slave started at " << self();
+  LOG(INFO) << "Slave resources: " << resources;
+
+  Result<string> result = utils::os::hostname();
+
+  if (result.isError()) {
+    LOG(FATAL) << "Failed to get hostname: " << result.error();
+  }
+
+  CHECK(result.isSome());
+
+  string hostname = result.get();
+
+  // Check and see if we have a different public DNS name. Normally
+  // this is our hostname, but on EC2 we look for the MESOS_PUBLIC_DNS
+  // environment variable. This allows the master to display our
+  // public name in its webui.
+  string webui_hostname = hostname;
+  if (getenv("MESOS_PUBLIC_DNS") != NULL) {
+    webui_hostname = getenv("MESOS_PUBLIC_DNS");
+  }
+
+  // Initialize slave info.
+  info.set_hostname(hostname);
+  info.set_webui_hostname(webui_hostname);
+  info.set_webui_port(conf.get<int>("webui_port", 8081));
+  info.mutable_resources()->MergeFrom(resources);
+  info.mutable_attributes()->MergeFrom(attributes);
+
+  // Spawn and initialize the isolation module.
+  // TODO(benh): Seems like the isolation module should really be
+  // spawned before being passed to the slave.
+  spawn(isolationModule);
+  dispatch(isolationModule,
+           &IsolationModule::initialize,
+           conf, local, self());
+
   // Start all the statistics at 0.
   CHECK(TASK_STARTING == TaskState_MIN);
   CHECK(TASK_LOST == TaskState_MAX);
@@ -166,151 +199,104 @@ void Slave::initialize()
   stats.validFrameworkMessages = 0;
   stats.invalidFrameworkMessages = 0;
 
-  startTime = elapsedTime();
+  startTime = Clock::now();
+
   connected = false;
 
   // Install protobuf handlers.
-  installProtobufHandler<NewMasterDetectedMessage>(
+  install<NewMasterDetectedMessage>(
       &Slave::newMasterDetected,
       &NewMasterDetectedMessage::pid);
 
-  installProtobufHandler<NoMasterDetectedMessage>(
+  install<NoMasterDetectedMessage>(
       &Slave::noMasterDetected);
 
-  installProtobufHandler<SlaveRegisteredMessage>(
+  install<SlaveRegisteredMessage>(
       &Slave::registered,
       &SlaveRegisteredMessage::slave_id);
 
-  installProtobufHandler<SlaveReregisteredMessage>(
+  install<SlaveReregisteredMessage>(
       &Slave::reregistered,
       &SlaveReregisteredMessage::slave_id);
 
-  installProtobufHandler<RunTaskMessage>(
+  install<RunTaskMessage>(
       &Slave::runTask,
       &RunTaskMessage::framework,
       &RunTaskMessage::framework_id,
       &RunTaskMessage::pid,
       &RunTaskMessage::task);
 
-  installProtobufHandler<KillTaskMessage>(
+  install<KillTaskMessage>(
       &Slave::killTask,
       &KillTaskMessage::framework_id,
       &KillTaskMessage::task_id);
 
-  installProtobufHandler<ShutdownFrameworkMessage>(
+  install<ShutdownFrameworkMessage>(
       &Slave::shutdownFramework,
       &ShutdownFrameworkMessage::framework_id);
 
-  installProtobufHandler<FrameworkToExecutorMessage>(
+  install<FrameworkToExecutorMessage>(
       &Slave::schedulerMessage,
       &FrameworkToExecutorMessage::slave_id,
       &FrameworkToExecutorMessage::framework_id,
       &FrameworkToExecutorMessage::executor_id,
       &FrameworkToExecutorMessage::data);
 
-  installProtobufHandler<UpdateFrameworkMessage>(
+  install<UpdateFrameworkMessage>(
       &Slave::updateFramework,
       &UpdateFrameworkMessage::framework_id,
       &UpdateFrameworkMessage::pid);
 
-  installProtobufHandler<StatusUpdateAcknowledgementMessage>(
+  install<StatusUpdateAcknowledgementMessage>(
       &Slave::statusUpdateAcknowledgement,
       &StatusUpdateAcknowledgementMessage::slave_id,
       &StatusUpdateAcknowledgementMessage::framework_id,
       &StatusUpdateAcknowledgementMessage::task_id,
       &StatusUpdateAcknowledgementMessage::uuid);
 
-  installProtobufHandler<RegisterExecutorMessage>(
+  install<RegisterExecutorMessage>(
       &Slave::registerExecutor,
       &RegisterExecutorMessage::framework_id,
       &RegisterExecutorMessage::executor_id);
 
-  installProtobufHandler<StatusUpdateMessage>(
+  install<StatusUpdateMessage>(
       &Slave::statusUpdate,
       &StatusUpdateMessage::update);
 
-  installProtobufHandler<ExecutorToFrameworkMessage>(
+  install<ExecutorToFrameworkMessage>(
       &Slave::executorMessage,
       &ExecutorToFrameworkMessage::slave_id,
       &ExecutorToFrameworkMessage::framework_id,
       &ExecutorToFrameworkMessage::executor_id,
       &ExecutorToFrameworkMessage::data);
 
-  // Install some message handlers.
-  installMessageHandler(process::EXITED, &Slave::exited);
-  installMessageHandler("PING", &Slave::ping);
-
-  // Install some HTTP handlers.
-  installHttpHandler(
-      "vars",
-      bind(&http::vars, cref(*this), params::_1));
-
-  installHttpHandler(
-      "stats.json",
-      bind(&http::json::stats, cref(*this), params::_1));
-
-  installHttpHandler(
-      "state.json",
-      bind(&http::json::state, cref(*this), params::_1));
+  install<ShutdownMessage>(
+      &Slave::shutdown);
+  
+  // Install the ping message handler.
+  install("PING", &Slave::ping);
+
+  // Set up some HTTP routes.
+  route("vars", bind(&http::vars, cref(*this), params::_1));
+  route("stats.json", bind(&http::json::stats, cref(*this), params::_1));
+  route("state.json", bind(&http::json::state, cref(*this), params::_1));
 }
 
 
-void Slave::operator () ()
-{
-  LOG(INFO) << "Slave started at " << self();
-  LOG(INFO) << "Slave resources: " << resources;
-
-  Result<string> result = utils::os::hostname();
-
-  if (result.isError()) {
-    LOG(FATAL) << "Failed to get hostname: " << result.error();
-  }
-
-  CHECK(result.isSome());
-
-  string hostname = result.get();
-
-  // Check and see if we have a different public DNS name. Normally
-  // this is our hostname, but on EC2 we look for the MESOS_PUBLIC_DNS
-  // environment variable. This allows the master to display our
-  // public name in its web UI.
-  string webui_hostname = hostname;
-  if (getenv("MESOS_PUBLIC_DNS") != NULL) {
-    webui_hostname = getenv("MESOS_PUBLIC_DNS");
-  }
-
-  // Initialize slave info.
-  info.set_hostname(hostname);
-  info.set_webui_hostname(webui_hostname);
-  info.set_webui_port(conf.get<int>("webui_port", 8081));
-  info.mutable_resources()->MergeFrom(resources);
-  info.mutable_attributes()->MergeFrom(attributes);
-
-  // Spawn and initialize the isolation module.
-  // TODO(benh): Seems like the isolation module should really be
-  // spawned before being passed to the slave.
-  spawn(isolationModule);
-  dispatch(isolationModule,
-           &IsolationModule::initialize,
-           conf, local, self());
-
-  while (true) {
-    serve(1);
-    if (name() == TERMINATE) {
-      LOG(INFO) << "Asked to terminate by " << from();
-      foreachkey (const FrameworkID& frameworkId, frameworks) {
-        // TODO(benh): Because a shutdown isn't instantaneous (but has
-        // shutdown/kill phases) we might not actually propagate all
-        // the status updates appropriately here. Consider providing
-        // an alternative function which skips the shut down phase and
-        // simply does a kill (sending all status updates
-        // immediately). Of course, this still isn't sufficient
-        // because those status updates might get lost and we won't
-        // resend them unless we build that into the system.
-        shutdownFramework(frameworkId);
-      }
-      break;
-    }
+void Slave::finalize()
+{
+  LOG(INFO) << "Slave terminating";
+
+  foreachkey (const FrameworkID& frameworkId, frameworks) {
+    // TODO(benh): Because a shutdown isn't instantaneous (but has
+    // shutdown/kill phases) we might not actually propagate all
+    // the status updates appropriately here. Consider providing
+    // an alternative function which skips the shut down phase and
+    // simply does a kill (sending all status updates
+    // immediately). Of course, this still isn't sufficient
+    // because those status updates might get lost and we won't
+    // resend them unless we build that into the system.
+    shutdownFramework(frameworkId);
   }
 
   // Stop the isolation module.
@@ -319,6 +305,13 @@ void Slave::operator () ()
 }
 
 
+void Slave::shutdown()
+{
+  LOG(INFO) << "Slave asked to shut down";
+  terminate(self());
+}
+
+
 void Slave::newMasterDetected(const UPID& pid)
 {
   LOG(INFO) << "New master detected at " << pid;
@@ -432,7 +425,7 @@ void Slave::runTask(const FrameworkInfo&
       TaskStatus* status = update->mutable_status();
       status->mutable_task_id()->MergeFrom(task.task_id());
       status->set_state(TASK_LOST);
-      update->set_timestamp(elapsedTime());
+      update->set_timestamp(Clock::now());
       update->set_uuid(UUID::random().toBytes());
       send(master, message);
     } else if (!executor->pid) {
@@ -505,7 +498,7 @@ void Slave::killTask(const FrameworkID& 
     TaskStatus* status = update->mutable_status();
     status->mutable_task_id()->MergeFrom(taskId);
     status->set_state(TASK_LOST);
-    update->set_timestamp(elapsedTime());
+    update->set_timestamp(Clock::now());
     update->set_uuid(UUID::random().toBytes());
     send(master, message);
 
@@ -528,7 +521,7 @@ void Slave::killTask(const FrameworkID& 
     TaskStatus* status = update->mutable_status();
     status->mutable_task_id()->MergeFrom(taskId);
     status->set_state(TASK_LOST);
-    update->set_timestamp(elapsedTime());
+    update->set_timestamp(Clock::now());
     update->set_uuid(UUID::random().toBytes());
     send(master, message);
   } else if (!executor->pid) {
@@ -548,7 +541,7 @@ void Slave::killTask(const FrameworkID& 
     TaskStatus* status = update->mutable_status();
     status->mutable_task_id()->MergeFrom(taskId);
     status->set_state(TASK_KILLED);
-    update->set_timestamp(elapsedTime());
+    update->set_timestamp(Clock::now());
     update->set_uuid(UUID::random().toBytes());
     send(master, message);
   } else {
@@ -717,7 +710,7 @@ void Slave::statusUpdateAcknowledgement(
 //       message.set_reliable(true);
 //       send(master, message);
 
-//       stream->timeout = elapsedTime() + STATUS_UPDATE_RETRY_INTERVAL;
+//       stream->timeout = Clock::now() + STATUS_UPDATE_RETRY_INTERVAL;
 //     }
 //   }
 // }
@@ -735,7 +728,7 @@ void Slave::registerExecutor(const Frame
     LOG(WARNING) << "Framework " << frameworkId
                  << " does not exist (it may have been killed),"
                  << " telling executor to exit";
-    send(from(), ShutdownExecutorMessage());
+    reply(ShutdownExecutorMessage());
     return;
   }
 
@@ -745,15 +738,15 @@ void Slave::registerExecutor(const Frame
   if (executor == NULL) {
     LOG(WARNING) << "WARNING! Unexpected executor '" << executorId
                  << "' registering for framework " << frameworkId;
-    send(from(), ShutdownExecutorMessage());
+    reply(ShutdownExecutorMessage());
   } else if (executor->pid) {
     LOG(WARNING) << "WARNING! executor '" << executorId
                  << "' of framework " << frameworkId
                  << " is already running";
-    send(from(), ShutdownExecutorMessage());
+    reply(ShutdownExecutorMessage());
   } else {
     // Save the pid for the executor.
-    executor->pid = from();
+    executor->pid = from;
 
     // First account for the tasks we're about to start.
     foreachvalue (const TaskDescription& task, executor->queuedTasks) {
@@ -911,7 +904,7 @@ void Slave::registerExecutor(const Frame
 //     message.set_reliable(true);
 //     send(master, message);
 
-//     stream->timeout = elapsedTime() + STATUS_UPDATE_RETRY_INTERVAL;
+//     stream->timeout = Clock::now() + STATUS_UPDATE_RETRY_INTERVAL;
 //   }
 
 //   stats.tasks[status.state()]++;
@@ -1003,9 +996,9 @@ void Slave::executorMessage(const SlaveI
 }
 
 
-void Slave::ping()
+void Slave::ping(const UPID& from, const string& body)
 {
-  send(from(), "PONG");
+  send(from, "PONG");
 }
 
 
@@ -1035,7 +1028,7 @@ void Slave::statusUpdateTimeout(
 // void Slave::timeout()
 // {
 //   // Check and see if we should re-send any status updates.
-//   double now = elapsedTime();
+//   double now = Clock::now();
 
 //   foreachvalue (StatusUpdateStream* stream, statusUpdateStreams) {
 //     CHECK(stream->timeout > 0);
@@ -1058,11 +1051,11 @@ void Slave::statusUpdateTimeout(
 // }
 
 
-void Slave::exited()
+void Slave::exited(const UPID& pid)
 {
-  LOG(INFO) << "Process exited: " << from();
+  LOG(INFO) << "Process exited: " << from;
 
-  if (from() == master) {
+  if (master == pid) {
     LOG(WARNING) << "WARNING! Master disconnected!"
                  << " Waiting for a new master to be elected.";
     // TODO(benh): After so long waiting for a master, commit suicide.
@@ -1155,7 +1148,7 @@ Framework* Slave::getFramework(const Fra
 //     message.set_reliable(true);
 //     send(master, message);
 
-//     stream->timeout = elapsedTime() + STATUS_UPDATE_RETRY_INTERVAL;
+//     stream->timeout = Clock::now() + STATUS_UPDATE_RETRY_INTERVAL;
 //   }
 
 //   return stream;
@@ -1417,27 +1410,21 @@ string Slave::createUniqueWorkDirectory(
   // framework on this slave).
   out << "/runs/";
 
-  string dir;
-  dir = out.str();
+  const string& prefix = out.str();
 
   for (int i = 0; i < INT_MAX; i++) {
     out << i;
-    DIR* d = opendir(out.str().c_str());
-    if (d == NULL && errno == ENOENT) {
-      break;
+    VLOG(1) << "Checking if " << out.str() << " already exists";
+    if (!utils::os::exists(out.str())) {
+      bool created = utils::os::mkdir(out.str());
+      CHECK(created) << "Error creating work directory: " << out.str();
+      return out.str();
+    } else {
+      out.str(prefix); // Try with prefix again.
     }
-    closedir(d);
-    out.str(dir);
   }
-
-  dir = out.str();
-
-  bool created = utils::os::mkdir(out.str());
-
-  CHECK(created) << "Error creating work directory: " << dir;
-
-  return dir;
 }
 
-
-}}} // namespace mesos { namespace internal { namespace slave {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

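The structural change in this file is the retirement of the hand-written operator()() event loop in favor of libprocess lifecycle hooks. Summarizing the usage visible in this commit (ExampleProcess is hypothetical):

    class ExampleProcess : public process::Process<ExampleProcess>
    {
    protected:
      virtual void initialize() {}  // runs once, after spawn()
      virtual void finalize() {}    // runs once, when terminate() lands
      virtual void exited(const process::UPID& pid) {}  // a peer exited
    };

spawn() starts the process, and terminate() followed by wait() shuts it down; that pair is exactly what SlavesManager::~SlavesManager and the updated tests now call in place of posting process::TERMINATE.
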
Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Fri Jan 27 01:25:13 2012
@@ -61,6 +61,8 @@ public:
 
   static void registerOptions(Configurator* configurator);
 
+  void shutdown();
+
   void newMasterDetected(const UPID& pid);
   void noMasterDetected();
   void masterDetectionFailure();
@@ -91,8 +93,7 @@ public:
                        const FrameworkID& frameworkId,
                        const ExecutorID& executorId,
                        const std::string& data);
-  void ping();
-  void exited();
+  void ping(const UPID& from, const std::string& body);
 
   void statusUpdateTimeout(const FrameworkID& frameworkId, const UUID& uuid);
 
@@ -105,9 +106,9 @@ public:
                       int status);
 
 protected:
-  virtual void operator () ();
-
-  void initialize();
+  virtual void initialize();
+  virtual void finalize();
+  virtual void exited(const UPID& pid);
 
   // Helper routine to lookup a framework.
   Framework* getFramework(const FrameworkID& frameworkId);
@@ -141,15 +142,15 @@ private:
   // Http handlers, friends of the slave in order to access state,
   // they get invoked from within the slave so there is no need to
   // use synchronization mechanisms to protect state.
-  friend Promise<HttpResponse> http::vars(
+  friend Future<HttpResponse> http::vars(
       const Slave& slave,
       const HttpRequest& request);
 
-  friend Promise<HttpResponse> http::json::stats(
+  friend Future<HttpResponse> http::json::stats(
       const Slave& slave,
       const HttpRequest& request);
 
-  friend Promise<HttpResponse> http::json::state(
+  friend Future<HttpResponse> http::json::state(
       const Slave& slave,
       const HttpRequest& request);
 

Modified: incubator/mesos/trunk/src/tests/exception_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/exception_tests.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/exception_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/exception_tests.cpp Fri Jan 27 01:25:13 2012
@@ -57,7 +57,6 @@ using testing::Eq;
 using testing::Not;
 using testing::Return;
 using testing::SaveArg;
-using testing::SaveArgPointee;
 
 
 TEST(ExceptionTest, AbortOnFrameworkError)
@@ -109,7 +108,7 @@ TEST(ExceptionTest, DeactiveFrameworkOnA
   MockFilter filter;
   process::filter(&filter);
 
-  EXPECT_MSG(filter, _, _, _)
+  EXPECT_MESSAGE(filter, _, _, _)
     .WillRepeatedly(Return(false));
 
   PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false);
@@ -131,7 +130,7 @@ TEST(ExceptionTest, DeactiveFrameworkOnA
 
   trigger deactivateMsg;
 
-  EXPECT_MSG(filter, Eq(DeactivateFrameworkMessage().GetTypeName()), _, _)
+  EXPECT_MESSAGE(filter, Eq(DeactivateFrameworkMessage().GetTypeName()), _, _)
     .WillOnce(DoAll(Trigger(&deactivateMsg), Return(false)));
 
   driver.start();
@@ -197,7 +196,7 @@ TEST(ExceptionTest, DisallowSchedulerCal
   MockFilter filter;
   process::filter(&filter);
 
-  EXPECT_MSG(filter, _, _, _)
+  EXPECT_MESSAGE(filter, _, _, _)
     .WillRepeatedly(Return(false));
 
   PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false);
@@ -236,13 +235,14 @@ TEST(ExceptionTest, DisallowSchedulerCal
   process::Message message;
   trigger rescindMsg, unregisterMsg;
 
-  EXPECT_MSG(filter, Eq(FrameworkRegisteredMessage().GetTypeName()), _, _)
-    .WillOnce(DoAll(SaveArgPointee<0>(&message),Return(false)));
+  EXPECT_MESSAGE(filter, Eq(FrameworkRegisteredMessage().GetTypeName()), _, _)
+    .WillOnce(DoAll(SaveArgField<0>(&process::MessageEvent::message, &message),
+                    Return(false)));
 
-  EXPECT_MSG(filter, Eq(RescindResourceOfferMessage().GetTypeName()), _, _)
+  EXPECT_MESSAGE(filter, Eq(RescindResourceOfferMessage().GetTypeName()), _, _)
     .WillOnce(Trigger(&rescindMsg));
 
-  EXPECT_MSG(filter, Eq(UnregisterFrameworkMessage().GetTypeName()), _, _)
+  EXPECT_MESSAGE(filter, Eq(UnregisterFrameworkMessage().GetTypeName()), _, _)
       .WillOnce(Trigger(&unregisterMsg));
 
   driver.start();

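With EXPECT_MSG renamed to EXPECT_MESSAGE, the mocked argument is now a process::MessageEvent rather than a Message pointer, so gmock's SaveArgPointee no longer fits; SaveArgField saves one field of the argument instead. Its shape is roughly the following gmock action (a plausible reconstruction, not copied from the test headers, and assuming MessageEvent::message is a pointer member):

    ACTION_TEMPLATE(SaveArgField,
                    HAS_1_TEMPLATE_PARAMS(int, k),
                    AND_2_VALUE_PARAMS(field, pointer))
    {
      // Copy the 'field' member of the k-th mock argument into *pointer,
      // e.g. the Message inside a process::MessageEvent.
      *pointer = *(::std::tr1::get<k>(args).*field);
    }
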
Modified: incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp Fri Jan 27 01:25:13 2012
@@ -44,6 +44,7 @@ using mesos::internal::slave::ProcessBas
 using mesos::internal::slave::Slave;
 using mesos::internal::slave::STATUS_UPDATE_RETRY_INTERVAL_SECONDS;
 
+using process::Clock;
 using process::PID;
 
 using std::string;
@@ -59,7 +60,6 @@ using testing::Eq;
 using testing::Not;
 using testing::Return;
 using testing::SaveArg;
-using testing::SaveArgPointee;
 
 
 TEST(FaultToleranceTest, SlaveLost)
@@ -108,7 +108,7 @@ TEST(FaultToleranceTest, SlaveLost)
   EXPECT_CALL(sched, slaveLost(&driver, offers[0].slave_id()))
     .WillOnce(Trigger(&slaveLostCall));
 
-  process::post(slave, process::TERMINATE);
+  process::terminate(slave);
 
   WAIT_UNTIL(offerRescindedCall);
   WAIT_UNTIL(slaveLostCall);
@@ -118,7 +118,7 @@ TEST(FaultToleranceTest, SlaveLost)
 
   process::wait(slave);
 
-  process::post(master, process::TERMINATE);
+  process::terminate(master);
   process::wait(master);
 }
 
@@ -127,12 +127,12 @@ TEST(FaultToleranceTest, SlavePartitione
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  process::Clock::pause();
+  Clock::pause();
 
   MockFilter filter;
   process::filter(&filter);
 
-  EXPECT_MSG(filter, _, _, _)
+  EXPECT_MESSAGE(filter, _, _, _)
     .WillRepeatedly(Return(false));
 
   PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false);
@@ -140,7 +140,7 @@ TEST(FaultToleranceTest, SlavePartitione
   MockScheduler sched;
   MesosSchedulerDriver driver(&sched, "", DEFAULT_EXECUTOR_INFO, master);
 
-  trigger slaveLostCall;
+  trigger slaveLostCall, pingMsg;
 
   EXPECT_CALL(sched, registered(&driver, _))
     .Times(1);
@@ -154,14 +154,20 @@ TEST(FaultToleranceTest, SlavePartitione
   EXPECT_CALL(sched, slaveLost(&driver, _))
     .WillOnce(Trigger(&slaveLostCall));
 
-  EXPECT_MSG(filter, Eq("PONG"), _, _)
+  EXPECT_MESSAGE(filter, Eq("PING"), _, _)
+    .WillRepeatedly(DoAll(Trigger(&pingMsg),
+                          Return(false)));
+
+  EXPECT_MESSAGE(filter, Eq("PONG"), _, _)
     .WillRepeatedly(Return(true));
 
   driver.start();
 
+  WAIT_UNTIL(pingMsg);
+
   double secs = master::SLAVE_PONG_TIMEOUT * master::MAX_SLAVE_TIMEOUTS;
 
-  process::Clock::advance(secs);
+  Clock::advance(secs);
 
   WAIT_UNTIL(slaveLostCall);
 
@@ -172,7 +178,7 @@ TEST(FaultToleranceTest, SlavePartitione
 
   process::filter(NULL);
 
-  process::Clock::resume();
+  Clock::resume();
 }
 
 
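The new PING expectation above is not just bookkeeping: with the clock paused, advancing time only expires timers that already exist, so the test must first observe a PING (evidence that the master's health-check cycle for the slave has started) before advancing. In outline, using the trigger/WAIT_UNTIL helpers from src/tests/utils.hpp:

  Clock::pause();

  trigger pingMsg;

  EXPECT_MESSAGE(filter, Eq("PING"), _, _)
    .WillRepeatedly(DoAll(Trigger(&pingMsg), Return(false))); // Observe only.

  EXPECT_MESSAGE(filter, Eq("PONG"), _, _)
    .WillRepeatedly(Return(true)); // Drop every PONG: a "partitioned" slave.

  driver.start();

  WAIT_UNTIL(pingMsg); // The timeout timers now exist...

  // ...so advancing the clock deterministically expires the slave.
  Clock::advance(master::SLAVE_PONG_TIMEOUT * master::MAX_SLAVE_TIMEOUTS);
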
@@ -249,7 +255,7 @@ TEST(FaultToleranceTest, FrameworkReliab
   MockFilter filter;
   process::filter(&filter);
 
-  EXPECT_MSG(filter, _, _, _)
+  EXPECT_MESSAGE(filter, _, _, _)
     .WillRepeatedly(Return(false));
 
   PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false);
@@ -270,7 +276,7 @@ TEST(FaultToleranceTest, FrameworkReliab
 
   // Drop the registered message on the first attempt, but allow subsequent
   // tries.
-  EXPECT_MSG(filter, Eq(FrameworkRegisteredMessage().GetTypeName()), _, _)
+  EXPECT_MESSAGE(filter, Eq(FrameworkRegisteredMessage().GetTypeName()), _, _)
     .WillOnce(Return(true))
     .WillRepeatedly(Return(false));
 
@@ -294,7 +300,7 @@ TEST(FaultToleranceTest, FrameworkReregi
   MockFilter filter;
   process::filter(&filter);
 
-  EXPECT_MSG(filter, _, _, _)
+  EXPECT_MESSAGE(filter, _, _, _)
     .WillRepeatedly(Return(false));
 
   PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false);
@@ -316,11 +322,11 @@ TEST(FaultToleranceTest, FrameworkReregi
   process::Message message;
   trigger schedReregisteredMsg;
 
-  EXPECT_MSG(filter, Eq(FrameworkRegisteredMessage().GetTypeName()), _, _)
-    .WillOnce(DoAll(SaveArgPointee<0>(&message),
+  EXPECT_MESSAGE(filter, Eq(FrameworkRegisteredMessage().GetTypeName()), _, _)
+    .WillOnce(DoAll(SaveArgField<0>(&process::MessageEvent::message, &message),
                     Return(false)));
 
-  EXPECT_MSG(filter, Eq(FrameworkReregisteredMessage().GetTypeName()), _, _)
+  EXPECT_MESSAGE(filter, Eq(FrameworkReregisteredMessage().GetTypeName()), _, _)
     .WillOnce(DoAll(Trigger(&schedReregisteredMsg),
                     Return(false)));
 
@@ -358,7 +364,7 @@ TEST(FaultToleranceTest, DISABLED_TaskLo
   MockFilter filter;
   process::filter(&filter);
 
-  EXPECT_MSG(filter, _, _, _)
+  EXPECT_MESSAGE(filter, _, _, _)
     .WillRepeatedly(Return(false));
 
   SimpleAllocator a;
@@ -410,8 +416,8 @@ TEST(FaultToleranceTest, DISABLED_TaskLo
 
   process::Message message;
 
-  EXPECT_MSG(filter, Eq(FrameworkRegisteredMessage().GetTypeName()), _, _)
-    .WillOnce(DoAll(SaveArgPointee<0>(&message),
+  EXPECT_MESSAGE(filter, Eq(FrameworkRegisteredMessage().GetTypeName()), _, _)
+    .WillOnce(DoAll(SaveArgField<0>(&process::MessageEvent::message, &message),
                     Return(false)));
 
   driver.start();
@@ -442,10 +448,10 @@ TEST(FaultToleranceTest, DISABLED_TaskLo
   driver.stop();
   driver.join();
 
-  process::post(slave, process::TERMINATE);
+  process::terminate(slave);
   process::wait(slave);
 
-  process::post(master, process::TERMINATE);
+  process::terminate(master);
   process::wait(master);
 
   process::filter(NULL);
@@ -456,12 +462,12 @@ TEST(FaultToleranceTest, SchedulerFailov
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  process::Clock::pause();
+  Clock::pause();
 
   MockFilter filter;
   process::filter(&filter);
 
-  EXPECT_MSG(filter, _, _, _)
+  EXPECT_MESSAGE(filter, _, _, _)
     .WillRepeatedly(Return(false));
 
   SimpleAllocator a;
@@ -516,7 +522,7 @@ TEST(FaultToleranceTest, SchedulerFailov
   EXPECT_CALL(sched1, error(&driver1, _, "Framework failover"))
     .Times(1);
 
-  EXPECT_MSG(filter, Eq(StatusUpdateMessage().GetTypeName()), _,
+  EXPECT_MESSAGE(filter, Eq(StatusUpdateMessage().GetTypeName()), _,
              Not(AnyOf(Eq(master), Eq(slave))))
     .WillOnce(DoAll(Trigger(&statusUpdateMsg), Return(true)))
     .RetiresOnSaturation();
@@ -561,7 +567,7 @@ TEST(FaultToleranceTest, SchedulerFailov
 
   WAIT_UNTIL(registeredCall);
 
-  process::Clock::advance(STATUS_UPDATE_RETRY_INTERVAL_SECONDS);
+  Clock::advance(STATUS_UPDATE_RETRY_INTERVAL_SECONDS);
 
   WAIT_UNTIL(statusUpdateCall);
 
@@ -571,15 +577,15 @@ TEST(FaultToleranceTest, SchedulerFailov
   driver1.join();
   driver2.join();
 
-  process::post(slave, process::TERMINATE);
+  process::terminate(slave);
   process::wait(slave);
 
-  process::post(master, process::TERMINATE);
+  process::terminate(master);
   process::wait(master);
 
   process::filter(NULL);
 
-  process::Clock::resume();
+  Clock::resume();
 }
 
 
@@ -686,10 +692,10 @@ TEST(FaultToleranceTest, SchedulerFailov
   driver1.join();
   driver2.join();
 
-  process::post(slave, process::TERMINATE);
+  process::terminate(slave);
   process::wait(slave);
 
-  process::post(master, process::TERMINATE);
+  process::terminate(master);
   process::wait(master);
 }
 
@@ -701,7 +707,7 @@ TEST(FaultToleranceTest, SlaveReliableRe
   MockFilter filter;
   process::filter(&filter);
 
-  EXPECT_MSG(filter, _, _, _)
+  EXPECT_MESSAGE(filter, _, _, _)
     .WillRepeatedly(Return(false));
 
   SimpleAllocator a;
@@ -732,7 +738,7 @@ TEST(FaultToleranceTest, SlaveReliableRe
   trigger slaveRegisterMsg;
 
   // Drop the first registered message, but allow subsequent messages.
-  EXPECT_MSG(filter, Eq(SlaveRegisteredMessage().GetTypeName()), _, _)
+  EXPECT_MESSAGE(filter, Eq(SlaveRegisteredMessage().GetTypeName()), _, _)
     .WillOnce(Return(true))
     .WillRepeatedly(DoAll(Trigger(&slaveRegisterMsg), Return(false)));
 
@@ -743,10 +749,10 @@ TEST(FaultToleranceTest, SlaveReliableRe
   driver.stop();
   driver.join();
 
-  process::post(slave, process::TERMINATE);
+  process::terminate(slave);
   process::wait(slave);
 
-  process::post(master, process::TERMINATE);
+  process::terminate(master);
   process::wait(master);
 
   process::filter(NULL);
@@ -760,7 +766,7 @@ TEST(FaultToleranceTest, SlaveReregister
   MockFilter filter;
   process::filter(&filter);
 
-  EXPECT_MSG(filter, _, _, _)
+  EXPECT_MESSAGE(filter, _, _, _)
     .WillRepeatedly(Return(false));
 
   SimpleAllocator a;
@@ -790,7 +796,7 @@ TEST(FaultToleranceTest, SlaveReregister
 
   trigger slaveReRegisterMsg;
 
-  EXPECT_MSG(filter, Eq(SlaveReregisteredMessage().GetTypeName()), _, _)
+  EXPECT_MESSAGE(filter, Eq(SlaveReregisteredMessage().GetTypeName()), _, _)
     .WillOnce(DoAll(Trigger(&slaveReRegisterMsg), Return(false)));
 
   driver.start();
@@ -810,10 +816,10 @@ TEST(FaultToleranceTest, SlaveReregister
   driver.stop();
   driver.join();
 
-  process::post(slave, process::TERMINATE);
+  process::terminate(slave);
   process::wait(slave);
 
-  process::post(master, process::TERMINATE);
+  process::terminate(master);
   process::wait(master);
 
   process::filter(NULL);

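A smaller migration repeated throughout this file: tests no longer post the internal TERMINATE message but call the public process::terminate, paired with process::wait, as the teardown idiom:

  process::terminate(slave);  // Request shutdown of the slave process...
  process::wait(slave);       // ...and block until it has actually exited.

  process::terminate(master);
  process::wait(master);
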
Modified: incubator/mesos/trunk/src/tests/log_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/log_tests.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/log_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/log_tests.cpp Fri Jan 27 01:25:13 2012
@@ -679,10 +679,10 @@ TEST(CoordinatorTest, NotLearnedFill)
   MockFilter filter;
   process::filter(&filter);
 
-  EXPECT_MSG(filter, _, _, _)
+  EXPECT_MESSAGE(filter, _, _, _)
     .WillRepeatedly(Return(false));
 
-  EXPECT_MSG(filter, Eq(LearnedMessage().GetTypeName()), _, _)
+  EXPECT_MESSAGE(filter, Eq(LearnedMessage().GetTypeName()), _, _)
     .WillRepeatedly(Return(true));
 
   const std::string path1 = utils::os::getcwd() + "/.log1";
@@ -807,10 +807,10 @@ TEST(CoordinatorTest, MultipleAppendsNot
   MockFilter filter;
   process::filter(&filter);
 
-  EXPECT_MSG(filter, _, _, _)
+  EXPECT_MESSAGE(filter, _, _, _)
     .WillRepeatedly(Return(false));
 
-  EXPECT_MSG(filter, Eq(LearnedMessage().GetTypeName()), _, _)
+  EXPECT_MESSAGE(filter, Eq(LearnedMessage().GetTypeName()), _, _)
     .WillRepeatedly(Return(true));
 
   const std::string path1 = utils::os::getcwd() + "/.log1";
@@ -947,10 +947,10 @@ TEST(CoordinatorTest, TruncateNotLearned
   MockFilter filter;
   process::filter(&filter);
 
-  EXPECT_MSG(filter, _, _, _)
+  EXPECT_MESSAGE(filter, _, _, _)
     .WillRepeatedly(Return(false));
 
-  EXPECT_MSG(filter, Eq(LearnedMessage().GetTypeName()), _, _)
+  EXPECT_MESSAGE(filter, Eq(LearnedMessage().GetTypeName()), _, _)
     .WillRepeatedly(Return(true));
 
   const std::string path1 = utils::os::getcwd() + "/.log1";

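These log tests lean on googlemock's precedence rule: when several expectations match the same call, the most recently declared one wins. The catch-all therefore delivers everything, while the later, more specific expectation drops only the LearnedMessages, so appends are written but never learned:

  // Default: observe but deliver everything (Return(false) == don't filter).
  EXPECT_MESSAGE(filter, _, _, _)
    .WillRepeatedly(Return(false));

  // Declared later, so it takes precedence for LearnedMessages:
  // Return(true) filters them out, keeping replicas from learning.
  EXPECT_MESSAGE(filter, Eq(LearnedMessage().GetTypeName()), _, _)
    .WillRepeatedly(Return(true));
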
Modified: incubator/mesos/trunk/src/tests/master_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/master_tests.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/master_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/master_tests.cpp Fri Jan 27 01:25:13 2012
@@ -48,9 +48,9 @@ using mesos::internal::master::SimpleAll
 
 using mesos::internal::slave::Slave;
 
-using process::PID;
+using process::Clock;
 using process::Future;
-using process::Promise;
+using process::PID;
 
 using std::string;
 using std::map;
@@ -74,6 +74,8 @@ TEST(MasterTest, TaskRunning)
 
   MockExecutor exec;
 
+  trigger shutdownCall;
+
   EXPECT_CALL(exec, init(_, _))
     .Times(1);
 
@@ -81,7 +83,7 @@ TEST(MasterTest, TaskRunning)
     .WillOnce(SendStatusUpdate(TASK_RUNNING));
 
   EXPECT_CALL(exec, shutdown(_))
-    .Times(AtMost(1));
+    .WillOnce(Trigger(&shutdownCall));
 
   map<ExecutorID, Executor*> execs;
   execs[DEFAULT_EXECUTOR_ID] = &exec;
@@ -144,10 +146,12 @@ TEST(MasterTest, TaskRunning)
   driver.stop();
   driver.join();
 
-  process::post(slave, process::TERMINATE);
+  WAIT_UNTIL(shutdownCall); // To ensure we can deallocate the MockExecutor.
+
+  process::terminate(slave);
   process::wait(slave);
 
-  process::post(master, process::TERMINATE);
+  process::terminate(master);
   process::wait(master);
 }
 
@@ -162,7 +166,7 @@ TEST(MasterTest, KillTask)
 
   MockExecutor exec;
 
-  trigger killTaskCall;
+  trigger killTaskCall, shutdownCall;
 
   EXPECT_CALL(exec, init(_, _))
     .Times(1);
@@ -174,7 +178,7 @@ TEST(MasterTest, KillTask)
     .WillOnce(Trigger(&killTaskCall));
 
   EXPECT_CALL(exec, shutdown(_))
-    .Times(AtMost(1));
+    .WillOnce(Trigger(&shutdownCall));
 
   map<ExecutorID, Executor*> execs;
   execs[DEFAULT_EXECUTOR_ID] = &exec;
@@ -238,10 +242,12 @@ TEST(MasterTest, KillTask)
   driver.stop();
   driver.join();
 
-  process::post(slave, process::TERMINATE);
+  WAIT_UNTIL(shutdownCall); // To ensure we can deallocate the MockExecutor.
+
+  process::terminate(slave);
   process::wait(slave);
 
-  process::post(master, process::TERMINATE);
+  process::terminate(master);
   process::wait(master);
 }
 
@@ -260,7 +266,7 @@ TEST(MasterTest, FrameworkMessage)
   ExecutorArgs args;
   string execData;
 
-  trigger execFrameworkMessageCall;
+  trigger execFrameworkMessageCall, shutdownCall;
 
   EXPECT_CALL(exec, init(_, _))
     .WillOnce(DoAll(SaveArg<0>(&execDriver), SaveArg<1>(&args)));
@@ -273,7 +279,7 @@ TEST(MasterTest, FrameworkMessage)
                     Trigger(&execFrameworkMessageCall)));
 
   EXPECT_CALL(exec, shutdown(_))
-    .Times(AtMost(1));
+    .WillOnce(Trigger(&shutdownCall));
 
   map<ExecutorID, Executor*> execs;
   execs[DEFAULT_EXECUTOR_ID] = &exec;
@@ -356,10 +362,12 @@ TEST(MasterTest, FrameworkMessage)
   schedDriver.stop();
   schedDriver.join();
 
-  process::post(slave, process::TERMINATE);
+  WAIT_UNTIL(shutdownCall); // To ensure we can deallocate the MockExecutor.
+
+  process::terminate(slave);
   process::wait(slave);
 
-  process::post(master, process::TERMINATE);
+  process::terminate(master);
   process::wait(master);
 }
 
@@ -374,7 +382,7 @@ TEST(MasterTest, MultipleExecutors)
 
   MockExecutor exec1;
   TaskDescription exec1Task;
-  trigger exec1LaunchTaskCall;
+  trigger exec1LaunchTaskCall, exec1ShutdownCall;
 
   EXPECT_CALL(exec1, init(_, _))
     .Times(1);
@@ -385,11 +393,11 @@ TEST(MasterTest, MultipleExecutors)
                     SendStatusUpdate(TASK_RUNNING)));
 
   EXPECT_CALL(exec1, shutdown(_))
-    .Times(AtMost(1));
+    .WillOnce(Trigger(&exec1ShutdownCall));
 
   MockExecutor exec2;
   TaskDescription exec2Task;
-  trigger exec2LaunchTaskCall;
+  trigger exec2LaunchTaskCall, exec2ShutdownCall;
 
   EXPECT_CALL(exec2, init(_, _))
     .Times(1);
@@ -400,7 +408,7 @@ TEST(MasterTest, MultipleExecutors)
                     SendStatusUpdate(TASK_RUNNING)));
 
   EXPECT_CALL(exec2, shutdown(_))
-    .Times(AtMost(1));
+    .WillOnce(Trigger(&exec2ShutdownCall));
 
   ExecutorID executorId1;
   executorId1.set_value("executor-1");
@@ -488,10 +496,13 @@ TEST(MasterTest, MultipleExecutors)
   driver.stop();
   driver.join();
 
-  process::post(slave, process::TERMINATE);
+  WAIT_UNTIL(exec1ShutdownCall); // To ensure we can deallocate the MockExecutor.
+  WAIT_UNTIL(exec2ShutdownCall); // To ensure we can deallocate the MockExecutor.
+
+  process::terminate(slave);
   process::wait(slave);
 
-  process::post(master, process::TERMINATE);
+  process::terminate(master);
   process::wait(master);
 }
 
@@ -504,10 +515,10 @@ public:
   // We need this typedef because MOCK_METHOD is a macro.
   typedef map<FrameworkID, FrameworkInfo> Map_FrameworkId_FrameworkInfo;
 
-  MOCK_METHOD0(list, Promise<Result<Map_FrameworkId_FrameworkInfo> >());
-  MOCK_METHOD2(add, Promise<Result<bool> >(const FrameworkID&,
-                                           const FrameworkInfo&));
-  MOCK_METHOD1(remove, Promise<Result<bool> >(const FrameworkID&));
+  MOCK_METHOD0(list, Future<Result<Map_FrameworkId_FrameworkInfo> >());
+  MOCK_METHOD2(add, Future<Result<bool> >(const FrameworkID&,
+                                          const FrameworkInfo&));
+  MOCK_METHOD1(remove, Future<Result<bool> >(const FrameworkID&));
 };
 
 
@@ -601,15 +612,17 @@ TEST_F(FrameworksManagerTestFixture, Add
 
 TEST_F(FrameworksManagerTestFixture, RemoveFramework)
 {
+  Clock::pause();
+
   // Remove a non-existent framework.
   FrameworkID id;
   id.set_value("non-existent framework");
 
-  Future<Result<bool> > future =
-    process::dispatch(manager, &FrameworksManager::remove, id, 0);
+  Future<Result<bool> > future1 =
+    process::dispatch(manager, &FrameworksManager::remove, id, seconds(0));
 
-  ASSERT_TRUE(future.await(2.0));
-  EXPECT_TRUE(future.get().isError());
+  ASSERT_TRUE(future1.await(2.0));
+  EXPECT_TRUE(future1.get().isError());
 
   // Remove an existing framework.
 
@@ -630,7 +643,9 @@ TEST_F(FrameworksManagerTestFixture, Rem
 
   // Now remove the added framework.
   Future<Result<bool> > future3 =
-    process::dispatch(manager, &FrameworksManager::remove, id2, 1.0);
+    process::dispatch(manager, &FrameworksManager::remove, id2, seconds(1.0));
+
+  Clock::update(Clock::now(manager) + 1.0);
 
   ASSERT_TRUE(future3.await(2.0));
   EXPECT_TRUE(future2.get().get());
@@ -641,6 +656,8 @@ TEST_F(FrameworksManagerTestFixture, Rem
 
   ASSERT_TRUE(future4.await(2.0));
   EXPECT_FALSE(future4.get().get());
+
+  Clock::resume();
 }
 
 
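Note the switch from Clock::advance to Clock::update in the hunk above: rather than nudging the global test clock by a delta, the test jumps it to an absolute point computed from the manager process's own notion of now, which guarantees the delayed removal has expired regardless of when the dispatch was actually scheduled. A condensed restatement (the local name 'removed' is illustrative):

  Future<Result<bool> > removed =
    process::dispatch(manager, &FrameworksManager::remove, id2, seconds(1.0));

  // Move the clock one second past the manager's current time, firing the
  // delayed removal deterministically and without sleeping in real time.
  Clock::update(Clock::now(manager) + 1.0);

  ASSERT_TRUE(removed.await(2.0));
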
@@ -650,11 +667,11 @@ TEST_F(FrameworksManagerTestFixture, Res
   FrameworkID id;
   id.set_value("non-existent framework");
 
-  Future<Result<bool> > future =
+  Future<Result<bool> > future1 =
     process::dispatch(manager, &FrameworksManager::resurrect, id);
 
-  ASSERT_TRUE(future.await(2.0));
-  EXPECT_FALSE(future.get().get());
+  ASSERT_TRUE(future1.await(2.0));
+  EXPECT_FALSE(future1.get().get());
 
   // Resurrect an existing framework that is NOT being removed.
   // Add a dummy framework.
@@ -680,9 +697,8 @@ TEST_F(FrameworksManagerTestFixture, Res
 }
 
 
-// NOTE: In the following tests, with paused clocks, future.await() may wait
-// forever. This makes debugging failed tests hard.
-// TODO(vinod) Need better constructs.
+// TODO(vinod): Using a paused clock in the tests means that
+// future.await() may wait forever. This makes debugging hard.
 TEST_F(FrameworksManagerTestFixture, ResurrectExpiringFramework)
 {
   // This is the crucial test.
@@ -698,28 +714,27 @@ TEST_F(FrameworksManagerTestFixture, Res
   info.set_user("test user");
 
   // Add the framework.
-  Future<Result<bool> > future =
-    process::dispatch(manager, &FrameworksManager::add, id, info);
+  process::dispatch(manager, &FrameworksManager::add, id, info);
 
-  process::Clock::pause();
+  Clock::pause();
 
   // Remove after 2 secs.
-  Future<Result<bool> > future2 =
-    process::dispatch(manager, &FrameworksManager::remove, id, 2.0);
+  Future<Result<bool> > future1 =
+    process::dispatch(manager, &FrameworksManager::remove, id, seconds(2.0));
 
   // Resurrect in the meantime.
-  Future<Result<bool> > future3 =
+  Future<Result<bool> > future2 =
     process::dispatch(manager, &FrameworksManager::resurrect, id);
 
-  ASSERT_TRUE(future3.await());
-  EXPECT_TRUE(future3.get().get());
+  ASSERT_TRUE(future2.await(2.0));
+  EXPECT_TRUE(future2.get().get());
 
-  process::Clock::advance(2.0);
+  Clock::update(Clock::now(manager) + 2.0);
 
-  ASSERT_TRUE(future2.await());
-  EXPECT_FALSE(future2.get().get());
+  ASSERT_TRUE(future1.await(2.0));
+  EXPECT_FALSE(future1.get().get());
 
-  process::Clock::resume();
+  Clock::resume();
 }
 
 
@@ -738,36 +753,35 @@ TEST_F(FrameworksManagerTestFixture, Res
   info.set_user("test user");
 
   // Add the framework.
-  Future<Result<bool> > future =
-    process::dispatch(manager, &FrameworksManager::add, id, info);
+  process::dispatch(manager, &FrameworksManager::add, id, info);
 
-  process::Clock::pause();
+  Clock::pause();
 
-  Future<Result<bool> > future2 =
-    process::dispatch(manager, &FrameworksManager::remove, id, 2.0);
+  Future<Result<bool> > future1 =
+    process::dispatch(manager, &FrameworksManager::remove, id, seconds(2.0));
 
   // Resurrect in the meantime.
-  Future<Result<bool> > future3 =
+  Future<Result<bool> > future2 =
     process::dispatch(manager, &FrameworksManager::resurrect, id);
 
   // Remove again.
-  Future<Result<bool> > future4 =
-    process::dispatch(manager, &FrameworksManager::remove, id, 1.0);
+  Future<Result<bool> > future3 =
+    process::dispatch(manager, &FrameworksManager::remove, id, seconds(1.0));
 
-  ASSERT_TRUE(future3.await());
-  EXPECT_TRUE(future3.get().get());
+  ASSERT_TRUE(future2.await(2.0));
+  EXPECT_TRUE(future2.get().get());
 
-  process::Clock::advance(1.0);
+  Clock::update(Clock::now(manager) + 1.0);
 
-  ASSERT_TRUE(future4.await());
-  EXPECT_TRUE(future4.get().get());
+  ASSERT_TRUE(future3.await(2.0));
+  EXPECT_TRUE(future3.get().get());
 
-  process::Clock::advance(1.0);
+  Clock::update(Clock::now(manager) + 2.0);
 
-  ASSERT_TRUE(future2.await());
-  EXPECT_FALSE(future2.get().get());
+  ASSERT_TRUE(future1.await(2.0));
+  EXPECT_FALSE(future1.get().get());
 
-  process::Clock::resume();
+  Clock::resume();
 }
 
 
@@ -797,12 +811,12 @@ TEST(FrameworksManagerTest, CacheFailure
   process::spawn(manager);
 
   // Test that FrameworksManager initially returns an error.
-  Future<Result<map<FrameworkID, FrameworkInfo> > > future =
+  Future<Result<map<FrameworkID, FrameworkInfo> > > future1 =
     process::dispatch(manager, &FrameworksManager::list);
 
-  ASSERT_TRUE(future.await(2.0));
-  ASSERT_TRUE(future.get().isError());
-  EXPECT_EQ(future.get().error(), "Error caching framework infos.");
+  ASSERT_TRUE(future1.await(2.0));
+  ASSERT_TRUE(future1.get().isError());
+  EXPECT_EQ(future1.get().error(), "Error caching framework infos.");
 
   // Add framework should function normally despite caching failure.
   FrameworkID id;
@@ -821,7 +835,7 @@ TEST(FrameworksManagerTest, CacheFailure
 
   // Remove framework should fail due to caching failure.
   Future<Result<bool> > future3 =
-    process::dispatch(manager, &FrameworksManager::remove, id, 0);
+    process::dispatch(manager, &FrameworksManager::remove, id, seconds(0));
 
   ASSERT_TRUE(future3.await(2.0));
   ASSERT_TRUE(future3.get().isError());

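The recurring WAIT_UNTIL(shutdownCall) additions in this file close a teardown race: the slave shuts its executors down asynchronously, so terminating it right after driver.join() could leave shutdown(_) running against a stack-allocated MockExecutor that is already being destroyed. Blocking on the shutdown trigger first makes the deallocation safe:

  trigger shutdownCall;

  EXPECT_CALL(exec, shutdown(_))
    .WillOnce(Trigger(&shutdownCall));

  // ... run the test, then stop the driver ...

  driver.stop();
  driver.join();

  WAIT_UNTIL(shutdownCall); // The executor has shut down; the mock may die.

  process::terminate(slave);
  process::wait(slave);
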
Modified: incubator/mesos/trunk/src/tests/utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/utils.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/utils.hpp (original)
+++ incubator/mesos/trunk/src/tests/utils.hpp Fri Jan 27 01:25:13 2012
@@ -40,7 +40,9 @@
 #include "slave/slave.hpp"
 
 
-namespace mesos { namespace internal { namespace test {
+namespace mesos {
+namespace internal {
+namespace test {
 
 /**
  * The location of the Mesos source directory.  Used by tests to locate
@@ -161,7 +163,22 @@ public:
 class MockFilter : public process::Filter
 {
 public:
-  MOCK_METHOD1(filter, bool(process::Message *));
+  MockFilter()
+  {
+    EXPECT_CALL(*this, filter(testing::A<const process::MessageEvent&>()))
+      .WillRepeatedly(testing::Return(false));
+    EXPECT_CALL(*this, filter(testing::A<const process::DispatchEvent&>()))
+      .WillRepeatedly(testing::Return(false));
+    EXPECT_CALL(*this, filter(testing::A<const process::HttpEvent&>()))
+      .WillRepeatedly(testing::Return(false));
+    EXPECT_CALL(*this, filter(testing::A<const process::ExitedEvent&>()))
+      .WillRepeatedly(testing::Return(false));
+  }
+
+  MOCK_METHOD1(filter, bool(const process::MessageEvent&));
+  MOCK_METHOD1(filter, bool(const process::DispatchEvent&));
+  MOCK_METHOD1(filter, bool(const process::HttpEvent&));
+  MOCK_METHOD1(filter, bool(const process::ExitedEvent&));
 };
 
 
@@ -171,19 +188,29 @@ public:
  */
 MATCHER_P3(MsgMatcher, name, from, to, "")
 {
-  return (testing::Matcher<std::string>(name).Matches(arg->name) &&
-          testing::Matcher<process::UPID>(from).Matches(arg->from) &&
-          testing::Matcher<process::UPID>(to).Matches(arg->to));
+  const process::MessageEvent& event = ::std::tr1::get<0>(arg);
+  return (testing::Matcher<std::string>(name).Matches(event.message->name) &&
+          testing::Matcher<process::UPID>(from).Matches(event.message->from) &&
+          testing::Matcher<process::UPID>(to).Matches(event.message->to));
 }
 
 
 /**
  * This macro provides some syntactic sugar for matching messages
  * using the message matcher (see above) as well as the MockFilter
- * (see above).
+ * (see above). We should also add EXPECT_DISPATCH, EXPECT_HTTP, etc.
  */
-#define EXPECT_MSG(mockFilter, name, from, to)                  \
-  EXPECT_CALL(mockFilter, filter(MsgMatcher(name, from, to)))
+#define EXPECT_MESSAGE(mockFilter, name, from, to)              \
+  EXPECT_CALL(mockFilter, filter(testing::A<const process::MessageEvent&>())) \
+    .With(MsgMatcher(name, from, to))
+
+
+ACTION_TEMPLATE(SaveArgField,
+                HAS_1_TEMPLATE_PARAMS(int, k),
+                AND_2_VALUE_PARAMS(field, pointer))
+{
+  *pointer = *(::std::tr1::get<k>(args).*field);
+}
 
 
 /**
@@ -317,7 +344,8 @@ private:
   process::PID<slave::Slave> slave;
 };
 
-}}} // namespace mesos { namespace internal { namespace test {
-
+} // namespace test {
+} // namespace internal {
+} // namespace mesos {
 
 #endif // __TESTING_UTILS_HPP__

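Putting the new pieces of utils.hpp together: MockFilter defaults every event type to "not filtered", EXPECT_MESSAGE narrows an expectation to MessageEvents matching a name/from/to triple, and SaveArgField copies a field out of the matched event. A typical use in a test body (a sketch; the surrounding fixture is elided):

  MockFilter filter;
  process::filter(&filter); // Route all libprocess events through the mock.

  process::Message message;

  // Capture the registration message without dropping it.
  EXPECT_MESSAGE(filter, Eq(FrameworkRegisteredMessage().GetTypeName()), _, _)
    .WillOnce(DoAll(SaveArgField<0>(&process::MessageEvent::message, &message),
                    Return(false)));

  // ... drive the scheduler, then inspect 'message' ...

  process::filter(NULL); // Uninstall before 'filter' goes out of scope.
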
Modified: incubator/mesos/trunk/src/zookeeper/group.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper/group.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/zookeeper/group.cpp (original)
+++ incubator/mesos/trunk/src/zookeeper/group.cpp Fri Jan 27 01:25:13 2012
@@ -47,12 +47,12 @@ public:
   void initialize();
 
   // Group implementation.
-  Promise<Group::Membership> join(const string& info);
-  Promise<bool> cancel(const Group::Membership& membership);
-  Promise<string> info(const Group::Membership& membership);
-  Promise<set<Group::Membership> > watch(
+  Future<Group::Membership> join(const string& info);
+  Future<bool> cancel(const Group::Membership& membership);
+  Future<string> info(const Group::Membership& membership);
+  Future<set<Group::Membership> > watch(
       const set<Group::Membership>& expected);
-  Promise<Option<int64_t> > session();
+  Future<Option<int64_t> > session();
 
   // ZooKeeper events.
   void connected(bool reconnect);
@@ -137,10 +137,10 @@ private:
   };
 
   struct {
-    queue<Join> joins;
-    queue<Cancel> cancels;
-    queue<Info> infos;
-    queue<Watch> watches;
+    queue<Join*> joins;
+    queue<Cancel*> cancels;
+    queue<Info*> infos;
+    queue<Watch*> watches;
   } pending;
 
   bool retrying;
@@ -185,16 +185,16 @@ void GroupProcess::initialize()
 }
 
 
-Promise<Group::Membership> GroupProcess::join(const string& info)
+Future<Group::Membership> GroupProcess::join(const string& info)
 {
   if (error.isSome()) {
     Promise<Group::Membership> promise;
     promise.fail(error.get());
-    return promise;
+    return promise.future();
   } else if (state != CONNECTED) {
-    Join join(info);
+    Join* join = new Join(info);
     pending.joins.push(join);
-    return join.promise;
+    return join->promise.future();
   }
 
   // TODO(benh): Write a test to see how ZooKeeper fails setting znode
@@ -212,13 +212,13 @@ Promise<Group::Membership> GroupProcess:
       delay(RETRY_SECONDS, self(), &GroupProcess::retry, RETRY_SECONDS);
       retrying = true;
     }
-    Join join(info);
+    Join* join = new Join(info);
     pending.joins.push(join);
-    return join.promise;
+    return join->promise.future();
   } else if (membership.isError()) {
     Promise<Group::Membership> promise;
     promise.fail(membership.error());
-    return promise;
+    return promise.future();
   }
 
   owned.insert(make_pair(membership.get(), info));
@@ -227,20 +227,20 @@ Promise<Group::Membership> GroupProcess:
 }
 
 
-Promise<bool> GroupProcess::cancel(const Group::Membership& membership)
+Future<bool> GroupProcess::cancel(const Group::Membership& membership)
 {
   if (error.isSome()) {
     Promise<bool> promise;
     promise.fail(error.get());
-    return promise;
+    return promise.future();
   } else if (owned.count(membership) == 0) {
     return false; // TODO(benh): Should this be an error?
   }
 
   if (state != CONNECTED) {
-    Cancel cancel(membership);
+    Cancel* cancel = new Cancel(membership);
     pending.cancels.push(cancel);
-    return cancel.promise;
+    return cancel->promise.future();
   }
 
   // TODO(benh): Only attempt if the pending queue is empty so that a
@@ -254,29 +254,29 @@ Promise<bool> GroupProcess::cancel(const
       delay(RETRY_SECONDS, self(), &GroupProcess::retry, RETRY_SECONDS);
       retrying = true;
     }
-    Cancel cancel(membership);
+    Cancel* cancel = new Cancel(membership);
     pending.cancels.push(cancel);
-    return cancel.promise;
+    return cancel->promise.future();
   } else if (cancellation.isError()) {
     Promise<bool> promise;
     promise.fail(cancellation.error());
-    return promise;
+    return promise.future();
   }
 
   return cancellation.get();
 }
 
 
-Promise<string> GroupProcess::info(const Group::Membership& membership)
+Future<string> GroupProcess::info(const Group::Membership& membership)
 {
   if (error.isSome()) {
     Promise<string> promise;
     promise.fail(error.get());
-    return promise;
+    return promise.future();
   } else if (state != CONNECTED) {
-    Info info(membership);
+    Info* info = new Info(membership);
     pending.infos.push(info);
-    return info.promise;
+    return info->promise.future();
   }
 
   // TODO(benh): Only attempt if the pending queue is empty so that a
@@ -286,30 +286,30 @@ Promise<string> GroupProcess::info(const
   Result<string> result = doInfo(membership);
 
   if (result.isNone()) { // Try again later.
-    Info info(membership);
+    Info* info = new Info(membership);
     pending.infos.push(info);
-    return info.promise;
+    return info->promise.future();
   } else if (result.isError()) {
     Promise<string> promise;
     promise.fail(result.error());
-    return promise;
+    return promise.future();
   }
 
   return result.get();
 }
 
 
-Promise<set<Group::Membership> > GroupProcess::watch(
+Future<set<Group::Membership> > GroupProcess::watch(
     const set<Group::Membership>& expected)
 {
   if (error.isSome()) {
     Promise<set<Group::Membership> > promise;
     promise.fail(error.get());
-    return promise;
+    return promise.future();
   } else if (state != CONNECTED) {
-    Watch watch(expected);
+    Watch* watch = new Watch(expected);
     pending.watches.push(watch);
-    return watch.promise;
+    return watch->promise.future();
   }
 
   // To guarantee causality, we must invalidate our cache of
@@ -330,25 +330,25 @@ Promise<set<Group::Membership> > GroupPr
       delay(RETRY_SECONDS, self(), &GroupProcess::retry, RETRY_SECONDS);
       retrying = true;
     }
-    Watch watch(expected);
+    Watch* watch = new Watch(expected);
     pending.watches.push(watch);
-    return watch.promise;
+    return watch->promise.future();
   } else if (memberships.get() == expected) { // Just wait for updates.
-    Watch watch(expected);
+    Watch* watch = new Watch(expected);
     pending.watches.push(watch);
-    return watch.promise;
+    return watch->promise.future();
   }
 
   return memberships.get();
 }
 
 
-Promise<Option<int64_t> > GroupProcess::session()
+Future<Option<int64_t> > GroupProcess::session()
 {
   if (error.isSome()) {
     Promise<Option<int64_t> > promise;
     promise.fail(error.get());
-    return promise;
+    return promise.future();
   } else if (state != CONNECTED) {
     return Option<int64_t>::none();
   }
@@ -622,12 +622,14 @@ void GroupProcess::update()
   CHECK(memberships.isSome());
   size_t size = pending.watches.size();
   for (int i = 0; i < size; i++) {
-    if (memberships.get() != pending.watches.front().expected) {
-      pending.watches.front().promise.set(memberships.get());
-    } else {
-      pending.watches.push(pending.watches.front());
-    }
-    pending.watches.pop();
+    Watch* watch = pending.watches.front();
+    pending.watches.pop();
+    if (memberships.get() != watch->expected) {
+      watch->promise.set(memberships.get());
+      delete watch; // Only a satisfied watch may be deleted ...
+    } else {
+      pending.watches.push(watch); // ... a re-queued one must stay alive.
+    }
   }
 }
 
@@ -639,43 +641,49 @@ bool GroupProcess::sync()
 
   // Do joins.
   while (!pending.joins.empty()) {
-    Result<Group::Membership> membership = doJoin(pending.joins.front().info);
+    Result<Group::Membership> membership = doJoin(pending.joins.front()->info);
     if (membership.isNone()) {
       return false; // Try again later.
     } else if (membership.isError()) {
-      pending.joins.front().promise.fail(membership.error());
+      pending.joins.front()->promise.fail(membership.error());
     } else {
-      owned.insert(make_pair(membership.get(), pending.joins.front().info));
-      pending.joins.front().promise.set(membership.get());
+      owned.insert(make_pair(membership.get(), pending.joins.front()->info));
+      pending.joins.front()->promise.set(membership.get());
     }
+    Join* join = pending.joins.front();
     pending.joins.pop();
+    delete join;
   }
 
   // Do cancels.
   while (!pending.cancels.empty()) {
-    Result<bool> cancellation = doCancel(pending.cancels.front().membership);
+    Result<bool> cancellation = doCancel(pending.cancels.front()->membership);
     if (cancellation.isNone()) {
       return false; // Try again later.
     } else if (cancellation.isError()) {
-      pending.cancels.front().promise.fail(cancellation.error());
+      pending.cancels.front()->promise.fail(cancellation.error());
     } else {
-      pending.cancels.front().promise.set(cancellation.get());
+      pending.cancels.front()->promise.set(cancellation.get());
     }
+    Cancel* cancel = pending.cancels.front();
     pending.cancels.pop();
+    delete cancel;
   }
 
   // Do infos.
   while (!pending.infos.empty()) {
     // TODO(benh): Ignore if future has been discarded?
-    Result<string> result = doInfo(pending.infos.front().membership);
+    Result<string> result = doInfo(pending.infos.front()->membership);
     if (result.isNone()) {
       return false; // Try again later.
     } else if (result.isError()) {
-      pending.infos.front().promise.fail(result.error());
+      pending.infos.front()->promise.fail(result.error());
     } else {
-      pending.infos.front().promise.set(result.get());
+      pending.infos.front()->promise.set(result.get());
     }
+    Info* info = pending.infos.front();
     pending.infos.pop();
+    delete info;
   }
 
   // Get cache of memberships if we don't have one.
@@ -708,11 +716,13 @@ void GroupProcess::retry(double seconds)
 
 
 template <typename T>
-void fail(queue<T>* queue, const string& message)
+void fail(queue<T*>* queue, const string& message)
 {
   while (!queue->empty()) {
-    queue->front().promise.fail(message);
+    T* t = queue->front();
     queue->pop();
+    t->promise.fail(message);
+    delete t;
   }
 }
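
The shift from queue<Join> to queue<Join*> (and likewise Cancel, Info, and Watch) is what lets these methods return a Future instead of a Promise: the caller's Future is tied to the pending operation's Promise, so that Promise must outlive the call that queued it. Heap-allocating each pending operation and deleting it only after its promise is set or failed provides exactly that lifetime:

  Join* join = new Join(info);   // The Promise now lives on the heap...
  pending.joins.push(join);
  return join->promise.future(); // ...so this Future stays valid until
                                 // sync() (or fail()) completes the promise
                                 // and deletes the Join.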