You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:20:26 UTC
svn commit: r1132291 - in /incubator/mesos/trunk/third_party/libprocess:
gc.hpp process.cpp process.hpp run.hpp tests.cpp
Author: benh
Date: Sun Jun 5 09:20:26 2011
New Revision: 1132291
URL: http://svn.apache.org/viewvc?rev=1132291&view=rev
Log:
Updates to libprocess, a few small bug fixes and factoring out the GarbageCollector process as well as adding 'run' functionality and adding tests.
Added:
incubator/mesos/trunk/third_party/libprocess/gc.hpp
incubator/mesos/trunk/third_party/libprocess/run.hpp
Modified:
incubator/mesos/trunk/third_party/libprocess/process.cpp
incubator/mesos/trunk/third_party/libprocess/process.hpp
incubator/mesos/trunk/third_party/libprocess/tests.cpp
Added: incubator/mesos/trunk/third_party/libprocess/gc.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/gc.hpp?rev=1132291&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/gc.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/gc.hpp Sun Jun 5 09:20:26 2011
@@ -0,0 +1,49 @@
+#ifndef __GC_HPP__
+#define __GC_HPP__
+
+#include <map>
+
+#include <process.hpp>
+
+
+namespace process {
+
+class GarbageCollector : public Process<GarbageCollector>
+{
+public:
+ GarbageCollector() {}
+ virtual ~GarbageCollector() {}
+
+ template <typename T>
+ void manage(const T* t)
+ {
+ const ProcessBase* process = t;
+ if (process != NULL) {
+ processes[process->self()] = process;
+ link(process->self());
+ }
+ }
+
+protected:
+ virtual void operator () ()
+ {
+ while (true) {
+ serve();
+ if (name() == EXITED && processes.count(from()) > 0) {
+ const ProcessBase* process = processes[from()];
+ processes.erase(from());
+ delete process;
+ }
+ }
+ }
+
+private:
+ std::map<UPID, const ProcessBase*> processes;
+};
+
+
+extern PID<GarbageCollector> gc;
+
+} // namespace process {
+
+#endif // __GC_HPP__
Modified: incubator/mesos/trunk/third_party/libprocess/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/process.cpp?rev=1132291&r1=1132290&r2=1132291&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/process.cpp Sun Jun 5 09:20:26 2011
@@ -49,6 +49,7 @@
#include "fatal.hpp"
#include "foreach.hpp"
#include "gate.hpp"
+#include "gc.hpp"
#include "synchronized.hpp"
#include "tokenize.hpp"
@@ -260,44 +261,6 @@ private:
};
-class GarbageCollector : public Process<GarbageCollector>
-{
-public:
- GarbageCollector() {}
- ~GarbageCollector() {}
-
- template <typename T>
- void manage(const T* t)
- {
- const ProcessBase* process = t;
- if (process != NULL) {
- processes[process->self()] = process;
- link(process->self());
- }
- }
-
-protected:
- virtual void operator () ()
- {
- while (true) {
- serve();
- if (name() == EXITED && processes.count(from()) > 0) {
- const ProcessBase* process = processes[from()];
- processes.erase(from());
- delete process;
- }
- }
- }
-
-private:
- map<UPID, const ProcessBase*> processes;
-};
-
-
-/* Global garbage collector (move to own file). */
-static PID<GarbageCollector> gc;
-
-
class HttpProxy;
@@ -387,7 +350,7 @@ public:
bool deliver(int c, HttpRequest* request, ProcessBase *sender = NULL);
bool deliver(const UPID& to, function<void(ProcessBase*)>* delegator, ProcessBase *sender = NULL);
- UPID spawn(ProcessBase *process);
+ UPID spawn(ProcessBase *process, bool manage);
void link(ProcessBase *process, const UPID &to);
bool receive(ProcessBase *process, double secs);
bool serve(ProcessBase *process, double secs);
@@ -521,6 +484,9 @@ static void *recyclable = NULL;
static Filter *filterer = NULL;
static synchronizable(filterer) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
+/* Global garbage collector. */
+PID<GarbageCollector> gc;
+
int set_nbio(int fd)
{
@@ -1275,9 +1241,6 @@ void HttpResponseWaiter::await(const Fut
HttpProxy::HttpProxy(int _c) : c(_c)
{
- // Get garbage collected!
- dispatch(gc, &GarbageCollector::manage<HttpProxy>, this);
-
// Create our waiter.
waiter = new HttpResponseWaiter(self());
spawn(waiter);
@@ -1422,7 +1385,7 @@ PID<HttpProxy> SocketManager::proxy(int
CHECK(proxies.count(s) == 0);
HttpProxy* proxy = new HttpProxy(s);
- spawn(proxy);
+ spawn(proxy, true);
proxies[s] = proxy;
return proxy->self();
}
@@ -1817,7 +1780,7 @@ bool ProcessManager::deliver(const UPID&
}
-UPID ProcessManager::spawn(ProcessBase *process)
+UPID ProcessManager::spawn(ProcessBase *process, bool manage)
{
assert(process != NULL);
@@ -1884,6 +1847,11 @@ UPID ProcessManager::spawn(ProcessBase *
/* Add process to the run queue. */
enqueue(process);
+ /* Use the garbage collector if requested. */
+ if (manage) {
+ dispatch(gc, &GarbageCollector::manage<ProcessBase>, process);
+ }
+
return process->self();
}
@@ -3036,7 +3004,7 @@ double ProcessBase::elapsed()
}
-UPID ProcessBase::spawn(ProcessBase* process)
+UPID ProcessBase::spawn(ProcessBase* process, bool manage)
{
initialize();
@@ -3054,7 +3022,7 @@ UPID ProcessBase::spawn(ProcessBase* pro
}
}
- return process_manager->spawn(process);
+ return process_manager->spawn(process, manage);
} else {
return UPID();
}
@@ -3122,6 +3090,10 @@ bool wait(const UPID& pid, double secs)
return false;
}
+ if (secs == 0) {
+ return wait(pid);
+ }
+
bool waited = false;
WaitWaiter waiter(pid, secs, &waited);
Modified: incubator/mesos/trunk/third_party/libprocess/process.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/process.hpp?rev=1132291&r1=1132290&r2=1132291&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/process.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/process.hpp Sun Jun 5 09:20:26 2011
@@ -60,7 +60,7 @@ public:
UPID self() const { return pid; }
- static UPID spawn(ProcessBase* process);
+ static UPID spawn(ProcessBase* process, bool manage = false);
protected:
/* Function run when process spawned. */
@@ -218,11 +218,12 @@ void initialize(bool initialize_google_l
* Spawn a new process.
*
* @param process process to be spawned
+ * @param manage boolean whether process should get garbage collected
*/
template <typename T>
-PID<T> spawn(T* t)
+PID<T> spawn(T* t, bool manage = false)
{
- if (!ProcessBase::spawn(t)) {
+ if (!ProcessBase::spawn(t, manage)) {
return PID<T>();
}
Added: incubator/mesos/trunk/third_party/libprocess/run.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/run.hpp?rev=1132291&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/run.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/run.hpp Sun Jun 5 09:20:26 2011
@@ -0,0 +1,172 @@
+#ifndef __RUN_HPP__
+#define __RUN_HPP__
+
+#include <process.hpp>
+
+
+namespace process {
+
+template <typename R>
+Future<R> run(R (*method)());
+
+
+template <typename R, typename P1, typename A1>
+Future<R> run(R (*method)(P1), A1 a1);
+
+
+template <typename R,
+ typename P1, typename P2,
+ typename A1, typename A2>
+Future<R> run(R (*method)(P1, P2), A1 a1, A2 a2);
+
+
+template <typename R,
+ typename P1, typename P2, typename P3,
+ typename A1, typename A2, typename A3>
+Future<R> run(R (*method)(P1, P2, P3), A1 a1, A2 a2, A3 a3);
+
+
+template <typename R,
+ typename P1, typename P2, typename P3, typename P4,
+ typename A1, typename A2, typename A3, typename A4>
+Future<R> run(R (*method)(P1, P2, P3, P4), A1 a1, A2 a2, A3 a3, A4 a4);
+
+
+template <typename R,
+ typename P1, typename P2, typename P3, typename P4, typename P5,
+ typename A1, typename A2, typename A3, typename A4, typename A5>
+Future<R> run(R (*method)(P1, P2, P3, P4, P5),
+ A1 a1, A2 a2, A3 a3, A4 a4, A5 a5);
+
+
+namespace internal {
+
+template <typename R>
+class ThunkProcess : public Process<ThunkProcess<R> >
+{
+public:
+ ThunkProcess(std::tr1::function<R(void)>* _thunk, Future<R>* _future)
+ : thunk(_thunk), future(_future) {}
+
+ virtual ~ThunkProcess()
+ {
+ if (thunk != NULL) {
+ delete thunk;
+ }
+
+ if (future != NULL) {
+ delete future;
+ }
+ }
+
+protected:
+ virtual void operator () ()
+ {
+ assert(thunk != NULL);
+ assert(future != NULL);
+ Promise<R>((*thunk)()).associate(*future);
+ }
+
+private:
+ std::tr1::function<R(void)>* thunk;
+ Future<R>* future;
+};
+
+} // namespace internal {
+
+
+template <typename R>
+Future<R> run(R (*method)())
+{
+ std::tr1::function<R(void)>* thunk =
+ new std::tr1::function<R(void)>(std::tr1::bind(method));
+
+ Future<R>* future = new Future<R>();
+
+ spawn(new internal::ThunkProcess<R>(thunk, future), true);
+
+ return *future;
+}
+
+
+template <typename R, typename P1, typename A1>
+Future<R> run(R (*method)(P1), A1 a1)
+{
+ std::tr1::function<R(void)>* thunk =
+ new std::tr1::function<R(void)>(std::tr1::bind(method, a1));
+
+ Future<R>* future = new Future<R>();
+
+ spawn(new internal::ThunkProcess<R>(thunk, future), true);
+
+ return *future;
+}
+
+
+template <typename R,
+ typename P1, typename P2,
+ typename A1, typename A2>
+Future<R> run(R (*method)(P1, P2), A1 a1, A2 a2)
+{
+ std::tr1::function<R(void)>* thunk =
+ new std::tr1::function<R(void)>(std::tr1::bind(method, a1, a2));
+
+ Future<R>* future = new Future<R>();
+
+ spawn(new internal::ThunkProcess<R>(thunk, future), true);
+
+ return *future;
+}
+
+
+template <typename R,
+ typename P1, typename P2, typename P3,
+ typename A1, typename A2, typename A3>
+Future<R> run(R (*method)(P1, P2, P3), A1 a1, A2 a2, A3 a3)
+{
+ std::tr1::function<R(void)>* thunk =
+ new std::tr1::function<R(void)>(std::tr1::bind(method, a1, a2, a3));
+
+ Future<R>* future = new Future<R>();
+
+ spawn(new internal::ThunkProcess<R>(thunk, future), true);
+
+ return *future;
+}
+
+
+template <typename R,
+ typename P1, typename P2, typename P3, typename P4,
+ typename A1, typename A2, typename A3, typename A4>
+Future<R> run(R (*method)(P1, P2, P3, P4), A1 a1, A2 a2, A3 a3, A4 a4)
+{
+ std::tr1::function<R(void)>* thunk =
+ new std::tr1::function<R(void)>(std::tr1::bind(method, a1, a2, a3, a4));
+
+ Future<R>* future = new Future<R>();
+
+ spawn(new internal::ThunkProcess<R>(thunk, future), true);
+
+ return *future;
+}
+
+
+template <typename R,
+ typename P1, typename P2, typename P3, typename P4, typename P5,
+ typename A1, typename A2, typename A3, typename A4, typename A5>
+Future<R> run(R (*method)(P1, P2, P3, P4, P5),
+ A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
+{
+ std::tr1::function<R(void)>* thunk =
+ new std::tr1::function<R(void)>(std::tr1::bind(method, a1, a2, a3, a4, a5));
+
+ Future<R>* future = new Future<R>();
+
+ spawn(new internal::ThunkProcess<R>(thunk, future), true);
+
+ return *future;
+}
+
+} // namespace process {
+
+#endif // __RUN_HPP__
Modified: incubator/mesos/trunk/third_party/libprocess/tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/tests.cpp?rev=1132291&r1=1132290&r2=1132291&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/tests.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/tests.cpp Sun Jun 5 09:20:26 2011
@@ -1,6 +1,7 @@
#include <gmock/gmock.h>
#include <process.hpp>
+#include <run.hpp>
using process::Future;
using process::PID;
@@ -190,6 +191,29 @@ TEST(libprocess, inheritance)
}
+TEST(libprocess, thunk)
+{
+ ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+ struct Thunk
+ {
+ static int run(int i)
+ {
+ return i;
+ }
+
+ static int run(int i, int j)
+ {
+ return run(i + j);
+ }
+ };
+
+ int result = process::run(&Thunk::run, 21, 21);
+
+ EXPECT_EQ(42, result);
+}
+
+
int main(int argc, char** argv)
{
// Initialize Google Mock/Test.