You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/11/20 00:54:17 UTC

[06/15] mesos git commit: Added Future::onAbandoned and Future::isAbandoned.

Added Future::onAbandoned and Future::isAbandoned.

Review: https://reviews.apache.org/r/61147


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/560ae4fc
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/560ae4fc
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/560ae4fc

Branch: refs/heads/master
Commit: 560ae4fcb3de5ca6605bb58d2f87babcd6b75593
Parents: e636aa7
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Aug 2 20:26:48 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Nov 19 16:33:47 2017 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/check.hpp   |  19 +++
 3rdparty/libprocess/include/process/future.hpp  | 163 ++++++++++++++++++-
 3rdparty/libprocess/include/process/gtest.hpp   |  48 ++++++
 3rdparty/libprocess/src/tests/future_tests.cpp  |  36 ++++
 3rdparty/libprocess/src/tests/metrics_tests.cpp |   7 +-
 5 files changed, 267 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/560ae4fc/3rdparty/libprocess/include/process/check.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/check.hpp b/3rdparty/libprocess/include/process/check.hpp
index 1b6ae2c..ea4321e 100644
--- a/3rdparty/libprocess/include/process/check.hpp
+++ b/3rdparty/libprocess/include/process/check.hpp
@@ -35,6 +35,9 @@
 #define CHECK_FAILED(expression)                                        \
   CHECK_STATE(CHECK_FAILED, _check_failed, expression)
 
+#define CHECK_ABANDONED(expression)                                     \
+  CHECK_STATE(CHECK_ABANDONED, _check_abandoned, expression)
+
 // Private structs/functions used for CHECK_*.
 
 template <typename T>
@@ -100,6 +103,22 @@ Option<Error> _check_failed(const process::Future<T>& f)
   }
 }
 
+
+template <typename T>
+Option<Error> _check_abandoned(const process::Future<T>& f)
+{
+  if (f.isReady()) {
+    return Some("is READY");
+  } else if (f.isDiscarded()) {
+    return Some("is DISCARDED");
+  } else if (f.isFailed()) {
+    return Some("is FAILED: " + f.failure());
+  } else if (!f.isAbandoned()) {
+    return Some("is not abandoned");
+  }
+  return None();
+}
+
 // TODO(dhamon): CHECK_NPENDING, CHECK_NREADY, etc.
 
 #endif // __PROCESS_CHECK_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/560ae4fc/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index 1b07bf9..2ce3ccb 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -127,6 +127,7 @@ public:
   bool isReady() const;
   bool isDiscarded() const;
   bool isFailed() const;
+  bool isAbandoned() const;
   bool hasDiscard() const;
 
   // Discards this future. Returns false if discard has already been
@@ -157,6 +158,7 @@ public:
 
   // Type of the callback functions that can get invoked when the
   // future gets set, fails, or is discarded.
+  typedef lambda::function<void()> AbandonedCallback;
   typedef lambda::function<void()> DiscardCallback;
   typedef lambda::function<void(const T&)> ReadyCallback;
   typedef lambda::function<void(const std::string&)> FailedCallback;
@@ -165,6 +167,7 @@ public:
 
   // Installs callbacks for the specified events and returns a const
   // reference to 'this' in order to easily support chaining.
+  const Future<T>& onAbandoned(AbandonedCallback&& callback) const;
   const Future<T>& onDiscard(DiscardCallback&& callback) const;
   const Future<T>& onReady(ReadyCallback&& callback) const;
   const Future<T>& onFailed(FailedCallback&& callback) const;
@@ -175,6 +178,12 @@ public:
   // is not expected.
 
   template <typename F>
+  const Future<T>& onAbandoned(_Deferred<F>&& deferred) const
+  {
+    return onAbandoned(std::move(deferred).operator std::function<void()>());
+  }
+
+  template <typename F>
   const Future<T>& onDiscard(_Deferred<F>&& deferred) const
   {
     return onDiscard(std::move(deferred).operator std::function<void()>());
@@ -215,7 +224,8 @@ private:
   // argument (i.e., 'const T&' for 'onReady' and 'then' and 'const
   // std::string&' for 'onFailed'), but we allow functors that don't
   // care about the argument. We don't need to do this for
-  // 'onDiscarded' because it doesn't take an argument.
+  // 'onDiscard', 'onDiscarded' or 'onAbandoned' because they don't
+  // take an argument.
   struct LessPrefer {};
   struct Prefer : LessPrefer {};
 
@@ -299,6 +309,15 @@ private:
 
 public:
   template <typename F>
+  const Future<T>& onAbandoned(F&& f) const
+  {
+    return onAbandoned(std::function<void()>(
+        [=]() mutable {
+          f();
+        }));
+  }
+
+  template <typename F>
   const Future<T>& onDiscard(F&& f) const
   {
     return onDiscard(std::function<void()>(
@@ -419,6 +438,9 @@ public:
     return then(std::forward<F>(f), Prefer());
   }
 
+  // TODO(benh): Consider adding a `rescue` function for rescuing
+  // abandoned futures.
+
   // Installs callbacks that get executed if this future completes
   // because it failed.
   Future<T> repair(
@@ -443,6 +465,8 @@ public:
   // Prefer/LessPrefer to disambiguate.
 
 private:
+  template <typename U>
+  friend class Future;
   friend class Promise<T>;
   friend class WeakFuture<T>;
 
@@ -465,6 +489,7 @@ private:
     State state;
     bool discard;
     bool associated;
+    bool abandoned;
 
     // One of:
     //   1. None, the state is PENDING or DISCARDED.
@@ -472,6 +497,7 @@ private:
     //   3. Error, the state is FAILED; 'error()' stores the message.
     Result<T> result;
 
+    std::vector<AbandonedCallback> onAbandonedCallbacks;
     std::vector<DiscardCallback> onDiscardCallbacks;
     std::vector<ReadyCallback> onReadyCallbacks;
     std::vector<FailedCallback> onFailedCallbacks;
@@ -479,6 +505,47 @@ private:
     std::vector<AnyCallback> onAnyCallbacks;
   };
 
+  // Abandons this future. Returns false if the future is already
+  // associated or no longer pending. Otherwise returns true and any
+  // Future::onAbandoned callbacks will be run.
+  //
+  // If `propagating` is true then we'll abandon this future even if
+  // it has already been associated. This is important because
+  // `~Promise()` will try and abandon and we need to ignore that if
+  // the future has been associated since the promise will no longer
+  // be setting the future anyway (and is likely the reason it's being
+  // destructed, because it's useless). When the future that we've
+  // associated with gets abandoned, however, then we need to actually
+  // abandon this future too. Here's an example of this:
+  //
+  // 1:    Owned<Promise<int>> promise1(new Promise<int>());
+  // 2:    Owned<Promise<int>> promise2(new Promise<int>());
+  // 3:
+  // 4:    Future<int> future1 = promise1->future();
+  // 5:    Future<int> future2 = promise2->future();
+  // 6:
+  // 7:    promise1->associate(future2);
+  // 8:
+  // 9:    promise1.reset();
+  // 10:
+  // 11:   assert(!future1.isAbandoned());
+  // 12:
+  // 13:   promise2.reset();
+  // 14:
+  // 15:   assert(future2.isAbandoned());
+  // 16:   assert(future1.isAbandoned());
+  //
+  // At line 9 `~Promise()` will attempt to abandon the future by
+  // calling `abandon()` but since it's been associated we won't do
+  // anything. On line 13 the `onAbandoned()` callback will call
+  // `abandon(true)` and now we'll actually abandon the future
+  // because we're _propagating_ the abandon from the associated
+  // future.
+  //
+  // NOTE: this is an _INTERNAL_ function and should never be exposed
+  // or used outside of the implementation.
+  bool abandon(bool propagating = false);
+
   // Sets the value for this future, unless the future is already set,
   // failed, or discarded, in which case it returns false.
   bool set(const T& _t);
@@ -658,7 +725,12 @@ void discarded(Future<T> future)
 
 
 template <typename T>
-Promise<T>::Promise() {}
+Promise<T>::Promise()
+{
+  // Need to "unset" `abandoned` since it gets set in the empty
+  // constructor for `Future`.
+  f.data->abandoned = false;
+}
 
 
 template <typename T>
@@ -672,7 +744,11 @@ Promise<T>::~Promise()
   // Note that we don't discard the promise as we don't want to give
   // the illusion that any computation hasn't started (or possibly
   // finished) in the event that computation is "visible" by other
-  // means.
+  // means. However, we try and abandon the future if it hasn't been
+  // associated, set, or moved from (i.e., `f.data` is non-null).
+  if (f.data) {
+    f.abandon();
+  }
 }
 
 
@@ -766,7 +842,8 @@ bool Promise<T>::associate(const Future<T>& future)
     future
       .onReady(lambda::bind(set, f, lambda::_1))
       .onFailed(lambda::bind(&Future<T>::fail, f, lambda::_1))
-      .onDiscarded(lambda::bind(&internal::discarded<T>, f));
+      .onDiscarded(lambda::bind(&internal::discarded<T>, f))
+      .onAbandoned(lambda::bind(&Future<T>::abandon, f, true));
   }
 
   return associated;
@@ -924,12 +1001,14 @@ Future<T>::Data::Data()
   : state(PENDING),
     discard(false),
     associated(false),
+    abandoned(false),
     result(None()) {}
 
 
 template <typename T>
 void Future<T>::Data::clearAllCallbacks()
 {
+  onAbandonedCallbacks.clear();
   onAnyCallbacks.clear();
   onDiscardCallbacks.clear();
   onDiscardedCallbacks.clear();
@@ -940,7 +1019,10 @@ void Future<T>::Data::clearAllCallbacks()
 
 template <typename T>
 Future<T>::Future()
-  : data(new Data()) {}
+  : data(new Data())
+{
+  data->abandoned = true;
+}
 
 
 template <typename T>
@@ -1065,6 +1147,32 @@ bool Future<T>::discard()
 
 
 template <typename T>
+bool Future<T>::abandon(bool propagating)
+{
+  bool result = false;
+
+  std::vector<AbandonedCallback> callbacks;
+  synchronized (data->lock) {
+    if (!data->abandoned &&
+        data->state == PENDING &&
+        (!data->associated || propagating)) {
+      result = data->abandoned = true;
+
+      callbacks.swap(data->onAbandonedCallbacks);
+    }
+  }
+
+  // Invoke all callbacks. The callbacks get destroyed when we exit
+  // from the function.
+  if (result) {
+    internal::run(callbacks);
+  }
+
+  return result;
+}
+
+
+template <typename T>
 bool Future<T>::isPending() const
 {
   return data->state == PENDING;
@@ -1093,6 +1201,13 @@ bool Future<T>::isFailed() const
 
 
 template <typename T>
+bool Future<T>::isAbandoned() const
+{
+  return data->abandoned;
+}
+
+
+template <typename T>
 bool Future<T>::hasDiscard() const
 {
   return data->discard;
@@ -1182,6 +1297,28 @@ const std::string& Future<T>::failure() const
 
 
 template <typename T>
+const Future<T>& Future<T>::onAbandoned(AbandonedCallback&& callback) const
+{
+  bool run = false;
+
+  synchronized (data->lock) {
+    if (data->abandoned) {
+      run = true;
+    } else if (data->state == PENDING) {
+      data->onAbandonedCallbacks.emplace_back(std::move(callback));
+    }
+  }
+
+  // TODO(*): Invoke callback in another execution context.
+  if (run) {
+    callback();
+  }
+
+  return *this;
+}
+
+
+template <typename T>
 const Future<T>& Future<T>::onDiscard(DiscardCallback&& callback) const
 {
   bool run = false;
@@ -1413,6 +1550,10 @@ Future<X> Future<T>::then(lambda::function<Future<X>(const T&)> f) const
 
   onAny(std::move(thenf));
 
+  onAbandoned([=]() {
+    promise->future().abandon();
+  });
+
   // Propagate discarding up the chain. To avoid cyclic dependencies,
   // we keep a weak future in the callback.
   promise->future().onDiscard(
@@ -1433,6 +1574,10 @@ Future<X> Future<T>::then(lambda::function<X(const T&)> f) const
 
   onAny(std::move(then));
 
+  onAbandoned([=]() {
+    promise->future().abandon();
+  });
+
   // Propagate discarding up the chain. To avoid cyclic dependencies,
   // we keep a weak future in the callback.
   promise->future().onDiscard(
@@ -1450,6 +1595,10 @@ Future<T> Future<T>::repair(
 
   onAny(lambda::bind(&internal::repair<T>, f, promise, lambda::_1));
 
+  onAbandoned([=]() {
+    promise->future().abandon();
+  });
+
   // Propagate discarding up the chain. To avoid cyclic dependencies,
   // we keep a weak future in the callback.
   promise->future().onDiscard(
@@ -1501,6 +1650,10 @@ Future<T> Future<T>::after(
 
   onAny(lambda::bind(&internal::after<T>, latch, promise, timer, lambda::_1));
 
+  onAbandoned([=]() {
+    promise->future().abandon();
+  });
+
   // Propagate discarding up the chain. To avoid cyclic dependencies,
   // we keep a weak future in the callback.
   promise->future().onDiscard(

http://git-wip-us.apache.org/repos/asf/mesos/blob/560ae4fc/3rdparty/libprocess/include/process/gtest.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/gtest.hpp b/3rdparty/libprocess/include/process/gtest.hpp
index f0ffd9a..eee7266 100644
--- a/3rdparty/libprocess/include/process/gtest.hpp
+++ b/3rdparty/libprocess/include/process/gtest.hpp
@@ -181,6 +181,38 @@ template <typename T>
 }
 
 
+template <typename T>
+::testing::AssertionResult AwaitAssertAbandoned(
+    const char* expr,
+    const char*, // Unused string representation of 'duration'.
+    const process::Future<T>& actual,
+    const Duration& duration)
+{
+  process::Owned<process::Latch> latch(new process::Latch());
+
+  actual.onAny([=]() { latch->trigger(); });
+  actual.onAbandoned([=]() { latch->trigger(); });
+
+  if (!latch->await(duration)) {
+    return ::testing::AssertionFailure()
+      << "Failed to wait " << duration << " for " << expr;
+  } else if (actual.isDiscarded()) {
+    return ::testing::AssertionFailure()
+      << expr << " was discarded";
+  } else if (actual.isReady()) {
+    return ::testing::AssertionFailure()
+      << expr << " is ready (" << ::testing::PrintToString(actual.get()) << ")";
+  } else if (actual.isFailed()) {
+    return ::testing::AssertionFailure()
+      << "(" << expr << ").failure(): " << actual.failure();
+  }
+
+  CHECK_ABANDONED(actual);
+
+  return ::testing::AssertionSuccess();
+}
+
+
 template <typename T1, typename T2>
 ::testing::AssertionResult AwaitAssertEq(
     const char* expectedExpr,
@@ -209,6 +241,22 @@ template <typename T1, typename T2>
 }
 
 
+#define AWAIT_ASSERT_ABANDONED_FOR(actual, duration)            \
+  ASSERT_PRED_FORMAT2(AwaitAssertAbandoned, actual, duration)
+
+
+#define AWAIT_ASSERT_ABANDONED(actual)                  \
+  AWAIT_ASSERT_ABANDONED_FOR(actual, Seconds(15))
+
+
+#define AWAIT_EXPECT_ABANDONED_FOR(actual, duration)            \
+  EXPECT_PRED_FORMAT2(AwaitAssertAbandoned, actual, duration)
+
+
+#define AWAIT_EXPECT_ABANDONED(actual)                  \
+  AWAIT_EXPECT_ABANDONED_FOR(actual, Seconds(15))
+
+
 // TODO(bmahler): Differentiate EXPECT and ASSERT here.
 #define AWAIT_FOR(actual, duration)             \
   ASSERT_PRED_FORMAT2(Await, actual, duration)

http://git-wip-us.apache.org/repos/asf/mesos/blob/560ae4fc/3rdparty/libprocess/src/tests/future_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/future_tests.cpp b/3rdparty/libprocess/src/tests/future_tests.cpp
index 77529ec..fdda7a0 100644
--- a/3rdparty/libprocess/src/tests/future_tests.cpp
+++ b/3rdparty/libprocess/src/tests/future_tests.cpp
@@ -17,6 +17,7 @@
 #include <process/clock.hpp>
 #include <process/future.hpp>
 #include <process/gtest.hpp>
+#include <process/owned.hpp>
 
 #include <stout/duration.hpp>
 #include <stout/nothing.hpp>
@@ -25,6 +26,7 @@
 using process::Clock;
 using process::Failure;
 using process::Future;
+using process::Owned;
 using process::Promise;
 using process::undiscardable;
 
@@ -611,3 +613,37 @@ TEST(FutureTest, UndiscardableLambda)
 
   AWAIT_ASSERT_EQ(84, f);
 }
+
+
+TEST(FutureTest, Abandoned)
+{
+  AWAIT_EXPECT_ABANDONED(Future<int>());
+
+  Owned<Promise<int>> promise(new Promise<int>());
+
+  Future<int> future = promise->future();
+
+  EXPECT_TRUE(!future.isAbandoned());
+
+  promise.reset();
+
+  AWAIT_EXPECT_ABANDONED(future);
+}
+
+
+TEST(FutureTest, AbandonedChain)
+{
+  Owned<Promise<int>> promise(new Promise<int>());
+
+  Future<string> future = promise->future()
+    .then([]() {
+      return Nothing();
+    })
+    .then([]() -> string {
+      return "hello world";
+    });
+
+  promise.reset();
+
+  AWAIT_EXPECT_ABANDONED(future);
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/560ae4fc/3rdparty/libprocess/src/tests/metrics_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/metrics_tests.cpp b/3rdparty/libprocess/src/tests/metrics_tests.cpp
index 161ca0d..bb7c9e3 100644
--- a/3rdparty/libprocess/src/tests/metrics_tests.cpp
+++ b/3rdparty/libprocess/src/tests/metrics_tests.cpp
@@ -54,6 +54,7 @@ using process::Failure;
 using process::Future;
 using process::PID;
 using process::Process;
+using process::Promise;
 using process::READONLY_HTTP_AUTHENTICATION_REALM;
 using process::Statistics;
 using process::UPID;
@@ -76,8 +77,12 @@ public:
 
   Future<double> pending()
   {
-    return Future<double>();
+    return promise.future();
   }
+
+  // Need to use a promise for the call to pending instead of just a
+  // `Future<double>()` so we don't return an abandoned future.
+  Promise<double> promise;
 };