You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by mp...@apache.org on 2017/12/05 19:54:09 UTC
[1/4] mesos git commit: Added `CallableOnce` support in `defer`.
Repository: mesos
Updated Branches:
refs/heads/master 62b472731 -> 6839897c5
Added `CallableOnce` support in `defer`.
This allows `defer` result to be converted to `CallableOnce`, ensuring
that bound arguments are moved, when call is made, and avoiding making
copies of bound arguments.
Review: https://reviews.apache.org/r/63637/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/09b72e9b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/09b72e9b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/09b72e9b
Branch: refs/heads/master
Commit: 09b72e9bbf87793ce84df5d5f9d5f292c60fa5ee
Parents: 62b4727
Author: Dmitry Zhuk <dz...@twopensource.com>
Authored: Tue Dec 5 11:20:41 2017 -0800
Committer: Michael Park <mp...@apache.org>
Committed: Tue Dec 5 11:20:41 2017 -0800
----------------------------------------------------------------------
.../libprocess/include/process/deferred.hpp | 86 ++++++++++++++++++++
3rdparty/libprocess/src/tests/process_tests.cpp | 28 +++++++
2 files changed, 114 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/09b72e9b/3rdparty/libprocess/include/process/deferred.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/deferred.hpp b/3rdparty/libprocess/include/process/deferred.hpp
index 8beb2b3..eb6c5be 100644
--- a/3rdparty/libprocess/include/process/deferred.hpp
+++ b/3rdparty/libprocess/include/process/deferred.hpp
@@ -105,6 +105,22 @@ struct _Deferred
});
}
+ operator lambda::CallableOnce<void()>() &&
+ {
+ if (pid.isNone()) {
+ return lambda::CallableOnce<void()>(std::forward<F>(f));
+ }
+
+ Option<UPID> pid_ = pid;
+
+ return lambda::CallableOnce<void()>(
+ lambda::partial(
+ [pid_](typename std::decay<F>::type&& f_) {
+ dispatch(pid_.get(), std::move(f_));
+ },
+ std::forward<F>(f)));
+ }
+
template <typename R>
operator Deferred<R()>() &&
{
@@ -137,6 +153,29 @@ struct _Deferred
});
}
+ template <typename R>
+ operator lambda::CallableOnce<R()>() &&
+ {
+ if (pid.isNone()) {
+ return lambda::CallableOnce<R()>(std::forward<F>(f));
+ }
+
+ Option<UPID> pid_ = pid;
+
+ return lambda::CallableOnce<R()>(
+ lambda::partial(
+ [pid_](typename std::decay<F>::type&& f_) {
+ return dispatch(pid_.get(), std::move(f_));
+ },
+ std::forward<F>(f)));
+ }
+
+// Expands to lambda::_$(N+1). N is zero-based, and placeholders are one-based.
+#define PLACEHOLDER(Z, N, DATA) CAT(lambda::_, INC(N))
+
+// This assumes type and variable base names are `P` and `p` respectively.
+#define FORWARD(Z, N, DATA) std::forward<P ## N>(p ## N)
+
// Due to a bug (http://gcc.gnu.org/bugzilla/show_bug.cgi?id=41933)
// with variadic templates and lambdas, we still need to do
// preprocessor expansions. In addition, due to a bug with clang (or
@@ -180,6 +219,28 @@ struct _Deferred
}); \
dispatch(pid_.get(), f__); \
}); \
+ } \
+ \
+ template <ENUM_PARAMS(N, typename P)> \
+ operator lambda::CallableOnce<void(ENUM_PARAMS(N, P))>() && \
+ { \
+ if (pid.isNone()) { \
+ return lambda::CallableOnce<void(ENUM_PARAMS(N, P))>( \
+ std::forward<F>(f)); \
+ } \
+ \
+ Option<UPID> pid_ = pid; \
+ \
+ return lambda::CallableOnce<void(ENUM_PARAMS(N, P))>( \
+ lambda::partial( \
+ [pid_](typename std::decay<F>::type&& f_, \
+ ENUM_BINARY_PARAMS(N, P, &&p)) { \
+ lambda::CallableOnce<void()> f__( \
+ lambda::partial(std::move(f_), ENUM(N, FORWARD, _))); \
+ dispatch(pid_.get(), std::move(f__)); \
+ }, \
+ std::forward<F>(f), \
+ ENUM(N, PLACEHOLDER, _))); \
}
REPEAT_FROM_TO(1, 3, TEMPLATE, _) // Args A0 -> A1.
@@ -222,11 +283,36 @@ struct _Deferred
}); \
return dispatch(pid_.get(), f__); \
}); \
+ } \
+ \
+ template <typename R, ENUM_PARAMS(N, typename P)> \
+ operator lambda::CallableOnce<R(ENUM_PARAMS(N, P))>() && \
+ { \
+ if (pid.isNone()) { \
+ return lambda::CallableOnce<R(ENUM_PARAMS(N, P))>( \
+ std::forward<F>(f)); \
+ } \
+ \
+ Option<UPID> pid_ = pid; \
+ \
+ return lambda::CallableOnce<R(ENUM_PARAMS(N, P))>( \
+ lambda::partial( \
+ [pid_](typename std::decay<F>::type&& f_, \
+ ENUM_BINARY_PARAMS(N, P, &&p)) { \
+ lambda::CallableOnce<R()> f__( \
+ lambda::partial(std::move(f_), ENUM(N, FORWARD, _))); \
+ return dispatch(pid_.get(), std::move(f__)); \
+ }, \
+ std::forward<F>(f), \
+ ENUM(N, PLACEHOLDER, _))); \
}
REPEAT_FROM_TO(1, 3, TEMPLATE, _) // Args A0 -> A1.
#undef TEMPLATE
+#undef FORWARD
+#undef PLACEHOLDER
+
private:
friend class Executor;
http://git-wip-us.apache.org/repos/asf/mesos/blob/09b72e9b/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index 4a3e3ca..58741b2 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -169,6 +169,9 @@ public:
void func5(MoveOnly&& mo) { func5_(mo); }
MOCK_METHOD1(func5_, void(const MoveOnly&));
+
+ bool func6(MoveOnly&& m1, MoveOnly&& m2, bool b) { return func6_(m1, m2, b); }
+ MOCK_METHOD3(func6_, bool(const MoveOnly&, const MoveOnly&, bool));
};
@@ -223,6 +226,11 @@ TEST(ProcessTest, THREADSAFE_Defer1)
EXPECT_CALL(process, func4(_, _))
.WillRepeatedly(ReturnArg<0>());
+ EXPECT_CALL(process, func5_(_));
+
+ EXPECT_CALL(process, func6_(_, _, _))
+ .WillRepeatedly(ReturnArg<2>());
+
PID<DispatchProcess> pid = spawn(&process);
ASSERT_FALSE(!pid);
@@ -270,6 +278,26 @@ TEST(ProcessTest, THREADSAFE_Defer1)
EXPECT_TRUE(future.get());
}
+ {
+ lambda::CallableOnce<void()> func5 =
+ defer(pid, &DispatchProcess::func5, MoveOnly());
+ std::move(func5)();
+ }
+
+ {
+ lambda::CallableOnce<Future<bool>(MoveOnly&&)> func6 =
+ defer(pid, &DispatchProcess::func6, MoveOnly(), lambda::_1, true);
+ future = std::move(func6)(MoveOnly());
+ EXPECT_TRUE(future.get());
+ }
+
+ {
+ lambda::CallableOnce<Future<bool>(MoveOnly&&)> func6 =
+ defer(pid, &DispatchProcess::func6, MoveOnly(), lambda::_1, false);
+ future = std::move(func6)(MoveOnly());
+ EXPECT_FALSE(future.get());
+ }
+
// Only take const &!
terminate(pid);
[4/4] mesos git commit: Made `Event` move-only in libprocess.
Posted by mp...@apache.org.
Made `Event` move-only in libprocess.
Review: https://reviews.apache.org/r/64347/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6839897c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6839897c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6839897c
Branch: refs/heads/master
Commit: 6839897c5464fce6b8cbd253d959a7e2efd72987
Parents: c9e6a03
Author: Dmitry Zhuk <dz...@twopensource.com>
Authored: Tue Dec 5 11:21:27 2017 -0800
Committer: Michael Park <mp...@apache.org>
Committed: Tue Dec 5 11:21:27 2017 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/event.hpp | 6 ++----
1 file changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/6839897c/3rdparty/libprocess/include/process/event.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/event.hpp b/3rdparty/libprocess/include/process/event.hpp
index 76bcdb8..2e93428 100644
--- a/3rdparty/libprocess/include/process/event.hpp
+++ b/3rdparty/libprocess/include/process/event.hpp
@@ -121,8 +121,7 @@ struct MessageEvent : Event
: message{std::move(name), from, to, std::move(data)} {}
MessageEvent(MessageEvent&& that) = default;
- // TODO(dzhuk) Make it movable only, when move is properly supported in defer.
- MessageEvent(const MessageEvent& that) = default;
+ MessageEvent(const MessageEvent& that) = delete;
MessageEvent& operator=(MessageEvent&&) = default;
MessageEvent& operator=(const MessageEvent&) = delete;
@@ -218,8 +217,7 @@ struct ExitedEvent : Event
: pid(_pid) {}
ExitedEvent(ExitedEvent&&) = default;
- // TODO(dzhuk) Make it movable only, when move is properly supported in defer.
- ExitedEvent(const ExitedEvent&) = default;
+ ExitedEvent(const ExitedEvent&) = delete;
ExitedEvent& operator=(ExitedEvent&&) = default;
ExitedEvent& operator=(const ExitedEvent&) = delete;
[3/4] mesos git commit: Used `std::move` for `Event`s consumption in
the master.
Posted by mp...@apache.org.
Used `std::move` for `Event`s consumption in the master.
Review: https://reviews.apache.org/r/63641/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c9e6a03c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c9e6a03c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c9e6a03c
Branch: refs/heads/master
Commit: c9e6a03c02e9f8dc040b937ccd5ae89e5530fd7e
Parents: 8014e3f
Author: Dmitry Zhuk <dz...@twopensource.com>
Authored: Tue Dec 5 11:21:11 2017 -0800
Committer: Michael Park <mp...@apache.org>
Committed: Tue Dec 5 11:21:11 2017 -0800
----------------------------------------------------------------------
src/master/master.cpp | 28 +++++++++++-----------------
src/master/master.hpp | 6 +++---
2 files changed, 14 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c9e6a03c/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 1fff323..e8257e7 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1590,19 +1590,19 @@ void Master::consume(ExitedEvent&& event)
: Option<string>::none();
// Necessary to disambiguate below.
- typedef void(Self::*F)(const ExitedEvent&);
+ typedef void(Self::*F)(ExitedEvent&&);
if (principal.isSome() &&
frameworks.limiters.contains(principal.get()) &&
frameworks.limiters[principal.get()].isSome()) {
- frameworks.limiters[principal.get()].get()->limiter->acquire()
- .onReady(defer(self(), static_cast<F>(&Self::_consume), event));
+ frameworks.limiters[principal.get()].get()->limiter->acquire().onReady(
+ defer(self(), static_cast<F>(&Self::_consume), std::move(event)));
} else if ((principal.isNone() ||
!frameworks.limiters.contains(principal.get())) &&
isRegisteredFramework &&
frameworks.defaultLimiter.isSome()) {
- frameworks.defaultLimiter.get()->limiter->acquire()
- .onReady(defer(self(), static_cast<F>(&Self::_consume), event));
+ frameworks.defaultLimiter.get()->limiter->acquire().onReady(
+ defer(self(), static_cast<F>(&Self::_consume), std::move(event)));
} else {
_consume(std::move(event));
}
@@ -1612,7 +1612,7 @@ void Master::consume(ExitedEvent&& event)
// TODO(greggomann): Change this to accept an `Option<Principal>`
// when MESOS-7202 is resolved.
void Master::throttled(
- const MessageEvent& event,
+ MessageEvent&& event,
const Option<string>& principal)
{
// We already know a RateLimiter is used to throttle this event so
@@ -1625,13 +1625,11 @@ void Master::throttled(
frameworks.defaultLimiter.get()->messages--;
}
- // TODO(dzhuk): Use std::move(event), when defer supports
- // rvalue references as handler parameters.
- _consume(event);
+ _consume(std::move(event));
}
-void Master::_consume(const MessageEvent& event)
+void Master::_consume(MessageEvent&& event)
{
// Obtain the principal before processing the Message because the
// mapping may be deleted in handling 'UnregisterFrameworkMessage'
@@ -1641,9 +1639,7 @@ void Master::_consume(const MessageEvent& event)
? frameworks.principals[event.message.from]
: Option<string>::none();
- // TODO(dzhuk): Use std::move(event), when defer supports
- // rvalue references as handler parameters.
- ProtobufProcess<Master>::consume(MessageEvent(event));
+ ProtobufProcess<Master>::consume(std::move(event));
// Increment 'messages_processed' counter if it still exists.
// Note that it could be removed in handling
@@ -1683,11 +1679,9 @@ void Master::exceededCapacity(
}
-void Master::_consume(const ExitedEvent& event)
+void Master::_consume(ExitedEvent&& event)
{
- // TODO(dzhuk): Use std::move(event), when defer supports
- // rvalue references as handler parameters.
- Process<Master>::consume(ExitedEvent(event));
+ Process<Master>::consume(std::move(event));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/c9e6a03c/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index d7fa536..a721131 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -572,12 +572,12 @@ protected:
// 'principal' being None indicates it is throttled by
// 'defaultLimiter'.
void throttled(
- const process::MessageEvent& event,
+ process::MessageEvent&& event,
const Option<std::string>& principal);
// Continuations of consume().
- void _consume(const process::MessageEvent& event);
- void _consume(const process::ExitedEvent& event);
+ void _consume(process::MessageEvent&& event);
+ void _consume(process::ExitedEvent&& event);
// Helper method invoked when the capacity for a framework
// principal is exceeded.
[2/4] mesos git commit: Added `CallableOnce` support in `Future`.
Posted by mp...@apache.org.
Added `CallableOnce` support in `Future`.
`Future` guarantees that callbacks are called at most once, so it can
use `lambda::CallableOnce` to expicitly declare this, and allow
corresponding optimizations with moves.
Review: https://reviews.apache.org/r/63638/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8014e3f9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8014e3f9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8014e3f9
Branch: refs/heads/master
Commit: 8014e3f9e1838745a6f3af7c1e2a557bd74349b0
Parents: 09b72e9
Author: Dmitry Zhuk <dz...@twopensource.com>
Authored: Tue Dec 5 11:20:55 2017 -0800
Committer: Michael Park <mp...@apache.org>
Committed: Tue Dec 5 11:20:55 2017 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/future.hpp | 240 ++++++++++++--------
3rdparty/libprocess/src/tests/future_tests.cpp | 49 ++++
2 files changed, 189 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8014e3f9/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index d48dfd7..cad4c5e 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -159,12 +159,12 @@ public:
// Type of the callback functions that can get invoked when the
// future gets set, fails, or is discarded.
- typedef lambda::function<void()> AbandonedCallback;
- typedef lambda::function<void()> DiscardCallback;
- typedef lambda::function<void(const T&)> ReadyCallback;
- typedef lambda::function<void(const std::string&)> FailedCallback;
- typedef lambda::function<void()> DiscardedCallback;
- typedef lambda::function<void(const Future<T>&)> AnyCallback;
+ typedef lambda::CallableOnce<void()> AbandonedCallback;
+ typedef lambda::CallableOnce<void()> DiscardCallback;
+ typedef lambda::CallableOnce<void(const T&)> ReadyCallback;
+ typedef lambda::CallableOnce<void(const std::string&)> FailedCallback;
+ typedef lambda::CallableOnce<void()> DiscardedCallback;
+ typedef lambda::CallableOnce<void(const Future<T>&)> AnyCallback;
// Installs callbacks for the specified events and returns a const
// reference to 'this' in order to easily support chaining.
@@ -181,40 +181,43 @@ public:
template <typename F>
const Future<T>& onAbandoned(_Deferred<F>&& deferred) const
{
- return onAbandoned(deferred.operator std::function<void()>());
+ return onAbandoned(
+ std::move(deferred).operator lambda::CallableOnce<void()>());
}
template <typename F>
const Future<T>& onDiscard(_Deferred<F>&& deferred) const
{
- return onDiscard(std::move(deferred).operator std::function<void()>());
+ return onDiscard(
+ std::move(deferred).operator lambda::CallableOnce<void()>());
}
template <typename F>
const Future<T>& onReady(_Deferred<F>&& deferred) const
{
return onReady(
- std::move(deferred).operator std::function<void(const T&)>());
+ std::move(deferred).operator lambda::CallableOnce<void(const T&)>());
}
template <typename F>
const Future<T>& onFailed(_Deferred<F>&& deferred) const
{
- return onFailed(
- std::move(deferred).operator std::function<void(const std::string&)>());
+ return onFailed(std::move(deferred)
+ .operator lambda::CallableOnce<void(const std::string&)>());
}
template <typename F>
const Future<T>& onDiscarded(_Deferred<F>&& deferred) const
{
- return onDiscarded(std::move(deferred).operator std::function<void()>());
+ return onDiscarded(
+ std::move(deferred).operator lambda::CallableOnce<void()>());
}
template <typename F>
const Future<T>& onAny(_Deferred<F>&& deferred) const
{
- return onAny(
- std::move(deferred).operator std::function<void(const Future<T>&)>());
+ return onAny(std::move(deferred)
+ .operator lambda::CallableOnce<void(const Future<T>&)>());
}
private:
@@ -233,10 +236,13 @@ private:
template <typename F, typename = typename result_of<F(const T&)>::type>
const Future<T>& onReady(F&& f, Prefer) const
{
- return onReady(std::function<void(const T&)>(
- [=](const T& t) mutable {
- f(t);
- }));
+ return onReady(lambda::CallableOnce<void(const T&)>(
+ lambda::partial(
+ [](typename std::decay<F>::type&& f, const T& t) {
+ std::move(f)(t);
+ },
+ std::forward<F>(f),
+ lambda::_1)));
}
// This is the less preferred `onReady`, we prefer the `onReady` method which
@@ -254,19 +260,25 @@ private:
F>::type()>::type>
const Future<T>& onReady(F&& f, LessPrefer) const
{
- return onReady(std::function<void(const T&)>(
- [=](const T&) mutable {
- f();
- }));
+ return onReady(lambda::CallableOnce<void(const T&)>(
+ lambda::partial(
+ [](typename std::decay<F>::type&& f, const T&) {
+ std::move(f)();
+ },
+ std::forward<F>(f),
+ lambda::_1)));
}
template <typename F, typename = typename result_of<F(const std::string&)>::type> // NOLINT(whitespace/line_length)
const Future<T>& onFailed(F&& f, Prefer) const
{
- return onFailed(std::function<void(const std::string&)>(
- [=](const std::string& message) mutable {
- f(message);
- }));
+ return onFailed(lambda::CallableOnce<void(const std::string&)>(
+ lambda::partial(
+ [](typename std::decay<F>::type&& f, const std::string& message) {
+ std::move(f)(message);
+ },
+ std::forward<F>(f),
+ lambda::_1)));
}
// Refer to the less preferred version of `onReady` for why these SFINAE
@@ -278,19 +290,25 @@ private:
F>::type()>::type>
const Future<T>& onFailed(F&& f, LessPrefer) const
{
- return onFailed(std::function<void(const std::string&)>(
- [=](const std::string&) mutable {
- f();
- }));
+ return onFailed(lambda::CallableOnce<void(const std::string&)>(
+ lambda::partial(
+ [](typename std::decay<F>::type&& f, const std::string&) mutable {
+ std::move(f)();
+ },
+ std::forward<F>(f),
+ lambda::_1)));
}
template <typename F, typename = typename result_of<F(const Future<T>&)>::type> // NOLINT(whitespace/line_length)
const Future<T>& onAny(F&& f, Prefer) const
{
- return onAny(std::function<void(const Future<T>&)>(
- [=](const Future<T>& future) mutable {
- f(future);
- }));
+ return onAny(lambda::CallableOnce<void(const Future<T>&)>(
+ lambda::partial(
+ [](typename std::decay<F>::type&& f, const Future<T>& future) {
+ std::move(f)(future);
+ },
+ std::forward<F>(f),
+ lambda::_1)));
}
// Refer to the less preferred version of `onReady` for why these SFINAE
@@ -302,29 +320,36 @@ private:
F>::type()>::type>
const Future<T>& onAny(F&& f, LessPrefer) const
{
- return onAny(std::function<void(const Future<T>&)>(
- [=](const Future<T>&) mutable {
- f();
- }));
+ return onAny(lambda::CallableOnce<void(const Future<T>&)>(
+ lambda::partial(
+ [](typename std::decay<F>::type&& f, const Future<T>&) {
+ std::move(f)();
+ },
+ std::forward<F>(f),
+ lambda::_1)));
}
public:
template <typename F>
const Future<T>& onAbandoned(F&& f) const
{
- return onAbandoned(std::function<void()>(
- [=]() mutable {
- f();
- }));
+ return onAbandoned(lambda::CallableOnce<void()>(
+ lambda::partial(
+ [](typename std::decay<F>::type&& f) {
+ std::move(f)();
+ },
+ std::forward<F>(f))));
}
template <typename F>
const Future<T>& onDiscard(F&& f) const
{
- return onDiscard(std::function<void()>(
- [=]() mutable {
- f();
- }));
+ return onDiscard(lambda::CallableOnce<void()>(
+ lambda::partial(
+ [](typename std::decay<F>::type&& f) {
+ std::move(f)();
+ },
+ std::forward<F>(f))));
}
template <typename F>
@@ -342,10 +367,12 @@ public:
template <typename F>
const Future<T>& onDiscarded(F&& f) const
{
- return onDiscarded(std::function<void()>(
- [=]() mutable {
- f();
- }));
+ return onDiscarded(lambda::CallableOnce<void()>(
+ lambda::partial(
+ [](typename std::decay<F>::type&& f) {
+ std::move(f)();
+ },
+ std::forward<F>(f))));
}
template <typename F>
@@ -358,22 +385,23 @@ public:
// and associates the result of the callback with the future that is
// returned to the caller (which may be of a different type).
template <typename X>
- Future<X> then(lambda::function<Future<X>(const T&)> f) const;
+ Future<X> then(lambda::CallableOnce<Future<X>(const T&)> f) const;
template <typename X>
- Future<X> then(lambda::function<X(const T&)> f) const;
+ Future<X> then(lambda::CallableOnce<X(const T&)> f) const;
template <typename X>
- Future<X> then(lambda::function<Future<X>()> f) const
+ Future<X> then(lambda::CallableOnce<Future<X>()> f) const
{
- return then(
- lambda::function<Future<X>(const T&)>(lambda::bind(std::move(f))));
+ return then(lambda::CallableOnce<Future<X>(const T&)>(
+ lambda::partial(std::move(f))));
}
template <typename X>
- Future<X> then(lambda::function<X()> f) const
+ Future<X> then(lambda::CallableOnce<X()> f) const
{
- return then(lambda::function<X(const T&)>(lambda::bind(std::move(f))));
+ return then(lambda::CallableOnce<X(const T&)>(
+ lambda::partial(std::move(f))));
}
private:
@@ -385,7 +413,8 @@ private:
{
// note the then<X> is necessary to not have an infinite loop with
// then(F&& f)
- return then<X>(std::move(f).operator std::function<Future<X>(const T&)>());
+ return then<X>(
+ std::move(f).operator lambda::CallableOnce<Future<X>(const T&)>());
}
// Refer to the less preferred version of `onReady` for why these SFINAE
@@ -398,13 +427,14 @@ private:
F>::type()>::type>::type>
Future<X> then(_Deferred<F>&& f, LessPrefer) const
{
- return then<X>(std::move(f).operator std::function<Future<X>()>());
+ return then<X>(std::move(f).operator lambda::CallableOnce<Future<X>()>());
}
template <typename F, typename X = typename internal::unwrap<typename result_of<F(const T&)>::type>::type> // NOLINT(whitespace/line_length)
Future<X> then(F&& f, Prefer) const
{
- return then<X>(std::function<Future<X>(const T&)>(std::forward<F>(f)));
+ return then<X>(
+ lambda::CallableOnce<Future<X>(const T&)>(std::forward<F>(f)));
}
// Refer to the less preferred version of `onReady` for why these SFINAE
@@ -417,7 +447,7 @@ private:
F>::type()>::type>::type>
Future<X> then(F&& f, LessPrefer) const
{
- return then<X>(std::function<Future<X>()>(std::forward<F>(f)));
+ return then<X>(lambda::CallableOnce<Future<X>()>(std::forward<F>(f)));
}
public:
@@ -449,7 +479,7 @@ public:
{
return recover(
std::move(deferred)
- .operator std::function<Future<T>(const Future<T>&)>());
+ .operator lambda::CallableOnce<Future<T>(const Future<T>&)>());
}
// TODO(benh): Considering adding a `rescue` function for rescuing
@@ -458,7 +488,7 @@ public:
// Installs callbacks that get executed if this future completes
// because it failed.
Future<T> repair(
- const lambda::function<Future<T>(const Future<T>&)>& f) const;
+ lambda::CallableOnce<Future<T>(const Future<T>&)> f) const;
// TODO(benh): Add overloads of 'repair' that don't require passing
// in a function that takes the 'const Future<T>&' parameter and use
@@ -472,7 +502,7 @@ public:
// was called on the returned future.
Future<T> after(
const Duration& duration,
- const lambda::function<Future<T>(const Future<T>&)>& f) const;
+ lambda::CallableOnce<Future<T>(const Future<T>&)> f) const;
// TODO(benh): Add overloads of 'after' that don't require passing
// in a function that takes the 'const Future<T>&' parameter and use
@@ -584,10 +614,10 @@ namespace internal {
//
// TODO(*): Invoke callbacks in another execution context.
template <typename C, typename... Arguments>
-void run(const std::vector<C>& callbacks, Arguments&&... arguments)
+void run(std::vector<C>&& callbacks, Arguments&&... arguments)
{
for (size_t i = 0; i < callbacks.size(); ++i) {
- callbacks[i](std::forward<Arguments>(arguments)...);
+ std::move(callbacks[i])(std::forward<Arguments>(arguments)...);
}
}
@@ -995,8 +1025,8 @@ bool Promise<T>::discard(Future<T> future)
// ourselves from one of the callbacks erroneously deleting the
// future. In `Future::_set()` and `Future::fail()` we have to
// explicitly take a copy to protect ourselves.
- internal::run(future.data->onDiscardedCallbacks);
- internal::run(future.data->onAnyCallbacks, future);
+ internal::run(std::move(future.data->onDiscardedCallbacks));
+ internal::run(std::move(future.data->onAnyCallbacks), future);
future.data->clearAllCallbacks();
}
@@ -1157,7 +1187,7 @@ bool Future<T>::discard()
// future. The callbacks get destroyed when we exit from the
// function.
if (result) {
- internal::run(callbacks);
+ internal::run(std::move(callbacks));
}
return result;
@@ -1183,7 +1213,7 @@ bool Future<T>::abandon(bool propagating)
// Invoke all callbacks. The callbacks get destroyed when we exit
// from the function.
if (result) {
- internal::run(callbacks);
+ internal::run(std::move(callbacks));
}
return result;
@@ -1329,7 +1359,7 @@ const Future<T>& Future<T>::onAbandoned(AbandonedCallback&& callback) const
// TODO(*): Invoke callback in another execution context.
if (run) {
- callback(); // NOLINT(misc-use-after-move)
+ std::move(callback)(); // NOLINT(misc-use-after-move)
}
return *this;
@@ -1351,7 +1381,7 @@ const Future<T>& Future<T>::onDiscard(DiscardCallback&& callback) const
// TODO(*): Invoke callback in another execution context.
if (run) {
- callback(); // NOLINT(misc-use-after-move)
+ std::move(callback)(); // NOLINT(misc-use-after-move)
}
return *this;
@@ -1373,7 +1403,7 @@ const Future<T>& Future<T>::onReady(ReadyCallback&& callback) const
// TODO(*): Invoke callback in another execution context.
if (run) {
- callback(data->result.get()); // NOLINT(misc-use-after-move)
+ std::move(callback)(data->result.get()); // NOLINT(misc-use-after-move)
}
return *this;
@@ -1395,7 +1425,7 @@ const Future<T>& Future<T>::onFailed(FailedCallback&& callback) const
// TODO(*): Invoke callback in another execution context.
if (run) {
- callback(data->result.error()); // NOLINT(misc-use-after-move)
+ std::move(callback)(data->result.error()); // NOLINT(misc-use-after-move)
}
return *this;
@@ -1417,7 +1447,7 @@ const Future<T>& Future<T>::onDiscarded(DiscardedCallback&& callback) const
// TODO(*): Invoke callback in another execution context.
if (run) {
- callback(); // NOLINT(misc-use-after-move)
+ std::move(callback)(); // NOLINT(misc-use-after-move)
}
return *this;
@@ -1439,7 +1469,7 @@ const Future<T>& Future<T>::onAny(AnyCallback&& callback) const
// TODO(*): Invoke callback in another execution context.
if (run) {
- callback(*this); // NOLINT(misc-use-after-move)
+ std::move(callback)(*this); // NOLINT(misc-use-after-move)
}
return *this;
@@ -1451,7 +1481,7 @@ namespace internal {
// from the function 'then' whose parameter 'f' doesn't return a
// Future since the compiler can't properly infer otherwise.
template <typename T, typename X>
-void thenf(const lambda::function<Future<X>(const T&)>& f,
+void thenf(lambda::CallableOnce<Future<X>(const T&)>&& f,
const std::shared_ptr<Promise<X>>& promise,
const Future<T>& future)
{
@@ -1459,7 +1489,7 @@ void thenf(const lambda::function<Future<X>(const T&)>& f,
if (future.hasDiscard()) {
promise->discard();
} else {
- promise->associate(f(future.get()));
+ promise->associate(std::move(f)(future.get()));
}
} else if (future.isFailed()) {
promise->fail(future.failure());
@@ -1470,7 +1500,7 @@ void thenf(const lambda::function<Future<X>(const T&)>& f,
template <typename T, typename X>
-void then(const lambda::function<X(const T&)>& f,
+void then(lambda::CallableOnce<X(const T&)>&& f,
const std::shared_ptr<Promise<X>>& promise,
const Future<T>& future)
{
@@ -1478,7 +1508,7 @@ void then(const lambda::function<X(const T&)>& f,
if (future.hasDiscard()) {
promise->discard();
} else {
- promise->set(f(future.get()));
+ promise->set(std::move(f)(future.get()));
}
} else if (future.isFailed()) {
promise->fail(future.failure());
@@ -1490,13 +1520,13 @@ void then(const lambda::function<X(const T&)>& f,
template <typename T>
void repair(
- const lambda::function<Future<T>(const Future<T>&)>& f,
+ lambda::CallableOnce<Future<T>(const Future<T>&)>&& f,
const std::shared_ptr<Promise<T>>& promise,
const Future<T>& future)
{
CHECK(!future.isPending());
if (future.isFailed()) {
- promise->associate(f(future));
+ promise->associate(std::move(f)(future));
} else {
promise->associate(future);
}
@@ -1505,7 +1535,7 @@ void repair(
template <typename T>
void expired(
- const lambda::function<Future<T>(const Future<T>&)>& f,
+ const std::shared_ptr<lambda::CallableOnce<Future<T>(const Future<T>&)>>& f,
const std::shared_ptr<Latch>& latch,
const std::shared_ptr<Promise<T>>& promise,
const std::shared_ptr<Option<Timer>>& timer,
@@ -1525,7 +1555,7 @@ void expired(
// if the future has been discarded and rather than hiding a
// non-deterministic bug we always call 'f' if the timer has
// expired.
- promise->associate(f(future));
+ promise->associate(std::move(*f)(future));
}
}
@@ -1559,12 +1589,12 @@ void after(
template <typename T>
template <typename X>
-Future<X> Future<T>::then(lambda::function<Future<X>(const T&)> f) const
+Future<X> Future<T>::then(lambda::CallableOnce<Future<X>(const T&)> f) const
{
std::shared_ptr<Promise<X>> promise(new Promise<X>());
- lambda::function<void(const Future<T>&)> thenf =
- lambda::bind(&internal::thenf<T, X>, std::move(f), promise, lambda::_1);
+ lambda::CallableOnce<void(const Future<T>&)> thenf =
+ lambda::partial(&internal::thenf<T, X>, std::move(f), promise, lambda::_1);
onAny(std::move(thenf));
@@ -1583,12 +1613,12 @@ Future<X> Future<T>::then(lambda::function<Future<X>(const T&)> f) const
template <typename T>
template <typename X>
-Future<X> Future<T>::then(lambda::function<X(const T&)> f) const
+Future<X> Future<T>::then(lambda::CallableOnce<X(const T&)> f) const
{
std::shared_ptr<Promise<X>> promise(new Promise<X>());
- lambda::function<void(const Future<T>&)> then =
- lambda::bind(&internal::then<T, X>, std::move(f), promise, lambda::_1);
+ lambda::CallableOnce<void(const Future<T>&)> then =
+ lambda::partial(&internal::then<T, X>, std::move(f), promise, lambda::_1);
onAny(std::move(then));
@@ -1613,6 +1643,11 @@ Future<T> Future<T>::recover(F&& f) const
const Future<T> future = *this;
+ typedef decltype(std::move(f)(future)) R;
+
+ std::shared_ptr<lambda::CallableOnce<R(const Future<T>&)>> callable(
+ new lambda::CallableOnce<R(const Future<T>&)>(std::move(f)));
+
onAny([=]() {
if (future.isDiscarded() || future.isFailed()) {
// We reset `discard` so that if a future gets returned from
@@ -1624,7 +1659,7 @@ Future<T> Future<T>::recover(F&& f) const
promise->f.data->discard = false;
}
- promise->set(f(future));
+ promise->set(std::move(*callable)(future));
} else {
promise->associate(future);
}
@@ -1635,7 +1670,7 @@ Future<T> Future<T>::recover(F&& f) const
synchronized (promise->f.data->lock) {
promise->f.data->discard = false;
}
- promise->set(f(future));
+ promise->set(std::move(*callable)(future));
});
// Propagate discarding up the chain. To avoid cyclic dependencies,
@@ -1649,11 +1684,12 @@ Future<T> Future<T>::recover(F&& f) const
template <typename T>
Future<T> Future<T>::repair(
- const lambda::function<Future<T>(const Future<T>&)>& f) const
+ lambda::CallableOnce<Future<T>(const Future<T>&)> f) const
{
std::shared_ptr<Promise<T>> promise(new Promise<T>());
- onAny(lambda::bind(&internal::repair<T>, f, promise, lambda::_1));
+ onAny(
+ lambda::partial(&internal::repair<T>, std::move(f), promise, lambda::_1));
onAbandoned([=]() {
promise->future().abandon();
@@ -1671,7 +1707,7 @@ Future<T> Future<T>::repair(
template <typename T>
Future<T> Future<T>::after(
const Duration& duration,
- const lambda::function<Future<T>(const Future<T>&)>& f) const
+ lambda::CallableOnce<Future<T>(const Future<T>&)> f) const
{
// TODO(benh): Using a Latch here but Once might be cleaner.
// Unfortunately, Once depends on Future so we can't easily use it
@@ -1694,6 +1730,9 @@ Future<T> Future<T>::after(
// issues we have to worry about.
std::shared_ptr<Option<Timer>> timer(new Option<Timer>());
+ typedef lambda::CallableOnce<Future<T>(const Future<T>&)> F;
+ std::shared_ptr<F> callable(new F(std::move(f)));
+
// Set up a timer to invoke the callback if this future has not
// completed. Note that we do not pass a weak reference for this
// future as we don't want the future to get cleaned up and then
@@ -1706,7 +1745,8 @@ Future<T> Future<T>::after(
// force the deallocation of our copy of the timer).
*timer = Clock::timer(
duration,
- lambda::bind(&internal::expired<T>, f, latch, promise, timer, *this));
+ lambda::bind(&internal::expired<T>, callable, latch, promise, timer,
+ *this));
onAny(lambda::bind(&internal::after<T>, latch, promise, timer, lambda::_1));
@@ -1758,8 +1798,8 @@ bool Future<T>::_set(U&& u)
// Grab a copy of `data` just in case invoking the callbacks
// erroneously attempts to delete this future.
std::shared_ptr<typename Future<T>::Data> copy = data;
- internal::run(copy->onReadyCallbacks, copy->result.get());
- internal::run(copy->onAnyCallbacks, *this);
+ internal::run(std::move(copy->onReadyCallbacks), copy->result.get());
+ internal::run(std::move(copy->onAnyCallbacks), *this);
copy->clearAllCallbacks();
}
@@ -1788,8 +1828,8 @@ bool Future<T>::fail(const std::string& _message)
// Grab a copy of `data` just in case invoking the callbacks
// erroneously attempts to delete this future.
std::shared_ptr<typename Future<T>::Data> copy = data;
- internal::run(copy->onFailedCallbacks, copy->result.error());
- internal::run(copy->onAnyCallbacks, *this);
+ internal::run(std::move(copy->onFailedCallbacks), copy->result.error());
+ internal::run(std::move(copy->onAnyCallbacks), *this);
copy->clearAllCallbacks();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/8014e3f9/3rdparty/libprocess/src/tests/future_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/future_tests.cpp b/3rdparty/libprocess/src/tests/future_tests.cpp
index bb7a1c8..3dadd72 100644
--- a/3rdparty/libprocess/src/tests/future_tests.cpp
+++ b/3rdparty/libprocess/src/tests/future_tests.cpp
@@ -210,6 +210,55 @@ TEST(FutureTest, Then)
}
+TEST(FutureTest, CallableOnce)
+{
+ Promise<Nothing> promise;
+ promise.set(Nothing());
+
+ Future<int> future = promise.future()
+ .then(lambda::partial(
+ [](std::unique_ptr<int>&& o) {
+ return *o;
+ },
+ std::unique_ptr<int>(new int(42))));
+
+ ASSERT_TRUE(future.isReady());
+ EXPECT_EQ(42, future.get());
+
+ int n = 0;
+ future = promise.future()
+ .onReady(lambda::partial(
+ [&n](std::unique_ptr<int> o) {
+ n += *o;
+ },
+ std::unique_ptr<int>(new int(1))))
+ .onAny(lambda::partial(
+ [&n](std::unique_ptr<int>&& o) {
+ n += *o;
+ },
+ std::unique_ptr<int>(new int(10))))
+ .onFailed(lambda::partial(
+ [&n](const std::unique_ptr<int>& o) {
+ n += *o;
+ },
+ std::unique_ptr<int>(new int(100))))
+ .onDiscard(lambda::partial(
+ [&n](std::unique_ptr<int>&& o) {
+ n += *o;
+ },
+ std::unique_ptr<int>(new int(1000))))
+ .onDiscarded(lambda::partial(
+ [&n](std::unique_ptr<int>&& o) {
+ n += *o;
+ },
+ std::unique_ptr<int>(new int(10000))))
+ .then([&n]() { return n; });
+
+ ASSERT_TRUE(future.isReady());
+ EXPECT_EQ(11, future.get());
+}
+
+
Future<int> repair(const Future<int>& future)
{
EXPECT_TRUE(future.isFailed());