You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/02/16 01:24:04 UTC
svn commit: r1244802 - in /incubator/mesos/trunk/third_party/libprocess:
include/process/collect.hpp include/process/dispatch.hpp
include/process/future.hpp src/process.cpp src/tests.cpp
Author: benh
Date: Thu Feb 16 00:24:03 2012
New Revision: 1244802
URL: http://svn.apache.org/viewvc?rev=1244802&view=rev
Log:
Some libprocess updates: (1) added a collect mechanism for futures, (2) added associate to promise and removed code from dispatch.hpp, (3) added tests.
Added:
incubator/mesos/trunk/third_party/libprocess/include/process/collect.hpp
Modified:
incubator/mesos/trunk/third_party/libprocess/include/process/dispatch.hpp
incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp
incubator/mesos/trunk/third_party/libprocess/src/process.cpp
incubator/mesos/trunk/third_party/libprocess/src/tests.cpp
Added: incubator/mesos/trunk/third_party/libprocess/include/process/collect.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/collect.hpp?rev=1244802&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/collect.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/collect.hpp Thu Feb 16 00:24:03 2012
@@ -0,0 +1,87 @@
+#ifndef __PROCESS_COLLECT_HPP__
+#define __PROCESS_COLLECT_HPP__
+
+#include <assert.h>
+
+#include <set>
+
+#include <process/defer.hpp>
+#include <process/future.hpp>
+#include <process/process.hpp>
+
+namespace process {
+
+// Transforms a set of futures of type T into a set of T's.
+template <typename T>
+Future<std::set<T> > collect(std::set<Future<T> >& futures);
+
+
+namespace internal {
+
+template <typename T>
+class CollectProcess : public Process<CollectProcess<T> >
+{
+public:
+ CollectProcess(
+ const std::set<Future<T> >& _futures,
+ Promise<std::set<T> >* _promise)
+ : futures(_futures), promise(_promise) {}
+
+ virtual ~CollectProcess()
+ {
+ delete promise;
+ }
+
+ virtual void initialize()
+ {
+ // Stop this nonsense if nobody cares.
+ promise->future().onDiscarded(defer(this, &CollectProcess::discarded));
+
+ typename std::set<Future<T> >::iterator iterator;
+ for (iterator = futures.begin(); iterator != futures.end(); ++iterator) {
+ const Future<T>& future = *iterator;
+ future.onAny(defer(this, &CollectProcess::waited, future));
+ }
+ }
+
+private:
+ void discarded()
+ {
+ terminate(this);
+ }
+
+ void waited(const Future<T>& future)
+ {
+ if (future.isFailed()) {
+ promise->fail(future.failure());
+ } else if (future.isDiscarded()) {
+ promise->future().discard();
+ } else {
+ assert(future.isReady());
+ values.insert(future.get());
+ if (futures.size() == values.size()) {
+ promise->set(values);
+ terminate(this);
+ }
+ }
+ }
+
+ std::set<Future<T> > futures;
+ Promise<std::set<T> >* promise;
+ std::set<T> values;
+};
+
+} // namespace internal {
+
+
+template <typename T>
+inline Future<std::set<T> > collect(std::set<Future<T> >& futures)
+{
+ Promise<std::set<T> >* promise = new Promise<std::set<T> >();
+ spawn(new internal::CollectProcess<T>(futures, promise), true);
+ return promise->future();
+}
+
+} // namespace process {
+
+#endif // __PROCESS_COLLECT_HPP__
Modified: incubator/mesos/trunk/third_party/libprocess/include/process/dispatch.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/dispatch.hpp?rev=1244802&r1=1244801&r2=1244802&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/dispatch.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/dispatch.hpp Thu Feb 16 00:24:03 2012
@@ -50,44 +50,10 @@ void dispatch(
const UPID& pid,
const std::tr1::shared_ptr<std::tr1::function<void(ProcessBase*)> >& f);
-
-// The magic of dispatch actually occurs within the dispatcher
-// routines. In particular, some of the dispatcher routines need to
-// wait for values and "associate" them with the future that got
-// returned from the original call to dispatch. Those association
-// functions are defined here:
-
-template <typename T>
-void __associate(const Future<T>& from, std::tr1::shared_ptr<Promise<T> > to)
-{
- // The future associated with this promise is either pending
- // (because we haven't set it, failed it, or discarded it) or it's
- // discarded (because the receiver of the future has discarded it).
- assert(to->future().isPending() || to->future().isDiscarded());
-
- if (to->future().isPending()) { // No-op if it's discarded.
- if (from.isReady()) {
- to->set(from.get());
- } else if (from.isFailed()) {
- to->fail(from.failure());
- } else if (from.isDiscarded()) {
- to->future().discard();
- }
- }
-}
-
-
-template <typename T>
-void associate(const Future<T>& from, std::tr1::shared_ptr<Promise<T> > to)
-{
- from.onAny(std::tr1::bind(&__associate<T>, from, to));
-}
-
-
-// Finally come the dispatcher functions (one for each return type:
-// void, future, value) which should complete the picture. Given the
-// process argument these routines downcast the process to the correct
-// subtype and invoke the thunk using the subtype as the argument
+// For each return type (void, future, value) there is a dispatcher
+// function which should complete the picture. Given the process
+// argument these routines downcast the process to the correct subtype
+// and invoke the thunk using the subtype as the argument
// (receiver). Note that we must use dynamic_cast because we permit a
// process to use multiple inheritance (e.g., to expose multiple
// callback interfaces).
@@ -113,7 +79,7 @@ void pdispatcher(
assert(process != NULL);
T* t = dynamic_cast<T*>(process);
assert(t != NULL);
- associate((*thunk)(t), promise);
+ promise->associate((*thunk)(t));
}
Modified: incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp?rev=1244802&r1=1244801&r2=1244802&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp Thu Feb 16 00:24:03 2012
@@ -27,6 +27,9 @@ template <typename T>
class Future
{
public:
+ // Constructs a failed future.
+ static Future<T> failed(const std::string& message);
+
Future();
Future(const T& _t);
Future(const Future<T>& that);
@@ -111,7 +114,7 @@ private:
};
-// TODO(benh): Making Promise a subclass of Future?
+// TODO(benh): Make Promise a subclass of Future?
template <typename T>
class Promise
{
@@ -123,6 +126,20 @@ public:
bool set(const T& _t);
bool fail(const std::string& message);
+ bool associate(const Future<T>& future)
+ {
+ if (!f.isPending()) {
+ return false;
+ }
+
+ future
+ .onReady(std::tr1::bind(&Future<T>::set, f, std::tr1::placeholders::_1))
+ .onFailed(std::tr1::bind(&Future<T>::fail, f, std::tr1::placeholders::_1))
+ .onDiscarded(std::tr1::bind(&Future<T>::discard, f));
+
+ return true;
+ }
+
// Returns a copy of the future associated with this promise.
Future<T> future() const;
@@ -187,6 +204,7 @@ inline void acquire(int* lock)
}
}
+
inline void release(int* lock)
{
// Unlock via a compare-and-swap so we get a memory barrier too.
@@ -194,6 +212,7 @@ inline void release(int* lock)
assert(unlocked);
}
+
template <typename T>
void select(
const Future<T>& future,
@@ -250,6 +269,15 @@ void discard(const std::set<Future<T> >&
template <typename T>
+Future<T> Future<T>::failed(const std::string& message)
+{
+ Future<T> future;
+ future.fail(message);
+ return future;
+}
+
+
+template <typename T>
Future<T>::Future()
: refs(new int(1)),
lock(new int(0)),
Modified: incubator/mesos/trunk/third_party/libprocess/src/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/process.cpp?rev=1244802&r1=1244801&r2=1244802&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/process.cpp Thu Feb 16 00:24:03 2012
@@ -2687,7 +2687,7 @@ void ProcessBase::visit(const HttpEvent&
dispatch(proxy, &HttpProxy::handle, future, event.request->keepAlive);
// Now call the handler and associate the response with the promise.
- internal::associate(handlers.http[name](*event.request), promise);
+ promise->associate(handlers.http[name](*event.request));
} else if (resources.count(name) > 0) {
HttpOKResponse response;
response.headers["Content-Type"] = resources[name].type;
Modified: incubator/mesos/trunk/third_party/libprocess/src/tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/tests.cpp?rev=1244802&r1=1244801&r2=1244802&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/tests.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/tests.cpp Thu Feb 16 00:24:03 2012
@@ -8,6 +8,7 @@
#include <string>
#include <sstream>
+#include <process/collect.hpp>
#include <process/clock.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
@@ -73,11 +74,34 @@ TEST(libprocess, future)
{
Promise<bool> promise;
promise.set(true);
- promise.future().await();
+ ASSERT_TRUE(promise.future().isReady());
EXPECT_TRUE(promise.future().get());
}
+TEST(libprocess, associate)
+{
+ Promise<bool> promise1;
+ Future<bool> future1(true);
+ promise1.associate(future1);
+ ASSERT_TRUE(promise1.future().isReady());
+ EXPECT_TRUE(promise1.future().get());
+
+ Promise<bool> promise2;
+ Future<bool> future2;
+ promise2.associate(future2);
+ future2.discard();
+ ASSERT_TRUE(promise2.future().isDiscarded());
+
+ Promise<bool> promise3;
+ Promise<bool> promise4;
+ promise3.associate(promise4.future());
+ promise4.fail("associate");
+ ASSERT_TRUE(promise3.future().isFailed());
+ EXPECT_EQ("associate", promise3.future().failure());
+}
+
+
class SpawnProcess : public Process<SpawnProcess>
{
public:
@@ -553,6 +577,41 @@ TEST(libprocess, select)
}
+TEST(libprocess, collect)
+{
+ ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+ Promise<int> promise1;
+ Promise<int> promise2;
+ Promise<int> promise3;
+ Promise<int> promise4;
+
+ std::set<Future<int> > futures;
+ futures.insert(promise1.future());
+ futures.insert(promise2.future());
+ futures.insert(promise3.future());
+ futures.insert(promise4.future());
+
+ promise1.set(1);
+ promise2.set(2);
+ promise3.set(3);
+ promise4.set(4);
+
+ Future<std::set<int> > future = collect(futures);
+
+ EXPECT_TRUE(future.await());
+ EXPECT_TRUE(future.isReady());
+
+ std::set<int> values;
+ values.insert(1);
+ values.insert(2);
+ values.insert(3);
+ values.insert(4);
+
+ EXPECT_EQ(values, future.get());
+}
+
+
class SettleProcess : public Process<SettleProcess>
{
public: