You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/11/20 00:54:22 UTC

[11/15] mesos git commit: Added overload of process::await that takes and returns single future.

Added overload of process::await that takes and returns single future.

Review: https://reviews.apache.org/r/61152


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7a232736
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7a232736
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7a232736

Branch: refs/heads/master
Commit: 7a232736c0994c90b8209a3beec3df3d9d197295
Parents: b8958e0
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Dec 28 06:42:06 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Nov 19 16:51:26 2017 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/collect.hpp | 56 ++++++++++++++++
 3rdparty/libprocess/src/tests/collect_tests.cpp | 67 ++++++++++++++++++++
 2 files changed, 123 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7a232736/3rdparty/libprocess/include/process/collect.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/collect.hpp b/3rdparty/libprocess/include/process/collect.hpp
index 392b06d..bdb57ef 100644
--- a/3rdparty/libprocess/include/process/collect.hpp
+++ b/3rdparty/libprocess/include/process/collect.hpp
@@ -57,6 +57,62 @@ template <typename... Ts>
 Future<std::tuple<Future<Ts>...>> await(const Future<Ts>&... futures);
 
 
+// Waits on the future specified and returns after the future has been
+// completed or the await has been discarded. This is useful when
+// wanting to "break out" of a future chain if a discard occurs but
+// the underlying future has not been discarded. For example:
+//
+//   Future<string> foo()
+//   {
+//     return bar()
+//       .then([](int i) {
+//         return stringify(i);
+//       });
+//   }
+//
+//   Future<string> future = foo();
+//   future.discard();
+//
+// In the above code we'll propagate the discard to `bar()` but might
+// wait forever if `bar()` can't discard its computation. In some
+// circumstances you might want to break out early and you can do that
+// by using `await`, because if we discard an `await` that function
+// will return even though all of the futures it is waiting on have
+// not been discarded (in other words, the `await` can be properly
+// discarded even if the underlying futures have not been discarded).
+//
+//   Future<string> foo()
+//   {
+//     return await(bar())
+//       .recover([](const Future<Future<int>>& future) {
+//         if (future.isDiscarded()) {
+//           cleanup();
+//         }
+//         return Failure("Discarded waiting");
+//       })
+//       .then([](const Future<int>& future) {
+//         return future
+//           .then([](int i) {
+//             return stringify(i);
+//           });
+//       });
+//   }
+//
+//   Future<string> future = foo();
+//   future.discard();
+//
+// Using `await` will enable you to continue execution even if `bar()`
+// does not (or cannot) discard its execution.
+template <typename T>
+Future<Future<T>> await(const Future<T>& future)
+{
+  return await(std::list<Future<T>>{future})
+    .then([=]() {
+      return Future<Future<T>>(future);
+    });
+}
+
+
 namespace internal {
 
 template <typename T>

http://git-wip-us.apache.org/repos/asf/mesos/blob/7a232736/3rdparty/libprocess/src/tests/collect_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/collect_tests.cpp b/3rdparty/libprocess/src/tests/collect_tests.cpp
index 72f2a80..7c2ba90 100644
--- a/3rdparty/libprocess/src/tests/collect_tests.cpp
+++ b/3rdparty/libprocess/src/tests/collect_tests.cpp
@@ -10,17 +10,21 @@
 // See the License for the specific language governing permissions and
 // limitations under the License
 
+#include <string>
+
 #include <process/collect.hpp>
 #include <process/gtest.hpp>
 #include <process/owned.hpp>
 
 #include <stout/gtest.hpp>
+#include <stout/stringify.hpp>
 
 using process::Future;
 using process::Owned;
 using process::Promise;
 
 using std::list;
+using std::string;
 
 TEST(CollectTest, Ready)
 {
@@ -303,3 +307,66 @@ TEST(AwaitTest, AbandonedPropagation)
 
   AWAIT_EQ(42, future);
 }
+
+
+TEST(AwaitTest, AwaitSingleDiscard)
+{
+  Promise<int> promise;
+
+  auto bar = [&]() {
+    return promise.future();
+  };
+
+  auto foo = [&]() {
+    return await(bar())
+      .then([](const Future<int>& f) {
+        return f
+          .then([](int i) {
+            return stringify(i);
+          });
+      });
+  };
+
+  Future<string> future = foo();
+
+  future.discard();
+
+  AWAIT_DISCARDED(future);
+
+  EXPECT_TRUE(promise.future().hasDiscard());
+}
+
+
+TEST(AwaitTest, AwaitSingleAbandon)
+{
+  Owned<Promise<int>> promise(new Promise<int>());
+
+  auto bar = [&]() {
+    return promise->future();
+  };
+
+  auto foo = [&]() {
+    return await(bar())
+      .then([](const Future<int>& f) {
+        return f
+          .then([](int i) {
+            return stringify(i);
+          });
+      });
+  };
+
+  // There is a race from the time that we reset the promise to when
+  // the await process is terminated so we need to use Future::recover
+  // to properly handle this case.
+  Future<string> future = foo()
+    .recover([](const Future<string>& f) -> Future<string> {
+      if (f.isAbandoned()) {
+        return "hello";
+      }
+      return f;
+    });
+
+  promise.reset();
+
+  AWAIT_EQ("hello", future);
+}