You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/01/27 02:25:15 UTC

svn commit: r1236485 [5/7] - in /incubator/mesos/trunk: ./ include/mesos/ src/common/ src/exec/ src/local/ src/log/ src/master/ src/python/native/ src/sched/ src/slave/ src/tests/ src/zookeeper/ third_party/libprocess/ third_party/libprocess/include/pr...

Added: incubator/mesos/trunk/third_party/libprocess/include/process/event.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/event.hpp?rev=1236485&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/event.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/event.hpp Fri Jan 27 01:25:13 2012
@@ -0,0 +1,181 @@
+#ifndef __PROCESS_EVENT_HPP__
+#define __PROCESS_EVENT_HPP__
+
+#include <tr1/functional>
+
+#include <process/future.hpp>
+#include <process/http.hpp>
+#include <process/message.hpp>
+
+namespace process {
+
+// Forward declarations.
+struct ProcessBase;
+struct MessageEvent;
+struct DispatchEvent;
+struct HttpEvent;
+struct ExitedEvent;
+struct TerminateEvent;
+
+
+struct EventVisitor // Visitor with one visit() overload per concrete event.
+{
+  virtual ~EventVisitor() {} // Polymorphic base: keep destruction well-defined.
+  virtual void visit(const MessageEvent& event) {} // All defaults are no-ops.
+  virtual void visit(const DispatchEvent& event) {}
+  virtual void visit(const HttpEvent& event) {}
+  virtual void visit(const ExitedEvent& event) {}
+  virtual void visit(const TerminateEvent& event) {}
+};
+
+struct Event // Abstract base of all events; subclasses dispatch via visit().
+{
+  virtual ~Event() {} // Subclasses free owned data; allow delete via Event*.
+  virtual void visit(EventVisitor* visitor) const = 0;
+
+  template <typename T>
+  bool is() const // Returns true if visiting this event reaches visit(const T&).
+  {
+    bool result = false;
+    struct IsVisitor : EventVisitor
+    {
+      IsVisitor(bool* _result) : result(_result) {}
+      virtual void visit(const T& t) { *result = true; }
+      bool* result;
+    } visitor(&result);
+    visit(&visitor); // Every other overload keeps its no-op default.
+    return result;
+  }
+
+  template <typename T>
+  const T& as() const // Returns this event as a T; aborts if it is not one.
+  {
+    const T* result = NULL;
+    struct AsVisitor : EventVisitor
+    {
+      AsVisitor(const T** _result) : result(_result) {}
+      virtual void visit(const T& t) { *result = &t; }
+      const T** result;
+    } visitor(&result);
+    visit(&visitor);
+    if (result == NULL) { // No visit(const T&) call happened: wrong type.
+      std::cerr << "Attempting to \"cast\" event incorrectly!" << std::endl;
+      abort();
+    }
+    return *result;
+  }
+};
+
+struct MessageEvent : Event // Event wrapping a Message, which it owns.
+{
+  MessageEvent(Message* _message) // Takes ownership of '_message'.
+    : message(_message) {}
+
+  ~MessageEvent()
+  {
+    delete message; // Freed here, so callers must not delete it as well.
+  }
+
+  virtual void visit(EventVisitor* visitor) const
+  {
+    visitor->visit(*this); // Selects the visit(const MessageEvent&) overload.
+  }
+
+  Message* const message;
+
+private:
+  // Not copyable, not assignable (a copy would delete 'message' twice).
+  MessageEvent(const MessageEvent&);
+  MessageEvent& operator = (const MessageEvent&);
+};
+
+
+struct HttpEvent : Event // Event wrapping an HttpRequest, which it owns.
+{
+  HttpEvent(int _c, HttpRequest* _request) // Takes ownership of '_request'.
+    : c(_c), request(_request) {}
+
+  ~HttpEvent()
+  {
+    delete request; // Freed here, so callers must not delete it as well.
+  }
+
+  virtual void visit(EventVisitor* visitor) const
+  {
+    visitor->visit(*this); // Selects the visit(const HttpEvent&) overload.
+  }
+
+  const int c; // NOTE(review): presumably the connection/socket -- confirm.
+  HttpRequest* const request;
+
+private:
+  // Not copyable, not assignable (a copy would delete 'request' twice).
+  HttpEvent(const HttpEvent&);
+  HttpEvent& operator = (const HttpEvent&);
+};
+
+
+struct DispatchEvent : Event // Event wrapping a heap-allocated function it owns.
+{
+  DispatchEvent(std::tr1::function<void(ProcessBase*)>* _function)
+    : function(_function) {} // Takes ownership of '_function'.
+
+  ~DispatchEvent()
+  {
+    delete function; // Freed here, so callers must not delete it as well.
+  }
+
+  virtual void visit(EventVisitor* visitor) const
+  {
+    visitor->visit(*this); // Selects the visit(const DispatchEvent&) overload.
+  }
+
+  std::tr1::function<void(ProcessBase*)>* const function;
+
+private:
+  // Not copyable, not assignable (a copy would delete 'function' twice).
+  DispatchEvent(const DispatchEvent&);
+  DispatchEvent& operator = (const DispatchEvent&);
+};
+
+
+struct ExitedEvent : Event // Event carrying a copy of a UPID.
+{
+  ExitedEvent(const UPID& _pid) // Stores its own copy of '_pid'.
+    : pid(_pid) {}
+
+  virtual void visit(EventVisitor* visitor) const
+  {
+    visitor->visit(*this); // Selects the visit(const ExitedEvent&) overload.
+  }
+
+  const UPID pid; // NOTE(review): presumably the process that exited -- confirm.
+
+private:
+  // Not copyable, not assignable (declared only; no definition in this header).
+  ExitedEvent(const ExitedEvent&);
+  ExitedEvent& operator = (const ExitedEvent&);
+};
+
+
+struct TerminateEvent : Event // Event carrying a copy of a UPID.
+{
+  TerminateEvent(const UPID& _from) // Stores its own copy of '_from'.
+    : from(_from) {}
+
+  virtual void visit(EventVisitor* visitor) const
+  {
+    visitor->visit(*this); // Selects the visit(const TerminateEvent&) overload.
+  }
+
+  const UPID from; // NOTE(review): presumably who asked to terminate -- confirm.
+
+private:
+  // Not copyable, not assignable (declared only; no definition in this header).
+  TerminateEvent(const TerminateEvent&);
+  TerminateEvent& operator = (const TerminateEvent&);
+};
+
+} // namespace process {
+
+#endif // __PROCESS_EVENT_HPP__

Copied: incubator/mesos/trunk/third_party/libprocess/include/process/executor.hpp (from r1235899, incubator/mesos/trunk/third_party/libprocess/include/process/async.hpp)
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/executor.hpp?p2=incubator/mesos/trunk/third_party/libprocess/include/process/executor.hpp&p1=incubator/mesos/trunk/third_party/libprocess/include/process/async.hpp&r1=1235899&r2=1236485&rev=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/async.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/executor.hpp Fri Jan 27 01:25:13 2012
@@ -1,63 +1,31 @@
-#ifndef __PROCESS_ASYNC_HPP__
-#define __PROCESS_ASYNC_HPP__
+#ifndef __PROCESS_EXECUTOR_HPP__
+#define __PROCESS_EXECUTOR_HPP__
 
+#include <process/deferred.hpp>
 #include <process/dispatch.hpp>
+#include <process/preprocessor.hpp>
 
-#include <boost/preprocessor/cat.hpp>
-
-#include <boost/preprocessor/arithmetic/inc.hpp>
-
-#include <boost/preprocessor/facilities/intercept.hpp>
-
-#include <boost/preprocessor/repetition/enum_params.hpp>
-#include <boost/preprocessor/repetition/enum_binary_params.hpp>
-#include <boost/preprocessor/repetition/enum_trailing_params.hpp>
-#include <boost/preprocessor/repetition/repeat.hpp>
-#include <boost/preprocessor/repetition/repeat_from_to.hpp>
-
-#define CAT BOOST_PP_CAT
-#define INC BOOST_PP_INC
-#define INTERCEPT BOOST_PP_INTERCEPT
-#define ENUM_PARAMS BOOST_PP_ENUM_PARAMS
-#define ENUM_BINARY_PARAMS BOOST_PP_ENUM_BINARY_PARAMS
-#define ENUM_TRAILING_PARAMS BOOST_PP_ENUM_TRAILING_PARAMS
-#define REPEAT BOOST_PP_REPEAT
-#define REPEAT_FROM_TO BOOST_PP_REPEAT_FROM_TO
-
-
-namespace async {
-
-// Acts like a function call but performs an asynchronous
-// dispatch. Thus, the "caller" knows that it will not block, for
-// example, within the "callee" callback.
-template <typename F>
-struct dispatch : std::tr1::function<F>
-{
-private:
-  friend class Dispatch; // Only class capable of creating these.
-  dispatch(const std::tr1::function<F>& f) : std::tr1::function<F>(f) {}
-};
-
+namespace process {
 
 // Underlying "process" which handles invoking actual callbacks
-// created through a Dispatch object.
-class DispatchProcess : public process::Process<DispatchProcess>
+// created through an Executor.
+class ExecutorProcess : public process::Process<ExecutorProcess>
 {
 private:
-  friend class Dispatch;
+  friend class Executor;
 
-  DispatchProcess() {}
-  ~DispatchProcess() {}
+  ExecutorProcess() {}
+  ~ExecutorProcess() {}
 
   // Not copyable, not assignable.
-  DispatchProcess(const DispatchProcess&);
-  DispatchProcess& operator = (const DispatchProcess&);
+  ExecutorProcess(const ExecutorProcess&);
+  ExecutorProcess& operator = (const ExecutorProcess&);
 
   // No arg invoke.
   void invoke(const std::tr1::function<void()>& f) { f(); }
 
   // Args invoke.
-#define APPLY(Z, N, DATA)                                      \
+#define TEMPLATE(Z, N, DATA)                                   \
   template <ENUM_PARAMS(N, typename A)>                        \
   void CAT(invoke, N)(                                         \
       const std::tr1::function<void(ENUM_PARAMS(N, A))>& f,    \
@@ -66,85 +34,95 @@ private:
     f(ENUM_PARAMS(N, a));                                      \
   }
 
-  REPEAT_FROM_TO(1, 11, APPLY, _) // Args A0 -> A9.
-#undef APPLY
+  REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
 };
 
 
 // Provides an abstraction that can take a standard function object
-// and convert it to a dispatch object. Each converted function object
-// will get invoked serially with respect to one another.
-class Dispatch
+// and convert it to a 'deferred'. Each converted function object will
+// get invoked serially with respect to one another.
+class Executor
 {
 public:
-  Dispatch() {
+  Executor() {
     process::spawn(process);
   }
 
-  ~Dispatch()
+  ~Executor()
   {
     process::terminate(process);
     process::wait(process);
   }
 
+  // Terminates the underlying process without waiting for it, always
+  // returning true. TODO(benh): This doesn't wait because that could
+  // cause a deadlock ... thus, no more dispatches will occur after
+  // this function returns but one may be occurring concurrently.
+  bool stop()
+  {
+    process::terminate(process);
+    return true; // Falling off the end of a non-void function is undefined.
+  }
+
   // We can't easily use 'std::tr1::_Placeholder<X>' when doing macro
   // expansion via ENUM_BINARY_PARAMS because compilers don't like it
   // when you try and concatenate '<' 'N' '>'. Thus, we typedef them.
 private:
-#define APPLY(Z, N, DATA)                               \
+#define TEMPLATE(Z, N, DATA)                            \
   typedef std::tr1::_Placeholder<INC(N)> _ ## N;
 
-  REPEAT(10, APPLY, _)
-#undef APPLY
+  REPEAT(10, TEMPLATE, _)
+#undef TEMPLATE
 
 public:
   // We provide wrappers for all standard function objects.
-  dispatch<void()> operator () (
-      const std::tr1::function<void()>& f)
+  deferred<void(void)> defer(
+      const std::tr1::function<void(void)>& f)
   {
-    return dispatch<void()>(
+    return deferred<void(void)>(
         std::tr1::bind(
-            &Dispatch::dispatcher,
+            &Executor::dispatcher,
             process.self(), f));
   }
 
-#define APPLY(Z, N, DATA)                                               \
+#define TEMPLATE(Z, N, DATA)                                            \
   template <ENUM_PARAMS(N, typename A)>                                 \
-  dispatch<void(ENUM_PARAMS(N, A))> operator () (                       \
+  deferred<void(ENUM_PARAMS(N, A))> defer(                              \
       const std::tr1::function<void(ENUM_PARAMS(N, A))>& f)             \
   {                                                                     \
-    return dispatch<void(ENUM_PARAMS(N, A))>(                           \
+    return deferred<void(ENUM_PARAMS(N, A))>(                           \
         std::tr1::bind(                                                 \
-            &Dispatch::CAT(dispatcher, N)<ENUM_PARAMS(N, A)>,           \
+            &Executor::CAT(dispatcher, N)<ENUM_PARAMS(N, A)>,           \
             process.self(), f,                                          \
             ENUM_BINARY_PARAMS(N, _, () INTERCEPT)));                   \
   }
 
-  REPEAT_FROM_TO(1, 11, APPLY, _) // Args A0 -> A9.
-#undef APPLY
+  REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
 
   // Unfortunately, it is currently difficult to "forward" type
   // information from one result to another, so we must explicilty
   // define wrappers for all std::tr1::bind results. First we start
   // with the non-member std::tr1::bind results.
-  dispatch<void()> operator () (
+  deferred<void(void)> defer(
       const std::tr1::_Bind<void(*())()>& b)
   {
-    return (*this)(std::tr1::function<void()>(b));
+    return defer(std::tr1::function<void()>(b));
   }
 
-#define APPLY(Z, N, DATA)                                               \
+#define TEMPLATE(Z, N, DATA)                                            \
   template <ENUM_PARAMS(N, typename A)>                                 \
-  dispatch<void(ENUM_PARAMS(N, A))> operator () (                       \
+  deferred<void(ENUM_PARAMS(N, A))> defer(                              \
       const std::tr1::_Bind<                                            \
       void(*(ENUM_PARAMS(N, _)))                                        \
       (ENUM_PARAMS(N, A))>& b)                                          \
   {                                                                     \
-    return (*this)(std::tr1::function<void(ENUM_PARAMS(N, A))>(b));     \
+    return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b));       \
   }
 
-  REPEAT_FROM_TO(1, 11, APPLY, _) // Args A0 -> A9.
-#undef APPLY
+  REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
 
   // Now the member std::tr1::bind results:
   // 1. Non-const member (function), non-const pointer (receiver).
@@ -155,122 +133,113 @@ public:
   // 6. Const member, const reference.
   // 7. Non-const member, value.
   // 8. Const member, value.
-#define APPLY(Z, N, DATA)                                               \
+#define TEMPLATE(Z, N, DATA)                                            \
   template <typename T ENUM_TRAILING_PARAMS(N, typename A)>             \
-  dispatch<void(ENUM_PARAMS(N, A))> operator () (                       \
+  deferred<void(ENUM_PARAMS(N, A))> defer(                              \
       const std::tr1::_Bind<std::tr1::_Mem_fn<                          \
       void(T::*)(ENUM_PARAMS(N, A))>                                    \
       (T* ENUM_TRAILING_PARAMS(N, _))>& b)                              \
   {                                                                     \
-    return (*this)(std::tr1::function<void(ENUM_PARAMS(N, A))>(b));     \
+    return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b));       \
   }                                                                     \
                                                                         \
   template <typename T ENUM_TRAILING_PARAMS(N, typename A)>             \
-  dispatch<void(ENUM_PARAMS(N, A))> operator () (                       \
+  deferred<void(ENUM_PARAMS(N, A))> defer(                              \
       const std::tr1::_Bind<std::tr1::_Mem_fn<                          \
       void(T::*)(ENUM_PARAMS(N, A)) const>                              \
       (T* ENUM_TRAILING_PARAMS(N, _))>& b)                              \
   {                                                                     \
-    return (*this)(std::tr1::function<void(ENUM_PARAMS(N, A))>(b));     \
+    return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b));       \
   }                                                                     \
                                                                         \
   template <typename T ENUM_TRAILING_PARAMS(N, typename A)>             \
-  dispatch<void(ENUM_PARAMS(N, A))> operator () (                       \
+  deferred<void(ENUM_PARAMS(N, A))> defer(                              \
       const std::tr1::_Bind<std::tr1::_Mem_fn<                          \
       void(T::*)(ENUM_PARAMS(N, A)) const>                              \
       (const T* ENUM_TRAILING_PARAMS(N, _))>& b)                        \
   {                                                                     \
-    return (*this)(std::tr1::function<void(ENUM_PARAMS(N, A))>(b));     \
+    return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b));       \
   }                                                                     \
                                                                         \
   template <typename T ENUM_TRAILING_PARAMS(N, typename A)>             \
-  dispatch<void(ENUM_PARAMS(N, A))> operator () (                       \
+  deferred<void(ENUM_PARAMS(N, A))> defer(                              \
       const std::tr1::_Bind<std::tr1::_Mem_fn<                          \
       void(T::*)(ENUM_PARAMS(N, A))>                                    \
       (std::tr1::reference_wrapper<T> ENUM_TRAILING_PARAMS(N, _))>& b)  \
   {                                                                     \
-    return (*this)(std::tr1::function<void(ENUM_PARAMS(N, A))>(b));     \
+    return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b));       \
   }                                                                     \
                                                                         \
   template <typename T ENUM_TRAILING_PARAMS(N, typename A)>             \
-  dispatch<void(ENUM_PARAMS(N, A))> operator () (                       \
+  deferred<void(ENUM_PARAMS(N, A))> defer(                              \
       const std::tr1::_Bind<std::tr1::_Mem_fn<                          \
       void(T::*)(ENUM_PARAMS(N, A)) const>                              \
       (std::tr1::reference_wrapper<T> ENUM_TRAILING_PARAMS(N, _))>& b)  \
   {                                                                     \
-    return (*this)(std::tr1::function<void(ENUM_PARAMS(N, A))>(b));     \
+    return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b));       \
   }                                                                     \
                                                                         \
   template <typename T ENUM_TRAILING_PARAMS(N, typename A)>             \
-  dispatch<void(ENUM_PARAMS(N, A))> operator () (                       \
+  deferred<void(ENUM_PARAMS(N, A))> defer(                              \
       const std::tr1::_Bind<std::tr1::_Mem_fn<                          \
       void(T::*)(ENUM_PARAMS(N, A)) const>                              \
       (std::tr1::reference_wrapper<const T> ENUM_TRAILING_PARAMS(N, _))>& b) \
   {                                                                     \
-    return (*this)(std::tr1::function<void(ENUM_PARAMS(N, A))>(b));     \
+    return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b));       \
   }                                                                     \
                                                                         \
   template <typename T ENUM_TRAILING_PARAMS(N, typename A)>             \
-  dispatch<void(ENUM_PARAMS(N, A))> operator () (                       \
+  deferred<void(ENUM_PARAMS(N, A))> defer(                              \
       const std::tr1::_Bind<std::tr1::_Mem_fn<                          \
       void(T::*)(ENUM_PARAMS(N, A))>                                    \
       (T ENUM_TRAILING_PARAMS(N, _))>& b)                               \
   {                                                                     \
-    return (*this)(std::tr1::function<void(ENUM_PARAMS(N, A))>(b));     \
+    return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b));       \
   }                                                                     \
                                                                         \
   template <typename T ENUM_TRAILING_PARAMS(N, typename A)>             \
-  dispatch<void(ENUM_PARAMS(N, A))> operator () (                       \
+  deferred<void(ENUM_PARAMS(N, A))> defer(                              \
       const std::tr1::_Bind<std::tr1::_Mem_fn<                          \
       void(T::*)(ENUM_PARAMS(N, A)) const>                              \
       (T ENUM_TRAILING_PARAMS(N, _))>& b)                               \
   {                                                                     \
-    return (*this)(std::tr1::function<void(ENUM_PARAMS(N, A))>(b));     \
+    return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b));       \
   }
 
-  REPEAT(11, APPLY, _) // No args and args A0 -> A9.
-#undef APPLY
+  REPEAT(11, TEMPLATE, _) // No args and args A0 -> A9.
+#undef TEMPLATE
 
 private:
   // Not copyable, not assignable.
-  Dispatch(const Dispatch&);
-  Dispatch& operator = (const Dispatch&);
+  Executor(const Executor&);
+  Executor& operator = (const Executor&);
 
   static void dispatcher(
-      const process::PID<DispatchProcess>& pid,
-      const std::tr1::function<void()>& f)
+      const process::PID<ExecutorProcess>& pid,
+      const std::tr1::function<void(void)>& f)
   {
-    process::dispatch(pid, &DispatchProcess::invoke, f);
+    process::dispatch(pid, &ExecutorProcess::invoke, f);
   }
 
-#define APPLY(Z, N, DATA)                                               \
+#define TEMPLATE(Z, N, DATA)                                            \
   template <ENUM_PARAMS(N, typename A)>                                 \
   static void CAT(dispatcher, N)(                                       \
-      const process::PID<DispatchProcess>& pid,                         \
+      const process::PID<ExecutorProcess>& pid,                         \
       const std::tr1::function<void(ENUM_PARAMS(N, A))>& f,             \
       ENUM_BINARY_PARAMS(N, A, a))                                      \
   {                                                                     \
     process::dispatch(                                                  \
         pid,                                                            \
-        &DispatchProcess::CAT(invoke, N)<ENUM_PARAMS(N, A)>,            \
+        &ExecutorProcess::CAT(invoke, N)<ENUM_PARAMS(N, A)>,            \
         f, ENUM_PARAMS(N, a));                                          \
   }
 
-  REPEAT_FROM_TO(1, 11, APPLY, _) // Args A0 -> A9.
-#undef APPLY
+  REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
 
-  DispatchProcess process;
+  ExecutorProcess process;
 };
 
-} // namespace async {
-
-#undef CAT
-#undef INC
-#undef INTERCEPT
-#undef ENUM_PARAMS
-#undef ENUM_BINARY_PARAMS
-#undef ENUM_TRAILING_PARAMS
-#undef REPEAT
-#undef REPEAT_FROM_TO
+} // namespace process {
 
-#endif // __PROCESS_ASYNC_HPP__
+#endif // __PROCESS_EXECUTOR_HPP__

Added: incubator/mesos/trunk/third_party/libprocess/include/process/filter.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/filter.hpp?rev=1236485&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/filter.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/filter.hpp Fri Jan 27 01:25:13 2012
@@ -0,0 +1,23 @@
+#ifndef __PROCESS_FILTER_HPP__
+#define __PROCESS_FILTER_HPP__
+
+#include <process/event.hpp>
+
+namespace process {
+ 
+class Filter { // Per-event-type predicates; every default returns false.
+public:
+  virtual ~Filter() {} // Polymorphic base: allow safe delete via Filter*.
+  virtual bool filter(const MessageEvent& event) { return false; }
+  virtual bool filter(const DispatchEvent& event) { return false; }
+  virtual bool filter(const HttpEvent& event) { return false; }
+  virtual bool filter(const ExitedEvent& event) { return false; }
+};
+
+// Use the specified filter on messages that get enqueued (note,
+// however, that you cannot filter timeout messages).
+void filter(Filter* filter);
+
+} // namespace process {
+
+#endif // __PROCESS_FILTER_HPP__

Modified: incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp Fri Jan 27 01:25:13 2012
@@ -1,15 +1,17 @@
 #ifndef __PROCESS_FUTURE_HPP__
 #define __PROCESS_FUTURE_HPP__
 
+#include <assert.h>
+
 #include <queue>
 #include <set>
 
 #include <tr1/functional>
+#include <tr1/memory> // TODO(benh): Replace shared_ptr with unique_ptr.
 
 #include <process/latch.hpp>
 #include <process/option.hpp>
 
-
 namespace process {
 
 // Forward declaration of Promise.
@@ -29,9 +31,9 @@ public:
   Future(const Future<T>& that);
   ~Future();
 
-  // Futures are assignable. This results in the reference to the
-  // previous future data being decremented and a reference to 'that'
-  // being incremented.
+  // Futures are assignable (and copyable). This results in the
+  // reference to the previous future data being decremented and a
+  // reference to 'that' being incremented.
   Future<T>& operator = (const Future<T>& that);
 
   // Comparision operators useful for using futures in collections.
@@ -108,15 +110,13 @@ private:
 };
 
 
-// TODO(benh): Consider making promise a non-copyable, non-assignabl
-// subclass of Future. For now, they'll live as distinct types.
+// TODO(benh): Consider making Promise a subclass of Future.
 template <typename T>
 class Promise
 {
 public:
   Promise();
   Promise(const T& t);
-  Promise(const Promise<T>& that);
   ~Promise();
 
   bool set(const T& _t);
@@ -126,8 +126,9 @@ public:
   Future<T> future() const;
 
 private:
-  // Not assignable.
-  void operator = (const Promise<T>&);
+  // Not copyable, not assignable.
+  Promise(const Promise<T>&);
+  Promise<T>& operator = (const Promise<T>&);
 
   Future<T> f;
 };
@@ -151,11 +152,6 @@ Promise<T>::Promise(const T& t)
 
 
 template <typename T>
-Promise<T>::Promise(const Promise<T>& that)
-  : f(that.f) {}
-
-
-template <typename T>
 Promise<T>::~Promise() {}
 
 
@@ -197,62 +193,56 @@ inline void release(int* lock)
   assert(unlocked);
 }
 
-namespace callbacks {
-
 template <typename T>
-void select(const Future<T>& future, Promise<Future<T> > promise)
+void select(
+    const Future<T>& future,
+    std::tr1::shared_ptr<Promise<Future<T > > > promise)
 {
   // We never fail the future associated with our promise.
-  assert(!promise.future().isFailed());
+  assert(!promise->future().isFailed());
 
-  if (promise.future().isPending()) { // Avoid acquiring a lock.
+  if (promise->future().isPending()) { // No-op if it's discarded.
     if (future.isReady()) { // We only set the promise if a future is ready.
-      promise.set(future);
+      promise->set(future);
     }
   }
 }
 
-} // namespace callback {
 } // namespace internal {
 
 
 // TODO(benh): Move select and discard into 'futures' namespace.
 
-// Returns an option of a ready future or none in the event of
-// timeout. Note that select DOES NOT return for a future that has
-// failed or been discarded.
+// Returns a future that captures any ready future in a set. Note that
+// select DOES NOT capture a future that has failed or been discarded.
 template <typename T>
-Option<Future<T> > select(const std::set<Future<T> >& futures, double secs)
+Future<Future<T> > select(const std::set<Future<T> >& futures)
 {
-  Promise<Future<T> > promise;
+  std::tr1::shared_ptr<Promise<Future<T> > > promise(
+      new Promise<Future<T> >());
 
-  std::tr1::function<void(const Future<T>&)> callback =
-    std::tr1::bind(internal::callbacks::select<T>,
-                   std::tr1::placeholders::_1, promise);
+  Future<Future<T> > future = promise->future();
+
+  std::tr1::function<void(const Future<T>&)> select =
+    std::tr1::bind(&internal::select<T>,
+                   std::tr1::placeholders::_1,
+                   promise);
 
   typename std::set<Future<T> >::iterator iterator;
   for (iterator = futures.begin(); iterator != futures.end(); ++iterator) {
-    const Future<T>& future = *iterator;
-    future.onAny(callback);
+    (*iterator).onAny(select);
   }
 
-  Future<Future<T> > future = promise.future();
-
-  if (future.await(secs)) {
-    return Option<Future<T> >::some(future.get());
-  } else {
-    future.discard();
-    return Option<Future<T> >::none();
-  }
+  return future;
 }
 
 
 template <typename T>
 void discard(const std::set<Future<T> >& futures)
 {
-  typename std::set<Future<T> >::const_iterator iterator;
+  typename std::set<Future<T> >::iterator iterator;
   for (iterator = futures.begin(); iterator != futures.end(); ++iterator) {
-    Future<T> future = *iterator;
+    Future<T> future = *iterator; // Need a non-const copy to discard.
     future.discard();
   }
 }

Modified: incubator/mesos/trunk/third_party/libprocess/include/process/gc.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/gc.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/gc.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/gc.hpp Fri Jan 27 01:25:13 2012
@@ -11,7 +11,7 @@ namespace process {
 class GarbageCollector : public Process<GarbageCollector>
 {
 public:
-  GarbageCollector() {}
+  GarbageCollector() : ProcessBase("__gc__") {}
   virtual ~GarbageCollector() {}
 
   template <typename T>
@@ -25,15 +25,12 @@ public:
   }
 
 protected:
-  virtual void operator () ()
+  virtual void exited(const UPID& pid)
   {
-    while (true) {
-      serve();
-      if (name() == EXITED && processes.count(from()) > 0) {
-        const ProcessBase* process = processes[from()];
-        processes.erase(from());
-        delete process;
-      }
+    if (processes.count(pid) > 0) {
+      const ProcessBase* process = processes[pid];
+      processes.erase(pid);
+      delete process;
     }
   }
 

Modified: incubator/mesos/trunk/third_party/libprocess/include/process/latch.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/latch.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/latch.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/latch.hpp Fri Jan 27 01:25:13 2012
@@ -5,16 +5,14 @@
 
 namespace process {
 
-class LatchProcess;
-
 class Latch
 {
 public:
   Latch();
   virtual ~Latch();
 
-  bool operator == (const Latch& that) const { return latch == that.latch; }
-  bool operator < (const Latch& that) const { return latch < that.latch; }
+  bool operator == (const Latch& that) const { return pid == that.pid; }
+  bool operator < (const Latch& that) const { return pid < that.pid; }
 
   void trigger();
   bool await(double secs = 0);
@@ -25,7 +23,7 @@ private:
   Latch& operator = (const Latch& that);
 
   bool triggered;
-  PID<LatchProcess> latch;
+  UPID pid;
 };
 
 }  // namespace process {

Added: incubator/mesos/trunk/third_party/libprocess/include/process/message.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/message.hpp?rev=1236485&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/message.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/message.hpp Fri Jan 27 01:25:13 2012
@@ -0,0 +1,20 @@
+#ifndef __PROCESS_MESSAGE_HPP__
+#define __PROCESS_MESSAGE_HPP__
+
+#include <string>
+
+#include <process/pid.hpp>
+
+namespace process {
+
+struct Message // Plain record describing one message; all fields are public.
+{
+  std::string name; // Name of the message.
+  UPID from; // NOTE(review): presumably the sender's pid -- confirm.
+  UPID to; // NOTE(review): presumably the recipient's pid -- confirm.
+  std::string body; // NOTE(review): presumably the payload -- confirm.
+};
+
+} // namespace process {
+
+#endif // __PROCESS_MESSAGE_HPP__

Added: incubator/mesos/trunk/third_party/libprocess/include/process/preprocessor.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/preprocessor.hpp?rev=1236485&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/preprocessor.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/preprocessor.hpp Fri Jan 27 01:25:13 2012
@@ -0,0 +1,29 @@
+#ifndef __PROCESS_PREPROCESSOR_HPP__
+#define __PROCESS_PREPROCESSOR_HPP__
+
+#include <boost/preprocessor/cat.hpp>
+
+#include <boost/preprocessor/arithmetic/inc.hpp>
+
+#include <boost/preprocessor/facilities/intercept.hpp>
+
+#include <boost/preprocessor/repetition/enum_params.hpp>
+#include <boost/preprocessor/repetition/enum_binary_params.hpp>
+#include <boost/preprocessor/repetition/enum_trailing_params.hpp>
+#include <boost/preprocessor/repetition/repeat.hpp>
+#include <boost/preprocessor/repetition/repeat_from_to.hpp>
+
+// Provides aliases to a bunch of preprocessor macros useful for
+// creating template definitions that have a varying number of
+// parameters (should be removable with C++11 variadic templates).
+
+#define CAT BOOST_PP_CAT
+#define INC BOOST_PP_INC
+#define INTERCEPT BOOST_PP_INTERCEPT
+#define ENUM_PARAMS BOOST_PP_ENUM_PARAMS
+#define ENUM_BINARY_PARAMS BOOST_PP_ENUM_BINARY_PARAMS
+#define ENUM_TRAILING_PARAMS BOOST_PP_ENUM_TRAILING_PARAMS
+#define REPEAT BOOST_PP_REPEAT
+#define REPEAT_FROM_TO BOOST_PP_REPEAT_FROM_TO
+
+#endif // __PROCESS_PREPROCESSOR_HPP__

Modified: incubator/mesos/trunk/third_party/libprocess/include/process/process.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/process.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/process.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/process.hpp Fri Jan 27 01:25:13 2012
@@ -1,57 +1,24 @@
 #ifndef __PROCESS_PROCESS_HPP__
 #define __PROCESS_PROCESS_HPP__
 
-#include <assert.h>
 #include <stdint.h>
-#include <stdlib.h>
-#include <ucontext.h>
-
-#include <sys/time.h>
+#include <pthread.h>
 
 #include <map>
 #include <queue>
 
 #include <tr1/functional>
 
-#include <process/future.hpp>
+#include <process/clock.hpp>
+#include <process/event.hpp>
+#include <process/filter.hpp>
 #include <process/http.hpp>
+#include <process/message.hpp>
 #include <process/pid.hpp>
 
-
 namespace process {
 
-const std::string NOTHING = "__process_nothing__";
-const std::string ERROR = "__process_error__";
-const std::string TIMEOUT = "__process_timeout__";
-const std::string EXITED = "__process_exited__";
-const std::string TERMINATE = "__process_terminate__";
-
-
-struct Message {
-  std::string name;
-  UPID from;
-  UPID to;
-  std::string body;
-};
-
-
-class Clock {
-public:
-  static double now();
-  static void pause();
-  static void resume();
-  static void advance(double secs);
-};
-
-
-class Filter {
-public:
-  // TODO(benh): Support filtering HTTP requests?
-  virtual bool filter(Message*) = 0;
-};
-
-
-class ProcessBase
+class ProcessBase : public EventVisitor
 {
 public:
   ProcessBase(const std::string& id = "");
@@ -60,174 +27,158 @@ public:
 
   UPID self() const { return pid; }
 
-  static UPID spawn(ProcessBase* process, bool manage = false);
-
 protected:
-  // Function run when process spawned.
-//   virtual void operator () () = 0;
-  virtual void operator () ()
+  // Invoked when an event is serviced.
+  virtual void serve(const Event& event)
   {
-    do { if (serve() == TERMINATE) break; } while (true);
+    event.visit(this);
   }
 
-  // Returns the sender's PID of the last dequeued (current) message.
-  UPID from() const;
-
-  // Returns the name of the last dequeued (current) message.
-  const std::string& name() const;
-
-  // Returns the body of the last dequeued (current) message.
-  const std::string& body() const;
-
-  // Put a message at front of queue.
-  void inject(const UPID& from,
-              const std::string& name,
-              const char* data = NULL,
-              size_t length = 0);
+  // Callbacks used to visit (i.e., handle) a specific event.
+  virtual void visit(const MessageEvent& event);
+  virtual void visit(const DispatchEvent& event);
+  virtual void visit(const HttpEvent& event);
+  virtual void visit(const ExitedEvent& event);
+  virtual void visit(const TerminateEvent& event);
+
+  // Invoked when a process gets spawned.
+  virtual void initialize() {}
+
+  // Invoked when a process is terminated (unless visit is overridden).
+  virtual void finalize() {}
+
+  // Invoked when a linked process has exited (see link).
+  virtual void exited(const UPID& pid) {}
+
+  // Invoked when a linked process can no longer be monitored (see link).
+  virtual void lost(const UPID& pid) {}
+
+  // Puts a message at front of queue.
+  void inject(
+      const UPID& from,
+      const std::string& name,
+      const char* data = NULL,
+      size_t length = 0);
 
   // Sends a message with data to PID.
-  void send(const UPID& to,
-            const std::string &name,
-            const char *data = NULL,
-            size_t length = 0);
-
-  // Blocks for message at most specified seconds (0 implies forever).
-  std::string receive(double secs = 0);
-
-  // Processes dispatch messages.
-  std::string serve(double secs = 0, bool once = false);
-
-  // Blocks at least specified seconds (may block longer).
-  void pause(double secs);
+  void send(
+      const UPID& to,
+      const std::string& name,
+      const char* data = NULL,
+      size_t length = 0);
 
-  // Links with the specified PID.
+  // Links with the specified PID. Linking with a process from within
+  // the same "operating system process" is guaranteed to give you
+  // perfect monitoring of that process. However, linking with a
+  // process on another machine might result in receiving lost
+  // callbacks due to the nature of a distributed environment.
   UPID link(const UPID& pid);
 
-  // IO events for polling.
-  enum { RDONLY = 01, WRONLY = 02, RDWR = 03 };
-
-  // Wait until operation is ready for file descriptor (or message
-  // received ignore is false).
-  bool poll(int fd, int op, double secs = 0, bool ignore = true);
-
-  // Returns true if operation on file descriptor is ready.
-  bool ready(int fd, int op);
+  // The default visit implementation for message events invokes
+  // installed message handlers, or delegates the message to another
+  // process (a delegate can be installed below but a message handler
+  // always takes precedence over delegating). A message handler is
+  // any function which takes two arguments, the "from" pid and the
+  // message body.
+  typedef std::tr1::function<void(const UPID&, const std::string&)>
+  MessageHandler;
 
-  // Returns sub-second elapsed time (according to this process).
-  double elapsedTime();
-
-  // Delegate incoming message's with the specified name to pid.
-  void delegate(const std::string& name, const UPID& pid)
-  {
-    delegates[name] = pid;
-  }
-
-  typedef std::tr1::function<void()> MessageHandler;
-
-  // Install a handler for a message.
-  void installMessageHandler(
+  // Setup a handler for a message.
+  void install(
       const std::string& name,
       const MessageHandler& handler)
   {
-    messageHandlers[name] = handler;
+    handlers.message[name] = handler;
   }
 
   template <typename T>
-  void installMessageHandler(
+  void install(
       const std::string& name,
-      void (T::*method)())
+      void (T::*method)(const UPID&, const std::string&))
+  {
+    // Note that we use dynamic_cast here so a process can use
+    // multiple inheritance if it sees fit (e.g., to implement
+    // multiple callback interfaces).
+    MessageHandler handler =
+      std::tr1::bind(method,
+                     dynamic_cast<T*>(this),
+                     std::tr1::placeholders::_1,
+                     std::tr1::placeholders::_2);
+    install(name, handler);
+  }
+
+  // Delegate incoming messages with the specified name to pid.
+  void delegate(const std::string& name, const UPID& pid)
   {
-    MessageHandler handler = std::tr1::bind(method, dynamic_cast<T*>(this));
-    installMessageHandler(name, handler);
+    delegates[name] = pid;
   }
 
-  typedef std::tr1::function<Promise<HttpResponse>(const HttpRequest&)>
-    HttpRequestHandler;
+  // The default visit implementation for HTTP events invokes
+  // installed HTTP handlers. A HTTP handler is any function which
+  // takes an HttpRequest object and returns a Future<HttpResponse>.
+  typedef std::tr1::function<Future<HttpResponse>(const HttpRequest&)>
+  HttpRequestHandler;
 
-  // Install a handler for an HTTP request.
-  void installHttpHandler(
+  // Setup a handler for an HTTP request.
+  void route(
       const std::string& name,
       const HttpRequestHandler& handler)
   {
-    httpHandlers[name] = handler;
+    handlers.http[name] = handler;
   }
 
   template <typename T>
-  void installHttpHandler(
+  void route(
       const std::string& name,
-      Promise<HttpResponse> (T::*method)(const HttpRequest&))
+      Future<HttpResponse> (T::*method)(const HttpRequest&))
   {
+    // Note that we use dynamic_cast here so a process can use
+    // multiple inheritance if it sees fit (e.g., to implement
+    // multiple callback interfaces).
     HttpRequestHandler handler =
       std::tr1::bind(method, dynamic_cast<T*>(this),
                      std::tr1::placeholders::_1);
-    installHttpHandler(name, handler);
+    route(name, handler);
   }
 
 private:
   friend class SocketManager;
   friend class ProcessManager;
   friend class ProcessReference;
-  friend void* schedule(void *);
+  friend void* schedule(void*);
 
   // Process states.
-  enum { INIT,
-	 READY,
+  enum { BOTTOM,
+         READY,
 	 RUNNING,
-	 RECEIVING,
-	 SERVING,
-	 PAUSED,
-	 POLLING,
-	 WAITING,
-	 INTERRUPTED,
-	 TIMEDOUT,
-         FINISHING,
+         BLOCKED,
 	 FINISHED } state;
 
-  // Lock/mutex protecting internals.
+  // Mutex protecting internals. TODO(benh): Replace with a spinlock.
   pthread_mutex_t m;
   void lock() { pthread_mutex_lock(&m); }
   void unlock() { pthread_mutex_unlock(&m); }
 
-  // Enqueue the specified message, request, or dispatcher.
-  void enqueue(Message* message, bool inject = false);
-  void enqueue(std::pair<HttpRequest*, Promise<HttpResponse>*>* request);
-  void enqueue(std::tr1::function<void(ProcessBase*)>* dispatcher);
-
-  // Dequeue a message, request, or dispatcher, or returns NULL.
-  template <typename T> T* dequeue();
+  // Enqueue the specified message, request, or function call.
+  void enqueue(Event* event, bool inject = false);
 
-  // Queue of received messages.
-  std::deque<Message*> messages;
-
-  // Queue of HTTP requests (with the promise used for responses).
-  std::deque<std::pair<HttpRequest*, Promise<HttpResponse>*>*> requests;
-
-  // Queue of dispatchers.
-  std::deque<std::tr1::function<void(ProcessBase*)>*> dispatchers;
+  // Queue of received events.
+  std::deque<Event*> events;
 
   // Delegates for messages.
   std::map<std::string, UPID> delegates;
 
-  // Handlers for messages.
-  std::map<std::string, MessageHandler> messageHandlers;
-
-  // Handlers for HTTP requests.
-  std::map<std::string, HttpRequestHandler> httpHandlers;
-
-  // Current message.
-  Message* current;
+  // Handlers for messages and HTTP requests.
+  struct {
+    std::map<std::string, MessageHandler> message;
+    std::map<std::string, HttpRequestHandler> http;
+  } handlers;
 
   // Active references.
   int refs;
 
-  // Current "blocking" generation.
-  int generation;
-
   // Process PID.
   UPID pid;
-
-  // Continuation/Context of process.
-  ucontext_t uctx;
 };
 
 
@@ -257,10 +208,12 @@ void initialize(bool initialize_google_l
  * @param process process to be spawned
  * @param manage boolean whether process should get garbage collected
  */
+UPID spawn(ProcessBase* process, bool manage = false);
+
 template <typename T>
 PID<T> spawn(T* t, bool manage = false)
 {
-  if (!ProcessBase::spawn(t, manage)) {
+  if (!spawn(static_cast<ProcessBase*>(t), manage)) {
     return PID<T>();
   }
 
@@ -301,23 +254,6 @@ bool wait(const ProcessBase* process, do
 
 
 /**
- * Invoke the thunk in a legacy safe way (i.e., outside of libprocess).
- *
- * @param thunk function to be invoked
- */
-void invoke(const std::tr1::function<void(void)>& thunk);
-
-
-/**
- * Use the specified filter on messages that get enqueued (note,
- * however, that you cannot filter timeout messages).
- *
- * @param filter message filter
- */
-void filter(Filter* filter);
-
-
-/**
  * Sends a message with data without a return address.
  *
  * @param to receiver

Modified: incubator/mesos/trunk/third_party/libprocess/include/process/protobuf.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/protobuf.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/protobuf.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/protobuf.hpp Fri Jan 27 01:25:13 2012
@@ -72,20 +72,15 @@ public:
   virtual ~ProtobufProcess() {}
 
 protected:
-  virtual void operator () ()
+  virtual void visit(const process::MessageEvent& event)
   {
-    // TODO(benh): Shouldn't we just make Process::serve be a virtual
-    // function, and then the one we get from process::Process will be
-    // sufficient?
-    do { if (serve() == process::TERMINATE) break; } while (true);
-  }
-
-  template <typename M>
-  M message()
-  {
-    M m;
-    m.ParseFromString(process::Process<T>::body());
-    return m;
+    if (protobufHandlers.count(event.message->name) > 0) {
+      from = event.message->from; // For 'reply'.
+      protobufHandlers[event.message->name](event.message->body);
+      from = process::UPID();
+    } else {
+      process::Process<T>::visit(event);
+    }
   }
 
   void send(const process::UPID& to,
@@ -99,20 +94,16 @@ protected:
 
   using process::Process<T>::send;
 
-  std::string serve(double secs = 0, bool once = false)
+  void reply(const google::protobuf::Message& message)
   {
-    do {
-      const std::string& name = process::Process<T>::serve(secs, once);
-      if (protobufHandlers.count(name) > 0) {
-        protobufHandlers[name](process::Process<T>::body());
-      } else {
-        return name;
-      }
-    } while (!once);
+    CHECK(from) << "Attempting to reply without a sender";
+    std::string data;
+    message.SerializeToString(&data);
+    send(from, message);
   }
 
   template <typename M>
-  void installProtobufHandler(void (T::*method)(const M&))
+  void install(void (T::*method)(const M&))
   {
     google::protobuf::Message* m = new M();
     T* t = static_cast<T*>(this);
@@ -124,7 +115,7 @@ protected:
   }
 
   template <typename M>
-  void installProtobufHandler(void (T::*method)())
+  void install(void (T::*method)())
   {
     google::protobuf::Message* m = new M();
     T* t = static_cast<T*>(this);
@@ -137,7 +128,7 @@ protected:
 
   template <typename M,
             typename P1, typename P1C>
-  void installProtobufHandler(void (T::*method)(P1C),
+  void install(void (T::*method)(P1C),
                               P1 (M::*param1)() const)
   {
     google::protobuf::Message* m = new M();
@@ -152,7 +143,7 @@ protected:
   template <typename M,
             typename P1, typename P1C,
             typename P2, typename P2C>
-  void installProtobufHandler(void (T::*method)(P1C, P2C),
+  void install(void (T::*method)(P1C, P2C),
                               P1 (M::*p1)() const,
                               P2 (M::*p2)() const)
   {
@@ -169,7 +160,7 @@ protected:
             typename P1, typename P1C,
             typename P2, typename P2C,
             typename P3, typename P3C>
-  void installProtobufHandler(void (T::*method)(P1C, P2C, P3C),
+  void install(void (T::*method)(P1C, P2C, P3C),
                               P1 (M::*p1)() const,
                               P2 (M::*p2)() const,
                               P3 (M::*p3)() const)
@@ -188,7 +179,7 @@ protected:
             typename P2, typename P2C,
             typename P3, typename P3C,
             typename P4, typename P4C>
-  void installProtobufHandler(void (T::*method)(P1C, P2C, P3C, P4C),
+  void install(void (T::*method)(P1C, P2C, P3C, P4C),
                               P1 (M::*p1)() const,
                               P2 (M::*p2)() const,
                               P3 (M::*p3)() const,
@@ -209,7 +200,7 @@ protected:
             typename P3, typename P3C,
             typename P4, typename P4C,
             typename P5, typename P5C>
-  void installProtobufHandler(void (T::*method)(P1C, P2C, P3C, P4C, P5C),
+  void install(void (T::*method)(P1C, P2C, P3C, P4C, P5C),
                               P1 (M::*p1)() const,
                               P2 (M::*p2)() const,
                               P3 (M::*p3)() const,
@@ -225,6 +216,10 @@ protected:
     delete m;
   }
 
+  using process::Process<T>::install;
+
+  process::UPID from; // Sender of "current" message, accessible by subclasses.
+
 private:
   template <typename M>
   static void handlerM(T* t, void (T::*method)(const M&),
@@ -374,18 +369,19 @@ public:
   ReqResProcess(const process::UPID& _pid, const Req& _req)
     : pid(_pid), req(_req)
   {
-    Super::template installProtobufHandler<Res>(
+    Super::template install<Res>(
         &ReqResProcess<Req, Res>::response);
   }
 
-  process::Promise<Res> run()
+  process::Future<Res> run()
   {
     send(pid, req);
     std::tr1::function<void(const process::Future<Res>&)> callback =
       std::tr1::bind(&ReqResProcess<Req, Res>::terminate,
-                     std::tr1::placeholders::_1, Super::self());
+                     std::tr1::placeholders::_1,
+                     Super::self());
     promise.future().onAny(callback);
-    return promise;
+    return promise.future();
   }
 
 private:

Modified: incubator/mesos/trunk/third_party/libprocess/include/process/run.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/run.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/run.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/run.hpp Fri Jan 27 01:25:13 2012
@@ -1,162 +1,78 @@
 #ifndef __PROCESS_RUN_HPP__
 #define __PROCESS_RUN_HPP__
 
-#include <process/process.hpp>
+#include <tr1/memory> // TODO(benh): Replace shared_ptr with unique_ptr.
 
+#include <process/process.hpp>
+#include <process/preprocessor.hpp>
 
 namespace process {
 
-template <typename R>
-Future<R> run(R (*method)());
-
-
-template <typename R, typename P1, typename A1>
-Future<R> run(R (*method)(P1), A1 a1);
-
-
-template <typename R,
-          typename P1, typename P2,
-          typename A1, typename A2>
-Future<R> run(R (*method)(P1, P2), A1 a1, A2 a2);
-
-
-template <typename R,
-          typename P1, typename P2, typename P3,
-          typename A1, typename A2, typename A3>
-Future<R> run(R (*method)(P1, P2, P3), A1 a1, A2 a2, A3 a3);
-
-
-template <typename R,
-          typename P1, typename P2, typename P3, typename P4,
-          typename A1, typename A2, typename A3, typename A4>
-Future<R> run(R (*method)(P1, P2, P3, P4), A1 a1, A2 a2, A3 a3, A4 a4);
-
-
-template <typename R,
-          typename P1, typename P2, typename P3, typename P4, typename P5,
-          typename A1, typename A2, typename A3, typename A4, typename A5>
-Future<R> run(R (*method)(P1, P2, P3, P4, P5),
-              A1 a1, A2 a2, A3 a3, A4 a4, A5 a5);
-
-
 namespace internal {
 
 template <typename R>
 class ThunkProcess : public Process<ThunkProcess<R> >
 {
 public:
-  ThunkProcess(const std::tr1::function<R(void)>& _thunk,
-               const Promise<R>& _promise)
+  ThunkProcess(std::tr1::shared_ptr<std::tr1::function<R(void)> > _thunk,
+               std::tr1::shared_ptr<Promise<R> > _promise)
     : thunk(_thunk),
       promise(_promise) {}
 
   virtual ~ThunkProcess() {}
 
 protected:
-  virtual void operator () ()
+  virtual void serve(const Event& event)
   {
-    promise.set(thunk());
+    promise->set((*thunk)());
   }
 
 private:
-  std::tr1::function<R(void)> thunk;
-  Promise<R> promise;
+  std::tr1::shared_ptr<std::tr1::function<R(void)> > thunk;
+  std::tr1::shared_ptr<Promise<R> > promise;
 };
 
 } // namespace internal {
 
 
 template <typename R>
-Future<R> run(R (*method)())
+Future<R> run(R (*method)(void))
 {
-  std::tr1::function<R(void)> thunk =
-    std::tr1::bind(method);
-
-  Promise<R> promise;
-
-  spawn(new internal::ThunkProcess<R>(thunk, promise), true);
-
-  return promise.future();
+  std::tr1::shared_ptr<std::tr1::function<R(void)> > thunk(
+      new std::tr1::function<R(void)>(
+          std::tr1::bind(method)));
+
+  std::tr1::shared_ptr<Promise<R> > promise(new Promise<R>());
+  Future<R> future = promise->future();
+
+  terminate(spawn(new internal::ThunkProcess<R>(thunk, promise), true));
+
+  return future;
 }
 
 
-template <typename R, typename P1, typename A1>
-Future<R> run(R (*method)(P1), A1 a1)
-{
-  std::tr1::function<R(void)> thunk =
-    std::tr1::bind(method, a1);
-
-  Promise<R> promise;
-
-  spawn(new internal::ThunkProcess<R>(thunk, promise), true);
-
-  return promise.future();
-}
-
-
-template <typename R,
-          typename P1, typename P2,
-          typename A1, typename A2>
-Future<R> run(R (*method)(P1, P2), A1 a1, A2 a2)
-{
-  std::tr1::function<R(void)> thunk =
-    std::tr1::bind(method, a1, a2);
-
-  Promise<R> promise;
-
-  spawn(new internal::ThunkProcess<R>(thunk, promise), true);
-
-  return promise.future();
-}
-
-
-template <typename R,
-          typename P1, typename P2, typename P3,
-          typename A1, typename A2, typename A3>
-Future<R> run(R (*method)(P1, P2, P3), A1 a1, A2 a2, A3 a3)
-{
-  std::tr1::function<R(void)> thunk =
-    std::tr1::bind(method, a1, a2, a3);
-
-  Promise<R> promise;
-
-  spawn(new internal::ThunkProcess<R>(thunk, promise), true);
-
-  return promise.future();
-}
-
-
-template <typename R,
-          typename P1, typename P2, typename P3, typename P4,
-          typename A1, typename A2, typename A3, typename A4>
-Future<R> run(R (*method)(P1, P2, P3, P4), A1 a1, A2 a2, A3 a3, A4 a4)
-{
-  std::tr1::function<R(void)> thunk =
-    std::tr1::bind(method, a1, a2, a3, a4);
-
-  Promise<R> promise;
-
-  spawn(new internal::ThunkProcess<R>(thunk, promise), true);
-
-  return promise.future();
-}
-
-
-template <typename R,
-          typename P1, typename P2, typename P3, typename P4, typename P5,
-          typename A1, typename A2, typename A3, typename A4, typename A5>
-Future<R> run(R (*method)(P1, P2, P3, P4, P5),
-              A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
-{
-  std::tr1::function<R(void)> thunk =
-    std::tr1::bind(method, a1, a2, a3, a4, a5);
-
-  Promise<R> promise;
-
-  spawn(new internal::ThunkProcess<R>(thunk, promise), true);
+#define TEMPLATE(Z, N, DATA)                                            \
+  template <typename R,                                                 \
+            ENUM_PARAMS(N, typename P),                                 \
+            ENUM_PARAMS(N, typename A)>                                 \
+  Future<R> run(                                                        \
+      R (*method)(ENUM_PARAMS(N, P)),                                   \
+      ENUM_BINARY_PARAMS(N, A, a))                                      \
+  {                                                                     \
+    std::tr1::shared_ptr<std::tr1::function<R(void)> > thunk(           \
+        new std::tr1::function<R(void)>(                                \
+            std::tr1::bind(method, ENUM_PARAMS(N, a))));                \
+                                                                        \
+    std::tr1::shared_ptr<Promise<R> > promise(new Promise<R>());        \
+    Future<R> future = promise->future();                               \
+                                                                        \
+    terminate(spawn(new internal::ThunkProcess<R>(thunk, promise), true)); \
+                                                                        \
+    return future;                                                      \
+  }
 
-  return promise.future();
-}
+  REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
 
 } // namespace process {
 

Modified: incubator/mesos/trunk/third_party/libprocess/include/process/timer.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/timer.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/timer.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/timer.hpp Fri Jan 27 01:25:13 2012
@@ -4,100 +4,131 @@
 #include <process/dispatch.hpp>
 #include <process/process.hpp>
 
-
 namespace process {
 
-class TimerProcess;
+// Timer support! Note that we don't store a pointer to the issuing
+// process (if there is one) because we can't dereference it because
+// it might no longer be valid. (Instead we can use the PID to check
+// if the issuing process is still valid and get a reference to it).
+
+struct timer
+{
+  long id;
+  double timeout;
+  process::UPID pid;
+  std::tr1::function<void(void)> thunk;
+};
+
 
-class Timer
+inline bool operator == (const timer& left, const timer& right)
 {
-public:
-  Timer(double secs, const UPID& pid, internal::Dispatcher* dispatcher);
+  return left.id == right.id;
+}
 
-  virtual ~Timer();
 
-  void cancel();
+namespace timers {
 
-private:
-  PID<TimerProcess> timer;
-};
+timer create(double secs, const std::tr1::function<void(void)>& thunk);
+void cancel(const timer& timer);
+
+} // namespace timers {
 
 
 // Delay a dispatch to a process. Returns a timer which can attempted
 // to be canceled if desired (but might be firing concurrently).
 
 template <typename T>
-Timer delay(double secs,
+timer delay(double secs,
             const PID<T>& pid,
             void (T::*method)())
 {
-  std::tr1::function<void(T*)> thunk =
-    std::tr1::bind(method, std::tr1::placeholders::_1);
+  std::tr1::shared_ptr<std::tr1::function<void(T*)> > thunk(
+      new std::tr1::function<void(T*)>(
+          std::tr1::bind(method, std::tr1::placeholders::_1)));
+
+  std::tr1::function<void(ProcessBase*)>* dispatcher =
+    new std::tr1::function<void(ProcessBase*)>(
+        std::tr1::bind(&internal::vdispatcher<T>,
+                       std::tr1::placeholders::_1,
+                       thunk));
 
-  internal::Dispatcher* dispatcher = new internal::Dispatcher(
-      std::tr1::bind(&internal::vdispatcher<T>,
-                     std::tr1::placeholders::_1,
-                     thunk));
+  std::tr1::function<void(void)> dispatch =
+    std::tr1::bind(internal::dispatch, pid, dispatcher);
 
-  return Timer(secs, pid, dispatcher);
+  return timers::create(secs, dispatch);
 }
 
 
 template <typename T, typename P1, typename A1>
-Timer delay(double secs,
+timer delay(double secs,
             const PID<T>& pid,
             void (T::*method)(P1),
             A1 a1)
 {
-  std::tr1::function<void(T*)> thunk =
-    std::tr1::bind(method, std::tr1::placeholders::_1, a1);
+  std::tr1::shared_ptr<std::tr1::function<void(T*)> > thunk(
+      new std::tr1::function<void(T*)>(
+          std::tr1::bind(method, std::tr1::placeholders::_1, a1)));
+
+  std::tr1::function<void(ProcessBase*)>* dispatcher =
+    new std::tr1::function<void(ProcessBase*)>(
+        std::tr1::bind(&internal::vdispatcher<T>,
+                       std::tr1::placeholders::_1,
+                       thunk));
 
-  internal::Dispatcher* dispatcher = new internal::Dispatcher(
-      std::tr1::bind(&internal::vdispatcher<T>,
-                     std::tr1::placeholders::_1,
-                     thunk));
+  std::tr1::function<void(void)> dispatch =
+    std::tr1::bind(internal::dispatch, pid, dispatcher);
 
-  return Timer(secs, pid, dispatcher);
+  return timers::create(secs, dispatch);
 }
 
 
 template <typename T,
           typename P1, typename P2,
           typename A1, typename A2>
-Timer delay(double secs,
+timer delay(double secs,
             const PID<T>& pid,
             void (T::*method)(P1, P2),
             A1 a1, A2 a2)
 {
-  std::tr1::function<void(T*)> thunk =
-    std::tr1::bind(method, std::tr1::placeholders::_1, a1, a2);
+  std::tr1::shared_ptr<std::tr1::function<void(T*)> > thunk(
+      new std::tr1::function<void(T*)>(
+          std::tr1::bind(method, std::tr1::placeholders::_1, a1, a2)));
+
+  std::tr1::function<void(ProcessBase*)>* dispatcher =
+    new std::tr1::function<void(ProcessBase*)>(
+        std::tr1::bind(&internal::vdispatcher<T>,
+                       std::tr1::placeholders::_1,
+                       thunk));
 
-  internal::Dispatcher* dispatcher = new internal::Dispatcher(
-      std::tr1::bind(&internal::vdispatcher<T>,
-                     std::tr1::placeholders::_1,
-                     thunk));
+  std::tr1::function<void(void)> dispatch =
+    std::tr1::bind(internal::dispatch, pid, dispatcher);
 
-  return Timer(secs, pid, dispatcher);
+  return timers::create(secs, dispatch);
 }
 
 
 template <typename T,
           typename P1, typename P2, typename P3,
           typename A1, typename A2, typename A3>
-Timer delay(double secs,
+timer delay(double secs,
             const PID<T>& pid,
             void (T::*method)(P1, P2, P3),
             A1 a1, A2 a2, A3 a3)
 {
-  std::tr1::function<void(T*)> thunk =
-    std::tr1::bind(method, std::tr1::placeholders::_1, a1, a2, a3);
+  std::tr1::shared_ptr<std::tr1::function<void(T*)> > thunk(
+      new std::tr1::function<void(T*)>(
+          std::tr1::bind(method, std::tr1::placeholders::_1, a1, a2, a3)));
+
+  std::tr1::function<void(ProcessBase*)>* dispatcher =
+    new std::tr1::function<void(ProcessBase*)>(
+        std::tr1::bind(&internal::vdispatcher<T>,
+                       std::tr1::placeholders::_1,
+                       thunk));
 
-  internal::Dispatcher* dispatcher = new internal::Dispatcher(
-      std::tr1::bind(&internal::vdispatcher<T>,
-                     std::tr1::placeholders::_1,
-                     thunk));
+  std::tr1::function<void(void)> dispatch =
+    std::tr1::bind(internal::dispatch, pid, dispatcher);
 
-  return Timer(secs, pid, dispatcher);
+  return timers::create(secs, dispatch);
 }
 
 } // namespace process {

Modified: incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp Fri Jan 27 01:25:13 2012
@@ -5,6 +5,8 @@
 
 #include <process/process.hpp>
 
+#include "foreach.hpp"
+
 
 namespace process {
 
@@ -132,7 +134,6 @@ public:
   }
 };
 
-
 }  // namespace process {
 
 #endif // __ENCODER_HPP__

Modified: incubator/mesos/trunk/third_party/libprocess/src/foreach.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/foreach.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/foreach.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/foreach.hpp Fri Jan 27 01:25:13 2012
@@ -1,8 +1,11 @@
-#ifndef FOREACH_HPP
-#define FOREACH_HPP
+#ifndef __FOREACH_HPP__
+#define __FOREACH_HPP__
 
 #include <boost/foreach.hpp>
 
+#include <boost/tuple/tuple.hpp>
+
+
 #define BOOST_FOREACH_PAIR(VARFIRST, VARSECOND, COL)                                            \
     BOOST_FOREACH_PREAMBLE()                                                                    \
     if (boost::foreach_detail_::auto_any_t _foreach_col = BOOST_FOREACH_CONTAIN(COL)) {} else   \
@@ -23,8 +26,10 @@
 #define foreach BOOST_FOREACH
 #define foreachpair BOOST_FOREACH_PAIR
 
-#include <boost/tuple/tuple.hpp>
+#define foreachkey(VAR, COL)                    \
+  foreachpair (VAR, boost::tuples::ignore, COL)
 
-const boost::tuples::detail::swallow_assign _ = boost::tuples::ignore;
+#define foreachvalue(VAR, COL)                  \
+  foreachpair (boost::tuples::ignore, VAR, COL)
 
-#endif /* FOREACH_HPP */
+#endif // __FOREACH_HPP__

Modified: incubator/mesos/trunk/third_party/libprocess/src/latch.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/latch.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/latch.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/latch.cpp Fri Jan 27 01:25:13 2012
@@ -9,13 +9,6 @@ namespace process {
 // within libprocess such that it doesn't cost a memory allocation, a
 // spawn, a message send, a wait, and two user-space context-switchs.
 
-class LatchProcess : public Process<LatchProcess>
-{
-protected:
-  virtual void operator () () { receive(); }
-};
-
-
 Latch::Latch()
 {
   triggered = false;
@@ -25,20 +18,20 @@ Latch::Latch()
   // deleting thread is holding. Hence, we only save the PID for
   // triggering the latch and let the GC actually do the deleting
   // (thus no waiting is necessary, and deadlocks are avoided).
-  latch = spawn(new LatchProcess(), true);
+  pid = spawn(new ProcessBase(), true);
 }
 
 
 Latch::~Latch()
 {
-  post(latch, TERMINATE);
+  terminate(pid);
 }
 
 
 void Latch::trigger()
 {
   if (!triggered) {
-    post(latch, TERMINATE);
+    terminate(pid);
     triggered = true;
   }
 }
@@ -47,7 +40,7 @@ void Latch::trigger()
 bool Latch::await(double secs)
 {
   if (!triggered) {
-    return wait(latch, secs);
+    return wait(pid, secs);
   }
 
   return true;