You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by mp...@apache.org on 2017/12/05 19:54:09 UTC

[1/4] mesos git commit: Added `CallableOnce` support in `defer`.

Repository: mesos
Updated Branches:
  refs/heads/master 62b472731 -> 6839897c5


Added `CallableOnce` support in `defer`.

This allows `defer` result to be converted to `CallableOnce`, ensuring
that bound arguments are moved, when call is made, and avoiding making
copies of bound arguments.

Review: https://reviews.apache.org/r/63637/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/09b72e9b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/09b72e9b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/09b72e9b

Branch: refs/heads/master
Commit: 09b72e9bbf87793ce84df5d5f9d5f292c60fa5ee
Parents: 62b4727
Author: Dmitry Zhuk <dz...@twopensource.com>
Authored: Tue Dec 5 11:20:41 2017 -0800
Committer: Michael Park <mp...@apache.org>
Committed: Tue Dec 5 11:20:41 2017 -0800

----------------------------------------------------------------------
 .../libprocess/include/process/deferred.hpp     | 86 ++++++++++++++++++++
 3rdparty/libprocess/src/tests/process_tests.cpp | 28 +++++++
 2 files changed, 114 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/09b72e9b/3rdparty/libprocess/include/process/deferred.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/deferred.hpp b/3rdparty/libprocess/include/process/deferred.hpp
index 8beb2b3..eb6c5be 100644
--- a/3rdparty/libprocess/include/process/deferred.hpp
+++ b/3rdparty/libprocess/include/process/deferred.hpp
@@ -105,6 +105,22 @@ struct _Deferred
         });
   }
 
+  operator lambda::CallableOnce<void()>() &&
+  {
+    if (pid.isNone()) {
+      return lambda::CallableOnce<void()>(std::forward<F>(f));
+    }
+
+    Option<UPID> pid_ = pid;
+
+    return lambda::CallableOnce<void()>(
+        lambda::partial(
+            [pid_](typename std::decay<F>::type&& f_) {
+              dispatch(pid_.get(), std::move(f_));
+            },
+            std::forward<F>(f)));
+  }
+
   template <typename R>
   operator Deferred<R()>() &&
   {
@@ -137,6 +153,29 @@ struct _Deferred
         });
   }
 
+  template <typename R>
+  operator lambda::CallableOnce<R()>() &&
+  {
+    if (pid.isNone()) {
+      return lambda::CallableOnce<R()>(std::forward<F>(f));
+    }
+
+    Option<UPID> pid_ = pid;
+
+    return lambda::CallableOnce<R()>(
+        lambda::partial(
+          [pid_](typename std::decay<F>::type&& f_) {
+            return dispatch(pid_.get(), std::move(f_));
+          },
+          std::forward<F>(f)));
+  }
+
+// Expands to lambda::_$(N+1). N is zero-based, and placeholders are one-based.
+#define PLACEHOLDER(Z, N, DATA) CAT(lambda::_, INC(N))
+
+// This assumes type and variable base names are `P` and `p` respectively.
+#define FORWARD(Z, N, DATA) std::forward<P ## N>(p ## N)
+
   // Due to a bug (http://gcc.gnu.org/bugzilla/show_bug.cgi?id=41933)
   // with variadic templates and lambdas, we still need to do
   // preprocessor expansions. In addition, due to a bug with clang (or
@@ -180,6 +219,28 @@ struct _Deferred
           });                                                            \
           dispatch(pid_.get(), f__);                                     \
         });                                                              \
+  }                                                                      \
+                                                                         \
+  template <ENUM_PARAMS(N, typename P)>                                  \
+  operator lambda::CallableOnce<void(ENUM_PARAMS(N, P))>() &&            \
+  {                                                                      \
+    if (pid.isNone()) {                                                  \
+      return lambda::CallableOnce<void(ENUM_PARAMS(N, P))>(              \
+          std::forward<F>(f));                                           \
+    }                                                                    \
+                                                                         \
+    Option<UPID> pid_ = pid;                                             \
+                                                                         \
+    return lambda::CallableOnce<void(ENUM_PARAMS(N, P))>(                \
+        lambda::partial(                                                 \
+            [pid_](typename std::decay<F>::type&& f_,                    \
+                   ENUM_BINARY_PARAMS(N, P, &&p)) {                      \
+              lambda::CallableOnce<void()> f__(                          \
+                  lambda::partial(std::move(f_), ENUM(N, FORWARD, _)));  \
+              dispatch(pid_.get(), std::move(f__));                      \
+            },                                                           \
+            std::forward<F>(f),                                          \
+            ENUM(N, PLACEHOLDER, _)));                                   \
   }
 
   REPEAT_FROM_TO(1, 3, TEMPLATE, _) // Args A0 -> A1.
@@ -222,11 +283,36 @@ struct _Deferred
           });                                                           \
           return dispatch(pid_.get(), f__);                             \
         });                                                             \
+  }                                                                     \
+                                                                        \
+  template <typename R, ENUM_PARAMS(N, typename P)>                     \
+  operator lambda::CallableOnce<R(ENUM_PARAMS(N, P))>() &&              \
+  {                                                                     \
+    if (pid.isNone()) {                                                 \
+      return lambda::CallableOnce<R(ENUM_PARAMS(N, P))>(                \
+          std::forward<F>(f));                                          \
+    }                                                                   \
+                                                                        \
+    Option<UPID> pid_ = pid;                                            \
+                                                                        \
+    return lambda::CallableOnce<R(ENUM_PARAMS(N, P))>(                  \
+        lambda::partial(                                                \
+            [pid_](typename std::decay<F>::type&& f_,                   \
+                   ENUM_BINARY_PARAMS(N, P, &&p)) {                     \
+              lambda::CallableOnce<R()> f__(                            \
+                  lambda::partial(std::move(f_), ENUM(N, FORWARD, _))); \
+              return dispatch(pid_.get(), std::move(f__));              \
+        },                                                              \
+        std::forward<F>(f),                                             \
+        ENUM(N, PLACEHOLDER, _)));                                      \
   }
 
   REPEAT_FROM_TO(1, 3, TEMPLATE, _) // Args A0 -> A1.
 #undef TEMPLATE
 
+#undef FORWARD
+#undef PLACEHOLDER
+
 private:
   friend class Executor;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/09b72e9b/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index 4a3e3ca..58741b2 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -169,6 +169,9 @@ public:
 
   void func5(MoveOnly&& mo) { func5_(mo); }
   MOCK_METHOD1(func5_, void(const MoveOnly&));
+
+  bool func6(MoveOnly&& m1, MoveOnly&& m2, bool b) { return func6_(m1, m2, b); }
+  MOCK_METHOD3(func6_, bool(const MoveOnly&, const MoveOnly&, bool));
 };
 
 
@@ -223,6 +226,11 @@ TEST(ProcessTest, THREADSAFE_Defer1)
   EXPECT_CALL(process, func4(_, _))
     .WillRepeatedly(ReturnArg<0>());
 
+  EXPECT_CALL(process, func5_(_));
+
+  EXPECT_CALL(process, func6_(_, _, _))
+    .WillRepeatedly(ReturnArg<2>());
+
   PID<DispatchProcess> pid = spawn(&process);
 
   ASSERT_FALSE(!pid);
@@ -270,6 +278,26 @@ TEST(ProcessTest, THREADSAFE_Defer1)
     EXPECT_TRUE(future.get());
   }
 
+  {
+    lambda::CallableOnce<void()> func5 =
+      defer(pid, &DispatchProcess::func5, MoveOnly());
+    std::move(func5)();
+  }
+
+  {
+    lambda::CallableOnce<Future<bool>(MoveOnly&&)> func6 =
+      defer(pid, &DispatchProcess::func6, MoveOnly(), lambda::_1, true);
+    future = std::move(func6)(MoveOnly());
+    EXPECT_TRUE(future.get());
+  }
+
+  {
+    lambda::CallableOnce<Future<bool>(MoveOnly&&)> func6 =
+      defer(pid, &DispatchProcess::func6, MoveOnly(), lambda::_1, false);
+    future = std::move(func6)(MoveOnly());
+    EXPECT_FALSE(future.get());
+  }
+
   // Only take const &!
 
   terminate(pid);


[4/4] mesos git commit: Made `Event` move-only in libprocess.

Posted by mp...@apache.org.
Made `Event` move-only in libprocess.

Review: https://reviews.apache.org/r/64347/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6839897c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6839897c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6839897c

Branch: refs/heads/master
Commit: 6839897c5464fce6b8cbd253d959a7e2efd72987
Parents: c9e6a03
Author: Dmitry Zhuk <dz...@twopensource.com>
Authored: Tue Dec 5 11:21:27 2017 -0800
Committer: Michael Park <mp...@apache.org>
Committed: Tue Dec 5 11:21:27 2017 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/event.hpp | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6839897c/3rdparty/libprocess/include/process/event.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/event.hpp b/3rdparty/libprocess/include/process/event.hpp
index 76bcdb8..2e93428 100644
--- a/3rdparty/libprocess/include/process/event.hpp
+++ b/3rdparty/libprocess/include/process/event.hpp
@@ -121,8 +121,7 @@ struct MessageEvent : Event
     : message{std::move(name), from, to, std::move(data)} {}
 
   MessageEvent(MessageEvent&& that) = default;
-  // TODO(dzhuk) Make it movable only, when move is properly supported in defer.
-  MessageEvent(const MessageEvent& that) = default;
+  MessageEvent(const MessageEvent& that) = delete;
   MessageEvent& operator=(MessageEvent&&) = default;
   MessageEvent& operator=(const MessageEvent&) = delete;
 
@@ -218,8 +217,7 @@ struct ExitedEvent : Event
     : pid(_pid) {}
 
   ExitedEvent(ExitedEvent&&) = default;
-  // TODO(dzhuk) Make it movable only, when move is properly supported in defer.
-  ExitedEvent(const ExitedEvent&) = default;
+  ExitedEvent(const ExitedEvent&) = delete;
   ExitedEvent& operator=(ExitedEvent&&) = default;
   ExitedEvent& operator=(const ExitedEvent&) = delete;
 


[3/4] mesos git commit: Used `std::move` for `Event`s consumption in the master.

Posted by mp...@apache.org.
Used `std::move` for `Event`s consumption in the master.

Review: https://reviews.apache.org/r/63641/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c9e6a03c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c9e6a03c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c9e6a03c

Branch: refs/heads/master
Commit: c9e6a03c02e9f8dc040b937ccd5ae89e5530fd7e
Parents: 8014e3f
Author: Dmitry Zhuk <dz...@twopensource.com>
Authored: Tue Dec 5 11:21:11 2017 -0800
Committer: Michael Park <mp...@apache.org>
Committed: Tue Dec 5 11:21:11 2017 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 28 +++++++++++-----------------
 src/master/master.hpp |  6 +++---
 2 files changed, 14 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c9e6a03c/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 1fff323..e8257e7 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1590,19 +1590,19 @@ void Master::consume(ExitedEvent&& event)
     : Option<string>::none();
 
   // Necessary to disambiguate below.
-  typedef void(Self::*F)(const ExitedEvent&);
+  typedef void(Self::*F)(ExitedEvent&&);
 
   if (principal.isSome() &&
       frameworks.limiters.contains(principal.get()) &&
       frameworks.limiters[principal.get()].isSome()) {
-    frameworks.limiters[principal.get()].get()->limiter->acquire()
-      .onReady(defer(self(), static_cast<F>(&Self::_consume), event));
+    frameworks.limiters[principal.get()].get()->limiter->acquire().onReady(
+        defer(self(), static_cast<F>(&Self::_consume), std::move(event)));
   } else if ((principal.isNone() ||
               !frameworks.limiters.contains(principal.get())) &&
              isRegisteredFramework &&
              frameworks.defaultLimiter.isSome()) {
-    frameworks.defaultLimiter.get()->limiter->acquire()
-      .onReady(defer(self(), static_cast<F>(&Self::_consume), event));
+    frameworks.defaultLimiter.get()->limiter->acquire().onReady(
+        defer(self(), static_cast<F>(&Self::_consume), std::move(event)));
   } else {
     _consume(std::move(event));
   }
@@ -1612,7 +1612,7 @@ void Master::consume(ExitedEvent&& event)
 // TODO(greggomann): Change this to accept an `Option<Principal>`
 // when MESOS-7202 is resolved.
 void Master::throttled(
-    const MessageEvent& event,
+    MessageEvent&& event,
     const Option<string>& principal)
 {
   // We already know a RateLimiter is used to throttle this event so
@@ -1625,13 +1625,11 @@ void Master::throttled(
     frameworks.defaultLimiter.get()->messages--;
   }
 
-  // TODO(dzhuk): Use std::move(event), when defer supports
-  // rvalue references as handler parameters.
-  _consume(event);
+  _consume(std::move(event));
 }
 
 
-void Master::_consume(const MessageEvent& event)
+void Master::_consume(MessageEvent&& event)
 {
   // Obtain the principal before processing the Message because the
   // mapping may be deleted in handling 'UnregisterFrameworkMessage'
@@ -1641,9 +1639,7 @@ void Master::_consume(const MessageEvent& event)
       ? frameworks.principals[event.message.from]
       : Option<string>::none();
 
-  // TODO(dzhuk): Use std::move(event), when defer supports
-  // rvalue references as handler parameters.
-  ProtobufProcess<Master>::consume(MessageEvent(event));
+  ProtobufProcess<Master>::consume(std::move(event));
 
   // Increment 'messages_processed' counter if it still exists.
   // Note that it could be removed in handling
@@ -1683,11 +1679,9 @@ void Master::exceededCapacity(
 }
 
 
-void Master::_consume(const ExitedEvent& event)
+void Master::_consume(ExitedEvent&& event)
 {
-  // TODO(dzhuk): Use std::move(event), when defer supports
-  // rvalue references as handler parameters.
-  Process<Master>::consume(ExitedEvent(event));
+  Process<Master>::consume(std::move(event));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c9e6a03c/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index d7fa536..a721131 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -572,12 +572,12 @@ protected:
   // 'principal' being None indicates it is throttled by
   // 'defaultLimiter'.
   void throttled(
-      const process::MessageEvent& event,
+      process::MessageEvent&& event,
       const Option<std::string>& principal);
 
   // Continuations of consume().
-  void _consume(const process::MessageEvent& event);
-  void _consume(const process::ExitedEvent& event);
+  void _consume(process::MessageEvent&& event);
+  void _consume(process::ExitedEvent&& event);
 
   // Helper method invoked when the capacity for a framework
   // principal is exceeded.


[2/4] mesos git commit: Added `CallableOnce` support in `Future`.

Posted by mp...@apache.org.
Added `CallableOnce` support in `Future`.

`Future` guarantees that callbacks are called at most once, so it can
use `lambda::CallableOnce` to expicitly declare this, and allow
corresponding optimizations with moves.

Review: https://reviews.apache.org/r/63638/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8014e3f9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8014e3f9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8014e3f9

Branch: refs/heads/master
Commit: 8014e3f9e1838745a6f3af7c1e2a557bd74349b0
Parents: 09b72e9
Author: Dmitry Zhuk <dz...@twopensource.com>
Authored: Tue Dec 5 11:20:55 2017 -0800
Committer: Michael Park <mp...@apache.org>
Committed: Tue Dec 5 11:20:55 2017 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/future.hpp | 240 ++++++++++++--------
 3rdparty/libprocess/src/tests/future_tests.cpp |  49 ++++
 2 files changed, 189 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8014e3f9/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index d48dfd7..cad4c5e 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -159,12 +159,12 @@ public:
 
   // Type of the callback functions that can get invoked when the
   // future gets set, fails, or is discarded.
-  typedef lambda::function<void()> AbandonedCallback;
-  typedef lambda::function<void()> DiscardCallback;
-  typedef lambda::function<void(const T&)> ReadyCallback;
-  typedef lambda::function<void(const std::string&)> FailedCallback;
-  typedef lambda::function<void()> DiscardedCallback;
-  typedef lambda::function<void(const Future<T>&)> AnyCallback;
+  typedef lambda::CallableOnce<void()> AbandonedCallback;
+  typedef lambda::CallableOnce<void()> DiscardCallback;
+  typedef lambda::CallableOnce<void(const T&)> ReadyCallback;
+  typedef lambda::CallableOnce<void(const std::string&)> FailedCallback;
+  typedef lambda::CallableOnce<void()> DiscardedCallback;
+  typedef lambda::CallableOnce<void(const Future<T>&)> AnyCallback;
 
   // Installs callbacks for the specified events and returns a const
   // reference to 'this' in order to easily support chaining.
@@ -181,40 +181,43 @@ public:
   template <typename F>
   const Future<T>& onAbandoned(_Deferred<F>&& deferred) const
   {
-    return onAbandoned(deferred.operator std::function<void()>());
+    return onAbandoned(
+        std::move(deferred).operator lambda::CallableOnce<void()>());
   }
 
   template <typename F>
   const Future<T>& onDiscard(_Deferred<F>&& deferred) const
   {
-    return onDiscard(std::move(deferred).operator std::function<void()>());
+    return onDiscard(
+        std::move(deferred).operator lambda::CallableOnce<void()>());
   }
 
   template <typename F>
   const Future<T>& onReady(_Deferred<F>&& deferred) const
   {
     return onReady(
-        std::move(deferred).operator std::function<void(const T&)>());
+        std::move(deferred).operator lambda::CallableOnce<void(const T&)>());
   }
 
   template <typename F>
   const Future<T>& onFailed(_Deferred<F>&& deferred) const
   {
-    return onFailed(
-        std::move(deferred).operator std::function<void(const std::string&)>());
+    return onFailed(std::move(deferred)
+        .operator lambda::CallableOnce<void(const std::string&)>());
   }
 
   template <typename F>
   const Future<T>& onDiscarded(_Deferred<F>&& deferred) const
   {
-    return onDiscarded(std::move(deferred).operator std::function<void()>());
+    return onDiscarded(
+        std::move(deferred).operator lambda::CallableOnce<void()>());
   }
 
   template <typename F>
   const Future<T>& onAny(_Deferred<F>&& deferred) const
   {
-    return onAny(
-        std::move(deferred).operator std::function<void(const Future<T>&)>());
+    return onAny(std::move(deferred)
+        .operator lambda::CallableOnce<void(const Future<T>&)>());
   }
 
 private:
@@ -233,10 +236,13 @@ private:
   template <typename F, typename = typename result_of<F(const T&)>::type>
   const Future<T>& onReady(F&& f, Prefer) const
   {
-    return onReady(std::function<void(const T&)>(
-        [=](const T& t) mutable {
-          f(t);
-        }));
+    return onReady(lambda::CallableOnce<void(const T&)>(
+        lambda::partial(
+            [](typename std::decay<F>::type&& f, const T& t) {
+              std::move(f)(t);
+            },
+            std::forward<F>(f),
+            lambda::_1)));
   }
 
   // This is the less preferred `onReady`, we prefer the `onReady` method which
@@ -254,19 +260,25 @@ private:
           F>::type()>::type>
   const Future<T>& onReady(F&& f, LessPrefer) const
   {
-    return onReady(std::function<void(const T&)>(
-        [=](const T&) mutable {
-          f();
-        }));
+    return onReady(lambda::CallableOnce<void(const T&)>(
+        lambda::partial(
+            [](typename std::decay<F>::type&& f, const T&) {
+              std::move(f)();
+            },
+            std::forward<F>(f),
+            lambda::_1)));
   }
 
   template <typename F, typename = typename result_of<F(const std::string&)>::type> // NOLINT(whitespace/line_length)
   const Future<T>& onFailed(F&& f, Prefer) const
   {
-    return onFailed(std::function<void(const std::string&)>(
-        [=](const std::string& message) mutable {
-          f(message);
-        }));
+    return onFailed(lambda::CallableOnce<void(const std::string&)>(
+        lambda::partial(
+            [](typename std::decay<F>::type&& f, const std::string& message) {
+              std::move(f)(message);
+            },
+            std::forward<F>(f),
+            lambda::_1)));
   }
 
   // Refer to the less preferred version of `onReady` for why these SFINAE
@@ -278,19 +290,25 @@ private:
           F>::type()>::type>
   const Future<T>& onFailed(F&& f, LessPrefer) const
   {
-    return onFailed(std::function<void(const std::string&)>(
-        [=](const std::string&) mutable {
-          f();
-        }));
+    return onFailed(lambda::CallableOnce<void(const std::string&)>(
+        lambda::partial(
+            [](typename std::decay<F>::type&& f, const std::string&) mutable {
+              std::move(f)();
+            },
+            std::forward<F>(f),
+            lambda::_1)));
   }
 
   template <typename F, typename = typename result_of<F(const Future<T>&)>::type> // NOLINT(whitespace/line_length)
   const Future<T>& onAny(F&& f, Prefer) const
   {
-    return onAny(std::function<void(const Future<T>&)>(
-        [=](const Future<T>& future) mutable {
-          f(future);
-        }));
+    return onAny(lambda::CallableOnce<void(const Future<T>&)>(
+        lambda::partial(
+            [](typename std::decay<F>::type&& f, const Future<T>& future) {
+              std::move(f)(future);
+            },
+            std::forward<F>(f),
+            lambda::_1)));
   }
 
   // Refer to the less preferred version of `onReady` for why these SFINAE
@@ -302,29 +320,36 @@ private:
           F>::type()>::type>
   const Future<T>& onAny(F&& f, LessPrefer) const
   {
-    return onAny(std::function<void(const Future<T>&)>(
-        [=](const Future<T>&) mutable {
-          f();
-        }));
+    return onAny(lambda::CallableOnce<void(const Future<T>&)>(
+        lambda::partial(
+            [](typename std::decay<F>::type&& f, const Future<T>&) {
+              std::move(f)();
+            },
+            std::forward<F>(f),
+            lambda::_1)));
   }
 
 public:
   template <typename F>
   const Future<T>& onAbandoned(F&& f) const
   {
-    return onAbandoned(std::function<void()>(
-        [=]() mutable {
-          f();
-        }));
+    return onAbandoned(lambda::CallableOnce<void()>(
+        lambda::partial(
+            [](typename std::decay<F>::type&& f) {
+              std::move(f)();
+            },
+            std::forward<F>(f))));
   }
 
   template <typename F>
   const Future<T>& onDiscard(F&& f) const
   {
-    return onDiscard(std::function<void()>(
-        [=]() mutable {
-          f();
-        }));
+    return onDiscard(lambda::CallableOnce<void()>(
+        lambda::partial(
+            [](typename std::decay<F>::type&& f) {
+              std::move(f)();
+            },
+            std::forward<F>(f))));
   }
 
   template <typename F>
@@ -342,10 +367,12 @@ public:
   template <typename F>
   const Future<T>& onDiscarded(F&& f) const
   {
-    return onDiscarded(std::function<void()>(
-        [=]() mutable {
-          f();
-        }));
+    return onDiscarded(lambda::CallableOnce<void()>(
+        lambda::partial(
+            [](typename std::decay<F>::type&& f) {
+              std::move(f)();
+            },
+            std::forward<F>(f))));
   }
 
   template <typename F>
@@ -358,22 +385,23 @@ public:
   // and associates the result of the callback with the future that is
   // returned to the caller (which may be of a different type).
   template <typename X>
-  Future<X> then(lambda::function<Future<X>(const T&)> f) const;
+  Future<X> then(lambda::CallableOnce<Future<X>(const T&)> f) const;
 
   template <typename X>
-  Future<X> then(lambda::function<X(const T&)> f) const;
+  Future<X> then(lambda::CallableOnce<X(const T&)> f) const;
 
   template <typename X>
-  Future<X> then(lambda::function<Future<X>()> f) const
+  Future<X> then(lambda::CallableOnce<Future<X>()> f) const
   {
-    return then(
-        lambda::function<Future<X>(const T&)>(lambda::bind(std::move(f))));
+    return then(lambda::CallableOnce<Future<X>(const T&)>(
+        lambda::partial(std::move(f))));
   }
 
   template <typename X>
-  Future<X> then(lambda::function<X()> f) const
+  Future<X> then(lambda::CallableOnce<X()> f) const
   {
-    return then(lambda::function<X(const T&)>(lambda::bind(std::move(f))));
+    return then(lambda::CallableOnce<X(const T&)>(
+        lambda::partial(std::move(f))));
   }
 
 private:
@@ -385,7 +413,8 @@ private:
   {
     // note the then<X> is necessary to not have an infinite loop with
     // then(F&& f)
-    return then<X>(std::move(f).operator std::function<Future<X>(const T&)>());
+    return then<X>(
+        std::move(f).operator lambda::CallableOnce<Future<X>(const T&)>());
   }
 
   // Refer to the less preferred version of `onReady` for why these SFINAE
@@ -398,13 +427,14 @@ private:
               F>::type()>::type>::type>
   Future<X> then(_Deferred<F>&& f, LessPrefer) const
   {
-    return then<X>(std::move(f).operator std::function<Future<X>()>());
+    return then<X>(std::move(f).operator lambda::CallableOnce<Future<X>()>());
   }
 
   template <typename F, typename X = typename internal::unwrap<typename result_of<F(const T&)>::type>::type> // NOLINT(whitespace/line_length)
   Future<X> then(F&& f, Prefer) const
   {
-    return then<X>(std::function<Future<X>(const T&)>(std::forward<F>(f)));
+    return then<X>(
+        lambda::CallableOnce<Future<X>(const T&)>(std::forward<F>(f)));
   }
 
   // Refer to the less preferred version of `onReady` for why these SFINAE
@@ -417,7 +447,7 @@ private:
               F>::type()>::type>::type>
   Future<X> then(F&& f, LessPrefer) const
   {
-    return then<X>(std::function<Future<X>()>(std::forward<F>(f)));
+    return then<X>(lambda::CallableOnce<Future<X>()>(std::forward<F>(f)));
   }
 
 public:
@@ -449,7 +479,7 @@ public:
   {
     return recover(
         std::move(deferred)
-          .operator std::function<Future<T>(const Future<T>&)>());
+          .operator lambda::CallableOnce<Future<T>(const Future<T>&)>());
   }
 
   // TODO(benh): Considering adding a `rescue` function for rescuing
@@ -458,7 +488,7 @@ public:
   // Installs callbacks that get executed if this future completes
   // because it failed.
   Future<T> repair(
-      const lambda::function<Future<T>(const Future<T>&)>& f) const;
+      lambda::CallableOnce<Future<T>(const Future<T>&)> f) const;
 
   // TODO(benh): Add overloads of 'repair' that don't require passing
   // in a function that takes the 'const Future<T>&' parameter and use
@@ -472,7 +502,7 @@ public:
   // was called on the returned future.
   Future<T> after(
       const Duration& duration,
-      const lambda::function<Future<T>(const Future<T>&)>& f) const;
+      lambda::CallableOnce<Future<T>(const Future<T>&)> f) const;
 
   // TODO(benh): Add overloads of 'after' that don't require passing
   // in a function that takes the 'const Future<T>&' parameter and use
@@ -584,10 +614,10 @@ namespace internal {
 //
 // TODO(*): Invoke callbacks in another execution context.
 template <typename C, typename... Arguments>
-void run(const std::vector<C>& callbacks, Arguments&&... arguments)
+void run(std::vector<C>&& callbacks, Arguments&&... arguments)
 {
   for (size_t i = 0; i < callbacks.size(); ++i) {
-    callbacks[i](std::forward<Arguments>(arguments)...);
+    std::move(callbacks[i])(std::forward<Arguments>(arguments)...);
   }
 }
 
@@ -995,8 +1025,8 @@ bool Promise<T>::discard(Future<T> future)
     // ourselves from one of the callbacks erroneously deleting the
     // future. In `Future::_set()` and `Future::fail()` we have to
     // explicitly take a copy to protect ourselves.
-    internal::run(future.data->onDiscardedCallbacks);
-    internal::run(future.data->onAnyCallbacks, future);
+    internal::run(std::move(future.data->onDiscardedCallbacks));
+    internal::run(std::move(future.data->onAnyCallbacks), future);
 
     future.data->clearAllCallbacks();
   }
@@ -1157,7 +1187,7 @@ bool Future<T>::discard()
   // future. The callbacks get destroyed when we exit from the
   // function.
   if (result) {
-    internal::run(callbacks);
+    internal::run(std::move(callbacks));
   }
 
   return result;
@@ -1183,7 +1213,7 @@ bool Future<T>::abandon(bool propagating)
   // Invoke all callbacks. The callbacks get destroyed when we exit
   // from the function.
   if (result) {
-    internal::run(callbacks);
+    internal::run(std::move(callbacks));
   }
 
   return result;
@@ -1329,7 +1359,7 @@ const Future<T>& Future<T>::onAbandoned(AbandonedCallback&& callback) const
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
-    callback(); // NOLINT(misc-use-after-move)
+    std::move(callback)(); // NOLINT(misc-use-after-move)
   }
 
   return *this;
@@ -1351,7 +1381,7 @@ const Future<T>& Future<T>::onDiscard(DiscardCallback&& callback) const
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
-    callback(); // NOLINT(misc-use-after-move)
+    std::move(callback)(); // NOLINT(misc-use-after-move)
   }
 
   return *this;
@@ -1373,7 +1403,7 @@ const Future<T>& Future<T>::onReady(ReadyCallback&& callback) const
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
-    callback(data->result.get()); // NOLINT(misc-use-after-move)
+    std::move(callback)(data->result.get()); // NOLINT(misc-use-after-move)
   }
 
   return *this;
@@ -1395,7 +1425,7 @@ const Future<T>& Future<T>::onFailed(FailedCallback&& callback) const
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
-    callback(data->result.error()); // NOLINT(misc-use-after-move)
+    std::move(callback)(data->result.error()); // NOLINT(misc-use-after-move)
   }
 
   return *this;
@@ -1417,7 +1447,7 @@ const Future<T>& Future<T>::onDiscarded(DiscardedCallback&& callback) const
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
-    callback(); // NOLINT(misc-use-after-move)
+    std::move(callback)(); // NOLINT(misc-use-after-move)
   }
 
   return *this;
@@ -1439,7 +1469,7 @@ const Future<T>& Future<T>::onAny(AnyCallback&& callback) const
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
-    callback(*this); // NOLINT(misc-use-after-move)
+    std::move(callback)(*this); // NOLINT(misc-use-after-move)
   }
 
   return *this;
@@ -1451,7 +1481,7 @@ namespace internal {
 // from the function 'then' whose parameter 'f' doesn't return a
 // Future since the compiler can't properly infer otherwise.
 template <typename T, typename X>
-void thenf(const lambda::function<Future<X>(const T&)>& f,
+void thenf(lambda::CallableOnce<Future<X>(const T&)>&& f,
            const std::shared_ptr<Promise<X>>& promise,
            const Future<T>& future)
 {
@@ -1459,7 +1489,7 @@ void thenf(const lambda::function<Future<X>(const T&)>& f,
     if (future.hasDiscard()) {
       promise->discard();
     } else {
-      promise->associate(f(future.get()));
+      promise->associate(std::move(f)(future.get()));
     }
   } else if (future.isFailed()) {
     promise->fail(future.failure());
@@ -1470,7 +1500,7 @@ void thenf(const lambda::function<Future<X>(const T&)>& f,
 
 
 template <typename T, typename X>
-void then(const lambda::function<X(const T&)>& f,
+void then(lambda::CallableOnce<X(const T&)>&& f,
           const std::shared_ptr<Promise<X>>& promise,
           const Future<T>& future)
 {
@@ -1478,7 +1508,7 @@ void then(const lambda::function<X(const T&)>& f,
     if (future.hasDiscard()) {
       promise->discard();
     } else {
-      promise->set(f(future.get()));
+      promise->set(std::move(f)(future.get()));
     }
   } else if (future.isFailed()) {
     promise->fail(future.failure());
@@ -1490,13 +1520,13 @@ void then(const lambda::function<X(const T&)>& f,
 
 template <typename T>
 void repair(
-    const lambda::function<Future<T>(const Future<T>&)>& f,
+    lambda::CallableOnce<Future<T>(const Future<T>&)>&& f,
     const std::shared_ptr<Promise<T>>& promise,
     const Future<T>& future)
 {
   CHECK(!future.isPending());
   if (future.isFailed()) {
-    promise->associate(f(future));
+    promise->associate(std::move(f)(future));
   } else {
     promise->associate(future);
   }
@@ -1505,7 +1535,7 @@ void repair(
 
 template <typename T>
 void expired(
-    const lambda::function<Future<T>(const Future<T>&)>& f,
+    const std::shared_ptr<lambda::CallableOnce<Future<T>(const Future<T>&)>>& f,
     const std::shared_ptr<Latch>& latch,
     const std::shared_ptr<Promise<T>>& promise,
     const std::shared_ptr<Option<Timer>>& timer,
@@ -1525,7 +1555,7 @@ void expired(
     // if the future has been discarded and rather than hiding a
     // non-deterministic bug we always call 'f' if the timer has
     // expired.
-    promise->associate(f(future));
+    promise->associate(std::move(*f)(future));
   }
 }
 
@@ -1559,12 +1589,12 @@ void after(
 
 template <typename T>
 template <typename X>
-Future<X> Future<T>::then(lambda::function<Future<X>(const T&)> f) const
+Future<X> Future<T>::then(lambda::CallableOnce<Future<X>(const T&)> f) const
 {
   std::shared_ptr<Promise<X>> promise(new Promise<X>());
 
-  lambda::function<void(const Future<T>&)> thenf =
-    lambda::bind(&internal::thenf<T, X>, std::move(f), promise, lambda::_1);
+  lambda::CallableOnce<void(const Future<T>&)> thenf =
+    lambda::partial(&internal::thenf<T, X>, std::move(f), promise, lambda::_1);
 
   onAny(std::move(thenf));
 
@@ -1583,12 +1613,12 @@ Future<X> Future<T>::then(lambda::function<Future<X>(const T&)> f) const
 
 template <typename T>
 template <typename X>
-Future<X> Future<T>::then(lambda::function<X(const T&)> f) const
+Future<X> Future<T>::then(lambda::CallableOnce<X(const T&)> f) const
 {
   std::shared_ptr<Promise<X>> promise(new Promise<X>());
 
-  lambda::function<void(const Future<T>&)> then =
-    lambda::bind(&internal::then<T, X>, std::move(f), promise, lambda::_1);
+  lambda::CallableOnce<void(const Future<T>&)> then =
+    lambda::partial(&internal::then<T, X>, std::move(f), promise, lambda::_1);
 
   onAny(std::move(then));
 
@@ -1613,6 +1643,11 @@ Future<T> Future<T>::recover(F&& f) const
 
   const Future<T> future = *this;
 
+  typedef decltype(std::move(f)(future)) R;
+
+  std::shared_ptr<lambda::CallableOnce<R(const Future<T>&)>> callable(
+      new lambda::CallableOnce<R(const Future<T>&)>(std::move(f)));
+
   onAny([=]() {
     if (future.isDiscarded() || future.isFailed()) {
       // We reset `discard` so that if a future gets returned from
@@ -1624,7 +1659,7 @@ Future<T> Future<T>::recover(F&& f) const
         promise->f.data->discard = false;
       }
 
-      promise->set(f(future));
+      promise->set(std::move(*callable)(future));
     } else {
       promise->associate(future);
     }
@@ -1635,7 +1670,7 @@ Future<T> Future<T>::recover(F&& f) const
     synchronized (promise->f.data->lock) {
       promise->f.data->discard = false;
     }
-    promise->set(f(future));
+    promise->set(std::move(*callable)(future));
   });
 
   // Propagate discarding up the chain. To avoid cyclic dependencies,
@@ -1649,11 +1684,12 @@ Future<T> Future<T>::recover(F&& f) const
 
 template <typename T>
 Future<T> Future<T>::repair(
-    const lambda::function<Future<T>(const Future<T>&)>& f) const
+    lambda::CallableOnce<Future<T>(const Future<T>&)> f) const
 {
   std::shared_ptr<Promise<T>> promise(new Promise<T>());
 
-  onAny(lambda::bind(&internal::repair<T>, f, promise, lambda::_1));
+  onAny(
+      lambda::partial(&internal::repair<T>, std::move(f), promise, lambda::_1));
 
   onAbandoned([=]() {
     promise->future().abandon();
@@ -1671,7 +1707,7 @@ Future<T> Future<T>::repair(
 template <typename T>
 Future<T> Future<T>::after(
     const Duration& duration,
-    const lambda::function<Future<T>(const Future<T>&)>& f) const
+    lambda::CallableOnce<Future<T>(const Future<T>&)> f) const
 {
   // TODO(benh): Using a Latch here but Once might be cleaner.
   // Unfortunately, Once depends on Future so we can't easily use it
@@ -1694,6 +1730,9 @@ Future<T> Future<T>::after(
   // issues we have to worry about.
   std::shared_ptr<Option<Timer>> timer(new Option<Timer>());
 
+  typedef lambda::CallableOnce<Future<T>(const Future<T>&)> F;
+  std::shared_ptr<F> callable(new F(std::move(f)));
+
   // Set up a timer to invoke the callback if this future has not
   // completed. Note that we do not pass a weak reference for this
   // future as we don't want the future to get cleaned up and then
@@ -1706,7 +1745,8 @@ Future<T> Future<T>::after(
   // force the deallocation of our copy of the timer).
   *timer = Clock::timer(
       duration,
-      lambda::bind(&internal::expired<T>, f, latch, promise, timer, *this));
+      lambda::bind(&internal::expired<T>, callable, latch, promise, timer,
+          *this));
 
   onAny(lambda::bind(&internal::after<T>, latch, promise, timer, lambda::_1));
 
@@ -1758,8 +1798,8 @@ bool Future<T>::_set(U&& u)
     // Grab a copy of `data` just in case invoking the callbacks
     // erroneously attempts to delete this future.
     std::shared_ptr<typename Future<T>::Data> copy = data;
-    internal::run(copy->onReadyCallbacks, copy->result.get());
-    internal::run(copy->onAnyCallbacks, *this);
+    internal::run(std::move(copy->onReadyCallbacks), copy->result.get());
+    internal::run(std::move(copy->onAnyCallbacks), *this);
 
     copy->clearAllCallbacks();
   }
@@ -1788,8 +1828,8 @@ bool Future<T>::fail(const std::string& _message)
     // Grab a copy of `data` just in case invoking the callbacks
     // erroneously attempts to delete this future.
     std::shared_ptr<typename Future<T>::Data> copy = data;
-    internal::run(copy->onFailedCallbacks, copy->result.error());
-    internal::run(copy->onAnyCallbacks, *this);
+    internal::run(std::move(copy->onFailedCallbacks), copy->result.error());
+    internal::run(std::move(copy->onAnyCallbacks), *this);
 
     copy->clearAllCallbacks();
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/8014e3f9/3rdparty/libprocess/src/tests/future_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/future_tests.cpp b/3rdparty/libprocess/src/tests/future_tests.cpp
index bb7a1c8..3dadd72 100644
--- a/3rdparty/libprocess/src/tests/future_tests.cpp
+++ b/3rdparty/libprocess/src/tests/future_tests.cpp
@@ -210,6 +210,55 @@ TEST(FutureTest, Then)
 }
 
 
+TEST(FutureTest, CallableOnce)
+{
+  Promise<Nothing> promise;
+  promise.set(Nothing());
+
+  Future<int> future = promise.future()
+    .then(lambda::partial(
+        [](std::unique_ptr<int>&& o) {
+          return *o;
+        },
+        std::unique_ptr<int>(new int(42))));
+
+  ASSERT_TRUE(future.isReady());
+  EXPECT_EQ(42, future.get());
+
+  int n = 0;
+  future = promise.future()
+    .onReady(lambda::partial(
+        [&n](std::unique_ptr<int> o) {
+          n += *o;
+        },
+        std::unique_ptr<int>(new int(1))))
+    .onAny(lambda::partial(
+        [&n](std::unique_ptr<int>&& o) {
+          n += *o;
+        },
+        std::unique_ptr<int>(new int(10))))
+    .onFailed(lambda::partial(
+        [&n](const std::unique_ptr<int>& o) {
+          n += *o;
+        },
+        std::unique_ptr<int>(new int(100))))
+    .onDiscard(lambda::partial(
+        [&n](std::unique_ptr<int>&& o) {
+          n += *o;
+        },
+        std::unique_ptr<int>(new int(1000))))
+    .onDiscarded(lambda::partial(
+        [&n](std::unique_ptr<int>&& o) {
+          n += *o;
+        },
+        std::unique_ptr<int>(new int(10000))))
+    .then([&n]() { return n; });
+
+  ASSERT_TRUE(future.isReady());
+  EXPECT_EQ(11, future.get());
+}
+
+
 Future<int> repair(const Future<int>& future)
 {
   EXPECT_TRUE(future.isFailed());