You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/05/29 19:41:05 UTC
[27/35] Renamed 'third_party' to '3rdparty'.
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/delay.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/delay.hpp b/3rdparty/libprocess/include/process/delay.hpp
new file mode 100644
index 0000000..97acd76
--- /dev/null
+++ b/3rdparty/libprocess/include/process/delay.hpp
@@ -0,0 +1,119 @@
+#ifndef __PROCESS_DELAY_HPP__
+#define __PROCESS_DELAY_HPP__
+
+#include <tr1/functional>
+
+#include <process/dispatch.hpp>
+#include <process/timer.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/preprocessor.hpp>
+
+namespace process {
+
+// The 'delay' mechanism enables you to delay a dispatch to a process
+// for some specified number of seconds. Returns a Timer instance that
+// can be cancelled (but it might have already executed or be
+// executing concurrently).
+
+template <typename T>
+Timer delay(const Duration& duration,
+ const PID<T>& pid,
+ void (T::*method)())
+{
+ std::tr1::shared_ptr<std::tr1::function<void(T*)> > thunk(
+ new std::tr1::function<void(T*)>(
+ std::tr1::bind(method, std::tr1::placeholders::_1)));
+
+ std::tr1::shared_ptr<std::tr1::function<void(ProcessBase*)> > dispatcher(
+ new std::tr1::function<void(ProcessBase*)>(
+ std::tr1::bind(&internal::vdispatcher<T>,
+ std::tr1::placeholders::_1,
+ thunk)));
+
+ std::tr1::function<void(void)> dispatch =
+ std::tr1::bind(internal::dispatch,
+ pid,
+ dispatcher,
+ internal::canonicalize(method));
+
+ return Timer::create(duration, dispatch);
+}
+
+
+template <typename T>
+Timer delay(const Duration& duration,
+ const Process<T>& process,
+ void (T::*method)())
+{
+ return delay(duration, process.self(), method);
+}
+
+
+template <typename T>
+Timer delay(const Duration& duration,
+ const Process<T>* process,
+ void (T::*method)())
+{
+ return delay(duration, process->self(), method);
+}
+
+
+#define TEMPLATE(Z, N, DATA) \
+ template <typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ Timer delay(const Duration& duration, \
+ const PID<T>& pid, \
+ void (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ std::tr1::shared_ptr<std::tr1::function<void(T*)> > thunk( \
+ new std::tr1::function<void(T*)>( \
+ std::tr1::bind(method, \
+ std::tr1::placeholders::_1, \
+ ENUM_PARAMS(N, a)))); \
+ \
+ std::tr1::shared_ptr<std::tr1::function<void(ProcessBase*)> > dispatcher( \
+ new std::tr1::function<void(ProcessBase*)>( \
+ std::tr1::bind(&internal::vdispatcher<T>, \
+ std::tr1::placeholders::_1, \
+ thunk))); \
+ \
+ std::tr1::function<void(void)> dispatch = \
+ std::tr1::bind(internal::dispatch, \
+ pid, \
+ dispatcher, \
+ internal::canonicalize(method)); \
+ \
+ return Timer::create(duration, dispatch); \
+ } \
+ \
+ template <typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ Timer delay(const Duration& duration, \
+ const Process<T>& process, \
+ void (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ return delay(duration, process.self(), method, ENUM_PARAMS(N, a)); \
+ } \
+ \
+ template <typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ Timer delay(const Duration& duration, \
+ const Process<T>* process, \
+ void (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ return delay(duration, process->self(), method, ENUM_PARAMS(N, a)); \
+ }
+
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+} // namespace process {
+
+#endif // __PROCESS_DELAY_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/dispatch.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/dispatch.hpp b/3rdparty/libprocess/include/process/dispatch.hpp
new file mode 100644
index 0000000..b337a87
--- /dev/null
+++ b/3rdparty/libprocess/include/process/dispatch.hpp
@@ -0,0 +1,478 @@
+#ifndef __PROCESS_DISPATCH_HPP__
+#define __PROCESS_DISPATCH_HPP__
+
+#include <string>
+
+#include <tr1/functional>
+#include <tr1/memory> // TODO(benh): Replace all shared_ptr with unique_ptr.
+
+#include <process/process.hpp>
+
+#include <stout/preprocessor.hpp>
+
+namespace process {
+
+// The dispatch mechanism enables you to "schedule" a method to get
+// invoked on a process. The result of that method invocation is
+// accessible via the future that is returned by the dispatch method
+// (note, however, that it might not be the _same_ future as the one
+// returned from the method, if the method even returns a future, see
+// below). Assuming some class 'Fibonacci' has a (visible) method
+// named 'compute' that takes an integer, N (and returns the Nth
+// fibonacci number) you might use dispatch like so:
+//
+// PID<Fibonacci> pid = spawn(new Fibonacci(), true); // Use the GC.
+// Future<int> f = dispatch(pid, &Fibonacci::compute, 10);
+//
+// Because the pid argument is "typed" we can ensure that methods are
+// only invoked on processes that are actually of that type. Providing
+// this mechanism for varying numbers of function types and arguments
+// requires support for variadic templates, slated to be released in
+// C++11. Until then, we use the Boost preprocessor macros to
+// accomplish the same thing (all be it less cleanly). See below for
+// those definitions.
+//
+// Dispatching is done via a level of indirection. The dispatch
+// routine itself creates a promise that is passed as an argument to a
+// partially applied 'dispatcher' function (defined below). The
+// dispatcher routines get passed to the actual process via an
+// internal routine called, not suprisingly, 'dispatch', defined
+// below:
+
+namespace internal {
+
+// The internal dispatch routine schedules a function to get invoked
+// within the context of the process associated with the specified pid
+// (first argument), unless that process is no longer valid. Note that
+// this routine does not expect anything in particular about the
+// specified function (second argument). The semantics are simple: the
+// function gets applied/invoked with the process as its first
+// argument. Currently we wrap the function in a shared_ptr but this
+// will probably change in the future to unique_ptr (or a variant).
+void dispatch(
+ const UPID& pid,
+ const std::tr1::shared_ptr<std::tr1::function<void(ProcessBase*)> >& f,
+ const std::string& method = std::string());
+
+// For each return type (void, future, value) there is a dispatcher
+// function which should complete the picture. Given the process
+// argument these routines downcast the process to the correct subtype
+// and invoke the thunk using the subtype as the argument
+// (receiver). Note that we must use dynamic_cast because we permit a
+// process to use multiple inheritance (e.g., to expose multiple
+// callback interfaces).
+
+template <typename T>
+void vdispatcher(
+ ProcessBase* process,
+ std::tr1::shared_ptr<std::tr1::function<void(T*)> > thunk)
+{
+ assert(process != NULL);
+ T* t = dynamic_cast<T*>(process);
+ assert(t != NULL);
+ (*thunk)(t);
+}
+
+
+template <typename R, typename T>
+void pdispatcher(
+ ProcessBase* process,
+ std::tr1::shared_ptr<std::tr1::function<Future<R>(T*)> > thunk,
+ std::tr1::shared_ptr<Promise<R> > promise)
+{
+ assert(process != NULL);
+ T* t = dynamic_cast<T*>(process);
+ assert(t != NULL);
+ promise->associate((*thunk)(t));
+}
+
+
+template <typename R, typename T>
+void rdispatcher(
+ ProcessBase* process,
+ std::tr1::shared_ptr<std::tr1::function<R(T*)> > thunk,
+ std::tr1::shared_ptr<Promise<R> > promise)
+{
+ assert(process != NULL);
+ T* t = dynamic_cast<T*>(process);
+ assert(t != NULL);
+ promise->set((*thunk)(t));
+}
+
+
+// Canonicalizes a pointer to a member function (i.e., method) into a
+// bytes representation for comparison (e.g., in tests).
+template <typename Method>
+std::string canonicalize(Method method)
+{
+ return std::string(reinterpret_cast<const char*>(&method), sizeof(method));
+}
+
+} // namespace internal {
+
+
+// Okay, now for the definition of the dispatch routines
+// themselves. For each routine we provide the version in C++11 using
+// variadic templates so the reader can see what the Boost
+// preprocessor macros are effectively providing. Using C++11 closures
+// would shorten these definitions even more.
+//
+// First, definitions of dispatch for methods returning void:
+//
+// template <typename T, typename ...P>
+// void dispatch(
+// const PID<T>& pid,
+// void (T::*method)(P...),
+// P... p)
+// {
+// std::tr1::shared_ptr<std::tr1::function<void(T*)> > thunk(
+// new std::tr1::function<void(T*)>(
+// std::tr1::bind(method,
+// std::tr1::placeholders::_1,
+// std::forward<P>(p)...)));
+//
+// std::tr1::shared_ptr<std::tr1::function<void(ProcessBase*)> > dispatcher(
+// new std::tr1::function<void(ProcessBase*)>(
+// std::tr1::bind(&internal::vdispatcher<T>,
+// std::tr1::placeholders::_1,
+// thunk)));
+//
+// internal::dispatch(pid, dispatcher, internal::canonicalize(method));
+// }
+
+template <typename T>
+void dispatch(
+ const PID<T>& pid,
+ void (T::*method)(void))
+{
+ std::tr1::shared_ptr<std::tr1::function<void(T*)> > thunk(
+ new std::tr1::function<void(T*)>(
+ std::tr1::bind(method, std::tr1::placeholders::_1)));
+
+ std::tr1::shared_ptr<std::tr1::function<void(ProcessBase*)> > dispatcher(
+ new std::tr1::function<void(ProcessBase*)>(
+ std::tr1::bind(&internal::vdispatcher<T>,
+ std::tr1::placeholders::_1,
+ thunk)));
+
+ internal::dispatch(pid, dispatcher, internal::canonicalize(method));
+}
+
+template <typename T>
+void dispatch(
+ const Process<T>& process,
+ void (T::*method)(void))
+{
+ dispatch(process.self(), method);
+}
+
+template <typename T>
+void dispatch(
+ const Process<T>* process,
+ void (T::*method)(void))
+{
+ dispatch(process->self(), method);
+}
+
+#define TEMPLATE(Z, N, DATA) \
+ template <typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ void dispatch( \
+ const PID<T>& pid, \
+ void (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ std::tr1::shared_ptr<std::tr1::function<void(T*)> > thunk( \
+ new std::tr1::function<void(T*)>( \
+ std::tr1::bind(method, \
+ std::tr1::placeholders::_1, \
+ ENUM_PARAMS(N, a)))); \
+ \
+ std::tr1::shared_ptr<std::tr1::function<void(ProcessBase*)> > dispatcher( \
+ new std::tr1::function<void(ProcessBase*)>( \
+ std::tr1::bind(&internal::vdispatcher<T>, \
+ std::tr1::placeholders::_1, \
+ thunk))); \
+ \
+ internal::dispatch(pid, dispatcher, internal::canonicalize(method)); \
+ } \
+ \
+ template <typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ void dispatch( \
+ const Process<T>& process, \
+ void (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ dispatch(process.self(), method, ENUM_PARAMS(N, a)); \
+ } \
+ \
+ template <typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ void dispatch( \
+ const Process<T>* process, \
+ void (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ dispatch(process->self(), method, ENUM_PARAMS(N, a)); \
+ }
+
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+
+// Next, definitions of methods returning a future:
+//
+// template <typename R, typename T, typename ...P>
+// Future<R> dispatch(
+// const PID<T>& pid,
+// Future<R> (T::*method)(P...),
+// P... p)
+// {
+// std::tr1::shared_ptr<std::tr1::function<Future<R>(T*)> > thunk(
+// new std::tr1::function<Future<R>(T*)>(
+// std::tr1::bind(method,
+// std::tr1::placeholders::_1,
+// std::forward<P>(p)...)));
+//
+// std::tr1::shared_ptr<Promise<R> > promise(new Promise<R>());
+// Future<R> future = promise->future();
+//
+// std::tr1::shared_ptr<std::tr1::function<void(ProcessBase*)> > dispatcher(
+// new std::tr1::function<void(ProcessBase*)>(
+// std::tr1::bind(&internal::pdispatcher<R, T>,
+// std::tr1::placeholders::_1,
+// thunk, promise)));
+//
+// internal::dispatch(pid, dispatcher, internal::canonicalize(method));
+//
+// return future;
+// }
+
+template <typename R, typename T>
+Future<R> dispatch(
+ const PID<T>& pid,
+ Future<R> (T::*method)(void))
+{
+ std::tr1::shared_ptr<std::tr1::function<Future<R>(T*)> > thunk(
+ new std::tr1::function<Future<R>(T*)>(
+ std::tr1::bind(method, std::tr1::placeholders::_1)));
+
+ std::tr1::shared_ptr<Promise<R> > promise(new Promise<R>());
+ Future<R> future = promise->future();
+
+ std::tr1::shared_ptr<std::tr1::function<void(ProcessBase*)> > dispatcher(
+ new std::tr1::function<void(ProcessBase*)>(
+ std::tr1::bind(&internal::pdispatcher<R, T>,
+ std::tr1::placeholders::_1,
+ thunk, promise)));
+
+ internal::dispatch(pid, dispatcher, internal::canonicalize(method));
+
+ return future;
+}
+
+template <typename R, typename T>
+Future<R> dispatch(
+ const Process<T>& process,
+ Future<R> (T::*method)(void))
+{
+ return dispatch(process.self(), method);
+}
+
+template <typename R, typename T>
+Future<R> dispatch(
+ const Process<T>* process,
+ Future<R> (T::*method)(void))
+{
+ return dispatch(process->self(), method);
+}
+
+#define TEMPLATE(Z, N, DATA) \
+ template <typename R, \
+ typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ Future<R> dispatch( \
+ const PID<T>& pid, \
+ Future<R> (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ std::tr1::shared_ptr<std::tr1::function<Future<R>(T*)> > thunk( \
+ new std::tr1::function<Future<R>(T*)>( \
+ std::tr1::bind(method, \
+ std::tr1::placeholders::_1, \
+ ENUM_PARAMS(N, a)))); \
+ \
+ std::tr1::shared_ptr<Promise<R> > promise(new Promise<R>()); \
+ Future<R> future = promise->future(); \
+ \
+ std::tr1::shared_ptr<std::tr1::function<void(ProcessBase*)> > dispatcher( \
+ new std::tr1::function<void(ProcessBase*)>( \
+ std::tr1::bind(&internal::pdispatcher<R, T>, \
+ std::tr1::placeholders::_1, \
+ thunk, promise))); \
+ \
+ internal::dispatch(pid, dispatcher, internal::canonicalize(method)); \
+ \
+ return future; \
+ } \
+ \
+ template <typename R, \
+ typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ Future<R> dispatch( \
+ const Process<T>& process, \
+ Future<R> (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ return dispatch(process.self(), method, ENUM_PARAMS(N, a)); \
+ } \
+ \
+ template <typename R, \
+ typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ Future<R> dispatch( \
+ const Process<T>* process, \
+ Future<R> (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ return dispatch(process->self(), method, ENUM_PARAMS(N, a)); \
+ }
+
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+
+// Next, definitions of methods returning a value.
+//
+// template <typename R, typename T, typename ...P>
+// Future<R> dispatch(
+// const PID<T>& pid,
+// R (T::*method)(P...),
+// P... p)
+// {
+// std::tr1::shared_ptr<std::tr1::function<R(T*)> > thunk(
+// new std::tr1::function<R(T*)>(
+// std::tr1::bind(method,
+// std::tr1::placeholders::_1,
+// std::forward<P>(p)...)));
+//
+// std::tr1::shared_ptr<Promise<R> > promise(new Promise<R>());
+// Future<R> future = promise->future();
+//
+// std::tr1::shared_ptr<std::tr1::function<void(ProcessBase*)> > dispatcher(
+// new std::tr1::function<void(ProcessBase*)>(
+// std::tr1::bind(&internal::rdispatcher<R, T>,
+// std::tr1::placeholders::_1,
+// thunk, promise)));
+//
+// internal::dispatch(pid, dispatcher, internal::canonicalize(method));
+//
+// return future;
+// }
+
+template <typename R, typename T>
+Future<R> dispatch(
+ const PID<T>& pid,
+ R (T::*method)(void))
+{
+ std::tr1::shared_ptr<std::tr1::function<R(T*)> > thunk(
+ new std::tr1::function<R(T*)>(
+ std::tr1::bind(method, std::tr1::placeholders::_1)));
+
+ std::tr1::shared_ptr<Promise<R> > promise(new Promise<R>());
+ Future<R> future = promise->future();
+
+ std::tr1::shared_ptr<std::tr1::function<void(ProcessBase*)> > dispatcher(
+ new std::tr1::function<void(ProcessBase*)>(
+ std::tr1::bind(&internal::rdispatcher<R, T>,
+ std::tr1::placeholders::_1,
+ thunk, promise)));
+
+ internal::dispatch(pid, dispatcher, internal::canonicalize(method));
+
+ return future;
+}
+
+template <typename R, typename T>
+Future<R> dispatch(
+ const Process<T>& process,
+ R (T::*method)(void))
+{
+ return dispatch(process.self(), method);
+}
+
+template <typename R, typename T>
+Future<R> dispatch(
+ const Process<T>* process,
+ R (T::*method)(void))
+{
+ return dispatch(process->self(), method);
+}
+
+#define TEMPLATE(Z, N, DATA) \
+ template <typename R, \
+ typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ Future<R> dispatch( \
+ const PID<T>& pid, \
+ R (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ std::tr1::shared_ptr<std::tr1::function<R(T*)> > thunk( \
+ new std::tr1::function<R(T*)>( \
+ std::tr1::bind(method, \
+ std::tr1::placeholders::_1, \
+ ENUM_PARAMS(N, a)))); \
+ \
+ std::tr1::shared_ptr<Promise<R> > promise(new Promise<R>()); \
+ Future<R> future = promise->future(); \
+ \
+ std::tr1::shared_ptr<std::tr1::function<void(ProcessBase*)> > dispatcher( \
+ new std::tr1::function<void(ProcessBase*)>( \
+ std::tr1::bind(&internal::rdispatcher<R, T>, \
+ std::tr1::placeholders::_1, \
+ thunk, promise))); \
+ \
+ internal::dispatch(pid, dispatcher, internal::canonicalize(method)); \
+ \
+ return future; \
+ } \
+ \
+ template <typename R, \
+ typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ Future<R> dispatch( \
+ const Process<T>& process, \
+ R (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ return dispatch(process.self(), method, ENUM_PARAMS(N, a)); \
+ } \
+ \
+ template <typename R, \
+ typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ Future<R> dispatch( \
+ const Process<T>* process, \
+ R (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ return dispatch(process->self(), method, ENUM_PARAMS(N, a)); \
+ }
+
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+} // namespace process {
+
+#endif // __PROCESS_DISPATCH_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/event.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/event.hpp b/3rdparty/libprocess/include/process/event.hpp
new file mode 100644
index 0000000..84a8790
--- /dev/null
+++ b/3rdparty/libprocess/include/process/event.hpp
@@ -0,0 +1,199 @@
+#ifndef __PROCESS_EVENT_HPP__
+#define __PROCESS_EVENT_HPP__
+
+#include <tr1/functional>
+#include <tr1/memory> // TODO(benh): Replace all shared_ptr with unique_ptr.
+
+#include <process/future.hpp>
+#include <process/http.hpp>
+#include <process/message.hpp>
+#include <process/socket.hpp>
+
+namespace process {
+
+// Forward declarations.
+struct ProcessBase;
+struct MessageEvent;
+struct DispatchEvent;
+struct HttpEvent;
+struct ExitedEvent;
+struct TerminateEvent;
+
+
+struct EventVisitor
+{
+ virtual ~EventVisitor() {}
+ virtual void visit(const MessageEvent& event) {}
+ virtual void visit(const DispatchEvent& event) {}
+ virtual void visit(const HttpEvent& event) {}
+ virtual void visit(const ExitedEvent& event) {}
+ virtual void visit(const TerminateEvent& event) {}
+};
+
+
+struct Event
+{
+ virtual ~Event() {}
+
+ virtual void visit(EventVisitor* visitor) const = 0;
+
+ template <typename T>
+ bool is() const
+ {
+ bool result = false;
+ struct IsVisitor : EventVisitor
+ {
+ IsVisitor(bool* _result) : result(_result) {}
+ virtual void visit(const T& t) { *result = true; }
+ bool* result;
+ } visitor(&result);
+ visit(&visitor);
+ return result;
+ }
+
+ template <typename T>
+ const T& as() const
+ {
+ const T* result = NULL;
+ struct AsVisitor : EventVisitor
+ {
+ AsVisitor(const T** _result) : result(_result) {}
+ virtual void visit(const T& t) { *result = &t; }
+ const T** result;
+ } visitor(&result);
+ visit(&visitor);
+ if (result == NULL) {
+ std::cerr << "Attempting to \"cast\" event incorrectly!" << std::endl;
+ abort();
+ }
+ return *result;
+ }
+};
+
+
+struct MessageEvent : Event
+{
+ MessageEvent(Message* _message)
+ : message(_message) {}
+
+ virtual ~MessageEvent()
+ {
+ delete message;
+ }
+
+ virtual void visit(EventVisitor* visitor) const
+ {
+ visitor->visit(*this);
+ }
+
+ Message* const message;
+
+private:
+ // Not copyable, not assignable.
+ MessageEvent(const MessageEvent&);
+ MessageEvent& operator = (const MessageEvent&);
+};
+
+
+struct HttpEvent : Event
+{
+ HttpEvent(const Socket& _socket, http::Request* _request)
+ : socket(_socket), request(_request) {}
+
+ virtual ~HttpEvent()
+ {
+ delete request;
+ }
+
+ virtual void visit(EventVisitor* visitor) const
+ {
+ visitor->visit(*this);
+ }
+
+ const Socket socket;
+ http::Request* const request;
+
+private:
+ // Not copyable, not assignable.
+ HttpEvent(const HttpEvent&);
+ HttpEvent& operator = (const HttpEvent&);
+};
+
+
+struct DispatchEvent : Event
+{
+ DispatchEvent(
+ const UPID& _pid,
+ const std::tr1::shared_ptr<std::tr1::function<void(ProcessBase*)> >& _f,
+ const std::string& _method)
+ : pid(_pid),
+ f(_f),
+ method(_method)
+ {}
+
+ virtual void visit(EventVisitor* visitor) const
+ {
+ visitor->visit(*this);
+ }
+
+ // PID receiving the dispatch.
+ const UPID pid;
+
+ // Function to get invoked as a result of this dispatch event.
+ const std::tr1::shared_ptr<std::tr1::function<void(ProcessBase*)> > f;
+
+ // Canonical "byte" representation of a pointer to a member function
+ // (i.e., method) encapsulated in the above function (or empty if
+ // not applicable). Note that we use a byte representation because a
+ // pointer to a member function is not actually a pointer, but
+ // instead a POD.
+ // TODO(benh): Perform canonicalization lazily.
+ const std::string method;
+
+private:
+ // Not copyable, not assignable.
+ DispatchEvent(const DispatchEvent&);
+ DispatchEvent& operator = (const DispatchEvent&);
+};
+
+
+struct ExitedEvent : Event
+{
+ ExitedEvent(const UPID& _pid)
+ : pid(_pid) {}
+
+ virtual void visit(EventVisitor* visitor) const
+ {
+ visitor->visit(*this);
+ }
+
+ const UPID pid;
+
+private:
+ // Not copyable, not assignable.
+ ExitedEvent(const ExitedEvent&);
+ ExitedEvent& operator = (const ExitedEvent&);
+};
+
+
+struct TerminateEvent : Event
+{
+ TerminateEvent(const UPID& _from)
+ : from(_from) {}
+
+ virtual void visit(EventVisitor* visitor) const
+ {
+ visitor->visit(*this);
+ }
+
+ const UPID from;
+
+private:
+ // Not copyable, not assignable.
+ TerminateEvent(const TerminateEvent&);
+ TerminateEvent& operator = (const TerminateEvent&);
+};
+
+} // namespace event {
+
+#endif // __PROCESS_EVENT_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/executor.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/executor.hpp b/3rdparty/libprocess/include/process/executor.hpp
new file mode 100644
index 0000000..72fb2f1
--- /dev/null
+++ b/3rdparty/libprocess/include/process/executor.hpp
@@ -0,0 +1,260 @@
+#ifndef __PROCESS_EXECUTOR_HPP__
+#define __PROCESS_EXECUTOR_HPP__
+
+#include <process/deferred.hpp>
+#include <process/dispatch.hpp>
+#include <process/id.hpp>
+#include <process/thread.hpp>
+
+#include <stout/preprocessor.hpp>
+
+namespace process {
+
+// Underlying "process" which handles invoking actual callbacks
+// created through an Executor.
+class ExecutorProcess : public Process<ExecutorProcess>
+{
+private:
+ friend class Executor;
+
+ ExecutorProcess() : ProcessBase(ID::generate("__executor__")) {}
+ virtual ~ExecutorProcess() {}
+
+ // Not copyable, not assignable.
+ ExecutorProcess(const ExecutorProcess&);
+ ExecutorProcess& operator = (const ExecutorProcess&);
+
+ // No arg invoke.
+ void invoke(const std::tr1::function<void(void)>& f) { f(); }
+
+ // Args invoke.
+#define TEMPLATE(Z, N, DATA) \
+ template <ENUM_PARAMS(N, typename A)> \
+ void CAT(invoke, N)( \
+ const std::tr1::function<void(ENUM_PARAMS(N, A))>& f, \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ f(ENUM_PARAMS(N, a)); \
+ }
+
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+};
+
+
+// Provides an abstraction that can take a standard function object
+// and convert it to a 'Deferred'. Each converted function object will
+// get invoked serially with respect to one another.
+class Executor
+{
+public:
+ Executor()
+ {
+ spawn(process);
+ }
+
+ ~Executor()
+ {
+ terminate(process);
+ wait(process);
+ }
+
+ void stop()
+ {
+ terminate(process);
+
+ // TODO(benh): Note that this doesn't wait because that could
+ // cause a deadlock ... thus, the semantics here are that no more
+ // dispatches will occur after this function returns but one may
+ // be occuring concurrently.
+ }
+
+ // We can't easily use 'std::tr1::_Placeholder<X>' when doing macro
+ // expansion via ENUM_BINARY_PARAMS because compilers don't like it
+ // when you try and concatenate '<' 'N' '>'. Thus, we typedef them.
+private:
+#define TEMPLATE(Z, N, DATA) \
+ typedef std::tr1::_Placeholder<INC(N)> _ ## N;
+
+ REPEAT(10, TEMPLATE, _)
+#undef TEMPLATE
+
+public:
+ // We provide wrappers for all standard function objects.
+ Deferred<void(void)> defer(
+ const std::tr1::function<void(void)>& f)
+ {
+ return Deferred<void(void)>(
+ std::tr1::bind(
+ &Executor::dispatcher,
+ process.self(), f));
+ }
+
+#define TEMPLATE(Z, N, DATA) \
+ template <ENUM_PARAMS(N, typename A)> \
+ Deferred<void(ENUM_PARAMS(N, A))> defer( \
+ const std::tr1::function<void(ENUM_PARAMS(N, A))>& f) \
+ { \
+ return Deferred<void(ENUM_PARAMS(N, A))>( \
+ std::tr1::bind( \
+ &Executor::CAT(dispatcher, N)<ENUM_PARAMS(N, A)>, \
+ process.self(), f, \
+ ENUM_BINARY_PARAMS(N, _, () INTERCEPT))); \
+ }
+
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+ // Unfortunately, it is currently difficult to "forward" type
+ // information from one result to another, so we must explicilty
+ // define wrappers for all std::tr1::bind results. First we start
+ // with the non-member std::tr1::bind results.
+ Deferred<void(void)> defer(
+ const std::tr1::_Bind<void(*(void))(void)>& b)
+ {
+ return defer(std::tr1::function<void(void)>(b));
+ }
+
+#define TEMPLATE(Z, N, DATA) \
+ template <ENUM_PARAMS(N, typename A)> \
+ Deferred<void(ENUM_PARAMS(N, A))> defer( \
+ const std::tr1::_Bind< \
+ void(*(ENUM_PARAMS(N, _))) \
+ (ENUM_PARAMS(N, A))>& b) \
+ { \
+ return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
+ }
+
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+ // Now the member std::tr1::bind results:
+ // 1. Non-const member (function), non-const pointer (receiver).
+ // 2. Const member, non-const pointer.
+ // 3. Const member, const pointer.
+ // 4. Non-const member, non-const reference.
+ // 5. Const member, non-const reference.
+ // 6. Const member, const reference.
+ // 7. Non-const member, value.
+ // 8. Const member, value.
+#define TEMPLATE(Z, N, DATA) \
+ template <typename T ENUM_TRAILING_PARAMS(N, typename A)> \
+ Deferred<void(ENUM_PARAMS(N, A))> defer( \
+ const std::tr1::_Bind<std::tr1::_Mem_fn< \
+ void(T::*)(ENUM_PARAMS(N, A))> \
+ (T* ENUM_TRAILING_PARAMS(N, _))>& b) \
+ { \
+ return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
+ } \
+ \
+ template <typename T ENUM_TRAILING_PARAMS(N, typename A)> \
+ Deferred<void(ENUM_PARAMS(N, A))> defer( \
+ const std::tr1::_Bind<std::tr1::_Mem_fn< \
+ void(T::*)(ENUM_PARAMS(N, A)) const> \
+ (T* ENUM_TRAILING_PARAMS(N, _))>& b) \
+ { \
+ return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
+ } \
+ \
+ template <typename T ENUM_TRAILING_PARAMS(N, typename A)> \
+ Deferred<void(ENUM_PARAMS(N, A))> defer( \
+ const std::tr1::_Bind<std::tr1::_Mem_fn< \
+ void(T::*)(ENUM_PARAMS(N, A)) const> \
+ (const T* ENUM_TRAILING_PARAMS(N, _))>& b) \
+ { \
+ return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
+ } \
+ \
+ template <typename T ENUM_TRAILING_PARAMS(N, typename A)> \
+ Deferred<void(ENUM_PARAMS(N, A))> defer( \
+ const std::tr1::_Bind<std::tr1::_Mem_fn< \
+ void(T::*)(ENUM_PARAMS(N, A))> \
+ (std::tr1::reference_wrapper<T> ENUM_TRAILING_PARAMS(N, _))>& b) \
+ { \
+ return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
+ } \
+ \
+ template <typename T ENUM_TRAILING_PARAMS(N, typename A)> \
+ Deferred<void(ENUM_PARAMS(N, A))> defer( \
+ const std::tr1::_Bind<std::tr1::_Mem_fn< \
+ void(T::*)(ENUM_PARAMS(N, A)) const> \
+ (std::tr1::reference_wrapper<T> ENUM_TRAILING_PARAMS(N, _))>& b) \
+ { \
+ return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
+ } \
+ \
+ template <typename T ENUM_TRAILING_PARAMS(N, typename A)> \
+ Deferred<void(ENUM_PARAMS(N, A))> defer( \
+ const std::tr1::_Bind<std::tr1::_Mem_fn< \
+ void(T::*)(ENUM_PARAMS(N, A)) const> \
+ (std::tr1::reference_wrapper<const T> ENUM_TRAILING_PARAMS(N, _))>& b) \
+ { \
+ return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
+ } \
+ \
+ template <typename T ENUM_TRAILING_PARAMS(N, typename A)> \
+ Deferred<void(ENUM_PARAMS(N, A))> defer( \
+ const std::tr1::_Bind<std::tr1::_Mem_fn< \
+ void(T::*)(ENUM_PARAMS(N, A))> \
+ (T ENUM_TRAILING_PARAMS(N, _))>& b) \
+ { \
+ return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
+ } \
+ \
+ template <typename T ENUM_TRAILING_PARAMS(N, typename A)> \
+ Deferred<void(ENUM_PARAMS(N, A))> defer( \
+ const std::tr1::_Bind<std::tr1::_Mem_fn< \
+ void(T::*)(ENUM_PARAMS(N, A)) const> \
+ (T ENUM_TRAILING_PARAMS(N, _))>& b) \
+ { \
+ return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
+ }
+
+ REPEAT(11, TEMPLATE, _) // No args and args A0 -> A9.
+#undef TEMPLATE
+
+private:
+ // Not copyable, not assignable.
+ Executor(const Executor&);
+ Executor& operator = (const Executor&);
+
+ static void dispatcher(
+ const PID<ExecutorProcess>& pid,
+ const std::tr1::function<void(void)>& f)
+ {
+ // TODO(benh): Why not just use internal::dispatch?
+ dispatch(pid, &ExecutorProcess::invoke, f);
+ }
+
+#define TEMPLATE(Z, N, DATA) \
+ template <ENUM_PARAMS(N, typename A)> \
+ static void CAT(dispatcher, N)( \
+ const PID<ExecutorProcess>& pid, \
+ const std::tr1::function<void(ENUM_PARAMS(N, A))>& f, \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ dispatch( \
+ pid, \
+ &ExecutorProcess::CAT(invoke, N)<ENUM_PARAMS(N, A)>, \
+ f, ENUM_PARAMS(N, a)); \
+ }
+
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+ ExecutorProcess process;
+};
+
+
+// Per thread executor pointer. The extra level of indirection from
+// _executor_ to __executor__ is used in order to take advantage of
+// the ThreadLocal operators without needing the extra dereference as
+// well as lazily construct the actual executor.
+extern ThreadLocal<Executor>* _executor_;
+
+#define __executor__ \
+ (*_executor_ == NULL ? *_executor_ = new Executor() : *_executor_)
+
+} // namespace process {
+
+#endif // __PROCESS_EXECUTOR_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/filter.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/filter.hpp b/3rdparty/libprocess/include/process/filter.hpp
new file mode 100644
index 0000000..aa0c91b
--- /dev/null
+++ b/3rdparty/libprocess/include/process/filter.hpp
@@ -0,0 +1,24 @@
+#ifndef __PROCESS_FILTER_HPP__
+#define __PROCESS_FILTER_HPP__
+
+#include <process/event.hpp>
+
+namespace process {
+
+class Filter {
+public:
+ virtual ~Filter() {}
+ virtual bool filter(const MessageEvent& event) { return false; }
+ virtual bool filter(const DispatchEvent& event) { return false; }
+ virtual bool filter(const HttpEvent& event) { return false; }
+ virtual bool filter(const ExitedEvent& event) { return false; }
+};
+
+
+// Use the specified filter on messages that get enqueued (note,
+// however, that you cannot filter timeout messages).
+void filter(Filter* filter);
+
+} // namespace process {
+
+#endif // __PROCESS_FILTER_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
new file mode 100644
index 0000000..daf4b92
--- /dev/null
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -0,0 +1,1060 @@
+#ifndef __PROCESS_FUTURE_HPP__
+#define __PROCESS_FUTURE_HPP__
+
+#include <assert.h>
+#include <stdlib.h> // For abort.
+
+#include <iostream>
+#include <list>
+#include <queue>
+#include <set>
+
+#include <glog/logging.h>
+
+#include <tr1/functional>
+#include <tr1/memory> // TODO(benh): Replace shared_ptr with unique_ptr.
+
+#include <process/latch.hpp>
+#include <process/pid.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/option.hpp>
+#include <stout/preprocessor.hpp>
+
+namespace process {
+
+// Forward declaration (instead of include to break circular dependency).
+template <typename _F> struct _Defer;
+
+namespace internal {
+
+template <typename T>
+struct wrap;
+
+template <typename T>
+struct unwrap;
+
+} // namespace internal {
+
+
+// Forward declaration of Promise.
+template <typename T>
+class Promise;
+
+
+// Definition of a "shared" future. A future can hold any
+// copy-constructible value. A future is considered "shared" because
+// by default a future can be accessed concurrently.
+template <typename T>
+class Future
+{
+public:
+ // Constructs a failed future.
+ static Future<T> failed(const std::string& message);
+
+ Future();
+ Future(const T& _t);
+ Future(const Future<T>& that);
+ ~Future();
+
+ // Futures are assignable (and copyable). This results in the
+ // reference to the previous future data being decremented and a
+ // reference to 'that' being incremented.
+ Future<T>& operator = (const Future<T>& that);
+
+ // Comparision operators useful for using futures in collections.
+ bool operator == (const Future<T>& that) const;
+ bool operator < (const Future<T>& that) const;
+
+ // Helpers to get the current state of this future.
+ bool isPending() const;
+ bool isReady() const;
+ bool isDiscarded() const;
+ bool isFailed() const;
+
+ // Discards this future. This is similar to cancelling a future,
+ // however it also occurs when the last reference to this future
+ // gets cleaned up. Returns false if the future could not be
+ // discarded (for example, because it is ready or failed).
+ bool discard();
+
+ // Waits for this future to become ready, discarded, or failed.
+ bool await(const Duration& duration = Seconds(-1)) const;
+
+ // Return the value associated with this future, waits indefinitely
+ // until a value gets associated or until the future is discarded.
+ T get() const;
+
+ // Returns the failure message associated with this future.
+ std::string failure() const;
+
+ // Type of the callback functions that can get invoked when the
+ // future gets set, fails, or is discarded.
+ typedef std::tr1::function<void(const T&)> ReadyCallback;
+ typedef std::tr1::function<void(const std::string&)> FailedCallback;
+ typedef std::tr1::function<void(void)> DiscardedCallback;
+ typedef std::tr1::function<void(const Future<T>&)> AnyCallback;
+
+ // Installs callbacks for the specified events and returns a const
+ // reference to 'this' in order to easily support chaining.
+ const Future<T>& onReady(const ReadyCallback& callback) const;
+ const Future<T>& onFailed(const FailedCallback& callback) const;
+ const Future<T>& onDiscarded(const DiscardedCallback& callback) const;
+ const Future<T>& onAny(const AnyCallback& callback) const;
+
+ // Installs callbacks that get executed when this future is ready
+ // and associates the result of the callback with the future that is
+ // returned to the caller (which may be of a different type).
+ template <typename X>
+ Future<X> then(const std::tr1::function<Future<X>(const T&)>& f) const;
+
+ template <typename X>
+ Future<X> then(const std::tr1::function<X(const T&)>& f) const;
+
+ // Helpers for the compiler to be able to forward std::tr1::bind results.
+ template <typename X>
+ Future<X> then(const std::tr1::_Bind<X(*(void))(void)>& b) const
+ {
+ return then(std::tr1::function<X(const T&)>(b));
+ }
+
+#define TEMPLATE(Z, N, DATA) \
+ template <typename X, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ Future<X> then( \
+ const std::tr1::_Bind<X(*(ENUM_PARAMS(N, A))) \
+ (ENUM_PARAMS(N, P))>& b) const \
+ { \
+ return then(std::tr1::function<X(const T&)>(b)); \
+ }
+
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+ template <typename X>
+ Future<X> then(const std::tr1::_Bind<Future<X>(*(void))(void)>& b) const
+ {
+ return then(std::tr1::function<Future<X>(const T&)>(b));
+ }
+
+#define TEMPLATE(Z, N, DATA) \
+ template <typename X, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ Future<X> then( \
+ const std::tr1::_Bind<Future<X>(*(ENUM_PARAMS(N, A))) \
+ (ENUM_PARAMS(N, P))>& b) const \
+ { \
+ return then(std::tr1::function<Future<X>(const T&)>(b)); \
+ }
+
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+ // Helpers for the compiler to be able to forward 'defer' results.
+ template <typename X, typename U>
+ Future<X> then(const _Defer<Future<X>(*(PID<U>, X(U::*)(void)))
+ (const PID<U>&, X(U::*)(void))>& d) const
+ {
+ return then(std::tr1::function<Future<X>(const T&)>(d));
+ }
+
+#define TEMPLATE(Z, N, DATA) \
+ template <typename X, \
+ typename U, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ Future<X> then( \
+ const _Defer<Future<X>(*(PID<U>, \
+ X(U::*)(ENUM_PARAMS(N, P)), \
+ ENUM_PARAMS(N, A))) \
+ (const PID<U>&, \
+ X(U::*)(ENUM_PARAMS(N, P)), \
+ ENUM_PARAMS(N, P))>& d) const \
+ { \
+ return then(std::tr1::function<Future<X>(const T&)>(d)); \
+ }
+
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+ template <typename X, typename U>
+ Future<X> then(const _Defer<Future<X>(*(PID<U>, Future<X>(U::*)(void)))
+ (const PID<U>&, Future<X>(U::*)(void))>& d) const
+ {
+ return then(std::tr1::function<Future<X>(const T&)>(d));
+ }
+
+#define TEMPLATE(Z, N, DATA) \
+ template <typename X, \
+ typename U, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ Future<X> then( \
+ const _Defer<Future<X>(*(PID<U>, \
+ Future<X>(U::*)(ENUM_PARAMS(N, P)), \
+ ENUM_PARAMS(N, A))) \
+ (const PID<U>&, \
+ Future<X>(U::*)(ENUM_PARAMS(N, P)), \
+ ENUM_PARAMS(N, P))>& d) const \
+ { \
+ return then(std::tr1::function<Future<X>(const T&)>(d)); \
+ }
+
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+ // C++11 implementation (covers all functors).
+#if __cplusplus >= 201103L
+ template <typename F>
+ auto then(F f) const
+ -> typename internal::wrap<decltype(f(T()))>::Type;
+#endif
+
+private:
+ friend class Promise<T>;
+
+ // Sets the value for this future, unless the future is already set,
+ // failed, or discarded, in which case it returns false.
+ bool set(const T& _t);
+
+ // Sets this future as failed, unless the future is already set,
+ // failed, or discarded, in which case it returns false.
+ bool fail(const std::string& _message);
+
+ void copy(const Future<T>& that);
+ void cleanup();
+
+ enum State {
+ PENDING,
+ READY,
+ FAILED,
+ DISCARDED,
+ };
+
+ int* refs;
+ int* lock;
+ State* state;
+ T** t;
+ std::string** message; // Message associated with failure.
+ std::queue<ReadyCallback>* onReadyCallbacks;
+ std::queue<FailedCallback>* onFailedCallbacks;
+ std::queue<DiscardedCallback>* onDiscardedCallbacks;
+ std::queue<AnyCallback>* onAnyCallbacks;
+ Latch* latch;
+};
+
+
+// TODO(benh): Make Promise a subclass of Future?
+template <typename T>
+class Promise
+{
+public:
+ Promise();
+ Promise(const T& t);
+ ~Promise();
+
+ bool set(const T& _t);
+ bool set(const Future<T>& future); // Alias for associate.
+ bool associate(const Future<T>& future);
+ bool fail(const std::string& message);
+
+ // Returns a copy of the future associated with this promise.
+ Future<T> future() const;
+
+private:
+ // Not copyable, not assignable.
+ Promise(const Promise<T>&);
+ Promise<T>& operator = (const Promise<T>&);
+
+ Future<T> f;
+};
+
+
+template <>
+class Promise<void>;
+
+
+template <typename T>
+class Promise<T&>;
+
+
+template <typename T>
+Promise<T>::Promise() {}
+
+
+template <typename T>
+Promise<T>::Promise(const T& t)
+ : f(t) {}
+
+
+template <typename T>
+Promise<T>::~Promise() {}
+
+
+template <typename T>
+bool Promise<T>::set(const T& t)
+{
+ return f.set(t);
+}
+
+
+template <typename T>
+bool Promise<T>::set(const Future<T>& future)
+{
+ return associate(future);
+}
+
+
+template <typename T>
+bool Promise<T>::associate(const Future<T>& future)
+{
+ if (!f.isPending()) {
+ return false;
+ }
+
+ future
+ .onReady(std::tr1::bind(&Future<T>::set, f, std::tr1::placeholders::_1))
+ .onFailed(std::tr1::bind(&Future<T>::fail, f, std::tr1::placeholders::_1))
+ .onDiscarded(std::tr1::bind(&Future<T>::discard, f));
+
+ return true;
+}
+
+
+template <typename T>
+bool Promise<T>::fail(const std::string& message)
+{
+ return f.fail(message);
+}
+
+
+template <typename T>
+Future<T> Promise<T>::future() const
+{
+ return f;
+}
+
+
+// Internal helper utilities.
+namespace internal {
+
+template <typename T>
+struct wrap
+{
+ typedef Future<T> Type;
+};
+
+
+template <typename X>
+struct wrap<Future<X> >
+{
+ typedef Future<X> Type;
+};
+
+
+template <typename T>
+struct unwrap
+{
+ typedef T Type;
+};
+
+
+template <typename X>
+struct unwrap<Future<X> >
+{
+ typedef X Type;
+};
+
+
+inline void acquire(int* lock)
+{
+ while (!__sync_bool_compare_and_swap(lock, 0, 1)) {
+ asm volatile ("pause");
+ }
+}
+
+
+inline void release(int* lock)
+{
+ // Unlock via a compare-and-swap so we get a memory barrier too.
+ bool unlocked = __sync_bool_compare_and_swap(lock, 1, 0);
+ assert(unlocked);
+}
+
+
+template <typename T>
+void select(
+ const Future<T>& future,
+ std::tr1::shared_ptr<Promise<Future<T > > > promise)
+{
+ // We never fail the future associated with our promise.
+ assert(!promise->future().isFailed());
+
+ if (promise->future().isPending()) { // No-op if it's discarded.
+ if (future.isReady()) { // We only set the promise if a future is ready.
+ promise->set(future);
+ }
+ }
+}
+
+} // namespace internal {
+
+
+// TODO(benh): Move select and discard into 'futures' namespace.
+
+// Returns a future that captures any ready future in a set. Note that
+// select DOES NOT capture a future that has failed or been discarded.
+template <typename T>
+Future<Future<T> > select(const std::set<Future<T> >& futures)
+{
+ std::tr1::shared_ptr<Promise<Future<T> > > promise(
+ new Promise<Future<T> >());
+
+ Future<Future<T> > future = promise->future();
+
+ std::tr1::function<void(const Future<T>&)> select =
+ std::tr1::bind(&internal::select<T>,
+ std::tr1::placeholders::_1,
+ promise);
+
+ typename std::set<Future<T> >::iterator iterator;
+ for (iterator = futures.begin(); iterator != futures.end(); ++iterator) {
+ (*iterator).onAny(std::tr1::bind(select, std::tr1::placeholders::_1));
+ }
+
+ return future;
+}
+
+
+template <typename T>
+void discard(const std::set<Future<T> >& futures)
+{
+ typename std::set<Future<T> >::const_iterator iterator;
+ for (iterator = futures.begin(); iterator != futures.end(); ++iterator) {
+ Future<T> future = *iterator; // Need a non-const copy to discard.
+ future.discard();
+ }
+}
+
+
+template <typename T>
+void discard(const std::list<Future<T> >& futures)
+{
+ typename std::list<Future<T> >::const_iterator iterator;
+ for (iterator = futures.begin(); iterator != futures.end(); ++iterator) {
+ Future<T> future = *iterator; // Need a non-const copy to discard.
+ future.discard();
+ }
+}
+
+
+template <class T>
+void fail(const std::vector<Promise<T>*>& promises, const std::string& message)
+{
+ typename std::vector<Promise<T>*>::const_iterator iterator;
+ for (iterator = promises.begin(); iterator != promises.end(); ++iterator) {
+ Promise<T>* promise = *iterator;
+ promise->fail(message);
+ }
+}
+
+
+template <class T>
+void fail(const std::list<Promise<T>*>& promises, const std::string& message)
+{
+ typename std::list<Promise<T>*>::const_iterator iterator;
+ for (iterator = promises.begin(); iterator != promises.end(); ++iterator) {
+ Promise<T>* promise = *iterator;
+ promise->fail(message);
+ }
+}
+
+
+template <typename T>
+Future<T> Future<T>::failed(const std::string& message)
+{
+ Future<T> future;
+ future.fail(message);
+ return future;
+}
+
+
+template <typename T>
+Future<T>::Future()
+ : refs(new int(1)),
+ lock(new int(0)),
+ state(new State(PENDING)),
+ t(new T*(NULL)),
+ message(new std::string*(NULL)),
+ onReadyCallbacks(new std::queue<ReadyCallback>()),
+ onFailedCallbacks(new std::queue<FailedCallback>()),
+ onDiscardedCallbacks(new std::queue<DiscardedCallback>()),
+ onAnyCallbacks(new std::queue<AnyCallback>()),
+ latch(new Latch()) {}
+
+
+template <typename T>
+Future<T>::Future(const T& _t)
+ : refs(new int(1)),
+ lock(new int(0)),
+ state(new State(PENDING)),
+ t(new T*(NULL)),
+ message(new std::string*(NULL)),
+ onReadyCallbacks(new std::queue<ReadyCallback>()),
+ onFailedCallbacks(new std::queue<FailedCallback>()),
+ onDiscardedCallbacks(new std::queue<DiscardedCallback>()),
+ onAnyCallbacks(new std::queue<AnyCallback>()),
+ latch(new Latch())
+{
+ set(_t);
+}
+
+
+template <typename T>
+Future<T>::Future(const Future<T>& that)
+{
+ copy(that);
+}
+
+
+template <typename T>
+Future<T>::~Future()
+{
+ cleanup();
+}
+
+
+template <typename T>
+Future<T>& Future<T>::operator = (const Future<T>& that)
+{
+ if (this != &that) {
+ cleanup();
+ copy(that);
+ }
+ return *this;
+}
+
+
+template <typename T>
+bool Future<T>::operator == (const Future<T>& that) const
+{
+ assert(latch != NULL);
+ assert(that.latch != NULL);
+ return *latch == *that.latch;
+}
+
+
+template <typename T>
+bool Future<T>::operator < (const Future<T>& that) const
+{
+ assert(latch != NULL);
+ assert(that.latch != NULL);
+ return *latch < *that.latch;
+}
+
+
+template <typename T>
+bool Future<T>::discard()
+{
+ bool result = false;
+
+ assert(lock != NULL);
+ internal::acquire(lock);
+ {
+ assert(state != NULL);
+ if (*state == PENDING) {
+ *state = DISCARDED;
+ latch->trigger();
+ result = true;
+ }
+ }
+ internal::release(lock);
+
+ // Invoke all callbacks associated with this future being
+ // DISCARDED. We don't need a lock because the state is now in
+ // DISCARDED so there should not be any concurrent modifications.
+ if (result) {
+ while (!onDiscardedCallbacks->empty()) {
+ // TODO(*): Invoke callbacks in another execution context.
+ onDiscardedCallbacks->front()();
+ onDiscardedCallbacks->pop();
+ }
+
+ while (!onAnyCallbacks->empty()) {
+ // TODO(*): Invoke callbacks in another execution context.
+ onAnyCallbacks->front()(*this);
+ onAnyCallbacks->pop();
+ }
+ }
+
+ return result;
+}
+
+
+template <typename T>
+bool Future<T>::isPending() const
+{
+ assert(state != NULL);
+ return *state == PENDING;
+}
+
+
+template <typename T>
+bool Future<T>::isReady() const
+{
+ assert(state != NULL);
+ return *state == READY;
+}
+
+
+template <typename T>
+bool Future<T>::isDiscarded() const
+{
+ assert(state != NULL);
+ return *state == DISCARDED;
+}
+
+
+template <typename T>
+bool Future<T>::isFailed() const
+{
+ assert(state != NULL);
+ return *state == FAILED;
+}
+
+
+template <typename T>
+bool Future<T>::await(const Duration& duration) const
+{
+ if (!isReady() && !isDiscarded() && !isFailed()) {
+ assert(latch != NULL);
+ return latch->await(duration);
+ }
+ return true;
+}
+
+
+template <typename T>
+T Future<T>::get() const
+{
+ if (!isReady()) {
+ await();
+ }
+
+ CHECK(!isPending()) << "Future was in PENDING after await()";
+
+ if (!isReady()) {
+ if (isFailed()) {
+ std::cerr << "Future::get() but state == FAILED: "
+ << failure() << std::endl;
+ } else if (isDiscarded()) {
+ std::cerr << "Future::get() but state == DISCARDED" << std::endl;
+ }
+ abort();
+ }
+
+ assert(t != NULL);
+ assert(*t != NULL);
+ return **t;
+}
+
+
+template <typename T>
+std::string Future<T>::failure() const
+{
+ assert(message != NULL);
+ if (*message != NULL) {
+ return **message;
+ }
+
+ return "";
+}
+
+
+template <typename T>
+const Future<T>& Future<T>::onReady(const ReadyCallback& callback) const
+{
+ bool run = false;
+
+ assert(lock != NULL);
+ internal::acquire(lock);
+ {
+ assert(state != NULL);
+ if (*state == READY) {
+ run = true;
+ } else if (*state == PENDING) {
+ onReadyCallbacks->push(callback);
+ }
+ }
+ internal::release(lock);
+
+ // TODO(*): Invoke callback in another execution context.
+ if (run) {
+ callback(**t);
+ }
+
+ return *this;
+}
+
+
+template <typename T>
+const Future<T>& Future<T>::onFailed(const FailedCallback& callback) const
+{
+ bool run = false;
+
+ assert(lock != NULL);
+ internal::acquire(lock);
+ {
+ assert(state != NULL);
+ if (*state == FAILED) {
+ run = true;
+ } else if (*state == PENDING) {
+ onFailedCallbacks->push(callback);
+ }
+ }
+ internal::release(lock);
+
+ // TODO(*): Invoke callback in another execution context.
+ if (run) {
+ callback(**message);
+ }
+
+ return *this;
+}
+
+
+template <typename T>
+const Future<T>& Future<T>::onDiscarded(
+ const DiscardedCallback& callback) const
+{
+ bool run = false;
+
+ assert(lock != NULL);
+ internal::acquire(lock);
+ {
+ assert(state != NULL);
+ if (*state == DISCARDED) {
+ run = true;
+ } else if (*state == PENDING) {
+ onDiscardedCallbacks->push(callback);
+ }
+ }
+ internal::release(lock);
+
+ // TODO(*): Invoke callback in another execution context.
+ if (run) {
+ callback();
+ }
+
+ return *this;
+}
+
+
+template <typename T>
+const Future<T>& Future<T>::onAny(const AnyCallback& callback) const
+{
+ bool run = false;
+
+ assert(lock != NULL);
+ internal::acquire(lock);
+ {
+ assert(state != NULL);
+ if (*state != PENDING) {
+ run = true;
+ } else if (*state == PENDING) {
+ onAnyCallbacks->push(callback);
+ }
+ }
+ internal::release(lock);
+
+ // TODO(*): Invoke callback in another execution context.
+ if (run) {
+ callback(*this);
+ }
+
+ return *this;
+}
+
+
+namespace internal {
+
+template <typename T, typename X>
+void thenf(const std::tr1::shared_ptr<Promise<X> >& promise,
+ const std::tr1::function<Future<X>(const T&)>& f,
+ const Future<T>& future)
+{
+ if (future.isReady()) {
+ promise->associate(f(future.get()));
+ } else if (future.isFailed()) {
+ promise->fail(future.failure());
+ } else if (future.isDiscarded()) {
+ promise->future().discard();
+ }
+}
+
+
+template <typename T, typename X>
+void then(const std::tr1::shared_ptr<Promise<X> >& promise,
+ const std::tr1::function<X(const T&)>& f,
+ const Future<T>& future)
+{
+ if (future.isReady()) {
+ promise->set(f(future.get()));
+ } else if (future.isFailed()) {
+ promise->fail(future.failure());
+ } else if (future.isDiscarded()) {
+ promise->future().discard();
+ }
+}
+
+} // namespace internal {
+
+
+template <typename T>
+template <typename X>
+Future<X> Future<T>::then(const std::tr1::function<Future<X>(const T&)>& f) const
+{
+ std::tr1::shared_ptr<Promise<X> > promise(new Promise<X>());
+
+ std::tr1::function<void(const Future<T>&)> thenf =
+ std::tr1::bind(&internal::thenf<T, X>,
+ promise,
+ f,
+ std::tr1::placeholders::_1);
+
+ onAny(thenf);
+
+ // Propagate discarding up the chain (note that we bind with a copy
+ // of this future since 'this' might no longer be valid but other
+ // references might still exist.
+ // TODO(benh): Need to pass 'future' as a weak_ptr so that we can
+ // avoid reference counting cycles!
+ std::tr1::function<void(void)> discard =
+ std::tr1::bind(&Future<T>::discard, *this);
+
+ promise->future().onDiscarded(discard);
+
+ return promise->future();
+}
+
+
+template <typename T>
+template <typename X>
+Future<X> Future<T>::then(const std::tr1::function<X(const T&)>& f) const
+{
+ std::tr1::shared_ptr<Promise<X> > promise(new Promise<X>());
+
+ std::tr1::function<void(const Future<T>&)> then =
+ std::tr1::bind(&internal::then<T, X>,
+ promise,
+ f,
+ std::tr1::placeholders::_1);
+
+ onAny(then);
+
+ // Propagate discarding up the chain (note that we bind with a copy
+ // of this future since 'this' might no longer be valid but other
+ // references might still exist.
+ // TODO(benh): Need to pass 'future' as a weak_ptr so that we can
+ // avoid reference counting cycles!
+ std::tr1::function<void(void)> discard =
+ std::tr1::bind(&Future<T>::discard, *this);
+
+ promise->future().onDiscarded(discard);
+
+ return promise->future();
+}
+
+
+#if __cplusplus >= 201103L
+template <typename T>
+template <typename F>
+auto Future<T>::then(F f) const
+ -> typename internal::wrap<decltype(f(T()))>::Type
+{
+ typedef typename internal::unwrap<decltype(f(T()))>::Type X;
+
+ std::tr1::shared_ptr<Promise<X>> promise(new Promise<X>());
+
+ onAny([=] (const Future<T>& future) {
+ if (future.isReady()) {
+ promise->set(f(future.get()));
+ } else if (future.isFailed()) {
+ promise->fail(future.failure());
+ } else if (future.isDiscarded()) {
+ promise->future().discard();
+ }
+ });
+
+ // TODO(benh): Need to use weak_ptr here so that we can avoid
+ // reference counting cycles!
+ Future<T> future(*this);
+
+ promise->future().onDiscarded([=] () {
+ future.discard(); // Need a non-const copy to discard.
+ });
+
+ return promise->future();
+}
+#endif
+
+
+template <typename T>
+bool Future<T>::set(const T& _t)
+{
+ bool result = false;
+
+ assert(lock != NULL);
+ internal::acquire(lock);
+ {
+ assert(state != NULL);
+ if (*state == PENDING) {
+ *t = new T(_t);
+ *state = READY;
+ latch->trigger();
+ result = true;
+ }
+ }
+ internal::release(lock);
+
+ // Invoke all callbacks associated with this future being READY. We
+ // don't need a lock because the state is now in READY so there
+ // should not be any concurrent modications.
+ if (result) {
+ while (!onReadyCallbacks->empty()) {
+ // TODO(*): Invoke callbacks in another execution context.
+ onReadyCallbacks->front()(**t);
+ onReadyCallbacks->pop();
+ }
+
+ while (!onAnyCallbacks->empty()) {
+ // TODO(*): Invoke callbacks in another execution context.
+ onAnyCallbacks->front()(*this);
+ onAnyCallbacks->pop();
+ }
+ }
+
+ return result;
+}
+
+
+template <typename T>
+bool Future<T>::fail(const std::string& _message)
+{
+ bool result = false;
+
+ assert(lock != NULL);
+ internal::acquire(lock);
+ {
+ assert(state != NULL);
+ if (*state == PENDING) {
+ *message = new std::string(_message);
+ *state = FAILED;
+ latch->trigger();
+ result = true;
+ }
+ }
+ internal::release(lock);
+
+ // Invoke all callbacks associated with this future being FAILED. We
+ // don't need a lock because the state is now in FAILED so there
+ // should not be any concurrent modications.
+ if (result) {
+ while (!onFailedCallbacks->empty()) {
+ // TODO(*): Invoke callbacks in another execution context.
+ onFailedCallbacks->front()(**message);
+ onFailedCallbacks->pop();
+ }
+
+ while (!onAnyCallbacks->empty()) {
+ // TODO(*): Invoke callbacks in another execution context.
+ onAnyCallbacks->front()(*this);
+ onAnyCallbacks->pop();
+ }
+ }
+
+ return result;
+}
+
+
+template <typename T>
+void Future<T>::copy(const Future<T>& that)
+{
+ assert(that.refs > 0);
+ __sync_fetch_and_add(that.refs, 1);
+ refs = that.refs;
+ lock = that.lock;
+ state = that.state;
+ t = that.t;
+ message = that.message;
+ onReadyCallbacks = that.onReadyCallbacks;
+ onFailedCallbacks = that.onFailedCallbacks;
+ onDiscardedCallbacks = that.onDiscardedCallbacks;
+ onAnyCallbacks = that.onAnyCallbacks;
+ latch = that.latch;
+}
+
+
+template <typename T>
+void Future<T>::cleanup()
+{
+ assert(refs != NULL);
+ if (__sync_sub_and_fetch(refs, 1) == 0) {
+ // Discard the future if it is still pending (so we invoke any
+ // discarded callbacks that have been setup). Note that we put the
+ // reference count back at 1 here in case one of the callbacks
+ // decides it wants to keep a reference.
+ assert(state != NULL);
+ if (*state == PENDING) {
+ *refs = 1;
+ discard();
+ __sync_sub_and_fetch(refs, 1);
+ }
+
+ // Now try and cleanup again (this time we know the future has
+ // either been discarded or was not pending). Note that one of the
+ // callbacks might have stored the future, in which case we'll
+ // just return without doing anything, but the state will forever
+ // be "discarded".
+ assert(refs != NULL);
+ if (*refs == 0) {
+ delete refs;
+ refs = NULL;
+ assert(lock != NULL);
+ delete lock;
+ lock = NULL;
+ assert(state != NULL);
+ delete state;
+ state = NULL;
+ assert(t != NULL);
+ delete *t;
+ delete t;
+ t = NULL;
+ assert(message != NULL);
+ delete *message;
+ delete message;
+ message = NULL;
+ assert(onReadyCallbacks != NULL);
+ delete onReadyCallbacks;
+ onReadyCallbacks = NULL;
+ assert(onFailedCallbacks != NULL);
+ delete onFailedCallbacks;
+ onFailedCallbacks = NULL;
+ assert(onDiscardedCallbacks != NULL);
+ delete onDiscardedCallbacks;
+ onDiscardedCallbacks = NULL;
+ assert(onAnyCallbacks != NULL);
+ delete onAnyCallbacks;
+ onAnyCallbacks = NULL;
+ assert(latch != NULL);
+ delete latch;
+ latch = NULL;
+ }
+ }
+}
+
+} // namespace process {
+
+#endif // __PROCESS_FUTURE_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/gc.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/gc.hpp b/3rdparty/libprocess/include/process/gc.hpp
new file mode 100644
index 0000000..e83c636
--- /dev/null
+++ b/3rdparty/libprocess/include/process/gc.hpp
@@ -0,0 +1,46 @@
+#ifndef __PROCESS_GC_HPP__
+#define __PROCESS_GC_HPP__
+
+#include <map>
+
+#include <process/process.hpp>
+
+
+namespace process {
+
+class GarbageCollector : public Process<GarbageCollector>
+{
+public:
+ GarbageCollector() : ProcessBase("__gc__") {}
+ virtual ~GarbageCollector() {}
+
+ template <typename T>
+ void manage(const T* t)
+ {
+ const ProcessBase* process = t;
+ if (process != NULL) {
+ processes[process->self()] = process;
+ link(process->self());
+ }
+ }
+
+protected:
+ virtual void exited(const UPID& pid)
+ {
+ if (processes.count(pid) > 0) {
+ const ProcessBase* process = processes[pid];
+ processes.erase(pid);
+ delete process;
+ }
+ }
+
+private:
+ std::map<UPID, const ProcessBase*> processes;
+};
+
+
+extern PID<GarbageCollector> gc;
+
+} // namespace process {
+
+#endif // __PROCESS_GC_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/gmock.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/gmock.hpp b/3rdparty/libprocess/include/process/gmock.hpp
new file mode 100644
index 0000000..a8cab4c
--- /dev/null
+++ b/3rdparty/libprocess/include/process/gmock.hpp
@@ -0,0 +1,327 @@
+#ifndef __PROCESS_GMOCK_HPP__
+#define __PROCESS_GMOCK_HPP__
+
+#include <pthread.h>
+
+#include <gmock/gmock.h>
+
+#include <tr1/tuple>
+
+#include <process/dispatch.hpp>
+#include <process/event.hpp>
+#include <process/filter.hpp>
+#include <process/pid.hpp>
+
+#include <stout/exit.hpp>
+#include <stout/nothing.hpp>
+
+
+// THIS IS DEPRECATED AND BROKEN! REPLACE ALL USES!
+#define EXPECT_MESSAGE(name, from, to) \
+ EXPECT_CALL(*new process::MockFilter(), \
+ filter(testing::A<const process::MessageEvent&>())) \
+ .With(process::MessageMatcher(name, from, to))
+
+
+// THIS IS DEPRECATED AND BROKEN! REPLACE ALL USES!
+#define EXPECT_DISPATCH(pid, method) \
+ EXPECT_CALL(*new process::MockFilter(), \
+ filter(testing::A<const process::DispatchEvent&>())) \
+ .With(process::DispatchMatcher(pid, method))
+
+
+#define FUTURE_MESSAGE(name, from, to) \
+ process::FutureMessage(name, from, to)
+
+#define DROP_MESSAGE(name, from, to) \
+ process::FutureMessage(name, from, to, true)
+
+#define FUTURE_DISPATCH(pid, method) \
+ process::FutureDispatch(pid, method)
+
+#define DROP_DISPATCH(pid, method) \
+ process::FutureDispatch(pid, method, true)
+
+#define DROP_MESSAGES(name, from, to) \
+ process::DropMessages(name, from, to)
+
+#define DROP_DISPATCHES(pid, method) \
+ process::DropDispatches(pid, method)
+
+
+ACTION_TEMPLATE(PromiseArg,
+ HAS_1_TEMPLATE_PARAMS(int, k),
+ AND_1_VALUE_PARAMS(promise))
+{
+ // TODO(benh): Use a shared_ptr for promise to defend against this
+ // action getting invoked more than once (e.g., used via
+ // WillRepeatedly). We won't be able to set it a second time but at
+ // least we won't get a segmentation fault. We could also consider
+ // warning users if they attempted to set it more than once.
+ promise->set(std::tr1::get<k>(args));
+ delete promise;
+}
+
+
+template <int index, typename T>
+PromiseArgActionP<index, process::Promise<T>*> FutureArg(
+ process::Future<T>* future)
+{
+ process::Promise<T>* promise = new process::Promise<T>();
+ *future = promise->future();
+ return PromiseArg<index>(promise);
+}
+
+
+ACTION_TEMPLATE(PromiseArgField,
+ HAS_1_TEMPLATE_PARAMS(int, k),
+ AND_2_VALUE_PARAMS(field, promise))
+{
+ // TODO(benh): Use a shared_ptr for promise to defend against this
+ // action getting invoked more than once (e.g., used via
+ // WillRepeatedly). We won't be able to set it a second time but at
+ // least we won't get a segmentation fault. We could also consider
+ // warning users if they attempted to set it more than once.
+ promise->set(*(std::tr1::get<k>(args).*field));
+ delete promise;
+}
+
+
+template <int index, typename Field, typename T>
+PromiseArgFieldActionP2<index, Field, process::Promise<T>*> FutureArgField(
+ Field field,
+ process::Future<T>* future)
+{
+ process::Promise<T>* promise = new process::Promise<T>();
+ *future = promise->future();
+ return PromiseArgField<index>(field, promise);
+}
+
+
+ACTION_P2(PromiseSatisfy, promise, value)
+{
+ promise->set(value);
+ delete promise;
+}
+
+
+template <typename T>
+PromiseSatisfyActionP2<process::Promise<T>*, T> FutureSatisfy(
+ process::Future<T>* future,
+ T t)
+{
+ process::Promise<T>* promise = new process::Promise<T>();
+ *future = promise->future();
+ return PromiseSatisfy(promise, t);
+}
+
+
+inline PromiseSatisfyActionP2<process::Promise<Nothing>*, Nothing>
+FutureSatisfy(process::Future<Nothing>* future)
+{
+ process::Promise<Nothing>* promise = new process::Promise<Nothing>();
+ *future = promise->future();
+ return PromiseSatisfy(promise, Nothing());
+}
+
+
+namespace process {
+
+class MockFilter : public Filter
+{
+public:
+ MockFilter()
+ {
+ EXPECT_CALL(*this, filter(testing::A<const MessageEvent&>()))
+ .WillRepeatedly(testing::Return(false));
+ EXPECT_CALL(*this, filter(testing::A<const DispatchEvent&>()))
+ .WillRepeatedly(testing::Return(false));
+ EXPECT_CALL(*this, filter(testing::A<const HttpEvent&>()))
+ .WillRepeatedly(testing::Return(false));
+ EXPECT_CALL(*this, filter(testing::A<const ExitedEvent&>()))
+ .WillRepeatedly(testing::Return(false));
+ }
+
+ MOCK_METHOD1(filter, bool(const MessageEvent&));
+ MOCK_METHOD1(filter, bool(const DispatchEvent&));
+ MOCK_METHOD1(filter, bool(const HttpEvent&));
+ MOCK_METHOD1(filter, bool(const ExitedEvent&));
+};
+
+
+// A definition of a libprocess filter to enable waiting for events
+// (such as messages or dispatches) via in tests. This is not meant to
+// be used directly by tests; tests should use macros like
+// FUTURE_MESSAGE and FUTURE_DISPATCH instead.
+class TestsFilter : public Filter
+{
+public:
+ TestsFilter()
+ {
+ // We use a recursive mutex here in the event that satisfying the
+ // future created in FutureMessage or FutureDispatch via the
+ // FutureArgField or FutureSatisfy actions invokes callbacks (from
+ // Future::then or Future::onAny, etc) that themselves invoke
+ // FutureDispatch or FutureMessage.
+ pthread_mutexattr_t attr;
+ pthread_mutexattr_init(&attr);
+ pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
+ pthread_mutex_init(&mutex, &attr);
+ pthread_mutexattr_destroy(&attr);
+ }
+
+ virtual bool filter(const MessageEvent& event) { return handle(event); }
+ virtual bool filter(const DispatchEvent& event) { return handle(event); }
+ virtual bool filter(const HttpEvent& event) { return handle(event); }
+ virtual bool filter(const ExitedEvent& event) { return handle(event); }
+
+ template <typename T>
+ bool handle(const T& t)
+ {
+ pthread_mutex_lock(&mutex);
+ bool drop = mock.filter(t);
+ pthread_mutex_unlock(&mutex);
+ return drop;
+ }
+
+ MockFilter mock;
+ pthread_mutex_t mutex;;
+};
+
+
+class FilterTestEventListener : public ::testing::EmptyTestEventListener
+{
+public:
+ // Returns the singleton instance of the listener.
+ static FilterTestEventListener* instance()
+ {
+ static FilterTestEventListener* listener = new FilterTestEventListener();
+ return listener;
+ }
+
+ // Installs and returns the filter, creating it if necessary.
+ TestsFilter* install()
+ {
+ if (!started) {
+ EXIT(1)
+ << "To use FUTURE/DROP_MESSAGE/DISPATCH, etc. you need to do the "
+ << "following before you invoke RUN_ALL_TESTS():\n\n"
+ << "\t::testing::TestEventListeners& listeners =\n"
+ << "\t ::testing::UnitTest::GetInstance()->listeners();\n"
+ << "\tlisteners.Append(process::FilterTestEventListener::instance());";
+ }
+
+ if (filter != NULL) {
+ return filter;
+ }
+
+ filter = new TestsFilter();
+
+ // Set the filter in libprocess.
+ process::filter(filter);
+
+ return filter;
+ }
+
+ virtual void OnTestProgramStart(const ::testing::UnitTest&)
+ {
+ started = true;
+ }
+
+ virtual void OnTestEnd(const ::testing::TestInfo&)
+ {
+ if (filter != NULL) {
+ // Remove the filter in libprocess _before_ deleting.
+ process::filter(NULL);
+ delete filter;
+ filter = NULL;
+ }
+ }
+
+private:
+ FilterTestEventListener() : filter(NULL), started(false) {}
+
+ TestsFilter* filter;
+
+ // Indicates if we got the OnTestProgramStart callback in order to
+ // detect if we have been properly added as a listener.
+ bool started;
+};
+
+
+MATCHER_P3(MessageMatcher, name, from, to, "")
+{
+ const MessageEvent& event = ::std::tr1::get<0>(arg);
+ return (testing::Matcher<std::string>(name).Matches(event.message->name) &&
+ testing::Matcher<UPID>(from).Matches(event.message->from) &&
+ testing::Matcher<UPID>(to).Matches(event.message->to));
+}
+
+
+MATCHER_P2(DispatchMatcher, pid, method, "")
+{
+ const DispatchEvent& event = ::std::tr1::get<0>(arg);
+ return (testing::Matcher<UPID>(pid).Matches(event.pid) &&
+ testing::Matcher<std::string>(internal::canonicalize(method))
+ .Matches(event.method));
+}
+
+
+template <typename Name, typename From, typename To>
+Future<Message> FutureMessage(Name name, From from, To to, bool drop = false)
+{
+ TestsFilter* filter = FilterTestEventListener::instance()->install();
+ pthread_mutex_lock(&filter->mutex);
+ Future<Message> future;
+ EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
+ .With(MessageMatcher(name, from, to))
+ .WillOnce(testing::DoAll(FutureArgField<0>(&MessageEvent::message, &future),
+ testing::Return(drop)))
+ .RetiresOnSaturation(); // Don't impose any subsequent expectations.
+ pthread_mutex_unlock(&filter->mutex);
+ return future;
+}
+
+
+template <typename PID, typename Method>
+Future<Nothing> FutureDispatch(PID pid, Method method, bool drop = false)
+{
+ TestsFilter* filter = FilterTestEventListener::instance()->install();
+ pthread_mutex_lock(&filter->mutex);
+ Future<Nothing> future;
+ EXPECT_CALL(filter->mock, filter(testing::A<const DispatchEvent&>()))
+ .With(DispatchMatcher(pid, method))
+ .WillOnce(testing::DoAll(FutureSatisfy(&future),
+ testing::Return(drop)))
+ .RetiresOnSaturation(); // Don't impose any subsequent expectations.
+ pthread_mutex_unlock(&filter->mutex);
+ return future;
+}
+
+
+template <typename Name, typename From, typename To>
+void DropMessages(Name name, From from, To to)
+{
+ TestsFilter* filter = FilterTestEventListener::instance()->install();
+ pthread_mutex_lock(&filter->mutex);
+ EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
+ .With(MessageMatcher(name, from, to))
+ .WillRepeatedly(testing::Return(true));
+ pthread_mutex_unlock(&filter->mutex);
+}
+
+
+template <typename PID, typename Method>
+void DropDispatches(PID pid, Method method)
+{
+ TestsFilter* filter = FilterTestEventListener::instance()->install();
+ pthread_mutex_lock(&filter->mutex);
+ EXPECT_CALL(filter->mock, filter(testing::A<const DispatchEvent&>()))
+ .With(DispatchMatcher(pid, method))
+ .WillRepeatedly(testing::Return(true));
+ pthread_mutex_unlock(&filter->mutex);
+}
+
+} // namespace process {
+
+#endif // __PROCESS_GMOCK_HPP__