You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 10:56:39 UTC

svn commit: r1132190 - in /incubator/mesos/trunk: src/detector/zookeeper.cpp third_party/libprocess/process.cpp third_party/libprocess/process.hpp third_party/libprocess/reliable.cpp third_party/libprocess/reliable.hpp

Author: benh
Date: Sun Jun  5 08:56:39 2011
New Revision: 1132190

URL: http://svn.apache.org/viewvc?rev=1132190&view=rev
Log:
Added asynchronous function dispatch/call support via futures and promises to libprocess.

Modified:
    incubator/mesos/trunk/src/detector/zookeeper.cpp
    incubator/mesos/trunk/third_party/libprocess/process.cpp
    incubator/mesos/trunk/third_party/libprocess/process.hpp
    incubator/mesos/trunk/third_party/libprocess/reliable.cpp
    incubator/mesos/trunk/third_party/libprocess/reliable.hpp

Modified: incubator/mesos/trunk/src/detector/zookeeper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/detector/zookeeper.cpp?rev=1132190&r1=1132189&r2=1132190&view=diff
==============================================================================
--- incubator/mesos/trunk/src/detector/zookeeper.cpp (original)
+++ incubator/mesos/trunk/src/detector/zookeeper.cpp Sun Jun  5 08:56:39 2011
@@ -164,8 +164,9 @@ protected:
   void operator () ()
   {
     WatcherProcess *process = this;
-    if (call(manager, REGISTER,
-	     reinterpret_cast<char *>(&process), sizeof(process)) != OK)
+    send(manager, REGISTER,
+         reinterpret_cast<char *>(&process), sizeof(process));
+    if (receive() != OK)
       fatal("failed to setup underlying watcher mechanism");
     while (true) {
       switch (receive()) {
@@ -177,8 +178,9 @@ protected:
 	break;
       }
       case TERMINATE:
-	if (call(manager, UNREGISTER,
-		 reinterpret_cast<char *>(&process), sizeof(process)) != OK)
+	send(manager, UNREGISTER,
+             reinterpret_cast<char *>(&process), sizeof(process));
+        if (receive() != OK)
 	  fatal("failed to cleanup underlying watcher mechanism");
 	return;
       }
@@ -277,8 +279,9 @@ Watcher::~Watcher()
   protected:
     void operator () ()
     {
-      if (call(manager, LOOKUP_PROCESS,
-	       reinterpret_cast<char *>(&watcher), sizeof(watcher)) != OK)
+      send(manager, LOOKUP_PROCESS,
+           reinterpret_cast<char *>(&watcher), sizeof(watcher));
+      if (receive() != OK)
 	fatal("failed to deallocate resources associated with Watcher");
       WatcherProcess *process =
 	*reinterpret_cast<WatcherProcess **>(const_cast<char *>(body(NULL)));
@@ -445,8 +448,9 @@ protected:
     // Lookup and cache the WatcherProcess PID associated with our
     // Watcher _before_ we yield control via calling zookeeper_process
     // so that Watcher callbacks can occur.
-    if (call(manager, LOOKUP_PID,
-	     reinterpret_cast<char *>(&watcher), sizeof(watcher)) != OK)
+    send(manager, LOOKUP_PID,
+         reinterpret_cast<char *>(&watcher), sizeof(watcher));
+    if (receive() != OK)
       fatal("failed to setup underlying ZooKeeper mechanisms");
 
     // TODO(benh): Link with WatcherProcess?
@@ -468,7 +472,9 @@ protected:
 	tv.tv_usec = 0;
       }
 
-      if (await(fd, ops, tv, false)) {
+      double secs = tv.tv_sec + (tv.tv_usec * 1e-6);
+
+      if (await(fd, ops, secs, false)) {
 	// Either timer expired (might be 0) or data became available on fd.
 	process(fd, ops);
       } else {
@@ -639,10 +645,11 @@ int ZooKeeper::create(const string &path
   protected:
     void operator () ()
     {
-      if (call(zooKeeperProcess->self(),
-	       CREATE,
-	       reinterpret_cast<char *>(&createCall),
-	       sizeof(CreateCall *)) != COMPLETED)
+      send(zooKeeperProcess->self(),
+           CREATE,
+           reinterpret_cast<char *>(&createCall),
+           sizeof(CreateCall *));
+      if (receive() != COMPLETED)
 	createCall->ret = ZSYSTEMERROR;
     }
 
@@ -677,10 +684,11 @@ int ZooKeeper::remove(const string &path
   protected:
     void operator () ()
     {
-      if (call(zooKeeperProcess->self(),
-	       REMOVE,
-	       reinterpret_cast<char *>(&removeCall),
-	       sizeof(RemoveCall *)) != COMPLETED)
+      send(zooKeeperProcess->self(),
+           REMOVE,
+           reinterpret_cast<char *>(&removeCall),
+           sizeof(RemoveCall *));
+      if (receive() != COMPLETED)
 	removeCall->ret = ZSYSTEMERROR;
     }
 
@@ -717,10 +725,11 @@ int ZooKeeper::exists(const string &path
   protected:
     void operator () ()
     {
-      if (call(zooKeeperProcess->self(),
-	       EXISTS,
-	       reinterpret_cast<char *>(&existsCall),
-	       sizeof(ExistsCall *)) != COMPLETED)
+      send(zooKeeperProcess->self(),
+           EXISTS,
+           reinterpret_cast<char *>(&existsCall),
+           sizeof(ExistsCall *));
+      if (receive() != COMPLETED)
 	existsCall->ret = ZSYSTEMERROR;
     }
 
@@ -759,10 +768,11 @@ int ZooKeeper::get(const string &path,
   protected:
     void operator () ()
     {
-      if (call(zooKeeperProcess->self(),
-	       GET,
-	       reinterpret_cast<char *>(&getCall),
-	       sizeof(GetCall *)) != COMPLETED)
+      send(zooKeeperProcess->self(),
+           GET,
+           reinterpret_cast<char *>(&getCall),
+           sizeof(GetCall *));
+      if (receive() != COMPLETED)
 	getCall->ret = ZSYSTEMERROR;
     }
 
@@ -799,10 +809,11 @@ int ZooKeeper::getChildren(const string 
   protected:
     void operator () ()
     {
-      if (call(zooKeeperProcess->self(),
-	       GET_CHILDREN,
-	       reinterpret_cast<char *>(&getChildrenCall),
-	       sizeof(GetChildrenCall *)) != COMPLETED)
+      send(zooKeeperProcess->self(),
+           GET_CHILDREN,
+           reinterpret_cast<char *>(&getChildrenCall),
+           sizeof(GetChildrenCall *));
+      if (receive() != COMPLETED)
 	getChildrenCall->ret = ZSYSTEMERROR;
     }
 
@@ -839,10 +850,11 @@ int ZooKeeper::set(const string &path,
   protected:
     void operator () ()
     {
-      if (call(zooKeeperProcess->self(),
-	       SET,
-	       reinterpret_cast<char *>(&setCall),
-	       sizeof(SetCall *)) != COMPLETED)
+      send(zooKeeperProcess->self(),
+           SET,
+           reinterpret_cast<char *>(&setCall),
+           sizeof(SetCall *));
+      if (receive() != COMPLETED)
 	setCall->ret = ZSYSTEMERROR;
     }
 

Modified: incubator/mesos/trunk/third_party/libprocess/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/process.cpp?rev=1132190&r1=1132189&r2=1132190&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/process.cpp Sun Jun  5 08:56:39 2011
@@ -60,6 +60,8 @@ using std::queue;
 using std::set;
 using std::stack;
 
+using std::tr1::function;
+
 
 #define Byte (1)
 #define Kilobyte (1024*Byte)
@@ -249,6 +251,8 @@ public:
   void run(Process *process);
   void cleanup(Process *process);
 
+  static void invoke(const function<void (void)> &thunk);
+
 private:
   timeout create_timeout(Process *process, double secs);
   void start_timeout(const timeout &timeout);
@@ -271,15 +275,15 @@ private:
 
 
 /* Tick, tock ... manually controlled clock! */
-class InternalProcessClock
+class InternalClock
 {
 public:
-  InternalProcessClock()
+  InternalClock()
   {
     initial = current = elapsed = ev_time();
   }
 
-  ~InternalProcessClock() {}
+  ~InternalClock() {}
 
   ev_tstamp getCurrent(Process *process)
   {
@@ -334,7 +338,7 @@ private:
 
 
 /* Using manual clock if non-null. */
-static InternalProcessClock *clk = NULL;
+static InternalClock *clk = NULL;
 
 /* Global 'pipe' id uniquely assigned to each process. */
 static uint32_t global_pipe = 0;
@@ -405,8 +409,8 @@ static Process *proc_process = NULL;
 static bool legacy = false;
 
 /* Thunk to safely call into legacy. */
-// static __thread std::tr1::function<void (void)> *legacy_thunk;
-static const std::tr1::function<void (void)> *legacy_thunk;
+// static __thread function<void (void)> *legacy_thunk;
+static const function<void (void)> *legacy_thunk;
 
 /* Scheduler gate. */
 static Gate *gate = new Gate();
@@ -441,7 +445,7 @@ static map<uint32_t, deque<uint32_t> > *
  * recursive incase a filterer wants to do anything fancy (which is
  * possible and likely given that filters will get used for testing).
 */
-static MessageFilter *filterer = NULL;
+static Filter *filterer = NULL;
 static synchronizable(filterer) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
 
 
@@ -497,7 +501,7 @@ void handle_async(struct ev_loop *loop, 
           ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
         } else {
 	  // Only repeat the timer if not using a manual clock (a call
-	  // to ProcessClock::advance() will force a timer event later).
+	  // to Clock::advance() will force a timer event later).
 	  if (clk != NULL && timeouts_watcher.repeat > 0)
 	    timeouts_watcher.repeat = 0;
 	  ev_timer_again(loop, &timeouts_watcher);
@@ -2051,14 +2055,6 @@ bool ProcessManager::await(Process *proc
       return false;
     }
 
-    assert(secs > 0);
-
-    /* Create timeout. */
-    const timeout &timeout = create_timeout(process, secs);
-
-    /* Start the timeout. */
-    start_timeout(timeout);
-
     // Treat an await with a bad fd as an interruptible pause!
     if (fd >= 0) {
       /* Allocate/Initialize the watcher. */
@@ -2086,6 +2082,15 @@ bool ProcessManager::await(Process *proc
       ev_async_send(loop, &async_watcher);
     }
 
+    assert(secs >= 0);
+
+    timeout timeout;
+
+    if (secs != 0) {
+      timeout = create_timeout(process, secs);
+      start_timeout(timeout);
+    }
+
     /* Context switch. */
     process->state = Process::AWAITING;
     swapcontext(&process->uctx, &proc_uctx_running);
@@ -2094,8 +2099,9 @@ bool ProcessManager::await(Process *proc
            process->state == Process::INTERRUPTED);
 
     /* Attempt to cancel the timer if necessary. */
-    if (process->state != Process::TIMEDOUT)
-      cancel_timeout(timeout);
+    if (secs != 0)
+      if (process->state != Process::TIMEDOUT)
+        cancel_timeout(timeout);
 
     if (process->state == Process::INTERRUPTED)
       interrupted = true;
@@ -2323,6 +2329,16 @@ void ProcessManager::cleanup(Process *pr
 }
 
 
+void ProcessManager::invoke(const function<void (void)> &thunk)
+{  // NOTE(review): presumably whatever resumes in proc_uctx_running runs *legacy_thunk -- not shown in this hunk.
+  legacy_thunk = &thunk;  // Publish the thunk through the file-level pointer.
+  legacy = true;  // Stays set until control comes back to the line after swapcontext.
+  assert(proc_process != NULL);  // A current process is needed: its uctx receives the saved context.
+  swapcontext(&proc_process->uctx, &proc_uctx_running);  // Save this context, activate proc_uctx_running.
+  legacy = false;
+}
+
+
 timeout ProcessManager::create_timeout(Process *process, double secs)
 {
   assert(process != NULL);
@@ -2380,7 +2396,7 @@ void ProcessManager::cancel_timeout(cons
 }
 
 
-void ProcessClock::pause()
+void Clock::pause()
 {
   initialize();
 
@@ -2391,7 +2407,7 @@ void ProcessClock::pause()
     // sends and spawning new processes, not currently done for
     // PROCESS_EXIT messages).
     if (clk == NULL) {
-      clk = new InternalProcessClock();
+      clk = new InternalClock();
 
       // The existing libev timer might actually timeout, but now that
       // clk != NULL, no "time" will actually have passed, so no
@@ -2401,7 +2417,7 @@ void ProcessClock::pause()
 }
 
 
-void ProcessClock::resume()
+void Clock::resume()
 {
   initialize();
 
@@ -2417,7 +2433,7 @@ void ProcessClock::resume()
 }
 
 
-void ProcessClock::advance(double secs)
+void Clock::advance(double secs)
 {
   synchronized(timeouts) {
     if (clk != NULL) {
@@ -2696,11 +2712,30 @@ MSGID Process::receive(double secs)
 }
 
 
-MSGID Process::call(const PID &to, MSGID id,
-		    const char *data, size_t length, double secs)
+/* Runs dispatched thunks; returns the id of the message that ended the loop. */
+MSGID Process::serve(bool forever)
+{
+  do {
+    switch (receive()) {
+      case PROCESS_DISPATCH: {  // Body is a pointer to a heap-allocated thunk.
+        void *pointer = (char *) current + sizeof(struct msg);
+        std::tr1::function<void (void)> *delegator =
+          *reinterpret_cast<std::tr1::function<void (void)> **>(pointer);
+        (*delegator)();
+        delete delegator;
+        break;
+      }
+      default:
+        return msgid();  // Anything that is not a dispatch goes back to the caller.
+    }
+  } while (forever);
+  return msgid();  // Single-shot mode: report the dispatch instead of falling off the end.
+}
+
+
+void Process::operator () ()
 {
-  send(to, id, data, length);
-  return receive(secs);
+  serve();
 }
 
 
@@ -2742,17 +2777,9 @@ PID Process::link(const PID &to)
 }
 
 
-bool Process::await(int fd, int op, const timeval& tv)
-{
-  return await(fd, op, tv, true);
-}
-
-
-bool Process::await(int fd, int op, const timeval& tv, bool ignore)
+bool Process::await(int fd, int op, double secs, bool ignore)
 {
-  double secs = tv.tv_sec + (tv.tv_usec * 1e-6);
-
-  if (secs <= 0)
+  if (secs < 0)
     return true;
 
   /* TODO(benh): Handle invoking await from "outside" thread. */
@@ -2809,44 +2836,6 @@ double Process::elapsed()
 }
 
 
-void Process::post(const PID &to, MSGID id, const char *data, size_t length)
-{
-  initialize();
-
-  if (replaying)
-    return;
-
-  if (!to)
-    return;
-
-  /* Disallow sending messages using an internal id. */
-  if (id < PROCESS_MSGID)
-    return;
-
-  /* Allocate/Initialize outgoing message. */
-  struct msg *msg = (struct msg *) malloc(sizeof(struct msg) + length);
-
-  msg->from.pipe = 0;
-  msg->from.ip = 0;
-  msg->from.port = 0;
-  msg->to.pipe = to.pipe;
-  msg->to.ip = to.ip;
-  msg->to.port = to.port;
-  msg->id = id;
-  msg->len = length;
-
-  if (length > 0)
-    memcpy((char *) msg + sizeof(struct msg), data, length);
-
-  if (to.ip == ip && to.port == port)
-    /* Local message. */
-    process_manager->deliver(msg);
-  else
-    /* Remote message. */
-    link_manager->send(msg);
-}
-
-
 PID Process::spawn(Process *process)
 {
   initialize();
@@ -2866,7 +2855,8 @@ PID Process::spawn(Process *process)
     }
 
     process_manager->spawn(process);
-    return process->pid;
+
+    return process->self();
   } else {
     return PID();
   }
@@ -2899,18 +2889,14 @@ bool Process::wait(const PID &pid)
 }
 
 
-void Process::invoke(const std::tr1::function<void (void)> &thunk)
+void Process::invoke(const function<void (void)> &thunk)
 {
   initialize();
-  legacy_thunk = &thunk;
-  legacy = true;
-  assert(proc_process != NULL);
-  swapcontext(&proc_process->uctx, &proc_uctx_running);
-  legacy = false;
+  ProcessManager::invoke(thunk);
 }
 
 
-void Process::filter(MessageFilter *filter)
+void Process::filter(Filter *filter)
 {
   initialize();
 
@@ -2918,3 +2904,64 @@ void Process::filter(MessageFilter *filt
     filterer = filter;
   }
 }
+
+
+void Process::post(const PID &to, MSGID id, const char *data, size_t length)
+{  /* Sends (id, data) to 'to' with a zeroed sender, i.e. without a return address. */
+  initialize();
+
+  if (replaying)
+    return;  // Posts are silently dropped while 'replaying' is set.
+
+  if (!to)
+    return;  // Nothing to send when 'to' tests false.
+
+  /* Disallow sending messages using an internal id (anything below PROCESS_MSGID). */
+  if (id < PROCESS_MSGID)
+    return;
+
+  /* Allocate/Initialize outgoing message: header followed by 'length' body bytes. */
+  struct msg *msg = (struct msg *) malloc(sizeof(struct msg) + length);
+
+  msg->from.pipe = 0;  // All 'from' fields are zeroed: the message names no sender.
+  msg->from.ip = 0;
+  msg->from.port = 0;
+  msg->to.pipe = to.pipe;
+  msg->to.ip = to.ip;
+  msg->to.port = to.port;
+  msg->id = id;
+  msg->len = length;
+
+  if (length > 0)
+    memcpy((char *) msg + sizeof(struct msg), data, length);  // Body sits right after the header.
+
+  if (to.ip == ip && to.port == port)
+    /* Local message: destination matches 'ip' and 'port', deliver directly. */
+    process_manager->deliver(msg);
+  else
+    /* Remote message: hand the message to the link manager. */
+    link_manager->send(msg);
+}
+
+
+/* Queues 'delegator' on 'process' as a PROCESS_DISPATCH; serve() frees it. */
+void Process::dispatcher(Process *process, function<void (void)> *delegator)
+{
+  if (replaying) {
+    delete delegator;  // Dropped dispatch: nobody else would free the thunk.
+    return;
+  }
+
+  /* Allocate/Initialize outgoing message (the body is the thunk pointer itself). */
+  struct msg *msg = (struct msg *) malloc(sizeof(struct msg) + sizeof(delegator));
+  msg->from.pipe = 0;
+  msg->from.ip = 0;
+  msg->from.port = 0;
+  msg->to.pipe = process->pid.pipe;
+  msg->to.ip = process->pid.ip;
+  msg->to.port = process->pid.port;
+  msg->id = PROCESS_DISPATCH;
+  msg->len = sizeof(delegator);
+  memcpy((char *) msg + sizeof(struct msg), &delegator, sizeof(delegator));
+  process_manager->deliver(msg);
+}

Modified: incubator/mesos/trunk/third_party/libprocess/process.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/process.hpp?rev=1132190&r1=1132189&r2=1132190&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/process.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/process.hpp Sun Jun  5 08:56:39 2011
@@ -1,6 +1,7 @@
 #ifndef PROCESS_HPP
 #define PROCESS_HPP
 
+#include <assert.h>
 #include <stdint.h>
 #include <stdlib.h>
 #include <ucontext.h>
@@ -17,10 +18,15 @@
 typedef uint16_t MSGID;
 
 
-const MSGID PROCESS_ERROR = 0;
-const MSGID PROCESS_TIMEOUT = 1;
-const MSGID PROCESS_EXIT = 2;
-const MSGID PROCESS_MSGID = PROCESS_EXIT+1;
+const MSGID PROCESS_ERROR = 1;
+const MSGID PROCESS_TIMEOUT = 2;
+const MSGID PROCESS_EXIT = 3;
+const MSGID PROCESS_TERMINATE = 4;
+const MSGID PROCESS_DISPATCH = 5;
+const MSGID PROCESS_MSGID = PROCESS_DISPATCH + 1;
+
+
+class Process;
 
 
 struct msg
@@ -32,7 +38,7 @@ struct msg
 };
 
 
-class ProcessClock {
+class Clock {
 public:
   static void pause();
   static void resume();
@@ -40,74 +46,121 @@ public:
 };
 
 
-class MessageFilter {
+class Filter {
 public:
-  virtual bool filter(struct msg *) = 0;
+  virtual bool filter(msg *) = 0;
 };
 
 
-class Process {
+template <typename T>
+struct Future
+{
+  Future();
+  Future(const Future<T> &that);
+  Future<T> & operator = (const Future<T> &that);
+  virtual ~Future();
+  void set(const T &t_);
+  T get() const;
+
+private:
+  int *refs;
+  T **t;
+  Process *trigger;
+};
+
+
+template <typename T>
+class Promise
+{
 public:
-  /* Returns pid of process; valid even before calling spawn. */
-  PID self() const;
+  Promise();
+  Promise(const Promise<T> &that);
+  virtual ~Promise();
+  void set(const T &t_);
+  void associate(const Future<T> &future_);
 
-  /* Sends a message to PID without a return address. */
-  static void post(const PID &to, MSGID id);
+private:
+  void operator = (const Promise<T> &);
 
-  /* Sends a message with data to PID without a return address. */
-  static void post(const PID &to, MSGID id, const char *data, size_t length);
+  enum State {
+    UNSET_UNASSOCIATED,
+    SET_UNASSOCIATED,
+    UNSET_ASSOCIATED,
+    SET_ASSOCIATED,
+  };
+
+  int *refs;
+  T **t;
+  State *state;
+  Future<T> **future;
+};
 
-  /* Spawn a new process. */
-  static PID spawn(Process *process);
 
-  /* Wait for PID to exit (returns true if actually waited on a process). */
-  static bool wait(const PID &pid);
+template <>
+class Promise<void>;
 
-  /* Invoke the thunk in a legacy safe way. */
-  static void invoke(const std::tr1::function<void (void)> &thunk);
 
-  /* Filter messages to be enqueued (except for timeout messages). */
-  static void filter(MessageFilter *);
+template <typename T>
+class Promise<T&>;
 
-protected:
+
+template <typename T>
+struct Result
+{
+  Result(const T &t_);
+  Result(const Promise<T> &promise_);
+  Result(const Result<T> &that);
+  virtual ~Result();
+  bool isPromise() const;
+  Promise<T> getPromise() const;
+
+  T get() const;
+
+private:
+  void operator = (const Result<T> &);
+
+  int *refs;
+  T *t;
+  Promise<T> *promise;
+};
+
+
+template <>
+struct Result<void>;
+
+
+class Process {
+public:
   Process();
   virtual ~Process();
 
+  /* Returns pid of process; valid even before calling spawn. */
+  PID self() const { return pid; }
+
+protected:
   /* Function run when process spawned. */
-  virtual void operator() () = 0;
+  virtual void operator() ();
 
   /* Returns the sender's PID of the last dequeued (current) message. */
-  PID from() const;
+  PID from() const { return current != NULL ? current->from : PID(); }
 
   /* Returns the id of the last dequeued (current) message. */
-  MSGID msgid() const;
+  MSGID msgid() const { return current != NULL ? current->id : PROCESS_ERROR; }
 
   /* Returns pointer and length of body of last dequeued (current) message. */
-  const char * body(size_t *length) const;
+  virtual const char * body(size_t *length) const;
 
   /* Put a message at front of queue (will not reschedule process). */
-  virtual void inject(const PID &from, MSGID id, const char *data, size_t length);
-
-  /* Sends a message to PID. */
-  virtual void send(const PID &to , MSGID);
+  virtual void inject(const PID &from, MSGID id, const char *data = NULL, size_t length = 0);
 
   /* Sends a message with data to PID. */
-  virtual void send(const PID &to, MSGID id, const char *data, size_t length);
-
-  /* Blocks for message indefinitely. */
-  virtual MSGID receive();
+  virtual void send(const PID &to, MSGID id, const char *data = NULL, size_t length = 0);
 
   /* Blocks for message at most specified seconds. */
-  virtual MSGID receive(double secs);
+  virtual MSGID receive(double secs = 0);
 
-  /* Sends a message to PID and then blocks for a message indefinitely. */
-  virtual MSGID call(const PID &to , MSGID id);
-
-  /* Sends a message with data to PID and then blocks for a message. */
-  virtual MSGID call(const PID &to, MSGID id, const char *data, size_t length);
-
-  /* Sends, and then blocks for a message at most specified seconds. */
-  virtual MSGID call(const PID &to, MSGID id, const char *data, size_t length, double secs);
+  /* Runs dispatch messages; if forever, returns the id of the first other message. */
+  virtual MSGID serve(bool forever = true);
 
   /* Blocks at least specified seconds (may block longer). */
   virtual void pause(double secs);
@@ -118,11 +171,8 @@ protected:
   /* IO events for awaiting. */
   enum { RDONLY = 01, WRONLY = 02, RDWR = 03 };
 
-  /* Wait until operation is ready for file descriptor (or message received). */
-  virtual bool await(int fd, int op, const timeval& tv);
-
   /* Wait until operation is ready for file descriptor (or message received if not ignored). */
-  virtual bool await(int fd, int op, const timeval& tv, bool ignore);
+  virtual bool await(int fd, int op, double secs = 0, bool ignore = true);
 
   /* Returns true if operation on file descriptor is ready. */
   virtual bool ready(int fd, int op);
@@ -130,11 +180,314 @@ protected:
   /* Returns sub-second elapsed time (according to this process). */
   double elapsed();
 
+public:
+  /**
+   * Spawn a new process.
+   *
+   * @param process process to be spawned
+   */
+  static PID spawn(Process *process);
+
+  /**
+   * Wait for process to exit (returns true if actually waited on a process).
+   *
+   * @param pid id of the process to wait for
+   */
+  static bool wait(const PID &pid);
+
+  /**
+   * Invoke the thunk in a legacy safe way (i.e., outside of libprocess).
+   *
+   * @param thunk function to be invoked
+   */
+  static void invoke(const std::tr1::function<void (void)> &thunk);
+
+  /**
+   * Use the specified filter on messages that get enqueued (note,
+   * however, that for now you cannot filter timeout messages).
+   *
+   * @param filter message filter
+   */
+  static void filter(Filter *filter);
+
+  /**
+   * Sends a message with data without a return address.
+   *
+   * @param to receiver
+   * @param id message id
+   * @param data data to send (gets copied)
+   * @param length length of data
+   */
+  static void post(const PID &to, MSGID id, const char *data = NULL, size_t length = 0);
+
+  /**
+   * Dispatches a void method on a process.
+   *
+   * @param instance running process to receive dispatch message
+   * @param method method to invoke on instance
+   */
+  template <typename C>
+  static void dispatch(C *instance, void (C::*method)());
+
+  /**
+   * Dispatches a void method on a process.
+   *
+   * @param instance running process to receive dispatch message
+   * @param method method to invoke on instance
+   * @param a1 argument to pass to method
+   */
+  template <typename C, typename P1, typename A1>
+  static void dispatch(C *instance, void (C::*method)(P1), A1 a1);
+
+  /**
+   * Dispatches a void method on a process.
+   *
+   * @param instance running process to receive dispatch message
+   * @param method method to invoke on instance
+   * @param a1 first argument to pass to method
+   * @param a2 second argument to pass to method
+   */
+  template <typename C, typename P1, typename P2, typename A1, typename A2>
+  static void dispatch(C *instance, void (C::*method)(P1, P2), A1 a1, A2 a2);
+
+  /**
+   * Dispatches a void method on a process.
+   *
+   * @param instance running process to receive dispatch message
+   * @param method method to invoke on instance
+   * @param a1 first argument to pass to method
+   * @param a2 second argument to pass to method
+   * @param a3 third argument to pass to method
+   */
+  template <typename C,
+            typename P1, typename P2, typename P3,
+            typename A1, typename A2, typename A3>
+  static void dispatch(C *instance, void (C::*method)(P1, P2, P3),
+                       A1 a1, A2 a2, A3 a3);
+
+  /**
+   * Dispatches a void method on a process.
+   *
+   * @param instance running process to receive dispatch message
+   * @param method method to invoke on instance
+   * @param a1 first argument to pass to method
+   * @param a2 second argument to pass to method
+   * @param a3 third argument to pass to method
+   * @param a4 fourth argument to pass to method
+   */
+  template <typename C,
+            typename P1, typename P2, typename P3, typename P4,
+            typename A1, typename A2, typename A3, typename A4>
+  static void dispatch(C *instance, void (C::*method)(P1, P2, P3, P4),
+                       A1 a1, A2 a2, A3 a3, A4 a4);
+
+  /**
+   * Dispatches a void method on a process.
+   *
+   * @param instance running process to receive dispatch message
+   * @param method method to invoke on instance
+   * @param a1 first argument to pass to method
+   * @param a2 second argument to pass to method
+   * @param a3 third argument to pass to method
+   * @param a4 fourth argument to pass to method
+   * @param a5 fifth argument to pass to method
+   */
+  template <typename C,
+            typename P1, typename P2, typename P3, typename P4, typename P5,
+            typename A1, typename A2, typename A3, typename A4, typename A5>
+  static void dispatch(C *instance, void (C::*method)(P1, P2, P3, P4, P5),
+                       A1 a1, A2 a2, A3 a3, A4 a4, A5 a5);
+
+  /**
+   * Dispatches a method on a process and returns the future that
+   * corresponds to the result of executing the method.
+   *
+   * @param instance running process to receive dispatch message
+   * @param method method to invoke on instance
+   * @return future corresponding to the result of executing the method
+   */
+  template <typename T, typename C>
+  static Future<T> dispatch(C *instance, Result<T> (C::*method)());
+
+  /**
+   * Dispatches a method on a process and returns the future that
+   * corresponds to the result of executing the method.
+   *
+   * @param instance running process to receive dispatch message
+   * @param method method to invoke on instance
+   * @param a1 argument to pass to method
+   * @return future corresponding to the result of executing the method
+   */
+  template <typename T, typename C, typename P1, typename A1>
+  static Future<T> dispatch(C *instance, Result<T> (C::*method)(P1), A1 a1);
+
+  /**
+   * Dispatches a method on a process and returns the future that
+   * corresponds to the result of executing the method.
+   *
+   * @param instance running process to receive dispatch message
+   * @param method method to invoke on instance
+   * @param a1 first argument to pass to method
+   * @param a2 second argument to pass to method
+   * @return future corresponding to the result of executing the method
+   */
+  template <typename T, typename C,
+            typename P1, typename P2,
+            typename A1, typename A2>
+  static Future<T> dispatch(C *instance, Result<T> (C::*method)(P1, P2),
+                            A1 a1, A2 a2);
+
+  /**
+   * Dispatches a method on a process and returns the future that
+   * corresponds to the result of executing the method.
+   *
+   * @param instance running process to receive dispatch message
+   * @param method method to invoke on instance
+   * @param a1 first argument to pass to method
+   * @param a2 second argument to pass to method
+   * @param a3 third argument to pass to method
+   * @return future corresponding to the result of executing the method
+   */
+  template <typename T, typename C,
+            typename P1, typename P2, typename P3,
+            typename A1, typename A2, typename A3>
+  static Future<T> dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3),
+                            A1 a1, A2 a2, A3 a3);
+
+  /**
+   * Dispatches a method on a process and returns the future that
+   * corresponds to the result of executing the method.
+   *
+   * @param instance running process to receive dispatch message
+   * @param method method to invoke on instance
+   * @param a1 first argument to pass to method
+   * @param a2 second argument to pass to method
+   * @param a3 third argument to pass to method
+   * @param a4 fourth argument to pass to method
+   * @return future corresponding to the result of executing the method
+   */
+  template <typename T, typename C,
+            typename P1, typename P2, typename P3, typename P4,
+            typename A1, typename A2, typename A3, typename A4>
+  static Future<T> dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3, P4),
+                            A1 a1, A2 a2, A3 a3, A4 a4);
+
+  /**
+   * Dispatches a method on a process and returns the future that
+   * corresponds to the result of executing the method.
+   *
+   * @param instance running process to receive dispatch message
+   * @param method method to invoke on instance
+   * @param a1 first argument to pass to method
+   * @param a2 second argument to pass to method
+   * @param a3 third argument to pass to method
+   * @param a4 fourth argument to pass to method
+   * @param a5 fifth argument to pass to method
+   * @return future corresponding to the result of executing the method
+   */
+  template <typename T, typename C,
+            typename P1, typename P2, typename P3, typename P4, typename P5,
+            typename A1, typename A2, typename A3, typename A4, typename A5>
+  static Future<T> dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3, P4, P5),
+                            A1 a1, A2 a2, A3 a3, A4 a4, A5 a5);
+
+  /**
+   * Dispatches a method on a process and waits (on the underlying
+   * future) for the result.
+   *
+   * @param instance running process to receive dispatch message
+   * @param method method to invoke on instance
+   * @return result of executing the method
+   */
+  template <typename T, typename C>
+  static T call(C *instance, Result<T> (C::*method)());
+
+  /**
+   * Dispatches a method on a process and waits (on the underlying
+   * future) for the result.
+   *
+   * @param instance running process to receive dispatch message
+   * @param method method to invoke on instance
+   * @param a1 argument to pass to method
+   * @return result of executing the method
+   */
+  template <typename T, typename C, typename P1, typename A1>
+  static T call(C *instance, Result<T> (C::*method)(P1), A1 a1);
+
+  /**
+   * Dispatches a method on a process and waits (on the underlying
+   * future) for the result.
+   *
+   * @param instance running process to receive dispatch message
+   * @param method method to invoke on instance
+   * @param a1 first argument to pass to method
+   * @param a2 second argument to pass to method
+   * @return result of executing the method
+   */
+  template <typename T, typename C,
+            typename P1, typename P2,
+            typename A1, typename A2>
+  static T call(C *instance, Result<T> (C::*method)(P1, P2), A1 a1, A2 a2);
+
+  /**
+   * Dispatches a method on a process and waits (on the underlying
+   * future) for the result.
+   *
+   * @param instance running process to receive dispatch message
+   * @param method method to invoke on instance
+   * @param a1 first argument to pass to method
+   * @param a2 second argument to pass to method
+   * @param a3 third argument to pass to method
+   * @return result of executing the method
+   */
+  template <typename T, typename C,
+            typename P1, typename P2, typename P3,
+            typename A1, typename A2, typename A3>
+  static T call(C *instance, Result<T> (C::*method)(P1, P2, P3),
+                A1 a1, A2 a2, A3 a3);
+
+  /**
+   * Dispatches a method on a process and waits (on the underlying
+   * future) for the result.
+   *
+   * @param instance running process to receive dispatch message
+   * @param method method to invoke on instance
+   * @param a1 first argument to pass to method
+   * @param a2 second argument to pass to method
+   * @param a3 third argument to pass to method
+   * @param a4 fourth argument to pass to method
+   * @return result of executing the method
+   */
+  template <typename T, typename C,
+            typename P1, typename P2, typename P3, typename P4,
+            typename A1, typename A2, typename A3, typename A4>
+  static T call(C *instance, Result<T> (C::*method)(P1, P2, P3, P4),
+                A1 a1, A2 a2, A3 a3, A4 a4);
+
+  /**
+   * Dispatches a method on a process and waits (on the underlying
+   * future) for the result.
+   *
+   * @param instance running process to receive dispatch message
+   * @param method method to invoke on instance
+   * @param a1 first argument to pass to method
+   * @param a2 second argument to pass to method
+   * @param a3 third argument to pass to method
+   * @param a4 fourth argument to pass to method
+   * @param a5 fifth argument to pass to method
+   * @return result of executing the method
+   */
+  template <typename T, typename C,
+            typename P1, typename P2, typename P3, typename P4, typename P5,
+            typename A1, typename A2, typename A3, typename A4, typename A5>
+  static T call(C *instance, Result<T> (C::*method)(P1, P2, P3, P4, P5),
+                A1 a1, A2 a2, A3 a3, A4 a4, A5 a5);
+
 private:
   friend class LinkManager;
   friend class ProcessManager;
   friend class ProcessReference;
-  friend void * schedule(void *arg);
+  friend void * schedule(void *);
 
   /* Flag indicating state of process. */
   enum { INIT,
@@ -153,10 +506,10 @@ private:
   int refs;
 
   /* Queue of received messages. */
-  std::deque<struct msg *> msgs;
+  std::deque<msg *> msgs;
 
   /* Current message. */
-  struct msg *current;
+  msg *current;
 
   /* Current "blocking" generation. */
   int generation;
@@ -173,59 +526,542 @@ private:
   void unlock() { pthread_mutex_unlock(&m); }
 
   /* Enqueues the specified message. */
-  void enqueue(struct msg *msg);
+  void enqueue(msg *msg);
 
   /* Dequeues a message or returns NULL. */
-  struct msg * dequeue();
+  msg * dequeue();
+
+  /* Dispatches the delegator to the specified process. */
+  static void dispatcher(Process *, std::tr1::function<void (void)> *delegator);
 };
 
 
-inline PID Process::self() const
+// Invokes `thunk` and routes its outcome to `future`: an immediate value is
+// set directly, a promise is associated instead. Deletes both arguments.
+template <typename T>
+void delegate(std::tr1::function<Result<T> (void)> *thunk, Future<T> *future)
+{
+  assert(thunk != NULL);
+  assert(future != NULL);
+  const Result<T> &outcome = (*thunk)();
+
+  if (outcome.isPromise())
+    outcome.getPromise().associate(*future);
+  else
+    future->set(outcome.get());
+
+  delete thunk;
+  delete future;
+}
+
+
+// Binds `method` to `instance` and hands the heap-allocated thunk to dispatcher().
+template <typename C>
+void Process::dispatch(C *instance, void (C::*method)())
+{
+  typedef std::tr1::function<void (void)> Thunk;
+  Thunk *bound = new Thunk(std::tr1::bind(method, instance));
+  dispatcher(instance, bound);
+}
+
+
+// One-argument form: binds `method`, `instance` and a1, then calls dispatcher().
+template <typename C, typename P1, typename A1>
+void Process::dispatch(C *instance, void (C::*method)(P1), A1 a1)
+{
+  typedef std::tr1::function<void (void)> Thunk;
+
+  Thunk *bound = new Thunk(std::tr1::bind(method, instance, a1));
+  dispatcher(instance, bound);
+}
+
+
+// Two-argument form: binds `method`, `instance`, a1 and a2, then calls dispatcher().
+template <typename C,
+          typename P1, typename P2,
+          typename A1, typename A2>
+void Process::dispatch(C *instance, void (C::*method)(P1, P2), A1 a1, A2 a2)
+{
+  typedef std::tr1::function<void (void)> Thunk;
+
+  Thunk *bound = new Thunk(std::tr1::bind(method, instance, a1, a2));
+  dispatcher(instance, bound);
+}
+
+
+// Three-argument form: binds `method`, `instance` and a1..a3, then calls dispatcher().
+template <typename C,
+          typename P1, typename P2, typename P3,
+          typename A1, typename A2, typename A3>
+void Process::dispatch(C *instance, void (C::*method)(P1, P2, P3),
+                       A1 a1, A2 a2, A3 a3)
+{
+  typedef std::tr1::function<void (void)> Thunk;
+
+  Thunk *bound = new Thunk(std::tr1::bind(method, instance, a1, a2, a3));
+  dispatcher(instance, bound);
+}
+
+
+// Four-argument form: binds `method`, `instance` and a1..a4, then calls dispatcher().
+template <typename C,
+          typename P1, typename P2, typename P3, typename P4,
+          typename A1, typename A2, typename A3, typename A4>
+void Process::dispatch(C *instance, void (C::*method)(P1, P2, P3, P4),
+                       A1 a1, A2 a2, A3 a3, A4 a4)
+{
+  typedef std::tr1::function<void (void)> Thunk;
+
+  Thunk *bound = new Thunk(std::tr1::bind(method, instance, a1, a2, a3, a4));
+  dispatcher(instance, bound);
+}
+
+
+// Five-argument form: binds `method`, `instance` and a1..a5, then calls dispatcher().
+template <typename C,
+          typename P1, typename P2, typename P3, typename P4, typename P5,
+          typename A1, typename A2, typename A3, typename A4, typename A5>
+void Process::dispatch(C *instance, void (C::*method)(P1, P2, P3, P4, P5),
+                       A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
+{
+  typedef std::tr1::function<void (void)> Thunk;
+  Thunk *bound = new Thunk(std::tr1::bind(method, instance,
+                                          a1, a2, a3, a4, a5));
+  dispatcher(instance, bound);
+}
+
+
+// Dispatches `method` on `instance`; the returned future receives its result.
+template <typename T, typename C>
+Future<T> Process::dispatch(C *instance, Result<T> (C::*method)())
+{
+  std::tr1::function<Result<T> (void)> *thunk =
+    new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance));
+
+  // delegate() deletes the Future it is given, so hand it a separate copy.
+  Future<T> future;
+  std::tr1::function<void (void)> *delegator =
+    new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
+                                                       new Future<T>(future)));
+
+  dispatcher(instance, delegator);
+  return future;
+}
+
+
+// One-argument form: dispatches `method` and returns a future for its result.
+template <typename T, typename C, typename P1, typename A1>
+Future<T> Process::dispatch(C *instance, Result<T> (C::*method)(P1), A1 a1)
+{
+  std::tr1::function<Result<T> (void)> *thunk =
+    new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance,
+                                                            a1));
+
+  // delegate() deletes the Future it is given, so hand it a separate copy.
+  Future<T> future;
+  std::tr1::function<void (void)> *delegator =
+    new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
+                                                       new Future<T>(future)));
+
+  dispatcher(instance, delegator);
+  return future;
+}
+
+
+// Two-argument form: dispatches `method` and returns a future for its result.
+template <typename T, typename C,
+          typename P1, typename P2,
+          typename A1, typename A2>
+Future<T> Process::dispatch(C *instance, Result<T> (C::*method)(P1, P2),
+                            A1 a1, A2 a2)
+{
+  std::tr1::function<Result<T> (void)> *thunk =
+    new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance,
+                                                            a1, a2));
+
+  // delegate() deletes the Future it is given, so hand it a separate copy.
+  Future<T> future;
+  std::tr1::function<void (void)> *delegator =
+    new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
+                                                       new Future<T>(future)));
+
+  dispatcher(instance, delegator);
+  return future;
+}
+
+
+// Three-argument form: dispatches `method` and returns a future for its result.
+template <typename T, typename C,
+          typename P1, typename P2, typename P3,
+          typename A1, typename A2, typename A3>
+Future<T> Process::dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3),
+                            A1 a1, A2 a2, A3 a3)
+{
+  std::tr1::function<Result<T> (void)> *thunk =
+    new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance,
+                                                            a1, a2, a3));
+
+  // delegate() deletes the Future it is given, so hand it a separate copy.
+  Future<T> future;
+  std::tr1::function<void (void)> *delegator =
+    new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
+                                                       new Future<T>(future)));
+
+  dispatcher(instance, delegator);
+  return future;
+}
+
+
+// Four-argument form: dispatches `method` and returns a future for its result.
+template <typename T, typename C,
+          typename P1, typename P2, typename P3, typename P4,
+          typename A1, typename A2, typename A3, typename A4>
+Future<T> Process::dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3, P4),
+                            A1 a1, A2 a2, A3 a3, A4 a4)
+{
+  std::tr1::function<Result<T> (void)> *thunk =
+    new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance,
+                                                            a1, a2, a3, a4));
+
+  // delegate() deletes the Future it is given, so hand it a separate copy.
+  Future<T> future;
+  std::tr1::function<void (void)> *delegator =
+    new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
+                                                       new Future<T>(future)));
+
+  dispatcher(instance, delegator);
+  return future;
+}
+
+
+// Five-argument form: dispatches `method` and returns a future for its result.
+template <typename T, typename C,
+          typename P1, typename P2, typename P3, typename P4, typename P5,
+          typename A1, typename A2, typename A3, typename A4, typename A5>
+Future<T> Process::dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3, P4, P5),
+                            A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
+{
+  std::tr1::function<Result<T> (void)> *thunk =
+    new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance,
+                                                            a1, a2, a3, a4, a5));
+
+  // delegate() deletes the Future it is given, so hand it a separate copy.
+  Future<T> future;
+  std::tr1::function<void (void)> *delegator =
+    new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
+                                                       new Future<T>(future)));
+
+  dispatcher(instance, delegator);
+  return future;
+}
+
+
+template <typename T, typename C>
+T Process::call(C *instance, Result<T> (C::*method)())
+{
+  return dispatch(instance, method).get();  // get() waits when the value is not set yet
+}
+
+
+template <typename T, typename C, typename P1, typename A1>
+T Process::call(C *instance, Result<T> (C::*method)(P1), A1 a1)
+{
+  return dispatch(instance, method, a1).get();  // get() waits when the value is not set yet
+}
+
+
+template <typename T, typename C,
+          typename P1, typename P2,
+          typename A1, typename A2>
+T Process::call(C *instance, Result<T> (C::*method)(P1, P2), A1 a1, A2 a2)
+{
+  return dispatch(instance, method, a1, a2).get();  // get() waits when the value is not set yet
+}
+
+
+template <typename T, typename C,
+          typename P1, typename P2, typename P3,
+          typename A1, typename A2, typename A3>
+T Process::call(C *instance, Result<T> (C::*method)(P1, P2, P3),
+                A1 a1, A2 a2, A3 a3)
+{
+  return dispatch(instance, method, a1, a2, a3).get();  // get() waits when the value is not set yet
+}
+
+
+template <typename T, typename C,
+          typename P1, typename P2, typename P3, typename P4,
+          typename A1, typename A2, typename A3, typename A4>
+T Process::call(C *instance, Result<T> (C::*method)(P1, P2, P3, P4),
+                A1 a1, A2 a2, A3 a3, A4 a4)
+{
+  return dispatch(instance, method, a1, a2, a3, a4).get();  // get() waits when the value is not set yet
+}
+
+
+template <typename T, typename C,
+          typename P1, typename P2, typename P3, typename P4, typename P5,
+          typename A1, typename A2, typename A3, typename A4, typename A5>
+T Process::call(C *instance, Result<T> (C::*method)(P1, P2, P3, P4, P5),
+                A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
+{
+  return dispatch(instance, method, a1, a2, a3, a4, a5).get();  // get() waits when the value is not set yet
+}
+
+
+// Builds an unset future and spawns the trigger process that get() waits on.
+template <typename T>
+Future<T>::Future()
+{
+  refs = new int(1);
+  t = new T *;
+  *t = NULL;
+  trigger = new Process();
+  Process::spawn(trigger);
+}
+
+
+template <typename T>
+Future<T>::Future(const Future<T> &that)
+{
+  assert(that.refs != NULL && *that.refs > 0);  // was `that.refs > 0`, which ordered the pointer against 0
+  __sync_fetch_and_add(that.refs, 1);  // take a share of that's state
+  refs = that.refs;
+  t = that.t;  // value slot is shared between copies
+  trigger = that.trigger;
+}
+
+
+template <typename T>
+Future<T> & Future<T>::operator = (const Future<T> &that)
+{
+  if (this != &that) {
+    // Give up our share of the current state (same steps as the destructor).
+    assert(refs != NULL);
+    if (__sync_sub_and_fetch(refs, 1) == 0) {
+      delete refs;
+      assert(t != NULL);
+      delete *t;  // deleting NULL is a no-op, so no guard is needed
+      delete t;   // the slot itself was previously leaked
+      assert(trigger != NULL);
+      Process::post(trigger->self(), PROCESS_MSGID);
+      Process::wait(trigger->self());
+      delete trigger;
+    }
+    // Adopt that's state (same steps as the copy constructor).
+    assert(that.refs != NULL && *that.refs > 0);  // was a pointer-vs-0 ordering
+    __sync_fetch_and_add(that.refs, 1);
+    refs = that.refs;
+    t = that.t;
+    trigger = that.trigger;
+  }
+  return *this;  // the original fell off the end: undefined behaviour
+}
+
+
+template <typename T>
+Future<T>::~Future()
+{
+  assert(refs != NULL);
+  if (__sync_sub_and_fetch(refs, 1) == 0) {  // last owner releases everything
+    delete refs;
+    assert(t != NULL);
+    delete *t;  // deleting NULL is a no-op, so no guard is needed
+    delete t;   // the slot itself was previously leaked
+    assert(trigger != NULL);
+    Process::post(trigger->self(), PROCESS_MSGID);  // message the trigger, then wait on it
+    Process::wait(trigger->self());
+    delete trigger;
+  }
+}
+
+
+template <typename T>
+void Future<T>::set(const T &t_)
+{
+  assert(t != NULL && *t == NULL);  // the value slot must still be empty
+  *t = new T(t_);  // store a heap copy of t_ in the slot shared by all copies
+  Process::post(trigger->self(), PROCESS_MSGID);  // message the trigger process that get() waits on
+}
+
+
+template <typename T>
+T Future<T>::get() const
+{
+  assert(t != NULL);
+  if (*t == NULL) {  // no value yet: wait on the trigger process first
+    assert(trigger != NULL);
+    Process::wait(trigger->self());
+  }
+  assert(t != NULL && *t != NULL);
+  return **t;
+}
+
+
+// TODO(benh): Use synchronized instead of CAS?
+#define CAS __sync_bool_compare_and_swap
+
+
+// Starts in UNSET_UNASSOCIATED with an empty value slot and no future.
+template <typename T>
+Promise<T>::Promise()
+{
+  refs = new int(1);
+  t = new T *;
+  *t = NULL;
+  state = new State;
+  *state = UNSET_UNASSOCIATED;
+  future = new Future<T> *;
+  *future = NULL;
+}
+
+
+template <typename T>
+Promise<T>::Promise(const Promise<T> &that)
+{
+  assert(that.refs != NULL && *that.refs > 0);  // was `that.refs > 0`, which ordered the pointer against 0
+  __sync_fetch_and_add(that.refs, 1);  // take a share of that's state
+  refs = that.refs;
+  t = that.t;  // value slot, state and future slot are shared between copies
+  state = that.state;
+  future = that.future;
+}
+
+
+template <typename T>
+Promise<T>::~Promise()
+{
+  assert(refs != NULL);
+  if (__sync_sub_and_fetch(refs, 1) == 0) {  // last owner releases everything
+    delete refs;
+    assert(t != NULL);
+    delete *t;  // deleting NULL is a no-op, so no guard is needed
+    delete t;   // the value slot itself was previously leaked
+    assert(state != NULL);
+    delete state;
+    assert(future != NULL);
+    delete *future;  // drops this promise's copy of the future, if any
+    delete future;   // the future slot itself was previously leaked
+  }
+}
+
+
+template <typename T>
+void Promise<T>::set(const T &t_)
+{
+  assert(state != NULL);
+  assert(*state == UNSET_UNASSOCIATED ||
+         *state == UNSET_ASSOCIATED);  // both accepted states are UNSET_*
+  assert(t != NULL && *t == NULL);
+  if (*state == UNSET_UNASSOCIATED) {
+    *t = new T(t_);  // keep a copy; associate() reads it when a future is attached later
+    if (!__sync_bool_compare_and_swap(state, UNSET_UNASSOCIATED, SET_UNASSOCIATED)) {
+      assert(*state == UNSET_ASSOCIATED);  // the swap failed, so the state changed after the check above
+      __sync_bool_compare_and_swap(state, UNSET_ASSOCIATED, SET_ASSOCIATED);
+      assert(future != NULL && *future != NULL);
+      (*future)->set(**t);  // forward the stored value to the associated future
+    }
+  } else {
+    assert(*state == UNSET_ASSOCIATED);
+    assert(future != NULL && *future != NULL);
+    (*future)->set(t_);  // a future is already attached: hand the value straight to it
+    __sync_bool_compare_and_swap(state, UNSET_ASSOCIATED, SET_ASSOCIATED);
+  }
+}
+
+
+template <typename T>
+void Promise<T>::associate(const Future<T> &future_)
 {
-  return pid;
+  assert(state != NULL);
+  assert(*state == UNSET_UNASSOCIATED ||
+         *state == SET_UNASSOCIATED);  // both accepted states are *_UNASSOCIATED
+  assert(future != NULL);
+  *future = new Future<T>(future_);  // store the copy before the state changes below
+  if (*state == UNSET_UNASSOCIATED) {
+    if (!__sync_bool_compare_and_swap(state, UNSET_UNASSOCIATED,
+                                      UNSET_ASSOCIATED)) {
+      assert(*state == SET_UNASSOCIATED);  // the swap failed, so the state changed after the check above
+      __sync_bool_compare_and_swap(state, SET_UNASSOCIATED, SET_ASSOCIATED);
+      assert(*state == SET_ASSOCIATED);
+      assert(t != NULL && *t != NULL);
+      (*future)->set(**t);  // pass the stored value on to the future
+    }
+  } else {
+    assert(*state == SET_UNASSOCIATED);
+    __sync_bool_compare_and_swap(state, SET_UNASSOCIATED, SET_ASSOCIATED);
+    assert(*state == SET_ASSOCIATED);
+    assert(t != NULL && *t != NULL);
+    (*future)->set(**t);  // the value was stored earlier: deliver it now
+  }
 }
 
 
-inline PID Process::from() const
+template <typename T>
+Result<T>::Result(const T &t_)
 {
-  return current != NULL ? current->from : PID();
+  // Immediate result: holds its own copy of t_ and no promise.
+  refs = new int(1);
+  t = new T(t_);
+  promise = NULL;
 }
 
 
-inline MSGID Process::msgid() const
+template <typename T>
+Result<T>::Result(const Promise<T> &promise_)
 {
-  return current != NULL ? current->id : PROCESS_ERROR;
+  // Deferred result: holds its own copy of promise_ and no immediate value.
+  refs = new int(1);
+  t = NULL;
+  promise = new Promise<T>(promise_);
 }
 
 
-inline void Process::send(const PID &to, MSGID id)
+template <typename T>
+Result<T>::Result(const Result<T> &that)
 {
-  send(to, id, NULL, 0);
+  assert(that.refs != NULL && *that.refs > 0);  // was `that.refs > 0`, which ordered the pointer against 0
+  __sync_fetch_and_add(that.refs, 1);  // take a share of that's state
+  refs = that.refs;
+  t = that.t;  // value and promise pointers are shared between copies
+  promise = that.promise;
 }
 
 
-inline MSGID Process::call(const PID &to, MSGID id)
+template <typename T>
+Result<T>::~Result()
 {
-  return call(to, id, NULL, 0);
+  assert(refs != NULL);
+  if (__sync_sub_and_fetch(refs, 1) == 0) {
+    // Last reference: release the counter and whichever payload is held.
+    // Deleting a NULL pointer does nothing, so no guards are needed.
+    delete refs;
+    delete t;
+    delete promise;
+  }
 }
 
 
-inline MSGID Process::call(const PID &to, MSGID id,
-			   const char *data, size_t length)
+template <typename T>
+bool Result<T>::isPromise() const
 {
-  return call(to, id, data, length, 0);
+  return promise != NULL;  // only the Promise-taking constructor stores a non-NULL promise
 }
 
 
-inline MSGID Process::receive()
+template <typename T>
+Promise<T> Result<T>::getPromise() const
 {
-  return receive(0);
+  assert(isPromise());  // only valid on a promise-carrying result
+  return *promise;  // returned by value, so the Promise copy constructor runs
 }
 
 
-inline void Process::post(const PID &to, MSGID id)
+template <typename T>
+T Result<T>::get() const
 {
-  post(to, id, NULL, 0);
+  assert(!isPromise());  // only valid on a value-carrying result
+  return *t;  // returns a copy of the stored value
 }
 
 

Modified: incubator/mesos/trunk/third_party/libprocess/reliable.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/reliable.cpp?rev=1132190&r1=1132189&r2=1132190&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/reliable.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/reliable.cpp Sun Jun  5 08:56:39 2011
@@ -8,6 +8,7 @@ using std::make_pair;
 using std::map;
 using std::pair;
 
+
 #define malloc(bytes)                                               \
   ({ void *tmp;                                                     \
      if ((tmp = malloc(bytes)) == NULL)                             \

Modified: incubator/mesos/trunk/third_party/libprocess/reliable.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/reliable.hpp?rev=1132190&r1=1132189&r2=1132190&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/reliable.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/reliable.hpp Sun Jun  5 08:56:39 2011
@@ -154,4 +154,5 @@ inline MSGID ReliableProcess::receive()
   return receive(0);
 }
 
+
 #endif /* __RELIABLE_HPP__ */