You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/11/20 00:54:22 UTC
[11/15] mesos git commit: Added overload of process::await that takes
and returns single future.
Added overload of process::await that takes and returns single future.
Review: https://reviews.apache.org/r/61152
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7a232736
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7a232736
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7a232736
Branch: refs/heads/master
Commit: 7a232736c0994c90b8209a3beec3df3d9d197295
Parents: b8958e0
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Dec 28 06:42:06 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Nov 19 16:51:26 2017 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/collect.hpp | 56 ++++++++++++++++
3rdparty/libprocess/src/tests/collect_tests.cpp | 67 ++++++++++++++++++++
2 files changed, 123 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/7a232736/3rdparty/libprocess/include/process/collect.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/collect.hpp b/3rdparty/libprocess/include/process/collect.hpp
index 392b06d..bdb57ef 100644
--- a/3rdparty/libprocess/include/process/collect.hpp
+++ b/3rdparty/libprocess/include/process/collect.hpp
@@ -57,6 +57,62 @@ template <typename... Ts>
Future<std::tuple<Future<Ts>...>> await(const Future<Ts>&... futures);
+// Waits on the future specified and returns after the future has been
+// completed or the await has been discarded. This is useful when
+// wanting to "break out" of a future chain if a discard occurs but
+// the underlying future has not been discarded. For example:
+//
+// Future<string> foo()
+// {
+// return bar()
+// .then([](int i) {
+// return stringify(i);
+// });
+// }
+//
+// Future<string> future = foo();
+// future.discard();
+//
+// In the above code we'll propagate the discard to `bar()` but might
+// wait forever if `bar()` can't discard its computation. In some
+// circumstances you might want to break out early and you can do that
+// by using `await`, because if we discard an `await` that function
+// will return even though the futures it is waiting on have not
+// been discarded (in other words, the `await` can be properly
+// discarded even if the underlying futures have not been discarded).
+//
+// Future<string> foo()
+// {
+// return await(bar())
+// .recover([](const Future<Future<int>>& future) {
+// if (future.isDiscarded()) {
+// cleanup();
+// }
+// return Failure("Discarded waiting");
+// })
+// .then([](const Future<int>& future) {
+// return future
+// .then([](int i) {
+// return stringify(i);
+// });
+// });
+// }
+//
+// Future<string> future = foo();
+// future.discard();
+//
+// Using `await` will enable you to continue execution even if `bar()`
+// does not (or cannot) discard its execution.
+template <typename T>
+Future<Future<T>> await(const Future<T>& future)
+{
+ return await(std::list<Future<T>>{future}) // Delegate to `await` on a one-element list.
+ .then([=]() { // The list result is ignored; `future` is captured by copy.
+ return Future<Future<T>>(future); // Return the original future wrapped in an outer future.
+ });
+}
+
+
namespace internal {
template <typename T>
http://git-wip-us.apache.org/repos/asf/mesos/blob/7a232736/3rdparty/libprocess/src/tests/collect_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/collect_tests.cpp b/3rdparty/libprocess/src/tests/collect_tests.cpp
index 72f2a80..7c2ba90 100644
--- a/3rdparty/libprocess/src/tests/collect_tests.cpp
+++ b/3rdparty/libprocess/src/tests/collect_tests.cpp
@@ -10,17 +10,21 @@
// See the License for the specific language governing permissions and
// limitations under the License
+#include <string>
+
#include <process/collect.hpp>
#include <process/gtest.hpp>
#include <process/owned.hpp>
#include <stout/gtest.hpp>
+#include <stout/stringify.hpp>
using process::Future;
using process::Owned;
using process::Promise;
using std::list;
+using std::string;
TEST(CollectTest, Ready)
{
@@ -303,3 +307,66 @@ TEST(AwaitTest, AbandonedPropagation)
AWAIT_EQ(42, future);
}
+
+
+TEST(AwaitTest, AwaitSingleDiscard) // Discards a chain built on the single-future `await`.
+{
+ Promise<int> promise; // Never set anywhere in this test.
+
+ auto bar = [&]() {
+ return promise.future();
+ };
+
+ auto foo = [&]() {
+ return await(bar()) // Await the promise's future before continuing the chain.
+ .then([](const Future<int>& f) {
+ return f
+ .then([](int i) {
+ return stringify(i);
+ });
+ });
+ };
+
+ Future<string> future = foo();
+
+ future.discard(); // Request a discard on the outer chain only.
+
+ AWAIT_DISCARDED(future); // The chain is expected to become discarded.
+
+ EXPECT_TRUE(promise.future().hasDiscard()); // Expects hasDiscard() to be true on the promise's future.
+}
+
+
+TEST(AwaitTest, AwaitSingleAbandon) // Resets the promise without setting it, then checks the recovered value.
+{
+ Owned<Promise<int>> promise(new Promise<int>()); // Held in an Owned so it can be reset() below.
+
+ auto bar = [&]() {
+ return promise->future();
+ };
+
+ auto foo = [&]() {
+ return await(bar()) // Await the promise's future before continuing the chain.
+ .then([](const Future<int>& f) {
+ return f
+ .then([](int i) {
+ return stringify(i);
+ });
+ });
+ };
+
+ // There is a race between resetting the promise and the await
+ // process being terminated, so we use Future::recover to map an
+ // abandoned result to "hello"; any other outcome passes through.
+ Future<string> future = foo()
+ .recover([](const Future<string>& f) -> Future<string> {
+ if (f.isAbandoned()) {
+ return "hello";
+ }
+ return f;
+ });
+
+ promise.reset(); // The promise is never set; presumably this abandons its future (per the test name).
+
+ AWAIT_EQ("hello", future); // Expects the value substituted by recover above.
+}