You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:09:34 UTC

svn commit: r1132263 - in /incubator/mesos/trunk/third_party/libprocess: Makefile.in decoder.hpp future.hpp latch.cpp latch.hpp pid.cpp pid.hpp process.cpp process.hpp promise.hpp

Author: benh
Date: Sun Jun  5 09:09:33 2011
New Revision: 1132263

URL: http://svn.apache.org/viewvc?rev=1132263&view=rev
Log:
Updates to libprocess to make dispatches no longer require a pointer to the process.

Added:
    incubator/mesos/trunk/third_party/libprocess/future.hpp
    incubator/mesos/trunk/third_party/libprocess/latch.cpp
    incubator/mesos/trunk/third_party/libprocess/latch.hpp
    incubator/mesos/trunk/third_party/libprocess/promise.hpp
Modified:
    incubator/mesos/trunk/third_party/libprocess/Makefile.in
    incubator/mesos/trunk/third_party/libprocess/decoder.hpp
    incubator/mesos/trunk/third_party/libprocess/pid.cpp
    incubator/mesos/trunk/third_party/libprocess/pid.hpp
    incubator/mesos/trunk/third_party/libprocess/process.cpp
    incubator/mesos/trunk/third_party/libprocess/process.hpp

Modified: incubator/mesos/trunk/third_party/libprocess/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/Makefile.in?rev=1132263&r1=1132262&r2=1132263&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/Makefile.in (original)
+++ incubator/mesos/trunk/third_party/libprocess/Makefile.in Sun Jun  5 09:09:33 2011
@@ -24,7 +24,7 @@ CXXFLAGS += -fPIC -D_XOPEN_SOURCE
 # Add dependency tracking to CXXFLAGS.
 CXXFLAGS += -MMD -MP
 
-LIB_OBJ = process.o pid.o fatal.o tokenize.o
+LIB_OBJ = process.o pid.o fatal.o tokenize.o latch.o
 LIB = libprocess.a
 
 OBJS = $(LIB_OBJ)

Modified: incubator/mesos/trunk/third_party/libprocess/decoder.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/decoder.hpp?rev=1132263&r1=1132262&r2=1132263&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/decoder.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/decoder.hpp Sun Jun  5 09:09:33 2011
@@ -99,7 +99,7 @@ private:
     // Okay, the message is complete, do the necessary parsing (it
     // would be cooler to do this inline instead).
     std::string name;
-    PID to, from;
+    UPID to, from;
 
     const std::vector<std::string>& pairs = tokenize(decoder->path, "/");
     if (pairs.size() != 2) {
@@ -107,7 +107,7 @@ private:
       return -1;
     }
 
-    to = PID(pairs[0], Process().self().ip, Process().self().port);
+    to = UPID(pairs[0], Process().self().ip, Process().self().port);
     name = pairs[1];
 
     if (name == "") {

Added: incubator/mesos/trunk/third_party/libprocess/future.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/future.hpp?rev=1132263&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/future.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/future.hpp Sun Jun  5 09:09:33 2011
@@ -0,0 +1,108 @@
+#ifndef __FUTURE_HPP__
+#define __FUTURE_HPP__
+
+#include "latch.hpp"
+
+
+template <typename T>
+class Future
+{
+public:
+  Future();
+  Future(const Future<T>& that);
+  Future<T>& operator = (const Future<T>& that);
+  virtual ~Future();
+  void set(const T& t_);
+  T get() const;
+
+private:
+  int* refs;
+  T** t;
+  Latch* latch;
+};
+
+
+template <typename T>
+Future<T>::Future()
+{
+  refs = new int;
+  *refs = 1;
+  t = new T*;
+  *t = NULL;
+  latch = new Latch();
+}
+
+
+template <typename T>
+Future<T>::Future(const Future<T>& that)
+{
+  assert(that.refs > 0);
+  __sync_fetch_and_add(that.refs, 1);
+  refs = that.refs;
+  t = that.t;
+  latch = that.latch;
+}
+
+
+template <typename T>
+Future<T>& Future<T>::operator = (const Future<T>& that)
+{
+  if (this != &that) {
+    // Destructor ...
+    assert(refs != NULL);
+    if (__sync_sub_and_fetch(refs, 1) == 0) {
+      delete refs;
+      assert(t != NULL);
+      if (*t != NULL)
+        delete *t;
+      assert(latch != NULL);
+      delete latch;
+    }
+
+    // Copy constructor ...
+    assert(that.refs > 0);
+    __sync_fetch_and_add(that.refs, 1);
+    refs = that.refs;
+    t = that.t;
+    latch = that.latch;
+  }
+}
+
+
+template <typename T>
+Future<T>::~Future()
+{
+  assert(refs != NULL);
+  if (__sync_sub_and_fetch(refs, 1) == 0) {
+    delete refs;
+    assert(t != NULL);
+    if (*t != NULL)
+      delete *t;
+    assert(latch != NULL);
+    delete latch;
+  }
+}
+
+
+template <typename T>
+void Future<T>::set(const T& t_)
+{
+  assert(t != NULL && *t == NULL);
+  *t = new T(t_);
+  latch->trigger();
+}
+
+
+template <typename T>
+T Future<T>::get() const
+{
+  assert(t != NULL);
+  if (*t != NULL)
+    return **t;
+  assert(latch != NULL);
+  latch->wait();
+  assert(t != NULL && *t != NULL);
+  return **t;
+}
+
+#endif // __FUTURE_HPP__

Added: incubator/mesos/trunk/third_party/libprocess/latch.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/latch.cpp?rev=1132263&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/latch.cpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/latch.cpp Sun Jun  5 09:09:33 2011
@@ -0,0 +1,39 @@
+#include <process.hpp>
+
+#include "latch.hpp"
+
+
+Latch::Latch()
+{
+  triggered = false;
+  process = new Process();
+  Process::spawn(process);
+}
+
+
+Latch::~Latch()
+{
+  assert(process != NULL);
+  Process::post(process->self(), TERMINATE);
+  Process::wait(process->self());
+  delete process;
+}
+
+
+void Latch::trigger()
+{
+  assert(process != NULL);
+  if (!triggered) {
+    triggered = true;
+    Process::post(process->self(), TERMINATE);
+  }
+}
+
+
+void Latch::wait()
+{
+  assert(process != NULL);
+  if (!triggered) {
+    Process::wait(process->self());
+  }
+}

Added: incubator/mesos/trunk/third_party/libprocess/latch.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/latch.hpp?rev=1132263&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/latch.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/latch.hpp Sun Jun  5 09:09:33 2011
@@ -0,0 +1,28 @@
+#ifndef __LATCH_HPP__
+#define __LATCH_HPP__
+
+#include <process.hpp>
+
+// Need a forward declaration to break the dependency chain between
+// Process, Future, and Latch.
+class Process;
+
+
+class Latch
+{
+public:
+  Latch();
+  virtual ~Latch();
+
+  void trigger();
+  void wait();
+
+private:
+  Latch(const Latch& that);
+  Latch& operator = (const Latch& that);
+
+  bool triggered;
+  Process* process;
+};
+
+#endif // __LATCH_HPP__

Modified: incubator/mesos/trunk/third_party/libprocess/pid.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/pid.cpp?rev=1132263&r1=1132262&r2=1132263&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/pid.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/pid.cpp Sun Jun  5 09:09:33 2011
@@ -11,6 +11,7 @@
 
 #include "config.hpp"
 #include "pid.hpp"
+#include "process.hpp"
 
 using std::istream;
 using std::ostream;
@@ -18,7 +19,7 @@ using std::size_t;
 using std::string;
 
 
-ostream & operator << (ostream &stream, const PID &pid)
+ostream& operator << (ostream& stream, const UPID& pid)
 {
   // Call inet_ntop since inet_ntoa is not thread-safe!
   char ip[INET_ADDRSTRLEN];
@@ -30,7 +31,7 @@ ostream & operator << (ostream &stream, 
 }
 
 
-istream & operator >> (istream &stream, PID &pid)
+istream& operator >> (istream& stream, UPID& pid)
 {
   pid.id = "";
   pid.ip = 0;
@@ -92,7 +93,7 @@ istream & operator >> (istream &stream, 
 }
 
 
-size_t hash_value(const PID& pid)
+size_t hash_value(const UPID& pid)
 {
   size_t seed = 0;
   boost::hash_combine(seed, pid.id);
@@ -100,3 +101,11 @@ size_t hash_value(const PID& pid)
   boost::hash_combine(seed, pid.port);
   return seed;
 }
+
+
+UPID::UPID(const Process& process)
+{
+  id = process.self().id;
+  ip = process.self().ip;
+  port = process.self().port;
+}

Modified: incubator/mesos/trunk/third_party/libprocess/pid.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/pid.hpp?rev=1132263&r1=1132262&r2=1132263&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/pid.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/pid.hpp Sun Jun  5 09:09:33 2011
@@ -1,5 +1,5 @@
-#ifndef PID_HPP
-#define PID_HPP
+#ifndef __PID_HPP__
+#define __PID_HPP__
 
 #include <stdint.h>
 
@@ -8,40 +8,43 @@
 #include <string>
 
 
-struct PID;
+class Process;
+struct UPID;
 
 
-/* Outputing PIDs and generating PIDs using streamds. */
-std::ostream & operator << (std::ostream &, const PID &);
-std::istream & operator >> (std::istream &, PID &);
+// Outputing UPIDs and generating UPIDs using streams.
+std::ostream& operator << (std::ostream&, const UPID&);
+std::istream& operator >> (std::istream&, UPID&);
 
 
-/* PID hash value (for example, to use in Boost's unordered maps). */
-std::size_t hash_value(const PID &);
+// UPID hash value (for example, to use in Boost's unordered maps).
+std::size_t hash_value(const UPID&);
 
 
-struct PID
+struct UPID
 {
-  PID() : ip(0), port(0) {}
+  UPID() : ip(0), port(0) {}
 
-  PID(const char *id_, uint32_t ip_, uint16_t port_)
+  UPID(const char* id_, uint32_t ip_, uint16_t port_)
     : id(id_), ip(ip_), port(port_) {}
 
-  PID(const std::string& id_, uint32_t ip_, uint16_t port_)
+  UPID(const std::string& id_, uint32_t ip_, uint16_t port_)
     : id(id_), ip(ip_), port(port_) {}
 
-  PID(const char *s) 
+  UPID(const char* s) 
   {
     std::istringstream in(s);
     in >> *this;
   }
 
-  PID(const std::string &s)
+  UPID(const std::string& s)
   {
     std::istringstream in(s);
     in >> *this;
   }
 
+  UPID(const Process& process);
+
   operator std::string() const
   {
     std::ostringstream out;
@@ -54,7 +57,7 @@ struct PID
     return id == "" && ip == 0 && port == 0;
   }
 
-  bool operator < (const PID &that) const
+  bool operator < (const UPID& that) const
   {
     if (this != &that) {
       if (ip == that.ip && port == that.port)
@@ -68,7 +71,7 @@ struct PID
     return false;
   }
 
-  bool operator == (const PID &that) const
+  bool operator == (const UPID& that) const
   {
     if (this != &that) {
       return (id == that.id &&
@@ -79,16 +82,23 @@ struct PID
     return true;
   }
 
-  bool operator != (const PID &that) const
+  bool operator != (const UPID& that) const
   {
     return !(this->operator == (that));
   }
 
-
   std::string id;
   uint32_t ip;
   uint16_t port;
 };
 
 
-#endif /* PID_HPP */
+template <typename T = Process>
+struct PID : UPID
+{
+  PID() : UPID() {}
+  PID(const T& t) : UPID(static_cast<const Process&>(t)) {}
+};
+
+
+#endif // __PID_HPP__

Modified: incubator/mesos/trunk/third_party/libprocess/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/process.cpp?rev=1132263&r1=1132262&r2=1132263&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/process.cpp Sun Jun  5 09:09:33 2011
@@ -117,7 +117,7 @@ ostream& operator << (ostream& stream, c
 struct timeout
 {
   ev_tstamp tstamp;
-  PID pid;
+  UPID pid;
   int generation;
 };
 
@@ -191,7 +191,7 @@ public:
   LinkManager();
   ~LinkManager();
 
-  void link(Process* process, const PID& to);
+  void link(Process* process, const UPID& to);
 
   void send(Message* message);
   void send(DataEncoder* encoder, int s);
@@ -204,8 +204,8 @@ public:
   void exited(Process* process);
 
 private:
-  /* Map from PID (local/remote) to process. */
-  map<PID, set<Process*> > links;
+  /* Map from UPID (local/remote) to process. */
+  map<UPID, set<Process*> > links;
 
   /* Map from socket to node (ip, port). */
   map<int, Node> sockets;
@@ -230,23 +230,23 @@ public:
   ProcessManager();
   ~ProcessManager();
 
-  ProcessReference use(const PID &pid);
+  ProcessReference use(const UPID &pid);
 
   void deliver(Message *message, Process *sender = NULL);
 
-  PID spawn(Process *process);
-  void link(Process *process, const PID &to);
+  UPID spawn(Process *process);
+  void link(Process *process, const UPID &to);
   void receive(Process *process, double secs);
   void pause(Process *process, double secs);
-  bool wait(Process *process, const PID &pid);
-  bool external_wait(const PID &pid);
+  bool wait(Process *process, const UPID &pid);
+  bool external_wait(const UPID &pid);
   bool await(Process *process, int fd, int op, double secs, bool ignore);
 
   void enqueue(Process *process);
   Process * dequeue();
 
-  void timedout(const PID &pid, int generation);
-  void awaited(const PID &pid, int generation);
+  void timedout(const UPID &pid, int generation);
+  void awaited(const UPID &pid, int generation);
 
   void run(Process *process);
   void cleanup(Process *process);
@@ -301,7 +301,7 @@ protected:
   }
 
 private:
-  map<PID, Proxy*> proxies;
+  map<UPID, Proxy*> proxies;
 };
 
 
@@ -486,7 +486,7 @@ int set_nbio(int fd)
 }
 
 
-Message* encode(const PID &from, const PID &to, const string &name, const string &data = "")
+Message* encode(const UPID &from, const UPID &to, const string &name, const string &data = "")
 {
   Message* message = new Message();
   message->from = from;
@@ -617,7 +617,7 @@ void handle_timeout(struct ev_loop *loop
 
 void handle_await(struct ev_loop *loop, ev_io *watcher, int revents)
 {
-  tuple<PID, int> *t = (tuple<PID, int> *) watcher->data;
+  tuple<UPID, int> *t = (tuple<UPID, int> *) watcher->data;
   process_manager->awaited(t->get<0>(), t->get<1>());
   ev_io_stop(loop, watcher);
   delete watcher;
@@ -1150,7 +1150,7 @@ Proxy::Proxy(int _c) : c(_c)
     Process::spawn(proxy_manager);
   }
 
-  dispatch(proxy_manager, &ProxyManager::manage, this);
+  dispatch(PID<ProxyManager>(*proxy_manager), &ProxyManager::manage, this);
 }
 
 
@@ -1188,7 +1188,7 @@ LinkManager::LinkManager()
 LinkManager::~LinkManager() {}
 
 
-void LinkManager::link(Process *process, const PID &to)
+void LinkManager::link(Process *process, const UPID &to)
 {
   // TODO(benh): The semantics we want to support for link are such
   // that if there is nobody to link to (local or remote) then a
@@ -1448,9 +1448,9 @@ void LinkManager::exited(const Node &nod
   // ourselves that the accesses to each Process object will always be
   // valid.
   synchronized (this) {
-    list<PID> removed;
+    list<UPID> removed;
     // Look up all linked processes.
-    foreachpair (const PID &pid, set<Process *> &processes, links) {
+    foreachpair (const UPID &pid, set<Process *> &processes, links) {
       if (pid.ip == node.ip && pid.port == node.port) {
         // N.B. If we call exited(pid) we might invalidate iteration.
         foreach (Process *process, processes) {
@@ -1461,7 +1461,7 @@ void LinkManager::exited(const Node &nod
       }
     }
 
-    foreach (const PID &pid, removed) {
+    foreach (const UPID &pid, removed) {
       links.erase(pid);
     }
   }
@@ -1475,10 +1475,10 @@ void LinkManager::exited(Process *proces
     foreachpair (_, set<Process *> &processes, links)
       processes.erase(process);
 
-    const PID &pid = process->self();
+    const UPID &pid = process->self();
 
     /* Look up all linked processes. */
-    map<PID, set<Process *> >::iterator it = links.find(pid);
+    map<UPID, set<Process *> >::iterator it = links.find(pid);
 
     if (it != links.end()) {
       set<Process *> &processes = it->second;
@@ -1504,7 +1504,7 @@ ProcessManager::ProcessManager()
 ProcessManager::~ProcessManager() {}
 
 
-ProcessReference ProcessManager::use(const PID &pid)
+ProcessReference ProcessManager::use(const UPID &pid)
 {
   if (pid.ip == ip && pid.port == port) {
     synchronized (processes) {
@@ -1547,7 +1547,7 @@ void ProcessManager::deliver(Message *me
 }
 
 
-PID ProcessManager::spawn(Process *process)
+UPID ProcessManager::spawn(Process *process)
 {
   assert(process != NULL);
 
@@ -1555,7 +1555,7 @@ PID ProcessManager::spawn(Process *proce
 
   synchronized (processes) {
     if (processes.count(process->pid.id) > 0) {
-      return PID();
+      return UPID();
     } else {
       processes[process->pid.id] = process;
     }
@@ -1619,7 +1619,7 @@ PID ProcessManager::spawn(Process *proce
 
 
 
-void ProcessManager::link(Process *process, const PID &to)
+void ProcessManager::link(Process *process, const UPID &to)
 {
   // Check if the pid is local.
   if (!(to.ip == ip && to.port == port)) {
@@ -1712,7 +1712,7 @@ void ProcessManager::pause(Process *proc
 }
 
 
-bool ProcessManager::wait(Process *process, const PID &pid)
+bool ProcessManager::wait(Process *process, const UPID &pid)
 {
   bool waited = false;
 
@@ -1748,7 +1748,7 @@ bool ProcessManager::wait(Process *proce
 }
 
 
-bool ProcessManager::external_wait(const PID &pid)
+bool ProcessManager::external_wait(const UPID &pid)
 {
   // We use a gate for external waiters. A gate is single use. That
   // is, a new gate is created when the first external thread shows
@@ -1818,7 +1818,7 @@ bool ProcessManager::await(Process *proc
       // the watcher will always fire, even if we get interrupted and
       // return early, so this tuple will get cleaned up when the
       // watcher runs).
-      watcher->data = new tuple<PID, int>(process->pid, process->generation);
+      watcher->data = new tuple<UPID, int>(process->pid, process->generation);
 
       /* Enqueue the watcher. */
       synchronized (watchers) {
@@ -1892,7 +1892,7 @@ Process * ProcessManager::dequeue()
 }
 
 
-void ProcessManager::timedout(const PID &pid, int generation)
+void ProcessManager::timedout(const UPID &pid, int generation)
 {
   if (ProcessReference process = use(pid)) {
     process->lock();
@@ -1931,7 +1931,7 @@ void ProcessManager::timedout(const PID 
 }
 
 
-void ProcessManager::awaited(const PID &pid, int generation)
+void ProcessManager::awaited(const UPID &pid, int generation)
 {
   if (ProcessReference process = use(pid)) {
     process->lock();
@@ -2300,7 +2300,7 @@ Message * Process::dequeue()
 }
 
 
-void Process::inject(const PID &from, const string &name, const char *data, size_t length)
+void Process::inject(const UPID& from, const string& name, const char* data, size_t length)
 {
   if (!from)
     return;
@@ -2328,7 +2328,7 @@ void Process::inject(const PID &from, co
 }
 
 
-void Process::send(const PID &to, const string &name, const char *data, size_t length)
+void Process::send(const UPID& to, const string& name, const char* data, size_t length)
 {
   if (!to) {
     return;
@@ -2387,7 +2387,7 @@ string Process::receive(double secs)
 
  timeout:
   assert(current == NULL);
-  current = encode(PID(), pid, TIMEOUT);
+  current = encode(UPID(), pid, TIMEOUT);
   return name();
 }
 
@@ -2395,12 +2395,12 @@ string Process::receive(double secs)
 string Process::serve(double secs, bool forever)
 {
   do {
-    const string &name = receive(secs);
-    if (name == DISPATCH) {
-      void *pointer = (char *) current->body.data();
-      std::tr1::function<void (void)> *delegator =
-        *reinterpret_cast<std::tr1::function<void (void)> **>(pointer);
-      (*delegator)();
+    const string& name = receive(secs);
+    if (name == "__DISPATCH__") {
+      void* pointer = (char *) current->body.data();
+      std::tr1::function<void(Process*)>* delegator =
+        *reinterpret_cast<std::tr1::function<void(Process*)>**>(pointer);
+      (*delegator)(this);
       delete delegator;
     } else {
       return name;
@@ -2415,17 +2415,17 @@ void Process::operator () ()
 }
 
 
-PID Process::from() const
+UPID Process::from() const
 {
   if (current != NULL) {
     return current->from;
   } else {
-    return PID();
+    return UPID();
   }
 }
 
 
-string Process::name() const
+const string& Process::name() const
 {
   if (current != NULL) {
     return current->name;
@@ -2435,7 +2435,7 @@ string Process::name() const
 }
 
 
-const char * Process::body(size_t *length) const
+const char* Process::body(size_t* length) const
 {
   if (current != NULL && current->body.size() > 0) {
     if (length != NULL) {
@@ -2460,7 +2460,7 @@ void Process::pause(double secs)
 }
 
 
-PID Process::link(const PID &to)
+UPID Process::link(const UPID& to)
 {
   if (!to) {
     return to;
@@ -2533,7 +2533,7 @@ double Process::elapsed()
 }
 
 
-PID Process::spawn(Process *process)
+UPID Process::spawn(Process *process)
 {
   initialize();
 
@@ -2553,12 +2553,12 @@ PID Process::spawn(Process *process)
 
     return process_manager->spawn(process);
   } else {
-    return PID();
+    return UPID();
   }
 }
 
 
-bool Process::wait(const PID &pid)
+bool Process::wait(const UPID& pid)
 {
   initialize();
 
@@ -2603,7 +2603,7 @@ void Process::filter(Filter *filter)
 }
 
 
-void Process::post(const PID &to, const string &name, const char *data, size_t length)
+void Process::post(const UPID& to, const string& name, const char* data, size_t length)
 {
   initialize();
 
@@ -2612,18 +2612,14 @@ void Process::post(const PID &to, const 
   }
 
   // Encode and transport outgoing message.
-  transport(encode(PID(), to, name, string(data, length)));
+  transport(encode(UPID(), to, name, string(data, length)));
 }
 
 
-void Process::dispatcher(Process *process, function<void (void)> *delegator)
+void Process::dispatcher(const UPID& pid, function<void(Process*)>* delegator)
 {
-  if (process == NULL) {
-    return;
-  }
-
   // Encode and deliver outgoing message.
-  Message *message = encode(PID(), process->pid, DISPATCH,
+  Message* message = encode(UPID(), pid, "__DISPATCH__",
                             string((char *) &delegator, sizeof(delegator)));
 
   if (proc_process != NULL) {

Modified: incubator/mesos/trunk/third_party/libprocess/process.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/process.hpp?rev=1132263&r1=1132262&r2=1132263&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/process.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/process.hpp Sun Jun  5 09:09:33 2011
@@ -12,23 +12,21 @@
 
 #include <tr1/functional>
 
+#include "future.hpp"
 #include "pid.hpp"
+#include "promise.hpp"
 
 
 const std::string ERROR = "error";
 const std::string TIMEOUT = "timeout";
 const std::string EXIT = "exit";
 const std::string TERMINATE = "terminate";
-const std::string DISPATCH = "dispatch"; // TODO(benh): Make this internal.
-
-
-class Process;
 
 
 struct Message {
   std::string name;
-  PID from;
-  PID to;
+  UPID from;
+  UPID to;
   std::string body;
 };
 
@@ -43,87 +41,10 @@ public:
 
 class Filter {
 public:
-  virtual bool filter(Message *) = 0;
-};
-
-
-template <typename T>
-struct Future
-{
-  Future();
-  Future(const Future<T> &that);
-  Future<T> & operator = (const Future<T> &that);
-  virtual ~Future();
-  void set(const T &t_);
-  T get() const;
-
-private:
-  int *refs;
-  T **t;
-  Process *trigger;
-};
-
-
-template <typename T>
-class Promise
-{
-public:
-  Promise();
-  Promise(const Promise<T> &that);
-  virtual ~Promise();
-  void set(const T &t_);
-  void associate(const Future<T> &future_);
-
-private:
-  void operator = (const Promise<T> &);
-
-  enum State {
-    UNSET_UNASSOCIATED,
-    SET_UNASSOCIATED,
-    UNSET_ASSOCIATED,
-    SET_ASSOCIATED,
-  };
-
-  int *refs;
-  T **t;
-  State *state;
-  Future<T> **future;
-};
-
-
-template <>
-class Promise<void>;
-
-
-template <typename T>
-class Promise<T&>;
-
-
-template <typename T>
-struct Result
-{
-  Result(const T &t_);
-  Result(const Promise<T> &promise_);
-  Result(const Result<T> &that);
-  virtual ~Result();
-  bool isPromise() const;
-  Promise<T> getPromise() const;
-
-  T get() const;
-
-private:
-  void operator = (const Result<T> &);
-
-  int *refs;
-  T *t;
-  Promise<T> *promise;
+  virtual bool filter(Message*) = 0;
 };
 
 
-template <>
-struct Result<void>;
-
-
 class Process {
 public:
   Process(const std::string& id = "");
@@ -131,26 +52,26 @@ public:
   virtual ~Process();
 
   /* Returns pid of process; valid even before calling spawn. */
-  PID self() const { return pid; }
+  UPID self() const { return pid; }
 
 protected:
   /* Function run when process spawned. */
   virtual void operator() ();
 
   /* Returns the sender's PID of the last dequeued (current) message. */
-  PID from() const;
+  UPID from() const;
 
   /* Returns the name of the last dequeued (current) message. */
-  std::string name() const;
+  const std::string& name() const;
 
   /* Returns pointer and length of body of last dequeued (current) message. */
-  const char * body(size_t *length) const;
+  const char* body(size_t* length) const;
 
   /* Put a message at front of queue (will not reschedule process). */
-  void inject(const PID &from, const std::string &name, const char *data = NULL, size_t length = 0);
+  void inject(const UPID& from, const std::string& name, const char* data = NULL, size_t length = 0);
 
   /* Sends a message with data to PID. */
-  void send(const PID &to, const std::string &name, const char *data = NULL, size_t length = 0);
+  void send(const UPID& to, const std::string &name, const char *data = NULL, size_t length = 0);
 
   /* Blocks for message at most specified seconds (0 implies forever). */
   std::string receive(double secs = 0);
@@ -162,7 +83,7 @@ protected:
   void pause(double secs);
 
   /* Links with the specified PID. */
-  PID link(const PID &pid);
+  UPID link(const UPID& pid);
 
   /* IO events for awaiting. */
   enum { RDONLY = 01, WRONLY = 02, RDWR = 03 };
@@ -182,21 +103,21 @@ public:
    *
    * @param process process to be spawned
    */
-  static PID spawn(Process *process);
+  static UPID spawn(Process* process);
 
   /**
    * Wait for process to exit (returns true if actually waited on a process).
    *
    * @param PID id of the process
    */
-  static bool wait(const PID &pid);
+  static bool wait(const UPID& pid);
 
   /**
    * Invoke the thunk in a legacy safe way (i.e., outside of libprocess).
    *
    * @param thunk function to be invoked
    */
-  static void invoke(const std::tr1::function<void (void)> &thunk);
+  static void invoke(const std::tr1::function<void(void)>& thunk);
 
   /**
    * Use the specified filter on messages that get enqueued (note,
@@ -204,7 +125,7 @@ public:
    *
    * @param filter message filter
    */
-  static void filter(Filter *filter);
+  static void filter(Filter* filter);
 
   /**
    * Sends a message with data without a return address.
@@ -214,73 +135,73 @@ public:
    * @param data data to send (gets copied)
    * @param length length of data
    */
-  static void post(const PID &to, const std::string &name, const char *data = NULL, size_t length = 0);
+  static void post(const UPID& to, const std::string& name, const char* data = NULL, size_t length = 0);
 
   /**
    * Dispatches a void method on a process.
    *
-   * @param instance running process to receive dispatch message
-   * @param method method to invoke on instance
+   * @param pid receiver of message
+   * @param method method to invoke on receiver
    */
-  template <typename C>
-  static void dispatch(C *instance, void (C::*method)());
+  template <typename T>
+  static void dispatch(const PID<T>& pid, void (T::*method)());
 
   /**
    * Dispatches a void method on a process.
    *
-   * @param instance running process to receive dispatch message
+   * @param pid receiver of message
    * @param method method to invoke on instance
    * @param a1 argument to pass to method
    */
-  template <typename C, typename P1, typename A1>
-  static void dispatch(C *instance, void (C::*method)(P1), A1 a1);
+  template <typename T, typename P1, typename A1>
+  static void dispatch(const PID<T>& pid, void (T::*method)(P1), A1 a1);
 
   /**
    * Dispatches a void method on a process.
    *
-   * @param instance running process to receive dispatch message
+   * @param pid receiver of message
    * @param method method to invoke on instance
    * @param a1 first argument to pass to method
    * @param a2 second argument to pass to method
    */
-  template <typename C, typename P1, typename P2, typename A1, typename A2>
-  static void dispatch(C *instance, void (C::*method)(P1, P2), A1 a1, A2 a2);
+  template <typename T, typename P1, typename P2, typename A1, typename A2>
+  static void dispatch(const PID<T>& pid, void (T::*method)(P1, P2), A1 a1, A2 a2);
 
   /**
    * Dispatches a void method on a process.
    *
-   * @param instance running process to receive dispatch message
+   * @param pid receiver of message
    * @param method method to invoke on instance
    * @param a1 first argument to pass to method
    * @param a2 second argument to pass to method
    * @param a3 second argument to pass to method
    */
-  template <typename C,
+  template <typename T,
             typename P1, typename P2, typename P3,
             typename A1, typename A2, typename A3>
-  static void dispatch(C *instance, void (C::*method)(P1, P2, P3),
+  static void dispatch(const PID<T>& pid, void (T::*method)(P1, P2, P3),
                        A1 a1, A2 a2, A3 a3);
 
   /**
    * Dispatches a void method on a process.
    *
-   * @param instance running process to receive dispatch message
+   * @param pid receiver of message
    * @param method method to invoke on instance
    * @param a1 first argument to pass to method
    * @param a2 second argument to pass to method
    * @param a3 third argument to pass to method
    * @param a4 fourth argument to pass to method
    */
-  template <typename C,
+  template <typename T,
             typename P1, typename P2, typename P3, typename P4,
             typename A1, typename A2, typename A3, typename A4>
-  static void dispatch(C *instance, void (C::*method)(P1, P2, P3, P4),
+  static void dispatch(const PID<T>& pid, void (T::*method)(P1, P2, P3, P4),
                        A1 a1, A2 a2, A3 a3, A4 a4);
 
   /**
    * Dispatches a void method on a process.
    *
-   * @param instance running process to receive dispatch message
+   * @param pid receiver of message
    * @param method method to invoke on instance
    * @param a1 first argument to pass to method
    * @param a2 second argument to pass to method
@@ -288,73 +209,77 @@ public:
    * @param a4 fourth argument to pass to method
    * @param a5 fifth argument to pass to method
    */
-  template <typename C,
+  template <typename T,
             typename P1, typename P2, typename P3, typename P4, typename P5,
             typename A1, typename A2, typename A3, typename A4, typename A5>
-  static void dispatch(C *instance, void (C::*method)(P1, P2, P3, P4, P5),
+  static void dispatch(const PID<T>& pid, void (T::*method)(P1, P2, P3, P4, P5),
                        A1 a1, A2 a2, A3 a3, A4 a4, A5 a5);
 
   /**
    * Dispatches a method on a process and returns the future that
    * corresponds to the result of executing the method.
    *
-   * @param instance running process to receive dispatch message
+   * @param pid receiver of message
    * @param method method to invoke on instance
    * @return future corresponding to the result of executing the method
    */
-  template <typename T, typename C>
-  static Future<T> dispatch(C *instance, Result<T> (C::*method)());
+  template <typename R, typename T>
+  static Future<R> dispatch(const PID<T>& pid, Promise<R> (T::*method)());
 
   /**
    * Dispatches a method on a process and returns the future that
    * corresponds to the result of executing the method.
    *
-   * @param instance running process to receive dispatch message
+   * @param pid receiver of message
    * @param method method to invoke on instance
    * @param a1 argument to pass to method
    * @return future corresponding to the result of executing the method
    */
-  template <typename T, typename C, typename P1, typename A1>
-  static Future<T> dispatch(C *instance, Result<T> (C::*method)(P1), A1 a1);
+  template <typename R, typename T, typename P1, typename A1>
+  static Future<R> dispatch(const PID<T>& pid,
+                            Promise<R> (T::*method)(P1),
+                            A1 a1);
 
   /**
    * Dispatches a method on a process and returns the future that
    * corresponds to the result of executing the method.
    *
-   * @param instance running process to receive dispatch message
+   * @param pid receiver of message
    * @param method method to invoke on instance
    * @param a1 first argument to pass to method
    * @param a2 second argument to pass to method
    * @return future corresponding to the result of executing the method
    */
-  template <typename T, typename C,
+  template <typename R, typename T,
             typename P1, typename P2,
             typename A1, typename A2>
-  static Future<T> dispatch(C *instance, Result<T> (C::*method)(P1, P2),
+  static Future<R> dispatch(const PID<T>& pid,
+                            Promise<R> (T::*method)(P1, P2),
                             A1 a1, A2 a2);
 
   /**
    * Dispatches a method on a process and returns the future that
    * corresponds to the result of executing the method.
    *
-   * @param instance running process to receive dispatch message
+   * @param pid receiver of message
    * @param method method to invoke on instance
    * @param a1 first argument to pass to method
    * @param a2 second argument to pass to method
    * @param a3 second argument to pass to method
    * @return future corresponding to the result of executing the method
    */
-  template <typename T, typename C,
+  template <typename R, typename T,
             typename P1, typename P2, typename P3,
             typename A1, typename A2, typename A3>
-  static Future<T> dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3),
+  static Future<R> dispatch(const PID<T>& pid,
+                            Promise<R> (T::*method)(P1, P2, P3),
                             A1 a1, A2 a2, A3 a3);
 
   /**
    * Dispatches a method on a process and returns the future that
    * corresponds to the result of executing the method.
    *
-   * @param instance running process to receive dispatch message
+   * @param pid receiver of message
    * @param method method to invoke on instance
    * @param a1 first argument to pass to method
    * @param a2 second argument to pass to method
@@ -362,17 +287,18 @@ public:
    * @param a4 fourth argument to pass to method
    * @return future corresponding to the result of executing the method
    */
-  template <typename T, typename C,
+  template <typename R, typename T,
             typename P1, typename P2, typename P3, typename P4,
             typename A1, typename A2, typename A3, typename A4>
-  static Future<T> dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3, P4),
+  static Future<R> dispatch(const PID<T>& pid,
+                            Promise<R> (T::*method)(P1, P2, P3, P4),
                             A1 a1, A2 a2, A3 a3, A4 a4);
 
   /**
    * Dispatches a method on a process and returns the future that
    * corresponds to the result of executing the method.
    *
-   * @param instance running process to receive dispatch message
+   * @param pid receiver of message
    * @param method method to invoke on instance
    * @param a1 first argument to pass to method
    * @param a2 second argument to pass to method
@@ -381,72 +307,76 @@ public:
    * @param a5 fifth argument to pass to method
    * @return future corresponding to the result of executing the method
    */
-  template <typename T, typename C,
+  template <typename R, typename T,
             typename P1, typename P2, typename P3, typename P4, typename P5,
             typename A1, typename A2, typename A3, typename A4, typename A5>
-  static Future<T> dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3, P4, P5),
+  static Future<R> dispatch(const PID<T>& pid,
+                            Promise<R> (T::*method)(P1, P2, P3, P4, P5),
                             A1 a1, A2 a2, A3 a3, A4 a4, A5 a5);
 
   /**
    * Dispatches a method on a process and waits (on the underlying
    * future) for the result.
    *
-   * @param instance running process to receive dispatch message
+   * @param pid receiver of message
    * @param method method to invoke on instance
    * @return result of executing the method
    */
-  template <typename T, typename C>
-  static T call(C *instance, Result<T> (C::*method)());
+  template <typename R, typename T>
+  static R call(const PID<T>& pid, Promise<R> (T::*method)());
 
   /**
    * Dispatches a method on a process and waits (on the underlying
    * future) for the result.
    *
-   * @param instance running process to receive dispatch message
+   * @param pid receiver of message
    * @param method method to invoke on instance
    * @param a1 argument to pass to method
    * @return result of executing the method
    */
-  template <typename T, typename C, typename P1, typename A1>
-  static T call(C *instance, Result<T> (C::*method)(P1), A1 a1);
+  template <typename R, typename T, typename P1, typename A1>
+  static R call(const PID<T>& pid, Promise<R> (T::*method)(P1), A1 a1);
 
   /**
    * Dispatches a method on a process and waits (on the underlying
    * future) for the result.
    *
-   * @param instance running process to receive dispatch message
+   * @param pid receiver of message
    * @param method method to invoke on instance
    * @param a1 first argument to pass to method
    * @param a2 second argument to pass to method
    * @return result of executing the method
    */
-  template <typename T, typename C,
+  template <typename R, typename T,
             typename P1, typename P2,
             typename A1, typename A2>
-  static T call(C *instance, Result<T> (C::*method)(P1, P2), A1 a1, A2 a2);
+  static R call(const PID<T>& pid,
+                Promise<R> (T::*method)(P1, P2),
+                A1 a1, A2 a2);
 
   /**
    * Dispatches a method on a process and waits (on the underlying
    * future) for the result.
    *
-   * @param instance running process to receive dispatch message
+   * @param pid receiver of message
    * @param method method to invoke on instance
    * @param a1 first argument to pass to method
    * @param a2 second argument to pass to method
    * @param a3 second argument to pass to method
    * @return result of executing the method
    */
-  template <typename T, typename C,
+  template <typename R, typename T,
             typename P1, typename P2, typename P3,
             typename A1, typename A2, typename A3>
-  static T call(C *instance, Result<T> (C::*method)(P1, P2, P3),
+  static R call(const PID<T>& pid,
+                Promise<R> (T::*method)(P1, P2, P3),
                 A1 a1, A2 a2, A3 a3);
 
   /**
    * Dispatches a method on a process and waits (on the underlying
    * future) for the result.
    *
-   * @param instance running process to receive dispatch message
+   * @param pid receiver of message
    * @param method method to invoke on instance
    * @param a1 first argument to pass to method
    * @param a2 second argument to pass to method
@@ -454,17 +384,18 @@ public:
    * @param a4 fourth argument to pass to method
    * @return result of executing the method
    */
-  template <typename T, typename C,
+  template <typename R, typename T,
             typename P1, typename P2, typename P3, typename P4,
             typename A1, typename A2, typename A3, typename A4>
-  static T call(C *instance, Result<T> (C::*method)(P1, P2, P3, P4),
+  static R call(const PID<T>& pid,
+                Promise<R> (T::*method)(P1, P2, P3, P4),
                 A1 a1, A2 a2, A3 a3, A4 a4);
 
   /**
    * Dispatches a method on a process and waits (on the underlying
    * future) for the result.
    *
-   * @param instance running process to receive dispatch message
+   * @param pid receiver of message
    * @param method method to invoke on instance
    * @param a1 first argument to pass to method
    * @param a2 second argument to pass to method
@@ -473,17 +404,18 @@ public:
    * @param a5 fifth argument to pass to method
    * @return result of executing the method
    */
-  template <typename T, typename C,
+  template <typename R, typename T,
             typename P1, typename P2, typename P3, typename P4, typename P5,
             typename A1, typename A2, typename A3, typename A4, typename A5>
-  static T call(C *instance, Result<T> (C::*method)(P1, P2, P3, P4, P5),
+  static R call(const PID<T>& pid,
+                Promise<R> (T::*method)(P1, P2, P3, P4, P5),
                 A1 a1, A2 a2, A3 a3, A4 a4, A5 a5);
 
 private:
   friend class LinkManager;
   friend class ProcessManager;
   friend class ProcessReference;
-  friend void * schedule(void *);
+  friend void* schedule(void *);
 
   /* Flag indicating state of process. */
   enum { INIT,
@@ -502,16 +434,16 @@ private:
   int refs;
 
   /* Queue of received messages. */
-  std::deque<Message *> messages;
+  std::deque<Message*> messages;
 
   /* Current message. */
-  Message *current;
+  Message* current;
 
   /* Current "blocking" generation. */
   int generation;
 
   /* Process PID. */
-  PID pid;
+  UPID pid;
 
   /* Continuation/Context of process. */
   ucontext_t uctx;
@@ -522,543 +454,322 @@ private:
   void unlock() { pthread_mutex_unlock(&m); }
 
   /* Enqueues the specified message. */
-  void enqueue(Message *message);
+  void enqueue(Message* message);
 
   /* Dequeues a message or returns NULL. */
-  Message * dequeue();
+  Message* dequeue();
+
+  template <typename T>
+  static void vdelegate(Process* process, std::tr1::function<void(T*)>* thunk)
+  {
+    assert(process != NULL);
+    assert(thunk != NULL);
+    (*thunk)(static_cast<T*>(process));
+    delete thunk;
+  }
+
+  template <typename R, typename T>
+  static void delegate(Process* process, std::tr1::function<Promise<R>(T*)>* thunk, Future<R>* future)
+  {
+    assert(process != NULL);
+    assert(thunk != NULL);
+    assert(future != NULL);
+    (*thunk)(static_cast<T*>(process)).associate(future);
+    delete thunk;
+  }
 
   /* Dispatches the delegator to the specified process. */
-  static void dispatcher(Process *, std::tr1::function<void (void)> *delegator);
+  static void dispatcher(const UPID& pid, std::tr1::function<void(Process*)>* delegator);
 };
 
 
 template <typename T>
-void delegate(std::tr1::function<Result<T> (void)> *thunk, Future<T> *future)
+void Process::dispatch(const PID<T>& pid, void (T::*method)())
 {
-  assert(thunk != NULL);
-  assert(future != NULL);
+  std::tr1::function<void(T*)>* thunk =
+    new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1));
 
-  const Result<T> &result = (*thunk)();
+  std::tr1::function<void(Process*)>* delegator =
+    new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::vdelegate<T>,
+                                                          std::tr1::placeholders::_1,
+                                                          thunk));
 
-  if (!result.isPromise()) {
-    future->set(result.get());
-  } else {
-    result.getPromise().associate(*future);
-  }
-
-  delete thunk;
-  delete future;
+  dispatcher(pid, delegator);
 }
 
 
-template <typename C>
-void Process::dispatch(C *instance, void (C::*method)())
+template <typename T, typename P1, typename A1>
+void Process::dispatch(const PID<T>& pid, void (T::*method)(P1), A1 a1)
 {
-  std::tr1::function<void (void)> *delegator =
-    new std::tr1::function<void (void)>(std::tr1::bind(method, instance));
-
-  dispatcher(instance, delegator);
-}
-
+  std::tr1::function<void(T*)>* thunk =
+    new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+                                                     a1));
 
-template <typename C, typename P1, typename A1>
-void Process::dispatch(C *instance, void (C::*method)(P1), A1 a1)
-{
-  std::tr1::function<void (void)> *delegator =
-    new std::tr1::function<void (void)>(std::tr1::bind(method, instance,
-                                                       a1));
+  std::tr1::function<void(Process*)>* delegator =
+    new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::vdelegate<T>,
+                                                          std::tr1::placeholders::_1,
+                                                          thunk));
 
-  dispatcher(instance, delegator);
+  dispatcher(pid, delegator);
 }
 
 
-template <typename C,
+template <typename T,
           typename P1, typename P2,
           typename A1, typename A2>
-void Process::dispatch(C *instance, void (C::*method)(P1, P2), A1 a1, A2 a2)
+void Process::dispatch(const PID<T>& pid, void (T::*method)(P1, P2), A1 a1, A2 a2)
 {
-  std::tr1::function<void (void)> *delegator =
-    new std::tr1::function<void (void)>(std::tr1::bind(method, instance,
-                                                       a1, a2));
+  std::tr1::function<void(T*)>* thunk =
+    new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+                                                     a1, a2));
+
+  std::tr1::function<void(Process*)>* delegator =
+    new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::vdelegate<T>,
+                                                          std::tr1::placeholders::_1,
+                                                          thunk));
 
-  dispatcher(instance, delegator);
+  dispatcher(pid, delegator);
 }
 
 
-template <typename C,
+template <typename T,
           typename P1, typename P2, typename P3,
           typename A1, typename A2, typename A3>
-void Process::dispatch(C *instance, void (C::*method)(P1, P2, P3),
+void Process::dispatch(const PID<T>& pid, void (T::*method)(P1, P2, P3),
                        A1 a1, A2 a2, A3 a3)
 {
-  std::tr1::function<void (void)> *delegator =
-    new std::tr1::function<void (void)>(std::tr1::bind(method, instance,
-                                                       a1, a2, a3));
+  std::tr1::function<void(T*)>* thunk =
+    new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+                                                     a1, a2, a3));
+
+  std::tr1::function<void(Process*)>* delegator =
+    new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::vdelegate<T>,
+                                                          std::tr1::placeholders::_1,
+                                                          thunk));
 
-  dispatcher(instance, delegator);
+  dispatcher(pid, delegator);
 }
 
 
-template <typename C,
+template <typename T,
           typename P1, typename P2, typename P3, typename P4,
           typename A1, typename A2, typename A3, typename A4>
-void Process::dispatch(C *instance, void (C::*method)(P1, P2, P3, P4),
-              A1 a1, A2 a2, A3 a3, A4 a4)
+void Process::dispatch(const PID<T>& pid, void (T::*method)(P1, P2, P3, P4),
+                       A1 a1, A2 a2, A3 a3, A4 a4)
 {
-  std::tr1::function<void (void)> *delegator =
-    new std::tr1::function<void (void)>(std::tr1::bind(method, instance, a1, a2, a3,
-                                                 a4));
+  std::tr1::function<void(T*)>* thunk =
+    new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+                                                     a1, a2, a3, a4));
 
-  dispatcher(instance, delegator);
+  std::tr1::function<void(Process*)>* delegator =
+    new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::vdelegate<T>,
+                                                          std::tr1::placeholders::_1,
+                                                          thunk));
+
+  dispatcher(pid, delegator);
 }
 
 
-template <typename C,
+template <typename T,
           typename P1, typename P2, typename P3, typename P4, typename P5,
           typename A1, typename A2, typename A3, typename A4, typename A5>
-void Process::dispatch(C *instance, void (C::*method)(P1, P2, P3, P4, P5),
+void Process::dispatch(const PID<T>& pid, void (T::*method)(P1, P2, P3, P4, P5),
                        A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
 {
-  std::tr1::function<void (void)> *delegator =
-    new std::tr1::function<void (void)>(std::tr1::bind(method, instance,
-                                                       a1, a2, a3, a4, a5));
+  std::tr1::function<void(T*)>* thunk =
+    new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+                                                     a1, a2, a3, a4, a5));
+
+  std::tr1::function<void(Process*)>* delegator =
+    new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::vdelegate<T>,
+                                                          std::tr1::placeholders::_1,
+                                                          thunk));
 
-  dispatcher(instance, delegator);
+  dispatcher(pid, delegator);
 }
 
 
-template <typename T, typename C>
-Future<T> Process::dispatch(C *instance, Result<T> (C::*method)())
+template <typename R, typename T>
+Future<R> Process::dispatch(const PID<T>& pid, Promise<R> (T::*method)())
 {
-  std::tr1::function<Result<T> (void)> *thunk =
-    new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance));
+  std::tr1::function<Promise<R> (T*)>* thunk =
+    new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1));
 
-  Future<T> *future = new Future<T>();
+  Future<R>* future = new Future<R>();
 
-  std::tr1::function<void (void)> *delegator =
-    new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
-                                                       future));
+  std::tr1::function<void(Process*)>* delegator =
+    new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::delegate<R, T>,
+                                                          std::tr1::placeholders::_1,
+                                                          thunk, future));
 
-  dispatcher(instance, delegator);
+  dispatcher(pid, delegator);
 
   return *future;
 }
 
 
-template <typename T, typename C, typename P1, typename A1>
-Future<T> Process::dispatch(C *instance, Result<T> (C::*method)(P1), A1 a1)
+template <typename R, typename T, typename P1, typename A1>
+Future<R> Process::dispatch(const PID<T>& pid, Promise<R> (T::*method)(P1), A1 a1)
 {
-  std::tr1::function<Result<T> (void)> *thunk =
-    new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance,
-                                                            a1));
+  std::tr1::function<Promise<R> (T*)>* thunk =
+    new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+                                                           a1));
 
-  Future<T> *future = new Future<T>();
+  Future<R>* future = new Future<R>();
 
-  std::tr1::function<void (void)> *delegator =
-    new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
-                                                       future));
+  std::tr1::function<void(Process*)>* delegator =
+    new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::delegate<R, T>,
+                                                          std::tr1::placeholders::_1,
+                                                          thunk, future));
 
-  dispatcher(instance, delegator);
+  dispatcher(pid, delegator);
 
   return *future;
 }
 
 
-template <typename T, typename C,
+template <typename R, typename T,
           typename P1, typename P2,
           typename A1, typename A2>
-Future<T> Process::dispatch(C *instance, Result<T> (C::*method)(P1, P2),
+Future<R> Process::dispatch(const PID<T>& pid, Promise<R> (T::*method)(P1, P2),
                             A1 a1, A2 a2)
 {
-  std::tr1::function<Result<T> (void)> *thunk =
-    new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance,
-                                                            a1, a2));
+  std::tr1::function<Promise<R> (T*)>* thunk =
+    new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+                                                           a1, a2));
+
+  Future<R>* future = new Future<R>();
+
+  std::tr1::function<void(Process*)>* delegator =
+    new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::delegate<R, T>,
+                                                          std::tr1::placeholders::_1,
+                                                          thunk, future));
 
-  Future<T> *future = new Future<T>();
-
-  std::tr1::function<void (void)> *delegator =
-    new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
-                                                       future));
-
-  dispatcher(instance, delegator);
+  dispatcher(pid, delegator);
 
   return *future;
 }
 
 
-template <typename T, typename C,
+template <typename R, typename T,
           typename P1, typename P2, typename P3,
           typename A1, typename A2, typename A3>
-Future<T> Process::dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3),
+Future<R> Process::dispatch(const PID<T>& pid, Promise<R> (T::*method)(P1, P2, P3),
                             A1 a1, A2 a2, A3 a3)
 {
-  std::tr1::function<Result<T> (void)> *thunk =
-    new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance,
-                                                            a1, a2, a3));
-
-  Future<T> *future = new Future<T>();
+  std::tr1::function<Promise<R> (T*)>* thunk =
+    new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+                                                           a1, a2, a3));
+
+  Future<R>* future = new Future<R>();
+
+  std::tr1::function<void(Process*)>* delegator =
+    new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::delegate<R, T>,
+                                                          std::tr1::placeholders::_1,
+                                                          thunk, future));
 
-  std::tr1::function<void (void)> *delegator =
-    new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
-                                                       future));
-
-  dispatcher(instance, delegator);
+  dispatcher(pid, delegator);
 
   return *future;
 }
 
 
-template <typename T, typename C,
+template <typename R, typename T,
           typename P1, typename P2, typename P3, typename P4,
           typename A1, typename A2, typename A3, typename A4>
-Future<T> Process::dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3, P4),
+Future<R> Process::dispatch(const PID<T>& pid, Promise<R> (T::*method)(P1, P2, P3, P4),
                             A1 a1, A2 a2, A3 a3, A4 a4)
 {
-  std::tr1::function<Result<T> (void)> *thunk =
-    new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance,
-                                                            a1, a2, a3, a4));
-
-  Future<T> *future = new Future<T>();
+  std::tr1::function<Promise<R> (T*)>* thunk =
+    new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+                                                           a1, a2, a3, a4));
+
+  Future<R>* future = new Future<R>();
+
+  std::tr1::function<void(Process*)>* delegator =
+    new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::delegate<R, T>,
+                                                          std::tr1::placeholders::_1,
+                                                          thunk, future));
 
-  std::tr1::function<void (void)> *delegator =
-    new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
-                                                       future));
-
-  dispatcher(instance, delegator);
+  dispatcher(pid, delegator);
 
   return *future;
 }
 
 
-template <typename T, typename C,
+template <typename R, typename T,
           typename P1, typename P2, typename P3, typename P4, typename P5,
           typename A1, typename A2, typename A3, typename A4, typename A5>
-Future<T> Process::dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3, P4, P5),
+Future<R> Process::dispatch(const PID<T>& pid, Promise<R> (T::*method)(P1, P2, P3, P4, P5),
                             A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
 {
-  std::tr1::function<Result<T> (void)> *thunk =
-    new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance,
-                                                            a1, a2, a3, a4, a5));
-
-  Future<T> *future = new Future<T>();
-
-  std::tr1::function<void (void)> *delegator =
-    new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
-                                                       future));
+  std::tr1::function<Promise<R> (T*)>* thunk =
+    new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+                                                           a1, a2, a3, a4, a5));
+
+  Future<R>* future = new Future<R>();
+
+  std::tr1::function<void(Process*)>* delegator =
+    new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::delegate<R, T>,
+                                                          std::tr1::placeholders::_1,
+                                                          thunk, future));
 
-  dispatcher(instance, delegator);
+  dispatcher(pid, delegator);
 
   return *future;
 }
 
 
-template <typename T, typename C>
-T Process::call(C *instance, Result<T> (C::*method)())
+template <typename R, typename T>
+R Process::call(const PID<T>& pid, Promise<R> (T::*method)())
 {
-  return dispatch(instance, method).get();
+  return dispatch(pid, method).get();
 }
 
 
-template <typename T, typename C, typename P1, typename A1>
-T Process::call(C *instance, Result<T> (C::*method)(P1), A1 a1)
+template <typename R, typename T, typename P1, typename A1>
+R Process::call(const PID<T>& pid, Promise<R> (T::*method)(P1), A1 a1)
 {
-  return dispatch(instance, method, a1).get();
+  return dispatch(pid, method, a1).get();
 }
 
 
-template <typename T, typename C,
+template <typename R, typename T,
           typename P1, typename P2,
           typename A1, typename A2>
-T Process::call(C *instance, Result<T> (C::*method)(P1, P2), A1 a1, A2 a2)
+R Process::call(const PID<T>& pid, Promise<R> (T::*method)(P1, P2), A1 a1, A2 a2)
 {
-  return dispatch(instance, method, a1, a2).get();
+  return dispatch(pid, method, a1, a2).get();
 }
 
 
-template <typename T, typename C,
+template <typename R, typename T,
           typename P1, typename P2, typename P3,
           typename A1, typename A2, typename A3>
-T Process::call(C *instance, Result<T> (C::*method)(P1, P2, P3),
+R Process::call(const PID<T>& pid, Promise<R> (T::*method)(P1, P2, P3),
                 A1 a1, A2 a2, A3 a3)
 {
-  return dispatch(instance, method, a1, a2, a3).get();
+  return dispatch(pid, method, a1, a2, a3).get();
 }
 
 
-template <typename T, typename C,
+template <typename R, typename T,
           typename P1, typename P2, typename P3, typename P4,
           typename A1, typename A2, typename A3, typename A4>
-T Process::call(C *instance, Result<T> (C::*method)(P1, P2, P3, P4),
+R Process::call(const PID<T>& pid, Promise<R> (T::*method)(P1, P2, P3, P4),
                 A1 a1, A2 a2, A3 a3, A4 a4)
 {
-  return dispatch(instance, method, a1, a2, a3, a4).get();
+  return dispatch(pid, method, a1, a2, a3, a4).get();
 }
 
 
-template <typename T, typename C,
+template <typename R, typename T,
           typename P1, typename P2, typename P3, typename P4, typename P5,
           typename A1, typename A2, typename A3, typename A4, typename A5>
-T Process::call(C *instance, Result<T> (C::*method)(P1, P2, P3, P4, P5),
+R Process::call(const PID<T>& pid, Promise<R> (T::*method)(P1, P2, P3, P4, P5),
                 A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
 {
-  return dispatch(instance, method, a1, a2, a3, a4, a5).get();
-}
-
-
-template <typename T>
-Future<T>::Future()
-{
-  refs = new int;
-  *refs = 1;
-  t = new T *;
-  *t = NULL;
-  trigger = new Process();
-  Process::spawn(trigger);
-}
-
-
-template <typename T>
-Future<T>::Future(const Future<T> &that)
-{
-  assert(that.refs > 0);
-  __sync_fetch_and_add(that.refs, 1);
-  refs = that.refs;
-  t = that.t;
-  trigger = that.trigger;
-}
-
-
-template <typename T>
-Future<T> & Future<T>::operator = (const Future<T> &that)
-{
-  if (this != &that) {
-    // Destructor ...
-    assert(refs != NULL);
-    if (__sync_sub_and_fetch(refs, 1) == 0) {
-      delete refs;
-      assert(t != NULL);
-      if (*t != NULL)
-        delete *t;
-      assert(trigger != NULL);
-      Process::post(trigger->self(), "");
-      Process::wait(trigger->self());
-      delete trigger;
-    }
-
-    // Copy constructor ...
-    assert(that.refs > 0);
-    __sync_fetch_and_add(that.refs, 1);
-    refs = that.refs;
-    t = that.t;
-    trigger = that.trigger;
-  }
-}
-
-
-template <typename T>
-Future<T>::~Future()
-{
-  assert(refs != NULL);
-  if (__sync_sub_and_fetch(refs, 1) == 0) {
-    delete refs;
-    assert(t != NULL);
-    if (*t != NULL)
-      delete *t;
-    assert(trigger != NULL);
-    Process::post(trigger->self(), "");
-    Process::wait(trigger->self());
-    delete trigger;
-  }
-}
-
-
-template <typename T>
-void Future<T>::set(const T &t_)
-{
-  assert(t != NULL && *t == NULL);
-  *t = new T(t_);
-  Process::post(trigger->self(), "");
-}
-
-
-template <typename T>
-T Future<T>::get() const
-{
-  assert(t != NULL);
-  if (*t != NULL)
-    return **t;
-  assert(trigger != NULL);
-  Process::wait(trigger->self());
-  assert(t != NULL && *t != NULL);
-  return **t;
-}
-
-
-// TODO(benh): Use synchronized instead of CAS?
-#define CAS __sync_bool_compare_and_swap
-
-
-template <typename T>
-Promise<T>::Promise()
-{
-  refs = new int;
-  *refs = 1;
-  t = new T *;
-  *t = NULL;
-  state = new State;
-  *state = UNSET_UNASSOCIATED;
-  future = new Future<T> *;
-  *future = NULL;
-}
-
-
-template <typename T>
-Promise<T>::Promise(const Promise<T> &that)
-{
-  assert(that.refs > 0);
-  __sync_fetch_and_add(that.refs, 1);
-  refs = that.refs;
-  t = that.t;
-  state = that.state;
-  future = that.future;
-}
-
-
-template <typename T>
-Promise<T>::~Promise()
-{
-  assert(refs != NULL);
-  if (__sync_sub_and_fetch(refs, 1) == 0) {
-    delete refs;
-    assert(t != NULL);
-    if (*t != NULL)
-      delete *t;
-    assert(state != NULL);
-    delete state;
-    assert(future != NULL);
-    if (*future != NULL)
-      delete *future;
-  }
+  return dispatch(pid, method, a1, a2, a3, a4, a5).get();
 }
 
-
-template <typename T>
-void Promise<T>::set(const T &t_)
-{
-  assert(state != NULL);
-  assert(*state == UNSET_UNASSOCIATED ||
-         *state == UNSET_ASSOCIATED);
-  assert(t != NULL && *t == NULL);
-  if (*state == UNSET_UNASSOCIATED) {
-    *t = new T(t_);
-    if (!__sync_bool_compare_and_swap(state, UNSET_UNASSOCIATED, SET_UNASSOCIATED)) {
-      assert(*state == UNSET_ASSOCIATED);
-      __sync_bool_compare_and_swap(state, UNSET_ASSOCIATED, SET_ASSOCIATED);
-      assert(future != NULL && *future != NULL);
-      (*future)->set(**t);
-    }
-  } else {
-    assert(*state == UNSET_ASSOCIATED);
-    assert(future != NULL && *future != NULL);
-    (*future)->set(t_);
-    __sync_bool_compare_and_swap(state, UNSET_ASSOCIATED, SET_ASSOCIATED);
-  }
-}
-
-
-template <typename T>
-void Promise<T>::associate(const Future<T> &future_)
-{
-  assert(state != NULL);
-  assert(*state == UNSET_UNASSOCIATED ||
-         *state == SET_UNASSOCIATED);
-  assert(future != NULL);
-  *future = new Future<T>(future_);
-  if (*state == UNSET_UNASSOCIATED) {
-    if (!__sync_bool_compare_and_swap(state, UNSET_UNASSOCIATED,
-                                      UNSET_ASSOCIATED)) {
-      assert(*state == SET_UNASSOCIATED);
-      __sync_bool_compare_and_swap(state, SET_UNASSOCIATED, SET_ASSOCIATED);
-      assert(*state == SET_ASSOCIATED);
-      assert(t != NULL && *t != NULL);
-      (*future)->set(**t);
-    }
-  } else {
-    assert(*state == SET_UNASSOCIATED);
-    __sync_bool_compare_and_swap(state, SET_UNASSOCIATED, SET_ASSOCIATED);
-    assert(*state == SET_ASSOCIATED);
-    assert(t != NULL && *t != NULL);
-    (*future)->set(**t);
-  }
-}
-
-
-template <typename T>
-Result<T>::Result(const T &t_)
-{
-  refs = new int;
-  *refs = 1;
-  t = new T(t_);
-  promise = NULL;
-}
-
-
-template <typename T>
-Result<T>::Result(const Promise<T> &promise_)
-{
-  refs = new int;
-  *refs = 1;
-  t = NULL;
-  promise = new Promise<T>(promise_);
-}
-
-
-template <typename T>
-Result<T>::Result(const Result<T> &that)
-{
-  assert(that.refs > 0);
-  __sync_fetch_and_add(that.refs, 1);
-  refs = that.refs;
-  t = that.t;
-  promise = that.promise;
-}
-
-
-template <typename T>
-Result<T>::~Result()
-{
-  assert(refs != NULL);
-  if (__sync_sub_and_fetch(refs, 1) == 0) {
-    delete refs;
-    if (t != NULL)
-      delete t;
-    if (promise != NULL)
-      delete promise;
-  }
-}
-
-
-template <typename T>
-bool Result<T>::isPromise() const
-{
-  return promise != NULL;
-}
-
-
-template <typename T>
-Promise<T> Result<T>::getPromise() const
-{
-  assert(isPromise());
-  return *promise;
-}
-
-
-template <typename T>
-T Result<T>::get() const
-{
-  assert(!isPromise());
-  return *t;
-}
-
-
 #endif // __PROCESS_HPP__

Added: incubator/mesos/trunk/third_party/libprocess/promise.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/promise.hpp?rev=1132263&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/promise.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/promise.hpp Sun Jun  5 09:09:33 2011
@@ -0,0 +1,151 @@
+#ifndef __PROMISE_HPP__
+#define __PROMISE_HPP__
+
+#include "future.hpp"
+
+
+template <typename T>
+class Promise
+{
+public:
+  Promise();
+  Promise(const T &t_);
+  Promise(const Promise<T> &that);
+  virtual ~Promise();
+  void set(const T &t_);
+  void associate(Future<T> *future_);
+
+private:
+  void operator = (const Promise<T> &);
+
+  enum State {
+    UNSET_UNASSOCIATED,
+    SET_UNASSOCIATED,
+    UNSET_ASSOCIATED,
+    SET_ASSOCIATED,
+  };
+
+  int *refs;
+  T **t;
+  Future<T> **future;
+  State *state;
+};
+
+
+template <>
+class Promise<void>;
+
+
+template <typename T>
+class Promise<T&>;
+
+
+template <typename T>
+Promise<T>::Promise()
+{
+  refs = new int;
+  *refs = 1;
+  t = new T *;
+  *t = NULL;
+  future = new Future<T> *;
+  *future = NULL;
+  state = new State;
+  *state = UNSET_UNASSOCIATED;
+}
+
+template <typename T>
+Promise<T>::Promise(const T &_t)
+{
+  refs = new int;
+  *refs = 1;
+  t = new T *;
+  *t = new T(_t);
+  future = new Future<T> *;
+  *future = NULL;
+  state = new State;
+  *state = SET_UNASSOCIATED;
+}
+
+
+template <typename T>
+Promise<T>::Promise(const Promise<T> &that)
+{
+  assert(that.refs != NULL);
+  assert(*that.refs > 0);
+  __sync_fetch_and_add(that.refs, 1);
+  refs = that.refs;
+  t = that.t;
+  state = that.state;
+  future = that.future;
+}
+
+
+template <typename T>
+Promise<T>::~Promise()
+{
+  assert(refs != NULL);
+  if (__sync_sub_and_fetch(refs, 1) == 0) {
+    delete refs;
+    assert(t != NULL);
+    if (*t != NULL)
+      delete *t;
+    assert(state != NULL);
+    delete state;
+    assert(future != NULL);
+    if (*future != NULL)
+      delete *future;
+  }
+}
+
+
+template <typename T>
+void Promise<T>::set(const T &t_)
+{
+  assert(state != NULL);
+  assert(*state == UNSET_UNASSOCIATED ||
+         *state == UNSET_ASSOCIATED);
+  assert(t != NULL && *t == NULL);
+  if (*state == UNSET_UNASSOCIATED) {
+    *t = new T(t_);
+    if (!__sync_bool_compare_and_swap(state, UNSET_UNASSOCIATED, SET_UNASSOCIATED)) {
+      assert(*state == UNSET_ASSOCIATED);
+      __sync_bool_compare_and_swap(state, UNSET_ASSOCIATED, SET_ASSOCIATED);
+      assert(future != NULL && *future != NULL);
+      (*future)->set(**t);
+    }
+  } else {
+    assert(*state == UNSET_ASSOCIATED);
+    assert(future != NULL && *future != NULL);
+    (*future)->set(t_);
+    __sync_bool_compare_and_swap(state, UNSET_ASSOCIATED, SET_ASSOCIATED);
+  }
+}
+
+
+template <typename T>
+void Promise<T>::associate(Future<T> *future_)
+{
+  assert(state != NULL);
+  assert(*state == UNSET_UNASSOCIATED ||
+         *state == SET_UNASSOCIATED);
+  assert(future != NULL);
+  *future = future_;
+  if (*state == UNSET_UNASSOCIATED) {
+    if (!__sync_bool_compare_and_swap(state, UNSET_UNASSOCIATED,
+                                      UNSET_ASSOCIATED)) {
+      assert(*state == SET_UNASSOCIATED);
+      __sync_bool_compare_and_swap(state, SET_UNASSOCIATED, SET_ASSOCIATED);
+      assert(*state == SET_ASSOCIATED);
+      assert(t != NULL && *t != NULL);
+      (*future)->set(**t);
+    }
+  } else {
+    assert(*state == SET_UNASSOCIATED);
+    __sync_bool_compare_and_swap(state, SET_UNASSOCIATED, SET_ASSOCIATED);
+    assert(*state == SET_ASSOCIATED);
+    assert(t != NULL && *t != NULL);
+    (*future)->set(**t);
+  }
+}
+
+#endif // __PROMISE_HPP__