You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/02/16 01:24:04 UTC

svn commit: r1244802 - in /incubator/mesos/trunk/third_party/libprocess: include/process/collect.hpp include/process/dispatch.hpp include/process/future.hpp src/process.cpp src/tests.cpp

Author: benh
Date: Thu Feb 16 00:24:03 2012
New Revision: 1244802

URL: http://svn.apache.org/viewvc?rev=1244802&view=rev
Log:
Some libprocess updates: (1) added a collect mechanism for futures, (2) added associate to promise and removed code from dispatch.hpp, (3) added tests.

Added:
    incubator/mesos/trunk/third_party/libprocess/include/process/collect.hpp
Modified:
    incubator/mesos/trunk/third_party/libprocess/include/process/dispatch.hpp
    incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp
    incubator/mesos/trunk/third_party/libprocess/src/process.cpp
    incubator/mesos/trunk/third_party/libprocess/src/tests.cpp

Added: incubator/mesos/trunk/third_party/libprocess/include/process/collect.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/collect.hpp?rev=1244802&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/collect.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/collect.hpp Thu Feb 16 00:24:03 2012
@@ -0,0 +1,87 @@
+#ifndef __PROCESS_COLLECT_HPP__
+#define __PROCESS_COLLECT_HPP__
+
+#include <assert.h>
+
+#include <set>
+
+#include <process/defer.hpp>
+#include <process/future.hpp>
+#include <process/process.hpp>
+
+namespace process {
+
+// Transforms a set of futures of type T into a set of T's.
+template <typename T>
+Future<std::set<T> > collect(std::set<Future<T> >& futures);
+
+
+namespace internal {
+
+template <typename T>
+class CollectProcess : public Process<CollectProcess<T> >
+{
+public:
+  CollectProcess(
+      const std::set<Future<T> >& _futures,
+      Promise<std::set<T> >* _promise)
+    : futures(_futures), promise(_promise) {}
+
+  virtual ~CollectProcess()
+  {
+    delete promise;
+  }
+
+  virtual void initialize()
+  {
+    // Stop this nonsense if nobody cares.
+    promise->future().onDiscarded(defer(this, &CollectProcess::discarded));
+
+    typename std::set<Future<T> >::iterator iterator;
+    for (iterator = futures.begin(); iterator != futures.end(); ++iterator) {
+      const Future<T>& future = *iterator;
+      future.onAny(defer(this, &CollectProcess::waited, future));
+    }
+  }
+
+private:
+  void discarded()
+  {
+    terminate(this);
+  }
+
+  void waited(const Future<T>& future)
+  {
+    if (future.isFailed()) {
+      promise->fail(future.failure());
+    } else if (future.isDiscarded()) {
+      promise->future().discard();
+    } else {
+      assert(future.isReady());
+      values.insert(future.get());
+      if (futures.size() == values.size()) {
+        promise->set(values);
+        terminate(this);
+      }
+    }
+  }
+
+  std::set<Future<T> > futures;
+  Promise<std::set<T> >* promise;
+  std::set<T> values;
+};
+
+} // namespace internal {
+
+
+template <typename T>
+inline Future<std::set<T> > collect(std::set<Future<T> >& futures)
+{
+  Promise<std::set<T> >* promise = new Promise<std::set<T> >();
+  spawn(new internal::CollectProcess<T>(futures, promise), true);
+  return promise->future();
+}
+
+} // namespace process {
+
+#endif // __PROCESS_COLLECT_HPP__

Modified: incubator/mesos/trunk/third_party/libprocess/include/process/dispatch.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/dispatch.hpp?rev=1244802&r1=1244801&r2=1244802&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/dispatch.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/dispatch.hpp Thu Feb 16 00:24:03 2012
@@ -50,44 +50,10 @@ void dispatch(
     const UPID& pid,
     const std::tr1::shared_ptr<std::tr1::function<void(ProcessBase*)> >& f);
 
-
-// The magic of dispatch actually occurs within the dispatcher
-// routines. In particular, some of the dispatcher routines need to
-// wait for values and "associate" them with the future that got
-// returned from the original call to dispatch. Those association
-// functions are defined here:
-
-template <typename T>
-void __associate(const Future<T>& from, std::tr1::shared_ptr<Promise<T> > to)
-{
-  // The future associated with this promise is either pending
-  // (because we haven't set it, failed it, or discarded it) or it's
-  // discarded (because the receiver of the future has discarded it).
-  assert(to->future().isPending() || to->future().isDiscarded());
-
-  if (to->future().isPending()) { // No-op if it's discarded.
-    if (from.isReady()) {
-      to->set(from.get());
-    } else if (from.isFailed()) {
-      to->fail(from.failure());
-    } else if (from.isDiscarded()) {
-      to->future().discard();
-    }
-  }
-}
-
-
-template <typename T>
-void associate(const Future<T>& from, std::tr1::shared_ptr<Promise<T> > to)
-{
-  from.onAny(std::tr1::bind(&__associate<T>, from, to));
-}
-
-
-// Finally come the dispatcher functions (one for each return type:
-// void, future, value) which should complete the picture. Given the
-// process argument these routines downcast the process to the correct
-// subtype and invoke the thunk using the subtype as the argument
+// For each return type (void, future, value) there is a dispatcher
+// function which should complete the picture. Given the process
+// argument these routines downcast the process to the correct subtype
+// and invoke the thunk using the subtype as the argument
 // (receiver). Note that we must use dynamic_cast because we permit a
 // process to use multiple inheritance (e.g., to expose multiple
 // callback interfaces).
@@ -113,7 +79,7 @@ void pdispatcher(
   assert(process != NULL);
   T* t = dynamic_cast<T*>(process);
   assert(t != NULL);
-  associate((*thunk)(t), promise);
+  promise->associate((*thunk)(t));
 }
 
 

Modified: incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp?rev=1244802&r1=1244801&r2=1244802&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp Thu Feb 16 00:24:03 2012
@@ -27,6 +27,9 @@ template <typename T>
 class Future
 {
 public:
+  // Constructs a failed future.
+  static Future<T> failed(const std::string& message);
+
   Future();
   Future(const T& _t);
   Future(const Future<T>& that);
@@ -111,7 +114,7 @@ private:
 };
 
 
-// TODO(benh): Making Promise a subclass of Future?
+// TODO(benh): Make Promise a subclass of Future?
 template <typename T>
 class Promise
 {
@@ -123,6 +126,20 @@ public:
   bool set(const T& _t);
   bool fail(const std::string& message);
 
+  bool associate(const Future<T>& future)
+  {
+    if (!f.isPending()) {
+      return false;
+    }
+
+    future
+      .onReady(std::tr1::bind(&Future<T>::set, f, std::tr1::placeholders::_1))
+      .onFailed(std::tr1::bind(&Future<T>::fail, f, std::tr1::placeholders::_1))
+      .onDiscarded(std::tr1::bind(&Future<T>::discard, f));
+
+    return true;
+  }
+
   // Returns a copy of the future associated with this promise.
   Future<T> future() const;
 
@@ -187,6 +204,7 @@ inline void acquire(int* lock)
   }
 }
 
+
 inline void release(int* lock)
 {
   // Unlock via a compare-and-swap so we get a memory barrier too.
@@ -194,6 +212,7 @@ inline void release(int* lock)
   assert(unlocked);
 }
 
+
 template <typename T>
 void select(
     const Future<T>& future,
@@ -250,6 +269,15 @@ void discard(const std::set<Future<T> >&
 
 
 template <typename T>
+Future<T> Future<T>::failed(const std::string& message)
+{
+  Future<T> future;
+  future.fail(message);
+  return future;
+}
+
+
+template <typename T>
 Future<T>::Future()
   : refs(new int(1)),
     lock(new int(0)),

Modified: incubator/mesos/trunk/third_party/libprocess/src/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/process.cpp?rev=1244802&r1=1244801&r2=1244802&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/process.cpp Thu Feb 16 00:24:03 2012
@@ -2687,7 +2687,7 @@ void ProcessBase::visit(const HttpEvent&
     dispatch(proxy, &HttpProxy::handle, future, event.request->keepAlive);
 
     // Now call the handler and associate the response with the promise.
-    internal::associate(handlers.http[name](*event.request), promise);
+    promise->associate(handlers.http[name](*event.request));
   } else if (resources.count(name) > 0) {
     HttpOKResponse response;
     response.headers["Content-Type"] = resources[name].type;

Modified: incubator/mesos/trunk/third_party/libprocess/src/tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/tests.cpp?rev=1244802&r1=1244801&r2=1244802&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/tests.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/tests.cpp Thu Feb 16 00:24:03 2012
@@ -8,6 +8,7 @@
 #include <string>
 #include <sstream>
 
+#include <process/collect.hpp>
 #include <process/clock.hpp>
 #include <process/defer.hpp>
 #include <process/dispatch.hpp>
@@ -73,11 +74,34 @@ TEST(libprocess, future)
 {
   Promise<bool> promise;
   promise.set(true);
-  promise.future().await();
+  ASSERT_TRUE(promise.future().isReady());
   EXPECT_TRUE(promise.future().get());
 }
 
 
+TEST(libprocess, associate)
+{
+  Promise<bool> promise1;
+  Future<bool> future1(true);
+  promise1.associate(future1);
+  ASSERT_TRUE(promise1.future().isReady());
+  EXPECT_TRUE(promise1.future().get());
+
+  Promise<bool> promise2;
+  Future<bool> future2;
+  promise2.associate(future2);
+  future2.discard();
+  ASSERT_TRUE(promise2.future().isDiscarded());
+
+  Promise<bool> promise3;
+  Promise<bool> promise4;
+  promise3.associate(promise4.future());
+  promise4.fail("associate");
+  ASSERT_TRUE(promise3.future().isFailed());
+  EXPECT_EQ("associate", promise3.future().failure());
+}
+
+
 class SpawnProcess : public Process<SpawnProcess>
 {
 public:
@@ -553,6 +577,41 @@ TEST(libprocess, select)
 }
 
 
+TEST(libprocess, collect)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  Promise<int> promise1;
+  Promise<int> promise2;
+  Promise<int> promise3;
+  Promise<int> promise4;
+
+  std::set<Future<int> > futures;
+  futures.insert(promise1.future());
+  futures.insert(promise2.future());
+  futures.insert(promise3.future());
+  futures.insert(promise4.future());
+
+  promise1.set(1);
+  promise2.set(2);
+  promise3.set(3);
+  promise4.set(4);
+
+  Future<std::set<int> > future = collect(futures);
+
+  EXPECT_TRUE(future.await());
+  EXPECT_TRUE(future.isReady());
+
+  std::set<int> values;
+  values.insert(1);
+  values.insert(2);
+  values.insert(3);
+  values.insert(4);
+
+  EXPECT_EQ(values, future.get());
+}
+
+
 class SettleProcess : public Process<SettleProcess>
 {
 public: