You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/01/27 02:25:15 UTC
svn commit: r1236485 [5/7] - in /incubator/mesos/trunk: ./ include/mesos/
src/common/ src/exec/ src/local/ src/log/ src/master/ src/python/native/
src/sched/ src/slave/ src/tests/ src/zookeeper/ third_party/libprocess/
third_party/libprocess/include/pr...
Added: incubator/mesos/trunk/third_party/libprocess/include/process/event.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/event.hpp?rev=1236485&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/event.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/event.hpp Fri Jan 27 01:25:13 2012
@@ -0,0 +1,181 @@
+#ifndef __PROCESS_EVENT_HPP__
+#define __PROCESS_EVENT_HPP__
+
+#include <tr1/functional>
+
+#include <process/future.hpp>
+#include <process/http.hpp>
+#include <process/message.hpp>
+
+namespace process {
+
+// Forward declarations.
+struct ProcessBase;
+struct MessageEvent;
+struct DispatchEvent;
+struct HttpEvent;
+struct ExitedEvent;
+struct TerminateEvent;
+
+
+// Visitor interface with one 'visit' overload per concrete event
+// type. Every overload defaults to a no-op so that an implementation
+// only needs to override the events it actually handles.
+struct EventVisitor
+{
+  // Polymorphic base: give it a virtual destructor so that destroying
+  // a visitor through an 'EventVisitor*' is well defined.
+  virtual ~EventVisitor() {}
+
+  virtual void visit(const MessageEvent& event) {}
+  virtual void visit(const DispatchEvent& event) {}
+  virtual void visit(const HttpEvent& event) {}
+  virtual void visit(const ExitedEvent& event) {}
+  virtual void visit(const TerminateEvent& event) {}
+};
+
+
+// Abstract base of all events. A concrete event implements 'visit'
+// by calling back the visitor overload that matches its own type,
+// which is what 'is' and 'as' below rely on to identify an event.
+struct Event
+{
+  // The concrete event types release resources in their destructors,
+  // so destroying an event through an 'Event*' must dispatch to the
+  // derived destructor; without a virtual destructor that would be
+  // undefined behavior.
+  virtual ~Event() {}
+
+  // Invokes the overload of 'visitor->visit' for the concrete event.
+  virtual void visit(EventVisitor* visitor) const = 0;
+
+  // Returns true only if visiting this event reaches the
+  // 'visit(const T&)' overload, i.e., the event is a T.
+  template <typename T>
+  bool is() const
+  {
+    bool result = false;
+    struct IsVisitor : EventVisitor
+    {
+      IsVisitor(bool* _result) : result(_result) {}
+      virtual void visit(const T& t) { *result = true; }
+      bool* result;
+    } visitor(&result);
+    visit(&visitor);
+    return result;
+  }
+
+  // Returns this event as a T. If the event is not a T, a diagnostic
+  // is written to standard error and the program is aborted.
+  template <typename T>
+  const T& as() const
+  {
+    const T* result = NULL;
+    struct AsVisitor : EventVisitor
+    {
+      AsVisitor(const T** _result) : result(_result) {}
+      virtual void visit(const T& t) { *result = &t; }
+      const T** result;
+    } visitor(&result);
+    visit(&visitor);
+    if (result == NULL) {
+      // No matching overload was invoked, so the "cast" is invalid.
+      std::cerr << "Attempting to \"cast\" event incorrectly!" << std::endl;
+      abort();
+    }
+    return *result;
+  }
+};
+
+
+// Event wrapping a message. The event deletes the message it was
+// constructed with when the event itself is destroyed.
+struct MessageEvent : Event
+{
+private:
+  // Declared private to forbid copying and assignment.
+  MessageEvent(const MessageEvent&);
+  MessageEvent& operator = (const MessageEvent&);
+
+public:
+  MessageEvent(Message* _message) : message(_message) {}
+
+  ~MessageEvent() { delete message; }
+
+  // Dispatches to the visitor's MessageEvent overload.
+  virtual void visit(EventVisitor* visitor) const
+  {
+    const MessageEvent& self = *this;
+    visitor->visit(self);
+  }
+
+  Message* const message;
+};
+
+
+// Event wrapping an HTTP request. The event deletes the request it
+// was constructed with when the event itself is destroyed.
+struct HttpEvent : Event
+{
+private:
+  // Declared private to forbid copying and assignment.
+  HttpEvent(const HttpEvent&);
+  HttpEvent& operator = (const HttpEvent&);
+
+public:
+  HttpEvent(int _c, HttpRequest* _request)
+    : c(_c),
+      request(_request) {}
+
+  ~HttpEvent() { delete request; }
+
+  // Dispatches to the visitor's HttpEvent overload.
+  virtual void visit(EventVisitor* visitor) const
+  {
+    const HttpEvent& self = *this;
+    visitor->visit(self);
+  }
+
+  // NOTE(review): presumably identifies the connection (socket) the
+  // request arrived on -- confirm against the code that creates it.
+  const int c;
+  HttpRequest* const request;
+};
+
+
+// Event wrapping a function that takes a 'ProcessBase*'. The event
+// deletes the function object it was constructed with when the event
+// itself is destroyed.
+struct DispatchEvent : Event
+{
+private:
+  // Declared private to forbid copying and assignment.
+  DispatchEvent(const DispatchEvent&);
+  DispatchEvent& operator = (const DispatchEvent&);
+
+public:
+  DispatchEvent(std::tr1::function<void(ProcessBase*)>* _function)
+    : function(_function) {}
+
+  ~DispatchEvent() { delete function; }
+
+  // Dispatches to the visitor's DispatchEvent overload.
+  virtual void visit(EventVisitor* visitor) const
+  {
+    const DispatchEvent& self = *this;
+    visitor->visit(self);
+  }
+
+  std::tr1::function<void(ProcessBase*)>* const function;
+};
+
+
+// Event carrying a copy of a pid.
+// NOTE(review): presumably the pid of the process that exited --
+// confirm against the code that enqueues this event.
+struct ExitedEvent : Event
+{
+private:
+  // Declared private to forbid copying and assignment.
+  ExitedEvent(const ExitedEvent&);
+  ExitedEvent& operator = (const ExitedEvent&);
+
+public:
+  ExitedEvent(const UPID& _pid) : pid(_pid) {}
+
+  // Dispatches to the visitor's ExitedEvent overload.
+  virtual void visit(EventVisitor* visitor) const
+  {
+    const ExitedEvent& self = *this;
+    visitor->visit(self);
+  }
+
+  const UPID pid;
+};
+
+
+// Event carrying a copy of a pid.
+// NOTE(review): presumably the pid of whoever requested termination
+// -- confirm against the code that enqueues this event.
+struct TerminateEvent : Event
+{
+private:
+  // Declared private to forbid copying and assignment.
+  TerminateEvent(const TerminateEvent&);
+  TerminateEvent& operator = (const TerminateEvent&);
+
+public:
+  TerminateEvent(const UPID& _from) : from(_from) {}
+
+  // Dispatches to the visitor's TerminateEvent overload.
+  virtual void visit(EventVisitor* visitor) const
+  {
+    const TerminateEvent& self = *this;
+    visitor->visit(self);
+  }
+
+  const UPID from;
+};
+
+} // namespace process {
+
+#endif // __PROCESS_EVENT_HPP__
Copied: incubator/mesos/trunk/third_party/libprocess/include/process/executor.hpp (from r1235899, incubator/mesos/trunk/third_party/libprocess/include/process/async.hpp)
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/executor.hpp?p2=incubator/mesos/trunk/third_party/libprocess/include/process/executor.hpp&p1=incubator/mesos/trunk/third_party/libprocess/include/process/async.hpp&r1=1235899&r2=1236485&rev=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/async.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/executor.hpp Fri Jan 27 01:25:13 2012
@@ -1,63 +1,31 @@
-#ifndef __PROCESS_ASYNC_HPP__
-#define __PROCESS_ASYNC_HPP__
+#ifndef __PROCESS_EXECUTOR_HPP__
+#define __PROCESS_EXECUTOR_HPP__
+#include <process/deferred.hpp>
#include <process/dispatch.hpp>
+#include <process/preprocessor.hpp>
-#include <boost/preprocessor/cat.hpp>
-
-#include <boost/preprocessor/arithmetic/inc.hpp>
-
-#include <boost/preprocessor/facilities/intercept.hpp>
-
-#include <boost/preprocessor/repetition/enum_params.hpp>
-#include <boost/preprocessor/repetition/enum_binary_params.hpp>
-#include <boost/preprocessor/repetition/enum_trailing_params.hpp>
-#include <boost/preprocessor/repetition/repeat.hpp>
-#include <boost/preprocessor/repetition/repeat_from_to.hpp>
-
-#define CAT BOOST_PP_CAT
-#define INC BOOST_PP_INC
-#define INTERCEPT BOOST_PP_INTERCEPT
-#define ENUM_PARAMS BOOST_PP_ENUM_PARAMS
-#define ENUM_BINARY_PARAMS BOOST_PP_ENUM_BINARY_PARAMS
-#define ENUM_TRAILING_PARAMS BOOST_PP_ENUM_TRAILING_PARAMS
-#define REPEAT BOOST_PP_REPEAT
-#define REPEAT_FROM_TO BOOST_PP_REPEAT_FROM_TO
-
-
-namespace async {
-
-// Acts like a function call but performs an asynchronous
-// dispatch. Thus, the "caller" knows that it will not block, for
-// example, within the "callee" callback.
-template <typename F>
-struct dispatch : std::tr1::function<F>
-{
-private:
- friend class Dispatch; // Only class capable of creating these.
- dispatch(const std::tr1::function<F>& f) : std::tr1::function<F>(f) {}
-};
-
+namespace process {
// Underlying "process" which handles invoking actual callbacks
-// created through a Dispatch object.
-class DispatchProcess : public process::Process<DispatchProcess>
+// created through an Executor.
+class ExecutorProcess : public process::Process<ExecutorProcess>
{
private:
- friend class Dispatch;
+ friend class Executor;
- DispatchProcess() {}
- ~DispatchProcess() {}
+ ExecutorProcess() {}
+ ~ExecutorProcess() {}
// Not copyable, not assignable.
- DispatchProcess(const DispatchProcess&);
- DispatchProcess& operator = (const DispatchProcess&);
+ ExecutorProcess(const ExecutorProcess&);
+ ExecutorProcess& operator = (const ExecutorProcess&);
// No arg invoke.
void invoke(const std::tr1::function<void()>& f) { f(); }
// Args invoke.
-#define APPLY(Z, N, DATA) \
+#define TEMPLATE(Z, N, DATA) \
template <ENUM_PARAMS(N, typename A)> \
void CAT(invoke, N)( \
const std::tr1::function<void(ENUM_PARAMS(N, A))>& f, \
@@ -66,85 +34,95 @@ private:
f(ENUM_PARAMS(N, a)); \
}
- REPEAT_FROM_TO(1, 11, APPLY, _) // Args A0 -> A9.
-#undef APPLY
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
};
// Provides an abstraction that can take a standard function object
-// and convert it to a dispatch object. Each converted function object
-// will get invoked serially with respect to one another.
-class Dispatch
+// and convert it to a 'deferred'. Each converted function object will
+// get invoked serially with respect to one another.
+class Executor
{
public:
- Dispatch() {
+ Executor() {
process::spawn(process);
}
- ~Dispatch()
+ ~Executor()
{
process::terminate(process);
process::wait(process);
}
+  // Requests termination of the underlying process without waiting
+  // for it to finish. Always returns true once the request has been
+  // issued (previously control fell off the end of this non-void
+  // function, which is undefined behavior).
+  bool stop()
+  {
+    process::terminate(process);
+
+    // TODO(benh): Note that this doesn't wait because that could
+    // cause a deadlock ... thus, the semantics here are that no more
+    // dispatches will occur after this function returns but one may
+    // be occurring concurrently.
+    return true;
+  }
+
// We can't easily use 'std::tr1::_Placeholder<X>' when doing macro
// expansion via ENUM_BINARY_PARAMS because compilers don't like it
// when you try and concatenate '<' 'N' '>'. Thus, we typedef them.
private:
-#define APPLY(Z, N, DATA) \
+#define TEMPLATE(Z, N, DATA) \
typedef std::tr1::_Placeholder<INC(N)> _ ## N;
- REPEAT(10, APPLY, _)
-#undef APPLY
+ REPEAT(10, TEMPLATE, _)
+#undef TEMPLATE
public:
// We provide wrappers for all standard function objects.
- dispatch<void()> operator () (
- const std::tr1::function<void()>& f)
+ deferred<void(void)> defer(
+ const std::tr1::function<void(void)>& f)
{
- return dispatch<void()>(
+ return deferred<void(void)>(
std::tr1::bind(
- &Dispatch::dispatcher,
+ &Executor::dispatcher,
process.self(), f));
}
-#define APPLY(Z, N, DATA) \
+#define TEMPLATE(Z, N, DATA) \
template <ENUM_PARAMS(N, typename A)> \
- dispatch<void(ENUM_PARAMS(N, A))> operator () ( \
+ deferred<void(ENUM_PARAMS(N, A))> defer( \
const std::tr1::function<void(ENUM_PARAMS(N, A))>& f) \
{ \
- return dispatch<void(ENUM_PARAMS(N, A))>( \
+ return deferred<void(ENUM_PARAMS(N, A))>( \
std::tr1::bind( \
- &Dispatch::CAT(dispatcher, N)<ENUM_PARAMS(N, A)>, \
+ &Executor::CAT(dispatcher, N)<ENUM_PARAMS(N, A)>, \
process.self(), f, \
ENUM_BINARY_PARAMS(N, _, () INTERCEPT))); \
}
- REPEAT_FROM_TO(1, 11, APPLY, _) // Args A0 -> A9.
-#undef APPLY
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
// Unfortunately, it is currently difficult to "forward" type
// information from one result to another, so we must explicitly
// define wrappers for all std::tr1::bind results. First we start
// with the non-member std::tr1::bind results.
- dispatch<void()> operator () (
+ deferred<void(void)> defer(
const std::tr1::_Bind<void(*())()>& b)
{
- return (*this)(std::tr1::function<void()>(b));
+ return defer(std::tr1::function<void()>(b));
}
-#define APPLY(Z, N, DATA) \
+#define TEMPLATE(Z, N, DATA) \
template <ENUM_PARAMS(N, typename A)> \
- dispatch<void(ENUM_PARAMS(N, A))> operator () ( \
+ deferred<void(ENUM_PARAMS(N, A))> defer( \
const std::tr1::_Bind< \
void(*(ENUM_PARAMS(N, _))) \
(ENUM_PARAMS(N, A))>& b) \
{ \
- return (*this)(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
+ return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
}
- REPEAT_FROM_TO(1, 11, APPLY, _) // Args A0 -> A9.
-#undef APPLY
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
// Now the member std::tr1::bind results:
// 1. Non-const member (function), non-const pointer (receiver).
@@ -155,122 +133,113 @@ public:
// 6. Const member, const reference.
// 7. Non-const member, value.
// 8. Const member, value.
-#define APPLY(Z, N, DATA) \
+#define TEMPLATE(Z, N, DATA) \
template <typename T ENUM_TRAILING_PARAMS(N, typename A)> \
- dispatch<void(ENUM_PARAMS(N, A))> operator () ( \
+ deferred<void(ENUM_PARAMS(N, A))> defer( \
const std::tr1::_Bind<std::tr1::_Mem_fn< \
void(T::*)(ENUM_PARAMS(N, A))> \
(T* ENUM_TRAILING_PARAMS(N, _))>& b) \
{ \
- return (*this)(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
+ return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
} \
\
template <typename T ENUM_TRAILING_PARAMS(N, typename A)> \
- dispatch<void(ENUM_PARAMS(N, A))> operator () ( \
+ deferred<void(ENUM_PARAMS(N, A))> defer( \
const std::tr1::_Bind<std::tr1::_Mem_fn< \
void(T::*)(ENUM_PARAMS(N, A)) const> \
(T* ENUM_TRAILING_PARAMS(N, _))>& b) \
{ \
- return (*this)(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
+ return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
} \
\
template <typename T ENUM_TRAILING_PARAMS(N, typename A)> \
- dispatch<void(ENUM_PARAMS(N, A))> operator () ( \
+ deferred<void(ENUM_PARAMS(N, A))> defer( \
const std::tr1::_Bind<std::tr1::_Mem_fn< \
void(T::*)(ENUM_PARAMS(N, A)) const> \
(const T* ENUM_TRAILING_PARAMS(N, _))>& b) \
{ \
- return (*this)(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
+ return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
} \
\
template <typename T ENUM_TRAILING_PARAMS(N, typename A)> \
- dispatch<void(ENUM_PARAMS(N, A))> operator () ( \
+ deferred<void(ENUM_PARAMS(N, A))> defer( \
const std::tr1::_Bind<std::tr1::_Mem_fn< \
void(T::*)(ENUM_PARAMS(N, A))> \
(std::tr1::reference_wrapper<T> ENUM_TRAILING_PARAMS(N, _))>& b) \
{ \
- return (*this)(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
+ return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
} \
\
template <typename T ENUM_TRAILING_PARAMS(N, typename A)> \
- dispatch<void(ENUM_PARAMS(N, A))> operator () ( \
+ deferred<void(ENUM_PARAMS(N, A))> defer( \
const std::tr1::_Bind<std::tr1::_Mem_fn< \
void(T::*)(ENUM_PARAMS(N, A)) const> \
(std::tr1::reference_wrapper<T> ENUM_TRAILING_PARAMS(N, _))>& b) \
{ \
- return (*this)(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
+ return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
} \
\
template <typename T ENUM_TRAILING_PARAMS(N, typename A)> \
- dispatch<void(ENUM_PARAMS(N, A))> operator () ( \
+ deferred<void(ENUM_PARAMS(N, A))> defer( \
const std::tr1::_Bind<std::tr1::_Mem_fn< \
void(T::*)(ENUM_PARAMS(N, A)) const> \
(std::tr1::reference_wrapper<const T> ENUM_TRAILING_PARAMS(N, _))>& b) \
{ \
- return (*this)(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
+ return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
} \
\
template <typename T ENUM_TRAILING_PARAMS(N, typename A)> \
- dispatch<void(ENUM_PARAMS(N, A))> operator () ( \
+ deferred<void(ENUM_PARAMS(N, A))> defer( \
const std::tr1::_Bind<std::tr1::_Mem_fn< \
void(T::*)(ENUM_PARAMS(N, A))> \
(T ENUM_TRAILING_PARAMS(N, _))>& b) \
{ \
- return (*this)(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
+ return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
} \
\
template <typename T ENUM_TRAILING_PARAMS(N, typename A)> \
- dispatch<void(ENUM_PARAMS(N, A))> operator () ( \
+ deferred<void(ENUM_PARAMS(N, A))> defer( \
const std::tr1::_Bind<std::tr1::_Mem_fn< \
void(T::*)(ENUM_PARAMS(N, A)) const> \
(T ENUM_TRAILING_PARAMS(N, _))>& b) \
{ \
- return (*this)(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
+ return defer(std::tr1::function<void(ENUM_PARAMS(N, A))>(b)); \
}
- REPEAT(11, APPLY, _) // No args and args A0 -> A9.
-#undef APPLY
+ REPEAT(11, TEMPLATE, _) // No args and args A0 -> A9.
+#undef TEMPLATE
private:
// Not copyable, not assignable.
- Dispatch(const Dispatch&);
- Dispatch& operator = (const Dispatch&);
+ Executor(const Executor&);
+ Executor& operator = (const Executor&);
static void dispatcher(
- const process::PID<DispatchProcess>& pid,
- const std::tr1::function<void()>& f)
+ const process::PID<ExecutorProcess>& pid,
+ const std::tr1::function<void(void)>& f)
{
- process::dispatch(pid, &DispatchProcess::invoke, f);
+ process::dispatch(pid, &ExecutorProcess::invoke, f);
}
-#define APPLY(Z, N, DATA) \
+#define TEMPLATE(Z, N, DATA) \
template <ENUM_PARAMS(N, typename A)> \
static void CAT(dispatcher, N)( \
- const process::PID<DispatchProcess>& pid, \
+ const process::PID<ExecutorProcess>& pid, \
const std::tr1::function<void(ENUM_PARAMS(N, A))>& f, \
ENUM_BINARY_PARAMS(N, A, a)) \
{ \
process::dispatch( \
pid, \
- &DispatchProcess::CAT(invoke, N)<ENUM_PARAMS(N, A)>, \
+ &ExecutorProcess::CAT(invoke, N)<ENUM_PARAMS(N, A)>, \
f, ENUM_PARAMS(N, a)); \
}
- REPEAT_FROM_TO(1, 11, APPLY, _) // Args A0 -> A9.
-#undef APPLY
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
- DispatchProcess process;
+ ExecutorProcess process;
};
-} // namespace async {
-
-#undef CAT
-#undef INC
-#undef INTERCEPT
-#undef ENUM_PARAMS
-#undef ENUM_BINARY_PARAMS
-#undef ENUM_TRAILING_PARAMS
-#undef REPEAT
-#undef REPEAT_FROM_TO
+} // namespace process {
-#endif // __PROCESS_ASYNC_HPP__
+#endif // __PROCESS_EXECUTOR_HPP__
Added: incubator/mesos/trunk/third_party/libprocess/include/process/filter.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/filter.hpp?rev=1236485&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/filter.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/filter.hpp Fri Jan 27 01:25:13 2012
@@ -0,0 +1,23 @@
+#ifndef __PROCESS_FILTER_HPP__
+#define __PROCESS_FILTER_HPP__
+
+#include <process/event.hpp>
+
+namespace process {
+
+// Interface consulted for events that get enqueued. Each overload
+// defaults to returning false, so an implementation only overrides
+// the event types it is interested in.
+// NOTE(review): presumably returning true means the event is
+// filtered out (dropped) -- confirm against the implementation.
+class Filter {
+public:
+  // Polymorphic interface handed around by pointer: give it a virtual
+  // destructor so destroying it through a 'Filter*' is well defined.
+  virtual ~Filter() {}
+
+  virtual bool filter(const MessageEvent& event) { return false; }
+  virtual bool filter(const DispatchEvent& event) { return false; }
+  virtual bool filter(const HttpEvent& event) { return false; }
+  virtual bool filter(const ExitedEvent& event) { return false; }
+};
+
+
+// Use the specified filter on messages that get enqueued (note,
+// however, that you cannot filter timeout messages).
+void filter(Filter* filter);
+
+} // namespace process {
+
+#endif // __PROCESS_FILTER_HPP__
Modified: incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp Fri Jan 27 01:25:13 2012
@@ -1,15 +1,17 @@
#ifndef __PROCESS_FUTURE_HPP__
#define __PROCESS_FUTURE_HPP__
+#include <assert.h>
+
#include <queue>
#include <set>
#include <tr1/functional>
+#include <tr1/memory> // TODO(benh): Replace shared_ptr with unique_ptr.
#include <process/latch.hpp>
#include <process/option.hpp>
-
namespace process {
// Forward declaration of Promise.
@@ -29,9 +31,9 @@ public:
Future(const Future<T>& that);
~Future();
- // Futures are assignable. This results in the reference to the
- // previous future data being decremented and a reference to 'that'
- // being incremented.
+ // Futures are assignable (and copyable). This results in the
+ // reference to the previous future data being decremented and a
+ // reference to 'that' being incremented.
Future<T>& operator = (const Future<T>& that);
// Comparison operators useful for using futures in collections.
@@ -108,15 +110,13 @@ private:
};
-// TODO(benh): Consider making promise a non-copyable, non-assignabl
-// subclass of Future. For now, they'll live as distinct types.
+// TODO(benh): Make Promise a subclass of Future?
template <typename T>
class Promise
{
public:
Promise();
Promise(const T& t);
- Promise(const Promise<T>& that);
~Promise();
bool set(const T& _t);
@@ -126,8 +126,9 @@ public:
Future<T> future() const;
private:
- // Not assignable.
- void operator = (const Promise<T>&);
+ // Not copyable, not assignable.
+ Promise(const Promise<T>&);
+ Promise<T>& operator = (const Promise<T>&);
Future<T> f;
};
@@ -151,11 +152,6 @@ Promise<T>::Promise(const T& t)
template <typename T>
-Promise<T>::Promise(const Promise<T>& that)
- : f(that.f) {}
-
-
-template <typename T>
Promise<T>::~Promise() {}
@@ -197,62 +193,56 @@ inline void release(int* lock)
assert(unlocked);
}
-namespace callbacks {
-
template <typename T>
-void select(const Future<T>& future, Promise<Future<T> > promise)
+void select(
+ const Future<T>& future,
+ std::tr1::shared_ptr<Promise<Future<T > > > promise)
{
// We never fail the future associated with our promise.
- assert(!promise.future().isFailed());
+ assert(!promise->future().isFailed());
- if (promise.future().isPending()) { // Avoid acquiring a lock.
+ if (promise->future().isPending()) { // No-op if it's discarded.
if (future.isReady()) { // We only set the promise if a future is ready.
- promise.set(future);
+ promise->set(future);
}
}
}
-} // namespace callback {
} // namespace internal {
// TODO(benh): Move select and discard into 'futures' namespace.
-// Returns an option of a ready future or none in the event of
-// timeout. Note that select DOES NOT return for a future that has
-// failed or been discarded.
+// Returns a future that captures any ready future in a set. Note that
+// select DOES NOT capture a future that has failed or been discarded.
template <typename T>
-Option<Future<T> > select(const std::set<Future<T> >& futures, double secs)
+Future<Future<T> > select(const std::set<Future<T> >& futures)
{
- Promise<Future<T> > promise;
+ std::tr1::shared_ptr<Promise<Future<T> > > promise(
+ new Promise<Future<T> >());
- std::tr1::function<void(const Future<T>&)> callback =
- std::tr1::bind(internal::callbacks::select<T>,
- std::tr1::placeholders::_1, promise);
+ Future<Future<T> > future = promise->future();
+
+ std::tr1::function<void(const Future<T>&)> select =
+ std::tr1::bind(&internal::select<T>,
+ std::tr1::placeholders::_1,
+ promise);
typename std::set<Future<T> >::iterator iterator;
for (iterator = futures.begin(); iterator != futures.end(); ++iterator) {
- const Future<T>& future = *iterator;
- future.onAny(callback);
+ (*iterator).onAny(select);
}
- Future<Future<T> > future = promise.future();
-
- if (future.await(secs)) {
- return Option<Future<T> >::some(future.get());
- } else {
- future.discard();
- return Option<Future<T> >::none();
- }
+ return future;
}
template <typename T>
void discard(const std::set<Future<T> >& futures)
{
- typename std::set<Future<T> >::const_iterator iterator;
+ typename std::set<Future<T> >::iterator iterator;
for (iterator = futures.begin(); iterator != futures.end(); ++iterator) {
- Future<T> future = *iterator;
+ Future<T> future = *iterator; // Need a non-const copy to discard.
future.discard();
}
}
Modified: incubator/mesos/trunk/third_party/libprocess/include/process/gc.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/gc.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/gc.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/gc.hpp Fri Jan 27 01:25:13 2012
@@ -11,7 +11,7 @@ namespace process {
class GarbageCollector : public Process<GarbageCollector>
{
public:
- GarbageCollector() {}
+ GarbageCollector() : ProcessBase("__gc__") {}
virtual ~GarbageCollector() {}
template <typename T>
@@ -25,15 +25,12 @@ public:
}
protected:
- virtual void operator () ()
+ virtual void exited(const UPID& pid)
{
- while (true) {
- serve();
- if (name() == EXITED && processes.count(from()) > 0) {
- const ProcessBase* process = processes[from()];
- processes.erase(from());
- delete process;
- }
+ if (processes.count(pid) > 0) {
+ const ProcessBase* process = processes[pid];
+ processes.erase(pid);
+ delete process;
}
}
Modified: incubator/mesos/trunk/third_party/libprocess/include/process/latch.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/latch.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/latch.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/latch.hpp Fri Jan 27 01:25:13 2012
@@ -5,16 +5,14 @@
namespace process {
-class LatchProcess;
-
class Latch
{
public:
Latch();
virtual ~Latch();
- bool operator == (const Latch& that) const { return latch == that.latch; }
- bool operator < (const Latch& that) const { return latch < that.latch; }
+ bool operator == (const Latch& that) const { return pid == that.pid; }
+ bool operator < (const Latch& that) const { return pid < that.pid; }
void trigger();
bool await(double secs = 0);
@@ -25,7 +23,7 @@ private:
Latch& operator = (const Latch& that);
bool triggered;
- PID<LatchProcess> latch;
+ UPID pid;
};
} // namespace process {
Added: incubator/mesos/trunk/third_party/libprocess/include/process/message.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/message.hpp?rev=1236485&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/message.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/message.hpp Fri Jan 27 01:25:13 2012
@@ -0,0 +1,20 @@
+#ifndef __PROCESS_MESSAGE_HPP__
+#define __PROCESS_MESSAGE_HPP__
+
+#include <string>
+
+#include <process/pid.hpp>
+
+namespace process {
+
+struct Message // Plain data holder describing a single message.
+{
+ std::string name; // Name of the message.
+ UPID from; // Pid the message is from.
+ UPID to; // Pid the message is addressed to.
+ std::string body; // Body (payload) of the message.
+};
+
+} // namespace process {
+
+#endif // __PROCESS_MESSAGE_HPP__
Added: incubator/mesos/trunk/third_party/libprocess/include/process/preprocessor.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/preprocessor.hpp?rev=1236485&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/preprocessor.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/preprocessor.hpp Fri Jan 27 01:25:13 2012
@@ -0,0 +1,29 @@
+#ifndef __PROCESS_PREPROCESSOR_HPP__
+#define __PROCESS_PREPROCESSOR_HPP__
+
+#include <boost/preprocessor/cat.hpp>
+
+#include <boost/preprocessor/arithmetic/inc.hpp>
+
+#include <boost/preprocessor/facilities/intercept.hpp>
+
+#include <boost/preprocessor/repetition/enum_params.hpp>
+#include <boost/preprocessor/repetition/enum_binary_params.hpp>
+#include <boost/preprocessor/repetition/enum_trailing_params.hpp>
+#include <boost/preprocessor/repetition/repeat.hpp>
+#include <boost/preprocessor/repetition/repeat_from_to.hpp>
+
+// Provides aliases to a bunch of preprocessor macros useful for
+// creating template definitions that have varying number of
+// parameters (should be removable with C++-11 variadic templates).
+
+#define CAT BOOST_PP_CAT
+#define INC BOOST_PP_INC
+#define INTERCEPT BOOST_PP_INTERCEPT
+#define ENUM_PARAMS BOOST_PP_ENUM_PARAMS
+#define ENUM_BINARY_PARAMS BOOST_PP_ENUM_BINARY_PARAMS
+#define ENUM_TRAILING_PARAMS BOOST_PP_ENUM_TRAILING_PARAMS
+#define REPEAT BOOST_PP_REPEAT
+#define REPEAT_FROM_TO BOOST_PP_REPEAT_FROM_TO
+
+#endif // __PROCESS_PREPROCESSOR_HPP__
Modified: incubator/mesos/trunk/third_party/libprocess/include/process/process.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/process.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/process.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/process.hpp Fri Jan 27 01:25:13 2012
@@ -1,57 +1,24 @@
#ifndef __PROCESS_PROCESS_HPP__
#define __PROCESS_PROCESS_HPP__
-#include <assert.h>
#include <stdint.h>
-#include <stdlib.h>
-#include <ucontext.h>
-
-#include <sys/time.h>
+#include <pthread.h>
#include <map>
#include <queue>
#include <tr1/functional>
-#include <process/future.hpp>
+#include <process/clock.hpp>
+#include <process/event.hpp>
+#include <process/filter.hpp>
#include <process/http.hpp>
+#include <process/message.hpp>
#include <process/pid.hpp>
-
namespace process {
-const std::string NOTHING = "__process_nothing__";
-const std::string ERROR = "__process_error__";
-const std::string TIMEOUT = "__process_timeout__";
-const std::string EXITED = "__process_exited__";
-const std::string TERMINATE = "__process_terminate__";
-
-
-struct Message {
- std::string name;
- UPID from;
- UPID to;
- std::string body;
-};
-
-
-class Clock {
-public:
- static double now();
- static void pause();
- static void resume();
- static void advance(double secs);
-};
-
-
-class Filter {
-public:
- // TODO(benh): Support filtering HTTP requests?
- virtual bool filter(Message*) = 0;
-};
-
-
-class ProcessBase
+class ProcessBase : public EventVisitor
{
public:
ProcessBase(const std::string& id = "");
@@ -60,174 +27,158 @@ public:
UPID self() const { return pid; }
- static UPID spawn(ProcessBase* process, bool manage = false);
-
protected:
- // Function run when process spawned.
-// virtual void operator () () = 0;
- virtual void operator () ()
+ // Invoked when an event is serviced.
+ virtual void serve(const Event& event)
{
- do { if (serve() == TERMINATE) break; } while (true);
+ event.visit(this);
}
- // Returns the sender's PID of the last dequeued (current) message.
- UPID from() const;
-
- // Returns the name of the last dequeued (current) message.
- const std::string& name() const;
-
- // Returns the body of the last dequeued (current) message.
- const std::string& body() const;
-
- // Put a message at front of queue.
- void inject(const UPID& from,
- const std::string& name,
- const char* data = NULL,
- size_t length = 0);
+ // Callbacks used to visit (i.e., handle) a specific event.
+ virtual void visit(const MessageEvent& event);
+ virtual void visit(const DispatchEvent& event);
+ virtual void visit(const HttpEvent& event);
+ virtual void visit(const ExitedEvent& event);
+ virtual void visit(const TerminateEvent& event);
+
+ // Invoked when a process gets spawned.
+ virtual void initialize() {}
+
+ // Invoked when a process is terminated (unless visit is overridden).
+ virtual void finalize() {}
+
+ // Invoked when a linked process has exited (see link).
+ virtual void exited(const UPID& pid) {}
+
+ // Invoked when a linked process can no longer be monitored (see link).
+ virtual void lost(const UPID& pid) {}
+
+ // Puts a message at front of queue.
+ void inject(
+ const UPID& from,
+ const std::string& name,
+ const char* data = NULL,
+ size_t length = 0);
// Sends a message with data to PID.
- void send(const UPID& to,
- const std::string &name,
- const char *data = NULL,
- size_t length = 0);
-
- // Blocks for message at most specified seconds (0 implies forever).
- std::string receive(double secs = 0);
-
- // Processes dispatch messages.
- std::string serve(double secs = 0, bool once = false);
-
- // Blocks at least specified seconds (may block longer).
- void pause(double secs);
+ void send(
+ const UPID& to,
+ const std::string& name,
+ const char* data = NULL,
+ size_t length = 0);
- // Links with the specified PID.
+ // Links with the specified PID. Linking with a process from within
+ // the same "operating system process" is guaranteed to give you
+ // perfect monitoring of that process. However, linking with a
+ // process on another machine might result in receiving lost
+ // callbacks due to the nature of a distributed environment.
UPID link(const UPID& pid);
- // IO events for polling.
- enum { RDONLY = 01, WRONLY = 02, RDWR = 03 };
-
- // Wait until operation is ready for file descriptor (or message
- // received ignore is false).
- bool poll(int fd, int op, double secs = 0, bool ignore = true);
-
- // Returns true if operation on file descriptor is ready.
- bool ready(int fd, int op);
+ // The default visit implementation for message events invokes
+ // installed message handlers, or delegates the message to another
+ // process (a delegate can be installed below but a message handler
+ // always takes precedence over delegating). A message handler is
+ // any function which takes two arguments, the "from" pid and the
+ // message body.
+ typedef std::tr1::function<void(const UPID&, const std::string&)>
+ MessageHandler;
- // Returns sub-second elapsed time (according to this process).
- double elapsedTime();
-
- // Delegate incoming message's with the specified name to pid.
- void delegate(const std::string& name, const UPID& pid)
- {
- delegates[name] = pid;
- }
-
- typedef std::tr1::function<void()> MessageHandler;
-
- // Install a handler for a message.
- void installMessageHandler(
+ // Setup a handler for a message.
+ void install(
const std::string& name,
const MessageHandler& handler)
{
- messageHandlers[name] = handler;
+ handlers.message[name] = handler;
}
template <typename T>
- void installMessageHandler(
+ void install(
const std::string& name,
- void (T::*method)())
+ void (T::*method)(const UPID&, const std::string&))
+ {
+ // Note that we use dynamic_cast here so a process can use
+ // multiple inheritance if it sees so fit (e.g., to implement
+ // multiple callback interfaces).
+ MessageHandler handler =
+ std::tr1::bind(method,
+ dynamic_cast<T*>(this),
+ std::tr1::placeholders::_1,
+ std::tr1::placeholders::_2);
+ install(name, handler);
+ }
+
+ // Delegate incoming messages with the specified name to pid.
+ void delegate(const std::string& name, const UPID& pid)
{
- MessageHandler handler = std::tr1::bind(method, dynamic_cast<T*>(this));
- installMessageHandler(name, handler);
+ delegates[name] = pid;
}
- typedef std::tr1::function<Promise<HttpResponse>(const HttpRequest&)>
- HttpRequestHandler;
+ // The default visit implementation for HTTP events invokes
+ // installed HTTP handlers. An HTTP handler is any function which
+ // takes an HttpRequest object and returns a Future<HttpResponse>.
+ typedef std::tr1::function<Future<HttpResponse>(const HttpRequest&)>
+ HttpRequestHandler;
- // Install a handler for an HTTP request.
- void installHttpHandler(
+ // Setup a handler for an HTTP request.
+ void route(
const std::string& name,
const HttpRequestHandler& handler)
{
- httpHandlers[name] = handler;
+ handlers.http[name] = handler;
}
template <typename T>
- void installHttpHandler(
+ void route(
const std::string& name,
- Promise<HttpResponse> (T::*method)(const HttpRequest&))
+ Future<HttpResponse> (T::*method)(const HttpRequest&))
{
+ // Note that we use dynamic_cast here so a process can use
+ // multiple inheritance if it sees so fit (e.g., to implement
+ // multiple callback interfaces).
HttpRequestHandler handler =
std::tr1::bind(method, dynamic_cast<T*>(this),
std::tr1::placeholders::_1);
- installHttpHandler(name, handler);
+ route(name, handler);
}
private:
friend class SocketManager;
friend class ProcessManager;
friend class ProcessReference;
- friend void* schedule(void *);
+ friend void* schedule(void*);
// Process states.
- enum { INIT,
- READY,
+ enum { BOTTOM,
+ READY,
RUNNING,
- RECEIVING,
- SERVING,
- PAUSED,
- POLLING,
- WAITING,
- INTERRUPTED,
- TIMEDOUT,
- FINISHING,
+ BLOCKED,
FINISHED } state;
- // Lock/mutex protecting internals.
+ // Mutex protecting internals. TODO(benh): Replace with a spinlock.
pthread_mutex_t m;
void lock() { pthread_mutex_lock(&m); }
void unlock() { pthread_mutex_unlock(&m); }
- // Enqueue the specified message, request, or dispatcher.
- void enqueue(Message* message, bool inject = false);
- void enqueue(std::pair<HttpRequest*, Promise<HttpResponse>*>* request);
- void enqueue(std::tr1::function<void(ProcessBase*)>* dispatcher);
-
- // Dequeue a message, request, or dispatcher, or returns NULL.
- template <typename T> T* dequeue();
+ // Enqueue the specified message, request, or function call.
+ void enqueue(Event* event, bool inject = false);
- // Queue of received messages.
- std::deque<Message*> messages;
-
- // Queue of HTTP requests (with the promise used for responses).
- std::deque<std::pair<HttpRequest*, Promise<HttpResponse>*>*> requests;
-
- // Queue of dispatchers.
- std::deque<std::tr1::function<void(ProcessBase*)>*> dispatchers;
+ // Queue of received events.
+ std::deque<Event*> events;
// Delegates for messages.
std::map<std::string, UPID> delegates;
- // Handlers for messages.
- std::map<std::string, MessageHandler> messageHandlers;
-
- // Handlers for HTTP requests.
- std::map<std::string, HttpRequestHandler> httpHandlers;
-
- // Current message.
- Message* current;
+ // Handlers for messages and HTTP requests.
+ struct {
+ std::map<std::string, MessageHandler> message;
+ std::map<std::string, HttpRequestHandler> http;
+ } handlers;
// Active references.
int refs;
- // Current "blocking" generation.
- int generation;
-
// Process PID.
UPID pid;
-
- // Continuation/Context of process.
- ucontext_t uctx;
};
@@ -257,10 +208,12 @@ void initialize(bool initialize_google_l
* @param process process to be spawned
* @param manage boolean whether process should get garbage collected
*/
+UPID spawn(ProcessBase* process, bool manage = false);
+
template <typename T>
PID<T> spawn(T* t, bool manage = false)
{
- if (!ProcessBase::spawn(t, manage)) {
+ if (!spawn(static_cast<ProcessBase*>(t), manage)) {
return PID<T>();
}
@@ -301,23 +254,6 @@ bool wait(const ProcessBase* process, do
/**
- * Invoke the thunk in a legacy safe way (i.e., outside of libprocess).
- *
- * @param thunk function to be invoked
- */
-void invoke(const std::tr1::function<void(void)>& thunk);
-
-
-/**
- * Use the specified filter on messages that get enqueued (note,
- * however, that you cannot filter timeout messages).
- *
- * @param filter message filter
- */
-void filter(Filter* filter);
-
-
-/**
* Sends a message with data without a return address.
*
* @param to receiver
Modified: incubator/mesos/trunk/third_party/libprocess/include/process/protobuf.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/protobuf.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/protobuf.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/protobuf.hpp Fri Jan 27 01:25:13 2012
@@ -72,20 +72,15 @@ public:
virtual ~ProtobufProcess() {}
protected:
- virtual void operator () ()
+ virtual void visit(const process::MessageEvent& event)
{
- // TODO(benh): Shouldn't we just make Process::serve be a virtual
- // function, and then the one we get from process::Process will be
- // sufficient?
- do { if (serve() == process::TERMINATE) break; } while (true);
- }
-
- template <typename M>
- M message()
- {
- M m;
- m.ParseFromString(process::Process<T>::body());
- return m;
+ if (protobufHandlers.count(event.message->name) > 0) {
+ from = event.message->from; // For 'reply'.
+ protobufHandlers[event.message->name](event.message->body);
+ from = process::UPID();
+ } else {
+ process::Process<T>::visit(event);
+ }
}
void send(const process::UPID& to,
@@ -99,20 +94,16 @@ protected:
using process::Process<T>::send;
- std::string serve(double secs = 0, bool once = false)
+ void reply(const google::protobuf::Message& message)
{
- do {
- const std::string& name = process::Process<T>::serve(secs, once);
- if (protobufHandlers.count(name) > 0) {
- protobufHandlers[name](process::Process<T>::body());
- } else {
- return name;
- }
- } while (!once);
+ CHECK(from) << "Attempting to reply without a sender";
+ std::string data;
+ message.SerializeToString(&data);
+ send(from, message);
}
template <typename M>
- void installProtobufHandler(void (T::*method)(const M&))
+ void install(void (T::*method)(const M&))
{
google::protobuf::Message* m = new M();
T* t = static_cast<T*>(this);
@@ -124,7 +115,7 @@ protected:
}
template <typename M>
- void installProtobufHandler(void (T::*method)())
+ void install(void (T::*method)())
{
google::protobuf::Message* m = new M();
T* t = static_cast<T*>(this);
@@ -137,7 +128,7 @@ protected:
template <typename M,
typename P1, typename P1C>
- void installProtobufHandler(void (T::*method)(P1C),
+ void install(void (T::*method)(P1C),
P1 (M::*param1)() const)
{
google::protobuf::Message* m = new M();
@@ -152,7 +143,7 @@ protected:
template <typename M,
typename P1, typename P1C,
typename P2, typename P2C>
- void installProtobufHandler(void (T::*method)(P1C, P2C),
+ void install(void (T::*method)(P1C, P2C),
P1 (M::*p1)() const,
P2 (M::*p2)() const)
{
@@ -169,7 +160,7 @@ protected:
typename P1, typename P1C,
typename P2, typename P2C,
typename P3, typename P3C>
- void installProtobufHandler(void (T::*method)(P1C, P2C, P3C),
+ void install(void (T::*method)(P1C, P2C, P3C),
P1 (M::*p1)() const,
P2 (M::*p2)() const,
P3 (M::*p3)() const)
@@ -188,7 +179,7 @@ protected:
typename P2, typename P2C,
typename P3, typename P3C,
typename P4, typename P4C>
- void installProtobufHandler(void (T::*method)(P1C, P2C, P3C, P4C),
+ void install(void (T::*method)(P1C, P2C, P3C, P4C),
P1 (M::*p1)() const,
P2 (M::*p2)() const,
P3 (M::*p3)() const,
@@ -209,7 +200,7 @@ protected:
typename P3, typename P3C,
typename P4, typename P4C,
typename P5, typename P5C>
- void installProtobufHandler(void (T::*method)(P1C, P2C, P3C, P4C, P5C),
+ void install(void (T::*method)(P1C, P2C, P3C, P4C, P5C),
P1 (M::*p1)() const,
P2 (M::*p2)() const,
P3 (M::*p3)() const,
@@ -225,6 +216,10 @@ protected:
delete m;
}
+ using process::Process<T>::install;
+
+ process::UPID from; // Sender of "current" message, accessible by subclasses.
+
private:
template <typename M>
static void handlerM(T* t, void (T::*method)(const M&),
@@ -374,18 +369,19 @@ public:
ReqResProcess(const process::UPID& _pid, const Req& _req)
: pid(_pid), req(_req)
{
- Super::template installProtobufHandler<Res>(
+ Super::template install<Res>(
&ReqResProcess<Req, Res>::response);
}
- process::Promise<Res> run()
+ process::Future<Res> run()
{
send(pid, req);
std::tr1::function<void(const process::Future<Res>&)> callback =
std::tr1::bind(&ReqResProcess<Req, Res>::terminate,
- std::tr1::placeholders::_1, Super::self());
+ std::tr1::placeholders::_1,
+ Super::self());
promise.future().onAny(callback);
- return promise;
+ return promise.future();
}
private:
Modified: incubator/mesos/trunk/third_party/libprocess/include/process/run.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/run.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/run.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/run.hpp Fri Jan 27 01:25:13 2012
@@ -1,162 +1,78 @@
#ifndef __PROCESS_RUN_HPP__
#define __PROCESS_RUN_HPP__
-#include <process/process.hpp>
+#include <tr1/memory> // TODO(benh): Replace shared_ptr with unique_ptr.
+#include <process/process.hpp>
+#include <process/preprocessor.hpp>
namespace process {
-template <typename R>
-Future<R> run(R (*method)());
-
-
-template <typename R, typename P1, typename A1>
-Future<R> run(R (*method)(P1), A1 a1);
-
-
-template <typename R,
- typename P1, typename P2,
- typename A1, typename A2>
-Future<R> run(R (*method)(P1, P2), A1 a1, A2 a2);
-
-
-template <typename R,
- typename P1, typename P2, typename P3,
- typename A1, typename A2, typename A3>
-Future<R> run(R (*method)(P1, P2, P3), A1 a1, A2 a2, A3 a3);
-
-
-template <typename R,
- typename P1, typename P2, typename P3, typename P4,
- typename A1, typename A2, typename A3, typename A4>
-Future<R> run(R (*method)(P1, P2, P3, P4), A1 a1, A2 a2, A3 a3, A4 a4);
-
-
-template <typename R,
- typename P1, typename P2, typename P3, typename P4, typename P5,
- typename A1, typename A2, typename A3, typename A4, typename A5>
-Future<R> run(R (*method)(P1, P2, P3, P4, P5),
- A1 a1, A2 a2, A3 a3, A4 a4, A5 a5);
-
-
namespace internal {
template <typename R>
class ThunkProcess : public Process<ThunkProcess<R> >
{
public:
- ThunkProcess(const std::tr1::function<R(void)>& _thunk,
- const Promise<R>& _promise)
+ ThunkProcess(std::tr1::shared_ptr<std::tr1::function<R(void)> > _thunk,
+ std::tr1::shared_ptr<Promise<R> > _promise)
: thunk(_thunk),
promise(_promise) {}
virtual ~ThunkProcess() {}
protected:
- virtual void operator () ()
+ virtual void serve(const Event& event)
{
- promise.set(thunk());
+ promise->set((*thunk)());
}
private:
- std::tr1::function<R(void)> thunk;
- Promise<R> promise;
+ std::tr1::shared_ptr<std::tr1::function<R(void)> > thunk;
+ std::tr1::shared_ptr<Promise<R> > promise;
};
} // namespace internal {
template <typename R>
-Future<R> run(R (*method)())
+Future<R> run(R (*method)(void))
{
- std::tr1::function<R(void)> thunk =
- std::tr1::bind(method);
-
- Promise<R> promise;
-
- spawn(new internal::ThunkProcess<R>(thunk, promise), true);
-
- return promise.future();
+ std::tr1::shared_ptr<std::tr1::function<R(void)> > thunk(
+ new std::tr1::function<R(void)>(
+ std::tr1::bind(method)));
+
+ std::tr1::shared_ptr<Promise<R> > promise(new Promise<R>());
+ Future<R> future = promise->future();
+
+ terminate(spawn(new internal::ThunkProcess<R>(thunk, promise), true));
+
+ return future;
}
-template <typename R, typename P1, typename A1>
-Future<R> run(R (*method)(P1), A1 a1)
-{
- std::tr1::function<R(void)> thunk =
- std::tr1::bind(method, a1);
-
- Promise<R> promise;
-
- spawn(new internal::ThunkProcess<R>(thunk, promise), true);
-
- return promise.future();
-}
-
-
-template <typename R,
- typename P1, typename P2,
- typename A1, typename A2>
-Future<R> run(R (*method)(P1, P2), A1 a1, A2 a2)
-{
- std::tr1::function<R(void)> thunk =
- std::tr1::bind(method, a1, a2);
-
- Promise<R> promise;
-
- spawn(new internal::ThunkProcess<R>(thunk, promise), true);
-
- return promise.future();
-}
-
-
-template <typename R,
- typename P1, typename P2, typename P3,
- typename A1, typename A2, typename A3>
-Future<R> run(R (*method)(P1, P2, P3), A1 a1, A2 a2, A3 a3)
-{
- std::tr1::function<R(void)> thunk =
- std::tr1::bind(method, a1, a2, a3);
-
- Promise<R> promise;
-
- spawn(new internal::ThunkProcess<R>(thunk, promise), true);
-
- return promise.future();
-}
-
-
-template <typename R,
- typename P1, typename P2, typename P3, typename P4,
- typename A1, typename A2, typename A3, typename A4>
-Future<R> run(R (*method)(P1, P2, P3, P4), A1 a1, A2 a2, A3 a3, A4 a4)
-{
- std::tr1::function<R(void)> thunk =
- std::tr1::bind(method, a1, a2, a3, a4);
-
- Promise<R> promise;
-
- spawn(new internal::ThunkProcess<R>(thunk, promise), true);
-
- return promise.future();
-}
-
-
-template <typename R,
- typename P1, typename P2, typename P3, typename P4, typename P5,
- typename A1, typename A2, typename A3, typename A4, typename A5>
-Future<R> run(R (*method)(P1, P2, P3, P4, P5),
- A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
-{
- std::tr1::function<R(void)> thunk =
- std::tr1::bind(method, a1, a2, a3, a4, a5);
-
- Promise<R> promise;
-
- spawn(new internal::ThunkProcess<R>(thunk, promise), true);
+#define TEMPLATE(Z, N, DATA) \
+ template <typename R, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ Future<R> run( \
+ R (*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ std::tr1::shared_ptr<std::tr1::function<R(void)> > thunk( \
+ new std::tr1::function<R(void)>( \
+ std::tr1::bind(method, ENUM_PARAMS(N, a)))); \
+ \
+ std::tr1::shared_ptr<Promise<R> > promise(new Promise<R>()); \
+ Future<R> future = promise->future(); \
+ \
+ terminate(spawn(new internal::ThunkProcess<R>(thunk, promise), true)); \
+ \
+ return future; \
+ }
- return promise.future();
-}
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
} // namespace process {
Modified: incubator/mesos/trunk/third_party/libprocess/include/process/timer.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/timer.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/timer.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/timer.hpp Fri Jan 27 01:25:13 2012
@@ -4,100 +4,131 @@
#include <process/dispatch.hpp>
#include <process/process.hpp>
-
namespace process {
-class TimerProcess;
+// Timer support! Note that we don't store a pointer to the issuing
+// process (if there is one) since dereferencing it would be unsafe:
+// it might no longer be valid. (Instead we can use the PID to check
+// if the issuing process is still valid and get a reference to it).
+
+struct timer
+{
+ long id;
+ double timeout;
+ process::UPID pid;
+ std::tr1::function<void(void)> thunk;
+};
+
-class Timer
+inline bool operator == (const timer& left, const timer& right)
{
-public:
- Timer(double secs, const UPID& pid, internal::Dispatcher* dispatcher);
+ return left.id == right.id;
+}
- virtual ~Timer();
- void cancel();
+namespace timers {
-private:
- PID<TimerProcess> timer;
-};
+timer create(double secs, const std::tr1::function<void(void)>& thunk);
+void cancel(const timer& timer);
+
+} // namespace timers {
// Delay a dispatch to a process. Returns a timer which can attempted
// to be canceled if desired (but might be firing concurrently).
template <typename T>
-Timer delay(double secs,
+timer delay(double secs,
const PID<T>& pid,
void (T::*method)())
{
- std::tr1::function<void(T*)> thunk =
- std::tr1::bind(method, std::tr1::placeholders::_1);
+ std::tr1::shared_ptr<std::tr1::function<void(T*)> > thunk(
+ new std::tr1::function<void(T*)>(
+ std::tr1::bind(method, std::tr1::placeholders::_1)));
+
+ std::tr1::function<void(ProcessBase*)>* dispatcher =
+ new std::tr1::function<void(ProcessBase*)>(
+ std::tr1::bind(&internal::vdispatcher<T>,
+ std::tr1::placeholders::_1,
+ thunk));
- internal::Dispatcher* dispatcher = new internal::Dispatcher(
- std::tr1::bind(&internal::vdispatcher<T>,
- std::tr1::placeholders::_1,
- thunk));
+ std::tr1::function<void(void)> dispatch =
+ std::tr1::bind(internal::dispatch, pid, dispatcher);
- return Timer(secs, pid, dispatcher);
+ return timers::create(secs, dispatch);
}
template <typename T, typename P1, typename A1>
-Timer delay(double secs,
+timer delay(double secs,
const PID<T>& pid,
void (T::*method)(P1),
A1 a1)
{
- std::tr1::function<void(T*)> thunk =
- std::tr1::bind(method, std::tr1::placeholders::_1, a1);
+ std::tr1::shared_ptr<std::tr1::function<void(T*)> > thunk(
+ new std::tr1::function<void(T*)>(
+ std::tr1::bind(method, std::tr1::placeholders::_1, a1)));
+
+ std::tr1::function<void(ProcessBase*)>* dispatcher =
+ new std::tr1::function<void(ProcessBase*)>(
+ std::tr1::bind(&internal::vdispatcher<T>,
+ std::tr1::placeholders::_1,
+ thunk));
- internal::Dispatcher* dispatcher = new internal::Dispatcher(
- std::tr1::bind(&internal::vdispatcher<T>,
- std::tr1::placeholders::_1,
- thunk));
+ std::tr1::function<void(void)> dispatch =
+ std::tr1::bind(internal::dispatch, pid, dispatcher);
- return Timer(secs, pid, dispatcher);
+ return timers::create(secs, dispatch);
}
template <typename T,
typename P1, typename P2,
typename A1, typename A2>
-Timer delay(double secs,
+timer delay(double secs,
const PID<T>& pid,
void (T::*method)(P1, P2),
A1 a1, A2 a2)
{
- std::tr1::function<void(T*)> thunk =
- std::tr1::bind(method, std::tr1::placeholders::_1, a1, a2);
+ std::tr1::shared_ptr<std::tr1::function<void(T*)> > thunk(
+ new std::tr1::function<void(T*)>(
+ std::tr1::bind(method, std::tr1::placeholders::_1, a1, a2)));
+
+ std::tr1::function<void(ProcessBase*)>* dispatcher =
+ new std::tr1::function<void(ProcessBase*)>(
+ std::tr1::bind(&internal::vdispatcher<T>,
+ std::tr1::placeholders::_1,
+ thunk));
- internal::Dispatcher* dispatcher = new internal::Dispatcher(
- std::tr1::bind(&internal::vdispatcher<T>,
- std::tr1::placeholders::_1,
- thunk));
+ std::tr1::function<void(void)> dispatch =
+ std::tr1::bind(internal::dispatch, pid, dispatcher);
- return Timer(secs, pid, dispatcher);
+ return timers::create(secs, dispatch);
}
template <typename T,
typename P1, typename P2, typename P3,
typename A1, typename A2, typename A3>
-Timer delay(double secs,
+timer delay(double secs,
const PID<T>& pid,
void (T::*method)(P1, P2, P3),
A1 a1, A2 a2, A3 a3)
{
- std::tr1::function<void(T*)> thunk =
- std::tr1::bind(method, std::tr1::placeholders::_1, a1, a2, a3);
+ std::tr1::shared_ptr<std::tr1::function<void(T*)> > thunk(
+ new std::tr1::function<void(T*)>(
+ std::tr1::bind(method, std::tr1::placeholders::_1, a1, a2, a3)));
+
+ std::tr1::function<void(ProcessBase*)>* dispatcher =
+ new std::tr1::function<void(ProcessBase*)>(
+ std::tr1::bind(&internal::vdispatcher<T>,
+ std::tr1::placeholders::_1,
+ thunk));
- internal::Dispatcher* dispatcher = new internal::Dispatcher(
- std::tr1::bind(&internal::vdispatcher<T>,
- std::tr1::placeholders::_1,
- thunk));
+ std::tr1::function<void(void)> dispatch =
+ std::tr1::bind(internal::dispatch, pid, dispatcher);
- return Timer(secs, pid, dispatcher);
+ return timers::create(secs, dispatch);
}
} // namespace process {
Modified: incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp Fri Jan 27 01:25:13 2012
@@ -5,6 +5,8 @@
#include <process/process.hpp>
+#include "foreach.hpp"
+
namespace process {
@@ -132,7 +134,6 @@ public:
}
};
-
} // namespace process {
#endif // __ENCODER_HPP__
Modified: incubator/mesos/trunk/third_party/libprocess/src/foreach.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/foreach.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/foreach.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/foreach.hpp Fri Jan 27 01:25:13 2012
@@ -1,8 +1,11 @@
-#ifndef FOREACH_HPP
-#define FOREACH_HPP
+#ifndef __FOREACH_HPP__
+#define __FOREACH_HPP__
#include <boost/foreach.hpp>
+#include <boost/tuple/tuple.hpp>
+
+
#define BOOST_FOREACH_PAIR(VARFIRST, VARSECOND, COL) \
BOOST_FOREACH_PREAMBLE() \
if (boost::foreach_detail_::auto_any_t _foreach_col = BOOST_FOREACH_CONTAIN(COL)) {} else \
@@ -23,8 +26,10 @@
#define foreach BOOST_FOREACH
#define foreachpair BOOST_FOREACH_PAIR
-#include <boost/tuple/tuple.hpp>
+#define foreachkey(VAR, COL) \
+ foreachpair (VAR, boost::tuples::ignore, COL)
-const boost::tuples::detail::swallow_assign _ = boost::tuples::ignore;
+#define foreachvalue(VAR, COL) \
+ foreachpair (boost::tuples::ignore, VAR, COL)
-#endif /* FOREACH_HPP */
+#endif // __FOREACH_HPP__
Modified: incubator/mesos/trunk/third_party/libprocess/src/latch.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/latch.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/latch.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/latch.cpp Fri Jan 27 01:25:13 2012
@@ -9,13 +9,6 @@ namespace process {
// within libprocess such that it doesn't cost a memory allocation, a
// spawn, a message send, a wait, and two user-space context-switchs.
-class LatchProcess : public Process<LatchProcess>
-{
-protected:
- virtual void operator () () { receive(); }
-};
-
-
Latch::Latch()
{
triggered = false;
@@ -25,20 +18,20 @@ Latch::Latch()
// deleting thread is holding. Hence, we only save the PID for
// triggering the latch and let the GC actually do the deleting
// (thus no waiting is necessary, and deadlocks are avoided).
- latch = spawn(new LatchProcess(), true);
+ pid = spawn(new ProcessBase(), true);
}
Latch::~Latch()
{
- post(latch, TERMINATE);
+ terminate(pid);
}
void Latch::trigger()
{
if (!triggered) {
- post(latch, TERMINATE);
+ terminate(pid);
triggered = true;
}
}
@@ -47,7 +40,7 @@ void Latch::trigger()
bool Latch::await(double secs)
{
if (!triggered) {
- return wait(latch, secs);
+ return wait(pid, secs);
}
return true;