You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:20:26 UTC

svn commit: r1132291 - in /incubator/mesos/trunk/third_party/libprocess: gc.hpp process.cpp process.hpp run.hpp tests.cpp

Author: benh
Date: Sun Jun  5 09:20:26 2011
New Revision: 1132291

URL: http://svn.apache.org/viewvc?rev=1132291&view=rev
Log:
Updates to libprocess: a few small bug fixes, factoring out the GarbageCollector process, adding 'run' functionality, and adding tests.

Added:
    incubator/mesos/trunk/third_party/libprocess/gc.hpp
    incubator/mesos/trunk/third_party/libprocess/run.hpp
Modified:
    incubator/mesos/trunk/third_party/libprocess/process.cpp
    incubator/mesos/trunk/third_party/libprocess/process.hpp
    incubator/mesos/trunk/third_party/libprocess/tests.cpp

Added: incubator/mesos/trunk/third_party/libprocess/gc.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/gc.hpp?rev=1132291&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/gc.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/gc.hpp Sun Jun  5 09:20:26 2011
@@ -0,0 +1,49 @@
+#ifndef __GC_HPP__
+#define __GC_HPP__
+
+#include <map>
+
+#include <process.hpp>
+
+
+namespace process {
+
+// Deletes each process handed to manage() once its EXITED is served.
+class GarbageCollector : public Process<GarbageCollector>
+{
+public:
+  GarbageCollector() {}
+  virtual ~GarbageCollector() {}
+
+  template <typename T>
+  void manage(const T* t)
+  {
+    const ProcessBase* managed = t;
+    if (managed == NULL) return; // Nothing to track.
+    processes[managed->self()] = managed;
+    link(managed->self()); // Presumably makes its exit arrive as EXITED.
+  }
+
+protected:
+  // Serves forever; deletes a tracked process when its EXITED arrives.
+  virtual void operator () ()
+  {
+    for (;;) {
+      serve();
+      if (name() != EXITED || processes.count(from()) == 0) continue;
+      const ProcessBase* exited = processes[from()];
+      processes.erase(from());
+      delete exited;
+    }
+  }
+
+private:
+  std::map<UPID, const ProcessBase*> processes; // Tracked, keyed by pid.
+};
+
+
+extern PID<GarbageCollector> gc;
+
+} // namespace process {
+
+#endif // __GC_HPP__

Modified: incubator/mesos/trunk/third_party/libprocess/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/process.cpp?rev=1132291&r1=1132290&r2=1132291&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/process.cpp Sun Jun  5 09:20:26 2011
@@ -49,6 +49,7 @@
 #include "fatal.hpp"
 #include "foreach.hpp"
 #include "gate.hpp"
+#include "gc.hpp"
 #include "synchronized.hpp"
 #include "tokenize.hpp"
 
@@ -260,44 +261,6 @@ private:
 };
 
 
-class GarbageCollector : public Process<GarbageCollector>
-{
-public:
-  GarbageCollector() {}
-  ~GarbageCollector() {}
-
-  template <typename T>
-  void manage(const T* t)
-  {
-    const ProcessBase* process = t;
-    if (process != NULL) {
-      processes[process->self()] = process;
-      link(process->self());
-    }
-  }
-
-protected:
-  virtual void operator () ()
-  {
-    while (true) {
-      serve();
-      if (name() == EXITED && processes.count(from()) > 0) {
-        const ProcessBase* process = processes[from()];
-        processes.erase(from());
-        delete process;
-      }
-    }
-  }
-
-private:
-  map<UPID, const ProcessBase*> processes;
-};
-
-
-/* Global garbage collector (move to own file). */
-static PID<GarbageCollector> gc;
-
-
 class HttpProxy;
 
 
@@ -387,7 +350,7 @@ public:
   bool deliver(int c, HttpRequest* request, ProcessBase *sender = NULL);
   bool deliver(const UPID& to, function<void(ProcessBase*)>* delegator, ProcessBase *sender = NULL);
 
-  UPID spawn(ProcessBase *process);
+  UPID spawn(ProcessBase *process, bool manage);
   void link(ProcessBase *process, const UPID &to);
   bool receive(ProcessBase *process, double secs);
   bool serve(ProcessBase *process, double secs);
@@ -521,6 +484,9 @@ static void *recyclable = NULL;
 static Filter *filterer = NULL;
 static synchronizable(filterer) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
 
+/* Global garbage collector. */
+PID<GarbageCollector> gc;
+
 
 int set_nbio(int fd)
 {
@@ -1275,9 +1241,6 @@ void HttpResponseWaiter::await(const Fut
 
 HttpProxy::HttpProxy(int _c) : c(_c)
 {
-  // Get garbage collected!
-  dispatch(gc, &GarbageCollector::manage<HttpProxy>, this);
-
   // Create our waiter.
   waiter = new HttpResponseWaiter(self());
   spawn(waiter);
@@ -1422,7 +1385,7 @@ PID<HttpProxy> SocketManager::proxy(int 
       CHECK(proxies.count(s) == 0);
 
       HttpProxy* proxy = new HttpProxy(s);
-      spawn(proxy);
+      spawn(proxy, true);
       proxies[s] = proxy;
       return proxy->self();
     }
@@ -1817,7 +1780,7 @@ bool ProcessManager::deliver(const UPID&
 }
 
 
-UPID ProcessManager::spawn(ProcessBase *process)
+UPID ProcessManager::spawn(ProcessBase *process, bool manage)
 {
   assert(process != NULL);
 
@@ -1884,6 +1847,11 @@ UPID ProcessManager::spawn(ProcessBase *
   /* Add process to the run queue. */
   enqueue(process);
 
+  /* Use the garbage collector if requested. */
+  if (manage) {
+    dispatch(gc, &GarbageCollector::manage<ProcessBase>, process);
+  }
+
   return process->self();
 }
 
@@ -3036,7 +3004,7 @@ double ProcessBase::elapsed()
 }
 
 
-UPID ProcessBase::spawn(ProcessBase* process)
+UPID ProcessBase::spawn(ProcessBase* process, bool manage)
 {
   initialize();
 
@@ -3054,7 +3022,7 @@ UPID ProcessBase::spawn(ProcessBase* pro
       }
     }
 
-    return process_manager->spawn(process);
+    return process_manager->spawn(process, manage);
   } else {
     return UPID();
   }
@@ -3122,6 +3090,10 @@ bool wait(const UPID& pid, double secs)
     return false;
   }
 
+  if (secs == 0) {
+    return wait(pid);
+  }
+
   bool waited = false;
 
   WaitWaiter waiter(pid, secs, &waited);

Modified: incubator/mesos/trunk/third_party/libprocess/process.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/process.hpp?rev=1132291&r1=1132290&r2=1132291&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/process.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/process.hpp Sun Jun  5 09:20:26 2011
@@ -60,7 +60,7 @@ public:
 
   UPID self() const { return pid; }
 
-  static UPID spawn(ProcessBase* process);
+  static UPID spawn(ProcessBase* process, bool manage = false);
 
 protected:
   /* Function run when process spawned. */
@@ -218,11 +218,12 @@ void initialize(bool initialize_google_l
  * Spawn a new process.
  *
  * @param process process to be spawned
+ * @param manage boolean whether process should get garbage collected
  */
 template <typename T>
-PID<T> spawn(T* t)
+PID<T> spawn(T* t, bool manage = false)
 {
-  if (!ProcessBase::spawn(t)) {
+  if (!ProcessBase::spawn(t, manage)) {
     return PID<T>();
   }
 

Added: incubator/mesos/trunk/third_party/libprocess/run.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/run.hpp?rev=1132291&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/run.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/run.hpp Sun Jun  5 09:20:26 2011
@@ -0,0 +1,172 @@
+#ifndef __RUN_HPP__
+#define __RUN_HPP__
+
+#include <process.hpp>
+
+
+namespace process {
+
+template <typename R>
+Future<R> run(R (*method)());
+
+// Pn: parameter types of 'method'; An: types of the arguments supplied.
+template <typename R, typename P1, typename A1>
+Future<R> run(R (*method)(P1), A1 a1);
+
+
+template <typename R,
+          typename P1, typename P2,
+          typename A1, typename A2>
+Future<R> run(R (*method)(P1, P2), A1 a1, A2 a2);
+
+
+template <typename R,
+          typename P1, typename P2, typename P3,
+          typename A1, typename A2, typename A3>
+Future<R> run(R (*method)(P1, P2, P3), A1 a1, A2 a2, A3 a3);
+
+
+template <typename R,
+          typename P1, typename P2, typename P3, typename P4,
+          typename A1, typename A2, typename A3, typename A4>
+Future<R> run(R (*method)(P1, P2, P3, P4), A1 a1, A2 a2, A3 a3, A4 a4);
+
+
+template <typename R,
+          typename P1, typename P2, typename P3, typename P4, typename P5,
+          typename A1, typename A2, typename A3, typename A4, typename A5>
+Future<R> run(R (*method)(P1, P2, P3, P4, P5),
+              A1 a1, A2 a2, A3 a3, A4 a4, A5 a5);
+
+
+namespace internal {
+
+// Process that invokes 'thunk' once and associates a promise holding
+// its result with 'future'. Deletes both pointers when destroyed.
+template <typename R>
+class ThunkProcess : public Process<ThunkProcess<R> >
+{
+public:
+  ThunkProcess(std::tr1::function<R(void)>* _thunk, Future<R>* _future)
+    : thunk(_thunk), future(_future) {}
+
+  virtual ~ThunkProcess()
+  {
+    // 'delete' on a null pointer is a no-op, so no guards are needed.
+    delete thunk;
+    delete future;
+  }
+
+protected:
+  // Body of the process: both pointers must be set; the thunk's
+  // return value is wrapped in a Promise tied to the future.
+  virtual void operator () ()
+  {
+    assert(thunk != NULL);
+    assert(future != NULL);
+    Promise<R>((*thunk)()).associate(*future);
+  }
+
+private:
+  std::tr1::function<R(void)>* thunk; // Deleted in the destructor.
+  Future<R>* future;                  // Deleted in the destructor.
+};
+
+} // namespace internal {
+
+
+// Spawns a managed process that runs 'method'; returns a future for it.
+template <typename R>
+Future<R> run(R (*method)())
+{
+  std::tr1::function<R(void)>* thunk =
+    new std::tr1::function<R(void)>(std::tr1::bind(method));
+  Future<R>* future = new Future<R>();
+  // Copy before spawning: the process deletes 'future' when collected.
+  Future<R> result(*future);
+  spawn(new internal::ThunkProcess<R>(thunk, future), true);
+  return result;
+}
+
+
+// Spawns a managed process that runs 'method(a1)'; returns its future.
+template <typename R, typename P1, typename A1>
+Future<R> run(R (*method)(P1), A1 a1)
+{
+  std::tr1::function<R(void)>* thunk =
+    new std::tr1::function<R(void)>(std::tr1::bind(method, a1));
+  Future<R>* future = new Future<R>();
+  // Copy before spawning: the process deletes 'future' when collected.
+  Future<R> result(*future);
+  spawn(new internal::ThunkProcess<R>(thunk, future), true);
+  return result;
+}
+
+
+// Spawns a managed process that runs 'method(a1, a2)'; returns its future.
+template <typename R,
+          typename P1, typename P2,
+          typename A1, typename A2>
+Future<R> run(R (*method)(P1, P2), A1 a1, A2 a2)
+{
+  std::tr1::function<R(void)>* thunk =
+    new std::tr1::function<R(void)>(std::tr1::bind(method, a1, a2));
+  Future<R>* future = new Future<R>();
+  // Copy before spawning: the process deletes 'future' when collected.
+  Future<R> result(*future);
+  spawn(new internal::ThunkProcess<R>(thunk, future), true);
+  return result;
+}
+
+
+// Spawns a managed process that runs 'method' on a1..a3; returns its future.
+template <typename R,
+          typename P1, typename P2, typename P3,
+          typename A1, typename A2, typename A3>
+Future<R> run(R (*method)(P1, P2, P3), A1 a1, A2 a2, A3 a3)
+{
+  std::tr1::function<R(void)>* thunk =
+    new std::tr1::function<R(void)>(std::tr1::bind(method, a1, a2, a3));
+  Future<R>* future = new Future<R>();
+  // Copy before spawning: the process deletes 'future' when collected.
+  Future<R> result(*future);
+  spawn(new internal::ThunkProcess<R>(thunk, future), true);
+  return result;
+}
+
+
+// Spawns a managed process that runs 'method' on a1..a4; returns its future.
+template <typename R,
+          typename P1, typename P2, typename P3, typename P4,
+          typename A1, typename A2, typename A3, typename A4>
+Future<R> run(R (*method)(P1, P2, P3, P4), A1 a1, A2 a2, A3 a3, A4 a4)
+{
+  std::tr1::function<R(void)>* thunk =
+    new std::tr1::function<R(void)>(std::tr1::bind(method, a1, a2, a3, a4));
+  Future<R>* future = new Future<R>();
+  // Copy before spawning: the process deletes 'future' when collected.
+  Future<R> result(*future);
+  spawn(new internal::ThunkProcess<R>(thunk, future), true);
+  return result;
+}
+
+
+// Spawns a managed process that runs 'method' on a1..a5; returns its future.
+template <typename R,
+          typename P1, typename P2, typename P3, typename P4, typename P5,
+          typename A1, typename A2, typename A3, typename A4, typename A5>
+Future<R> run(R (*method)(P1, P2, P3, P4, P5),
+              A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
+{
+  std::tr1::function<R(void)>* thunk =
+    new std::tr1::function<R(void)>(std::tr1::bind(method, a1, a2, a3, a4, a5));
+  Future<R>* future = new Future<R>();
+  // Copy before spawning: the process deletes 'future' when collected.
+  Future<R> result(*future);
+  spawn(new internal::ThunkProcess<R>(thunk, future), true);
+  return result;
+}
+
+} // namespace process {
+
+#endif // __RUN_HPP__

Modified: incubator/mesos/trunk/third_party/libprocess/tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/tests.cpp?rev=1132291&r1=1132290&r2=1132291&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/tests.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/tests.cpp Sun Jun  5 09:20:26 2011
@@ -1,6 +1,7 @@
 #include <gmock/gmock.h>
 
 #include <process.hpp>
+#include <run.hpp>
 
 using process::Future;
 using process::PID;
@@ -190,6 +191,29 @@ TEST(libprocess, inheritance)
 }
 
 
+// Checks that process::run() executes a two-argument static function.
+TEST(libprocess, thunk)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  // Two overloads share a name; the call below passes two arguments.
+  struct Functions
+  {
+    static int run(int value) { return value; }
+
+    static int run(int lhs, int rhs)
+    {
+      return run(lhs + rhs);
+    }
+  };
+
+  // The Future<int> returned by process::run() is converted to int here.
+  const int result = process::run(&Functions::run, 21, 21);
+
+  EXPECT_EQ(42, result);
+}
+
+
 int main(int argc, char** argv)
 {
   // Initialize Google Mock/Test.