You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/11/20 00:54:17 UTC
[06/15] mesos git commit: Added Future::onAbandoned and
Future::isAbandoned.
Added Future::onAbandoned and Future::isAbandoned.
Review: https://reviews.apache.org/r/61147
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/560ae4fc
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/560ae4fc
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/560ae4fc
Branch: refs/heads/master
Commit: 560ae4fcb3de5ca6605bb58d2f87babcd6b75593
Parents: e636aa7
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Aug 2 20:26:48 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Nov 19 16:33:47 2017 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/check.hpp | 19 +++
3rdparty/libprocess/include/process/future.hpp | 163 ++++++++++++++++++-
3rdparty/libprocess/include/process/gtest.hpp | 48 ++++++
3rdparty/libprocess/src/tests/future_tests.cpp | 36 ++++
3rdparty/libprocess/src/tests/metrics_tests.cpp | 7 +-
5 files changed, 267 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/560ae4fc/3rdparty/libprocess/include/process/check.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/check.hpp b/3rdparty/libprocess/include/process/check.hpp
index 1b6ae2c..ea4321e 100644
--- a/3rdparty/libprocess/include/process/check.hpp
+++ b/3rdparty/libprocess/include/process/check.hpp
@@ -35,6 +35,9 @@
#define CHECK_FAILED(expression) \
CHECK_STATE(CHECK_FAILED, _check_failed, expression)
+// Checks that the future `expression` is abandoned. Delegates to
+// CHECK_STATE, passing `_check_abandoned` as the function that
+// inspects the future's state.
+#define CHECK_ABANDONED(expression) \
+ CHECK_STATE(CHECK_ABANDONED, _check_abandoned, expression)
+
// Private structs/functions used for CHECK_*.
template <typename T>
@@ -100,6 +103,22 @@ Option<Error> _check_failed(const process::Future<T>& f)
}
}
+
+// Helper for CHECK_ABANDONED. Returns `None()` when `f` is abandoned
+// and is not READY, DISCARDED or FAILED; otherwise returns `Some(...)`
+// carrying a message that names the state that was found instead.
+template <typename T>
+Option<Error> _check_abandoned(const process::Future<T>& f)
+{
+ // Terminal states are reported first so the message names the
+ // actual state rather than just "is not abandoned".
+ if (f.isReady()) {
+ return Some("is READY");
+ } else if (f.isDiscarded()) {
+ return Some("is DISCARDED");
+ } else if (f.isFailed()) {
+ return Some("is FAILED: " + f.failure());
+ } else if (!f.isAbandoned()) {
+ return Some("is not abandoned");
+ }
+ return None();
+}
+
// TODO(dhamon): CHECK_NPENDING, CHECK_NREADY, etc.
#endif // __PROCESS_CHECK_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/560ae4fc/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index 1b07bf9..2ce3ccb 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -127,6 +127,7 @@ public:
bool isReady() const;
bool isDiscarded() const;
bool isFailed() const;
+ bool isAbandoned() const;
bool hasDiscard() const;
// Discards this future. Returns false if discard has already been
@@ -157,6 +158,7 @@ public:
// Type of the callback functions that can get invoked when the
// future gets set, fails, or is discarded.
+ typedef lambda::function<void()> AbandonedCallback;
typedef lambda::function<void()> DiscardCallback;
typedef lambda::function<void(const T&)> ReadyCallback;
typedef lambda::function<void(const std::string&)> FailedCallback;
@@ -165,6 +167,7 @@ public:
// Installs callbacks for the specified events and returns a const
// reference to 'this' in order to easily support chaining.
+ const Future<T>& onAbandoned(AbandonedCallback&& callback) const;
const Future<T>& onDiscard(DiscardCallback&& callback) const;
const Future<T>& onReady(ReadyCallback&& callback) const;
const Future<T>& onFailed(FailedCallback&& callback) const;
@@ -175,6 +178,12 @@ public:
// is not expected.
template <typename F>
+ const Future<T>& onAbandoned(_Deferred<F>&& deferred) const
+ {
+ // Convert the deferred into a plain `void()` callback and register
+ // it. `deferred` is a named rvalue reference (an lvalue inside this
+ // body), so move it before converting, exactly as the `onDiscard`
+ // overload does.
+ return onAbandoned(std::move(deferred).operator std::function<void()>());
+ }
+
+ template <typename F>
const Future<T>& onDiscard(_Deferred<F>&& deferred) const
{
return onDiscard(std::move(deferred).operator std::function<void()>());
@@ -215,7 +224,8 @@ private:
// argument (i.e., 'const T&' for 'onReady' and 'then' and 'const
// std::string&' for 'onFailed'), but we allow functors that don't
// care about the argument. We don't need to do this for
- // 'onDiscarded' because it doesn't take an argument.
+ // 'onDiscard', 'onDiscarded' or 'onAbandoned' because they don't
+ // take an argument.
struct LessPrefer {};
struct Prefer : LessPrefer {};
@@ -299,6 +309,15 @@ private:
public:
template <typename F>
+ const Future<T>& onAbandoned(F&& f) const
+ {
+ // Accepts any callable invocable with no arguments. The lambda
+ // captures `f` by copy (`[=]`) and is `mutable` so that a
+ // non-const call operator can be invoked; the wrapper is then
+ // handed to the `std::function<void()>` based overload.
+ return onAbandoned(std::function<void()>(
+ [=]() mutable {
+ f();
+ }));
+ }
+
+ template <typename F>
const Future<T>& onDiscard(F&& f) const
{
return onDiscard(std::function<void()>(
@@ -419,6 +438,9 @@ public:
return then(std::forward<F>(f), Prefer());
}
+ // TODO(benh): Consider adding a `rescue` function for rescuing
+ // abandoned futures.
+
// Installs callbacks that get executed if this future completes
// because it failed.
Future<T> repair(
@@ -443,6 +465,8 @@ public:
// Prefer/LessPrefer to disambiguate.
private:
+ template <typename U>
+ friend class Future;
friend class Promise<T>;
friend class WeakFuture<T>;
@@ -465,6 +489,7 @@ private:
State state;
bool discard;
bool associated;
+ bool abandoned;
// One of:
// 1. None, the state is PENDING or DISCARDED.
@@ -472,6 +497,7 @@ private:
// 3. Error, the state is FAILED; 'error()' stores the message.
Result<T> result;
+ std::vector<AbandonedCallback> onAbandonedCallbacks;
std::vector<DiscardCallback> onDiscardCallbacks;
std::vector<ReadyCallback> onReadyCallbacks;
std::vector<FailedCallback> onFailedCallbacks;
@@ -479,6 +505,47 @@ private:
std::vector<AnyCallback> onAnyCallbacks;
};
+ // Abandons this future. Returns false if the future is already
+ // associated or no longer pending. Otherwise returns true and any
+ // Future::onAbandoned callbacks will be run.
+ //
+ // If `propagating` is true then we'll abandon this future even if
+ // it has already been associated. This is important because
+ // `~Promise()` will try and abandon and we need to ignore that if
+ // the future has been associated since the promise will no longer
+ // be setting the future anyway (and is likely the reason it's being
+ // destructed, because it's useless). When the future that we've
+ // associated with gets abandoned, however, then we need to actually
+ // abandon this future too. Here's an example of this:
+ //
+ // 1: Owned<Promise<int>> promise1(new Promise<int>());
+ // 2: Owned<Promise<int>> promise2(new Promise<int>());
+ // 3:
+ // 4: Future<int> future1 = promise1->future();
+ // 5: Future<int> future2 = promise2->future();
+ // 6:
+ // 7: promise1->associate(future2);
+ // 8:
+ // 9: promise1.reset();
+ // 10:
+ // 11: assert(!future1.isAbandoned());
+ // 12:
+ // 13: promise2.reset();
+ // 14:
+ // 15: assert(future2.isAbandoned());
+ // 16: assert(future1.isAbandoned());
+ //
+ // At line 9 `~Promise()` will attempt to abandon the future by
+ // calling `abandon()` but since it's been associated we won't do
+ // anything. On line 13 the `onAbandoned()` callback will call
+ // `abandon(true)` and now we'll actually abandon the future
+ // because we're _propagating_ the abandon from the associated
+ // future.
+ //
+ // NOTE: this is an _INTERNAL_ function and should never be exposed
+ // or used outside of the implementation.
+ bool abandon(bool propagating = false);
+
// Sets the value for this future, unless the future is already set,
// failed, or discarded, in which case it returns false.
bool set(const T& _t);
@@ -658,7 +725,12 @@ void discarded(Future<T> future)
template <typename T>
-Promise<T>::Promise() {}
+Promise<T>::Promise()
+{
+ // Need to "unset" `abandoned` since it gets set in the empty
+ // constructor for `Future`.
+ f.data->abandoned = false;
+}
template <typename T>
@@ -672,7 +744,11 @@ Promise<T>::~Promise()
// Note that we don't discard the promise as we don't want to give
// the illusion that any computation hasn't started (or possibly
// finished) in the event that computation is "visible" by other
- // means.
+ // means. However, we try and abandon the future if it hasn't been
+ // associated or set (or moved, i.e., `f.data` is true).
+ if (f.data) {
+ f.abandon();
+ }
}
@@ -766,7 +842,8 @@ bool Promise<T>::associate(const Future<T>& future)
future
.onReady(lambda::bind(set, f, lambda::_1))
.onFailed(lambda::bind(&Future<T>::fail, f, lambda::_1))
- .onDiscarded(lambda::bind(&internal::discarded<T>, f));
+ .onDiscarded(lambda::bind(&internal::discarded<T>, f))
+ .onAbandoned(lambda::bind(&Future<T>::abandon, f, true));
}
return associated;
@@ -924,12 +1001,14 @@ Future<T>::Data::Data()
: state(PENDING),
discard(false),
associated(false),
+ abandoned(false),
result(None()) {}
template <typename T>
void Future<T>::Data::clearAllCallbacks()
{
+ onAbandonedCallbacks.clear();
onAnyCallbacks.clear();
onDiscardCallbacks.clear();
onDiscardedCallbacks.clear();
@@ -940,7 +1019,10 @@ void Future<T>::Data::clearAllCallbacks()
template <typename T>
Future<T>::Future()
- : data(new Data()) {}
+ : data(new Data())
+{
+ data->abandoned = true;
+}
template <typename T>
@@ -1065,6 +1147,32 @@ bool Future<T>::discard()
+// Marks this future as abandoned and runs the registered
+// `onAbandoned` callbacks. The transition only happens when the
+// future is not already abandoned, is still PENDING, and is either
+// not associated or `propagating` is true. Returns true only when
+// this call performed the transition.
template <typename T>
+bool Future<T>::abandon(bool propagating)
+{
+ bool result = false;
+
+ std::vector<AbandonedCallback> callbacks;
+ synchronized (data->lock) {
+ if (!data->abandoned &&
+ data->state == PENDING &&
+ (!data->associated || propagating)) {
+ result = data->abandoned = true;
+
+ // Take the callbacks while the lock is held; they are run
+ // below, after the synchronized block has ended.
+ callbacks.swap(data->onAbandonedCallbacks);
+ }
+ }
+
+ // Invoke all callbacks. The callbacks get destroyed when we exit
+ // from the function.
+ if (result) {
+ internal::run(callbacks);
+ }
+
+ return result;
+}
+
+
+template <typename T>
bool Future<T>::isPending() const
{
return data->state == PENDING;
@@ -1093,6 +1201,13 @@ bool Future<T>::isFailed() const
+// Returns the current value of `data->abandoned`. The flag is read
+// here without taking `data->lock`.
template <typename T>
+bool Future<T>::isAbandoned() const
+{
+ return data->abandoned;
+}
+
+
+template <typename T>
bool Future<T>::hasDiscard() const
{
return data->discard;
@@ -1182,6 +1297,28 @@ const std::string& Future<T>::failure() const
+// Registers `callback` to be invoked when this future is abandoned.
+// If the future is already abandoned the callback is invoked before
+// returning; if it is still PENDING the callback is queued; in any
+// other case the callback is neither queued nor invoked. Returns
+// `*this` to allow chaining.
template <typename T>
+const Future<T>& Future<T>::onAbandoned(AbandonedCallback&& callback) const
+{
+ bool run = false;
+
+ synchronized (data->lock) {
+ if (data->abandoned) {
+ // Already abandoned: remember to invoke the callback once the
+ // synchronized block has ended.
+ run = true;
+ } else if (data->state == PENDING) {
+ data->onAbandonedCallbacks.emplace_back(std::move(callback));
+ }
+ }
+
+ // TODO(*): Invoke callback in another execution context.
+ if (run) {
+ callback();
+ }
+
+ return *this;
+}
+
+
+template <typename T>
const Future<T>& Future<T>::onDiscard(DiscardCallback&& callback) const
{
bool run = false;
@@ -1413,6 +1550,10 @@ Future<X> Future<T>::then(lambda::function<Future<X>(const T&)> f) const
onAny(std::move(thenf));
+ onAbandoned([=]() {
+ promise->future().abandon();
+ });
+
// Propagate discarding up the chain. To avoid cyclic dependencies,
// we keep a weak future in the callback.
promise->future().onDiscard(
@@ -1433,6 +1574,10 @@ Future<X> Future<T>::then(lambda::function<X(const T&)> f) const
onAny(std::move(then));
+ onAbandoned([=]() {
+ promise->future().abandon();
+ });
+
// Propagate discarding up the chain. To avoid cyclic dependencies,
// we keep a weak future in the callback.
promise->future().onDiscard(
@@ -1450,6 +1595,10 @@ Future<T> Future<T>::repair(
onAny(lambda::bind(&internal::repair<T>, f, promise, lambda::_1));
+ onAbandoned([=]() {
+ promise->future().abandon();
+ });
+
// Propagate discarding up the chain. To avoid cyclic dependencies,
// we keep a weak future in the callback.
promise->future().onDiscard(
@@ -1501,6 +1650,10 @@ Future<T> Future<T>::after(
onAny(lambda::bind(&internal::after<T>, latch, promise, timer, lambda::_1));
+ onAbandoned([=]() {
+ promise->future().abandon();
+ });
+
// Propagate discarding up the chain. To avoid cyclic dependencies,
// we keep a weak future in the callback.
promise->future().onDiscard(
http://git-wip-us.apache.org/repos/asf/mesos/blob/560ae4fc/3rdparty/libprocess/include/process/gtest.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/gtest.hpp b/3rdparty/libprocess/include/process/gtest.hpp
index f0ffd9a..eee7266 100644
--- a/3rdparty/libprocess/include/process/gtest.hpp
+++ b/3rdparty/libprocess/include/process/gtest.hpp
@@ -181,6 +181,38 @@ template <typename T>
}
+// Predicate-formatter used by the AWAIT_*_ABANDONED macros. Waits up
+// to `duration` for `actual` to either complete (`onAny`) or be
+// abandoned (`onAbandoned`). Returns an assertion failure if the wait
+// timed out or if the future turned out to be discarded, ready or
+// failed; otherwise checks that it is abandoned and returns success.
+// `expr` is the textual form of the future expression and is only
+// used in the failure messages.
+template <typename T>
+::testing::AssertionResult AwaitAssertAbandoned(
+ const char* expr,
+ const char*, // Unused string representation of 'duration'.
+ const process::Future<T>& actual,
+ const Duration& duration)
+{
+ process::Owned<process::Latch> latch(new process::Latch());
+
+ // Either event triggers the latch, so the wait below ends as soon
+ // as the future completes or is abandoned.
+ actual.onAny([=]() { latch->trigger(); });
+ actual.onAbandoned([=]() { latch->trigger(); });
+
+ if (!latch->await(duration)) {
+ return ::testing::AssertionFailure()
+ << "Failed to wait " << duration << " for " << expr;
+ } else if (actual.isDiscarded()) {
+ return ::testing::AssertionFailure()
+ << expr << " was discarded";
+ } else if (actual.isReady()) {
+ return ::testing::AssertionFailure()
+ << expr << " is ready (" << ::testing::PrintToString(actual.get()) << ")";
+ } else if (actual.isFailed()) {
+ return ::testing::AssertionFailure()
+ << "(" << expr << ").failure(): " << actual.failure();
+ }
+
+ // None of the terminal states matched, so the latch must have been
+ // triggered by the abandoned callback; check that this is the case.
+ CHECK_ABANDONED(actual);
+
+ return ::testing::AssertionSuccess();
+}
+
+
template <typename T1, typename T2>
::testing::AssertionResult AwaitAssertEq(
const char* expectedExpr,
@@ -209,6 +241,22 @@ template <typename T1, typename T2>
}
+// Gtest wrappers around AwaitAssertAbandoned. The ASSERT variants
+// expand to ASSERT_PRED_FORMAT2 and the EXPECT variants expand to
+// EXPECT_PRED_FORMAT2. The forms without the `_FOR` suffix wait for
+// Seconds(15).
+#define AWAIT_ASSERT_ABANDONED_FOR(actual, duration) \
+ ASSERT_PRED_FORMAT2(AwaitAssertAbandoned, actual, duration)
+
+
+#define AWAIT_ASSERT_ABANDONED(actual) \
+ AWAIT_ASSERT_ABANDONED_FOR(actual, Seconds(15))
+
+
+#define AWAIT_EXPECT_ABANDONED_FOR(actual, duration) \
+ EXPECT_PRED_FORMAT2(AwaitAssertAbandoned, actual, duration)
+
+
+#define AWAIT_EXPECT_ABANDONED(actual) \
+ AWAIT_EXPECT_ABANDONED_FOR(actual, Seconds(15))
+
+
// TODO(bmahler): Differentiate EXPECT and ASSERT here.
#define AWAIT_FOR(actual, duration) \
ASSERT_PRED_FORMAT2(Await, actual, duration)
http://git-wip-us.apache.org/repos/asf/mesos/blob/560ae4fc/3rdparty/libprocess/src/tests/future_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/future_tests.cpp b/3rdparty/libprocess/src/tests/future_tests.cpp
index 77529ec..fdda7a0 100644
--- a/3rdparty/libprocess/src/tests/future_tests.cpp
+++ b/3rdparty/libprocess/src/tests/future_tests.cpp
@@ -17,6 +17,7 @@
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gtest.hpp>
+#include <process/owned.hpp>
#include <stout/duration.hpp>
#include <stout/nothing.hpp>
@@ -25,6 +26,7 @@
using process::Clock;
using process::Failure;
using process::Future;
+using process::Owned;
using process::Promise;
using process::undiscardable;
@@ -611,3 +613,37 @@ TEST(FutureTest, UndiscardableLambda)
AWAIT_ASSERT_EQ(84, f);
}
+
+
+// Verifies that a default-constructed future is abandoned, and that a
+// future obtained from a promise only becomes abandoned once the
+// promise has been destroyed.
+TEST(FutureTest, Abandoned)
+{
+ AWAIT_EXPECT_ABANDONED(Future<int>());
+
+ Owned<Promise<int>> promise(new Promise<int>());
+
+ Future<int> future = promise->future();
+
+ EXPECT_TRUE(!future.isAbandoned());
+
+ // Destroy the promise without ever setting its future.
+ promise.reset();
+
+ AWAIT_EXPECT_ABANDONED(future);
+}
+
+
+// Verifies that abandonment propagates through a chain of `then`
+// continuations: destroying the promise at the head of the chain
+// abandons the future at the end of it.
+TEST(FutureTest, AbandonedChain)
+{
+ Owned<Promise<int>> promise(new Promise<int>());
+
+ Future<string> future = promise->future()
+ .then([]() {
+ return Nothing();
+ })
+ .then([]() -> string {
+ return "hello world";
+ });
+
+ // Destroy the promise without ever setting its future.
+ promise.reset();
+
+ AWAIT_EXPECT_ABANDONED(future);
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/560ae4fc/3rdparty/libprocess/src/tests/metrics_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/metrics_tests.cpp b/3rdparty/libprocess/src/tests/metrics_tests.cpp
index 161ca0d..bb7c9e3 100644
--- a/3rdparty/libprocess/src/tests/metrics_tests.cpp
+++ b/3rdparty/libprocess/src/tests/metrics_tests.cpp
@@ -54,6 +54,7 @@ using process::Failure;
using process::Future;
using process::PID;
using process::Process;
+using process::Promise;
using process::READONLY_HTTP_AUTHENTICATION_REALM;
using process::Statistics;
using process::UPID;
@@ -76,8 +77,12 @@ public:
Future<double> pending()
{
- return Future<double>();
+ return promise.future();
}
+
+ // Need to use a promise for the call to pending instead of just a
+ // `Future<double>()` so we don't return an abandoned future.
+ Promise<double> promise;
};