You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2013/08/08 02:05:30 UTC
git commit: Added process::await() for asynchronously awaiting for
Futures to transition from pending.
Updated Branches:
refs/heads/master a47b58e72 -> c4d96d021
Added process::await() for asynchronously awaiting for Futures to
transition from pending.
Review: https://reviews.apache.org/r/13381
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c4d96d02
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c4d96d02
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c4d96d02
Branch: refs/heads/master
Commit: c4d96d02172042999decee7ee64a00a18398dd0c
Parents: a47b58e
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Jul 25 14:48:32 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Aug 7 15:24:31 2013 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/collect.hpp | 105 ++++++++++++++++++-
3rdparty/libprocess/src/tests/process_tests.cpp | 48 ++++++++-
2 files changed, 148 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c4d96d02/3rdparty/libprocess/include/process/collect.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/collect.hpp b/3rdparty/libprocess/include/process/collect.hpp
index 3c620aa..27e2729 100644
--- a/3rdparty/libprocess/include/process/collect.hpp
+++ b/3rdparty/libprocess/include/process/collect.hpp
@@ -14,6 +14,9 @@
#include <stout/none.hpp>
#include <stout/option.hpp>
+// TODO(bmahler): Move these into a futures.hpp header to group Future
+// related utilities.
+
namespace process {
// Waits on each future in the specified list and returns the list of
@@ -26,6 +29,14 @@ Future<std::list<T> > collect(
const Option<Timeout>& timeout = None());
+// Waits on each future in the specified set and returns the list of
+// non-pending futures. On timeout, the result will be a failure.
+template <typename T>
+Future<std::list<Future<T> > > await(
+ std::list<Future<T> >& futures,
+ const Option<Timeout>& timeout = None());
+
+
namespace internal {
template <typename T>
@@ -94,7 +105,8 @@ private:
} else {
assert(future.isReady());
ready += 1;
- if (futures.size() == ready) {
+ if (ready == futures.size()) {
+ std::list<T> values;
foreach (const Future<T>& future, futures) {
values.push_back(future.get());
}
@@ -107,7 +119,79 @@ private:
const std::list<Future<T> > futures;
const Option<Timeout> timeout;
Promise<std::list<T> >* promise;
- std::list<T> values;
+ size_t ready;
+};
+
+
+template <typename T>
+class AwaitProcess : public Process<AwaitProcess<T> >
+{
+public:
+ AwaitProcess(
+ const std::list<Future<T> >& _futures,
+ const Option<Timeout>& _timeout,
+ Promise<std::list<Future<T> > >* _promise)
+ : futures(_futures),
+ timeout(_timeout),
+ promise(_promise),
+ ready(0) {}
+
+ virtual ~AwaitProcess()
+ {
+ delete promise;
+ }
+
+ virtual void initialize()
+ {
+ // Stop this nonsense if nobody cares.
+ promise->future().onDiscarded(defer(this, &AwaitProcess::discarded));
+
+ // Only wait as long as requested.
+ if (timeout.isSome()) {
+ delay(timeout.get().remaining(), this, &AwaitProcess::timedout);
+ }
+
+ typename std::list<Future<T> >::const_iterator iterator;
+ for (iterator = futures.begin(); iterator != futures.end(); ++iterator) {
+ (*iterator).onAny(
+ defer(this, &AwaitProcess::waited, std::tr1::placeholders::_1));
+ }
+ }
+
+private:
+ void discarded()
+ {
+ terminate(this);
+ }
+
+ void timedout()
+ {
+ // Need to discard all of the futures so any of their associated
+ // resources can get properly cleaned up.
+ typename std::list<Future<T> >::const_iterator iterator;
+ for (iterator = futures.begin(); iterator != futures.end(); ++iterator) {
+ Future<T> future = *iterator; // Need a non-const copy to discard.
+ future.discard();
+ }
+
+ promise->fail("Collect failed: timed out");
+ terminate(this);
+ }
+
+ void waited(const Future<T>& future)
+ {
+ assert(!future.isPending());
+
+ ready += 1;
+ if (ready == futures.size()) {
+ promise->set(futures);
+ terminate(this);
+ }
+ }
+
+ const std::list<Future<T> > futures;
+ const Option<Timeout> timeout;
+ Promise<std::list<Future<T> > >* promise;
size_t ready;
};
@@ -129,6 +213,23 @@ inline Future<std::list<T> > collect(
return future;
}
+
+template <typename T>
+inline Future<std::list<Future<T> > > await(
+ std::list<Future<T> >& futures,
+ const Option<Timeout>& timeout)
+{
+ if (futures.empty()) {
+ return futures;
+ }
+
+ Promise<std::list<Future<T> > >* promise =
+ new Promise<std::list<Future<T> > >();
+ Future<std::list<Future<T> > > future = promise->future();
+ spawn(new internal::AwaitProcess<T>(futures, timeout, promise), true);
+ return future;
+}
+
} // namespace process {
#endif // __PROCESS_COLLECT_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/c4d96d02/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index ed510be..5009610 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -841,7 +841,7 @@ TEST(Process, collect)
// First ensure an empty list functions correctly.
std::list<Future<int> > empty;
Future<std::list<int> > future = collect(empty);
- EXPECT_TRUE(future.await());
+ AWAIT_ASSERT_READY(future);
EXPECT_TRUE(future.get().empty());
Promise<int> promise1;
@@ -863,8 +863,7 @@ TEST(Process, collect)
future = collect(futures);
- EXPECT_TRUE(future.await());
- EXPECT_TRUE(future.isReady());
+ AWAIT_ASSERT_READY(future);
std::list<int> values;
values.push_back(1);
@@ -878,6 +877,49 @@ TEST(Process, collect)
}
+TEST(Process, await)
+{
+ ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+ // First ensure an empty list functions correctly.
+ std::list<Future<int> > empty;
+ Future<std::list<Future<int> > > future = await(empty);
+ AWAIT_ASSERT_READY(future);
+ EXPECT_TRUE(future.get().empty());
+
+ Promise<int> promise1;
+ Promise<int> promise2;
+ Promise<int> promise3;
+ Promise<int> promise4;
+
+ std::list<Future<int> > futures;
+ futures.push_back(promise1.future());
+ futures.push_back(promise2.future());
+ futures.push_back(promise3.future());
+ futures.push_back(promise4.future());
+
+ // Set them out-of-order.
+ promise4.set(4);
+ promise2.set(2);
+ promise1.set(1);
+ promise3.set(3);
+
+ future = await(futures);
+
+ AWAIT_ASSERT_READY(future);
+
+ EXPECT_EQ(futures.size(), future.get().size());
+
+ // We expect them to be returned in the same order as the
+ // future list that was passed in.
+ int i = 1;
+ foreach (const Future<int>& result, future.get()) {
+ ASSERT_TRUE(result.isReady());
+ ASSERT_EQ(i++, result.get());
+ }
+}
+
+
class SettleProcess : public Process<SettleProcess>
{
public: