You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:19:39 UTC

svn commit: r1132286 - in /incubator/mesos/trunk/third_party/libprocess: encoder.hpp future.hpp http.hpp latch.cpp latch.hpp process.cpp process.hpp promise.hpp tests.cpp

Author: benh
Date: Sun Jun  5 09:19:39 2011
New Revision: 1132286

URL: http://svn.apache.org/viewvc?rev=1132286&view=rev
Log:
Improvements and minor bug fixes in libprocess.

Modified:
    incubator/mesos/trunk/third_party/libprocess/encoder.hpp
    incubator/mesos/trunk/third_party/libprocess/future.hpp
    incubator/mesos/trunk/third_party/libprocess/http.hpp
    incubator/mesos/trunk/third_party/libprocess/latch.cpp
    incubator/mesos/trunk/third_party/libprocess/latch.hpp
    incubator/mesos/trunk/third_party/libprocess/process.cpp
    incubator/mesos/trunk/third_party/libprocess/process.hpp
    incubator/mesos/trunk/third_party/libprocess/promise.hpp
    incubator/mesos/trunk/third_party/libprocess/tests.cpp

Modified: incubator/mesos/trunk/third_party/libprocess/encoder.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/encoder.hpp?rev=1132286&r1=1132285&r2=1132286&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/encoder.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/encoder.hpp Sun Jun  5 09:19:39 2011
@@ -95,46 +95,44 @@ public:
   {
     std::ostringstream out;
 
-    out << "HTTP/1.0 " << response.status << "\r\n";
+    // TODO(benh): Check version?
 
-    foreachpair (const std::string& key, const std::string& value, response.headers) {
-      out << key << ": " << value << "\r\n";
-//         << "Content-Type: text/html\r\n"
-//         << "Content-Length: " << response.body.size() << "\r\n"
-    }
+    out << "HTTP/1.1 " << response.status << "\r\n";
 
-    out << "Connection: close\r\n"
-        << "\r\n";
+    std::map<std::string, std::string> headers = response.headers;
 
-    out.write(response.body.data(), response.body.size());
+    // HTTP 1.1 requires the "Date" header. In the future once we
+    // start checking the version (above) then we can conditionally
+    // add this header, but for now, we always do.
+    time_t rawtime;
+    time(&rawtime);
 
-    return out.str();
-  }
-};
+    char date[256];
 
+    // TODO(benh): Check return code of strftime!
+    strftime(date, 256, "%a, %d %b %Y %H:%M:%S GMT", gmtime(&rawtime));
 
-class HttpGatewayTimeoutEncoder : public DataEncoder
-{
-public:
-  HttpGatewayTimeoutEncoder()
-    : DataEncoder(encode()) {}
+    headers["Date"] = date;
 
-  static std::string encode()
-  {
-    std::ostringstream out;
+    foreachpair (const std::string& key, const std::string& value, headers) {
+      out << key << ": " << value << "\r\n";
+    }
+
+    // Make sure at least the "Content-Length" header since is present
+    // in order to signal to a client the end of a response.
+    if (headers.count("Content-Length") == 0) {
+      out << "Content-Length: " << response.body.size() << "\r\n";
+    }
 
-    // TODO(benh): We send a content length of 0 so that a browser
-    // won't just sit and wait for the socket to get closed (because
-    // we can't close it our self right now).
-    out << "HTTP/1.0 504 Gateway Timeout\r\n"
-        << "Content-Length: 0\r\n"
-        << "Connection: close\r\n"
-        << "\r\n";
+    out << "\r\n";
+
+    out.write(response.body.data(), response.body.size());
 
     return out.str();
   }
 };
 
+
 }  // namespace process {
 
 #endif // __ENCODER_HPP__

Modified: incubator/mesos/trunk/third_party/libprocess/future.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/future.hpp?rev=1132286&r1=1132285&r2=1132286&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/future.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/future.hpp Sun Jun  5 09:19:39 2011
@@ -15,15 +15,19 @@ class Future
 {
 public:
   Future();
+  Future(const T& _t);
   Future(const Future<T>& that);
   Future<T>& operator = (const Future<T>& that);
   virtual ~Future();
+  bool ready() const;
+  bool await(double secs = 0) const;
   T get() const;
+  operator T () const;
 
 private:
   friend class Promise<T>;
 
-  void set(const T& t_);
+  void set(const T& _t);
 
   int* refs;
   T** t;
@@ -43,6 +47,18 @@ Future<T>::Future()
 
 
 template <typename T>
+Future<T>::Future(const T& _t)
+{
+  refs = new int;
+  *refs = 1;
+  t = new T*;
+  *t = NULL;
+  latch = new Latch();
+  set(_t);
+}
+
+
+template <typename T>
 Future<T>::Future(const Future<T>& that)
 {
   assert(that.refs > 0);
@@ -94,26 +110,51 @@ Future<T>::~Future()
 
 
 template <typename T>
-void Future<T>::set(const T& t_)
+bool Future<T>::ready() const
 {
-  assert(t != NULL && *t == NULL);
-  *t = new T(t_);
-  latch->trigger();
+  assert(t != NULL);
+  if (*t != NULL)
+    return true;
+  return false;
+}
+
+
+template <typename T>
+bool Future<T>::await(double secs) const
+{
+  if (ready())
+    return true;
+  assert(latch != NULL);
+  return latch->await(secs);
 }
 
 
 template <typename T>
 T Future<T>::get() const
 {
-  assert(t != NULL);
-  if (*t != NULL)
+  if (ready())
     return **t;
-  assert(latch != NULL);
-  latch->await();
+  await();
   assert(t != NULL && *t != NULL);
   return **t;
 }
 
+
+template <typename T>
+Future<T>::operator T () const
+{
+  return get();
+}
+
+
+template <typename T>
+void Future<T>::set(const T& _t)
+{
+  assert(t != NULL && *t == NULL);
+  *t = new T(_t);
+  latch->trigger();
+}
+
 }  // namespace process {
 
 #endif // __FUTURE_HPP__

Modified: incubator/mesos/trunk/third_party/libprocess/http.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/http.hpp?rev=1132286&r1=1132285&r2=1132286&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/http.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/http.hpp Sun Jun  5 09:19:39 2011
@@ -22,6 +22,9 @@ struct HttpResponse
   // TODO(benh): Add major/minor version.
   std::string status;
   std::map<std::string, std::string> headers;
+  // TODO(benh): Make body a stream (channel) instead, and allow a
+  // response to be returned without forcing the stream to be
+  // finished.
   std::string body;
 };
 
@@ -35,6 +38,15 @@ struct HttpOKResponse : HttpResponse
 };
 
 
+struct HttpBadRequestResponse : HttpResponse
+{
+  HttpBadRequestResponse()
+  {
+    status = "400 Bad Request";
+  }
+};
+
+
 struct HttpNotFoundResponse : HttpResponse
 {
   HttpNotFoundResponse()
@@ -43,6 +55,25 @@ struct HttpNotFoundResponse : HttpRespon
   }
 };
 
+
+struct HttpInternalServerErrorResponse : HttpResponse
+{
+  HttpInternalServerErrorResponse()
+  {
+    status = "500 Internal Server Error";
+  }
+};
+
+
+struct HttpServiceUnavailableResponse : HttpResponse
+{
+  HttpServiceUnavailableResponse()
+  {
+    status = "503 Service Unavailable";
+  }
+};
+
+
 } // namespace process {
 
 #endif // __HTTP_HPP

Modified: incubator/mesos/trunk/third_party/libprocess/latch.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/latch.cpp?rev=1132286&r1=1132285&r2=1132286&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/latch.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/latch.cpp Sun Jun  5 09:19:39 2011
@@ -39,12 +39,14 @@ void Latch::trigger()
 }
 
 
-void Latch::await()
+bool Latch::await(double secs)
 {
   assert(latch != NULL);
   if (!triggered) {
-    wait(latch->self());
+    return wait(latch->self(), secs);
   }
+
+  return true;
 }
 
 } // namespace process {

Modified: incubator/mesos/trunk/third_party/libprocess/latch.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/latch.hpp?rev=1132286&r1=1132285&r2=1132286&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/latch.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/latch.hpp Sun Jun  5 09:19:39 2011
@@ -16,7 +16,7 @@ public:
   virtual ~Latch();
 
   void trigger();
-  void await();
+  bool await(double secs = 0);
 
 private:
   Latch(const Latch& that);

Modified: incubator/mesos/trunk/third_party/libprocess/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/process.cpp?rev=1132286&r1=1132285&r2=1132286&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/process.cpp Sun Jun  5 09:19:39 2011
@@ -94,6 +94,9 @@ using std::tr1::function;
 
 struct Node
 {
+  Node(uint32_t _ip = 0, uint16_t _port = 0)
+    : ip(_ip), port(_port) {}
+
   uint32_t ip;
   uint16_t port;
 };
@@ -194,16 +197,151 @@ private:
 };
 
 
-class LinkManager
+/* Tick, tock ... manually controlled clock! */
+class InternalClock
 {
 public:
-  LinkManager();
-  ~LinkManager();
+  InternalClock()
+  {
+    initial = current = elapsed = ev_time();
+  }
+
+  ~InternalClock() {}
+
+  ev_tstamp getCurrent(ProcessBase *process)
+  {
+    ev_tstamp tstamp;
+
+    if (currents.count(process) != 0) {
+      tstamp = currents[process];
+    } else {
+      tstamp = currents[process] = initial;
+    }
+
+    return tstamp;
+  }
+
+  void setCurrent(ProcessBase *process, ev_tstamp tstamp)
+  {
+    currents[process] = tstamp;
+  }
+
+  ev_tstamp getCurrent()
+  {
+    return current;
+  }
+
+  void setCurrent(ev_tstamp tstamp)
+  {
+    current = tstamp;
+  }
+
+  ev_tstamp getElapsed()
+  {
+    return elapsed;
+  }
+
+  void setElapsed(ev_tstamp tstamp)
+  {
+    elapsed = tstamp;
+  }
+
+  void discard(ProcessBase *process)
+  {
+    assert(process != NULL);
+    currents.erase(process);
+  }
+
+private:
+  map<ProcessBase *, ev_tstamp> currents;
+  ev_tstamp initial;
+  ev_tstamp current;
+  ev_tstamp elapsed;
+};
+
+
+class GarbageCollector : public Process<GarbageCollector>
+{
+public:
+  GarbageCollector() {}
+  ~GarbageCollector() {}
+
+  template <typename T>
+  void manage(const T* t)
+  {
+    const ProcessBase* process = t;
+    if (process != NULL) {
+      processes[process->self()] = process;
+      link(process->self());
+    }
+  }
+
+protected:
+  virtual void operator () ()
+  {
+    while (true) {
+      serve();
+      if (name() == EXITED && processes.count(from()) > 0) {
+        const ProcessBase* process = processes[from()];
+        processes.erase(from());
+        delete process;
+      }
+    }
+  }
+
+private:
+  map<UPID, const ProcessBase*> processes;
+};
+
+
+/* Global garbage collector (move to own file). */
+static PID<GarbageCollector> gc;
+
+
+class HttpProxy;
+
+
+class HttpResponseWaiter : public Process<HttpResponseWaiter>
+{
+public:
+  HttpResponseWaiter(const PID<HttpProxy>& _proxy);
+  virtual ~HttpResponseWaiter();
+
+  void await(const Future<HttpResponse>& future, bool persist);
+
+private:
+  const PID<HttpProxy> proxy;
+};
+
+
+class HttpProxy : public Process<HttpProxy>
+{
+public:
+  HttpProxy(int _c);
+  virtual ~HttpProxy();
+
+  void handle(const Future<HttpResponse>& future, bool persist);
+  void ready(const Future<HttpResponse>& future, bool persist);
+  void unavailable(bool persist);
+
+private:
+  int c;
+  HttpResponseWaiter* waiter;
+};
+
+
+class SocketManager
+{
+public:
+  SocketManager();
+  ~SocketManager();
 
   void link(ProcessBase* process, const UPID& to);
 
+  PID<HttpProxy> proxy(int s);
+
+  void send(DataEncoder* encoder, int s, bool persist);
   void send(Message* message);
-  void send(DataEncoder* encoder, int s);
 
   DataEncoder* next(int s);
 
@@ -223,11 +361,15 @@ private:
   map<Node, int> temps;
   map<Node, int> persists;
 
+  /* Set of sockets that should be closed. */
   set<int> disposables;
 
-  /* Map from socket to outgoing messages. */
+  /* Map from socket to outgoing queue. */
   map<int, queue<DataEncoder*> > outgoing;
 
+  /* HTTP proxies. */
+  map<int, HttpProxy*> proxies;
+
   /* Protects instance variables. */
   synchronizable(this);
 };
@@ -277,7 +419,7 @@ private:
   /* Waiting processes (protected by synchronizable(processes)). */
   map<ProcessBase *, set<ProcessBase *> > waiters;
 
-  /* Map of gates for waiting threads. */
+  /* Gates for waiting threads (protected by synchronizable(processes)). */
   map<ProcessBase *, Gate *> gates;
 
   /* Queue of runnable processes (implemented as deque). */
@@ -286,115 +428,6 @@ private:
 };
 
 
-class HttpProxy : public Process<HttpProxy>
-{
-public:
-  HttpProxy(int _c, const Future<HttpResponse>& _future);
-  ~HttpProxy();
-
-protected:
-  virtual void operator () ();
-
-private:
-  int c;
-  Future<HttpResponse> future;
-};
-
-
-class HttpProxyManager : public Process<HttpProxyManager>
-{
-public:
-  HttpProxyManager() {}
-  ~HttpProxyManager() {}
-
-  void manage(HttpProxy* proxy)
-  {
-    assert(proxy != NULL);
-    proxies[proxy->self()] = proxy;
-    link(proxy->self());
-  }
-
-protected:
-  virtual void operator () ()
-  {
-    while (true) {
-      serve();
-      if (name() == EXITED && proxies.count(from()) > 0) {
-        HttpProxy* proxy = proxies[from()];
-        proxies.erase(from());
-        delete proxy;
-      }
-    }
-  }
-
-private:
-  map<UPID, HttpProxy*> proxies;
-};
-
-
-/* Tick, tock ... manually controlled clock! */
-class InternalClock
-{
-public:
-  InternalClock()
-  {
-    initial = current = elapsed = ev_time();
-  }
-
-  ~InternalClock() {}
-
-  ev_tstamp getCurrent(ProcessBase *process)
-  {
-    ev_tstamp tstamp;
-
-    if (currents.count(process) != 0) {
-      tstamp = currents[process];
-    } else {
-      tstamp = currents[process] = initial;
-    }
-
-    return tstamp;
-  }
-
-  void setCurrent(ProcessBase *process, ev_tstamp tstamp)
-  {
-    currents[process] = tstamp;
-  }
-
-  ev_tstamp getCurrent()
-  {
-    return current;
-  }
-
-  void setCurrent(ev_tstamp tstamp)
-  {
-    current = tstamp;
-  }
-
-  ev_tstamp getElapsed()
-  {
-    return elapsed;
-  }
-
-  void setElapsed(ev_tstamp tstamp)
-  {
-    elapsed = tstamp;
-  }
-
-  void discard(ProcessBase *process)
-  {
-    assert(process != NULL);
-    currents.erase(process);
-  }
-
-private:
-  map<ProcessBase *, ev_tstamp> currents;
-  ev_tstamp initial;
-  ev_tstamp current;
-  ev_tstamp elapsed;
-};
-
-
 /* Using manual clock if non-null. */
 static InternalClock *clk = NULL;
 
@@ -410,8 +443,8 @@ static uint32_t ip = 0;
 /* Local port. */
 static uint16_t port = 0;
 
-/* Active LinkManager (eventually will probably be thread-local). */
-static LinkManager *link_manager = NULL;
+/* Active SocketManager (eventually will probably be thread-local). */
+static SocketManager *socket_manager = NULL;
 
 /* Active ProcessManager (eventually will probably be thread-local). */
 static ProcessManager *process_manager = NULL;
@@ -489,12 +522,6 @@ static Filter *filterer = NULL;
 static synchronizable(filterer) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
 
 
-/**
- * Instance of HttpProxyManager.
- */
-static HttpProxyManager* proxy_manager = NULL;
-
-
 int set_nbio(int fd)
 {
   int flags;
@@ -531,11 +558,47 @@ void transport(Message* message, Process
     process_manager->deliver(message, sender);
   } else {
     // Remote message.
-    link_manager->send(message);
+    socket_manager->send(message);
   }
 }
 
 
+Message* parse(HttpRequest* request)
+{
+  if (request->method == "POST" && request->headers.count("User-Agent") > 0) {
+    const string& temp = request->headers["User-Agent"];
+    const string& libprocess = "libprocess/";
+    size_t index = temp.find(libprocess);
+    if (index != string::npos) {
+      // Okay, now determine 'from'.
+      const UPID from(temp.substr(index + libprocess.size(), temp.size()));
+
+      // Now determine 'to'.
+      index = request->path.find('/', 1);
+      index = index != string::npos ? index - 1 : string::npos;
+      const UPID to(request->path.substr(1, index), ip, port);
+
+      // And now determine 'name'.
+      index = index != string::npos ? index + 2: request->path.size();
+      const string& name = request->path.substr(index);
+
+      VLOG(2) << "Parsed message name '" << name
+              << "' for " << to << " from " << from;
+
+      Message* message = new Message();
+      message->name = name;
+      message->from = from;
+      message->to = to;
+      message->body = request->body;
+
+      return message;
+    }
+  }
+
+  return NULL;
+}
+
+
 void handle_async(struct ev_loop *loop, ev_async *_, int revents)
 {
   synchronized (watchers) {
@@ -673,10 +736,14 @@ void recv_data(struct ev_loop *loop, ev_
       // Might block, try again later.
       break;
     } else if (length <= 0) {
-      // Socket error ... we consider closed.
-      const char* error = strerror(errno);
-      VLOG(1) << "Socket error while receiving: " << error;
-      link_manager->closed(c);
+      // Socket error or closed.
+      if (length < 0) {
+        const char* error = strerror(errno);
+        VLOG(2) << "Socket error while receiving: " << error;
+      } else {
+        VLOG(2) << "Socket closed while receiving";
+      }
+      socket_manager->closed(c);
       delete decoder;
       ev_io_stop(loop, watcher);
       delete watcher;
@@ -692,8 +759,8 @@ void recv_data(struct ev_loop *loop, ev_
           process_manager->deliver(c, request);
         }
       } else if (requests.empty() && decoder->failed()) {
-        VLOG(1) << "Decoder error while receiving";
-        link_manager->closed(c);
+        VLOG(2) << "Decoder error while receiving";
+        socket_manager->closed(c);
         delete decoder;
         ev_io_stop(loop, watcher);
         delete watcher;
@@ -726,10 +793,14 @@ void send_data(struct ev_loop *loop, ev_
       // Might block, try again later.
       break;
     } else if (length <= 0) {
-      // Socket closed or error ... we consider closed.
-      const char* error = strerror(errno);
-      VLOG(1) << "Socket error while sending: " << error;
-      link_manager->closed(c);
+      // Socket error or closed.
+      if (length < 0) {
+        const char* error = strerror(errno);
+        VLOG(2) << "Socket error while sending: " << error;
+      } else {
+        VLOG(2) << "Socket closed while sending";
+      }
+      socket_manager->closed(c);
       delete encoder;
       ev_io_stop(loop, watcher);
       delete watcher;
@@ -745,7 +816,7 @@ void send_data(struct ev_loop *loop, ev_
         delete encoder;
 
         // Check for more stuff to send on socket.
-        encoder = link_manager->next(c);
+        encoder = socket_manager->next(c);
         if (encoder != NULL) {
           watcher->data = encoder;
         } else {
@@ -771,7 +842,7 @@ void sending_connect(struct ev_loop *loo
   if (getsockopt(c, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
     // Connect failure.
     VLOG(1) << "Socket error while connecting";
-    link_manager->closed(c);
+    socket_manager->closed(c);
     MessageEncoder* encoder = (MessageEncoder*) watcher->data;
     delete encoder;
     ev_io_stop(loop, watcher);
@@ -796,7 +867,7 @@ void receiving_connect(struct ev_loop *l
   if (getsockopt(c, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
     // Connect failure.
     VLOG(1) << "Socket error while connecting";
-    link_manager->closed(c);
+    socket_manager->closed(c);
     DataDecoder* decoder = (DataDecoder*) watcher->data;
     delete decoder;
     ev_io_stop(loop, watcher);
@@ -1051,9 +1122,9 @@ void initialize(bool initialize_google_l
   signal(SIGPIPE, SIG_IGN);
 #endif /* __sun__ */
 
-  /* Create a new ProcessManager and LinkManager. */
+  /* Create a new ProcessManager and SocketManager. */
   process_manager = new ProcessManager();
-  link_manager = new LinkManager();
+  socket_manager = new SocketManager();
 
   /* Setup processing thread. */
   if (pthread_create (&proc_thread, NULL, schedule, NULL) != 0)
@@ -1176,61 +1247,97 @@ void initialize(bool initialize_google_l
 
   initializing = false;
 
+  // Create global garbage collector.
+  gc = spawn(new GarbageCollector());
+
   char temp[INET_ADDRSTRLEN];
   CHECK(inet_ntop(AF_INET, (in_addr *) &ip, temp, INET_ADDRSTRLEN) != NULL);
   VLOG(1) << "libprocess is initialized on " << temp << ":" << port;
 }
 
 
-HttpProxy::HttpProxy(int _c, const Future<HttpResponse>& _future)
-  : c(_c), future(_future)
+HttpResponseWaiter::HttpResponseWaiter(const PID<HttpProxy>& _proxy)
+  : proxy(_proxy) {}
+
+
+HttpResponseWaiter::~HttpResponseWaiter() {}
+
+
+void HttpResponseWaiter::await(const Future<HttpResponse>& future, bool persist)
 {
-  // TODO(benh): Do proper initialization of the proxy manager. Right
-  // now we can rely on invocations of the HttpProxy constructor to be
-  // serial, so we don't need to do any fancy
-  // synchronization. However, ultimately, we'd like to stick the
-  // initialization into 'initialize' above, but that seemed to have
-  // some issues. :(
-  if (proxy_manager == NULL) {
-    proxy_manager = new HttpProxyManager();
-    spawn(proxy_manager);
+  if (future.await(30)) {
+    dispatch(proxy, &HttpProxy::ready, future, persist);  
+  } else {
+    dispatch(proxy, &HttpProxy::unavailable, persist);  
   }
+}
+
+
+HttpProxy::HttpProxy(int _c) : c(_c)
+{
+  // Get garbage collected!
+  dispatch(gc, &GarbageCollector::manage<HttpProxy>, this);
+
+  // Create our waiter.
+  waiter = new HttpResponseWaiter(self());
+  spawn(waiter);
+}
+
 
-  dispatch(proxy_manager->self(), &HttpProxyManager::manage, this);
+HttpProxy::~HttpProxy()
+{
+  send(waiter->self(), TERMINATE);
+  wait(waiter->self());
+  delete waiter;
 }
 
 
-HttpProxy::~HttpProxy() {}
+void HttpProxy::handle(const Future<HttpResponse>& future, bool persist)
+{
+  dispatch(waiter->self(), &HttpResponseWaiter::await, future, persist);
+}
 
 
-void HttpProxy::operator () ()
+void HttpProxy::ready(const Future<HttpResponse>& future, bool persist)
 {
-  // Wait for the response!
+  CHECK(future.ready());
+
   const HttpResponse& response = future.get();
 
-  link_manager->send(new HttpResponseEncoder(response), c);
+  // Don't persist the connection if the responder doesn't want it to.
+  if (response.headers.count("Connection") > 0) {
+    const string& connection = response.headers.find("Connection")->second;
+    if (connection == "close") {
+      persist = false;
+    }
+  }
+
+  // See the semantics of SocketManager::send for details about how
+  // the socket will get closed (it might actually already be closed
+  // before we issue this send).
+  socket_manager->send(new HttpResponseEncoder(response), c, persist);
+}
+
+
+void HttpProxy::unavailable(bool persist)
+{
+  HttpResponse response = HttpServiceUnavailableResponse();
 
-  // How the socket gets closed is a bit esoteric. :( If the browser
-  // closes their socket then we will close it in when
-  // LinkManger::closed gets called. Otherwise, after the future gets
-  // set and we send the response then LinkManager::next will get
-  // called, which will close the socket. Ultimately, the
-  // HttpProxyManager will deal with deallocating the HttpProxy, even
-  // though it was allocated in ProcessManager::deliver. Ugh, gross,
-  // gross, gross. :(
+  // As above, the socket might all ready be closed when we do a send.
+  socket_manager->send(new HttpResponseEncoder(response), c, persist);
 }
 
 
-LinkManager::LinkManager()
+SocketManager::SocketManager()
 {
   synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
 }
 
 
-LinkManager::~LinkManager() {}
+SocketManager::~SocketManager() {}
 
 
-void LinkManager::link(ProcessBase *process, const UPID &to)
+void SocketManager::link(ProcessBase *process, const UPID &to)
 {
   // TODO(benh): The semantics we want to support for link are such
   // that if there is nobody to link to (local or remote) then a
@@ -1242,7 +1349,7 @@ void LinkManager::link(ProcessBase *proc
 
   assert(process != NULL);
 
-  const Node node = { to.ip, to.port };
+  Node node(to.ip, to.port);
 
   synchronized (this) {
     // Check if node is remote and there isn't a persistant link.
@@ -1250,13 +1357,16 @@ void LinkManager::link(ProcessBase *proc
       // Okay, no link, lets create a socket.
       int s;
 
-      if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) < 0)
+      if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) < 0) {
         fatalerror("failed to link (socket)");
+      }
 
-      if (set_nbio(s) < 0)
+      if (set_nbio(s) < 0) {
         fatalerror("failed to link (set_nbio)");
+      }
 
       sockets[s] = node;
+
       persists[node] = s;
 
       sockaddr_in addr;
@@ -1273,8 +1383,9 @@ void LinkManager::link(ProcessBase *proc
 
       // Try and connect to the node using this socket.
       if (connect(s, (sockaddr *) &addr, sizeof(addr)) < 0) {
-        if (errno != EINPROGRESS)
+        if (errno != EINPROGRESS) {
           fatalerror("failed to link (connect)");
+        }
 
         // Wait for socket to be connected.
         ev_io_init(watcher, receiving_connect, s, EV_WRITE);
@@ -1296,28 +1407,38 @@ void LinkManager::link(ProcessBase *proc
 }
 
 
-void LinkManager::send(Message* message)
+PID<HttpProxy> SocketManager::proxy(int s)
 {
-  assert(message != NULL);
+  synchronized (this) {
+    if (sockets.count(s) > 0) {
+      CHECK(proxies.count(s) > 0);
+      return proxies[s]->self();
+    } else {
+      // Register the socket with the manager for sending purposes. The
+      // current design doesn't let us create a valid "node" for this
+      // socket, so we use a "default" one for now.
+      sockets[s] = Node();
 
-  DataEncoder* encoder = new MessageEncoder(message);
+      CHECK(proxies.count(s) == 0);
+
+      HttpProxy* proxy = new HttpProxy(s);
+      spawn(proxy);
+      proxies[s] = proxy;
+      return proxy->self();
+    }
+  }
+}
 
-  Node node = { message->to.ip, message->to.port };
 
-  synchronized (this) {
-    // Check if there is already a link.
-    if (persists.count(node) > 0 || temps.count(node) > 0) {
-      int s = persists.count(node) > 0 ? persists[node] : temps[node];
+void SocketManager::send(DataEncoder* encoder, int s, bool persist)
+{
+  assert(encoder != NULL);
 
-      // Check whether or not this socket has an outgoing queue.
-      if (outgoing.count(s) != 0) {
+  synchronized (this) {
+    if (sockets.count(s) > 0) {
+      if (outgoing.count(s) > 0) {
         outgoing[s].push(encoder);
       } else {
-        // Must be a persistant socket since temporary socket
-        // shouldn't outlast it's outgoing queue!
-        assert(persists.count(node) != 0);
-        assert(temps.count(node) == 0 || temps[node] != s);
-
         // Initialize the outgoing queue.
         outgoing[s];
 
@@ -1333,19 +1454,50 @@ void LinkManager::send(Message* message)
 
         ev_async_send(loop, &async_watcher);
       }
+
+      // Set the socket to get closed if not persistant.
+      if (!persist) {
+        disposables.insert(s);
+      }
+    } else {
+      VLOG(1) << "Attempting to send on a no longer valid socket!";
+    }
+  }
+}
+
+
+void SocketManager::send(Message* message)
+{
+  assert(message != NULL);
+
+  DataEncoder* encoder = new MessageEncoder(message);
+
+  Node node(message->to.ip, message->to.port);
+
+  synchronized (this) {
+    // Check if there is already a socket.
+    bool persistant = persists.count(node) > 0;
+    bool temporary = temps.count(node) > 0;
+    if (persistant || temporary) {
+      int s = persistant ? persists[node] : temps[node];
+      send(encoder, s, persistant);
     } else {
       // No peristant or temporary socket to the node currently
       // exists, so we create a temporary one.
       int s;
 
-      if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) < 0)
+      if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) < 0) {
         fatalerror("failed to send (socket)");
+      }
 
-      if (set_nbio(s) < 0)
+      if (set_nbio(s) < 0) {
         fatalerror("failed to send (set_nbio)");
+      }
 
       sockets[s] = node;
+
       temps[node] = s;
+      disposables.insert(s);
 
       // Initialize the outgoing queue.
       outgoing[s];
@@ -1362,8 +1514,9 @@ void LinkManager::send(Message* message)
       watcher->data = encoder;
     
       if (connect(s, (sockaddr *) &addr, sizeof(addr)) < 0) {
-        if (errno != EINPROGRESS)
+        if (errno != EINPROGRESS) {
           fatalerror("failed to send (connect)");
+        }
 
         // Initialize watcher for connecting.
         ev_io_init(watcher, sending_connect, s, EV_WRITE);
@@ -1383,71 +1536,37 @@ void LinkManager::send(Message* message)
 }
 
 
-void LinkManager::send(DataEncoder* encoder, int s)
-{
-  assert(encoder != NULL);
-
-  // TODO(benh): This is a big hack for sending data on a socket from
-  // an encoder where we want one-time (i.e., disposable) use of the
-  // socket. This is particularly useful right now for proxies we set
-  // up for HTTP GET requests, however, the semantics of what this
-  // send means should be less esoteric.
-
-  synchronized (this) {
-    assert(sockets.count(s) == 0);
-    assert(outgoing.count(s) == 0);
-
-    sockets[s] = Node(); // TODO(benh): HACK!
-    outgoing[s];
-    disposables.insert(s);
-
-    // Allocate and initialize the watcher.
-    ev_io *watcher = new ev_io();
-    watcher->data = encoder;
-
-    ev_io_init(watcher, send_data, s, EV_WRITE);
-
-    synchronized (watchers) {
-      watchers->push(watcher);
-    }
-
-    ev_async_send(loop, &async_watcher);
-  }
-}
-
-
-DataEncoder* LinkManager::next(int s)
+DataEncoder* SocketManager::next(int s)
 {
   DataEncoder* encoder = NULL;
 
-  // Sometimes we look for another encoder even though this socket
-  // isn't actually maintained by the LinkManager (e.g., for proxy
-  // sockets). In the future this kind of esoteric semantics will
-  // hopefully get improved. :(
   synchronized (this) {
-    if (sockets.count(s) > 0 && outgoing.count(s) > 0) {
-      if (!outgoing[s].empty()) {
-        // More messages!
-        encoder = outgoing[s].front();
-        outgoing[s].pop();
-      } else {
-        // No more messages ... erase the outgoing queue.
-        outgoing.erase(s);
+    CHECK(sockets.count(s) > 0);
+    CHECK(outgoing.count(s) > 0);
 
-        // Close the socket if it was for one-time use.
-        if (disposables.count(s) > 0) {
-          disposables.erase(s);
-          sockets.erase(s);
-          close(s);
-        }
+    if (!outgoing[s].empty()) {
+      // More messages!
+      encoder = outgoing[s].front();
+      outgoing[s].pop();
+    } else {
+      // No more messages ... erase the outgoing queue.
+      outgoing.erase(s);
 
-        // Close the socket if it was temporary.
-        const Node &node = sockets[s];
+      // Close the socket if it was set for disposal.
+      if (disposables.count(s) > 0) {
+        // Also try and remove from temps.
+        const Node& node = sockets[s];
         if (temps.count(node) > 0 && temps[node] == s) {
           temps.erase(node);
-          sockets.erase(s);
-          close(s);
+        } else if (proxies.count(s) > 0) {
+          HttpProxy* proxy = proxies[s];
+          proxies.erase(s);
+          post(proxy->self(), TERMINATE);
         }
+
+        disposables.erase(s);
+        sockets.erase(s);
+        close(s);
       }
     }
   }
@@ -1456,7 +1575,7 @@ DataEncoder* LinkManager::next(int s)
 }
 
 
-void LinkManager::closed(int s)
+void SocketManager::closed(int s)
 {
   synchronized (this) {
     if (sockets.count(s) > 0) {
@@ -1465,25 +1584,30 @@ void LinkManager::closed(int s)
       // Don't bother invoking exited unless socket was persistant.
       if (persists.count(node) > 0 && persists[node] == s) {
 	persists.erase(node);
-	exited(node);
-      } else {
-        if (temps.count(node) > 0 && temps[node] == s) {
-          temps.erase(node);
-        }
+        exited(node);
+      } else if (temps.count(node) > 0 && temps[node] == s) {
+        temps.erase(node);
+      } else if (proxies.count(s) > 0) {
+        HttpProxy* proxy = proxies[s];
+        proxies.erase(s);
+        post(proxy->self(), TERMINATE);
       }
 
-      sockets.erase(s);
       outgoing.erase(s);
+      disposables.erase(s);
+      sockets.erase(s);
     }
   }
 
-  // This might have just been a receiving socket, so we want to make
-  // sure to call close so that the file descriptor can get reused.
+  // This might have just been a receiving socket (only sending
+  // sockets, with the exception of the receiving side of a persistant
+  // socket, get added to 'sockets'), so we want to make sure to call
+  // close so that the file descriptor can get reused.
   close(s);
 }
 
 
-void LinkManager::exited(const Node &node)
+void SocketManager::exited(const Node &node)
 {
   // TODO(benh): It would be cleaner if this routine could call back
   // into ProcessManager ... then we wouldn't have to convince
@@ -1510,7 +1634,7 @@ void LinkManager::exited(const Node &nod
 }
 
 
-void LinkManager::exited(ProcessBase *process)
+void SocketManager::exited(ProcessBase *process)
 {
   synchronized (this) {
     /* Remove any links this process might have had. */
@@ -1574,12 +1698,16 @@ bool ProcessManager::deliver(Message *me
     if (sender != NULL) {
       synchronized (timeouts) {
         if (clk != NULL) {
-          clk->setCurrent(receiver, max(clk->getCurrent(receiver),
-                                        clk->getCurrent(sender)));
+          ev_tstamp tstamp =
+            max(clk->getCurrent(receiver), clk->getCurrent(sender));
+          clk->setCurrent(receiver, tstamp);
         }
       }
     }
 
+    VLOG(2) << "Delivering message name '" << message->name
+            << "' for " << message->to << " from " << message->from;
+
     receiver->enqueue(message);
   } else {
     delete message;
@@ -1595,51 +1723,18 @@ bool ProcessManager::deliver(int c, Http
 {
   assert(request != NULL);
 
-  // Get out the receiver (needed regardless of if this is a standard
-  // HTTP request or a libprocess message).
-  const vector<string>& pairs = tokenize(request->path, "/");
-  if (pairs.size() != 2) {
-    // This has no receiver, send a response and cleanup.
-    VLOG(1) << "Returning '404 Not Found' for HTTP request '"
-            << request->path << "'";
-    link_manager->send(new HttpResponseEncoder(HttpNotFoundResponse()), c);
-    delete request;
-    return false;
-  }
-
-  UPID to(pairs[0], ip, port);
-
-  // Determine whether or not this is just a libprocess message.
-  if (request->method == "POST" && request->headers.count("User-Agent") > 0) {
-    const string& value = request->headers["User-Agent"];
-    string libprocess = "libprocess/";
-    size_t index = value.find(libprocess);
-    if (index != string::npos) {
-      // Okay, this is a libprocess message, now we can try and
-      // determine name and from.
-      UPID from(value.substr(index + libprocess.size(), value.size()));
-
-      string name = pairs[1];
-      if (name == "") {
-        delete request;
-        return false;
-      }
-  
-      Message* message = new Message();
-      message->name = name;
-      message->from = from;
-      message->to = to;
-      message->body = request->body;
-
-      delete request;
+  // Determine whether or not this is a libprocess message.
+  Message* message = parse(request);
 
-      return deliver(message, sender);
-    }
+  if (message != NULL) {
+    delete request;
+    return deliver(message, sender);
   }
 
-  // Okay, this isn't a libprocess message, so create a proxy and deliver.
+  // Treat this as an HTTP request and check for a valid receiver.
+  string temp = request->path.substr(1, request->path.find('/', 1) - 1);
 
-  if (ProcessReference receiver = use(to)) {
+  if (ProcessReference receiver = use(UPID(temp, ip, port))) {
     // If we have a local sender AND we are using a manual clock
     // then update the current time of the receiver to preserve
     // the happens-before relationship between the sender and
@@ -1649,24 +1744,39 @@ bool ProcessManager::deliver(int c, Http
     if (sender != NULL) {
       synchronized (timeouts) {
         if (clk != NULL) {
-          clk->setCurrent(receiver, max(clk->getCurrent(receiver),
-                                        clk->getCurrent(sender)));
+          ev_tstamp tstamp =
+            max(clk->getCurrent(receiver), clk->getCurrent(sender));
+          clk->setCurrent(receiver, tstamp);
         }
       }
     }
+
+    // Get the HttpProxy pid for this socket.
+    PID<HttpProxy> proxy = socket_manager->proxy(c);
     
     // Create the future to associate with whatever gets returned.
     Future<HttpResponse>* future = new Future<HttpResponse>();
 
-    // Spawn a proxy that waits on the future.
-    spawn(new HttpProxy(c, *future));
+    // Let the HttpProxy know about this request.
+    dispatch(proxy, &HttpProxy::handle, *future, request->keepAlive);
 
+    // Enqueue request and future for receiver.
     receiver->enqueue(new pair<HttpRequest*, Future<HttpResponse>*>(request, future));
   } else {
-    // This has no receiver, send a response and cleanup.
-    VLOG(1) << "Returning '404 Not Found' for HTTP request '"
+    // This has no receiver, send error response.
+    VLOG(1) << "Returning '404 Not Found' for HTTP request for '"
             << request->path << "'";
-    link_manager->send(new HttpResponseEncoder(HttpNotFoundResponse()), c);
+
+    // Get the HttpProxy pid for this socket.
+    PID<HttpProxy> proxy = socket_manager->proxy(c);
+
+    // Create a "future" response.
+    Future<HttpResponse> future = HttpNotFoundResponse();
+
+    // Let the HttpProxy know about this request.
+    dispatch(proxy, &HttpProxy::handle, future, request->keepAlive);
+
+    // Cleanup request.
     delete request;
     return false;
   }
@@ -1690,8 +1800,9 @@ bool ProcessManager::deliver(const UPID&
     if (sender != NULL) {
       synchronized (timeouts) {
         if (clk != NULL) {
-          clk->setCurrent(receiver, max(clk->getCurrent(receiver),
-                                        clk->getCurrent(sender)));
+          ev_tstamp tstamp =
+            max(clk->getCurrent(receiver), clk->getCurrent(sender));
+          clk->setCurrent(receiver, tstamp);
         }
       }
     }
@@ -1782,13 +1893,13 @@ void ProcessManager::link(ProcessBase *p
 {
   // Check if the pid is local.
   if (!(to.ip == ip && to.port == port)) {
-    link_manager->link(process, to);
+    socket_manager->link(process, to);
   } else {
     // Since the pid is local we want to get a reference to it's
     // underlying process so that while we are invoking the link
     // manager we don't miss sending a possible EXITED.
     if (ProcessReference _ = use(to)) {
-      link_manager->link(process, to);
+      socket_manager->link(process, to);
     } else {
       // Since the pid isn't valid it's process must have already died
       // (or hasn't been spawned yet) so send a process exit message.
@@ -1990,8 +2101,10 @@ bool ProcessManager::external_wait(const
       assert(process->state != ProcessBase::FINISHED);
 
       /* Check and see if a gate already exists. */
-      if (gates.find(process) == gates.end())
+      if (gates.find(process) == gates.end()) {
         gates[process] = new Gate();
+      }
+
       gate = gates[process];
       old = gate->approach();
     }
@@ -2000,8 +2113,11 @@ bool ProcessManager::external_wait(const
   /* Now arrive at the gate and wait until it opens. */
   if (gate != NULL) {
     gate->arrive(old);
-    if (gate->empty())
+
+    if (gate->empty()) {
       delete gate;
+    }
+
     return true;
   }
 
@@ -2028,12 +2144,13 @@ bool ProcessManager::poll(ProcessBase *p
       /* Allocate/Initialize the watcher. */
       ev_io *watcher = new ev_io();
 
-      if ((op & ProcessBase::RDWR) == ProcessBase::RDWR)
+      if ((op & ProcessBase::RDWR) == ProcessBase::RDWR) {
         ev_io_init(watcher, handle_poll, fd, EV_READ | EV_WRITE);
-      else if ((op & ProcessBase::RDONLY) == ProcessBase::RDONLY)
+      } else if ((op & ProcessBase::RDONLY) == ProcessBase::RDONLY) {
         ev_io_init(watcher, handle_poll, fd, EV_READ);
-      else if ((op & ProcessBase::WRONLY) == ProcessBase::WRONLY)
+      } else if ((op & ProcessBase::WRONLY) == ProcessBase::WRONLY) {
         ev_io_init(watcher, handle_poll, fd, EV_WRITE);
+      }
 
       // Tuple describing state (on heap in case we can't "cancel" it,
       // the watcher will always fire, even if we get interrupted and
@@ -2067,12 +2184,15 @@ bool ProcessManager::poll(ProcessBase *p
            process->state == ProcessBase::INTERRUPTED);
 
     /* Attempt to cancel the timer if necessary. */
-    if (secs != 0)
-      if (process->state != ProcessBase::TIMEDOUT)
+    if (secs != 0) {
+      if (process->state != ProcessBase::TIMEDOUT) {
         cancel_timeout(timeout);
+      }
+    }
 
-    if (process->state == ProcessBase::INTERRUPTED)
+    if (process->state == ProcessBase::INTERRUPTED) {
       interrupted = true;
+    }
 
     process->state = ProcessBase::RUNNING;
       
@@ -2230,6 +2350,20 @@ void ProcessManager::cleanup(ProcessBase
         delete message;
       }
 
+      // Free any pending requests.
+      while (!process->requests.empty()) {
+        pair<HttpRequest*, Future<HttpResponse>*>* request = process->requests.front();
+        process->requests.pop_front();
+        delete request;
+      }
+
+      // Free any pending delegators.
+      while (!process->delegators.empty()) {
+        function<void(ProcessBase*)>* delegator = process->delegators.front();
+        process->delegators.pop_front();
+        delete delegator;
+      }
+
       // Free current message.
       if (process->current) {
         delete process->current;
@@ -2263,8 +2397,8 @@ void ProcessManager::cleanup(ProcessBase
     process->unlock();
   }
 
-  // Inform link manager.
-  link_manager->exited(process);
+  // Inform socket manager.
+  socket_manager->exited(process);
 
   // Confirm process not in runq.
   synchronized (runq) {
@@ -2724,17 +2858,15 @@ string ProcessBase::serve(double secs, b
     pair<HttpRequest*, Future<HttpResponse>*>* request;
     function<void(ProcessBase*)>* delegator;
     if ((request = dequeue<pair<HttpRequest*, Future<HttpResponse>*> >()) != NULL) {
-      const string& id = "/" + pid.id + "/";
-      size_t index = request->first->path.find(id);
-      CHECK(index != string::npos);
-      const string& name =
-        request->first->path.substr(index + id.size(), request->first->path.size());
+      size_t index = request->first->path.find('/', 1);
+      index = index != string::npos ? index + 1 : request->first->path.size();
+      const string& name = request->first->path.substr(index);
       if (http_handlers.count(name) > 0) {
         http_handlers[name](*request->first).associate(*request->second);
       } else {
-        Promise<HttpResponse>(HttpNotFoundResponse()).associate(*request->second);
-        VLOG(1) << "Returning '404 Not Found' for HTTP request '"
+        VLOG(1) << "Returning '404 Not Found' for HTTP request for '"
                 << request->first->path << "'";
+        Promise<HttpResponse>(HttpNotFoundResponse()).associate(*request->second);
       }
       delete request->first;
       delete request->second;
@@ -2956,6 +3088,48 @@ bool wait(const UPID& pid)
 }
 
 
+class WaitWaiter : public Process<WaitWaiter>
+{
+public:
+  WaitWaiter(const UPID& _pid, double _secs, bool* _waited)
+    : pid(_pid), secs(_secs), waited(_waited) {}
+
+protected:
+  virtual void operator () ()
+  {
+    link(pid);
+    receive(secs);
+    if (name() == EXITED) {
+      *waited = true;
+    } else {
+      *waited = false;
+    }
+  }
+
+private:
+  const UPID pid;
+  const double secs;
+  bool* waited;
+};
+
+
+bool wait(const UPID& pid, double secs)
+{
+  initialize();
+
+  if (!pid) {
+    return false;
+  }
+
+  bool waited = false;
+
+  WaitWaiter waiter(pid, secs, &waited);
+  wait(spawn(&waiter));
+
+  return waited;
+}
+
+
 void invoke(const function<void (void)> &thunk)
 {
   initialize();
@@ -2986,6 +3160,8 @@ void post(const UPID& to, const string& 
 }
 
 
+namespace internal {
+
 void dispatcher(const UPID& pid, function<void(ProcessBase*)>* delegator)
 {
   if (proc_process != NULL) {
@@ -2995,4 +3171,4 @@ void dispatcher(const UPID& pid, functio
   }
 }
 
-}  // namespace process {
+}}  // namespace process { namespace internal {

Modified: incubator/mesos/trunk/third_party/libprocess/process.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/process.hpp?rev=1132286&r1=1132285&r2=1132286&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/process.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/process.hpp Sun Jun  5 09:19:39 2011
@@ -123,7 +123,7 @@ protected:
   }
 
 private:
-  friend class LinkManager;
+  friend class SocketManager;
   friend class ProcessManager;
   friend class ProcessReference;
   friend void* schedule(void *);
@@ -199,7 +199,7 @@ public:
 protected:
   virtual void operator () ()
   {
-    while (true) serve();
+    do { serve(); if (name() == TERMINATE) break; } while (true);
   }
 };
 
@@ -207,9 +207,9 @@ protected:
 /**
  * Initialize the library.
  *
- * @param initGoogleLogging whether or not to initialize the Google
- *        Logging library (glog). If the application is also using
- *        glog, this should be set to false.
+ * @param initialize_google_logging whether or not to initialize the
+ *        Google Logging library (glog). If the application is also
+ *        using glog, this should be set to false.
  */
 void initialize(bool initialize_google_logging = true);
 
@@ -239,6 +239,16 @@ bool wait(const UPID& pid);
 
 
 /**
+ * Wait for process to exit no more than specified seconds (returns
+ * true if actually waited on a process).
+ *
+ * @param PID id of the process
+ * @param secs max time to wait
+ */
+bool wait(const UPID& pid, double secs);
+
+
+/**
  * Invoke the thunk in a legacy safe way (i.e., outside of libprocess).
  *
  * @param thunk function to be invoked
@@ -266,6 +276,8 @@ void filter(Filter* filter);
 void post(const UPID& to, const std::string& name, const char* data = NULL, size_t length = 0);
 
 
+namespace internal {
+
 template <typename T>
 void vdelegate(ProcessBase* process,
                std::tr1::function<void(T*)>* thunk)
@@ -278,14 +290,28 @@ void vdelegate(ProcessBase* process,
 
 
 template <typename R, typename T>
+void pdelegate(ProcessBase* process,
+               std::tr1::function<Promise<R>(T*)>* thunk,
+               Future<R>* future)
+{
+  assert(process != NULL);
+  assert(thunk != NULL);
+  assert(future != NULL);
+  (*thunk)(static_cast<T*>(process)).associate(*future);
+  delete thunk;
+  delete future;
+}
+
+
+template <typename R, typename T>
 void delegate(ProcessBase* process,
-              std::tr1::function<Promise<R>(T*)>* thunk,
+              std::tr1::function<R(T*)>* thunk,
               Future<R>* future)
 {
   assert(process != NULL);
   assert(thunk != NULL);
   assert(future != NULL);
-  (*thunk)(static_cast<T*>(process)).associate(*future);
+  Promise<R>((*thunk)(static_cast<T*>(process))).associate(*future);
   delete thunk;
   delete future;
 }
@@ -294,6 +320,8 @@ void delegate(ProcessBase* process,
 /* Dispatches the delegator to the specified process. */
 void dispatcher(const UPID& pid, std::tr1::function<void(ProcessBase*)>* delegator);
 
+} // namespace internal {
+
 
 /**
  * Dispatches a void method on a process.
@@ -308,11 +336,11 @@ void dispatch(const PID<T>& pid, void (T
     new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1));
 
   std::tr1::function<void(ProcessBase*)>* delegator =
-    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&vdelegate<T>,
+    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::vdelegate<T>,
                                                               std::tr1::placeholders::_1,
                                                               thunk));
 
-  dispatcher(pid, delegator);
+  internal::dispatcher(pid, delegator);
 }
 
 
@@ -328,14 +356,14 @@ void dispatch(const PID<T>& pid, void (T
 {
   std::tr1::function<void(T*)>* thunk =
     new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
-                                                     a1));
+                                                    a1));
 
   std::tr1::function<void(ProcessBase*)>* delegator =
-    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&vdelegate<T>,
+    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::vdelegate<T>,
                                                               std::tr1::placeholders::_1,
                                                               thunk));
 
-  dispatcher(pid, delegator);
+  internal::dispatcher(pid, delegator);
 }
 
 
@@ -354,14 +382,14 @@ void dispatch(const PID<T>& pid, void (T
 {
   std::tr1::function<void(T*)>* thunk =
     new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
-                                                     a1, a2));
+                                                    a1, a2));
 
   std::tr1::function<void(ProcessBase*)>* delegator =
-    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&vdelegate<T>,
+    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::vdelegate<T>,
                                                               std::tr1::placeholders::_1,
                                                               thunk));
 
-  dispatcher(pid, delegator);
+  internal::dispatcher(pid, delegator);
 }
 
 
@@ -378,18 +406,18 @@ template <typename T,
           typename P1, typename P2, typename P3,
           typename A1, typename A2, typename A3>
 void dispatch(const PID<T>& pid, void (T::*method)(P1, P2, P3),
-                       A1 a1, A2 a2, A3 a3)
+              A1 a1, A2 a2, A3 a3)
 {
   std::tr1::function<void(T*)>* thunk =
     new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
-                                                     a1, a2, a3));
+                                                    a1, a2, a3));
 
   std::tr1::function<void(ProcessBase*)>* delegator =
-    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&vdelegate<T>,
+    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::vdelegate<T>,
                                                               std::tr1::placeholders::_1,
                                                               thunk));
 
-  dispatcher(pid, delegator);
+  internal::dispatcher(pid, delegator);
 }
 
 
@@ -407,18 +435,18 @@ template <typename T,
           typename P1, typename P2, typename P3, typename P4,
           typename A1, typename A2, typename A3, typename A4>
 void dispatch(const PID<T>& pid, void (T::*method)(P1, P2, P3, P4),
-                       A1 a1, A2 a2, A3 a3, A4 a4)
+              A1 a1, A2 a2, A3 a3, A4 a4)
 {
   std::tr1::function<void(T*)>* thunk =
     new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
-                                                     a1, a2, a3, a4));
+                                                    a1, a2, a3, a4));
 
   std::tr1::function<void(ProcessBase*)>* delegator =
-    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&vdelegate<T>,
+    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::vdelegate<T>,
                                                               std::tr1::placeholders::_1,
                                                               thunk));
 
-  dispatcher(pid, delegator);
+  internal::dispatcher(pid, delegator);
 }
 
 
@@ -437,18 +465,18 @@ template <typename T,
           typename P1, typename P2, typename P3, typename P4, typename P5,
           typename A1, typename A2, typename A3, typename A4, typename A5>
 void dispatch(const PID<T>& pid, void (T::*method)(P1, P2, P3, P4, P5),
-                       A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
+              A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
 {
   std::tr1::function<void(T*)>* thunk =
     new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
-                                                     a1, a2, a3, a4, a5));
+                                                    a1, a2, a3, a4, a5));
 
   std::tr1::function<void(ProcessBase*)>* delegator =
-    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&vdelegate<T>,
+    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::vdelegate<T>,
                                                               std::tr1::placeholders::_1,
                                                               thunk));
 
-  dispatcher(pid, delegator);
+  internal::dispatcher(pid, delegator);
 }
 
 
@@ -463,17 +491,17 @@ void dispatch(const PID<T>& pid, void (T
 template <typename R, typename T>
 Future<R> dispatch(const PID<T>& pid, Promise<R> (T::*method)())
 {
-  std::tr1::function<Promise<R> (T*)>* thunk =
-    new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1));
+  std::tr1::function<Promise<R>(T*)>* thunk =
+    new std::tr1::function<Promise<R>(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1));
 
   Future<R>* future = new Future<R>();
 
   std::tr1::function<void(ProcessBase*)>* delegator =
-    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&delegate<R, T>,
+    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::pdelegate<R, T>,
                                                               std::tr1::placeholders::_1,
                                                               thunk, future));
 
-  dispatcher(pid, delegator);
+  internal::dispatcher(pid, delegator);
 
   return *future;
 }
@@ -491,18 +519,18 @@ Future<R> dispatch(const PID<T>& pid, Pr
 template <typename R, typename T, typename P1, typename A1>
 Future<R> dispatch(const PID<T>& pid, Promise<R> (T::*method)(P1), A1 a1)
 {
-  std::tr1::function<Promise<R> (T*)>* thunk =
-    new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
-                                                           a1));
+  std::tr1::function<Promise<R>(T*)>* thunk =
+    new std::tr1::function<Promise<R>(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+                                                          a1));
 
   Future<R>* future = new Future<R>();
 
   std::tr1::function<void(ProcessBase*)>* delegator =
-    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&delegate<R, T>,
+    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::pdelegate<R, T>,
                                                               std::tr1::placeholders::_1,
                                                               thunk, future));
 
-  dispatcher(pid, delegator);
+  internal::dispatcher(pid, delegator);
 
   return *future;
 }
@@ -522,20 +550,20 @@ template <typename R, typename T,
           typename P1, typename P2,
           typename A1, typename A2>
 Future<R> dispatch(const PID<T>& pid, Promise<R> (T::*method)(P1, P2),
-                            A1 a1, A2 a2)
+                   A1 a1, A2 a2)
 {
-  std::tr1::function<Promise<R> (T*)>* thunk =
-    new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
-                                                           a1, a2));
+  std::tr1::function<Promise<R>(T*)>* thunk =
+    new std::tr1::function<Promise<R>(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+                                                          a1, a2));
 
   Future<R>* future = new Future<R>();
 
   std::tr1::function<void(ProcessBase*)>* delegator =
-    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&delegate<R, T>,
+    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::pdelegate<R, T>,
                                                               std::tr1::placeholders::_1,
                                                               thunk, future));
 
-  dispatcher(pid, delegator);
+  internal::dispatcher(pid, delegator);
 
   return *future;
 }
@@ -556,20 +584,20 @@ template <typename R, typename T,
           typename P1, typename P2, typename P3,
           typename A1, typename A2, typename A3>
 Future<R> dispatch(const PID<T>& pid, Promise<R> (T::*method)(P1, P2, P3),
-                            A1 a1, A2 a2, A3 a3)
+                   A1 a1, A2 a2, A3 a3)
 {
-  std::tr1::function<Promise<R> (T*)>* thunk =
-    new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
-                                                           a1, a2, a3));
+  std::tr1::function<Promise<R>(T*)>* thunk =
+    new std::tr1::function<Promise<R>(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+                                                          a1, a2, a3));
 
   Future<R>* future = new Future<R>();
 
   std::tr1::function<void(ProcessBase*)>* delegator =
-    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&delegate<R, T>,
+    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::pdelegate<R, T>,
                                                               std::tr1::placeholders::_1,
                                                               thunk, future));
 
-  dispatcher(pid, delegator);
+  internal::dispatcher(pid, delegator);
 
   return *future;
 }
@@ -591,20 +619,20 @@ template <typename R, typename T,
           typename P1, typename P2, typename P3, typename P4,
           typename A1, typename A2, typename A3, typename A4>
 Future<R> dispatch(const PID<T>& pid, Promise<R> (T::*method)(P1, P2, P3, P4),
-                            A1 a1, A2 a2, A3 a3, A4 a4)
+                   A1 a1, A2 a2, A3 a3, A4 a4)
 {
-  std::tr1::function<Promise<R> (T*)>* thunk =
-    new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
-                                                           a1, a2, a3, a4));
+  std::tr1::function<Promise<R>(T*)>* thunk =
+    new std::tr1::function<Promise<R>(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+                                                          a1, a2, a3, a4));
 
   Future<R>* future = new Future<R>();
 
   std::tr1::function<void(ProcessBase*)>* delegator =
-    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&delegate<R, T>,
+    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::pdelegate<R, T>,
                                                               std::tr1::placeholders::_1,
                                                               thunk, future));
 
-  dispatcher(pid, delegator);
+  internal::dispatcher(pid, delegator);
 
   return *future;
 }
@@ -627,20 +655,212 @@ template <typename R, typename T,
           typename P1, typename P2, typename P3, typename P4, typename P5,
           typename A1, typename A2, typename A3, typename A4, typename A5>
 Future<R> dispatch(const PID<T>& pid, Promise<R> (T::*method)(P1, P2, P3, P4, P5),
+                   A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
+{
+  std::tr1::function<Promise<R>(T*)>* thunk =
+    new std::tr1::function<Promise<R>(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+                                                          a1, a2, a3, a4, a5));
+
+  Future<R>* future = new Future<R>();
+
+  std::tr1::function<void(ProcessBase*)>* delegator =
+    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::pdelegate<R, T>,
+                                                              std::tr1::placeholders::_1,
+                                                              thunk, future));
+
+  internal::dispatcher(pid, delegator);
+
+  return *future;
+}
+
+
+/**
+ * Dispatches a method on a process and returns the future that
+ * corresponds to the result of executing the method.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on receiver
+ */
+template <typename R, typename T>
+Future<R> dispatch(const PID<T>& pid, R (T::*method)())
+{
+  std::tr1::function<R(T*)>* thunk =
+    new std::tr1::function<R(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1));
+
+  Future<R>* future = new Future<R>();
+
+  std::tr1::function<void(ProcessBase*)>* delegator =
+    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::delegate<R, T>,
+                                                              std::tr1::placeholders::_1,
+                                                              thunk, future));
+
+  internal::dispatcher(pid, delegator);
+
+  return *future;
+}
+
+
+/**
+ * Dispatches a method on a process and returns the future that
+ * corresponds to the result of executing the method.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on receiver
+ * @param a1 first argument to pass to method
+ */
+template <typename R, typename T, typename P1, typename A1>
+Future<R> dispatch(const PID<T>& pid, R (T::*method)(P1), A1 a1)
+{
+  std::tr1::function<R(T*)>* thunk =
+    new std::tr1::function<R(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+                                                 a1));
+
+  Future<R>* future = new Future<R>();
+
+  std::tr1::function<void(ProcessBase*)>* delegator =
+    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::delegate<R, T>,
+                                                              std::tr1::placeholders::_1,
+                                                              thunk, future));
+
+  internal::dispatcher(pid, delegator);
+
+  return *future;
+}
+
+
+/**
+ * Dispatches a method on a process and returns the future that
+ * corresponds to the result of executing the method.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @return future corresponding to the result of executing the method
+ */
+template <typename R, typename T,
+          typename P1, typename P2,
+          typename A1, typename A2>
+Future<R> dispatch(const PID<T>& pid, R (T::*method)(P1, P2),
+                   A1 a1, A2 a2)
+{
+  std::tr1::function<R(T*)>* thunk =
+    new std::tr1::function<R(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+                                                 a1, a2));
+
+  Future<R>* future = new Future<R>();
+
+  std::tr1::function<void(ProcessBase*)>* delegator =
+    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::delegate<R, T>,
+                                                              std::tr1::placeholders::_1,
+                                                              thunk, future));
+
+  internal::dispatcher(pid, delegator);
+
+  return *future;
+}
+
+
+/**
+ * Dispatches a method on a process and returns the future that
+ * corresponds to the result of executing the method.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @param a3 second argument to pass to method
+ * @return future corresponding to the result of executing the method
+ */
+template <typename R, typename T,
+          typename P1, typename P2, typename P3,
+          typename A1, typename A2, typename A3>
+Future<R> dispatch(const PID<T>& pid, R (T::*method)(P1, P2, P3),
+                   A1 a1, A2 a2, A3 a3)
+{
+  std::tr1::function<R(T*)>* thunk =
+    new std::tr1::function<R(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+                                                 a1, a2, a3));
+
+  Future<R>* future = new Future<R>();
+
+  std::tr1::function<void(ProcessBase*)>* delegator =
+    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::delegate<R, T>,
+                                                              std::tr1::placeholders::_1,
+                                                              thunk, future));
+
+  internal::dispatcher(pid, delegator);
+
+  return *future;
+}
+
+
+/**
+ * Dispatches a method on a process and returns the future that
+ * corresponds to the result of executing the method.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @param a3 third argument to pass to method
+ * @param a4 fourth argument to pass to method
+ * @return future corresponding to the result of executing the method
+ */
+template <typename R, typename T,
+          typename P1, typename P2, typename P3, typename P4,
+          typename A1, typename A2, typename A3, typename A4>
+Future<R> dispatch(const PID<T>& pid, R (T::*method)(P1, P2, P3, P4),
+                   A1 a1, A2 a2, A3 a3, A4 a4)
+{
+  std::tr1::function<R(T*)>* thunk =
+    new std::tr1::function<R(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+                                                 a1, a2, a3, a4));
+
+  Future<R>* future = new Future<R>();
+
+  std::tr1::function<void(ProcessBase*)>* delegator =
+    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::delegate<R, T>,
+                                                              std::tr1::placeholders::_1,
+                                                              thunk, future));
+
+  internal::dispatcher(pid, delegator);
+
+  return *future;
+}
+
+
+/**
+ * Dispatches a method on a process and returns the future that
+ * corresponds to the result of executing the method.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @param a3 third argument to pass to method
+ * @param a4 fourth argument to pass to method
+ * @param a5 fifth argument to pass to method
+ * @return future corresponding to the result of executing the method
+ */
+template <typename R, typename T,
+          typename P1, typename P2, typename P3, typename P4, typename P5,
+          typename A1, typename A2, typename A3, typename A4, typename A5>
+Future<R> dispatch(const PID<T>& pid, R (T::*method)(P1, P2, P3, P4, P5),
                             A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
 {
-  std::tr1::function<Promise<R> (T*)>* thunk =
-    new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
-                                                           a1, a2, a3, a4, a5));
+  std::tr1::function<R(T*)>* thunk =
+    new std::tr1::function<R(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+                                                 a1, a2, a3, a4, a5));
 
   Future<R>* future = new Future<R>();
 
   std::tr1::function<void(ProcessBase*)>* delegator =
-    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&delegate<R, T>,
+    new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::delegate<R, T>,
                                                               std::tr1::placeholders::_1,
                                                               thunk, future));
 
-  dispatcher(pid, delegator);
+  internal::dispatcher(pid, delegator);
 
   return *future;
 }
@@ -761,6 +981,122 @@ R call(const PID<T>& pid, Promise<R> (T:
   return dispatch(pid, method, a1, a2, a3, a4, a5).get();
 }
 
+
+/**
+ * Dispatches a method on a process and waits (on the underlying
+ * future) for the result.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on instance
+ * @return result of executing the method
+ */
+template <typename R, typename T>
+R call(const PID<T>& pid, R (T::*method)())
+{
+  return dispatch(pid, method).get();
+}
+
+
+/**
+ * Dispatches a method on a process and waits (on the underlying
+ * future) for the result.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on instance
+ * @param a1 argument to pass to method
+ * @return result of executing the method
+ */
+template <typename R, typename T, typename P1, typename A1>
+R call(const PID<T>& pid, R (T::*method)(P1), A1 a1)
+{
+  return dispatch(pid, method, a1).get();
+}
+
+
+/**
+ * Dispatches a method on a process and waits (on the underlying
+ * future) for the result.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @return result of executing the method
+ */
+template <typename R, typename T,
+          typename P1, typename P2,
+          typename A1, typename A2>
+R call(const PID<T>& pid, R (T::*method)(P1, P2), A1 a1, A2 a2)
+{
+  return dispatch(pid, method, a1, a2).get();
+}
+
+
+/**
+ * Dispatches a method on a process and waits (on the underlying
+ * future) for the result.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @param a3 second argument to pass to method
+ * @return result of executing the method
+ */
+template <typename R, typename T,
+          typename P1, typename P2, typename P3,
+          typename A1, typename A2, typename A3>
+R call(const PID<T>& pid, R (T::*method)(P1, P2, P3),
+                A1 a1, A2 a2, A3 a3)
+{
+  return dispatch(pid, method, a1, a2, a3).get();
+}
+
+
+/**
+ * Dispatches a method on a process and waits (on the underlying
+ * future) for the result.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @param a3 third argument to pass to method
+ * @param a4 fourth argument to pass to method
+ * @return result of executing the method
+ */
+template <typename R, typename T,
+          typename P1, typename P2, typename P3, typename P4,
+          typename A1, typename A2, typename A3, typename A4>
+R call(const PID<T>& pid, R (T::*method)(P1, P2, P3, P4),
+                A1 a1, A2 a2, A3 a3, A4 a4)
+{
+  return dispatch(pid, method, a1, a2, a3, a4).get();
+}
+
+
+/**
+ * Dispatches a method on a process and waits (on the underlying
+ * future) for the result.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @param a3 third argument to pass to method
+ * @param a4 fourth argument to pass to method
+ * @param a5 fifth argument to pass to method
+ * @return result of executing the method
+ */
+template <typename R, typename T,
+          typename P1, typename P2, typename P3, typename P4, typename P5,
+          typename A1, typename A2, typename A3, typename A4, typename A5>
+R call(const PID<T>& pid, R (T::*method)(P1, P2, P3, P4, P5),
+                A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
+{
+  return dispatch(pid, method, a1, a2, a3, a4, a5).get();
+}
+
 } // namespace process {
 
 #endif // __PROCESS_HPP__

Modified: incubator/mesos/trunk/third_party/libprocess/promise.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/promise.hpp?rev=1132286&r1=1132285&r2=1132286&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/promise.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/promise.hpp Sun Jun  5 09:19:39 2011
@@ -11,14 +11,15 @@ class Promise
 {
 public:
   Promise();
-  Promise(const T &t_);
-  Promise(const Promise<T> &that);
+  Promise(const T& _t);
+  Promise(const Promise<T>& that);
   virtual ~Promise();
-  void set(const T &t_);
-  void associate(const Future<T> &future_);
+  void set(const T& _t);
+  bool ready() const;
+  void associate(const Future<T>& _future);
 
 private:
-  void operator = (const Promise<T> &);
+  void operator = (const Promise<T>&);
 
   enum State {
     UNSET_UNASSOCIATED,
@@ -27,10 +28,10 @@ private:
     SET_ASSOCIATED,
   };
 
-  int *refs;
-  T **t;
-  Future<T> **future;
-  State *state;
+  int* refs;
+  T** t;
+  Future<T>** future;
+  State* state;
 };
 
 
@@ -47,22 +48,22 @@ Promise<T>::Promise()
 {
   refs = new int;
   *refs = 1;
-  t = new T *;
+  t = new T*;
   *t = NULL;
-  future = new Future<T> *;
+  future = new Future<T>*;
   *future = NULL;
   state = new State;
   *state = UNSET_UNASSOCIATED;
 }
 
 template <typename T>
-Promise<T>::Promise(const T &_t)
+Promise<T>::Promise(const T& _t)
 {
   refs = new int;
   *refs = 1;
-  t = new T *;
+  t = new T*;
   *t = new T(_t);
-  future = new Future<T> *;
+  future = new Future<T>*;
   *future = NULL;
   state = new State;
   *state = SET_UNASSOCIATED;
@@ -70,7 +71,7 @@ Promise<T>::Promise(const T &_t)
 
 
 template <typename T>
-Promise<T>::Promise(const Promise<T> &that)
+Promise<T>::Promise(const Promise<T>& that)
 {
   assert(that.refs != NULL);
   assert(*that.refs > 0);
@@ -101,7 +102,7 @@ Promise<T>::~Promise()
 
 
 template <typename T>
-void Promise<T>::set(const T &t_)
+void Promise<T>::set(const T& _t)
 {
   assert(state != NULL);
   assert(*state == UNSET_UNASSOCIATED ||
@@ -116,12 +117,12 @@ void Promise<T>::set(const T &t_)
       (*future)->set(**t);
     } else {
       // Save the value for association later.
-      *t = new T(t_);
+      *t = new T(_t);
     }
   } else if (*state == UNSET_ASSOCIATED) {
     assert(future != NULL && *future != NULL);
     // Directly set the value in the future.
-    (*future)->set(t_);
+    (*future)->set(_t);
     __sync_bool_compare_and_swap(state, UNSET_ASSOCIATED, SET_ASSOCIATED);
   } else {
     assert(*state == SET_ASSOCIATED);
@@ -133,13 +134,21 @@ void Promise<T>::set(const T &t_)
 
 
 template <typename T>
-void Promise<T>::associate(const Future<T> &future_)
+bool Promise<T>::ready() const
+{
+  assert(state != NULL);
+  return *state == SET_UNASSOCIATED || *state == SET_ASSOCIATED;
+}
+
+
+template <typename T>
+void Promise<T>::associate(const Future<T>& _future)
 {
   assert(state != NULL);
   assert(*state == UNSET_UNASSOCIATED ||
          *state == SET_UNASSOCIATED);
   assert(future != NULL);
-  *future = new Future<T>(future_);
+  *future = new Future<T>(_future);
   if (*state == UNSET_UNASSOCIATED) {
     if (!__sync_bool_compare_and_swap(state, UNSET_UNASSOCIATED,
                                       UNSET_ASSOCIATED)) {

Modified: incubator/mesos/trunk/third_party/libprocess/tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/tests.cpp?rev=1132286&r1=1132285&r2=1132286&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/tests.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/tests.cpp Sun Jun  5 09:19:39 2011
@@ -2,11 +2,14 @@
 
 #include <process.hpp>
 
+using process::Future;
 using process::PID;
 using process::Process;
+using process::Promise;
 using process::UPID;
 
-using testing::AtMost;
+using testing::_;
+using testing::ReturnArg;
 
 
 class SpawnMockProcess : public Process<SpawnMockProcess>
@@ -24,7 +27,7 @@ TEST(libprocess, spawn)
   SpawnMockProcess process;
 
   EXPECT_CALL(process, __operator_call__())
-    .Times(AtMost(1));
+    .Times(1);
 
   PID<SpawnMockProcess> pid = process::spawn(&process);
 
@@ -37,8 +40,11 @@ TEST(libprocess, spawn)
 class DispatchMockProcess : public Process<DispatchMockProcess>
 {
 public:
-  MOCK_METHOD0(func, void());
-  virtual void operator () () { serve(); }
+  MOCK_METHOD0(func0, void());
+  MOCK_METHOD1(func1, bool(bool));
+  MOCK_METHOD1(func2, Promise<bool>(bool));
+  MOCK_METHOD1(func3, int(int));
+  MOCK_METHOD1(func4, Promise<int>(int));
 };
 
 
@@ -48,14 +54,61 @@ TEST(libprocess, dispatch)
 
   DispatchMockProcess process;
 
-  EXPECT_CALL(process, func())
-    .Times(AtMost(1));
+  EXPECT_CALL(process, func0())
+    .Times(1);
+
+  EXPECT_CALL(process, func1(_))
+    .WillOnce(ReturnArg<0>());
+
+  EXPECT_CALL(process, func2(_))
+    .WillOnce(ReturnArg<0>());
+
+  PID<DispatchMockProcess> pid = process::spawn(&process);
+
+  ASSERT_FALSE(!pid);
+
+  process::dispatch(pid, &DispatchMockProcess::func0);
+
+  Future<bool> future;
+
+  future = process::dispatch(pid, &DispatchMockProcess::func1, true);
+
+  EXPECT_TRUE(future.get());
+  
+  future = process::dispatch(pid, &DispatchMockProcess::func2, true);
+
+  EXPECT_TRUE(future.get());
+
+  process::post(pid, process::TERMINATE);
+  process::wait(pid);
+}
+
+
+TEST(libprocess, call)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  DispatchMockProcess process;
+
+  EXPECT_CALL(process, func3(_))
+    .WillOnce(ReturnArg<0>());
+
+  EXPECT_CALL(process, func4(_))
+    .WillOnce(ReturnArg<0>());
 
   PID<DispatchMockProcess> pid = process::spawn(&process);
 
   ASSERT_FALSE(!pid);
 
-  process::dispatch(pid, &DispatchMockProcess::func);
+  int result;
+
+  result = process::call(pid, &DispatchMockProcess::func3, 42);
+
+  EXPECT_EQ(42, result);
+  
+  result = process::call(pid, &DispatchMockProcess::func4, 43);
+
+  EXPECT_EQ(43, result);
 
   process::post(pid, process::TERMINATE);
   process::wait(pid);
@@ -67,7 +120,6 @@ class InstallMockProcess : public Proces
 public:
   InstallMockProcess() { install("func", &InstallMockProcess::func); }
   MOCK_METHOD0(func, void());
-  virtual void operator () () { serve(); }
 };
 
 
@@ -78,7 +130,7 @@ TEST(libprocess, install)
   InstallMockProcess process;
 
   EXPECT_CALL(process, func())
-    .Times(AtMost(1));
+    .Times(1);
 
   PID<InstallMockProcess> pid = process::spawn(&process);
 
@@ -104,7 +156,6 @@ class DerivedMockProcess : public BaseMo
 public:
   DerivedMockProcess() {}
   MOCK_METHOD0(func, void());
-  virtual void operator () () { serve(); }
 };
 
 
@@ -115,10 +166,10 @@ TEST(libprocess, inheritance)
   DerivedMockProcess process;
 
   EXPECT_CALL(process, func())
-    .Times(AtMost(1));
+    .Times(2);
 
   EXPECT_CALL(process, foo())
-    .Times(AtMost(1));
+    .Times(1);
 
   PID<DerivedMockProcess> pid1 = process::spawn(&process);
 
@@ -131,6 +182,7 @@ TEST(libprocess, inheritance)
 
   ASSERT_EQ(pid2, pid3);
 
+  process::dispatch(pid3, &BaseMockProcess::func);
   process::dispatch(pid3, &BaseMockProcess::foo);
 
   process::post(pid1, process::TERMINATE);