You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 08:38:58 UTC
svn commit: r1131940 [3/3] - in
/incubator/mesos/trunk/src/third_party/libprocess: process.cpp process.hpp
synchronized.cpp synchronized.hpp todo utility.hpp
Modified: incubator/mesos/trunk/src/third_party/libprocess/process.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/process.hpp?rev=1131940&r1=1131939&r2=1131940&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/process.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/process.hpp Sun Jun 5 06:38:58 2011
@@ -7,13 +7,6 @@
#include <sys/time.h>
-#ifdef USE_LITHE
-#include <lithe.hh>
-
-#include <ht/ht.h>
-#include <ht/spinlock.h>
-#endif /* USE_LITHE */
-
#include <iostream>
#include <map>
#include <queue>
@@ -67,104 +60,36 @@ public:
virtual bool filter(struct msg *) = 0;
};
-#ifdef USE_LITHE
-
-using lithe::Scheduler;
-
-class ProcessScheduler : public Scheduler
-{
-private:
- int lock;
- int waiter;
- lithe_task_t task;
- std::map<lithe_sched_t *, std::pair<int, int> > children;
-
-protected:
- void enter();
- void yield(lithe_sched_t *child);
- void reg(lithe_sched_t *child);
- void unreg(lithe_sched_t *child);
- void request(lithe_sched_t *child, int k);
- void unblock(lithe_task_t *task);
-
- void schedule();
-
-public:
- ProcessScheduler();
- ~ProcessScheduler();
-};
-
-#else
-
-void * schedule(void *arg);
-
-#endif /* USE_LITHE */
-
class Process {
-private:
- friend class LinkManager;
- friend class ProcessManager;
-#ifdef USE_LITHE
- friend class ProcessScheduler;
-#else
- friend void * schedule(void *arg);
-#endif /* USE_LITHE */
-
- /* Flag indicating state of process. */
- enum { INIT,
- READY,
- RUNNING,
- RECEIVING,
- PAUSED,
- AWAITING,
- WAITING,
- INTERRUPTED,
- TIMEDOUT,
- EXITED } state;
-
- /* Queue of received messages. */
- std::deque<struct msg *> msgs;
+public:
+ virtual ~Process();
- /* Current message. */
- struct msg *current;
+ /* Returns pid of process; valid even before calling spawn. */
+ PID getPID() const;
- /* Current "blocking" generation. */
- int generation;
+ /* Sends a message to PID without a return address. */
+ static void post(const PID &to, MSGID id);
- /* Process PID. */
- PID pid;
+ /* Sends a message with data to PID without a return address. */
+ static void post(const PID &to, MSGID id, const char *data, size_t length);
-#ifdef USE_LITHE
- lithe_task_t task;
-#endif /* USE_LITHE */
+ /* Spawn a new process. */
+ static PID spawn(Process *process);
- /* Continuation/Context of process. */
- ucontext_t uctx;
+ /* Wait for PID to exit (returns true if actually waited on a process). */
+ static bool wait(const PID &pid);
- /* Lock/mutex protecting internals. */
-#ifdef USE_LITHE
- int l;
- void lock() { spinlock_lock(&l); }
- void unlock() { spinlock_unlock(&l); }
-#else
- pthread_mutex_t m;
- void lock() { pthread_mutex_lock(&m); }
- void unlock() { pthread_mutex_unlock(&m); }
-#endif /* USE_LITHE */
+ /* Wait for PID to exit (returns true if actually waited on a process). */
+ static bool wait(Process *process);
- /* Enqueues the specified message. */
- void enqueue(struct msg *msg);
+ /* Invoke the thunk in a legacy safe way. */
+ static void invoke(const std::tr1::function<void (void)> &thunk);
- /* Dequeues a message or returns NULL. */
- struct msg * dequeue();
+ /* Filter messages to be enqueued (except for timeout messages). */
+ static void filter(MessageFilter *);
-#if defined(SWIGPYTHON) || defined(SWIGRUBY)
-public:
-#else
protected:
-#endif /* SWIG */
-
Process();
/* Function run when process spawned. */
@@ -227,32 +152,53 @@ protected:
/* Returns sub-second elapsed time (according to this process). */
double elapsed();
-public:
- virtual ~Process();
+private:
+ friend class LinkManager;
+ friend class ProcessManager;
+ friend class ProcessReference;
+ friend void * schedule(void *arg);
- /* Returns pid of process; valid even before calling spawn. */
- PID getPID() const;
+ /* Flag indicating state of process. */
+ enum { INIT,
+ READY,
+ RUNNING,
+ RECEIVING,
+ PAUSED,
+ AWAITING,
+ WAITING,
+ INTERRUPTED,
+ TIMEDOUT,
+ EXITING,
+ EXITED } state;
- /* Sends a message to PID without a return address. */
- static void post(const PID &to, MSGID id);
+ /* Active references. */
+ int refs;
- /* Sends a message with data to PID without a return address. */
- static void post(const PID &to, MSGID id, const char *data, size_t length);
+ /* Queue of received messages. */
+ std::deque<struct msg *> msgs;
- /* Spawn a new process. */
- static PID spawn(Process *process);
+ /* Current message. */
+ struct msg *current;
- /* Wait for PID to exit (returns true if actually waited on a process). */
- static bool wait(PID pid);
+ /* Current "blocking" generation. */
+ int generation;
- /* Wait for PID to exit (returns true if actually waited on a process). */
- static bool wait(Process *process);
+ /* Process PID. */
+ PID pid;
- /* Invoke the thunk in a legacy safe way. */
- static void invoke(const std::tr1::function<void (void)> &thunk);
+ /* Continuation/Context of process. */
+ ucontext_t uctx;
- /* Filter messages to be enqueued (except for timeout messages). */
- static void filter(MessageFilter *);
+ /* Lock/mutex protecting internals. */
+ pthread_mutex_t m;
+ void lock() { pthread_mutex_lock(&m); }
+ void unlock() { pthread_mutex_unlock(&m); }
+
+ /* Enqueues the specified message. */
+ void enqueue(struct msg *msg);
+
+ /* Dequeues a message or returns NULL. */
+ struct msg * dequeue();
};
Added: incubator/mesos/trunk/src/third_party/libprocess/synchronized.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/synchronized.cpp?rev=1131940&view=auto
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/synchronized.cpp (added)
+++ incubator/mesos/trunk/src/third_party/libprocess/synchronized.cpp Sun Jun 5 06:38:58 2011
@@ -0,0 +1,66 @@
+#include "synchronized.hpp"
+
+using std::string;
+
+
+static string s1;
+static synchronizable(s1);
+
+static string s2;
+static synchronizable(s2) = SYNCHRONIZED_INITIALIZER;
+
+static string s3;
+static synchronizable(s3) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
+
+
+void bar()
+{
+ synchronized(s3) {
+
+ }
+}
+
+
+void foo()
+{
+ synchronized(s3) {
+ bar();
+ }
+}
+
+
+class Foo
+{
+public:
+ Foo()
+ {
+ synchronizer(s) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
+ }
+
+ void foo()
+ {
+ synchronized(s) {
+ synchronized(s) {
+
+ }
+ }
+ }
+
+private:
+ string s;
+ synchronizable(s);
+};
+
+
+int main(int argc, char **argv)
+{
+ synchronizer(s1) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
+ //synchronizer(s2) = SYNCHRONIZED_INITIALIZER;
+
+ //foo();
+
+ Foo f;
+ f.foo();
+
+ return 0;
+}
Added: incubator/mesos/trunk/src/third_party/libprocess/synchronized.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/synchronized.hpp?rev=1131940&view=auto
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/synchronized.hpp (added)
+++ incubator/mesos/trunk/src/third_party/libprocess/synchronized.hpp Sun Jun 5 06:38:58 2011
@@ -0,0 +1,104 @@
+#include <pthread.h>
+
+#include <iostream>
+
+
+class Synchronizable
+{
+public:
+ Synchronizable()
+ : initialized(false) {}
+
+ explicit Synchronizable(int _type)
+ : type(_type), initialized(false)
+ {
+ initialize();
+ }
+
+ Synchronizable(const Synchronizable &that)
+ {
+ type = that.type;
+ initialize();
+ }
+
+ Synchronizable & operator = (const Synchronizable &that)
+ {
+ type = that.type;
+ initialize();
+ return *this;
+ }
+
+ void acquire()
+ {
+ if (!initialized) {
+ std::cerr << "synchronizable not initialized" << std::endl;
+ abort();
+ }
+ pthread_mutex_lock(&mutex);
+ }
+
+ void release()
+ {
+ if (!initialized) {
+ std::cerr << "synchronizable not initialized" << std::endl;
+ abort();
+ }
+ pthread_mutex_unlock(&mutex);
+ }
+
+private:
+ void initialize()
+ {
+ if (!initialized) {
+ pthread_mutexattr_t attr;
+ pthread_mutexattr_init(&attr);
+ pthread_mutexattr_settype(&attr, type);
+ pthread_mutex_init(&mutex, &attr);
+ pthread_mutexattr_destroy(&attr);
+ initialized = true;
+ } else {
+ std::cerr << "synchronizable already initialized" << std::endl;
+ abort();
+ }
+ }
+
+ int type;
+ bool initialized;
+ pthread_mutex_t mutex;
+};
+
+
+class Synchronized
+{
+public:
+ Synchronized(Synchronizable *_synchronizable)
+ : synchronizable(_synchronizable)
+ {
+ synchronizable->acquire();
+ }
+
+ ~Synchronized()
+ {
+ synchronizable->release();
+ }
+
+ operator bool () { return true; }
+
+private:
+ Synchronizable *synchronizable;
+};
+
+
+#define synchronized(s) \
+ if (Synchronized __synchronized ## s = Synchronized(&__synchronizable_ ## s))
+
+#define synchronizable(s) \
+ Synchronizable __synchronizable_ ## s
+
+#define synchronizer(s) \
+ (__synchronizable_ ## s)
+
+
+#define SYNCHRONIZED_INITIALIZER Synchronizable(PTHREAD_MUTEX_NORMAL)
+#define SYNCHRONIZED_INITIALIZER_DEBUG Synchronizable(PTHREAD_MUTEX_ERRORCHECK)
+#define SYNCHRONIZED_INITIALIZER_RECURSIVE Synchronizable(PTHREAD_MUTEX_RECURSIVE)
Added: incubator/mesos/trunk/src/third_party/libprocess/todo
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/todo?rev=1131940&view=auto
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/todo (added)
+++ incubator/mesos/trunk/src/third_party/libprocess/todo Sun Jun 5 06:38:58 2011
@@ -0,0 +1,23 @@
+/* TODO(benh): Compile with a way to figure out which set of messages
+ you used, and that way when someone with a different set of
+ messages sends you a message you can declare that that message is
+ not in your language of understanding. */
+/* TODO(benh): Fix link functionality (processes need to send
+ process_exit message since a dead process on one node might not
+ know that a process on another node linked with it). */
+/* TODO(benh): What happens when a remote link exits? Do we close the
+ socket correctly? */
+/* TODO(benh): Revisit receive, pause, and await semantics. */
+/* TODO(benh): Handle/Enable forking. */
+/* TODO(benh): Use multiple processing threads (do process affinity). */
+/* TODO(benh): Better error handling (i.e., warn if re-spawn process). */
+/* TODO(benh): Better protocol format checking in read_msg. */
+/* TODO(benh): Use different backends for files and sockets. */
+/* TODO(benh): Allow messages to be received out-of-order (i.e., allow
+ someone to do a receive with a message id and let other messages
+ queue until a message with that message id is received). */
+/* TODO(benh): LinkManager::link and LinkManager::send are pretty big
+ functions, we could probably create some queue that the I/O thread
+ checks for sending messages and creating links instead ... that
+ would probably be faster, and have less contention for the mutex
+ (that might mean we can eliminate contention for the mutex!). */
Modified: incubator/mesos/trunk/src/third_party/libprocess/utility.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/utility.hpp?rev=1131940&r1=1131939&r2=1131940&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/utility.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/utility.hpp Sun Jun 5 06:38:58 2011
@@ -2,7 +2,7 @@
#include <string>
std::string
-operator+(const std::string &s, int i)
+operator + (const std::string &s, int i)
{
std::stringstream out;
out << i;