Posted to commits@mesos.apache.org by be...@apache.org on 2012/01/27 02:25:15 UTC

svn commit: r1236485 [1/7] - in /incubator/mesos/trunk: ./ include/mesos/ src/common/ src/exec/ src/local/ src/log/ src/master/ src/python/native/ src/sched/ src/slave/ src/tests/ src/zookeeper/ third_party/libprocess/ third_party/libprocess/include/pr...

Author: benh
Date: Fri Jan 27 01:25:13 2012
New Revision: 1236485

URL: http://svn.apache.org/viewvc?rev=1236485&view=rev
Log:
Updating libprocess to support Mac OS X Lion (10.7) and corresponding Mesos changes.
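
The heart of the libprocess change: a process no longer implements a
'virtual void operator () ()' containing a receive()/serve() loop; it
installs handlers up front and overrides a virtual initialize() hook
instead. A minimal sketch of the new style, assuming the libprocess
headers in this tree (PingProcess and its message names are
hypothetical):

  #include <string>

  #include <process/process.hpp>

  using namespace process;

  class PingProcess : public Process<PingProcess>
  {
  public:
    PingProcess()
    {
      // Handlers are installed once, in the constructor, instead of
      // being matched by name() inside a serve() loop.
      install("PING", &PingProcess::ping);
    }

  protected:
    virtual void initialize()
    {
      // Runs once at process startup; no explicit event loop needed.
    }

    void ping(const UPID& from, const std::string& body)
    {
      send(from, "PONG");
    }
  };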

Added:
    incubator/mesos/trunk/third_party/libprocess/include/process/clock.hpp
    incubator/mesos/trunk/third_party/libprocess/include/process/defer.hpp
    incubator/mesos/trunk/third_party/libprocess/include/process/deferred.hpp
    incubator/mesos/trunk/third_party/libprocess/include/process/event.hpp
    incubator/mesos/trunk/third_party/libprocess/include/process/executor.hpp
      - copied, changed from r1235899, incubator/mesos/trunk/third_party/libprocess/include/process/async.hpp
    incubator/mesos/trunk/third_party/libprocess/include/process/filter.hpp
    incubator/mesos/trunk/third_party/libprocess/include/process/message.hpp
    incubator/mesos/trunk/third_party/libprocess/include/process/preprocessor.hpp
    incubator/mesos/trunk/third_party/libprocess/src/thread.hpp
Removed:
    incubator/mesos/trunk/third_party/libprocess/include/process/async.hpp
Modified:
    incubator/mesos/trunk/configure.ac
    incubator/mesos/trunk/include/mesos/mesos.proto
    incubator/mesos/trunk/include/mesos/scheduler.hpp
    incubator/mesos/trunk/src/common/lock.cpp
    incubator/mesos/trunk/src/common/lock.hpp
    incubator/mesos/trunk/src/common/type_utils.hpp
    incubator/mesos/trunk/src/exec/exec.cpp
    incubator/mesos/trunk/src/local/local.cpp
    incubator/mesos/trunk/src/log/coordinator.cpp
    incubator/mesos/trunk/src/log/log.cpp
    incubator/mesos/trunk/src/log/log.hpp
    incubator/mesos/trunk/src/log/network.hpp
    incubator/mesos/trunk/src/log/replica.cpp
    incubator/mesos/trunk/src/master/frameworks_manager.cpp
    incubator/mesos/trunk/src/master/frameworks_manager.hpp
    incubator/mesos/trunk/src/master/http.cpp
    incubator/mesos/trunk/src/master/http.hpp
    incubator/mesos/trunk/src/master/master.cpp
    incubator/mesos/trunk/src/master/master.hpp
    incubator/mesos/trunk/src/master/slaves_manager.cpp
    incubator/mesos/trunk/src/master/slaves_manager.hpp
    incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.cpp
    incubator/mesos/trunk/src/python/native/mesos_scheduler_driver_impl.cpp
    incubator/mesos/trunk/src/python/native/proxy_scheduler.cpp
    incubator/mesos/trunk/src/sched/sched.cpp
    incubator/mesos/trunk/src/slave/http.cpp
    incubator/mesos/trunk/src/slave/http.hpp
    incubator/mesos/trunk/src/slave/reaper.cpp
    incubator/mesos/trunk/src/slave/reaper.hpp
    incubator/mesos/trunk/src/slave/slave.cpp
    incubator/mesos/trunk/src/slave/slave.hpp
    incubator/mesos/trunk/src/tests/exception_tests.cpp
    incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp
    incubator/mesos/trunk/src/tests/log_tests.cpp
    incubator/mesos/trunk/src/tests/master_tests.cpp
    incubator/mesos/trunk/src/tests/utils.hpp
    incubator/mesos/trunk/src/zookeeper/group.cpp
    incubator/mesos/trunk/src/zookeeper/zookeeper.cpp
    incubator/mesos/trunk/third_party/libprocess/Makefile.in
    incubator/mesos/trunk/third_party/libprocess/include/process/dispatch.hpp
    incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp
    incubator/mesos/trunk/third_party/libprocess/include/process/gc.hpp
    incubator/mesos/trunk/third_party/libprocess/include/process/latch.hpp
    incubator/mesos/trunk/third_party/libprocess/include/process/process.hpp
    incubator/mesos/trunk/third_party/libprocess/include/process/protobuf.hpp
    incubator/mesos/trunk/third_party/libprocess/include/process/run.hpp
    incubator/mesos/trunk/third_party/libprocess/include/process/timer.hpp
    incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp
    incubator/mesos/trunk/third_party/libprocess/src/foreach.hpp
    incubator/mesos/trunk/third_party/libprocess/src/latch.cpp
    incubator/mesos/trunk/third_party/libprocess/src/process.cpp
    incubator/mesos/trunk/third_party/libprocess/src/tests.cpp
    incubator/mesos/trunk/third_party/libprocess/src/timer.cpp

Modified: incubator/mesos/trunk/configure.ac
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/configure.ac?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/configure.ac (original)
+++ incubator/mesos/trunk/configure.ac Fri Jan 27 01:25:13 2012
@@ -128,8 +128,6 @@ case "${target_os}" in
     echo Setting up build environment for ${target_cpu} ${target_os}
     echo ===========================================================
     OS_NAME=darwin
-    CFLAGS="$CFLAGS -D_XOPEN_SOURCE"
-    CXXFLAGS="$CXXFLAGS -D_XOPEN_SOURCE"
     ;;
   solaris*)
     echo ===========================================================

Modified: incubator/mesos/trunk/include/mesos/mesos.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/include/mesos/mesos.proto?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/include/mesos/mesos.proto (original)
+++ incubator/mesos/trunk/include/mesos/mesos.proto Fri Jan 27 01:25:13 2012
@@ -110,6 +110,18 @@ message ExecutorInfo {
 
 
 /**
+ * Describes a master. This will probably have more fields in the
+ * future which might be used, for example, to link a framework webui
+ * to a master webui.
+ */
+message MasterInfo {
+  required string id = 1;
+  required uint32 ip = 2;
+  required uint32 port = 3;
+}
+
+
+/**
  * Describes a slave. The 'webui_hostname' and 'webui_port' are
  * provided in the event a host has different private and public
  * hostnames (e.g., Amazon EC2).
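
The new MasterInfo gets populated from self() in Master::initialize()
later in this commit. A minimal sketch of building one by hand,
assuming the usual protoc-generated header (the field values are
illustrative):

  #include <mesos/mesos.pb.h>  // Assumed protoc output for mesos.proto.

  mesos::MasterInfo info;
  info.set_id("201201270125-2130706433-5050-1234");  // date-ip-port-pid
  info.set_ip(2130706433);  // E.g., 127.0.0.1 packed into a uint32.
  info.set_port(5050);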

Modified: incubator/mesos/trunk/include/mesos/scheduler.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/include/mesos/scheduler.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/include/mesos/scheduler.hpp (original)
+++ incubator/mesos/trunk/include/mesos/scheduler.hpp Fri Jan 27 01:25:13 2012
@@ -284,7 +284,7 @@ public:
    * variables, as well as any configuration files found through the
    * environment variables.
    */
-  MesosSchedulerDriver(Scheduler* sched,
+  MesosSchedulerDriver(Scheduler* scheduler,
                        const std::string& frameworkName,
                        const ExecutorInfo& executorInfo,
                        const std::string& url,
@@ -299,7 +299,7 @@ public:
    * Additional Mesos config options are obtained from the 'params'
    * argument.
    */
-  MesosSchedulerDriver(Scheduler* sched,
+  MesosSchedulerDriver(Scheduler* scheduler,
                        const std::string& frameworkName,
                        const ExecutorInfo& executorInfo,
                        const std::map<std::string, std::string>& params,
@@ -311,7 +311,7 @@ public:
    * command-line (via 'argc' and 'argv'). Optionally providing an
    * existing framework ID can be used to failover a framework.
    */
-  MesosSchedulerDriver(Scheduler* sched,
+  MesosSchedulerDriver(Scheduler* scheduler,
                        const std::string& frameworkName,
                        const ExecutorInfo& executorInfo,
                        int argc,
@@ -349,16 +349,13 @@ public:
 
 private:
   // Initialization method used by constructors.
-  void init(Scheduler* sched,
+  void init(Scheduler* scheduler,
             internal::Configuration* conf,
             const FrameworkID& frameworkId,
             const std::string& frameworkName,
             const ExecutorInfo& executorInfo);
 
-  // Internal utility method to report an error to the scheduler.
-  void error(int code, const std::string& message);
-
-  Scheduler* sched;
+  Scheduler* scheduler;
   std::string url;
   FrameworkID frameworkId;
   std::string frameworkName;

Modified: incubator/mesos/trunk/src/common/lock.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/lock.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/lock.cpp (original)
+++ incubator/mesos/trunk/src/common/lock.cpp Fri Jan 27 01:25:13 2012
@@ -18,16 +18,38 @@
 
 #include "lock.hpp"
 
-using namespace mesos::internal;
+namespace mesos {
+namespace internal {
 
+Lock::Lock(pthread_mutex_t* _mutex)
+  : mutex(_mutex), locked(false)
+{
+  lock();
+}
+
+
+void Lock::lock()
+{
+  if (!locked) {
+    pthread_mutex_lock(mutex);
+    locked = true;
+  }
+}
 
-Lock::Lock(pthread_mutex_t* _mutex): mutex(_mutex)
+
+void Lock::unlock()
 {
-  pthread_mutex_lock(mutex);
+  if (locked) {
+    pthread_mutex_unlock(mutex);
+    locked = false;
+  }
 }
 
 
 Lock::~Lock()
 {
-  pthread_mutex_unlock(mutex);
+  unlock();
 }
+
+} // namespace internal {
+} // namespace mesos {

Modified: incubator/mesos/trunk/src/common/lock.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/lock.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/lock.hpp (original)
+++ incubator/mesos/trunk/src/common/lock.hpp Fri Jan 27 01:25:13 2012
@@ -16,27 +16,30 @@
  * limitations under the License.
  */
 
-#ifndef LOCK_HPP
-#define LOCK_HPP
+#ifndef __LOCK_HPP__
+#define __LOCK_HPP__
 
 #include <pthread.h>
 
+namespace mesos {
+namespace internal {
 
-namespace mesos { namespace internal {
-
-/**
- * RAII class for locking pthread_mutexes.
- */
+// RAII class for locking pthread_mutexes.
 class Lock
 {
-  pthread_mutex_t* mutex;
-
 public:
   Lock(pthread_mutex_t* _mutex);
   ~Lock();
-};
 
-}} /* namespace mesos { namespace internal { */
+  void lock();
+  void unlock();
+
+private:
+  pthread_mutex_t* mutex;
+  bool locked;
+};
 
+} // namespace internal {
+} // namespace mesos {
 
-#endif /* LOCK_HPP */
+#endif // __LOCK_HPP__
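
A usage sketch of the reworked Lock: acquisition still happens in the
constructor, but the new lock()/unlock() pair (guarded by the 'locked'
flag) permits releasing early, and the destructor only unlocks if the
mutex is still held. The mutex and the early release are illustrative:

  #include <pthread.h>

  #include "common/lock.hpp"

  static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

  void update()
  {
    mesos::internal::Lock lock(&mutex);  // Acquired in the constructor.

    // ... touch shared state ...

    lock.unlock();  // Release early; idempotent thanks to 'locked'.

    // ... slow work that doesn't need the mutex ...
  }                 // Destructor unlocks only if still locked.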

Modified: incubator/mesos/trunk/src/common/type_utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/type_utils.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/type_utils.hpp (original)
+++ incubator/mesos/trunk/src/common/type_utils.hpp Fri Jan 27 01:25:13 2012
@@ -73,6 +73,13 @@ inline std::ostream& operator << (std::o
 }
 
 
+inline std::ostream& operator << (std::ostream& stream, const TaskDescription& task)
+{
+  stream << task.DebugString();
+  return stream;
+}
+
+
 inline bool operator == (const FrameworkID& left, const FrameworkID& right)
 {
   return left.value() == right.value();

Modified: incubator/mesos/trunk/src/exec/exec.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/exec/exec.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/exec/exec.cpp (original)
+++ incubator/mesos/trunk/src/exec/exec.cpp Fri Jan 27 01:25:13 2012
@@ -24,8 +24,6 @@
 #include <string>
 #include <sstream>
 
-#include <boost/bind.hpp>
-
 #include <mesos/executor.hpp>
 
 #include <process/dispatch.hpp>
@@ -46,14 +44,13 @@ using namespace mesos::internal;
 
 using namespace process;
 
-using boost::bind;
-
 using std::string;
 
 using process::wait; // Necessary on some OS's to disambiguate.
 
 
-namespace mesos { namespace internal {
+namespace mesos {
+namespace internal {
 
 class ExecutorProcess : public ProtobufProcess<ExecutorProcess>
 {
@@ -74,35 +71,33 @@ public:
       aborted(false),
       directory(_directory)
   {
-    installProtobufHandler<ExecutorRegisteredMessage>(
+    install<ExecutorRegisteredMessage>(
         &ExecutorProcess::registered,
         &ExecutorRegisteredMessage::args);
 
-    installProtobufHandler<RunTaskMessage>(
+    install<RunTaskMessage>(
         &ExecutorProcess::runTask,
         &RunTaskMessage::task);
 
-    installProtobufHandler<KillTaskMessage>(
+    install<KillTaskMessage>(
         &ExecutorProcess::killTask,
         &KillTaskMessage::task_id);
 
-    installProtobufHandler<FrameworkToExecutorMessage>(
+    install<FrameworkToExecutorMessage>(
         &ExecutorProcess::frameworkMessage,
         &FrameworkToExecutorMessage::slave_id,
         &FrameworkToExecutorMessage::framework_id,
         &FrameworkToExecutorMessage::executor_id,
         &FrameworkToExecutorMessage::data);
 
-    installProtobufHandler<ShutdownExecutorMessage>(
+    install<ShutdownExecutorMessage>(
         &ExecutorProcess::shutdown);
-
-    installMessageHandler(EXITED, &ExecutorProcess::exited);
   }
 
   virtual ~ExecutorProcess() {}
 
 protected:
-  virtual void operator () ()
+  virtual void initialize()
   {
     VLOG(1) << "Executor started at: " << self();
 
@@ -113,8 +108,6 @@ protected:
     message.mutable_framework_id()->MergeFrom(frameworkId);
     message.mutable_executor_id()->MergeFrom(executorId);
     send(slave, message);
-
-    do { if (serve() == TERMINATE) break; } while (true);
   }
 
   void registered(const ExecutorArgs& args)
@@ -127,7 +120,7 @@ protected:
     VLOG(1) << "Executor registered on slave " << args.slave_id();
 
     slaveId = args.slave_id();
-    invoke(bind(&Executor::init, executor, driver, args));
+    executor->init(driver, args);
   }
 
   void runTask(const TaskDescription& task)
@@ -139,7 +132,7 @@ protected:
 
     VLOG(1) << "Executor asked to run task '" << task.task_id() << "'";
 
-    invoke(bind(&Executor::launchTask, executor, driver, task));
+    executor->launchTask(driver, task);
   }
 
   void killTask(const TaskID& taskId)
@@ -151,7 +144,7 @@ protected:
 
     VLOG(1) << "Executor asked to kill task '" << taskId << "'";
 
-    invoke(bind(&Executor::killTask, executor, driver, taskId));
+    executor->killTask(driver, taskId);
   }
 
   void frameworkMessage(const SlaveID& slaveId,
@@ -166,7 +159,7 @@ protected:
 
     VLOG(1) << "Executor received framework message";
 
-    invoke(bind(&Executor::frameworkMessage, executor, driver, data));
+    executor->frameworkMessage(driver, data);
   }
 
   void shutdown()
@@ -179,7 +172,7 @@ protected:
     VLOG(1) << "Executor asked to shutdown";
 
     // TODO(benh): Any need to invoke driver.stop?
-    invoke(bind(&Executor::shutdown, executor, driver));
+    executor->shutdown(driver);
     if (!local) {
       exit(0);
     } else {
@@ -193,7 +186,7 @@ protected:
     aborted = true;
   }
 
-  void exited()
+  virtual void exited(const UPID& pid)
   {
     if (aborted) {
       VLOG(1) << "Ignoring exited event because the driver is aborted!";
@@ -203,7 +196,7 @@ protected:
     VLOG(1) << "Slave exited, trying to shutdown";
 
     // TODO: Pass an argument to shutdown to tell it this is abnormal?
-    invoke(bind(&Executor::shutdown, executor, driver));
+    executor->shutdown(driver);
 
     // This is a pretty bad state ... no slave is left. Rather
     // than exit let's kill our process group (which includes
@@ -225,7 +218,7 @@ protected:
     update->mutable_executor_id()->MergeFrom(executorId);
     update->mutable_slave_id()->MergeFrom(slaveId);
     update->mutable_status()->MergeFrom(status);
-    update->set_timestamp(elapsedTime());
+    update->set_timestamp(Clock::now());
     update->set_uuid(UUID::random().toBytes());
     send(slave, message);
   }
@@ -254,7 +247,8 @@ private:
   const std::string directory;
 };
 
-}} // namespace mesos { namespace internal {
+} // namespace internal {
+} // namespace mesos {
 
 
 // Implementation of C++ API.
@@ -397,6 +391,11 @@ Status MesosExecutorDriver::stop()
   terminate(process);
 
   state = STOPPED;
+
+  // TODO(benh): Set the condition variable in ExecutorProcess just as
+  // we do with the MesosSchedulerDriver and SchedulerProcess:
+  // dispatch(process, &ExecutorProcess::stop);
+
   pthread_cond_signal(&cond);
 
   return OK;
@@ -419,6 +418,9 @@ Status MesosExecutorDriver::abort()
 
   CHECK(process != NULL);
 
+  // TODO(benh): Set the condition variable in ExecutorProcess just as
+  // we do with the MesosSchedulerDriver and SchedulerProcess.
+
   dispatch(process, &ExecutorProcess::abort);
 
   pthread_cond_signal(&cond);

Modified: incubator/mesos/trunk/src/local/local.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/local/local.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/local/local.cpp (original)
+++ incubator/mesos/trunk/src/local/local.cpp Fri Jan 27 01:25:13 2012
@@ -118,6 +118,8 @@ PID<Master> launch(const Configuration& 
 
   vector<UPID> pids;
 
+  // TODO(benh): Launching more than one slave is actually not kosher
+  // since each slave tries to take the "slave" id.
   for (int i = 0; i < numSlaves; i++) {
     // TODO(benh): Create a local isolation module?
     ProcessBasedIsolationModule *isolationModule =
@@ -136,7 +138,7 @@ PID<Master> launch(const Configuration& 
 void shutdown()
 {
   if (master != NULL) {
-    process::post(master->self(), process::TERMINATE);
+    process::terminate(master->self());
     process::wait(master->self());
     delete master;
     delete allocator;
@@ -149,7 +151,7 @@ void shutdown()
     // we have stopped the slave.
 
     foreachpair (IsolationModule* isolationModule, Slave* slave, slaves) {
-      process::post(slave->self(), process::TERMINATE);
+      process::terminate(slave->self());
       process::wait(slave->self());
       delete isolationModule;
       delete slave;

Modified: incubator/mesos/trunk/src/log/coordinator.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/coordinator.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/coordinator.cpp (original)
+++ incubator/mesos/trunk/src/log/coordinator.cpp Fri Jan 27 01:25:13 2012
@@ -22,7 +22,6 @@
 #include <process/future.hpp>
 
 #include "common/foreach.hpp"
-#include "common/option.hpp"
 
 #include "log/coordinator.hpp"
 #include "log/replica.hpp"
@@ -64,9 +63,9 @@ Result<uint64_t> Coordinator::elect(cons
   // Get the highest known promise from our local replica.
   Future<uint64_t> promise = replica->promised();
 
-  promise.await(); // TODO(benh): Take a timeout and use it here!
-
-  if (promise.isFailed()) {
+  if (!promise.await(timeout.remaining())) {
+    return Result<uint64_t>::none();
+  } else if (promise.isFailed()) {
     return Result<uint64_t>::error(promise.failure());
   }
 
@@ -81,14 +80,13 @@ Result<uint64_t> Coordinator::elect(cons
   set<Future<PromiseResponse> > futures =
     broadcast(protocol::promise, request);
 
-  Option<Future<PromiseResponse> > option;
   int okays = 0;
 
   do {
-    option = select(futures, timeout.remaining());
-    if (option.isSome()) {
-      CHECK(option.get().isReady());
-      const PromiseResponse& response = option.get().get();
+    Future<Future<PromiseResponse> > future = select(futures);
+    if (future.await(timeout.remaining())) {
+      CHECK(future.get().isReady());
+      const PromiseResponse& response = future.get().get();
       if (!response.okay()) {
         return Result<uint64_t>::none(); // Lost an election, but can retry.
       } else if (response.okay()) {
@@ -99,9 +97,9 @@ Result<uint64_t> Coordinator::elect(cons
           break;
         }
       }
-      futures.erase(option.get());
+      futures.erase(future.get());
     }
-  } while (option.isSome());
+  } while (timeout.remaining() > 0);
 
   // Discard the remaining futures.
   discard(futures);
@@ -120,9 +118,10 @@ Result<uint64_t> Coordinator::elect(cons
 
     Future<set<uint64_t> > positions = replica->missing(index);
 
-    positions.await(timeout.remaining());
-
-    if (positions.isFailed()) {
+    if (!positions.await(timeout.remaining())) {
+      elected = false;
+      return Result<uint64_t>::none();
+    } else if (positions.isFailed()) {
       elected = false;
       return Result<uint64_t>::error(positions.failure());
     }
@@ -176,7 +175,7 @@ Result<uint64_t> Coordinator::append(
   Action::Append* append = action.mutable_append();
   append->set_bytes(bytes);
 
-  Result<uint64_t> result = write(action, Timeout(timeout));
+  Result<uint64_t> result = write(action, timeout);
 
   if (result.isSome()) {
     CHECK(result.get() == index);
@@ -266,17 +265,15 @@ Result<uint64_t> Coordinator::write(
   set<Future<WriteResponse> > futures =
     remotecast(protocol::write, request);
 
-  Option<Future<WriteResponse> > option;
   int okays = 0;
 
   do {
-    option = select(futures, timeout.remaining());
-    if (option.isSome()) {
-      CHECK(option.get().isReady());
-      const WriteResponse& response = option.get().get();
+    Future<Future<WriteResponse> > future = select(futures);
+    if (future.await(timeout.remaining())) {
+      CHECK(future.get().isReady());
+      const WriteResponse& response = future.get().get();
       CHECK(response.id() == request.id());
       CHECK(response.position() == request.position());
-
       if (!response.okay()) {
         elected = false;
         return Result<uint64_t>::error("Coordinator demoted");
@@ -296,11 +293,11 @@ Result<uint64_t> Coordinator::write(
           }
         }
       }
-      futures.erase(option.get());
+      futures.erase(future.get());
     }
-  } while (option.isSome());
+  } while (timeout.remaining() > 0);
 
-  // Timed out ...
+  // Timed out ... discard remaining futures.
   discard(futures);
   return Result<uint64_t>::none();
 }
@@ -397,14 +394,13 @@ Result<Action> Coordinator::fill(uint64_
   set<Future<PromiseResponse> > futures =
     broadcast(protocol::promise, request);
 
-  Option<Future<PromiseResponse> > option;
   list<PromiseResponse> responses;
 
   do {
-    option = select(futures, timeout.remaining());
-    if (option.isSome()) {
-      CHECK(option.get().isReady());
-      const PromiseResponse& response = option.get().get();
+    Future<Future<PromiseResponse> > future = select(futures);
+    if (future.await(timeout.remaining())) {
+      CHECK(future.get().isReady());
+      const PromiseResponse& response = future.get().get();
       CHECK(response.id() == request.id());
       if (!response.okay()) {
         elected = false;
@@ -415,9 +411,9 @@ Result<Action> Coordinator::fill(uint64_
           break;
         }
       }
-      futures.erase(option.get());
+      futures.erase(future.get());
     }
-  } while (option.isSome());
+  } while (timeout.remaining() > 0);
 
   // Discard the remaining futures.
   discard(futures);
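
The recurring shape in the hunks above: select() now returns a future
that becomes ready when the first of the given futures does, and the
caller awaits it with whatever timeout remains. Condensed into a single
sketch (Response, okay(), and quorum are stand-ins for the protocol
types used above):

  #include <set>

  #include <process/future.hpp>
  #include <process/timeout.hpp>

  template <typename Response>
  bool awaitQuorum(std::set<process::Future<Response> > futures,
                   int quorum,
                   process::Timeout timeout)
  {
    int okays = 0;
    do {
      process::Future<process::Future<Response> > future = select(futures);
      if (future.await(timeout.remaining())) {  // One future became ready.
        const Response& response = future.get().get();
        if (response.okay() && ++okays >= quorum) {
          discard(futures);                     // Quorum reached.
          return true;
        }
        futures.erase(future.get());            // Keep waiting on the rest.
      }
    } while (timeout.remaining() > 0);

    discard(futures);                           // Timed out.
    return false;
  }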

Modified: incubator/mesos/trunk/src/log/log.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/log.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/log.cpp (original)
+++ incubator/mesos/trunk/src/log/log.cpp Fri Jan 27 01:25:13 2012
@@ -241,7 +241,7 @@ public:
   void updated(const string& path);
 
 protected:
-  virtual void operator () ();
+  virtual void initialize();
 
 private:
   // Updates the group.
@@ -526,13 +526,11 @@ void LogProcess::updated(const string& p
 }
 
 
-void LogProcess::operator () ()
+void LogProcess::initialize()
 {
   // TODO(benh): Real testing requires injecting a ZooKeeper instance.
   watcher = new LogProcessWatcher(self());
   zk = new ZooKeeper(servers, 10000, watcher);
-
-  do { if (serve() == process::TERMINATE) break; } while (true);
 }
 
 

Modified: incubator/mesos/trunk/src/log/log.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/log.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/log.hpp (original)
+++ incubator/mesos/trunk/src/log/log.hpp Fri Jan 27 01:25:13 2012
@@ -212,13 +212,13 @@ public:
     LOG(INFO) << "Attempting to join replica to ZooKeeper group";
 
     membership = group->join(replica->pid())
-      .onFailed(dispatch(lambda::bind(&Log::failed, this, lambda::_1)))
-      .onDiscarded(dispatch(lambda::bind(&Log::discarded, this)));
+      .onFailed(executor.defer(lambda::bind(&Log::failed, this, lambda::_1)))
+      .onDiscarded(executor.defer(lambda::bind(&Log::discarded, this)));
 
     group->watch()
-      .onReady(dispatch(lambda::bind(&Log::watch, this, lambda::_1)))
-      .onFailed(dispatch(lambda::bind(&Log::failed, this, lambda::_1)))
-      .onDiscarded(dispatch(lambda::bind(&Log::discarded, this)));
+      .onReady(executor.defer(lambda::bind(&Log::watch, this, lambda::_1)))
+      .onFailed(executor.defer(lambda::bind(&Log::failed, this, lambda::_1)))
+      .onDiscarded(executor.defer(lambda::bind(&Log::discarded, this)));
   }
 
   ~Log()
@@ -256,7 +256,7 @@ private:
 
   zookeeper::Group* group;
   process::Future<zookeeper::Group::Membership> membership;
-  async::Dispatch dispatch;
+  process::Executor executor;
 
   int quorum;
 
@@ -418,14 +418,14 @@ void Log::watch(const std::set<zookeeper
     // Our replica's membership must have expired, join back up.
     LOG(INFO) << "Renewing replica group membership";
     membership = group->join(replica->pid())
-      .onFailed(dispatch(lambda::bind(&Log::failed, this, lambda::_1)))
-      .onDiscarded(dispatch(lambda::bind(&Log::discarded, this)));
+      .onFailed(executor.defer(lambda::bind(&Log::failed, this, lambda::_1)))
+      .onDiscarded(executor.defer(lambda::bind(&Log::discarded, this)));
   }
 
   group->watch(memberships)
-    .onReady(dispatch(lambda::bind(&Log::watch, this, lambda::_1)))
-    .onFailed(dispatch(lambda::bind(&Log::failed, this, lambda::_1)))
-    .onDiscarded(dispatch(lambda::bind(&Log::discarded, this)));
+    .onReady(executor.defer(lambda::bind(&Log::watch, this, lambda::_1)))
+    .onFailed(executor.defer(lambda::bind(&Log::failed, this, lambda::_1)))
+    .onDiscarded(executor.defer(lambda::bind(&Log::discarded, this)));
 }
 
 

Modified: incubator/mesos/trunk/src/log/network.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/network.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/network.hpp (original)
+++ incubator/mesos/trunk/src/log/network.hpp Fri Jan 27 01:25:13 2012
@@ -25,8 +25,8 @@
 #include <set>
 #include <string>
 
-#include <process/async.hpp>
-#include <process/process.hpp>
+#include <process/deferred.hpp>
+#include <process/executor.hpp>
 #include <process/protobuf.hpp>
 #include <process/timeout.hpp>
 
@@ -34,7 +34,6 @@
 
 #include "common/foreach.hpp"
 #include "common/lambda.hpp"
-#include "common/option.hpp"
 #include "common/seconds.hpp"
 #include "common/utils.hpp"
 
@@ -107,7 +106,7 @@ private:
 
   zookeeper::Group* group;
 
-  async::Dispatch dispatch;
+  process::Executor executor;
 };
 
 
@@ -216,6 +215,7 @@ inline void Network::remove(const proces
   process::dispatch(process, &NetworkProcess::remove, pid);
 }
 
+
 inline void Network::set(const std::set<process::UPID>& pids)
 {
   process::dispatch(process, &NetworkProcess::set, pids);
@@ -239,8 +239,7 @@ void Network::broadcast(
     const std::set<process::UPID>& filter)
 {
   // Need to disambiguate overloaded function.
-  void (NetworkProcess::*broadcast) (
-      const M&, const std::set<process::UPID>&) =
+  void (NetworkProcess::*broadcast)(const M&, const std::set<process::UPID>&) =
     &NetworkProcess::broadcast<M>;
 
   process::dispatch(process, broadcast, m, filter);
@@ -256,12 +255,19 @@ inline ZooKeeperNetwork::ZooKeeperNetwor
 inline void ZooKeeperNetwork::watch(
     const std::set<zookeeper::Group::Membership>& memberships)
 {
+  process::deferred<void(const std::set<zookeeper::Group::Membership>&)> ready =
+    executor.defer(lambda::bind(&ZooKeeperNetwork::ready, this, lambda::_1));
+
+  process::deferred<void(const std::string&)> failed =
+    executor.defer(lambda::bind(&ZooKeeperNetwork::failed, this, lambda::_1));
+
+  process::deferred<void(void)> discarded =
+    executor.defer(lambda::bind(&ZooKeeperNetwork::discarded, this));
+
   group->watch(memberships)
-    .onReady(dispatch(lambda::bind(&ZooKeeperNetwork::ready,
-                                   this, lambda::_1)))
-    .onFailed(dispatch(lambda::bind(&ZooKeeperNetwork::failed,
-                                    this, lambda::_1)))
-    .onDiscarded(dispatch(lambda::bind(&ZooKeeperNetwork::discarded, this)));
+    .onReady(ready)
+    .onFailed(failed)
+    .onDiscarded(discarded);
 }
 
 
@@ -279,18 +285,16 @@ inline void ZooKeeperNetwork::ready(
 
   std::set<process::UPID> pids;
 
-  Option<process::Future<std::string> > option;
-
   process::Timeout timeout = 5.0;
 
   while (!futures.empty()) {
-    option = select(futures, timeout.remaining());
-    if (option.isSome()) {
-      CHECK(option.get().isReady());
-      process::UPID pid(option.get().get());
-      CHECK(pid) << "Failed to parse '" << option.get().get() << "'";
+    process::Future<process::Future<std::string> > future = select(futures);
+    if (future.await(timeout.remaining())) {
+      CHECK(future.get().isReady());
+      process::UPID pid(future.get().get());
+      CHECK(pid) << "Failed to parse '" << future.get().get() << "'";
       pids.insert(pid);
-      futures.erase(option.get());
+      futures.erase(future.get());
     } else {
       watch(); // Try again later assuming empty group.
       return;
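
The async::Dispatch to process::Executor migration above follows one
shape throughout: wrap a member function with executor.defer() and
attach the resulting deferred as a callback on a future. Condensed
(Widget and its members are stand-ins; includes as in network.hpp):

  #include <set>
  #include <string>

  #include <process/deferred.hpp>
  #include <process/executor.hpp>

  #include "common/lambda.hpp"

  class Widget
  {
    void watch()
    {
      group->watch()
        .onReady(executor.defer(
            lambda::bind(&Widget::ready, this, lambda::_1)))
        .onFailed(executor.defer(
            lambda::bind(&Widget::failed, this, lambda::_1)))
        .onDiscarded(executor.defer(
            lambda::bind(&Widget::discarded, this)));
    }

    void ready(const std::set<zookeeper::Group::Membership>& memberships);
    void failed(const std::string& message);
    void discarded();

    zookeeper::Group* group;
    process::Executor executor;  // Replaces the old async::Dispatch member.
  };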

Modified: incubator/mesos/trunk/src/log/replica.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/replica.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/replica.cpp (original)
+++ incubator/mesos/trunk/src/log/replica.cpp Fri Jan 27 01:25:13 2012
@@ -459,7 +459,7 @@ public:
 
   // Returns all the actions between the specified positions, unless
   // those positions are invalid, in which case returns an error.
-  process::Promise<std::list<Action> > read(uint64_t from, uint64_t to);
+  process::Future<std::list<Action> > read(uint64_t from, uint64_t to);
 
   // Returns missing positions in the log (i.e., unlearned or holes)
   // up to the specified position.
@@ -527,17 +527,17 @@ ReplicaProcess::ReplicaProcess(const str
   recover(path);
 
   // Install protobuf handlers.
-  installProtobufHandler<PromiseRequest>(
+  install<PromiseRequest>(
       &ReplicaProcess::promise);
 
-  installProtobufHandler<WriteRequest>(
+  install<WriteRequest>(
       &ReplicaProcess::write);
 
-  installProtobufHandler<LearnedMessage>(
+  install<LearnedMessage>(
       &ReplicaProcess::learned,
       &LearnedMessage::action);
 
-  installProtobufHandler<LearnRequest>(
+  install<LearnRequest>(
       &ReplicaProcess::learn,
       &LearnRequest::position);
 }
@@ -574,22 +574,22 @@ Result<Action> ReplicaProcess::read(uint
 
 // TODO(benh): Make this function actually return a Try once we change
 // the future semantics to not include failures.
-process::Promise<list<Action> > ReplicaProcess::read(
+process::Future<list<Action> > ReplicaProcess::read(
     uint64_t from,
     uint64_t to)
 {
   if (to < from) {
     process::Promise<list<Action> > promise;
     promise.fail("Bad read range (to < from)");
-    return promise;
+    return promise.future();
   } else if (from < begin) {
     process::Promise<list<Action> > promise;
     promise.fail("Bad read range (truncated position)");
-    return promise;
+    return promise.future();
   } else if (end < to) {
     process::Promise<list<Action> > promise;
     promise.fail("Bad read range (past end of log)");
-    return promise;
+    return promise.future();
   }
 
   list<Action> actions;
@@ -600,7 +600,7 @@ process::Promise<list<Action> > ReplicaP
     if (result.isError()) {
       process::Promise<list<Action> > promise;
       promise.fail(result.error());
-      return promise;
+      return promise.future();
     } else if (result.isSome()) {
       actions.push_back(result.get());
     }
@@ -688,7 +688,7 @@ void ReplicaProcess::promise(const Promi
         response.set_okay(true);
         response.set_id(request.id());
         response.set_position(request.position());
-        send(from(), response);
+        reply(response);
       }
     } else {
       CHECK(result.isSome());
@@ -700,7 +700,7 @@ void ReplicaProcess::promise(const Promi
         response.set_okay(false);
         response.set_id(request.id());
         response.set_position(request.position());
-        send(from(), response);
+        reply(response);
       } else {
         Action original = action;
         action.set_promised(request.id());
@@ -710,7 +710,7 @@ void ReplicaProcess::promise(const Promi
           response.set_okay(true);
           response.set_id(request.id());
           response.mutable_action()->MergeFrom(original);
-          send(from(), response);
+          reply(response);
         }
       }
     }
@@ -722,7 +722,7 @@ void ReplicaProcess::promise(const Promi
       PromiseResponse response;
       response.set_okay(false);
       response.set_id(request.id());
-      send(from(), response);
+      reply(response);
     } else {
       Promise promise;
       promise.set_id(request.id());
@@ -735,7 +735,7 @@ void ReplicaProcess::promise(const Promi
         response.set_okay(true);
         response.set_id(request.id());
         response.set_position(end);
-        send(from(), response);
+        reply(response);
       }
     }
   }
@@ -757,7 +757,7 @@ void ReplicaProcess::write(const WriteRe
       response.set_okay(false);
       response.set_id(request.id());
       response.set_position(request.position());
-      send(from(), response);
+      reply(response);
     } else {
       Action action;
       action.set_position(request.position());
@@ -788,7 +788,7 @@ void ReplicaProcess::write(const WriteRe
         response.set_okay(true);
         response.set_id(request.id());
         response.set_position(request.position());
-        send(from(), response);
+        reply(response);
       }
     }
   } else if (result.isSome()) {
@@ -800,7 +800,7 @@ void ReplicaProcess::write(const WriteRe
       response.set_okay(false);
       response.set_id(request.id());
       response.set_position(request.position());
-      send(from(), response);
+      reply(response);
     } else {
       // TODO(benh): Check if this position has already been learned,
       // and if so, check that we are re-writing the same value!
@@ -835,7 +835,7 @@ void ReplicaProcess::write(const WriteRe
         response.set_okay(true);
         response.set_id(request.id());
         response.set_position(request.position());
-        send(from(), response);
+        reply(response);
       }
     }
   }
@@ -871,11 +871,11 @@ void ReplicaProcess::learn(uint64_t posi
     LearnResponse response;
     response.set_okay(true);
     response.mutable_action()->MergeFrom(result.get());
-    send(from(), response);
+    reply(response);
   } else {
     LearnResponse response;
     response.set_okay(false);
-    send(from(), response);
+    reply(response);
   }
 }
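
The Promise-to-Future change in ReplicaProcess::read() above reflects
the new division of labor: the callee keeps the Promise and hands back
only promise.future(). In miniature (the int payload is illustrative):

  #include <process/future.hpp>

  process::Future<int> compute(int from, int to)
  {
    process::Promise<int> promise;
    if (to < from) {
      promise.fail("Bad range (to < from)");  // Failure travels with ...
    } else {
      promise.set(to - from);                 // ... or value, via the ...
    }
    return promise.future();                  // ... returned Future.
  }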
 

Modified: incubator/mesos/trunk/src/master/frameworks_manager.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/frameworks_manager.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/frameworks_manager.cpp (original)
+++ incubator/mesos/trunk/src/master/frameworks_manager.cpp Fri Jan 27 01:25:13 2012
@@ -27,7 +27,9 @@
 
 using std::map;
 
-namespace mesos { namespace internal { namespace master {
+namespace mesos {
+namespace internal {
+namespace master {
 
 // Constructor: Initializes storage and resets cached flag.
 FrameworksManager::FrameworksManager(FrameworksStorage* _storage)
@@ -39,8 +41,8 @@ FrameworksManager::FrameworksManager(Fra
 Result<map<FrameworkID, FrameworkInfo> > FrameworksManager::list()
 {
   if (!cache()) {
-    return Result<map<FrameworkID,
-                      FrameworkInfo> >::error("Error caching framework infos.");
+    return Result<map<FrameworkID, FrameworkInfo> >
+      ::error("Error caching framework infos.");
   }
 
   map<FrameworkID, FrameworkInfo> result;
@@ -68,23 +70,27 @@ Result<bool> FrameworksManager::add(cons
   //      return Result<bool>::error("Error caching framework infos.");
   //    }
   //
-  //  if(infos.count(id)){
+  //  if (infos.count(id) > 0) {
   //    LOG(INFO) << "Duplicate framework detected...id: " << id;
   //
-  //    if(infos[id].first. == info)
+  //    if (infos[id].first == info)
   //    {
   //      LOG(INFO) << "Duplicate framework information detected...returning.";
   //      return Result<bool>::some(true);
   //    }
   //  }
 
-  Result<bool> result =
-    call(storage, &FrameworksStorage::add, id, info);
+  Future<Result<bool> > result =
+    dispatch(storage, &FrameworksStorage::add, id, info);
 
-  if (result.isError()) {
+  result.await();
+
+  CHECK(result.isReady());
+
+  if (result.get().isError()) {
     LOG(ERROR) << "Error adding framework to underlying storage: "
-               << result.error();
-    return result;
+               << result.get().error();
+    return result.get();
   }
 
   infos[id] = std::make_pair(info, Option<double>::none());
@@ -93,26 +99,26 @@ Result<bool> FrameworksManager::add(cons
 }
 
 
-// Remove a framework after a delay.
-// Actually sends a message to self after the given delay.
-Promise<Result<bool> > FrameworksManager::remove(const FrameworkID& id,
-                                                 double delay_secs)
+Future<Result<bool> > FrameworksManager::remove(const FrameworkID& id,
+                                                const seconds& s)
 {
   if (!cache()) {
     return Result<bool>::error("Error caching framework infos.");
   }
 
-  if(!infos.count(id)) {
+  if (infos.count(id) == 0) {
     LOG(INFO) << "Can't remove non-existent Framework: " << id;
     return Result<bool>::error("Error removing non-existing framework.");
   }
 
+  LOG(INFO) << "Expiring framework " << id << " in " << s.value << " seconds";
+
   // Set the option to contain the firing time of the message.
-  infos[id].second = Option<double>::some(elapsedTime() + delay_secs);
+  infos[id].second = Option<double>::some(Clock::now() + s.value);
 
-  Promise<Result<bool> > promise;
-  delay(delay_secs, self(), &FrameworksManager::expire, id, promise);
-  return promise;
+  Promise<Result<bool> >* promise = new Promise<Result<bool> >();
+  delay(s.value, self(), &FrameworksManager::expire, id, promise);
+  return promise->future();
 }
 
 
@@ -125,7 +131,7 @@ Result<bool> FrameworksManager::resurrec
     return Result<bool>::error("Error caching framework infos.");
   }
 
-  if (infos.count(id)) {
+  if (infos.count(id) > 0) {
     infos[id].second = Option<double>::none();
 
     return true;
@@ -149,32 +155,44 @@ Result<bool> FrameworksManager::exists(c
 // Actually removes the framework from the underlying storage.
 // Checks for the case when the framework is being resurrected.
 void FrameworksManager::expire(const FrameworkID& id,
-                               Promise<Result<bool> > promise)
+                               Promise<Result<bool> >* promise)
 {
-  if (infos.count(id)) {
-    Option<double>& option = infos[id].second;
-
-    if (option.isSome() && elapsedTime() >= option.get()) {
-      LOG(INFO) << "Removing framework " << id << " from storage";
+  if (infos.count(id) > 0) {
+    const Option<double>& option = infos[id].second;
 
-      Result<bool> result = call(storage, &FrameworksStorage::remove, id);
-
-      // If storage returns successfully remove from cache.
-      if (!result.isError()) {
-        infos.erase(id);
+    if (option.isSome()) {
+      if (Clock::now() >= option.get()) {
+        LOG(INFO) << "Removing framework " << id << " from storage";
+
+        Future<Result<bool> > result =
+          dispatch(storage, &FrameworksStorage::remove, id);
+
+        result.await();
+
+        CHECK(result.isReady());
+
+        // If storage returns successfully remove from cache.
+        if (!result.get().isError()) {
+          infos.erase(id);
+        }
+        promise->set(result.get());
+        delete promise;
+        return;
+      } else {
+        LOG(INFO) << "Framework appears to be resurrected, and then "
+                  << "re-removed, so ignoring this delayed expire";
       }
-      promise.set(result);
-      return;
     } else {
       LOG(INFO) << "Framework appears to have been "
-                << "resurrected, ignoring delayed expire.";
+                << "resurrected, ignoring delayed expire";
     }
   } else {
     LOG(INFO) << "Framework has already been removed by someone else,"
-              << "ignoring delayed expire.";
+              << "ignoring delayed expire";
   }
 
-  promise.set(Result<bool>::some(false));
+  promise->set(Result<bool>::some(false));
+  delete promise;
 }
 
 
@@ -182,16 +200,22 @@ void FrameworksManager::expire(const Fra
 bool FrameworksManager::cache()
 {
   if (!cached) {
-    Result<map<FrameworkID, FrameworkInfo> > result =
-        call(storage, &FrameworksStorage::list);
+    LOG(INFO) << "Caching framework information";
+
+    Future<Result<map<FrameworkID, FrameworkInfo> > > result =
+      dispatch(storage, &FrameworksStorage::list);
+
+    result.await();
+
+    CHECK(result.isReady());
 
-    if (result.isError()) {
+    if (result.get().isError()) {
       LOG(ERROR) << "Error getting framework info from underlying storage: "
-                 << result.error();
+                 << result.get().error();
       return false;
     }
 
-    foreachpair (const FrameworkID& id, const FrameworkInfo& info, result.get()) {
+    foreachpair (const FrameworkID& id, const FrameworkInfo& info, result.get().get()) {
       infos[id] = std::make_pair(info, Option<double>::none());
     }
 
@@ -201,4 +225,6 @@ bool FrameworksManager::cache()
   return true;
 }
 
-}}}  // namespace mesos { namespace internal { namespace master {
+}  // namespace master {
+}  // namespace internal {
+}  // namespace mesos {
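
One pattern above worth calling out: remove() now returns a Future
backed by a heap-allocated Promise, and the delayed expire() handler
fulfills and then reclaims it. Reduced to its skeleton (error handling
and the resurrection check elided):

  Future<Result<bool> > FrameworksManager::remove(const FrameworkID& id,
                                                  const seconds& s)
  {
    Promise<Result<bool> >* promise = new Promise<Result<bool> >();
    delay(s.value, self(), &FrameworksManager::expire, id, promise);
    return promise->future();  // The caller holds the future now ...
  }

  void FrameworksManager::expire(const FrameworkID& id,
                                 Promise<Result<bool> >* promise)
  {
    // ... and the delayed handler fulfills it later, then deletes it.
    promise->set(Result<bool>::some(false));
    delete promise;
  }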

Modified: incubator/mesos/trunk/src/master/frameworks_manager.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/frameworks_manager.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/frameworks_manager.hpp (original)
+++ incubator/mesos/trunk/src/master/frameworks_manager.hpp Fri Jan 27 01:25:13 2012
@@ -27,22 +27,25 @@
 
 #include "common/option.hpp"
 #include "common/result.hpp"
+#include "common/seconds.hpp"
 
 #include "messages/messages.hpp"
 
-namespace mesos { namespace internal { namespace master {
+namespace mesos {
+namespace internal {
+namespace master {
 
 using namespace process;
 
 class FrameworksStorage : public Process<FrameworksStorage>
 {
 public:
-  virtual Promise<Result<std::map<FrameworkID, FrameworkInfo> > > list() = 0;
+  virtual Future<Result<std::map<FrameworkID, FrameworkInfo> > > list() = 0;
 
-  virtual Promise<Result<bool> > add(const FrameworkID& id,
+  virtual Future<Result<bool> > add(const FrameworkID& id,
       const FrameworkInfo& info) = 0;
 
-  virtual Promise<Result<bool> > remove(const FrameworkID& id) = 0;
+  virtual Future<Result<bool> > remove(const FrameworkID& id) = 0;
 };
 
 class FrameworksManager : public Process<FrameworksManager>
@@ -57,8 +60,8 @@ public:
   // Add a new framework.
   Result<bool> add(const FrameworkID& id, const FrameworkInfo& info);
 
-  // Remove a framework after a certain delay.
-  Promise<Result<bool> > remove(const FrameworkID& id, double delay_secs);
+  // Remove a framework after a specified number of seconds.
+  Future<Result<bool> > remove(const FrameworkID& id, const seconds& s);
 
   // Resurrect the framework.
   Result<bool> resurrect(const FrameworkID& id);
@@ -68,7 +71,7 @@ public:
   Result<bool> exists(const FrameworkID& id);
 
 private:
-  void expire(const FrameworkID& id, Promise<Result<bool> > promise);
+  void expire(const FrameworkID& id, Promise<Result<bool> >* promise);
 
   bool cache();
 
@@ -82,5 +85,8 @@ private:
   std::map<FrameworkID, std::pair<FrameworkInfo, Option<double> > > infos;
 };
 
-}}} // namespace mesos { namespace internal { namespace master {
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {
+
 #endif // __MASTER_FRAMEWORKS_MANAGER_HPP__

Modified: incubator/mesos/trunk/src/master/http.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/http.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/http.cpp (original)
+++ incubator/mesos/trunk/src/master/http.cpp Fri Jan 27 01:25:13 2012
@@ -30,9 +30,9 @@
 #include "master/http.hpp"
 #include "master/master.hpp"
 
+using process::Future;
 using process::HttpResponse;
 using process::HttpRequest;
-using process::Promise;
 
 using std::string;
 
@@ -151,7 +151,7 @@ JSON::Object model(const Slave& slave)
 
 namespace http {
 
-Promise<HttpResponse> vars(
+Future<HttpResponse> vars(
     const Master& master,
     const HttpRequest& request)
 {
@@ -184,7 +184,7 @@ Promise<HttpResponse> vars(
 
 namespace json {
 
-Promise<HttpResponse> stats(
+Future<HttpResponse> stats(
     const Master& master,
     const HttpRequest& request)
 {
@@ -240,7 +240,7 @@ Promise<HttpResponse> stats(
 }
 
 
-Promise<HttpResponse> state(
+Future<HttpResponse> state(
     const Master& master,
     const HttpRequest& request)
 {
@@ -250,7 +250,7 @@ Promise<HttpResponse> state(
   object.values["build_date"] = build::DATE;
   object.values["build_user"] = build::USER;
   object.values["start_time"] = master.startTime;
-  object.values["id"] = master.id;
+  object.values["id"] = master.info.id();
   object.values["pid"] = string(master.self());
 
   // Model all of the slaves.

Modified: incubator/mesos/trunk/src/master/http.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/http.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/http.hpp (original)
+++ incubator/mesos/trunk/src/master/http.hpp Fri Jan 27 01:25:13 2012
@@ -33,7 +33,7 @@ namespace http {
 
 // Returns current vars in "key value\n" format (keys do not contain
 // spaces, values may contain spaces but are ended by a newline).
-process::Promise<process::HttpResponse> vars(
+process::Future<process::HttpResponse> vars(
     const Master& master,
     const process::HttpRequest& request);
 
@@ -41,13 +41,13 @@ process::Promise<process::HttpResponse> 
 namespace json {
 
 // Returns current statistics of the master.
-process::Promise<process::HttpResponse> stats(
+process::Future<process::HttpResponse> stats(
     const Master& master,
     const process::HttpRequest& request);
 
 
 // Returns current state of the cluster that the master knows about.
-process::Promise<process::HttpResponse> state(
+process::Future<process::HttpResponse> state(
     const Master& master,
     const process::HttpRequest& request);
 

Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Fri Jan 27 01:25:13 2012
@@ -60,58 +60,56 @@ public:
   SlaveObserver(const UPID& _slave,
                 const SlaveInfo& _slaveInfo,
                 const SlaveID& _slaveId,
-                const PID<SlavesManager>& _slavesManager)
+                const PID<Master>& _master)
     : slave(_slave),
       slaveInfo(_slaveInfo),
       slaveId(_slaveId),
-      slavesManager(_slavesManager),
+      master(_master),
       timeouts(0),
-      pinged(false) {}
-
-  virtual ~SlaveObserver() {}
+      pinged(false)
+  {
+    install("PONG", &SlaveObserver::pong);
+  }
 
 protected:
-  virtual void operator () ()
+  virtual void initialize()
   {
-    // Send a ping some interval after we heard the last pong. Or if
-    // we don't hear a pong, increment the number of timeouts from the
-    // slave and try and send another ping. If we eventually timeout too
-    // many missed pongs in a row, consider the slave dead.
     send(slave, "PING");
     pinged = true;
+    delay(SLAVE_PONG_TIMEOUT, self(), &SlaveObserver::timeout);
+  }
 
-    do {
-      receive(SLAVE_PONG_TIMEOUT);
-      if (name() == "PONG") {
-        timeouts = 0;
-        pinged = false;
-      } else if (name() == TIMEOUT) {
-        if (pinged) {
-          timeouts++;
-          pinged = false;
-        }
+  void pong(const UPID& from, const string& body)
+  {
+    timeouts = 0;
+    pinged = false;
+  }
 
-        send(slave, "PING");
-        pinged = true;
-      } else if (name() == TERMINATE) {
+  void timeout()
+  {
+    if (pinged) { // So we haven't got back a pong yet ...
+      if (++timeouts >= MAX_SLAVE_TIMEOUTS) {
+        deactivate();
         return;
       }
-    } while (timeouts < MAX_SLAVE_TIMEOUTS);
-
-    // Tell the slave manager to deactivate the slave, this will take
-    // care of updating the master too.
-    while (!call(slavesManager, &SlavesManager::deactivate,
-                 slaveInfo.hostname(), slave.port)) {
-      LOG(WARNING) << "Slave \"failed\" but can't be deactivated, retrying";
-      pause(5);
     }
+
+    send(slave, "PING");
+    pinged = true;
+    delay(SLAVE_PONG_TIMEOUT, self(), &SlaveObserver::timeout);
+  }
+
+  void deactivate()
+  {
+    dispatch(master, &Master::deactivatedSlaveHostnamePort,
+             slaveInfo.hostname(), slave.port);
   }
 
 private:
   const UPID slave;
   const SlaveInfo slaveInfo;
   const SlaveID slaveId;
-  const PID<SlavesManager> slavesManager;
+  const PID<Master> master;
   int timeouts;
   bool pinged;
 };
@@ -134,10 +132,15 @@ struct SlaveRegistrar
                   const PID<Master>& master,
                   const PID<SlavesManager>& slavesManager)
   {
-    if (!call(slavesManager, &SlavesManager::add,
-              slave->info.hostname(), slave->pid.port)) {
+    Future<bool> added = dispatch(slavesManager, &SlavesManager::add,
+                                  slave->info.hostname(), slave->pid.port);
+    added.await();
+    if (!added.isReady() || !added.get()) {
       LOG(WARNING) << "Could not register slave because failed"
                    << " to add it to the slaves maanger";
+      // TODO(benh): This could be because our acknowledgement to the
+      // slave was dropped, so they retried, and now we should
+      // probably send another acknowledgement.
       delete slave;
       return false;
     }
@@ -166,10 +169,15 @@ struct SlaveReregistrar
                   const PID<Master>& master,
                   const PID<SlavesManager>& slavesManager)
   {
-    if (!call(slavesManager, &SlavesManager::add,
-              slave->info.hostname(), slave->pid.port)) {
+    Future<bool> added = dispatch(slavesManager, &SlavesManager::add,
+                                  slave->info.hostname(), slave->pid.port);
+    added.await();
+    if (!added.isReady() || !added.get()) {
       LOG(WARNING) << "Could not register slave because failed"
                    << " to add it to the slaves maanger";
+      // TODO(benh): This could be because our acknowledgement to the
+      // slave was dropped, so they retried, and now we should
+      // probably send another acknowledgement.
       delete slave;
       return false;
     }
@@ -182,18 +190,14 @@ struct SlaveReregistrar
 Master::Master(Allocator* _allocator)
   : ProcessBase("master"),
     allocator(_allocator)
-{
-  initialize();
-}
+{}
 
 
 Master::Master(Allocator* _allocator, const Configuration& conf)
   : ProcessBase("master"),
     allocator(_allocator),
     conf(conf)
-{
-  initialize();
-}
+{}
 
 
 Master::~Master()
@@ -259,24 +263,24 @@ vector<Slave*> Master::getActiveSlaves()
 }
 
 
-void Master::operator () ()
+void Master::initialize()
 {
   LOG(INFO) << "Master started at mesos://" << self();
 
-  // Don't do anything until we get a master token.
-  while (receive() != GotMasterTokenMessage().GetTypeName()) {
-    LOG(INFO) << "Oops! We're dropping a message since "
-              << "we haven't received an identifier yet!";
-  }
+  // The master ID is currently comprised of the current date, the IP
+  // address and port from self() and the OS PID.
+
+  Try<string> id =
+    strings::format("%s%d-%d-%d", DateUtils::currentDate().c_str(),
+                    self().ip, self().port, getpid());
 
-  GotMasterTokenMessage message;
-  message.ParseFromString(body());
+  CHECK(!id.isError()) << id.error();
 
-  // The master ID is comprised of the current date and some ephemeral
-  // token (e.g., determined by ZooKeeper).
+  info.set_id(id.get());
+  info.set_ip(self().ip);
+  info.set_port(self().port);
 
-  id = DateUtils::currentDate() + "-" + message.token();
-  LOG(INFO) << "Master ID: " << id;
+  LOG(INFO) << "Master ID: " << info.id();
 
   // Setup slave manager.
   slavesManager = new SlavesManager(conf, self());
@@ -284,27 +288,6 @@ void Master::operator () ()
 
   allocator->initialize(this);
 
-  // Start our timer ticks.
-  delay(1.0, self(), &Master::timerTick);
-
-  while (true) {
-    serve();
-    if (name() == TERMINATE) {
-      LOG(INFO) << "Asked to terminate by " << from();
-      foreachvalue (Slave* slave, slaves) {
-        send(slave->pid, TERMINATE);
-      }
-      break;
-    } else {
-      LOG(WARNING) << "Dropping unknown message '" << name() << "'"
-                   << " from: " << from();
-    }
-  }
-}
-
-
-void Master::initialize()
-{
   elected = false;
 
   nextFrameworkId = 0;
@@ -327,115 +310,153 @@ void Master::initialize()
   stats.validFrameworkMessages = 0;
   stats.invalidFrameworkMessages = 0;
 
-  startTime = elapsedTime();
+  startTime = Clock::now();
+
+  // Start our timer ticks.
+  delay(1.0, self(), &Master::timerTick);
 
   // Install handler functions for certain messages.
-  installProtobufHandler<SubmitSchedulerRequest>(
+  install<SubmitSchedulerRequest>(
       &Master::submitScheduler,
       &SubmitSchedulerRequest::name);
 
-  installProtobufHandler<NewMasterDetectedMessage>(
+  install<NewMasterDetectedMessage>(
       &Master::newMasterDetected,
       &NewMasterDetectedMessage::pid);
 
-  installProtobufHandler<NoMasterDetectedMessage>(
+  install<NoMasterDetectedMessage>(
       &Master::noMasterDetected);
 
-  installProtobufHandler<RegisterFrameworkMessage>(
+  install<RegisterFrameworkMessage>(
       &Master::registerFramework,
       &RegisterFrameworkMessage::framework);
 
-  installProtobufHandler<ReregisterFrameworkMessage>(
+  install<ReregisterFrameworkMessage>(
       &Master::reregisterFramework,
       &ReregisterFrameworkMessage::framework_id,
       &ReregisterFrameworkMessage::framework,
       &ReregisterFrameworkMessage::failover);
 
-  installProtobufHandler<UnregisterFrameworkMessage>(
+  install<UnregisterFrameworkMessage>(
       &Master::unregisterFramework,
       &UnregisterFrameworkMessage::framework_id);
 
-  installProtobufHandler<DeactivateFrameworkMessage>(
+  install<DeactivateFrameworkMessage>(
         &Master::deactivateFramework,
         &DeactivateFrameworkMessage::framework_id);
 
-  installProtobufHandler<ResourceRequestMessage>(
+  install<ResourceRequestMessage>(
       &Master::resourceRequest,
       &ResourceRequestMessage::framework_id,
       &ResourceRequestMessage::requests);
 
-  installProtobufHandler<LaunchTasksMessage>(
+  install<LaunchTasksMessage>(
       &Master::launchTasks,
       &LaunchTasksMessage::framework_id,
       &LaunchTasksMessage::offer_id,
       &LaunchTasksMessage::tasks,
       &LaunchTasksMessage::filters);
 
-  installProtobufHandler<ReviveOffersMessage>(
+  install<ReviveOffersMessage>(
       &Master::reviveOffers,
       &ReviveOffersMessage::framework_id);
 
-  installProtobufHandler<KillTaskMessage>(
+  install<KillTaskMessage>(
       &Master::killTask,
       &KillTaskMessage::framework_id,
       &KillTaskMessage::task_id);
 
-  installProtobufHandler<FrameworkToExecutorMessage>(
+  install<FrameworkToExecutorMessage>(
       &Master::schedulerMessage,
       &FrameworkToExecutorMessage::slave_id,
       &FrameworkToExecutorMessage::framework_id,
       &FrameworkToExecutorMessage::executor_id,
       &FrameworkToExecutorMessage::data);
 
-  installProtobufHandler<RegisterSlaveMessage>(
+  install<RegisterSlaveMessage>(
       &Master::registerSlave,
       &RegisterSlaveMessage::slave);
 
-  installProtobufHandler<ReregisterSlaveMessage>(
+  install<ReregisterSlaveMessage>(
       &Master::reregisterSlave,
       &ReregisterSlaveMessage::slave_id,
       &ReregisterSlaveMessage::slave,
       &ReregisterSlaveMessage::executor_infos,
       &ReregisterSlaveMessage::tasks);
 
-  installProtobufHandler<UnregisterSlaveMessage>(
+  install<UnregisterSlaveMessage>(
       &Master::unregisterSlave,
       &UnregisterSlaveMessage::slave_id);
 
-  installProtobufHandler<StatusUpdateMessage>(
+  install<StatusUpdateMessage>(
       &Master::statusUpdate,
       &StatusUpdateMessage::update,
       &StatusUpdateMessage::pid);
 
-  installProtobufHandler<ExecutorToFrameworkMessage>(
+  install<ExecutorToFrameworkMessage>(
       &Master::executorMessage,
       &ExecutorToFrameworkMessage::slave_id,
       &ExecutorToFrameworkMessage::framework_id,
       &ExecutorToFrameworkMessage::executor_id,
       &ExecutorToFrameworkMessage::data);
 
-  installProtobufHandler<ExitedExecutorMessage>(
+  install<ExitedExecutorMessage>(
       &Master::exitedExecutor,
       &ExitedExecutorMessage::slave_id,
       &ExitedExecutorMessage::framework_id,
       &ExitedExecutorMessage::executor_id,
       &ExitedExecutorMessage::status);
 
-  // Install some message handlers.
-  installMessageHandler(EXITED, &Master::exited);
+  // Set up HTTP request handlers.
+  route("vars", bind(&http::vars, cref(*this), params::_1));
+  route("stats.json", bind(&http::json::stats, cref(*this), params::_1));
+  route("state.json", bind(&http::json::state, cref(*this), params::_1));
+}
+
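
The install<M>(...) calls replacing installProtobufHandler<M>(...) each
register a member function together with protobuf field accessors; when a
message of type M arrives, ProtobufProcess unpacks those fields and passes
them as arguments. A minimal sketch against a hypothetical PingMessage
(a protobuf with a required string field 'text'):

    #include <glog/logging.h>
    #include <process/protobuf.hpp>

    class PingProcess : public ProtobufProcess<PingProcess>
    {
    protected:
      virtual void initialize()
      {
        // On PingMessage, invoke ping() with the message's text() field.
        install<PingMessage>(&PingProcess::ping, &PingMessage::text);
      }

      void ping(const std::string& text)
      {
        LOG(INFO) << "Received ping: " << text;
      }
    };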
 
-  // Install HTTP request handlers.
-  installHttpHandler(
-      "vars",
-      bind(&http::vars, cref(*this), params::_1));
-
-  installHttpHandler(
-      "stats.json",
-      bind(&http::json::stats, cref(*this), params::_1));
-
-  installHttpHandler(
-      "state.json",
-      bind(&http::json::state, cref(*this), params::_1));
+void Master::finalize()
+{
+  LOG(INFO) << "Master terminating";
+  foreachvalue (Slave* slave, slaves) {
+    send(slave->pid, ShutdownMessage());
+  }
+}
+
+
+void Master::exited(const UPID& pid)
+{
+  foreachvalue (Framework* framework, frameworks) {
+    if (framework->pid == pid) {
+      LOG(INFO) << "Framework " << framework->id << " disconnected";
+
+//       removeFramework(framework);
+
+      // Stop sending offers here for now.
+      framework->active = false;
+
+      // Delay dispatching a message to ourselves for the timeout.
+      delay(failoverTimeout, self(),
+            &Master::frameworkFailoverTimeout,
+            framework->id, framework->reregisteredTime);
+
+      // Remove the framework's offers.
+      foreach (Offer* offer, utils::copy(framework->offers)) {
+        allocator->resourcesRecovered(offer->framework_id(),
+                                      offer->slave_id(),
+                                      offer->resources());
+        removeOffer(offer);
+      }
+      return;
+    }
+  }
+
+  foreachvalue (Slave* slave, slaves) {
+    if (slave->pid == pid) {
+      LOG(INFO) << "Slave " << slave->id << " disconnected";
+      removeSlave(slave);
+      return;
+    }
+  }
 }
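
Passing framework->reregisteredTime along with the delayed dispatch makes the
timeout self-cancelling: if the scheduler fails over before the timer fires,
reregisteredTime changes and the stale dispatch can be recognized and ignored.
An illustrative sketch of the receiving end (not necessarily the committed
body of Master::frameworkFailoverTimeout):

    void Master::frameworkFailoverTimeout(const FrameworkID& frameworkId,
                                          double reregisteredTime)
    {
      Framework* framework = getFramework(frameworkId);
      if (framework != NULL &&
          framework->reregisteredTime == reregisteredTime) {
        // No scheduler re-registered since the timer was armed, so the
        // failover has timed out; remove the framework.
        removeFramework(framework);
      }
    }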
 
 
@@ -444,7 +465,7 @@ void Master::submitScheduler(const strin
   LOG(INFO) << "Scheduler submit request for " << name;
   SubmitSchedulerResponse response;
   response.set_okay(false);
-  send(from(), response);
+  reply(response);
 }
 
 
@@ -487,16 +508,16 @@ void Master::registerFramework(const Fra
   }
 
   Framework* framework =
-    new Framework(frameworkInfo, newFrameworkId(), from(), elapsedTime());
+    new Framework(frameworkInfo, newFrameworkId(), from, Clock::now());
 
-  LOG(INFO) << "Registering framework " << framework->id << " at " << from();
+  LOG(INFO) << "Registering framework " << framework->id << " at " << from;
 
   if (framework->info.executor().uri() == "") {
     LOG(INFO) << "Framework " << framework->id << " registering without an executor URI";
     FrameworkErrorMessage message;
     message.set_code(1);
     message.set_message("No executor URI given");
-    send(from(), message);
+    reply(message);
     delete framework;
     return;
   }
@@ -509,7 +530,7 @@ void Master::registerFramework(const Fra
     FrameworkErrorMessage message;
     message.set_code(1);
     message.set_message("User 'root' is not allowed to run frameworks");
-    send(from(), message);
+    reply(message);
     delete framework;
     return;
   }
@@ -533,7 +554,7 @@ void Master::reregisterFramework(const F
     FrameworkErrorMessage message;
     message.set_code(1);
     message.set_message("Missing framework id");
-    send(from(), message);
+    reply(message);
     return;
   }
 
@@ -543,12 +564,12 @@ void Master::reregisterFramework(const F
     FrameworkErrorMessage message;
     message.set_code(1);
     message.set_message("No executor URI given");
-    send(from(), message);
+    reply(message);
     return;
   }
 
   LOG(INFO) << "Re-registering framework " << frameworkId
-            << " at " << from();
+            << " at " << from;
 
   if (frameworks.count(frameworkId) > 0) {
     // Using the "failover" of the scheduler allows us to keep a
@@ -567,7 +588,7 @@ void Master::reregisterFramework(const F
       // TODO: Should we check whether the new scheduler has given
       // us a different framework name, user name or executor info?
       LOG(INFO) << "Framework " << frameworkId << " failed over";
-      failoverFramework(framework, from());
+      failoverFramework(framework, from);
     } else {
       LOG(INFO) << "Allowing the Framework " << frameworkId
                 << " to re-register with an already used id";
@@ -585,7 +606,7 @@ void Master::reregisterFramework(const F
 
       FrameworkReregisteredMessage message;
       message.mutable_framework_id()->MergeFrom(frameworkId);
-      send(from(), message);
+      reply(message);
       return;
     }
   } else {
@@ -594,7 +615,7 @@ void Master::reregisterFramework(const F
     // failed-over one is connecting. Create a Framework object and add
     // any tasks it has that have been reported by reconnecting slaves.
     Framework* framework =
-      new Framework(frameworkInfo, frameworkId, from(), elapsedTime());
+      new Framework(frameworkInfo, frameworkId, from, Clock::now());
 
     // TODO(benh): Check for root submissions like above!
 
@@ -626,7 +647,7 @@ void Master::reregisterFramework(const F
   foreachvalue (Slave* slave, slaves) {
     UpdateFrameworkMessage message;
     message.mutable_framework_id()->MergeFrom(frameworkId);
-    message.set_pid(from());
+    message.set_pid(from);
     send(slave->pid, message);
   }
 }
@@ -638,10 +659,10 @@ void Master::unregisterFramework(const F
 
   Framework* framework = getFramework(frameworkId);
   if (framework != NULL) {
-    if (framework->pid == from()) {
+    if (framework->pid == from) {
       removeFramework(framework);
     } else {
-      LOG(WARNING) << from() << " tried to unregister framework; "
+      LOG(WARNING) << from << " tried to unregister framework; "
                    << "expecting " << framework->pid;
     }
   }
@@ -654,10 +675,10 @@ void Master::deactivateFramework(const F
   Framework* framework = getFramework(frameworkId);
 
   if (framework != NULL) {
-    if (framework->pid == from()) {
+    if (framework->pid == from) {
       framework->active = false;
     } else {
-      LOG(WARNING) << from() << " tried to deactivate framework; "
+      LOG(WARNING) << from << " tried to deactivate framework; "
         << "expecting " << framework->pid;
     }
   }
@@ -696,6 +717,7 @@ void Master::launchTasks(const Framework
       // to same offer, etc). Report all tasks in it as failed.
       // TODO: Consider adding a new task state TASK_INVALID for
       // situations like these.
+      LOG(WARNING) << "Offer " << offerId << " is no longer valid";
       foreach (const TaskDescription& task, tasks) {
         StatusUpdateMessage message;
         StatusUpdate* update = message.mutable_update();
@@ -704,7 +726,7 @@ void Master::launchTasks(const Framework
         status->mutable_task_id()->MergeFrom(task.task_id());
         status->set_state(TASK_LOST);
         status->set_message("Task launched with invalid offer");
-        update->set_timestamp(elapsedTime());
+        update->set_timestamp(Clock::now());
         update->set_uuid(UUID::random().toBytes());
         send(framework->pid, message);
       }
@@ -762,7 +784,7 @@ void Master::killTask(const FrameworkID&
       status->mutable_task_id()->MergeFrom(taskId);
       status->set_state(TASK_LOST);
       status->set_message("Task not found");
-      update->set_timestamp(elapsedTime());
+      update->set_timestamp(Clock::now());
       update->set_uuid(UUID::random().toBytes());
       send(framework->pid, message);
     }
@@ -812,22 +834,39 @@ void Master::registerSlave(const SlaveIn
     return;
   }
 
-  Slave* slave = new Slave(slaveInfo, newSlaveId(), from(), elapsedTime());
+  // Check if this slave has already registered (because it's retrying).
+  foreachvalue (Slave* slave, slaves) {
+    if (slave->pid == from) {
+      LOG(INFO) << "Slave " << slave->id << " (" << slave->info.hostname()
+                << ") already registered, re-sending acknowledgement";
+      SlaveRegisteredMessage message;
+      message.mutable_slave_id()->MergeFrom(slave->id);
+      send(slave->pid, message);
+      return;
+    }
+  }
+
+  Slave* slave = new Slave(slaveInfo, newSlaveId(), from, Clock::now());
 
   LOG(INFO) << "Attempting to register slave " << slave->id
             << " at " << slave->pid;
 
-  // Checks if this slave, or if all slaves, can be accepted.
-  if (slaveHostnamePorts.contains(slaveInfo.hostname(), from().port)) {
-    run(&SlaveRegistrar::run, slave, self());
-  } else if (conf.get<string>("slaves", "*") == "*") {
-    run(&SlaveRegistrar::run, slave, self(), slavesManager->self());
-  } else {
-    LOG(WARNING) << "Cannot register slave at "
-                 << slaveInfo.hostname() << ":" << from().port
-                 << " because not in allocated set of slaves!";
-    send(from(), TERMINATE);
-  }
+  // TODO(benh): We assume all slaves can register for now.
+  CHECK(conf.get<string>("slaves", "*") == "*");
+  activatedSlaveHostnamePort(slave->info.hostname(), slave->pid.port);
+  addSlave(slave);
+
+//   // Checks if this slave, or if all slaves, can be accepted.
+//   if (slaveHostnamePorts.contains(slaveInfo.hostname(), from.port)) {
+//     run(&SlaveRegistrar::run, slave, self());
+//   } else if (conf.get<string>("slaves", "*") == "*") {
+//     run(&SlaveRegistrar::run, slave, self(), slavesManager->self());
+//   } else {
+//     LOG(WARNING) << "Cannot register slave at "
+//                  << slaveInfo.hostname() << ":" << from.port
+//                  << " because not in allocated set of slaves!";
+//     reply(ShutdownMessage());
+//   }
 }
 
 
@@ -843,18 +882,18 @@ void Master::reregisterSlave(const Slave
 
   if (slaveId == "") {
     LOG(ERROR) << "Slave re-registered without an id!";
-    send(from(), TERMINATE);
+    reply(ShutdownMessage());
   } else {
     Slave* slave = getSlave(slaveId);
     if (slave != NULL) {
-      // NOTE: This handles the case where a slave tries to re-register with an
-      // existing master (e.g. because of a spurious zookeeper session
-      // expiration.)
-      // For now, we assume this slave is not nefarious (eventually this will
-      // be handled by orthogonal security measures like key based
-      // authentication).
+      // NOTE: This handles the case where a slave tries to
+      // re-register with an existing master (e.g. because of a
+      // spurious zookeeper session expiration).
+      // For now, we assume this slave is not nefarious (eventually
+      // this will be handled by orthogonal security measures like key
+      // based authentication).
 
-      LOG(WARNING) << "Slave at " << from()
+      LOG(WARNING) << "Slave at " << from
                    << " is being allowed to re-register with an already"
                    << " in use id (" << slaveId << ")";
 
@@ -863,23 +902,28 @@ void Master::reregisterSlave(const Slave
       send(slave->pid, message);
 
     } else {
-      Slave* slave = new Slave(slaveInfo, slaveId, from(), elapsedTime());
+      Slave* slave = new Slave(slaveInfo, slaveId, from, Clock::now());
 
       LOG(INFO) << "Attempting to re-register slave " << slave->id
                 << " at " << slave->pid;
 
-      // Checks if this slave, or if all slaves, can be accepted.
-      if (slaveHostnamePorts.contains(slaveInfo.hostname(), from().port)) {
-        run(&SlaveReregistrar::run, slave, executorInfos, tasks, self());
-      } else if (conf.get<string>("slaves", "*") == "*") {
-        run(&SlaveReregistrar::run,
-            slave, executorInfos, tasks, self(), slavesManager->self());
-      } else {
-        LOG(WARNING) << "Cannot re-register slave at "
-                     << slaveInfo.hostname() << ":" << from().port
-                     << " because not in allocated set of slaves!";
-        send(from(), TERMINATE);
-      }
+      // TODO(benh): We assume all slaves can register for now.
+      CHECK(conf.get<string>("slaves", "*") == "*");
+      activatedSlaveHostnamePort(slave->info.hostname(), slave->pid.port);
+      readdSlave(slave, executorInfos, tasks);
+
+//       // Checks if this slave, or if all slaves, can be accepted.
+//       if (slaveHostnamePorts.contains(slaveInfo.hostname(), from.port)) {
+//         run(&SlaveReregistrar::run, slave, executorInfos, tasks, self());
+//       } else if (conf.get<string>("slaves", "*") == "*") {
+//         run(&SlaveReregistrar::run,
+//             slave, executorInfos, tasks, self(), slavesManager->self());
+//       } else {
+//         LOG(WARNING) << "Cannot re-register slave at "
+//                      << slaveInfo.hostname() << ":" << from.port
+//                      << " because not in allocated set of slaves!";
+//         reply(ShutdownMessage());
+//       }
     }
   }
 }
@@ -902,7 +946,7 @@ void Master::statusUpdate(const StatusUp
 {
   const TaskStatus& status = update.status();
 
-  LOG(INFO) << "Status update from " << from()
+  LOG(INFO) << "Status update from " << from
             << ": task " << status.task_id()
             << " of framework " << update.framework_id()
             << " is now in state " << status.state();
@@ -934,19 +978,19 @@ void Master::statusUpdate(const StatusUp
 
         stats.validStatusUpdates++;
       } else {
-        LOG(WARNING) << "Status update from " << from()
+        LOG(WARNING) << "Status update from " << from
                      << ": error, couldn't lookup "
                      << "task " << status.task_id();
         stats.invalidStatusUpdates++;
       }
     } else {
-      LOG(WARNING) << "Status update from " << from()
+      LOG(WARNING) << "Status update from " << from
                    << ": error, couldn't lookup "
                    << "framework " << update.framework_id();
       stats.invalidStatusUpdates++;
     }
   } else {
-    LOG(WARNING) << "Status update from " << from()
+    LOG(WARNING) << "Status update from " << from
                  << ": error, couldn't lookup slave "
                  << update.slave_id();
     stats.invalidStatusUpdates++;
@@ -1030,7 +1074,7 @@ void Master::exitedExecutor(const SlaveI
           status->mutable_task_id()->MergeFrom(task->task_id());
           status->set_state(TASK_LOST);
           status->set_message("Lost executor");
-          update->set_timestamp(elapsedTime());
+          update->set_timestamp(Clock::now());
           update->set_uuid(UUID::random().toBytes());
           send(framework->pid, message);
 
@@ -1074,7 +1118,7 @@ void Master::deactivatedSlaveHostnamePor
         LOG(WARNING) << "Removing slave " << slave->id << " at "
           << hostname << ":" << port
           << " because it has been deactivated";
-        send(slave->pid, TERMINATE);
+        send(slave->pid, ShutdownMessage());
         removeSlave(slave);
         break;
       }
@@ -1091,7 +1135,7 @@ void Master::timerTick()
 {
   // Check which framework filters can be expired.
   foreachvalue (Framework* framework, frameworks) {
-    framework->removeExpiredFilters(elapsedTime());
+    framework->removeExpiredFilters(Clock::now());
   }
 
   // Do allocations!
@@ -1115,43 +1159,6 @@ void Master::frameworkFailoverTimeout(co
 }
 
 
-void Master::exited()
-{
-  foreachvalue (Framework* framework, frameworks) {
-    if (framework->pid == from()) {
-      LOG(INFO) << "Framework " << framework->id << " disconnected";
-
-//       removeFramework(framework);
-
-      // Stop sending offers here for now.
-      framework->active = false;
-
-      // Delay dispatching a message to ourselves for the timeout.
-      delay(failoverTimeout, self(),
-            &Master::frameworkFailoverTimeout,
-            framework->id, framework->reregisteredTime);
-
-      // Remove the framework's offers.
-      foreach (Offer* offer, utils::copy(framework->offers)) {
-        allocator->resourcesRecovered(offer->framework_id(),
-                                      offer->slave_id(),
-                                      offer->resources());
-        removeOffer(offer);
-      }
-      return;
-    }
-  }
-
-  foreachvalue (Slave* slave, slaves) {
-    if (slave->pid == from()) {
-      LOG(INFO) << "Slave " << slave->id << " disconnected";
-      removeSlave(slave);
-      return;
-    }
-  }
-}
-
-
 void Master::makeOffers(Framework* framework,
                         const hashmap<Slave*, Resources>& offered)
 {
@@ -1368,6 +1375,7 @@ void Master::processTasks(Offer* offer,
       usedResources += launchTask(task, framework, slave);
     } else {
       // Error validating task, send a failed status update.
+      LOG(WARNING) << "Error validating task: " << error.get();
       StatusUpdateMessage message;
       StatusUpdate* update = message.mutable_update();
       update->mutable_framework_id()->MergeFrom(framework->id);
@@ -1375,7 +1383,7 @@ void Master::processTasks(Offer* offer,
       status->mutable_task_id()->MergeFrom(task.task_id());
       status->set_state(TASK_LOST);
       status->set_message(error.get());
-      update->set_timestamp(elapsedTime());
+      update->set_timestamp(Clock::now());
       update->set_uuid(UUID::random().toBytes());
       send(framework->pid, message);
     }
@@ -1414,7 +1422,7 @@ void Master::processTasks(Offer* offer,
               << " for framework " << framework->id
               << " for " << timeout << " seconds";
     framework->slaveFilter[slave] =
-      (timeout == -1) ? 0 : elapsedTime() + timeout;
+      (timeout == -1) ? 0 : Clock::now() + timeout;
   }
 
   removeOffer(offer);
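
A slaveFilter entry of 0 never expires, while any other value is an absolute
expiry time that timerTick() sweeps via removeExpiredFilters(). An
illustrative sketch of that sweep, assuming the member names used above:

    void Framework::removeExpiredFilters(double now)
    {
      foreachpair (Slave* slave, double expiry, utils::copy(slaveFilter)) {
        if (expiry != 0 && expiry <= now) {
          slaveFilter.erase(slave);  // expired; slave may be offered again
        }
      }
    }
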
@@ -1518,7 +1526,7 @@ void Master::failoverFramework(Framework
   // Make sure we can get offers again.
   framework->active = true;
 
-  framework->reregisteredTime = elapsedTime();
+  framework->reregisteredTime = Clock::now();
 
   {
     FrameworkRegisteredMessage message;
@@ -1582,7 +1590,7 @@ void Master::removeFramework(Framework* 
 
   // TODO(benh): unlink(framework->pid);
 
-  framework->unregisteredTime = elapsedTime();
+  framework->unregisteredTime = Clock::now();
 
   completedFrameworks.push_back(*framework);
 
@@ -1626,7 +1634,7 @@ void Master::addSlave(Slave* slave, bool
 
   // Set up an observer for the slave.
   slave->observer = new SlaveObserver(slave->pid, slave->info,
-                                      slave->id, slavesManager->self());
+                                      slave->id, self());
   spawn(slave->observer);
 
   allocator->slaveAdded(slave);
@@ -1717,7 +1725,7 @@ void Master::removeSlave(Slave* slave)
       status->mutable_task_id()->MergeFrom(task->task_id());
       status->set_state(TASK_LOST);
       status->set_message("Slave removed");
-      update->set_timestamp(elapsedTime());
+      update->set_timestamp(Clock::now());
       update->set_uuid(UUID::random().toBytes());
       send(framework->pid, message);
     }
@@ -1853,7 +1861,7 @@ FrameworkID Master::newFrameworkId()
 {
   std::ostringstream out;
 
-  out << id << "-" << std::setw(4)
+  out << info.id() << "-" << std::setw(4)
       << std::setfill('0') << nextFrameworkId++;
 
   FrameworkID frameworkId;
@@ -1866,7 +1874,7 @@ FrameworkID Master::newFrameworkId()
 OfferID Master::newOfferId()
 {
   OfferID offerId;
-  offerId.set_value(id + "-" + utils::stringify(nextOfferId++));
+  offerId.set_value(info.id() + "-" + utils::stringify(nextOfferId++));
   return offerId;
 }
 
@@ -1874,7 +1882,7 @@ OfferID Master::newOfferId()
 SlaveID Master::newSlaveId()
 {
   SlaveID slaveId;
-  slaveId.set_value(id + "-" + utils::stringify(nextSlaveId++));
+  slaveId.set_value(info.id() + "-" + utils::stringify(nextSlaveId++));
   return slaveId;
 }
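
Framework ids zero-pad the counter to four digits while offer and slave ids
use plain stringification; a self-contained sketch of the padded form (the
helper below is hypothetical):

    #include <iomanip>
    #include <sstream>
    #include <string>

    // Sketch: paddedId(masterId, 7) yields "<masterId>-0007".
    static std::string paddedId(const std::string& masterId, int counter)
    {
      std::ostringstream out;
      out << masterId << "-"
          << std::setw(4) << std::setfill('0') << counter;
      return out.str();
    }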
 

Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Fri Jan 27 01:25:13 2012
@@ -108,7 +108,6 @@ public:
   void timerTick();
   void frameworkFailoverTimeout(const FrameworkID& frameworkId,
                                 double reregisteredTime);
-  void exited();
 
   // Returns connected frameworks that are not in the process of being removed.
   std::vector<Framework*> getActiveFrameworks() const;
@@ -120,9 +119,9 @@ public:
                   const hashmap<Slave*, Resources>& offered);
 
 protected:
-  virtual void operator () ();
-
-  void initialize();
+  virtual void initialize();
+  virtual void finalize();
+  virtual void exited(const UPID& pid);
 
   // Process a launch tasks request (for a non-cancelled offer) by
   // launching the desired tasks (if the offer contains a valid set of
@@ -184,15 +183,15 @@ private:
   // HTTP handlers are friends of the master in order to access state;
   // they are invoked from within the master, so there is no need to
   // use synchronization mechanisms to protect state.
-  friend Promise<HttpResponse> http::vars(
+  friend Future<HttpResponse> http::vars(
       const Master& master,
       const HttpRequest& request);
 
-  friend Promise<HttpResponse> http::json::stats(
+  friend Future<HttpResponse> http::json::stats(
       const Master& master,
       const HttpRequest& request);
 
-  friend Promise<HttpResponse> http::json::state(
+  friend Future<HttpResponse> http::json::state(
       const Master& master,
       const HttpRequest& request);
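
With these handlers now declared to return Future<HttpResponse> rather than
Promise<HttpResponse>, a synchronous handler can simply return a response
value, which converts to a ready future. A minimal sketch, assuming an
HttpOKResponse type with 'headers' and 'body' fields as in libprocess of this
vintage (the body shown is illustrative, not the committed handler):

    Future<HttpResponse> http::vars(
        const Master& master,
        const HttpRequest& request)
    {
      HttpOKResponse response;
      response.headers["Content-Type"] = "text/plain";
      response.body = "master_id " + master.info.id() + "\n";
      return response;  // implicitly converted to a ready Future
    }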
 
@@ -203,10 +202,7 @@ private:
   Allocator* allocator;
   SlavesManager* slavesManager;
 
-  // Contains the date the master was launched and some ephemeral
-  // token (e.g. returned from ZooKeeper). Used in framework and slave
-  // IDs created by this master.
-  std::string id;
+  MasterInfo info;
 
   multihashmap<std::string, uint16_t> slaveHostnamePorts;