You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/11/20 00:54:23 UTC

[12/15] mesos git commit: Added Future::onAbandoned semantics to process::collect/await.

Added Future::onAbandoned semantics to process::collect/await.

Review: https://reviews.apache.org/r/61150


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/613a846f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/613a846f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/613a846f

Branch: refs/heads/master
Commit: 613a846f197f74d116641c66b9caade65de4edae
Parents: e2eb6ae
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Aug 2 20:13:57 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Nov 19 16:51:26 2017 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/collect.hpp | 19 +++++++
 3rdparty/libprocess/src/socket.cpp              |  2 +-
 3rdparty/libprocess/src/tests/collect_tests.cpp | 54 ++++++++++++++++++++
 3 files changed, 74 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/613a846f/3rdparty/libprocess/include/process/collect.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/collect.hpp b/3rdparty/libprocess/include/process/collect.hpp
index fccf55a..392b06d 100644
--- a/3rdparty/libprocess/include/process/collect.hpp
+++ b/3rdparty/libprocess/include/process/collect.hpp
@@ -76,6 +76,7 @@ public:
     delete promise;
   }
 
+protected:
   virtual void initialize()
   {
     // Stop this nonsense if nobody cares.
@@ -83,10 +84,19 @@ public:
 
     foreach (const Future<T>& future, futures) {
       future.onAny(defer(this, &CollectProcess::waited, lambda::_1));
+      future.onAbandoned(defer(this, &CollectProcess::abandoned));
     }
   }
 
 private:
+  void abandoned()
+  {
+    // There is no use waiting because this future will never complete
+    // so terminate this process which will cause `promise` to get
+    // deleted and our future to also be abandoned.
+    terminate(this);
+  }
+
   void discarded()
   {
     promise->discard();
@@ -150,10 +160,19 @@ public:
 
     foreach (const Future<T>& future, futures) {
       future.onAny(defer(this, &AwaitProcess::waited, lambda::_1));
+      future.onAbandoned(defer(this, &AwaitProcess::abandoned));
     }
   }
 
 private:
+  void abandoned()
+  {
+    // There is no use waiting because this future will never complete
+    // so terminate this process which will cause `promise` to get
+    // deleted and our future to also be abandoned.
+    terminate(this);
+  }
+
   void discarded()
   {
     promise->discard();

http://git-wip-us.apache.org/repos/asf/mesos/blob/613a846f/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
index 490bbd7..504cb54 100644
--- a/3rdparty/libprocess/src/socket.cpp
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -162,7 +162,7 @@ Future<string> SocketImpl::recv(const Option<ssize_t>& size)
       [=]() {
         return self->recv(data.get(), chunk);
       },
-      [=](size_t length) -> ControlFlow<string> {
+      [=](size_t length) mutable -> ControlFlow<string> {
         if (length == 0) { // EOF.
           // Return everything we've received thus far, a subsequent
           // receive will return an empty string.

http://git-wip-us.apache.org/repos/asf/mesos/blob/613a846f/3rdparty/libprocess/src/tests/collect_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/collect_tests.cpp b/3rdparty/libprocess/src/tests/collect_tests.cpp
index 91760ec..72f2a80 100644
--- a/3rdparty/libprocess/src/tests/collect_tests.cpp
+++ b/3rdparty/libprocess/src/tests/collect_tests.cpp
@@ -12,10 +12,12 @@
 
 #include <process/collect.hpp>
 #include <process/gtest.hpp>
+#include <process/owned.hpp>
 
 #include <stout/gtest.hpp>
 
 using process::Future;
+using process::Owned;
 using process::Promise;
 
 using std::list;
@@ -132,6 +134,30 @@ TEST(CollectTest, DiscardPropagation)
 }
 
 
+TEST(CollectTest, AbandonedPropagation)
+{
+  Owned<Promise<int>> promise(new Promise<int>());
+
+  // There is a race from the time that we reset the promise to when
+  // the collect process is terminated so we need to use
+  // Future::recover to properly handle this case.
+  Future<int> future = process::collect(promise->future())
+    .recover([](const Future<std::tuple<int>>& f) -> Future<std::tuple<int>> {
+      if (f.isAbandoned()) {
+        return std::make_tuple(42);
+      }
+      return f;
+    })
+    .then([](const std::tuple<int>& t) {
+      return std::get<0>(t);
+    });
+
+  promise.reset();
+
+  AWAIT_EQ(42, future);
+}
+
+
 TEST(AwaitTest, Success)
 {
   // First ensure an empty list functions correctly.
@@ -249,3 +275,31 @@ TEST(AwaitTest, DiscardPropagation)
   AWAIT_DISCARDED(promise1.future());
   AWAIT_DISCARDED(promise2.future());
 }
+
+
+TEST(AwaitTest, AbandonedPropagation)
+{
+  Owned<Promise<int>> promise(new Promise<int>());
+
+  // There is a race from the time that we reset the promise to when
+  // the await process is terminated so we need to use
+  // Future::recover to properly handle this case.
+  Future<int> future = process::await(promise->future(), Future<int>())
+    .recover([](const Future<std::tuple<Future<int>, Future<int>>>& f)
+             -> Future<std::tuple<Future<int>, Future<int>>> {
+      if (f.isAbandoned()) {
+        return std::make_tuple(42, 0);
+      }
+      return f;
+    })
+    .then([](const std::tuple<Future<int>, Future<int>>& t) {
+      return std::get<0>(t)
+        .then([](int i) {
+          return i;
+        });
+    });
+
+  promise.reset();
+
+  AWAIT_EQ(42, future);
+}