You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 08:38:58 UTC

svn commit: r1131940 [3/3] - in /incubator/mesos/trunk/src/third_party/libprocess: process.cpp process.hpp synchronized.cpp synchronized.hpp todo utility.hpp

Modified: incubator/mesos/trunk/src/third_party/libprocess/process.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/process.hpp?rev=1131940&r1=1131939&r2=1131940&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/process.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/process.hpp Sun Jun  5 06:38:58 2011
@@ -7,13 +7,6 @@
 
 #include <sys/time.h>
 
-#ifdef USE_LITHE
-#include <lithe.hh>
-
-#include <ht/ht.h>
-#include <ht/spinlock.h>
-#endif /* USE_LITHE */
-
 #include <iostream>
 #include <map>
 #include <queue>
@@ -67,104 +60,36 @@ public:
   virtual bool filter(struct msg *) = 0;
 };
 
-#ifdef USE_LITHE
-
-using lithe::Scheduler;
-
-class ProcessScheduler : public Scheduler
-{
-private:
-  int lock;
-  int waiter;
-  lithe_task_t task;
-  std::map<lithe_sched_t *, std::pair<int, int> > children;
-
-protected:
-  void enter();
-  void yield(lithe_sched_t *child);
-  void reg(lithe_sched_t *child);
-  void unreg(lithe_sched_t *child);
-  void request(lithe_sched_t *child, int k);
-  void unblock(lithe_task_t *task);
-
-  void schedule();
-
-public:
-  ProcessScheduler();
-  ~ProcessScheduler();
-};
-
-#else
-
-void * schedule(void *arg);
-
-#endif /* USE_LITHE */
-
 
 class Process {
-private:
-  friend class LinkManager;
-  friend class ProcessManager;
-#ifdef USE_LITHE
-  friend class ProcessScheduler;
-#else
-  friend void * schedule(void *arg);
-#endif /* USE_LITHE */
-
-  /* Flag indicating state of process. */
-  enum { INIT,
-	 READY,
-	 RUNNING,
-	 RECEIVING,
-	 PAUSED,
-	 AWAITING,
-	 WAITING,
-	 INTERRUPTED,
-	 TIMEDOUT,
-	 EXITED } state;
-
-  /* Queue of received messages. */
-  std::deque<struct msg *> msgs;
+public:
+  virtual ~Process();
 
-  /* Current message. */
-  struct msg *current;
+  /* Returns pid of process; valid even before calling spawn. */
+  PID getPID() const;
 
-  /* Current "blocking" generation. */
-  int generation;
+  /* Sends a message to PID without a return address. */
+  static void post(const PID &to, MSGID id);
 
-  /* Process PID. */
-  PID pid;
+  /* Sends a message with data to PID without a return address. */
+  static void post(const PID &to, MSGID id, const char *data, size_t length);
 
-#ifdef USE_LITHE
-  lithe_task_t task;
-#endif /* USE_LITHE */
+  /* Spawn a new process. */
+  static PID spawn(Process *process);
 
-  /* Continuation/Context of process. */
-  ucontext_t uctx;
+  /* Wait for PID to exit (returns true if actually waited on a process). */
+  static bool wait(const PID &pid);
 
-  /* Lock/mutex protecting internals. */
-#ifdef USE_LITHE
-  int l;
-  void lock() { spinlock_lock(&l); }
-  void unlock() { spinlock_unlock(&l); }
-#else
-  pthread_mutex_t m;
-  void lock() { pthread_mutex_lock(&m); }
-  void unlock() { pthread_mutex_unlock(&m); }
-#endif /* USE_LITHE */
+  /* Wait for PID to exit (returns true if actually waited on a process). */
+  static bool wait(Process *process);
 
-  /* Enqueues the specified message. */
-  void enqueue(struct msg *msg);
+  /* Invoke the thunk in a legacy safe way. */
+  static void invoke(const std::tr1::function<void (void)> &thunk);
 
-  /* Dequeues a message or returns NULL. */
-  struct msg * dequeue();
+  /* Filter messages to be enqueued (except for timeout messages). */
+  static void filter(MessageFilter *);
 
-#if defined(SWIGPYTHON) || defined(SWIGRUBY)
-public:
-#else
 protected:
-#endif /* SWIG */
-
   Process();
 
   /* Function run when process spawned. */
@@ -227,32 +152,53 @@ protected:
   /* Returns sub-second elapsed time (according to this process). */
   double elapsed();
 
-public:
-  virtual ~Process();
+private:
+  friend class LinkManager;
+  friend class ProcessManager;
+  friend class ProcessReference;
+  friend void * schedule(void *arg);
 
-  /* Returns pid of process; valid even before calling spawn. */
-  PID getPID() const;
+  /* Flag indicating state of process. */
+  enum { INIT,
+	 READY,
+	 RUNNING,
+	 RECEIVING,
+	 PAUSED,
+	 AWAITING,
+	 WAITING,
+	 INTERRUPTED,
+	 TIMEDOUT,
+         EXITING,
+	 EXITED } state;
 
-  /* Sends a message to PID without a return address. */
-  static void post(const PID &to, MSGID id);
+  /* Active references. */
+  int refs;
 
-  /* Sends a message with data to PID without a return address. */
-  static void post(const PID &to, MSGID id, const char *data, size_t length);
+  /* Queue of received messages. */
+  std::deque<struct msg *> msgs;
 
-  /* Spawn a new process. */
-  static PID spawn(Process *process);
+  /* Current message. */
+  struct msg *current;
 
-  /* Wait for PID to exit (returns true if actually waited on a process). */
-  static bool wait(PID pid);
+  /* Current "blocking" generation. */
+  int generation;
 
-  /* Wait for PID to exit (returns true if actually waited on a process). */
-  static bool wait(Process *process);
+  /* Process PID. */
+  PID pid;
 
-  /* Invoke the thunk in a legacy safe way. */
-  static void invoke(const std::tr1::function<void (void)> &thunk);
+  /* Continuation/Context of process. */
+  ucontext_t uctx;
 
-  /* Filter messages to be enqueued (except for timeout messages). */
-  static void filter(MessageFilter *);
+  /* Lock/mutex protecting internals. */
+  pthread_mutex_t m;
+  void lock() { pthread_mutex_lock(&m); }
+  void unlock() { pthread_mutex_unlock(&m); }
+
+  /* Enqueues the specified message. */
+  void enqueue(struct msg *msg);
+
+  /* Dequeues a message or returns NULL. */
+  struct msg * dequeue();
 };
 
 

Added: incubator/mesos/trunk/src/third_party/libprocess/synchronized.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/synchronized.cpp?rev=1131940&view=auto
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/synchronized.cpp (added)
+++ incubator/mesos/trunk/src/third_party/libprocess/synchronized.cpp Sun Jun  5 06:38:58 2011
@@ -0,0 +1,66 @@
+#include "synchronized.hpp"
+
+using std::string;
+
+
+static string s1;
+static synchronizable(s1);
+
+static string s2;
+static synchronizable(s2) = SYNCHRONIZED_INITIALIZER;
+
+static string s3;
+static synchronizable(s3) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
+
+
+void bar()
+{
+  synchronized(s3) {
+
+  }
+}
+
+
+void foo()
+{
+  synchronized(s3) {
+    bar();
+  }
+}
+
+
+class Foo
+{
+public:
+  Foo()
+  {
+    synchronizer(s) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
+  }
+
+  void foo()
+  {
+    synchronized(s) {
+      synchronized(s) {
+
+      }
+    }
+  }
+  
+private:
+  string s;
+  synchronizable(s);
+};
+
+
+int main(int argc, char **argv)
+{
+  synchronizer(s1) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
+  //synchronizer(s2) = SYNCHRONIZED_INITIALIZER;
+
+  //foo();
+
+  Foo f;
+  f.foo();
+
+  return 0;
+}

Added: incubator/mesos/trunk/src/third_party/libprocess/synchronized.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/synchronized.hpp?rev=1131940&view=auto
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/synchronized.hpp (added)
+++ incubator/mesos/trunk/src/third_party/libprocess/synchronized.hpp Sun Jun  5 06:38:58 2011
@@ -0,0 +1,104 @@
+#include <pthread.h>
+
+#include <iostream>
+
+
+class Synchronizable
+{
+public:
+  Synchronizable()
+    : initialized(false) {}
+
+  explicit Synchronizable(int _type)
+    : type(_type), initialized(false)
+  {
+    initialize();
+  }
+
+  Synchronizable(const Synchronizable &that)
+  {
+    type = that.type;
+    initialize();
+  }
+
+  Synchronizable & operator = (const Synchronizable &that)
+  {
+    type = that.type;
+    initialize();
+    return *this;
+  }
+
+  void acquire()
+  {
+    if (!initialized) {
+      std::cerr << "synchronizable not initialized" << std::endl;
+      abort();
+    }
+    pthread_mutex_lock(&mutex);
+  }
+
+  void release()
+  {
+    if (!initialized) {
+      std::cerr << "synchronizable not initialized" << std::endl;
+      abort();
+    }
+    pthread_mutex_unlock(&mutex);
+  }
+
+private:
+  void initialize()
+  {
+    if (!initialized) {
+      pthread_mutexattr_t attr;
+      pthread_mutexattr_init(&attr);
+      pthread_mutexattr_settype(&attr, type);
+      pthread_mutex_init(&mutex, &attr);
+      pthread_mutexattr_destroy(&attr);
+      initialized = true;
+    } else {
+      std::cerr << "synchronizable already initialized" << std::endl;
+      abort();
+    }
+  }
+
+  int type;
+  bool initialized;
+  pthread_mutex_t mutex;
+};
+
+
+class Synchronized
+{
+public:
+  Synchronized(Synchronizable *_synchronizable)
+    : synchronizable(_synchronizable)
+  {
+    synchronizable->acquire();
+  }
+
+  ~Synchronized()
+  {
+    synchronizable->release();
+  }
+
+  operator bool () { return true; }
+
+private:
+  Synchronizable *synchronizable;
+};
+
+
+#define synchronized(s)                                                 \
+  if (Synchronized __synchronized ## s = Synchronized(&__synchronizable_ ## s))
+
+#define synchronizable(s)                       \
+  Synchronizable __synchronizable_ ## s
+
+#define synchronizer(s)                         \
+  (__synchronizable_ ## s)
+
+
+#define SYNCHRONIZED_INITIALIZER Synchronizable(PTHREAD_MUTEX_NORMAL)
+#define SYNCHRONIZED_INITIALIZER_DEBUG Synchronizable(PTHREAD_MUTEX_ERRORCHECK)
+#define SYNCHRONIZED_INITIALIZER_RECURSIVE Synchronizable(PTHREAD_MUTEX_RECURSIVE)

Added: incubator/mesos/trunk/src/third_party/libprocess/todo
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/todo?rev=1131940&view=auto
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/todo (added)
+++ incubator/mesos/trunk/src/third_party/libprocess/todo Sun Jun  5 06:38:58 2011
@@ -0,0 +1,23 @@
+/* TODO(benh): Compile with a way to figure out which set of messages
+   you used, and that way when someone with a different set of
+   messages sends you a message you can declare that that message is
+   not in your language of understanding. */
+/* TODO(benh): Fix link functionality (processes need to send
+   process_exit message since a dead process on one node might not
+   know that a process on another node linked with it). */
+/* TODO(benh): What happens when a remote link exits? Do we close the
+   socket correclty?. */
+/* TODO(benh): Revisit receive, pause, and await semantics. */
+/* TODO(benh): Handle/Enable forking. */
+/* TODO(benh): Use multiple processing threads (do process affinity). */
+/* TODO(benh): Better error handling (i.e., warn if re-spawn process). */
+/* TODO(benh): Better protocol format checking in read_msg. */
+/* TODO(benh): Use different backends for files and sockets. */
+/* TODO(benh): Allow messages to be received out-of-order (i.e., allow
+   someone to do a receive with a message id and let other messages
+   queue until a message with that message id is received).  */
+/* TODO(benh): LinkManager::link and LinkManager::send are pretty big
+   functions, we could probably create some queue that the I/O thread
+   checks for sending messages and creating links instead ... that
+   would probably be faster, and have less contention for the mutex
+   (that might mean we can eliminate contention for the mutex!). */

Modified: incubator/mesos/trunk/src/third_party/libprocess/utility.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/utility.hpp?rev=1131940&r1=1131939&r2=1131940&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/utility.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/utility.hpp Sun Jun  5 06:38:58 2011
@@ -2,7 +2,7 @@
 #include <string>
 
 std::string
-operator+(const std::string &s, int i)
+operator + (const std::string &s, int i)
 {
   std::stringstream out;
   out << i;