You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/11/20 00:54:23 UTC
[12/15] mesos git commit: Added Future::onAbandoned semantics to
process::collect/await.
Added Future::onAbandoned semantics to process::collect/await.
Review: https://reviews.apache.org/r/61150
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/613a846f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/613a846f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/613a846f
Branch: refs/heads/master
Commit: 613a846f197f74d116641c66b9caade65de4edae
Parents: e2eb6ae
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Aug 2 20:13:57 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Nov 19 16:51:26 2017 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/collect.hpp | 19 +++++++
3rdparty/libprocess/src/socket.cpp | 2 +-
3rdparty/libprocess/src/tests/collect_tests.cpp | 54 ++++++++++++++++++++
3 files changed, 74 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/613a846f/3rdparty/libprocess/include/process/collect.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/collect.hpp b/3rdparty/libprocess/include/process/collect.hpp
index fccf55a..392b06d 100644
--- a/3rdparty/libprocess/include/process/collect.hpp
+++ b/3rdparty/libprocess/include/process/collect.hpp
@@ -76,6 +76,7 @@ public:
delete promise;
}
+protected:
virtual void initialize()
{
// Stop this nonsense if nobody cares.
@@ -83,10 +84,19 @@ public:
foreach (const Future<T>& future, futures) {
future.onAny(defer(this, &CollectProcess::waited, lambda::_1));
+ future.onAbandoned(defer(this, &CollectProcess::abandoned));
}
}
private:
+ void abandoned()
+ {
+ // One of the collected futures was abandoned and can never complete,
+ // so waiting is pointless: terminate this process, which causes
+ // `promise` to be deleted and our own future to be abandoned too.
+ terminate(this);
+ }
+
void discarded()
{
promise->discard();
@@ -150,10 +160,19 @@ public:
foreach (const Future<T>& future, futures) {
future.onAny(defer(this, &AwaitProcess::waited, lambda::_1));
+ future.onAbandoned(defer(this, &AwaitProcess::abandoned));
}
}
private:
+ void abandoned()
+ {
+ // One of the awaited futures was abandoned and can never complete,
+ // so waiting is pointless: terminate this process, which causes
+ // `promise` to be deleted and our own future to be abandoned too.
+ terminate(this);
+ }
+
void discarded()
{
promise->discard();
http://git-wip-us.apache.org/repos/asf/mesos/blob/613a846f/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
index 490bbd7..504cb54 100644
--- a/3rdparty/libprocess/src/socket.cpp
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -162,7 +162,7 @@ Future<string> SocketImpl::recv(const Option<ssize_t>& size)
[=]() {
return self->recv(data.get(), chunk);
},
- [=](size_t length) -> ControlFlow<string> {
+ [=](size_t length) mutable -> ControlFlow<string> {
if (length == 0) { // EOF.
// Return everything we've received thus far, a subsequent
// receive will return an empty string.
http://git-wip-us.apache.org/repos/asf/mesos/blob/613a846f/3rdparty/libprocess/src/tests/collect_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/collect_tests.cpp b/3rdparty/libprocess/src/tests/collect_tests.cpp
index 91760ec..72f2a80 100644
--- a/3rdparty/libprocess/src/tests/collect_tests.cpp
+++ b/3rdparty/libprocess/src/tests/collect_tests.cpp
@@ -12,10 +12,12 @@
#include <process/collect.hpp>
#include <process/gtest.hpp>
+#include <process/owned.hpp>
#include <stout/gtest.hpp>
using process::Future;
+using process::Owned;
using process::Promise;
using std::list;
@@ -132,6 +134,30 @@ TEST(CollectTest, DiscardPropagation)
}
+TEST(CollectTest, AbandonedPropagation)
+{
+ Owned<Promise<int>> promise(new Promise<int>());
+
+ // Resetting the promise below races with the termination of the
+ // collect process, so Future::recover is used to turn the resulting
+ // abandonment into a value (42) that the test can wait for.
+ Future<int> future = process::collect(promise->future())
+ .recover([](const Future<std::tuple<int>>& f) -> Future<std::tuple<int>> {
+ if (f.isAbandoned()) {
+ return std::make_tuple(42);
+ }
+ return f;
+ })
+ .then([](const std::tuple<int>& t) {
+ return std::get<0>(t);
+ });
+
+ promise.reset();
+
+ AWAIT_EQ(42, future);
+}
+
+
TEST(AwaitTest, Success)
{
// First ensure an empty list functions correctly.
@@ -249,3 +275,31 @@ TEST(AwaitTest, DiscardPropagation)
AWAIT_DISCARDED(promise1.future());
AWAIT_DISCARDED(promise2.future());
}
+
+
+TEST(AwaitTest, AbandonedPropagation)
+{
+ Owned<Promise<int>> promise(new Promise<int>());
+
+ // Resetting the promise below races with the termination of the
+ // await process, so Future::recover is used to turn the resulting
+ // abandonment into a value (42) that the test can wait for.
+ Future<int> future = process::await(promise->future(), Future<int>())
+ .recover([](const Future<std::tuple<Future<int>, Future<int>>>& f)
+ -> Future<std::tuple<Future<int>, Future<int>>> {
+ if (f.isAbandoned()) {
+ return std::make_tuple(42, 0);
+ }
+ return f;
+ })
+ .then([](const std::tuple<Future<int>, Future<int>>& t) {
+ return std::get<0>(t)
+ .then([](int i) {
+ return i;
+ });
+ });
+
+ promise.reset();
+
+ AWAIT_EQ(42, future);
+}