You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/08/07 01:50:17 UTC
svn commit: r1370067 - in /incubator/mesos/trunk/third_party/libprocess:
include/process/future.hpp src/tests.cpp
Author: benh
Date: Mon Aug 6 23:50:17 2012
New Revision: 1370067
URL: http://svn.apache.org/viewvc?rev=1370067&view=rev
Log:
Added discard chaining from a future created from Future::then.
Modified:
incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp
incubator/mesos/trunk/third_party/libprocess/src/tests.cpp
Modified: incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp?rev=1370067&r1=1370066&r2=1370067&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp Mon Aug 6 23:50:17 2012
@@ -644,31 +644,48 @@ const Future<T>& Future<T>::onAny(const
namespace internal {
+// TODO(benh): Need to pass 'future' as a weak_ptr so that we can
+// avoid reference counting cycles!
template <typename T, typename X>
void thenf(const std::tr1::shared_ptr<Promise<X> >& promise,
- const std::tr1::function<Future<X>(const T&)>& callback,
- const Future<T>& future)
+ const std::tr1::function<Future<X>(const T&)>& callback,
+ const std::tr1::shared_ptr<Future<T> >& future)
{
- if (future.isReady()) {
- promise->associate(callback(future.get()));
- } else if (future.isFailed()) {
- promise->fail(future.failure());
- } else if (future.isDiscarded()) {
- promise->future().discard();
+ // Propagate discarding up the chain (triggered via setting
+ // onDiscarded on promise->future() in Future::then below).
+ if (promise->future().isDiscarded()) {
+ future->discard();
+ } else {
+ if (future->isReady()) {
+ promise->associate(callback(future->get()));
+ } else if (future->isFailed()) {
+ promise->fail(future->failure());
+ } else if (future->isDiscarded()) {
+ promise->future().discard();
+ }
}
}
+
+// TODO(benh): Need to pass 'future' as a weak_ptr so that we can
+// avoid reference counting cycles!
template <typename T, typename X>
void then(const std::tr1::shared_ptr<Promise<X> >& promise,
const std::tr1::function<X(const T&)>& callback,
- const Future<T>& future)
+ const std::tr1::shared_ptr<Future<T> >& future)
{
- if (future.isReady()) {
- promise->set(callback(future.get()));
- } else if (future.isFailed()) {
- promise->fail(future.failure());
- } else if (future.isDiscarded()) {
- promise->future().discard();
+ // Propagate discarding up the chain (triggered via setting
+ // onDiscarded on promise->future() in Future::then below).
+ if (promise->future().isDiscarded()) {
+ future->discard();
+ } else {
+ if (future->isReady()) {
+ promise->set(callback(future->get()));
+ } else if (future->isFailed()) {
+ promise->fail(future->failure());
+ } else if (future->isDiscarded()) {
+ promise->future().discard();
+ }
}
}
@@ -686,10 +703,12 @@ Future<X> Future<T>::then(const std::tr1
std::tr1::bind(&internal::thenf<T, X>,
promise,
f,
- *this);
+ std::tr1::shared_ptr<Future<T> >(new Future<T>(*this)));
onAny(thenf);
+ promise->future().onDiscarded(thenf);
+
return promise->future();
}
@@ -704,10 +723,12 @@ Future<X> Future<T>::then(const std::tr1
std::tr1::bind(&internal::then<T, X>,
promise,
f,
- *this);
+ std::tr1::shared_ptr<Future<T> >(new Future<T>(*this)));
onAny(then);
+ promise->future().onDiscarded(then);
+
return promise->future();
}
@@ -726,10 +747,12 @@ Future<X> Future<T>::then(
std::tr1::bind(&internal::thenf<T, X>,
promise,
callback,
- *this);
+ std::tr1::shared_ptr<Future<T> >(new Future<T>(*this)));
onAny(thenf);
+ promise->future().onDiscarded(thenf);
+
return promise->future();
}
@@ -748,10 +771,12 @@ Future<X> Future<T>::then(
std::tr1::bind(&internal::then<T, X>,
promise,
callback,
- *this);
+ std::tr1::shared_ptr<Future<T> >(new Future<T>(*this)));
onAny(then);
+ promise->future().onDiscarded(then);
+
return promise->future();
}
@@ -776,6 +801,12 @@ auto Future<T>::then(F f) const
}
});
+ promise->future().onDiscarded([=] () {
+ if (this->isPending()) {
+ this->discard();
+ }
+ });
+
return promise->future();
}
#endif
Modified: incubator/mesos/trunk/third_party/libprocess/src/tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/tests.cpp?rev=1370067&r1=1370066&r2=1370067&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/tests.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/tests.cpp Mon Aug 6 23:50:17 2012
@@ -136,6 +136,74 @@ TEST(Process, then)
}
+Future<bool> readyFuture()
+{
+ return true;
+}
+
+
+Future<bool> failedFuture()
+{
+ return Future<bool>::failed("The value is not positive (or zero)");
+}
+
+
+Future<bool> pendingFuture(Future<bool>* future)
+{
+ return *future; // Keep it pending.
+}
+
+
+Future<std::string> second(const bool& b)
+{
+ return b ? std::string("true") : std::string("false");
+}
+
+
+Future<std::string> third(const std::string& s)
+{
+ return s;
+}
+
+
+TEST(Process, chain)
+{
+ Promise<int*> promise;
+
+ Future<std::string> s = readyFuture()
+ .then(std::tr1::bind(&second, std::tr1::placeholders::_1))
+ .then(std::tr1::bind(&third, std::tr1::placeholders::_1));
+
+ s.await();
+
+ ASSERT_TRUE(s.isReady());
+ EXPECT_EQ("true", s.get());
+
+ s = failedFuture()
+ .then(std::tr1::bind(&second, std::tr1::placeholders::_1))
+ .then(std::tr1::bind(&third, std::tr1::placeholders::_1));
+
+ s.await();
+
+ ASSERT_TRUE(s.isFailed());
+
+ Future<bool> future;
+
+ s = pendingFuture(&future)
+ .then(std::tr1::bind(&second, std::tr1::placeholders::_1))
+ .then(std::tr1::bind(&third, std::tr1::placeholders::_1));
+
+ ASSERT_TRUE(s.isPending());
+ ASSERT_TRUE(future.isPending());
+
+ s.discard();
+
+ future.await();
+
+ ASSERT_TRUE(future.isDiscarded());
+}
+
+
class SpawnProcess : public Process<SpawnProcess>
{
public: