You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/02/08 16:51:37 UTC

[10/20] mesos git commit: Added Future::repair.

Added Future::repair.

Review: https://reviews.apache.org/r/29535


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b134530f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b134530f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b134530f

Branch: refs/heads/master
Commit: b134530f43dceb7b06639260d004c63cea989261
Parents: 853bd8f
Author: Benjamin Hindman <be...@gmail.com>
Authored: Tue Dec 16 13:13:55 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Feb 7 14:42:46 2015 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/future.hpp  | 60 +++++++++++++++++---
 3rdparty/libprocess/src/tests/process_tests.cpp | 42 ++++++++++++++
 2 files changed, 94 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b134530f/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index 1b427fe..a26122f 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -468,6 +468,15 @@ public:
 #undef TEMPLATE
 #endif // __cplusplus >= 201103L
 
+  // Installs a callback that gets executed only if this future fails;
+  // the returned future then takes on the result of that callback.
+  Future<T> repair(
+      const lambda::function<Future<T>(const Future<T>&)>& f) const;
+
+  // TODO(benh): Add overloads of 'repair' that don't require passing
+  // in a function that takes the 'const Future<T>&' parameter and use
+  // Prefer/LessPrefer to disambiguate.
+
   // Invokes the specified function after some duration if this future
   // has not been completed (set, failed, or discarded). Note that
   // this function is agnostic of discard semantics and while it will
@@ -1467,9 +1476,12 @@ const Future<T>& Future<T>::onAny(const AnyCallback& callback) const
 
 namespace internal {
 
+// NOTE: We need to name this 'thenf' versus 'then' to distinguish it
+// from the function 'then' whose parameter 'f' doesn't return a
+// Future since the compiler can't properly infer otherwise.
 template <typename T, typename X>
-void thenf(const memory::shared_ptr<Promise<X> >& promise,
-           const lambda::function<Future<X>(const T&)>& f,
+void thenf(const lambda::function<Future<X>(const T&)>& f,
+           const memory::shared_ptr<Promise<X>>& promise,
            const Future<T>& future)
 {
   if (future.isReady()) {
@@ -1487,8 +1499,8 @@ void thenf(const memory::shared_ptr<Promise<X> >& promise,
 
 
 template <typename T, typename X>
-void then(const memory::shared_ptr<Promise<X> >& promise,
-          const lambda::function<X(const T&)>& f,
+void then(const lambda::function<X(const T&)>& f,
+          const memory::shared_ptr<Promise<X>>& promise,
           const Future<T>& future)
 {
   if (future.isReady()) {
@@ -1506,6 +1518,21 @@ void then(const memory::shared_ptr<Promise<X> >& promise,
 
 
 template <typename T>
+void repair(
+    const lambda::function<Future<T>(const Future<T>&)>& f,
+    const memory::shared_ptr<Promise<T>>& promise,
+    const Future<T>& future)
+{
+  CHECK(!future.isPending());
+  if (future.isFailed()) {
+    promise->associate(f(future));
+  } else {
+    promise->associate(future);
+  }
+}
+
+
+template <typename T>
 void expired(
     const lambda::function<Future<T>(const Future<T>&)>& f,
     const memory::shared_ptr<Latch>& latch,
@@ -1546,10 +1573,10 @@ template <typename T>
 template <typename X>
 Future<X> Future<T>::then(const lambda::function<Future<X>(const T&)>& f) const
 {
-  memory::shared_ptr<Promise<X> > promise(new Promise<X>());
+  memory::shared_ptr<Promise<X>> promise(new Promise<X>());
 
   lambda::function<void(const Future<T>&)> thenf =
-    lambda::bind(&internal::thenf<T, X>, promise, f, lambda::_1);
+    lambda::bind(&internal::thenf<T, X>, f, promise, lambda::_1);
 
   onAny(thenf);
 
@@ -1566,10 +1593,10 @@ template <typename T>
 template <typename X>
 Future<X> Future<T>::then(const lambda::function<X(const T&)>& f) const
 {
-  memory::shared_ptr<Promise<X> > promise(new Promise<X>());
+  memory::shared_ptr<Promise<X>> promise(new Promise<X>());
 
   lambda::function<void(const Future<T>&)> then =
-    lambda::bind(&internal::then<T, X>, promise, f, lambda::_1);
+    lambda::bind(&internal::then<T, X>, f, promise, lambda::_1);
 
   onAny(then);
 
@@ -1583,6 +1610,23 @@ Future<X> Future<T>::then(const lambda::function<X(const T&)>& f) const
 
 
 template <typename T>
+Future<T> Future<T>::repair(
+    const lambda::function<Future<T>(const Future<T>&)>& f) const
+{
+  memory::shared_ptr<Promise<T>> promise(new Promise<T>());
+
+  onAny(lambda::bind(&internal::repair<T>, f, promise, lambda::_1));
+
+  // Propagate discarding up the chain. To avoid cyclic dependencies,
+  // we keep a weak future in the callback.
+  promise->future().onDiscard(
+      lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
+
+  return promise->future();
+}
+
+
+template <typename T>
 Future<T> Future<T>::after(
     const Duration& duration,
     const lambda::function<Future<T>(const Future<T>&)>& f) const

http://git-wip-us.apache.org/repos/asf/mesos/blob/b134530f/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index fe758ab..1bcb3c6 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -164,6 +164,48 @@ TEST(Process, then)
 }
 
 
+Future<int> repair(const Future<int>& future)
+{
+  EXPECT_TRUE(future.isFailed());
+  EXPECT_EQ("Failure", future.failure());
+  return 43;
+}
+
+
+// Checks that the 'repair' callback gets executed if the future fails
+// and does not get executed if the future completes successfully.
+TEST(Process, repair)
+{
+  // Check that the 'repair' callback _does not_ get executed by
+  // making sure that when we complete the promise with a value, that
+  // same value is the one that we get back.
+  Promise<int> promise1;
+
+  Future<int> future1 = promise1.future()
+    .repair(lambda::bind(&repair, lambda::_1));
+
+  EXPECT_TRUE(future1.isPending());
+
+  promise1.set(42); // So this means 'repair' should not get executed.
+
+  AWAIT_EXPECT_EQ(42, future1);
+
+  // Check that the 'repair' callback gets executed by failing the
+  // promise and making sure we get back the callback's value.
+  Promise<int> promise2;
+
+  Future<int> future2 = promise2.future()
+    .repair(lambda::bind(&repair, lambda::_1));
+
+  EXPECT_TRUE(future2.isPending());
+
+  promise2.fail("Failure"); // So 'repair' should get called returning '43'.
+
+  AWAIT_EXPECT_EQ(43, future2);
+}
+
+
+
 Future<Nothing> after(volatile bool* executed, const Future<Nothing>& future)
 {
   EXPECT_TRUE(future.hasDiscard());