You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2013/08/08 02:05:30 UTC

git commit: Added process::await() for asynchronously waiting for Futures to transition from pending.

Updated Branches:
  refs/heads/master a47b58e72 -> c4d96d021


Added process::await() for asynchronously waiting for Futures to
transition from pending.

Review: https://reviews.apache.org/r/13381


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c4d96d02
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c4d96d02
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c4d96d02

Branch: refs/heads/master
Commit: c4d96d02172042999decee7ee64a00a18398dd0c
Parents: a47b58e
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Jul 25 14:48:32 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Aug 7 15:24:31 2013 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/collect.hpp | 105 ++++++++++++++++++-
 3rdparty/libprocess/src/tests/process_tests.cpp |  48 ++++++++-
 2 files changed, 148 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c4d96d02/3rdparty/libprocess/include/process/collect.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/collect.hpp b/3rdparty/libprocess/include/process/collect.hpp
index 3c620aa..27e2729 100644
--- a/3rdparty/libprocess/include/process/collect.hpp
+++ b/3rdparty/libprocess/include/process/collect.hpp
@@ -14,6 +14,9 @@
 #include <stout/none.hpp>
 #include <stout/option.hpp>
 
+// TODO(bmahler): Move these into a futures.hpp header to group
+// Future-related utilities.
+
 namespace process {
 
 // Waits on each future in the specified list and returns the list of
@@ -26,6 +29,14 @@ Future<std::list<T> > collect(
     const Option<Timeout>& timeout = None());
 
 
+// Waits on each future in the specified list and returns the list of
+// non-pending futures. On timeout, the result will be a failure.
+template <typename T>
+Future<std::list<Future<T> > > await(
+    std::list<Future<T> >& futures,
+    const Option<Timeout>& timeout = None());
+
+
 namespace internal {
 
 template <typename T>
@@ -94,7 +105,8 @@ private:
     } else {
       assert(future.isReady());
       ready += 1;
-      if (futures.size() == ready) {
+      if (ready == futures.size()) {
+        std::list<T> values;
         foreach (const Future<T>& future, futures) {
           values.push_back(future.get());
         }
@@ -107,7 +119,79 @@ private:
   const std::list<Future<T> > futures;
   const Option<Timeout> timeout;
   Promise<std::list<T> >* promise;
-  std::list<T> values;
+  size_t ready;
+};
+
+
+template <typename T> // Process behind await(); fulfils 'promise'.
+class AwaitProcess : public Process<AwaitProcess<T> >
+{
+public:
+  AwaitProcess(
+      const std::list<Future<T> >& _futures,
+      const Option<Timeout>& _timeout,
+      Promise<std::list<Future<T> > >* _promise)
+    : futures(_futures),
+      timeout(_timeout),
+      promise(_promise),
+      ready(0) {}
+
+  virtual ~AwaitProcess()
+  {
+    delete promise; // Allocated by await(); released with this process.
+  }
+
+  virtual void initialize()
+  {
+    // Terminate early if the caller discards the returned future.
+    promise->future().onDiscarded(defer(this, &AwaitProcess::discarded));
+
+    // Only wait as long as requested (no deadline if timeout is None).
+    if (timeout.isSome()) {
+      delay(timeout.get().remaining(), this, &AwaitProcess::timedout);
+    }
+
+    typename std::list<Future<T> >::const_iterator iterator;
+    for (iterator = futures.begin(); iterator != futures.end(); ++iterator) {
+      (*iterator).onAny(
+          defer(this, &AwaitProcess::waited, std::tr1::placeholders::_1));
+    }
+  }
+
+private:
+  void discarded()
+  {
+    terminate(this);
+  }
+
+  void timedout()
+  {
+    // Need to discard all of the futures so any of their associated
+    // resources can get properly cleaned up.
+    typename std::list<Future<T> >::const_iterator iterator;
+    for (iterator = futures.begin(); iterator != futures.end(); ++iterator) {
+      Future<T> future = *iterator; // Need a non-const copy to discard.
+      future.discard();
+    }
+
+    promise->fail("Await failed: timed out");
+    terminate(this);
+  }
+
+  void waited(const Future<T>& future) // Registered via onAny() per future.
+  {
+    assert(!future.isPending());
+
+    ready += 1; // One more future has left the pending state.
+    if (ready == futures.size()) {
+      promise->set(futures); // Results keep the order of the input list.
+      terminate(this);
+    }
+  }
+
+  const std::list<Future<T> > futures;
+  const Option<Timeout> timeout;
+  Promise<std::list<Future<T> > >* promise;
   size_t ready;
 };
 
@@ -129,6 +213,23 @@ inline Future<std::list<T> > collect(
   return future;
 }
 
+
+template <typename T>
+inline Future<std::list<Future<T> > > await(
+    std::list<Future<T> >& futures,
+    const Option<Timeout>& timeout)
+{
+  if (futures.empty()) {
+    return futures; // Nothing to wait for; no AwaitProcess is spawned.
+  }
+
+  Promise<std::list<Future<T> > >* promise =
+    new Promise<std::list<Future<T> > >(); // Deleted by ~AwaitProcess().
+  Future<std::list<Future<T> > > future = promise->future();
+  spawn(new internal::AwaitProcess<T>(futures, timeout, promise), true);
+  return future;
+}
+
 } // namespace process {
 
 #endif // __PROCESS_COLLECT_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/c4d96d02/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index ed510be..5009610 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -841,7 +841,7 @@ TEST(Process, collect)
   // First ensure an empty list functions correctly.
   std::list<Future<int> > empty;
   Future<std::list<int> > future = collect(empty);
-  EXPECT_TRUE(future.await());
+  AWAIT_ASSERT_READY(future);
   EXPECT_TRUE(future.get().empty());
 
   Promise<int> promise1;
@@ -863,8 +863,7 @@ TEST(Process, collect)
 
   future = collect(futures);
 
-  EXPECT_TRUE(future.await());
-  EXPECT_TRUE(future.isReady());
+  AWAIT_ASSERT_READY(future);
 
   std::list<int> values;
   values.push_back(1);
@@ -878,6 +877,49 @@ TEST(Process, collect)
 }
 
 
+TEST(Process, await)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  // First ensure an empty list yields a ready, empty result.
+  std::list<Future<int> > empty;
+  Future<std::list<Future<int> > > future = await(empty);
+  AWAIT_ASSERT_READY(future);
+  EXPECT_TRUE(future.get().empty());
+
+  Promise<int> promise1;
+  Promise<int> promise2;
+  Promise<int> promise3;
+  Promise<int> promise4;
+
+  std::list<Future<int> > futures;
+  futures.push_back(promise1.future());
+  futures.push_back(promise2.future());
+  futures.push_back(promise3.future());
+  futures.push_back(promise4.future());
+
+  // Set them out-of-order; all are set before await() is called.
+  promise4.set(4);
+  promise2.set(2);
+  promise1.set(1);
+  promise3.set(3);
+
+  future = await(futures);
+
+  AWAIT_ASSERT_READY(future);
+
+  EXPECT_EQ(futures.size(), future.get().size());
+
+  // We expect them to be returned in the same order as the future
+  // list that was passed in, i.e. with values 1, 2, 3, 4.
+  int i = 1;
+  foreach (const Future<int>& result, future.get()) {
+    ASSERT_TRUE(result.isReady());
+    ASSERT_EQ(i++, result.get());
+  }
+}
+
+
 class SettleProcess : public Process<SettleProcess>
 {
 public: