You are viewing a plain text version of this content. The canonical (hyperlinked) version is available in the mailing list archive.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/08/07 01:50:17 UTC

svn commit: r1370067 - in /incubator/mesos/trunk/third_party/libprocess: include/process/future.hpp src/tests.cpp

Author: benh
Date: Mon Aug  6 23:50:17 2012
New Revision: 1370067

URL: http://svn.apache.org/viewvc?rev=1370067&view=rev
Log:
Added discard chaining from a future created from Future::then.

Modified:
    incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp
    incubator/mesos/trunk/third_party/libprocess/src/tests.cpp

Modified: incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp?rev=1370067&r1=1370066&r2=1370067&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp Mon Aug  6 23:50:17 2012
@@ -644,31 +644,48 @@ const Future<T>& Future<T>::onAny(const 
 
 namespace internal {
 
+// TODO(benh): Need to pass 'future' as a weak_ptr so that we can
+// avoid reference counting cycles!
 template <typename T, typename X>
 void thenf(const std::tr1::shared_ptr<Promise<X> >& promise,
-          const std::tr1::function<Future<X>(const T&)>& callback,
-          const Future<T>& future)
+           const std::tr1::function<Future<X>(const T&)>& callback,
+           const std::tr1::shared_ptr<Future<T> >& future)
 {
-  if (future.isReady()) {
-    promise->associate(callback(future.get()));
-  } else if (future.isFailed()) {
-    promise->fail(future.failure());
-  } else if (future.isDiscarded()) {
-    promise->future().discard();
+  // Propagate discarding up the chain (triggered via setting
+  // onDiscarded on promise->future() in Future::then below).
+  if (promise->future().isDiscarded()) {
+    future->discard();
+  } else {
+    if (future->isReady()) {
+      promise->associate(callback(future->get()));
+    } else if (future->isFailed()) {
+      promise->fail(future->failure());
+    } else if (future->isDiscarded()) {
+      promise->future().discard();
+    }
   }
 }
 
+
+// TODO(benh): Need to pass 'future' as a weak_ptr so that we can
+// avoid reference counting cycles!
 template <typename T, typename X>
 void then(const std::tr1::shared_ptr<Promise<X> >& promise,
           const std::tr1::function<X(const T&)>& callback,
-          const Future<T>& future)
+          const std::tr1::shared_ptr<Future<T> >& future)
 {
-  if (future.isReady()) {
-    promise->set(callback(future.get()));
-  } else if (future.isFailed()) {
-    promise->fail(future.failure());
-  } else if (future.isDiscarded()) {
-    promise->future().discard();
+  // Propagate discarding up the chain (triggered via setting
+  // onDiscarded on promise->future() in Future::then below).
+  if (promise->future().isDiscarded()) {
+    future->discard();
+  } else {
+    if (future->isReady()) {
+      promise->set(callback(future->get()));
+    } else if (future->isFailed()) {
+      promise->fail(future->failure());
+    } else if (future->isDiscarded()) {
+      promise->future().discard();
+    }
   }
 }
 
@@ -686,10 +703,12 @@ Future<X> Future<T>::then(const std::tr1
     std::tr1::bind(&internal::thenf<T, X>,
                    promise,
                    f,
-                   *this);
+                   std::tr1::shared_ptr<Future<T> >(new Future<T>(*this)));
 
   onAny(thenf);
 
+  promise->future().onDiscarded(thenf);
+
   return promise->future();
 }
 
@@ -704,10 +723,12 @@ Future<X> Future<T>::then(const std::tr1
     std::tr1::bind(&internal::then<T, X>,
                    promise,
                    f,
-                   *this);
+                   std::tr1::shared_ptr<Future<T> >(new Future<T>(*this)));
 
   onAny(then);
 
+  promise->future().onDiscarded(then);
+
   return promise->future();
 }
 
@@ -726,10 +747,12 @@ Future<X> Future<T>::then(
     std::tr1::bind(&internal::thenf<T, X>,
                    promise,
                    callback,
-                   *this);
+                   std::tr1::shared_ptr<Future<T> >(new Future<T>(*this)));
 
   onAny(thenf);
 
+  promise->future().onDiscarded(thenf);
+
   return promise->future();
 }
 
@@ -748,10 +771,12 @@ Future<X> Future<T>::then(
     std::tr1::bind(&internal::then<T, X>,
                    promise,
                    callback,
-                   *this);
+                   std::tr1::shared_ptr<Future<T> >(new Future<T>(*this)));
 
   onAny(then);
 
+  promise->future().onDiscarded(then);
+
   return promise->future();
 }
 
@@ -776,6 +801,12 @@ auto Future<T>::then(F f) const
       }
     });
 
+  promise->future().onDiscarded([=] () {
+      if (this->isPending()) {
+        this->discard();
+      }
+    });
+
   return promise->future();
 }
 #endif

Modified: incubator/mesos/trunk/third_party/libprocess/src/tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/tests.cpp?rev=1370067&r1=1370066&r2=1370067&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/tests.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/tests.cpp Mon Aug  6 23:50:17 2012
@@ -136,6 +136,74 @@ TEST(Process, then)
 }
 
 
+Future<bool> readyFuture()
+{
+  return true;
+}
+
+
+Future<bool> failedFuture()
+{
+  return Future<bool>::failed("The value is not positive (or zero)");
+}
+
+
+Future<bool> pendingFuture(Future<bool>* future)
+{
+  return *future; // Keep it pending.
+}
+
+
+Future<std::string> second(const bool& b)
+{
+  return b ? std::string("true") : std::string("false");
+}
+
+
+Future<std::string> third(const std::string& s)
+{
+  return s;
+}
+
+
+TEST(Process, chain)
+{
+  Promise<int*> promise;
+
+  Future<std::string> s = readyFuture()
+    .then(std::tr1::bind(&second, std::tr1::placeholders::_1))
+    .then(std::tr1::bind(&third, std::tr1::placeholders::_1));
+
+  s.await();
+
+  ASSERT_TRUE(s.isReady());
+  EXPECT_EQ("true", s.get());
+
+  s = failedFuture()
+    .then(std::tr1::bind(&second, std::tr1::placeholders::_1))
+    .then(std::tr1::bind(&third, std::tr1::placeholders::_1));
+
+  s.await();
+
+  ASSERT_TRUE(s.isFailed());
+
+  Future<bool> future;
+
+  s = pendingFuture(&future)
+    .then(std::tr1::bind(&second, std::tr1::placeholders::_1))
+    .then(std::tr1::bind(&third, std::tr1::placeholders::_1));
+
+  ASSERT_TRUE(s.isPending());
+  ASSERT_TRUE(future.isPending());
+
+  s.discard();
+
+  future.await();
+
+  ASSERT_TRUE(future.isDiscarded());
+}
+
+
 class SpawnProcess : public Process<SpawnProcess>
 {
 public: